Akka

The Akka framework extends the original Actor model in Scala by adding extra capabilities such as support for typed actors, message dispatching, routing, load balancing, and partitioning, as well as supervision and configurability [12:7].

The Akka framework can be downloaded from the http://akka.io/ website or through the Typesafe Activator at http://www.typesafe.com/platform.

Akka simplifies the implementation of the Actor model by encapsulating some of the details of Scala actors in the akka.actor.Actor and akka.actor.ActorSystem classes.

The three methods you want to override are as follows:

  • preStart: This is an optional method that is invoked to initialize all the necessary resources, such as a file or database connection, before the actor starts processing messages
  • receive: This method defines the actor's behavior and returns a partial function of the PartialFunction[Any, Unit] type
  • postStop: This is an optional method to clean up resources, such as releasing memory, closing database connections, and closing socket or file handles

Note

Typed and untyped actors

Untyped actors can process messages of any type. If the type of the message is not matched by the receiving actor, it is discarded. Untyped actors can be regarded as contract-less actors. They are the default actors in Scala.

Typed actors are similar to Java remote interfaces. They respond to a method invocation. The invocation is declared publicly, but the execution is delegated asynchronously to the private instance of the target actor [12:8].

Akka offers a variety of functionalities to deploy concurrent applications. Let's create a generic template for a master Actor and worker Actors to transform a dataset using any preprocessing or classification algorithm inherited from an explicit or implicit monadic data transformation, as described in the Monadic data transformation section in Chapter 2, Hello World!

The master Actor manages the worker actors in one of the following ways:

  • Individual actors
  • Clusters through a router or a dispatcher

The router is a very simple example of Actor supervision. Supervision strategies in Akka are an essential component to make the application fault-tolerant [12:9]. A supervisor Actor manages the operations, availability, and life cycle of its children, known as subordinates. The supervision among actors is organized as a hierarchy. Supervision strategies are categorized as follows:

  • One-for-one strategy: This is the default strategy. In case of a failure of one of the subordinates, the supervisor executes a recovery, restart, or resume action for that subordinate only.
  • All-for-one strategy: The supervisor executes a recovery or remedial action on all its subordinates in case one of the Actors fails.
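To make the difference between the two strategies concrete, here is a toy, Akka-free model in plain Scala. The names SupervisionModel, Strategy, and affected are hypothetical, and the model only captures which subordinates a supervisor acts on. In Akka itself, the strategies are configured through the OneForOneStrategy and AllForOneStrategy classes, which also choose the directive (resume, restart, stop, or escalate).

```scala
// Toy model (not the Akka API): which subordinates does the supervisor act on
// when one of them fails? Directives such as restart or resume are not modeled.
object SupervisionModel {
  sealed trait Strategy
  case object OneForOne extends Strategy
  case object AllForOne extends Strategy

  /** Returns the subordinates affected by the failure of `failed`. */
  def affected(strategy: Strategy, failed: String, children: Seq[String]): Seq[String] =
    strategy match {
      case OneForOne => children.filter(_ == failed) // only the failing child
      case AllForOne => children                     // the failing child and all its siblings
    }
}
```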

Master-workers

The first model to evaluate is the traditional master-slaves or master-workers design for the computation workflow. In this design, the worker Actors are initialized and managed by the master Actor, which is responsible for controlling the iterative process, state, and termination condition of the algorithm. The orchestration of the distributed tasks is performed through message passing.

Note

The design principle

It is highly recommended that you segregate the implementation of the computation or domain-specific logic from the actual implementation of the worker and master actors.

Exchange of messages

The first step in implementing the master-worker design is to define the different classes of messages exchanged between the master and each worker in order to control the execution of the iterative procedure. The message classes are defined as follows:

sealed abstract class Message(val id: Int)
case class Terminate(override val id: Int) extends Message(id)
case class Start(override val id: Int = 0) extends Message(id)  //1
case class Activate(override val id: Int, xt: DblVector) extends Message(id) //2
case class Completed(override val id: Int, xt: DblVector) extends Message(id) //3

Let's define the messages that control the execution of the algorithm. We need at least the following message types or case classes:

  • Start: This is sent by the client code to the master to start the computation (line 1).
  • Activate: This is sent by the master to the workers to activate the computation. This message contains the time series xt to be processed by the worker actors. The reference to the sender (the master actor) travels implicitly with the message and is available to the worker through sender() (line 2).
  • Completed: This is sent by each worker back to the sender. It contains the result of the transformation of the worker's partition (line 3).

The master can also stop a worker using a PoisonPill message. The different approaches to terminate an actor are described in The master actor section.

The hierarchy of the Message class is sealed to prevent third-party developers from adding another message type. The worker responds to the Activate message by executing a data transformation of the ITransform type. The messages exchanged between master and worker actors are shown in the following diagram:


A sketch design of the master-slave communication in an actor framework

Note

Messages as case classes

The actor retrieves the messages queued in its mailbox and manages each message instance (copying, matching, and so on). Therefore, message types are best defined as case classes, which provide pattern matching, equals, and hashCode out of the box. Otherwise, the developer will have to override the equals and hashCode methods.
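The following sketch, which does not depend on Akka, shows what a case class buys as a message type: structural equality and hashCode, plus an extractor for pattern matching. The object name MessageDemo, the PlainCompleted class, and the describe helper are hypothetical, and DblVector is assumed to be an alias for Vector[Double].

```scala
object MessageDemo {
  type DblVector = Vector[Double] // assumption: the book's DblVector alias

  sealed abstract class Message(val id: Int)
  case class Completed(override val id: Int, xt: DblVector) extends Message(id)

  // A plain class with the same fields: equals falls back to reference identity
  class PlainCompleted(val id: Int, val xt: DblVector)

  // Pattern matching relies on the extractor generated for the case class
  def describe(msg: Message): String = msg match {
    case Completed(id, xt) => s"partition $id: ${xt.size} values"
  }
}
```

Two Completed messages with the same content compare equal and share a hash code, whereas two PlainCompleted instances with the same content do not.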

Worker actors

The worker actors are responsible for transforming each partition of the dataset created by the master actor, as follows:

import akka.actor.Actor
import scala.util.Try

type PfnTransform = PartialFunction[DblVector, Try[DblVector]]

class Worker(id: Int, 
     fct: PfnTransform) extends Actor {  //1
  override def receive = {
    case msg: Activate =>  //2
      // Reply to the master with the transformed partition
      sender() ! Completed(msg.id + id, fct(msg.xt).get)
  }
}

The Worker class constructor takes the partial function fct as an argument (line 1). The worker launches the processing or transformation of the msg.xt data on arrival of the Activate message (line 2), and returns the Completed message to the master once the fct data transformation is completed.
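The behavior of a worker on an Activate message boils down to applying a partial function that returns a Try. Here is a minimal sketch of that step outside any actor, assuming DblVector is Vector[Double]; WorkerLogic, minMaxScale, and process are hypothetical names, and the min-max rescaling is only an example transformation.

```scala
import scala.util.Try

object WorkerLogic {
  type DblVector = Vector[Double]                                 // assumption
  type PfnTransform = PartialFunction[DblVector, Try[DblVector]]

  // Hypothetical transformation: rescale a partition to [0, 1].
  // Defined only for non-empty partitions; the Try captures a constant partition.
  val minMaxScale: PfnTransform = {
    case xs if xs.nonEmpty =>
      Try {
        val (lo, hi) = (xs.min, xs.max)
        require(hi > lo, "constant partition cannot be rescaled")
        xs.map(x => (x - lo) / (hi - lo))
      }
  }

  // The worker's step without the actor: check the domain, apply fct,
  // and turn a Failure into None instead of throwing as .get would
  def process(fct: PfnTransform, xt: DblVector): Option[DblVector] =
    if (fct.isDefinedAt(xt)) fct(xt).toOption else None
}
```

Unlike fct(msg.xt).get in the Worker class, process returns None on a failure instead of throwing inside the actor; which behavior is preferable depends on the supervision strategy in place.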

The workflow controller

In the Scalability section in Chapter 1, Getting Started, we introduced the concepts of workflow and controller to manage the training and classification process as a sequence of transformations on a time series. Let's define an abstract class for all controller actors, Controller, with the following three key parameters:

  • A time series xt to be processed
  • A fct data transformation implemented as a partial function
  • The number of partitions nPartitions to break down a time series for concurrent processing

The Controller class can be defined as follows:

abstract class Controller (
    val xt: DblVector, 
    val fct: PfnTransform, 
    val nPartitions: Int) extends Actor with Monitor { //3

  def partition: Iterator[DblVector] = { //4
    val sz = (xt.size.toDouble/nPartitions).ceil.toInt
    xt.grouped(sz)
  }
}

The controller is responsible for splitting the time series into several partitions (line 4), each of which is assigned to a dedicated worker.
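The partitioning arithmetic has a couple of edge cases worth checking. The sketch below reproduces the body of partition as a standalone function; the Partitioning object is hypothetical, and DblVector is assumed to be Vector[Double].

```scala
object Partitioning {
  type DblVector = Vector[Double] // assumption: the book's DblVector alias

  // Same arithmetic as Controller.partition: groups of ceil(size / nPartitions)
  def partition(xt: DblVector, nPartitions: Int): List[DblVector] = {
    // grouped(0) throws, so reject an empty input or a non-positive count
    require(xt.nonEmpty && nPartitions > 0, "empty input or non-positive partition count")
    val sz = (xt.size.toDouble / nPartitions).ceil.toInt
    xt.grouped(sz).toList
  }
}
```

Because the group size is rounded up, 9 values split into 4 partitions yield only 3 groups, and 10 values yield a last group holding a single value. A master that waits for exactly nPartitions results, or a reducer that transposes equal-length outputs, has to account for both cases.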

The master actor

Let's define a master actor class Master. The three methods to override are as follows:

  • preStart: This is a method invoked to initialize all the necessary resources, such as a file or database connection, before the actor executes (line 9)
  • receive: This is a partial function that dequeues and processes the messages from the mailbox
  • postStop: This cleans up resources such as releasing memory and closing database connections, sockets, or file handles (line 10)

The Master class can be defined as follows:

import akka.actor.Props

abstract class Master(  //5
    xt: DblVector, 
    fct: PfnTransform, 
    nPartitions: Int) extends Controller(xt, fct, nPartitions) {

  val aggregator = new Aggregator(nPartitions)  //6
  val workers = List.tabulate(nPartitions)(n => 
        context.actorOf(Props(new Worker(n, fct)), 
               name = s"worker_$n"))  //7
  workers.foreach( context.watch(_) )  //8

  override def preStart(): Unit = { /* ... */ }  //9
  override def postStop(): Unit = { /* ... */ }  //10
  def receive: Receive  // implemented in the message handler below
}

The Master class has the following parameters (line 5):

  • xt: This is the time series to transform
  • fct: This is the transformation function
  • nPartitions: This is the number of partitions

An aggregator, an instance of the Aggregator class, collects and reduces the results from each worker (line 6):

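import scala.collection.mutable.ListBuffer

class Aggregator(partitions: Int) {
  val state = new ListBuffer[DblVector]

  // Returns true once the results of all the partitions have been collected
  def += (x: DblVector): Boolean = {
    state.append(x)
    state.size == partitions
  }

  def clear(): Unit = state.clear()
  def completed: Boolean = state.size == partitions
}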
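Here is a quick usage check of the Aggregator logic, copied into a standalone object (AggregatorDemo and feed are hypothetical names; DblVector is assumed to be Vector[Double]). The += method returns true only when the last expected result arrives, which is the signal the master uses to stop the workers.

```scala
import scala.collection.mutable.ListBuffer

object AggregatorDemo {
  type DblVector = Vector[Double] // assumption: the book's DblVector alias

  // Same logic as the Aggregator class above
  class Aggregator(partitions: Int) {
    val state = new ListBuffer[DblVector]
    def +=(x: DblVector): Boolean = { state.append(x); state.size == partitions }
    def clear(): Unit = state.clear()
    def completed: Boolean = state.size == partitions
  }

  // Feeds the results in arrival order and records what += returns each time
  def feed(partitions: Int, results: Seq[DblVector]): Seq[Boolean] = {
    val agg = new Aggregator(partitions)
    results.map(r => agg += r)
  }
}
```

The ListBuffer is not thread-safe, which is acceptable here because an actor processes its messages one at a time, so the aggregator is never updated concurrently.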

The worker actors are created through the actorOf factory method of the master's actor context (line 7). The master watches each worker through context.watch, so it is notified by a Terminated message when a worker stops (line 8).

The receive message handler processes three types of messages: Start from the client code, Completed from the workers, and Terminated, which Akka sends when a watched worker stops, as shown in the following code:

import akka.actor.Terminated

override def receive = {
  case s: Start => start  //11

  case msg: Completed =>   //12
    if (aggregator += msg.xt) //13
      workers.foreach( context.stop(_) )   //14

  case Terminated(_) => //15
    if (aggregator.completed) {  
      context.stop(self)   //16
      context.system.shutdown
    }
}

The Start message triggers the partitioning of the input time series (line 11):

def start: Unit = workers.zip(partition.toVector)
             .foreach { case (w, s) => w ! Activate(0, s) }

The partitions are then dispatched to each worker with the Activate message.

Each worker sends a Completed message back to the master on the completion of its task (line 12). The master aggregates the results from each worker (line 13). Once all the workers have completed their task, the master stops them through its context (line 14). Because the master watches its workers, it receives a Terminated message as each worker stops (line 15); once the aggregation is complete, it stops itself through its context and shuts down the actor system (line 16).
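The message flow above can be simulated without Akka to see the protocol in isolation. In this sketch, which is an analogy rather than an actor implementation, a fixed thread pool plays the role of the worker actors and a LinkedBlockingQueue plays the role of the master's mailbox. MasterWorkerSimulation and simulate are hypothetical names, and DblVector is assumed to be Vector[Double].

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object MasterWorkerSimulation {
  type DblVector = Vector[Double] // assumption: the book's DblVector alias

  // Stand-ins for the Activate and Completed messages
  final case class Activate(id: Int, xt: DblVector)
  final case class Completed(id: Int, xt: DblVector)

  def simulate(xt: DblVector, nPartitions: Int, fct: DblVector => DblVector): List[Completed] = {
    require(xt.nonEmpty && nPartitions > 0)
    val sz = (xt.size.toDouble / nPartitions).ceil.toInt
    val partitions = xt.grouped(sz).toVector
    // The master's mailbox: workers "send" Completed by putting it on the queue
    val mailbox = new LinkedBlockingQueue[Completed]()
    // One thread per partition plays the role of the worker actors
    val pool = Executors.newFixedThreadPool(partitions.size)
    try {
      // Start: dispatch one Activate message per partition
      partitions.zipWithIndex.foreach { case (p, n) =>
        val msg = Activate(n, p)
        pool.execute(new Runnable {
          def run(): Unit = mailbox.put(Completed(msg.id, fct(msg.xt)))
        })
      }
      // The master dequeues until every partition has reported back
      List.fill(partitions.size) {
        Option(mailbox.poll(10, TimeUnit.SECONDS))
          .getOrElse(throw new IllegalStateException("worker timed out"))
      }
    } finally pool.shutdown()
  }
}
```

Completed messages arrive in whatever order the workers finish, which is why each Completed message carries the partition index.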

The previous code snippet uses two different approaches to terminate an actor. There are four different methods of shutting down an actor, as mentioned here:

  • actorSystem.shutdown: This method is used by the client to shut down the parent actor system
  • actor ! PoisonPill: This method is used by the client to send a poison pill message to the actor
  • context.stop(self): This method is used by the Actor to shut itself down within its context
  • context.stop(childActorRef): This method is used by an actor to stop one of its children through the child's reference

Master with routing

The previous design makes sense only if each worker has a unique characteristic that requires direct communication with the master. This is not the case in most applications. The communication with and internal management of the workers can be delegated to a router. The implementation of a master with routing capabilities is very similar to the previous design, as shown in the following code:

import akka.actor.Props
import akka.routing.RoundRobinRouter

abstract class MasterWithRouter(
    xt: DblVector, 
    fct: PfnTransform, 
    nPartitions: Int) extends Controller(xt, fct, nPartitions) {

  val aggregator = new Aggregator(nPartitions)
  val router = {   //17
    val routerConfig = RoundRobinRouter(nPartitions, //18
           supervisorStrategy = this.supervisorStrategy)
    context.actorOf(
       Props(new Worker(0, fct)).withRouter(routerConfig))
  }
  context.watch(router)

  def receive: Receive  // implemented in the message handler below
}

The only difference is that the context.actorOf factory creates an extra actor, router, along with the workers (line 17). This particular implementation relies on round-robin assignment of the message by the router to each worker (line 18). Akka supports several routing mechanisms that select a random actor, or the actor with the smallest mailbox, or the first to respond to a broadcast, and so on.
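Round-robin routing hands message i to routee i mod n. The following hypothetical helper (RoundRobin and assign are not part of Akka) shows the resulting assignment of messages to workers, assuming the first message goes to routee 0.

```scala
object RoundRobin {
  // Assigns message i to routee (i mod nRoutees), keeping arrival order per routee
  def assign[M](messages: Seq[M], nRoutees: Int): Map[Int, Seq[M]] =
    messages.zipWithIndex
      .groupBy { case (_, i) => i % nRoutees }
      .map { case (routee, indexed) => routee -> indexed.map(_._1) }
}
```

With as many routees as partitions, as in MasterWithRouter, each worker receives exactly one partition.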

Note

Router supervision

The router actor is a parent of the worker actors. It is by design a supervisor of the worker actors, which are its children actors. Therefore, the router is responsible for the life cycle of the worker actors, which includes their creation, restarting, and termination.

The implementation of the receive message handler is almost identical to the message handler in the master without routing capabilities, with the exception of the termination of the workers through the router (line 19):

override def receive = {
  case _: Start => start
  case msg: Completed => 
    if (aggregator += msg.xt) context.stop(router)  //19
  ...
}

The start method has to be modified to send the Activate messages through the router, which forwards them to the workers in round-robin order:

def start: Unit = 
  partition.foreach(x => router ! Activate(0, x))

Distributed discrete Fourier transform

Let's select the discrete Fourier transform (DFT) on a time series xt as our data transformation. We discussed this in the Discrete Fourier transform section in Chapter 3, Data Preprocessing. The testing code is exactly the same, whether the master has routing capabilities or not.

First, let's define a master controller DFTMaster dedicated to the execution of the distributed discrete Fourier transform, as follows:

import scala.collection.immutable

type Reducer = List[DblVector] => immutable.Seq[Double]
class DFTMaster(
    xt: DblVector, 
    nPartitions: Int, 
    reducer: Reducer)   //20
      extends Master(xt, DFT[Double].|>, nPartitions)

The reducer function aggregates or reduces the results of the discrete Fourier transform (frequency distribution) from each worker (line 20). In the case of the discrete Fourier transform, the fReduce reducer transposes the list of frequency distributions and then sums up the amplitude for each frequency (line 21):

def fReduce(buf: List[DblVector]): immutable.Seq[Double] = 
   buf.transpose.map( _.sum).toSeq  //21
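A small worked example of fReduce on two hypothetical three-bin spectra follows (FReduceDemo is a hypothetical name; DblVector is assumed to be Vector[Double]). Note that transpose requires all the spectra to have the same length and throws an IllegalArgumentException otherwise.

```scala
import scala.collection.immutable

object FReduceDemo {
  type DblVector = Vector[Double] // assumption: the book's DblVector alias

  // Same body as fReduce: transpose the per-worker spectra, then sum each frequency
  def fReduce(buf: List[DblVector]): immutable.Seq[Double] =
    buf.transpose.map(_.sum)
}
```

For the spectra (1, 2, 3) and (10, 20, 30), the transposed rows are (1, 10), (2, 20), and (3, 30), so the reduced spectrum is (11, 22, 33).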

Let's take a look at the test code:

import akka.actor.{ActorSystem, Props}
import scala.util.Random

val NUM_WORKERS = 4 
val NUM_DATAPOINTS = 1000000
val h = (x: Double) => 2.0*Math.cos(Math.PI*0.005*x) + 
    Math.cos(Math.PI*0.05*x) + 0.5*Math.cos(Math.PI*0.2*x) +
    0.3*Random.nextDouble   //22

val actorSystem = ActorSystem("System")  //23
val xt = Vector.tabulate(NUM_DATAPOINTS)(h(_))
val controller = actorSystem.actorOf(
         Props(new DFTMasterWithRouter(xt, NUM_WORKERS, 
                    fReduce)), "MasterWithRouter")  //24
controller ! Start(1) //25

The input time series is synthetically generated by the noisy sinusoidal function h (line 22). The function h has three distinct harmonics, with angular frequencies 0.005π, 0.05π, and 0.2π, so the results of the transformation can be easily validated. The actor system, ActorSystem, is instantiated (line 23) and the master actor is generated through the Akka ActorSystem.actorOf factory (line 24). The main program sends a Start message to the master to trigger the distributed computation of the discrete Fourier transform (line 25).
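The claim that the results are easy to validate can be checked numerically. Each term cos(π·a·x) completes a·N/2 cycles over N samples, so with N = 2,000 the three harmonics should appear in DFT bins 5, 50, and 200. The sketch below uses a naive O(N²) DFT and a seeded random generator; it is not the DFT class used in the book, and HarmonicsCheck and its methods are hypothetical.

```scala
import scala.util.Random

object HarmonicsCheck {
  // Same signal as h, but with a seeded generator so that runs are repeatable
  def signal(n: Int, seed: Long = 42L): Vector[Double] = {
    val rnd = new Random(seed)
    Vector.tabulate(n) { i =>
      val x = i.toDouble
      2.0 * Math.cos(Math.PI * 0.005 * x) + Math.cos(Math.PI * 0.05 * x) +
        0.5 * Math.cos(Math.PI * 0.2 * x) + 0.3 * rnd.nextDouble()
    }
  }

  // Naive O(n^2) DFT magnitudes for bins 0 until n/2
  def magnitudes(xs: Vector[Double]): Vector[Double] = {
    val a = xs.toArray
    val n = a.length
    Vector.tabulate(n / 2) { k =>
      var re = 0.0
      var im = 0.0
      var t = 0
      while (t < n) {
        val angle = 2.0 * Math.PI * k * t / n
        re += a(t) * Math.cos(angle)
        im -= a(t) * Math.sin(angle)
        t += 1
      }
      Math.sqrt(re * re + im * im)
    }
  }

  // Indices of the `count` largest bins, skipping bin 0 (the DC offset added by the noise)
  def peaks(mags: Vector[Double], count: Int): Set[Int] =
    mags.zipWithIndex.drop(1).sortBy { case (m, _) => -m }.take(count).map(_._2).toSet
}
```

The amplitudes of the three peaks also follow the coefficients of h (2.0, 1.0, and 0.5), so bin 5 dominates bin 50, which dominates bin 200.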

Note

The actor instantiation

Although the scala.actors.Actor class can be instantiated using its constructor, akka.actor.Actor is instantiated using an ActorSystem context, an actorOf factory, and a Props configuration object. This second approach has several benefits, including decoupling the deployment of the actor from its functionality and enforcing a default supervisor or parent for the actor; in this case, ActorSystem.

The following sequence diagram illustrates the message exchange between the main program, master, and worker actors:


A sequential diagram for the normalization of cross-validation groups

The purpose of the test is to evaluate the performance of the computation of the discrete Fourier transform using the Akka framework relative to the original implementation, without actors. As with Scala parallel collections, the absolute timing for the transformation depends on the host and its configuration. The following graph shows the impact of the number of worker actors:


The impact of the number of worker (slave) actors on the performance of the discrete Fourier transform

The single-threaded version of the discrete Fourier transform is significantly faster than the implementation using the Akka master-worker model with a single worker actor. The cost of partitioning and aggregating (or reducing) the results adds a significant overhead to the execution of the Fourier transform. However, the master-worker model is far more efficient with three or more worker actors.

Limitations

The master-worker implementation has a few problems, which are as follows:

  • In the message handler of the master Actor, there is no guarantee that the poison pill will be consumed by all the workers before the master stops.
  • The main program has to sleep for a period of time long enough to allow the master and workers to complete their tasks. There is no guarantee that the computation will be completed when the main program awakes.
  • There is no mechanism to handle failure in delivering or processing messages.

The culprit is the exclusive use of the fire-and-forget mechanism to exchange data between master and workers. The send-and-receive protocol and futures are remedies to these problems.

Futures

A future is an object, more specifically a monad, used to retrieve the results of concurrent operations, in a nonblocking fashion. The concept is very similar to a callback supplied to a worker, which invokes it when the task is completed. Futures hold a value that might or might not become available in the future when a task is completed, whether successful or not [12:10].

There are two options to retrieve results from futures:

  • Blocking the execution using scala.concurrent.Await
  • The onComplete, onSuccess, and onFailure callback functions
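Both options can be sketched with the standard library alone. The names RetrieveResults, blocking, and withCallback are hypothetical, and the global execution context is used for brevity.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object RetrieveResults {
  // Option 1: block the calling thread until the future completes (or times out)
  def blocking(): Int = {
    val f = Future { (1 to 100).sum }
    Await.result(f, 2.seconds)
  }

  // Option 2: register a callback; the caller keeps running meanwhile.
  // The latch only lets this demo wait for the callback (and makes its write visible).
  def withCallback(): String = {
    val done = new CountDownLatch(1)
    var outcome = ""
    Future(Integer.parseInt("not a number")).onComplete {
      case Success(v) => outcome = s"value $v"; done.countDown()
      case Failure(e) => outcome = s"failed: ${e.getClass.getSimpleName}"; done.countDown()
    }
    done.await(2, TimeUnit.SECONDS)
    outcome
  }
}
```

The CountDownLatch is only there so that the demo can return the outcome; in an application, the callback would act on the result directly.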

Note

Which future?

A Scala environment provides developers with two different Future classes: scala.actors.Future and scala.concurrent.Future.

The scala.actors.Future class is used to write continuation-passing style workflows in which the current actor is blocked until the value of the future is available. Instances of the scala.concurrent.Future type used in this chapter are the Scala counterpart of java.util.concurrent.Future.

The Actor life cycle

Let's reimplement the distributed data transformation introduced in the previous section, using futures to support concurrency. The first step is to import the appropriate classes for execution of the main actor and futures, as follows:

import akka.actor.{Actor, ActorSystem, ActorRef, Props} //26
import akka.util.Timeout   //27
import scala.concurrent.{Await, Future}  //28

The actor classes are provided by the akka.actor package, instead of the scala.actors._ package, because of Akka's extended actor model (line 26). The future-related classes, Future and Await, are imported from the scala.concurrent package, which is similar to the java.util.concurrent package (line 28). The akka.util.Timeout class is used to specify the maximum duration the actor has to wait for the completion of the futures (line 27).

There are two options for a parent actor or the main program to manage the futures it creates, which are as follows:

  • Blocking: The parent actor or main program stops the execution until all futures have completed their tasks.
  • Callback: The parent actor or the main program initiates the futures during the execution. The future tasks are performed concurrently with the parent actor, which is then notified when each future task is completed.

Blocking on futures

The following design consists of blocking the actor that launches the futures until all the futures have been completed, either returning with a result or throwing an exception. Let's modify the master actor into a TransformFutures class that manages futures instead of workers or routing actors, as follows:

abstract class TransformFutures(
    xt: DblVector, 
    fct: PfnTransform, 
    nPartitions: Int)
    (implicit timeout: Timeout) //29
         extends Controller(xt, fct, nPartitions) {
  import context.dispatcher  // execution context for the futures

  override def receive = {
    case _: Start => sender() ! compute(transform) //30
  }
}

The TransformFutures class requires the same parameters as the Master actor: a time series, xt, a data transformation, fct, and the number of partitions, nPartitions. The timeout parameter is declared as an implicit argument (line 29); its duration bounds how long Await.result waits for each future. The only message, Start, triggers the computation of the data transformation of each future, then the aggregation of the results, which is sent back to the sender of Start (line 30). The transform and compute methods play the same roles as the partitioning and aggregation steps in the master-workers design.

Note

The generic message handler

You may have read or even written examples of actors that have generic case _ => handlers in the message loop for debugging purposes. The message loop takes a partial function as an argument; therefore, no error or exception is thrown if the message type is not recognized. Apart from debugging, there is no need for such a handler. Message types should inherit from a sealed abstract class or a sealed trait in order to prevent a new message type from being added by mistake.

Let's take a look at the transform method. Its main purpose is to instantiate, launch, and return an array of futures responsible for the transformation of the partitions, as shown in the following code:

def transform: Array[Future[DblVector]] = {
  val futures = new Array[Future[DblVector]](nPartitions) //31

  partition.zipWithIndex.foreach { case (x, n) => { //32
    futures(n) = Future[DblVector] { fct(x).get } //33
  }}
  futures
}

An array of futures (one future per partition) is created (line 31). The transform method invokes the partitioning method partition (line 32) and then initializes each future with the fct partial function (line 33). The compute method blocks on these futures and reduces their results:

def compute(futures: Array[Future[DblVector]]): Seq[Double] = 
  reduce(futures.map(Await.result(_, timeout.duration))) //34

The compute method applies a user-defined reduce function to the results of the futures. The execution of the actor is blocked until the scala.concurrent.Await.result method (line 34) returns the result of each future computation. In the case of the discrete Fourier transform, the list of frequencies is transposed before the amplitude of each frequency is summed (line 35), as follows:

def reduce(data: Array[DblVector]): Seq[Double] = 
    data.view.map(_.toArray)
        .transpose.map(_.sum)   //35
            .take(SPECTRUM_WIDTH).toSeq
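Stripped of the actor, the blocking design is just partitioning, one future per partition, Await.result on each future, and a reduction. The following sketch keeps the chapter's assumptions (DblVector as Vector[Double], fct returning a Try); BlockingFutures and demo are hypothetical names, and unlike the transform method above, the array of futures is sized from the partitions actually produced.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object BlockingFutures {
  type DblVector = Vector[Double]                                 // assumption
  type PfnTransform = PartialFunction[DblVector, Try[DblVector]]

  // transform: one future per partition actually produced by grouped
  def transform(xt: DblVector, fct: PfnTransform, nPartitions: Int): Array[Future[DblVector]] = {
    val sz = (xt.size.toDouble / nPartitions).ceil.toInt
    xt.grouped(sz).map(x => Future(fct(x).get)).toArray
  }

  // compute: block on every future in partition order, then reduce the results
  def compute(futures: Array[Future[DblVector]],
              reduce: Array[DblVector] => Seq[Double]): Seq[Double] =
    reduce(futures.map(Await.result(_, 5.seconds)))

  // Usage: multiply every value by 10 across two partitions, then concatenate
  def demo(): Seq[Double] = {
    val tenfold: PfnTransform = { case xs => Try(xs.map(_ * 10)) }
    compute(transform(Vector(1.0, 2.0, 3.0, 4.0), tenfold, 2), _.flatten.toSeq)
  }
}
```

A failed Try surfaces as an exception thrown by Await.result, so a failure in any partition fails the whole computation.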

The following sequence diagram illustrates the blocking design and the activities performed by the actor and the futures:


The sequential diagram for actor blocking on future results

Handling future callbacks

Callbacks are an excellent alternative to having the actor block on futures, as the actor can execute other functions concurrently with the execution of the futures.

There are two simple ways to implement the callback function, as follows:

  • Future.onComplete
  • Future.onSuccess and Future.onFailure

The onComplete callback function takes a function of the Try[T] => U type as an argument with an implicit reference to the execution context, as shown in the following code:

val f: Future[T] = Future { /* execute task */ }
f onComplete {   
  case Success(s) => { … }   
  case Failure(e) => { … }
}

You can surely recognize the {Try, Success, Failure} monad.

An alternative implementation is to invoke the onSuccess and onFailure methods that use partial functions as arguments to implement the callbacks, as follows:

f onFailure { case e: Exception => { … } } 
f onSuccess { case t => { … } }

The only difference between blocking on futures and handling callbacks is the implementation of the compute method. The class definition, message handler, and initialization of futures are identical. The callback-based compute method is implemented as follows:

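import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.concurrent.Promise

def compute(futures: Array[Future[DblVector]]): Future[Seq[Double]] = {
  // Callbacks run on the execution context's threads: shared state must be thread-safe
  val buffer = new ConcurrentLinkedQueue[DblVector]
  val remaining = new AtomicInteger(futures.length)
  val promise = Promise[Seq[Double]]()
  futures.foreach( f => {
    f onSuccess {   //36
      case data =>
        buffer.add(data)
        if (remaining.decrementAndGet() == 0)  // the last result has arrived
          promise.trySuccess(reduce(buffer.asScala.toArray))  //38
    }
    f onFailure { case e => promise.tryFailure(e) } //37
  })
  promise.future
}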

Each future invokes either the onSuccess callback with the result of its data transformation (line 36) or the onFailure callback with the exception (line 37). Once every future has succeeded, the values of all frequencies for all the partitions are summed (line 38). The following sequence diagram illustrates the handling of the callbacks in the master actor:


A sequential diagram for actor handling future result with callbacks
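An alternative to managing callbacks and shared state by hand is to combine the futures with Future.sequence from the standard library, which yields a single future that completes once all the inputs have succeeded and fails if any of them fails. This is a different technique from the onSuccess/onFailure approach above; SequenceReduce, computeAsync, demo, and failedDemo are hypothetical names, and DblVector is assumed to be Vector[Double].

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object SequenceReduce {
  type DblVector = Vector[Double] // assumption: the book's DblVector alias

  // One future of all the spectra, then the same transpose-and-sum reduction
  def computeAsync(futures: Seq[Future[DblVector]]): Future[Seq[Double]] =
    Future.sequence(futures).map(spectra => spectra.transpose.map(_.sum))

  // Usage: two partial spectra reduced into one
  def demo(): Seq[Double] =
    Await.result(computeAsync(Seq(Future(Vector(1.0, 2.0)), Future(Vector(3.0, 4.0)))), 5.seconds)

  // A single failed input fails the combined future
  def failedDemo(): Boolean =
    Try(Await.result(computeAsync(Seq(
      Future(Vector(1.0)),
      Future[DblVector](throw new IllegalStateException("worker failed")))), 5.seconds)).isFailure
}
```

No mutable buffer or counter is needed, because Future.sequence keeps the results in the order of the input futures.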

Note

The execution context

The application of futures requires that the execution context is implicitly provided by the developer. There are three different ways to define the execution context:

  • Import the context: import ExecutionContext.Implicits.global
  • Create an instance of the context within the actor (or actor context): implicit val ec = ExecutionContext.fromExecutorService( … )
  • Define the context when instantiating the future: val f = Future[T] { … }(ec)
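The second and third options can be combined in a short sketch: an ExecutionContext built from a dedicated fixed thread pool, used implicitly by most futures and passed explicitly to one of them. ExplicitContext and squares are hypothetical names.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ExplicitContext {
  // Runs n small tasks on a dedicated pool and returns their results in order
  def squares(n: Int, threads: Int): Seq[Int] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool) // option 2
    try {
      val fs = (1 to n).map(i => Future(i * i))   // picks up the implicit ec
      val explicit = Future(0)(ec)                // option 3: pass the context explicitly
      Await.result(Future.sequence(fs :+ explicit), 5.seconds)
    } finally pool.shutdown() // release the pool's threads
  }
}
```

Unlike the global context, a context built from your own ExecutorService has to be shut down by you, hence the finally clause.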

Putting it all together

Let's reuse the discrete Fourier transform. The client code uses the same synthetically created time series as in the master-worker test model. The first step is to define DFTTransformFutures, a subclass of TransformFutures dedicated to the discrete Fourier transform, as follows:

class DFTTransformFutures(
    xt: DblVector, 
    partitions: Int)(implicit timeout: Timeout) 
    extends TransformFutures(xt, DFT[Double].|> , partitions)  {

  override def reduce(data: Array[DblVector]): Seq[Double] = 
    data.map(_.toArray).transpose
        .map(_.sum).take(SPECTRUM_WIDTH).toSeq
}

The only purpose of the DFTTransformFutures class is to define the reduce aggregation method for the discrete Fourier transform. Let's reuse the same test case as in the Distributed discrete Fourier transform section under Master-workers:

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val duration = Duration(8000, "millis")
implicit val timeout = new Timeout(duration)
val master = actorSystem.actorOf(   //39
       Props(new DFTTransformFutures(xt, NUM_WORKERS)), 
                        "DFTTransform")
val future = master ? Start(0)  //40
Await.result(future, timeout.duration)   //41
actorSystem.shutdown  //42

The master actor is created as an instance of DFTTransformFutures with the input time series xt and the number of workers or partitions, NUM_WORKERS, as arguments; the discrete Fourier transform DFT is supplied by the DFTTransformFutures class itself (line 39). The program creates a future by sending (ask) the Start message to the master (line 40). The program blocks until the completion of the future (line 41), and then shuts down the Akka actor system (line 42).
