Building a synchronous Redis server

To keep this example short and easy to follow, our Redis clone will implement a very small subset of the RESP protocol and will only be able to process SET and GET calls. We'll use the redis-cli tool that ships with the official Redis package to make queries against our server. On Ubuntu, we can get redis-cli by installing Redis with apt-get install redis-server.
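For instance, the following is enough to get the client onto an Ubuntu machine; the version in the output will vary depending on your distribution:

$ sudo apt-get install redis-server
$ redis-cli --version
redis-cli 4.0.9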

Let's create a new project by running cargo new rudis_sync and adding the following dependencies in our Cargo.toml file:

rudis_sync/Cargo.toml

[dependencies]
lazy_static = "1.2.0"
resp = { git = "https://github.com/creativcoder/resp" }

We have named our project rudis_sync. We depend on two crates:

  • lazy_static: We'll use this to declare our in-memory database as a lazily initialized global.
  • resp: This is a forked crate that resides on my GitHub repository. We'll use this to parse the stream of bytes from the client.

To make the implementation easier to follow, rudis_sync has only minimal error handling. When you are done experimenting with the code, you are encouraged to implement better error-handling strategies.

Let's start with the contents of our main.rs file:

// rudis_sync/src/main.rs

use lazy_static::lazy_static;
use resp::Decoder;
use std::collections::HashMap;
use std::env;
use std::io::{BufReader, Write};
use std::net::Shutdown;
use std::net::{TcpListener, TcpStream};
use std::sync::Mutex;
use std::thread;

mod commands;
use crate::commands::process_client_request;

type STORE = Mutex<HashMap<String, String>>;

lazy_static! {
    static ref RUDIS_DB: STORE = Mutex::new(HashMap::new());
}

fn main() {
    // Take the listening address from the first CLI argument,
    // falling back to 127.0.0.1:6378
    let addr = env::args()
        .skip(1)
        .next()
        .unwrap_or("127.0.0.1:6378".to_owned());

    let listener = TcpListener::bind(&addr).unwrap();
    println!("rudis_sync listening on {} ...", addr);

    // incoming() yields an iterator over new client connections
    for stream in listener.incoming() {
        let stream = stream.unwrap();
        println!("New connection from: {:?}", stream);
        handle_client(stream);
    }
}

We have a bunch of imports, followed by an in-memory RUDIS_DB hashmap declared in a lazy_static! block. We use this as an in-memory database to store the key-value pairs sent by clients. In our main function, we create our listening address in addr from the user-provided argument, or use 127.0.0.1:6378 as the default. We then create a TcpListener instance by calling the associated bind method, passing in addr; this creates a TCP listening socket. Next, we call the incoming method on listener, which returns an iterator over new client connections. For each client connection stream, which is of the TcpStream type (a client socket), we call the handle_client function, passing in the stream.

In the same file, the handle_client function is responsible for parsing queries sent from the client, which will be either GET or SET queries:

// rudis_sync/src/main.rs

fn handle_client(stream: TcpStream) {
    let mut stream = BufReader::new(stream);
    // Decode the incoming byte stream into a RESP Value
    let decoder = Decoder::new(&mut stream).decode();
    match decoder {
        Ok(v) => {
            let reply = process_client_request(v);
            stream.get_mut().write_all(&reply).unwrap();
        }
        Err(e) => {
            println!("Invalid command: {:?}", e);
            let _ = stream.get_mut().shutdown(Shutdown::Both);
        }
    };
}

The handle_client function receives the client's TcpStream socket in the stream variable. We wrap the client stream in a BufReader, which is then passed as a mutable reference to the Decoder::new method from the resp crate. The Decoder reads bytes from the stream to create a RESP Value type. We then have a match block to check whether decoding succeeded. If it failed, we print an error message and close the socket by calling shutdown(), requesting that both the read half and the write half of the client connection be closed with the Shutdown::Both value. shutdown is a method on the underlying TcpStream, not on BufReader, so we call get_mut() first to reach the inner stream. In a real-world implementation, you would obviously need to handle this error gracefully.
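To make the decoding step concrete, here's what a GET query looks like as raw RESP bytes on the wire, along with the Value that the Decoder produces from it. The decoded form below is an approximation, based on the Value variants we match on in this chapter:

// Raw bytes sent by a client issuing `GET foo`:
//
//     *2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n
//
// (an array of two elements, each a bulk string)
// The Decoder turns this into approximately:

Value::Array(vec![
    Value::Bulk("GET".to_string()),
    Value::Bulk("foo".to_string()),
])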

If the decoding succeeds, we call process_client_request, which returns the reply to send back to the client. We write this reply to the client by calling write_all on the client stream. The process_client_request function is defined in commands.rs as follows:

// rudis_sync/src/commands.rs

use crate::RUDIS_DB;
use resp::Value;

pub fn process_client_request(decoded_msg: Value) -> Vec<u8> {
    let reply = if let Value::Array(v) = decoded_msg {
        match &v[0] {
            Value::Bulk(ref s) if s == "GET" || s == "get" => handle_get(v),
            Value::Bulk(ref s) if s == "SET" || s == "set" => handle_set(v),
            other => unimplemented!("{:?} is not supported as of now", other),
        }
    } else {
        Err(Value::Error("Invalid Command".to_string()))
    };

    match reply {
        Ok(r) | Err(r) => r.encode(),
    }
}

This function takes the decoded Value and matches it against the parsed query. In our implementation, we expect the client to send an array of bulk strings, so we match on the Value::Array variant of Value using if let, binding the array to v. If the if branch matches an Array value, we take that array and match on the first entry in v, which will be our command type, that is, GET or SET. This is again a Value::Bulk variant that wraps the command as a string.

We take a reference to the inner string as s and match only if the string is GET or SET. In the case of GET, we call handle_get, passing in the v array; in the case of SET, we call handle_set. In the else branch, we simply send a Value::Error reply to the client, with Invalid Command as the description.

The value that's returned by both branches is assigned to the reply variable. It is then matched to extract the inner value r, which is turned into a Vec<u8> by invoking the encode method on it and returned from the function.
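For reference, these are the standard RESP encodings for the reply values we construct, assuming the resp crate's encode method follows the RESP specification:

// SET acknowledgement (simple string) => b"+OK\r\n"
Value::String("OK".to_string()).encode();
// Error reply                         => b"-Invalid Command\r\n"
Value::Error("Invalid Command".to_string()).encode();
// Successful GET (bulk string)        => b"$3\r\nbar\r\n"
Value::Bulk("bar".to_string()).encode();
// Missing key (null bulk string)      => b"$-1\r\n"
Value::Null.encode();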

Our handle_set and handle_get functions are defined in the same file as follows:

// rudis_sync/src/commands.rs

pub fn handle_get(v: Vec<Value>) -> Result<Value, Value> {
    // Skip the command name itself; the rest are the arguments
    let v = v.iter().skip(1).collect::<Vec<_>>();
    if v.is_empty() {
        return Err(Value::Error("Expected 1 argument for GET command".to_string()));
    }
    let db_ref = RUDIS_DB.lock().unwrap();
    let reply = if let Value::Bulk(ref s) = &v[0] {
        // Reply with the stored value, or a RESP null if the key is absent
        db_ref
            .get(s)
            .map(|e| Value::Bulk(e.to_string()))
            .unwrap_or(Value::Null)
    } else {
        Value::Null
    };
    Ok(reply)
}

pub fn handle_set(v: Vec<Value>) -> Result<Value, Value> {
    let v = v.iter().skip(1).collect::<Vec<_>>();
    // SET requires both a key and a value
    if v.len() < 2 {
        return Err(Value::Error("Expected 2 arguments for SET command".to_string()));
    }
    match (&v[0], &v[1]) {
        (Value::Bulk(k), Value::Bulk(v)) => {
            let _ = RUDIS_DB
                .lock()
                .unwrap()
                .insert(k.to_string(), v.to_string());
        }
        _ => unimplemented!("SET not implemented for {:?}", v),
    }

    Ok(Value::String("OK".to_string()))
}

In handle_get(), we first check whether the GET query came with a key; if it didn't, we bail out with an error message. Next, we match on v[0], which is the key for the GET command, and check whether it exists in our database. If it exists, we wrap the value in Value::Bulk using the map combinator; otherwise, we return a Value::Null reply:

db_ref.get(s).map(|e| Value::Bulk(e.to_string())).unwrap_or(Value::Null)

We then store it in a reply variable and return it as a Result type, that is, Ok(reply).

A similar thing happens in handle_set, where we bail out if we don't have enough arguments for the SET command. Next, we match on our key and value using &v[0] and &v[1] and insert them into RUDIS_DB. As an acknowledgement of the SET query, we reply with OK.

Back in our process_client_request function, once we create the reply bytes, we match on the Result type and convert it into a Vec<u8> by calling encode(), which is then written to the client. With that walk-through out of the way, it's time to test our server with the official redis-cli tool, which we'll run by invoking redis-cli -p 6378.
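Given that SET replies with a simple OK string and GET returns the stored value as a bulk string, a session against rudis_sync looks something like this (the key and value are illustrative):

$ redis-cli -p 6378
127.0.0.1:6378> SET foo bar
OK
127.0.0.1:6378> GET foo
"bar"
127.0.0.1:6378> GET baz
(nil)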

In the preceding session, we did a few GET and SET queries and received the expected replies from rudis_sync. The server, meanwhile, logs every new connection it accepts.
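Each connection is printed using TcpStream's Debug representation, so the output looks roughly like this (peer ports and file descriptors will vary):

rudis_sync listening on 127.0.0.1:6378 ...
New connection from: TcpStream { addr: 127.0.0.1:6378, peer: 127.0.0.1:52328, fd: 4 }
New connection from: TcpStream { addr: 127.0.0.1:6378, peer: 127.0.0.1:52330, fd: 4 }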

But there's a problem with our server: it must finish serving one client before it can accept the next one. To demonstrate this, we'll introduce a bit of delay in the for loop that handles new client connections (this also requires adding use std::time::Duration; to our imports):

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        println!("New connection from: {:?}", stream);
        handle_client(stream);
        // Simulate a slow request by sleeping for 3 seconds
        thread::sleep(Duration::from_millis(3000));
    }

The sleep call simulates a delay in request processing. To see the resulting latencies, we'll start two clients at almost the same time, where one of them makes a SET request and the other makes a GET request on the same key.
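Here's our first client, which does the SET request and gets its reply back immediately (an illustrative session):

$ redis-cli -p 6378
127.0.0.1:6378> SET foo bar
OK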

Our second client does a GET request on the same key, foo, while the server is still sleeping after serving the first client.
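redis-cli prints the elapsed time next to slow replies, so the delayed session looks roughly like this:

$ redis-cli -p 6378
127.0.0.1:6378> GET foo
"bar"
(3.01s)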

As you can see, the second client had to wait almost three seconds to get its GET reply.

Due to its synchronous nature, our server becomes a bottleneck when it needs to process many clients at the same time, say more than 100,000, with each client taking a varying amount of processing time. To get around this, we usually need to spawn a thread to handle each client connection: whenever a new client connection is made, we spawn a new thread and offload the handle_client invocation from the main thread, allowing the main thread to keep accepting other client connections. We can achieve this with a single-line change in our main function, like so:

    for stream in listener.incoming() {
        let stream = stream.unwrap();
        println!("New connection from: {:?}", stream);
        // The closure takes ownership of stream, so each client is
        // handled on its own thread while main keeps accepting
        thread::spawn(|| handle_client(stream));
    }

This removes the blocking nature of our server, but introduces a cost for every new client connection: first, there is the overhead of spawning the thread itself and, second, context switching between a large number of threads adds yet another overhead.

As we can see, our rudis_sync server works as expected, but it will soon be bottlenecked by the number of threads our machine can handle. This thread-per-connection model worked well until the internet began gaining a wider audience and it became the norm for more and more clients to be connected at the same time. Today, however, things are different and we need highly efficient servers that can handle millions of requests at the same time. It turns out that we can tackle the problem of handling more clients at a more foundational level, that is, by using non-blocking sockets. Let's explore them next.
