The Akka framework extends the original Actor model in Scala by adding capabilities such as support for typed actors, message dispatching, routing, load balancing, and partitioning, as well as supervision and configurability [12:7].
The Akka framework can be downloaded from the www.akka.io website, or through the Typesafe Activator at http://www.typesafe.com/platform.
Akka simplifies the implementation of actors by encapsulating some of the details of Scala actors in the akka.actor.Actor and akka.actor.ActorSystem classes.
The three methods you want to override are as follows:
preStart: This is an optional method, invoked to initialize all the necessary resources, such as file or database connections, before the Actor is executed.
receive: This method defines the Actor's behavior and returns a partial function of type PartialFunction[Any, Unit].
postStop: This is an optional method to clean up resources, such as releasing memory and closing database connections, sockets, or file handles.
Typed versus untyped actors
Untyped actors can process messages of any type. If the type of the message is not matched by the receiving actor, it is discarded. Untyped actors can be regarded as contract-less actors. They are the default actors in Scala.
Typed actors are similar to Java remote interfaces. They respond to a method invocation. The invocation is declared publicly, but the execution is delegated asynchronously to the private instance of the target actor [12:8].
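You can illustrate the partial-function nature of receive without Akka. The following minimal sketch relies only on the Scala standard library. receive is a PartialFunction[Any, Unit], and a hypothetical deliver helper applies it only when a case matches, otherwise dropping the message. In Akka itself, an unmatched message is passed to the actor's unhandled method, which by default publishes it on the event stream as an UnhandledMessage.

```scala
import scala.collection.mutable.ListBuffer

object UntypedReceiveSketch {
  // Messages that were matched, in order of arrival
  val handled = ListBuffer[String]()

  // An untyped receive: accepts Any, but defines cases for some types only
  val receive: PartialFunction[Any, Unit] = {
    case s: String => handled += s"text:$s"
    case n: Int    => handled += s"int:$n"
  }

  // Hypothetical dispatcher: applies receive if a case matches,
  // otherwise silently drops the message. Returns true if handled.
  def deliver(msg: Any): Boolean =
    if (receive.isDefinedAt(msg)) { receive(msg); true }
    else false
}
```

Here, deliver("hi") and deliver(42) are handled, while deliver(3.14) matches no case and is discarded.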
Akka offers a variety of functionalities to deploy concurrent applications. Let us create a generic template for a master Actor and worker Actors to transform a dataset using any preprocessing or classification algorithm inherited from the PipeOperator trait, as explained in The pipe operator section under Designing a workflow in Chapter 2, Hello World! The master Actor manages the worker actors in one of the following ways:
The router is a very simple example of Actor supervision. Supervision strategies in Akka are an essential component to make the application fault-tolerant [12:9]. A supervisor Actor manages the operations, availability, and life cycle of its children, known as subordinates. The supervision among actors is organized as a hierarchy. Supervision strategies are categorized as follows:
The first model to evaluate is the traditional master-slave or master-worker design for computation workflows. In this design, the worker Actors are initialized and managed by the master Actor, which is responsible for controlling the iterative process, state, and termination condition of the algorithm. The orchestration of the distributed tasks is performed through message passing.
The first step in implementing the master-worker design is to define the different classes of messages exchanged between the master and each worker to control the execution of the iterative procedure. These message classes are defined as follows:
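type DblSeries = XTSeries[Double]

// Root of the message hierarchy; sealed so that no message type
// can be added outside this source file
sealed abstract class Message(val id: Int)

// Sent by the client code to the master to start the computation
case class Start(i: Int = 0) extends Message(i)
// Sent by the master to a worker, with the partition xt to transform
case class Activate(i: Int, xt: DblSeries) extends Message(i)
// Sent back by a worker, with the transformed partition
case class Completed(i: Int, xt: DblSeries) extends Message(i)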
Let's define the messages that control the execution of the algorithm. We need at least the following message types or case classes:
Start is sent by the client code to the master to start the computation.
Activate is sent by the master to the workers to activate the computation. This message contains the time series, xt, to be processed by the worker Actors.
Completed is sent by each worker back to the sender, the master. It contains the time series produced by the worker's data transformation.
The workers are terminated by the master through the Akka PoisonPill message. The different approaches to terminating an actor are described in The Master actor section.
The hierarchy of the Message class is sealed to prevent third-party developers from adding another message type. The worker responds to the Activate message by executing a data transformation of a type inherited from PipeOperator. The messages exchanged between the master and worker actors are shown in the following diagram:
The worker actors are responsible for transforming each partition created by the master Actor, as follows:
class Worker(id: Int, fct: PipeOperator[DblSeries, DblSeries]) extends Actor { //1
  override def receive = {
    case msg: Activate => {
      // Reply to the sender (the master) with the transformed partition
      sender() ! Completed(msg.id + id, transform(msg.xt)) //2
      context.stop(self)   // each worker processes a single partition
    }
    case _ => Display.show("Unknown message", logger)
  }

  def transform(xt: DblSeries): DblSeries = fct |> xt
}
The Worker class constructor takes the fct data transformation as an argument (line 1). The worker launches the processing or transformation of the msg.xt data upon arrival of the Activate message (line 2). It returns the Completed message to the master once the data transformation, transform, is completed.
In the Scalability section in Chapter 1, Getting Started, we introduced the concepts of workflow and controller to manage the training and classification process as a sequence of transformations on time series. Let's define an abstract class for all controller actors, Controller, with the following three key parameters:
xt: the time series to be processed
fct: the data transformation, of type PipeOperator
partitioner: the partitioner that breaks down a time series for concurrent processing
The Controller class can be defined as follows:
abstract class Controller(
    val xt: DblSeries,
    val fct: PipeOperator[DblSeries, DblSeries],
    val partitioner: Partitioner) extends Actor
The workflow controller is responsible for splitting the time series into several partitions and assigning each partition to a dedicated worker Actor. A helper class, Partitioner, implements the partitioning of the dataset as follows:
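class Partitioner(val numPartitions: Int) {
  // Returns the (exclusive) end index of each of the numPartitions partitions
  def split(xt: DblSeries): Array[Int] = {
    val sz = (xt.size.toDouble/numPartitions).floor.toInt   // size of a partition
    val indices = Array.tabulate(numPartitions)(i => (i+1)*sz)
    indices.update(numPartitions - 1, xt.size)   // the last partition ends with the series
    indices
  }
}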
The split method breaks down a time series, xt, into numPartitions partitions and returns the (exclusive) end index of each partition in the original time series.
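You can check the index arithmetic of split on a small example. The sketch below performs the same computation on a plain size instead of an XTSeries, so that it runs on its own. It also adds a hypothetical splitBounds helper that pairs each end index with the end of the previous partition.

```scala
object PartitionerSketch {
  // Same arithmetic as Partitioner.split, on a size instead of an XTSeries
  def split(size: Int, numPartitions: Int): Array[Int] = {
    val sz = (size.toDouble / numPartitions).floor.toInt
    val indices = Array.tabulate(numPartitions)(i => (i + 1) * sz)
    indices.update(numPartitions - 1, size)   // last partition ends at size
    indices
  }

  // Hypothetical helper: (start, end) of each partition, where each
  // partition starts at the end of the previous one
  def splitBounds(size: Int, numPartitions: Int): Array[(Int, Int)] = {
    val ends = split(size, numPartitions)
    (0 +: ends.init).zip(ends)
  }
}
```

For 10 data points and 3 partitions, sz is 3 and the end indices are 3, 6, and 10, so the last partition holds the 4 remaining points. Slicing every partition as [end - indices(0), end) covers the whole series only when its size is a multiple of numPartitions. In this example, the last slice would be [7, 10), and element 6 would be skipped.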
Let's define a master Actor class, Master. The three methods to override are as follows:
preStart: This method is invoked to initialize all the necessary resources, such as file or database connections, before the actor executes.
receive: This partial function dequeues and processes the messages from the mailbox.
postStop: This method cleans up resources, such as releasing memory and closing database connections, sockets, or file handles.
The Master class can be defined as follows:
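import akka.actor.{PoisonPill, Props}
import scala.collection.mutable.ListBuffer

abstract class Master(
    xt: DblSeries,
    fct: PipeOperator[DblSeries, DblSeries],
    partitioner: Partitioner) extends Controller(xt, fct, partitioner) {

  // One worker Actor per partition, created as children of the master
  val workers = List.tabulate(partitioner.numPartitions)(n =>
    context.actorOf(Props(new Worker(n, fct)))) //4
  // Collects the results returned by the workers
  val aggregator = new ListBuffer[DblVector] //5

  override def preStart(): Unit = {} //6
  override def postStop(): Unit = {} //7
  override def receive = …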
The Master class takes the same parameters as Controller: the time series, xt; the data transformation, fct; and the partitioner.
The worker actors are created through the actorOf factory method of the actor's context (line 4). A list buffer, aggregator, collects the results returned by the workers (line 5). The preStart method implements any initialization required to process the messages (line 6). The postStop method releases all the resources allocated to process the messages (line 7).
The receive message handler processes only two types of messages: Start from the client code and Completed from the workers, as shown in the following code:
override def receive = {
  case Start => split //8
  case msg: Completed => { //10
    aggregator.append(msg.xt.toArray) //11
    if (aggregator.size >= partitioner.numPartitions) { //12
      workers.foreach(_ ! PoisonPill) //13
      aggregate //14
      context.stop(self) //15
    }
  }
}

def aggregate: Seq[Double]

def split: Unit = {
  val partIdx = partitioner.split(xt)
  // Partition i spans [end of partition i-1, end of partition i)
  val bounds = (0 +: partIdx.init).zip(partIdx)
  workers.zip(bounds).foreach { case (worker, (start, end)) =>
    worker ! Activate(0, xt.slice(start, end)) //9
  }
}
The Start message triggers the split of the input time series into partitions (line 8), which are then dispatched to each worker with the Activate message (line 9). Each worker sends a Completed message back to the master upon the completion of its task (line 10). The master adds the result of each worker to aggregator (line 11). Once every worker has completed its task (line 12), the master terminates all the workers through a PoisonPill message, in case the worker actors do not terminate themselves (line 13). The master aggregates the results (line 14) before it terminates itself through a request to its context to stop it (line 15).
The aggregate method can be defined as a parameter either of the Master class or of one of its subclasses.
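The order of the statements in the Completed handler matters. To see why, consider this Akka-free sketch of the master's bookkeeping. MasterState, onCompleted, and sumByIndex are hypothetical names. The reducer is passed as a constructor parameter, one of the two options mentioned above. sumByIndex follows the transpose-and-sum approach used for the discrete Fourier transform later in this section. The result is appended before the completion test, so the last worker's result takes part in the reduction.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, Akka-free model of the master's bookkeeping;
// the reducer is supplied as a constructor parameter
class MasterState(numPartitions: Int, aggregate: Seq[Array[Double]] => Seq[Double]) {
  private val aggregator = new ListBuffer[Array[Double]]

  // Called for each Completed result; returns Some(reduced) after the last one
  def onCompleted(partial: Array[Double]): Option[Seq[Double]] = {
    aggregator.append(partial)              // collect first...
    if (aggregator.size >= numPartitions)   // ...then test for completion
      Some(aggregate(aggregator.toList))
    else None
  }
}

object MasterState {
  // Transpose-and-sum reducer: element k of the result is the sum of
  // element k of every partial result. transpose throws an
  // IllegalArgumentException if the arrays differ in length.
  val sumByIndex: Seq[Array[Double]] => Seq[Double] =
    results => results.map(_.toSeq).transpose.map(_.sum)
}
```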
The previous code snippet uses two different approaches to terminate an actor. There are four different methods of shutting down an actor, as mentioned here:
actorSystem.shutdown: This method is used by the client to shut down the parent actor system (Akka 2.4 and later replace it with actorSystem.terminate).
actor ! PoisonPill: This method is used by the client to send a poison pill message to the actor.
context.stop(self): This method is used by the Actor to shut itself down within its context.
context.stop(childActorRef): This method is used by the Actor to stop one of its child actors through the child's reference.
The previous design makes sense only if each worker has a unique characteristic that requires direct communication with the master. This is not the case in most applications. The communication and internal management of the workers can be delegated to a router. The implementation of the master routing capabilities is very similar to the previous design, as shown in the following code:
import akka.routing.RoundRobinPool

abstract class MasterWithRouter(
    xt: DblSeries,
    fct: PipeOperator[DblSeries, DblSeries],
    partitioner: Partitioner) extends Controller(xt, fct, partitioner) {

  // A pool router that creates and supervises numPartitions Worker routees
  val router = context.actorOf(Props(new Worker(0, fct))
    .withRouter(RoundRobinPool(partitioner.numPartitions,
      supervisorStrategy = this.supervisorStrategy)))
  …
The only difference is that the context.actorOf factory creates an extra actor, router, along with the workers. This particular implementation relies on the router assigning the messages to the workers in round-robin fashion. Akka supports several routing mechanisms that select a random actor, the actor with the smallest mailbox, or the first to respond to a broadcast, among others.
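The round-robin policy itself is simple: message k goes to routee k modulo the number of routees. The sketch below shows only that policy. Routees are represented by plain labels rather than Akka actors. RoundRobinSketch and assign are hypothetical names, and this is not Akka's implementation of RoundRobinPool.

```scala
object RoundRobinSketch {
  // Message k goes to routee k mod n, cycling through the routees in order
  def assign[M](messages: Seq[M], routees: Seq[String]): Seq[(String, M)] =
    messages.zipWithIndex.map { case (m, k) => (routees(k % routees.size), m) }
}
```

In MasterWithRouter, the number of partitions equals the number of routees. Under this policy, each routee therefore receives exactly one partition.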
The implementation of the receive message handler is almost identical to the message handler in the master without routing capabilities. The only exception is that the dispatching of the partitions (line 1) is delegated to the router instead of being sent to each individual worker, as follows:
override def receive = {
  case Start => split
  case msg: Completed => {
    aggregator.append(msg.xt.toArray)
    if (aggregator.size >= partitioner.numPartitions) {
      aggregate
      context.stop(self) //2
    }
  }
}
def split: Unit = {
  val indices = partitioner.split(xt)
  // Partition i spans [end of partition i-1, end of partition i)
  (0 +: indices.init).zip(indices).foreach { case (start, end) =>
    router ! Activate(0, xt.slice(start, end)) //1
  }
}
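The master stops itself once the results of all the partitions have been collected (line 2). The router does not need to be stopped explicitly. A pool router terminates itself automatically once all its routees have terminated, and, as a child of the master, it is stopped along with the master in any case.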
Let's select the discrete Fourier transform (DFT) on a time series, xt, as our data transformation. We discussed it in the Discrete Fourier transform (DFT) section in Chapter 3, Data Preprocessing. The testing code is exactly the same, whether the master has routing capabilities or not.
First, let's define a master controller, DFTMaster, dedicated to the execution of the distributed discrete Fourier transform, as follows:
class DFTMaster(xt: XTSeries[Double], partitioner: Partitioner)
    extends Master(xt, DFT[Double], partitioner) {
  // Transpose the per-worker frequency distributions and sum each frequency
  override def aggregate: Seq[Double] = aggregator.transpose.map(_.sum).toSeq
}
The aggregate method aggregates, or reduces, the results of the discrete Fourier transform (frequency distributions) from each worker. It transposes the list of frequency distributions and then sums the amplitude of each frequency. The client code to test the DFTMaster Actor is as follows:
import akka.actor.{ActorSystem, Props}
import scala.util.Random

val NUM_WORKERS = 4
val NUM_DATAPOINTS = 1000000
// Noisy signal made of three harmonics
val h = (x: Double) => 2.0*Math.cos(Math.PI*0.005*x) +
    Math.cos(Math.PI*0.05*x) + 0.5*Math.cos(Math.PI*0.2*x) +
    0.3*Random.nextDouble //1

val xt = XTSeries[Double](Array.tabulate(NUM_DATAPOINTS)(h(_)))
val partitioner = new Partitioner(NUM_WORKERS) //2

implicit val actorSystem = ActorSystem("system") //3
val master = actorSystem.actorOf(Props(new DFTMaster(xt, partitioner)), "DFTMaster") //4
master ! Start //5
Thread.sleep(15000)   // wait long enough for the master to complete
actorSystem.shutdown //6
The input time series is synthetically generated by the noisy function, h (line 1). The function h is the sum of three harmonics, cos(πfx) with f = 0.005, 0.05, and 0.2, plus random noise, so the results of the transformation can be easily validated. A partitioner instance is created for NUM_WORKERS worker Actors (line 2). The Actor system, ActorSystem, is instantiated (line 3), and the master Actor is created through the Akka ActorSystem.actorOf factory (line 4). The main program sends a Start message to the master to trigger the distributed computation of the discrete Fourier transform (line 5). The main program has to sleep for a period of time long enough to allow the master to complete its task. Finally, the main program shuts down the actor system (line 6).
Actor instantiation
Although the scala.actors.Actor class can be instantiated using its constructor, akka.actor.Actor is instantiated using a context, ActorSystem; a factory, actorOf; and a configuration object, Props. This second approach has several benefits, including decoupling the deployment of the actor from its functionality and enforcing a default supervisor or parent for the Actor, in this case ActorSystem.
The following sequence diagram illustrates the message exchange between the main program, master, and worker Actors:
The purpose of the test is to evaluate the performance of the computation of the discrete Fourier transform using the Akka framework relative to the original implementation, without actors. As with the Scala parallel collections, the absolute timing for the transformation depends on the host and the configuration, as shown in the following graph:
The single-threaded version of the discrete Fourier transform is significantly faster than the implementation using the Akka master-worker model with a single worker actor. The cost of partitioning and of aggregating (or reducing) the results adds significant overhead to the execution of the Fourier transform. However, the master-worker model is far more efficient with three or more worker actors.
The master-worker implementation has a few problems:
The culprit is the exclusive use of the fire-and-forget mechanism to exchange data between master and workers. The send-and-receive protocol and futures are remedies to these problems.
A future is an object, more specifically a monad, used to retrieve the results of concurrent operations, in a non-blocking fashion. The concept is very similar to a callback supplied to a worker, which invokes it when the task is completed. Futures hold a value that might or might not become available in the future when a task is completed, successful or not [12:10].
There are two options to retrieve results from futures:
Blocking the execution until the result is available, using scala.concurrent.Await
Handling callbacks, using onComplete, onSuccess, and onFailure
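You can try both options with the standard library alone. The following minimal sketch uses the global execution context, and FutureRetrievalSketch and its methods are hypothetical names. The first method blocks on Await.result. The second registers an onComplete callback and exposes its outcome through a Promise, so the caller is never blocked.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FutureRetrievalSketch {
  // Option 1: block the caller until the future completes (or times out)
  def blocking(): Int = {
    val f = Future { (1 to 100).sum }
    Await.result(f, 2.seconds)
  }

  // Option 2: register a callback; the caller is not blocked.
  // The Promise carries the callback's outcome so it can be inspected later.
  def withCallback(): Future[String] = {
    val outcome = Promise[String]()
    val f = Future { 10 / 2 }
    f.onComplete {
      case Success(v) => outcome.success(s"ok:$v")
      case Failure(e) => outcome.success(s"failed:${e.getMessage}")
    }
    outcome.future
  }
}
```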
Which future?
A Scala environment provides developers with two different Future classes: scala.actors.Future and scala.concurrent.Future. The actors.Future class is used to write continuation-passing style workflows in which the current actor is blocked until the value of the future is available. Instances of type scala.concurrent.Future, used in this chapter, are the Scala counterpart of java.util.concurrent.Future.
Let's reimplement the distributed discrete Fourier transform introduced in the previous section, using futures to support concurrency. The first step is to import the appropriate classes for execution of the main actor and futures, as follows:
import akka.actor.{Actor, ActorSystem, ActorRef, Props}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
The Actor classes are provided by the akka.actor package instead of the scala.actors._ package because of Akka's extended actor model. The future-related classes, Future and Await, are imported from the scala.concurrent package, which is similar to the java.util.concurrent package. The akka.util.Timeout class is used to specify the maximum duration the actor has to wait for the completion of the futures.
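You can observe the behavior of a timeout with a future that never completes. In the text, the duration comes from an akka.util.Timeout instance (timeout.duration). So that it runs without Akka, the minimal sketch below uses a plain FiniteDuration of 100 milliseconds instead. Await.result gives up after that duration and throws a java.util.concurrent.TimeoutException. TimeoutSketch and waitBriefly are hypothetical names.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object TimeoutSketch {
  // A future that never completes: its Promise is never fulfilled
  private val never = Promise[Int]().future

  // Await.result stops waiting after 100 ms and throws a TimeoutException
  def waitBriefly(): Either[String, Int] =
    try Right(Await.result(never, 100.millis))
    catch { case _: TimeoutException => Left("timed out") }
}
```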
There are two options for a parent actor or the main program to manage the futures it creates:
The following design blocks the actor that launches the futures until all the futures have completed, either returning a result or throwing an exception. Let's modify the master Actor into a class, TransformFutures, that manages futures instead of workers or routing actors, as follows:
abstract class TransformFutures(
    xt: DblSeries,
    fct: PipeOperator[DblSeries, DblSeries],
    partitioner: Partitioner)
    (implicit timeout: Timeout) extends Controller(xt, fct, partitioner) { //1

  override def receive = {
    // Reply to the sender so that a client using ask (?) receives the result
    case Start => sender() ! compute(transform) //2
    case _ => Display.error("Message not recognized", logger)
  }

  def aggregate(results: Array[DblSeries]): Seq[Double]
  …
}
The TransformFutures class requires the same parameters as the Master actor: a time series, xt; a data transformation, fct; and a partitioner. The timeout parameter is declared as an implicit argument of the class (line 1) and provides the maximum duration passed to the Await.result method. The only message, Start, triggers the computation of the data transformation by each future, and then the aggregation of the results (line 2). The transform and compute methods play the same roles as the split and aggregate methods in the master-worker design.
The generic message handler
You may have read or even written examples of actors that have generic case _ => handlers in the message loop for debugging purposes. The message loop takes a partial function as an argument; therefore, no error or exception is thrown if the message type is not recognized. There is no need for such a handler aside from debugging. Message types should inherit from a sealed abstract class or a sealed trait in order to prevent a new message type from being added by mistake.
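A sealed hierarchy pays off when you match a message against the sealed type. The compiler then checks the match for exhaustiveness, so no catch-all case is needed. An Akka receive block is a PartialFunction[Any, Unit], however, so the check applies only once the message has been narrowed to the sealed type, for instance by delegating to a method such as describe below. The message classes are simplified, hypothetical stand-ins for Start, Activate, and Completed. They are defined on Vector[Double] so that the example does not depend on XTSeries.

```scala
// Hypothetical stand-ins for the chapter's messages
sealed trait Msg
case object StartMsg extends Msg
final case class ActivateMsg(id: Int, data: Vector[Double]) extends Msg
final case class CompletedMsg(id: Int, data: Vector[Double]) extends Msg

object SealedHandlerSketch {
  // No catch-all case: removing one of these cases makes the compiler
  // warn that the match may not be exhaustive
  def describe(msg: Msg): String = msg match {
    case StartMsg            => "start"
    case ActivateMsg(id, d)  => s"activate $id (${d.size} points)"
    case CompletedMsg(id, d) => s"completed $id (${d.size} points)"
  }
}
```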
Let's have a look at the transform method. Its main purpose is to instantiate, launch, and return an array of futures responsible for the transformation of the partitions, as shown in the following code:
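def transform: Array[Future[DblSeries]] = {
  import context.dispatcher   // execution context for the futures
  val partIdx = partitioner.split(xt)
  // Partition i spans [end of partition i-1, end of partition i)
  val partitions = (0 +: partIdx.init).zip(partIdx).map { case (start, end) =>
    XTSeries[Double](xt.slice(start, end).toArray) } //3
  val futures = new Array[Future[DblSeries]](partIdx.size) //4
  partitions.zipWithIndex.foreach { case (partition, i) =>
    // Each future applies the data transformation to its partition
    futures(i) = Future[DblSeries] { fct |> partition } //5
  }
  futures
}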
First, the transform method splits the input time series into several partitions (line 3), similar to the master Actor in the previous section. An array of futures, one future per partition, is created (line 4). Each future applies the data transformation, fct, to the partition assigned to it (line 5), as the worker Actor did in the previous section.
The compute method has the same purpose as the aggregate method in the master-worker design. The execution of the Actor is blocked until the scala.concurrent.Await.result class method (line 6) returns the result of each future computation. In the case of the discrete Fourier transform, the aggregate method then transposes the list of frequencies and sums the amplitude of each frequency (line 7), as follows:
def compute(futures: Array[Future[DblSeries]]): Seq[Double] = {
  // Block until each future completes, for at most timeout.duration
  val results = futures.map(Await.result(_, timeout.duration)) //6
  aggregate(results) //7
}
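You can sketch the whole blocking design, transform followed by compute, without Akka. This minimal illustration makes two substitutions. The global execution context replaces the Actor's dispatcher. A hypothetical stats transformation stands in for fct: it returns the count, sum, and sum of squares of a partition, which gives fixed-length results whose transpose-and-sum reduction is easy to check.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingFuturesSketch {
  // Hypothetical per-partition transformation standing in for fct
  def stats(part: Array[Double]): Array[Double] =
    Array(part.length.toDouble, part.sum, part.map(x => x * x).sum)

  def run(xs: Array[Double], numPartitions: Int, timeout: FiniteDuration): Seq[Double] = {
    // Same end indices as Partitioner.split; each start is the previous end
    val sz = xs.length / numPartitions
    val ends = Array.tabulate(numPartitions)(i =>
      if (i == numPartitions - 1) xs.length else (i + 1) * sz)
    val starts = 0 +: ends.init

    // transform: one future per partition
    val futures = starts.zip(ends).map { case (s, e) => Future(stats(xs.slice(s, e))) }

    // compute: block on each future (Await.result throws on timeout or failure),
    // then sum the statistics component by component
    val results = futures.map(f => Await.result(f, timeout))
    results.toSeq.map(_.toSeq).transpose.map(_.sum)
  }
}
```

For the values 1 to 10 split into 3 partitions, the reduction gives a count of 10, a sum of 55, and a sum of squares of 385, matching the statistics of the whole series.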
The following sequence diagram illustrates the blocking design and the activities performed by the Actor and the futures:
Callbacks are an excellent alternative to blocking the actor on futures: the actor remains free to execute other functions while the futures run.
There are two simple ways to implement the callback function:
Future.onComplete
Future.onSuccess and Future.onFailure
The onComplete callback function takes a function of type Try[T] => U as an argument, with an implicit reference to the execution context, as shown in the following code:
import scala.util.{Failure, Success}

val f: Future[T] = Future { executeSomeTask }
f onComplete {
  case Success(s) => { … }
  case Failure(e) => { … }
}
You can surely recognize the {Try, Success, Failure} monad.
An alternative implementation is to invoke the onSuccess and onFailure methods, which take partial functions as arguments to implement the callbacks, as follows:
f onFailure { case e: Exception => { … } }
f onSuccess { case t => { … } }
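Note that onSuccess and onFailure were deprecated in Scala 2.12 and removed in Scala 2.13; onComplete, or combinators such as map, cover the same needs. The following minimal sketch uses hypothetical names and the global execution context, and reduces the results of several futures without blocking. Future.sequence combines them into a single future. Its onComplete callback then completes a Promise with either the transpose-and-sum reduction or the failure.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object CallbackSketch {
  // Non-blocking reduction: the callback fires once the combined future
  // completes, that is, after all the partition futures have succeeded,
  // or with a failure if any of them failed
  def reduce(parts: Seq[Future[Array[Double]]]): Future[Seq[Double]] = {
    val reduced = Promise[Seq[Double]]()
    Future.sequence(parts).onComplete {
      case Success(results) => reduced.success(results.map(_.toSeq).transpose.map(_.sum))
      case Failure(e)       => reduced.failure(e)
    }
    reduced.future
  }
}
```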
The only difference between blocking on the futures and handling callbacks is the implementation of the compute method, or reducer. The class definition, message handler, and initialization of futures are identical. The callback-based compute method is as follows:
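import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Future, Promise}

def compute(futures: Array[Future[DblSeries]]): Future[Seq[Double]] = {
  import context.dispatcher   // execution context for the callbacks
  val aggregation = new ArrayBuffer[DblSeries]
  val reduced = Promise[Seq[Double]]()

  // Callbacks run on the dispatcher's threads: guard the shared buffer,
  // and reduce only once the last partition has reported back
  def collect(data: DblSeries): Unit = aggregation.synchronized {
    aggregation.append(data)
    if (aggregation.size == futures.size)
      reduced.success(
        if (aggregation.find(_.isEmpty) == None) //3
          aggregate(aggregation.toArray) //4
        else Seq.empty)
  }

  futures.foreach(f => {
    f onSuccess { //1
      case data: DblSeries => collect(data)
    }
    f onFailure { //2
      case _: Throwable => collect(XTSeries.empty)
    }
  })
  reduced.future   // completed without blocking the Actor
}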
Each future calls back the master Actor with either the result of the data transformation, through the onSuccess callback (line 1), or an exception, through the onFailure callback (line 2). If every future succeeds (line 3), the values of every frequency for all the partitions are summed (line 4). The following sequence diagram illustrates the handling of the callbacks in the master Actor:
Execution context
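Futures require an execution context, which the developer usually provides implicitly. There are three different ways to define the execution context:
Importing the default global context: import ExecutionContext.Implicits.global
Declaring an implicit context built from an executor service: implicit val ec = ExecutionContext.fromExecutorService( … )
Passing the context explicitly when the future is created: val f = Future[T] { … }(ec)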
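The second option is convenient when the futures should run on a dedicated, bounded pool of threads. The minimal sketch below uses hypothetical names. It builds an implicit context from a fixed pool of java.util.concurrent threads, computes a sum of squares in chunks on it, and shuts the pool down once the result is available.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object ExecutionContextSketch {
  // Sum of x * x for x in 1..n, computed in chunks on a dedicated pool
  def sumOfSquares(n: Int, threads: Int): Long = {
    // Implicit context built from an executor service
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threads))
    try {
      val chunk = math.max(1, (n + threads - 1) / threads)
      val parts = (1 to n).grouped(chunk).toList
        .map(c => Future(c.map(x => x.toLong * x).sum))   // picks up ec implicitly
      Await.result(Future.sequence(parts), 5.seconds).sum
    } finally ec.shutdown()   // release the pool's threads
  }
}
```

For example, sumOfSquares(100, 4) splits 1 to 100 into four chunks of 25 values and returns 338350, the value of n(n + 1)(2n + 1)/6 for n = 100.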
Let's reuse the discrete Fourier transform. The client code uses the same synthetically created time series as with the master-worker test model.
The first step is to create a transform future for the discrete Fourier transform, DFTTransformFutures, as follows:
class DFTTransformFutures(xt: DblSeries, partitioner: Partitioner)
    (implicit timeout: Timeout)
    extends TransformFutures(xt, DFT[Double], partitioner) {
  // Sum the amplitude of each frequency across the partitions
  override def aggregate(results: Array[DblSeries]): Seq[Double] =
    results.map(_.toArray).transpose.map(_.sum).toSeq
}
The only purpose of the DFTTransformFutures class is to define the aggregation method, aggregate, for the discrete Fourier transform. The client code is as follows:
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val timeout = Timeout(10000.millis)   // used by ask (?) and Await
implicit val actorSystem = ActorSystem("system")

val xt = XTSeries[Double](Array.tabulate(NUM_DATAPOINTS)(h(_)))
val partitioner = new Partitioner(NUM_WORKERS)

val master = actorSystem.actorOf(
  Props(new DFTTransformFutures(xt, partitioner)), "DFTTransform") //1
val future = master ? Start //2
Await.result(future, timeout.duration) //3
actorSystem.shutdown //4
The master Actor is created as an instance of DFTTransformFutures, a subclass of TransformFutures, with the input time series, xt, and the partitioner as arguments; the subclass supplies the discrete Fourier transform, DFT, as the data transformation (line 1). The program creates a future instance by sending (ask) the Start message to master (line 2). The program blocks until the completion of the future (line 3), and then shuts down the Akka actor system (line 4).