Concurrency using the actor model

Another model of concurrency, quite similar to the message passing model, is the actor model. The actor model became popular with Erlang, a functional programming language widely used in the telecom industry and known for its robustness and its distributed-by-default nature.

The actor model is a conceptual model that implements concurrency at the type level using entities called actors. It was first introduced by Carl Eddie Hewitt in 1973. It removes the need for explicit locks and manual synchronization, and provides a cleaner way to introduce concurrency in a system. The actor model is built from the following parts:

  • Actor: This is the core primitive in the actor model. Each actor has an address, through which other actors can send messages to it, and a mailbox, which is just a queue that stores the messages it has received. The queue is generally a First In, First Out (FIFO) queue. Actors can also form a hierarchy: a supervisor actor can create child actors, which can in turn create child actors of their own.
  • Messages: Actors communicate only via messages, which they process asynchronously. The actix-web framework provides a convenient way to wrap synchronous operations behind an asynchronous interface.
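The address and mailbox idea can be sketched with nothing but the standard library. This is only an illustration of the concept, not how actix is implemented: the Msg enum and the spawn_adder and ask_sum helpers are hypothetical names made up for this sketch. The Sender half of a channel plays the role of the address, and the Receiver half plays the role of the FIFO mailbox.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Messages understood by our toy actor (hypothetical, for illustration).
enum Msg {
    // Add the two numbers and send the sum back on the reply channel.
    Add(u32, u32, Sender<u32>),
    // Ask the actor to stop processing its mailbox.
    Stop,
}

// Spawns the actor on its own thread and returns its "address".
// The Sender is the address; the Receiver is the mailbox.
fn spawn_adder() -> (Sender<Msg>, thread::JoinHandle<()>) {
    let (address, mailbox) = channel::<Msg>();
    let handle = thread::spawn(move || {
        // Messages are taken out of the mailbox in FIFO order.
        for msg in mailbox {
            match msg {
                Msg::Add(a, b, reply_to) => {
                    // Ignore the error if the requester has gone away.
                    let _ = reply_to.send(a + b);
                }
                Msg::Stop => break,
            }
        }
    });
    (address, handle)
}

// Sends one Add message to the actor and waits for the answer.
fn ask_sum(address: &Sender<Msg>, a: u32, b: u32) -> u32 {
    let (reply_to, reply) = channel();
    address
        .send(Msg::Add(a, b, reply_to))
        .expect("actor has stopped");
    reply.recv().expect("actor dropped the reply channel")
}

fn main() {
    let (address, handle) = spawn_adder();
    println!("5 + 6 = {}", ask_sum(&address, 5, 6));
    address.send(Msg::Stop).unwrap();
    handle.join().unwrap();
}
```

Note that the actor's state is only ever touched by its own thread, which is why no lock is needed: all coordination happens through the messages themselves.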

In Rust, the actix crate implements the actor model. It builds on the tokio and futures crates, which we'll cover in Chapter 12, Network Programming in Rust. The core object of the crate is the Arbiter type, which is simply a thread that spawns an event loop underneath and provides a handle to that event loop as an Addr type. Once created, we can use this handle to send messages to the actor.

In actix, creating an actor follows three simple steps: create a type, define a message, and implement the handler for that message on the actor type. Once that is done, we can create actors and spawn them into one of the created arbiters.

Each actor runs within an arbiter.

When we create an actor, it doesn't execute right away. Actors start executing only once we put them into arbiter threads.

To keep the code example simple and to show how to set up actors and run them in actix, we'll create an actor that can add two numbers. Let's create a new project by running cargo new actor_demo with the following dependencies in Cargo.toml:

# actor_demo/Cargo.toml

[dependencies]
actix = "0.7.9"
futures = "0.1.25"
tokio = "0.1.15"

Our main.rs contains the following code:

// actor_demo/src/main.rs

use actix::prelude::*;
use futures::future;
use futures::future::Future;
use std::time::{Duration, Instant};
use tokio::timer::Delay;

// The message: a tuple struct holding the two numbers to add.
struct Add(u32, u32);

impl Message for Add {
    type Result = Result<u32, ()>;
}

// The actor. It carries no state of its own.
struct Adder;

impl Actor for Adder {
    type Context = SyncContext<Self>;
}

impl Handler<Add> for Adder {
    type Result = Result<u32, ()>;

    fn handle(&mut self, msg: Add, _: &mut Self::Context) -> Self::Result {
        let sum = msg.0 + msg.1;
        println!("Computed: {} + {} = {}", msg.0, msg.1, sum);
        Ok(sum)
    }
}

fn main() {
    System::run(|| {
        // Start three Adder actors, each on its own thread.
        let addr = SyncArbiter::start(3, || Adder);
        for n in 5..10 {
            addr.do_send(Add(n, n + 1));
        }

        // Stop the system after one second so that the program exits.
        tokio::spawn(futures::lazy(|| {
            Delay::new(Instant::now() + Duration::from_secs(1)).then(|_| {
                System::current().stop();
                future::ok::<(), ()>(())
            })
        }));
    });
}

In the preceding code, we have created an actor named Adder. This actor can receive messages of type Add, a tuple struct that encapsulates the two numbers to be added. To allow Adder to receive and process Add messages, we implement the Handler trait for Adder, parameterized over the Add message type. In the Handler implementation, we print the computation being performed and return the sum of the given numbers.

Following that, in main, we first create a System actor by calling its run method, which takes a closure. Within the closure, we start a SyncArbiter with 3 threads by calling its start method. This creates 3 actors ready to receive messages. It returns an Addr, a handle through which we can send messages to the Adder actor instances. We then send 5 messages to our arbiter address, addr. As System::run is a parent event loop that runs forever, we spawn a future to stop the System actor after a delay of 1 second. We can ignore the details of this part of the code, as it is simply there to shut down the System actor in an asynchronous way. Since the three actors run on separate threads, the order in which the results are printed may differ between runs.
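To build some intuition for what it means to have three actors behind a single address, here is a rough sketch using only the standard library. It is not how SyncArbiter is implemented; the start_pool and sums_via_pool functions are hypothetical names for this illustration, and the shared mailbox is modeled as a channel receiver behind a mutex, which is an assumption of the sketch.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

// Same message shape as in the actix example.
struct Add(u32, u32);

// Starts `workers` threads that all pull messages from one shared mailbox.
// Each result is reported on `results` as (a, b, sum).
fn start_pool(
    workers: usize,
    results: Sender<(u32, u32, u32)>,
) -> (Sender<Add>, Vec<thread::JoinHandle<()>>) {
    let (address, mailbox) = channel::<Add>();
    let mailbox = Arc::new(Mutex::new(mailbox));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let mailbox = Arc::clone(&mailbox);
            let results = results.clone();
            thread::spawn(move || loop {
                // The lock is held only while taking one message.
                let msg = mailbox.lock().unwrap().recv();
                match msg {
                    Ok(Add(a, b)) => {
                        let _ = results.send((a, b, a + b));
                    }
                    // Every sender is gone: no more work will arrive.
                    Err(_) => break,
                }
            })
        })
        .collect();
    (address, handles)
}

// Sends the same five messages as the actix example and collects the sums.
fn sums_via_pool() -> Vec<u32> {
    let (results_tx, results_rx) = channel();
    let (address, handles) = start_pool(3, results_tx);
    for n in 5..10 {
        address.send(Add(n, n + 1)).unwrap();
    }
    // Dropping the address closes the mailbox, so the workers exit.
    drop(address);
    for handle in handles {
        handle.join().unwrap();
    }
    // Arrival order depends on thread scheduling, so sort before returning.
    let mut sums: Vec<u32> = results_rx.iter().map(|(_, _, sum)| sum).collect();
    sums.sort();
    sums
}

fn main() {
    println!("{:?}", sums_via_pool());
}
```

The sort in sums_via_pool is there because whichever worker grabs a message first handles it, so the order in which results arrive is not fixed.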

With that said, let's take this program for a spin:

$ cargo run
Running `target/debug/actor_demo`

Computed: 5 + 6 = 11
Computed: 6 + 7 = 13
Computed: 7 + 8 = 15
Computed: 8 + 9 = 17
Computed: 9 + 10 = 19

Similar to the actix crate, there are other crates in the Rust ecosystem that implement various concurrency models suitable for different use cases.
