Building an asynchronous redis server

Now that we're familiar with the asynchronous I/O solutions that the Rust ecosystem provides, it's time to revisit our Redis server implementation. We'll port our rudis_sync server to an asynchronous version using the tokio and futures crates. As with any asynchronous code, futures and tokio can be daunting at first, and it can take time to get used to their APIs. However, we'll try to make things easy to understand here. Let's start by creating our project by running cargo new rudis_async and adding the following dependencies in Cargo.toml:

# rudis_async/Cargo.toml

[dependencies]
tokio = "0.1.13"
futures = "0.1.25"
lazy_static = "1.2.0"
resp = { git = "https://github.com/creativcoder/resp" }
tokio-codec = "0.1.1"
bytes = "0.4.11"

We are using a bunch of crates here:

  • futures: Provides a cleaner abstraction for dealing with async code
  • tokio: Encapsulates mio and provides a runtime for running asynchronous code
  • lazy_static: Allows us to create a dynamic global variable that can be mutated
  • resp: A crate that can parse Redis protocol messages
  • tokio-codec: This allows you to convert a stream of bytes from the network into a given type, parsed as a definite message according to a specified codec. A codec converts a stream of bytes into a parsed message, termed a Frame in the tokio ecosystem.
  • bytes: This is used with the tokio codec to efficiently convert a stream of bytes into a given Frame
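As a refresher on what the resp crate will be parsing for us: a Redis client command travels over the wire in the RESP format, as an array of bulk strings, each prefixed with its length. Here's a small std-only sketch (the encode_command helper is hypothetical, not part of the resp crate) that builds such a message:

```rust
// Hypothetical helper illustrating the RESP wire format: a command is
// an array header (*N) followed by N bulk strings ($len, then the bytes),
// each line terminated by \r\n.
fn encode_command(args: &[&str]) -> String {
    let mut out = format!("*{}\r\n", args.len());
    for arg in args {
        out.push_str(&format!("${}\r\n{}\r\n", arg.len(), arg));
    }
    out
}

fn main() {
    let wire = encode_command(&["SET", "foo", "bar"]);
    // SET foo bar => *3, then three bulk strings of length 3 each
    assert_eq!(wire, "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n");
    println!("{:?}", wire);
}
```

This is exactly the kind of byte sequence our decoder will receive from redis-cli and hand over to the resp crate for parsing.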

Our initial code in main.rs follows a similar structure:

// rudis_async/src/main.rs

mod codec;
use crate::codec::RespCodec;

use lazy_static::lazy_static;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Mutex;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio_codec::Decoder;
use std::env;

mod commands;
use crate::commands::process_client_request;

lazy_static! {
    static ref RUDIS_DB: Mutex<HashMap<String, String>> = Mutex::new(HashMap::new());
}

We have a bunch of imports and the same RUDIS_DB in a lazy_static! block. We then have our function main:
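RUDIS_DB is the shared in-memory store that every client handler will lock to read and write keys. Here's a std-only sketch of the same pattern without the lazy_static crate, using std::sync::OnceLock (stable since Rust 1.70) and a hypothetical db helper:

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

// A std-only stand-in for the lazy_static! block: the map is created
// lazily on first access and guarded by a Mutex for shared mutation.
static DB: OnceLock<Mutex<HashMap<String, String>>> = OnceLock::new();

// Hypothetical accessor returning the global store.
fn db() -> &'static Mutex<HashMap<String, String>> {
    DB.get_or_init(|| Mutex::new(HashMap::new()))
}

fn main() {
    // Lock, mutate, and drop the guard at the end of the statement.
    db().lock().unwrap().insert("foo".to_string(), "bar".to_string());
    let val = db().lock().unwrap().get("foo").cloned();
    assert_eq!(val.as_deref(), Some("bar"));
}
```

The important point in either formulation is that the lock is held only briefly per operation, so concurrent client handlers don't serialize on it longer than needed.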

// rudis_async/src/main.rs

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = env::args()
        .skip(1)
        .next()
        .unwrap_or("127.0.0.1:6378".to_owned());
    let addr = addr.parse::<SocketAddr>()?;

    let listener = TcpListener::bind(&addr)?;
    println!("rudis_async listening on: {}", addr);

    let server_future = listener
        .incoming()
        .map_err(|e| println!("failed to accept socket; error = {:?}", e))
        .for_each(handle_client);

    tokio::run(server_future);
    Ok(())
}

We parse the string that's been passed in as an argument or use a default address of 127.0.0.1:6378. We then create a new TcpListener instance bound to addr. Calling incoming on the listener returns a stream of incoming client connections, and we chain for_each on that stream, which invokes handle_client for every connection. This combined future gets stored as server_future. Finally, we call tokio::run, passing in server_future. This creates the main tokio task and schedules the future for execution.

In the same file, our handle_client function is defined like so:

// rudis_async/src/main.rs

fn handle_client(client: TcpStream) -> Result<(), ()> {
    let (tx, rx) = RespCodec.framed(client).split();
    let reply = rx.and_then(process_client_request);
    let task = tx.send_all(reply).then(|res| {
        if let Err(e) = res {
            eprintln!("failed to process connection; error = {:?}", e);
        }
        Ok(())
    });

    tokio::spawn(task);
    Ok(())
}

In handle_client, we first convert the client TcpStream into a framed stream by calling framed on RespCodec, passing in the client connection. Following that, we call split on it, which gives us a writer half (tx) that implements Sink and a reader half (rx) that implements Stream. This simply gives us a tx and rx to write to and read from the client socket, with the codec applied on both sides: when we read from rx, we get the decoded message, and when we write anything to tx, the codec encodes it into a byte sequence on the wire.

On rx, we call and_then, passing the process_client_request function, which maps every decoded frame to a reply. We then take the writer half, tx, and call send_all with the reply stream. Finally, we spawn the resulting future as a task by calling tokio::spawn.
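Conceptually, the rx → process → tx pipeline is the asynchronous counterpart of a plain request/reply loop. Here's a synchronous std-only sketch of that data flow (pump and process_request are hypothetical stand-ins, not part of the server):

```rust
// Synchronous sketch of the pipeline: each decoded request is mapped to
// reply bytes (like rx.and_then(process_client_request)), and every reply
// is "sent" by appending to the write side (like tx.send_all(reply)).
fn pump<F>(requests: Vec<String>, mut process_request: F) -> Vec<u8>
where
    F: FnMut(String) -> Vec<u8>,
{
    let mut wire_out = Vec::new();
    for req in requests {
        let reply = process_request(req); // the and_then step
        wire_out.extend_from_slice(&reply); // the send_all step
    }
    wire_out
}

fn main() {
    let out = pump(vec!["PING".to_string()], |_req| b"+PONG\r\n".to_vec());
    assert_eq!(out, b"+PONG\r\n");
}
```

The difference in the real server is that tokio drives this loop lazily: nothing runs until the spawned task is polled by the runtime, and the task yields whenever the socket isn't ready.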

In our codec.rs file, we define RespCodec, which implements the Encoder and Decoder traits from the tokio-codec crate:

// rudis_async/src/codec.rs

use std::io;
use bytes::BytesMut;
use tokio_codec::{Decoder, Encoder};
use resp::{Value, Decoder as RespDecoder};
use std::io::BufReader;
use std::str;

pub struct RespCodec;

impl Encoder for RespCodec {
    type Item = Vec<u8>;
    type Error = io::Error;

    fn encode(&mut self, msg: Vec<u8>, buf: &mut BytesMut) -> io::Result<()> {
        buf.reserve(msg.len());
        buf.extend(msg);
        Ok(())
    }
}

impl Decoder for RespCodec {
    type Item = Value;
    type Error = io::Error;

    fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Value>> {
        let s = if let Some(n) = buf.iter().rposition(|b| *b == b'\n') {
            let client_query = buf.split_to(n + 1);

            match str::from_utf8(client_query.as_ref()) {
                Ok(s) => s.to_string(),
                Err(_) => return Err(io::Error::new(io::ErrorKind::Other, "invalid string")),
            }
        } else {
            return Ok(None);
        };

        if let Ok(v) = RespDecoder::new(&mut BufReader::new(s.as_bytes())).decode() {
            Ok(Some(v))
        } else {
            Ok(None)
        }
    }
}

The Decoder implementation specifies how to parse incoming bytes into a resp::Value type, whereas the Encoder implementation specifies how to push the reply bytes (a Vec<u8>) onto the outgoing stream to the client. In decode, we look for the last newline in the buffer: if one is found, everything up to and including it is split off and parsed; if not, we return Ok(None), which tells tokio-codec to keep buffering until more bytes arrive.
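The buffering part of decode is worth seeing in isolation. Here's a std-only sketch of the same rposition/split_to logic using a Vec<u8> in place of BytesMut (extract_frame is a hypothetical name):

```rust
// Sketch of the framing step in decode: scan for the last newline,
// drain everything up to and including it out of the buffer, and
// return it as a String; return None (keep buffering) if no complete
// line has arrived yet.
fn extract_frame(buf: &mut Vec<u8>) -> Option<String> {
    let n = buf.iter().rposition(|b| *b == b'\n')?;
    let frame: Vec<u8> = buf.drain(..=n).collect();
    String::from_utf8(frame).ok()
}

fn main() {
    // A complete RESP message is consumed from the buffer in one piece.
    let mut buf = b"*1\r\n$4\r\nPING\r\n".to_vec();
    assert_eq!(extract_frame(&mut buf).as_deref(), Some("*1\r\n$4\r\nPING\r\n"));
    assert!(buf.is_empty());

    // A partial message stays in the buffer until more bytes arrive.
    let mut partial = b"$4\r".to_vec();
    assert!(extract_frame(&mut partial).is_none());
    assert_eq!(partial, b"$4\r");
}
```

The real decode differs only in that it uses BytesMut::split_to (which avoids copying) and hands the extracted string to the resp crate's Decoder for actual RESP parsing.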

Our commands.rs implementation is the same as in the previous version, so we'll skip going through it. With that said, let's try our new server by running cargo run:

With the official redis-cli client, we can connect to our server by running:

$ redis-cli -p 6378

Here's a session of running redis-cli against rudis_async server:
