Akka

The Akka framework extends the original Actor model in Scala by adding abstraction capabilities such as support for typed actors, message dispatching, routing, load balancing, and partitioning, as well as supervision and configurability [12:7].

The Akka framework can be downloaded from the www.akka.io website, or through the Typesafe Activator at http://www.typesafe.com/platform.

Akka simplifies the implementation of Actor by encapsulating some of the details of Scala Actor in the akka.actor.Actor and akka.actor.ActorSystem classes.

The three methods you want to override are as follows (a minimal actor sketch follows the list):

  • preStart: This is an optional method, invoked to initialize all the necessary resources such as file or database connection before the Actor is executed
  • receive: This method defines the Actor's behavior and returns a partial function of type PartialFunction[Any, Unit]
  • postStop: This is an optional method to clean up resources such as releasing memory, closing database connections, and socket or file handles
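
The following is a minimal, hypothetical sketch of an Akka actor that overrides these three methods; the counter used as a stand-in resource and the message type are illustrative assumptions, not part of the original example:

import akka.actor.Actor

class EchoActor extends Actor {
  private var count = 0                        // stand-in for a resource such as a connection

  override def preStart(): Unit = { count = 0 }  // initialize resources before the first message

  override def receive = {                     // behavior: PartialFunction[Any, Unit]
    case msg: String => count += 1; println(s"$count: $msg")
    case _ => println("Unknown message")
  }

  override def postStop(): Unit =              // clean up resources once the actor is stopped
    println(s"EchoActor processed $count messages")
}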

Note

Typed versus untyped actors

Untyped actors can process messages of any type. If the type of the message is not matched by the receiving actor, it is discarded. Untyped actors can be regarded as contract-less actors. They are the default actors in Scala.

Typed actors are similar to Java remote interfaces. They respond to a method invocation. The invocation is declared publicly, but the execution is delegated asynchronously to the private instance of the target actor [12:8].
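
As a hedged illustration of the typed style, the following sketch uses Akka's TypedActor facility (Akka 2.x); the Counter trait, its implementation, and the actor system name are hypothetical names introduced for this example only:

import akka.actor.{ActorSystem, TypedActor, TypedProps}
import scala.concurrent.Future

trait Counter {                         // the public contract: method invocations, not raw messages
  def increment(): Unit                 // fire-and-forget
  def count: Future[Int]                // non-blocking request-reply
}

class CounterImpl extends Counter {
  private var n = 0
  override def increment(): Unit = { n += 1 }
  override def count: Future[Int] = Future.successful(n)
}

val system = ActorSystem("typedDemo")
// The invocation is declared publicly; execution is delegated to the private actor instance
val counter: Counter = TypedActor(system).typedActorOf(TypedProps[CounterImpl]())
counter.increment()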

Akka offers a variety of functionalities to deploy concurrent applications. Let us create a generic template for a master Actor and worker Actors to transform a dataset using any preprocessing or classification algorithm inheriting from the PipeOperator trait, as explained in The pipe operator section under Designing a workflow in Chapter 2, Hello World!. The master Actor manages the worker actors in one of the following ways:

  • Individual actors
  • Clusters through a router or a dispatcher

The router is a very simple example of Actor supervision. Supervision strategies in Akka are an essential component to make the application fault-tolerant [12:9]. A supervisor Actor manages the operations, availability, and life cycle of its children, known as subordinates. The supervision among actors is organized as a hierarchy. Supervision strategies are categorized as follows (a minimal strategy declaration is sketched after the list):

  • One-for-one strategy: This is the default strategy. In case of a failure of one of the subordinates, the supervisor executes a recovery, restart, or resume action for that subordinate only.
  • All-for-one strategy: The supervisor executes a recovery or remedial action on all its subordinates in case one of the Actors fails.
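
As a minimal sketch of the first strategy, a supervisor can override its supervisorStrategy; the exception types and retry limits below are illustrative assumptions rather than values from the original design:

import akka.actor.{Actor, OneForOneStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume}
import scala.concurrent.duration._

class Supervisor extends Actor {
  // Apply the decision only to the failing subordinate (one-for-one)
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: ArithmeticException => Resume      // keep the child's state and continue
      case _: IllegalStateException => Restart   // recreate the child actor
      case _: Exception => Escalate              // defer to this actor's own supervisor
    }
  override def receive = { case _ => () }
}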

Master-workers

The first model to evaluate is the traditional master-slaves or master-workers design for computation workflow. In this design, the worker Actors are initialized and managed by the master Actor which is responsible for controlling the iterative process, state, and termination condition of the algorithm. The orchestration of the distributed tasks is performed through message passing.

Note

The design principle

It is highly recommended that you segregate the implementation of the computation or domain-specific logic from the actual implementation of the worker and master Actors.

Messages exchange

The first step in implementing the master-worker design is to define the different classes of messages exchanged between the master and each worker, to control the execution of the iterative procedure. The implementation of the master-worker design is as follows:

type DblSeries = XTSeries[Double]

sealed abstract class Message(val id: Int)
case class Start(i: Int =0) extends Message(i) //1
case class Activate(i: Int, xt: DblSeries) extends Message(i) //2
case class Completed(i: Int, xt: DblSeries) extends Message(i) //3

Let's define the messages that control the execution of the algorithm. We need at least the following message types or case classes:

  1. Start is sent by the client code to the master to start the computation.
  2. Activate is sent by the master to the workers to activate the computation. This message contains the time series, xt, to be processed by the worker Actors.
  3. Completed is sent by each worker back to the sender. It contains the result of the data transformation applied to the worker's partition.
  4. The master stops a worker using a PoisonPill message. The different approaches to terminate an actor are described in The master Actor section.

The hierarchy of the Message class is sealed to prevent third-party developers from adding another message type. The worker responds to the activate message by executing a data transformation of type inherited from PipeOperator. The messages exchanged between master and worker actors are shown in the following diagram:

Messages exchange

Tip

Messages as case classes

The actor retrieves the messages queued in its mailbox by managing each message instance (copy, matching, and so on). Therefore, the message type has to be defined as a case class. Otherwise, the developer will have to override the equals and hashCode methods.

Worker actors

The worker actors are responsible for transforming each partition created by the master Actor, as follows:

class Worker(id: Int, fct: PipeOperator[DblSeries, DblSeries]) extends Actor { //1
  override def receive = {
    case msg: Activate => {
      sender ! Completed(msg.id + id, transform(msg.xt)) //2
      context.stop(self)
    }
    case _ => Display.show("Unknown message", logger)
  }
  def transform(xt: DblSeries): DblSeries = fct |> xt
}

The Worker class constructor takes the fct data transformation as an argument (line 1). The worker launches the processing or transformation of the msg.xt data upon arrival of the Activate message (line 2). It returns the Completed message to the master once the data transformation, transform, is completed.

The workflow controller

In the Scalability section in Chapter 1, Getting Started, we introduced the concepts of workflow and controller to manage the training and classification process as a sequence of transformations on time series. Let's define an abstract class for all controller actors, Controller, with the following three key parameters:

  • A time series, xt, to be processed
  • A data transformation, fct, of type PipeOperator
  • A partitioning method, partitioner, to break down a time series for concurrent processing

The Controller class can be defined as follows:

abstract class Controller(val xt: DblSeries, val fct: PipeOperator[DblSeries, DblSeries], val partitioner: Partitioner) extends Actor

The workflow controller is responsible for splitting the time series into several partitions and assigning each partition to a dedicated worker Actor. A helper class, Partitioner, implements the partitioning of the dataset as follows:

class Partitioner(val numPartitions: Int) {
  def split(xt: DblSeries): Array[Int] = {
    val sz = (xt.size.toDouble/numPartitions).floor.toInt
    val indices = Array.tabulate(numPartitions)(i=>(i+1)*sz)
    indices.update(numPartitions -1, xt.size)
    indices
  }
}

The split method breaks down a time series, xt, into numPartitions partitions, and returns the index of each partition relative to the original time series.
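
For example, assuming that XTSeries wraps an Array[Double], a four-way split of a 100-point series would return the upper bound of each partition, as in the following hypothetical usage:

// Hypothetical usage of Partitioner
val xt = XTSeries[Double](Array.tabulate(100)(_.toDouble))
val indices = new Partitioner(4).split(xt)   // Array(25, 50, 75, 100)
// Partition k spans xt.slice(indices(k) - indices(0), indices(k)),
// that is [0, 25), [25, 50), [50, 75), and [75, 100)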

The master Actor

Let's define a master Actor class, Master. The three methods to override are as follows:

  • preStart is a method invoked to initialize all the necessary resources such as file or database connection before the actor executes
  • receive is a partial function that dequeues and processes the messages from the mail box
  • postStop cleans up resources such as releasing memory and closing database connections, sockets, or file handles

The Master class can be defined as follows:

abstract class Master(xt: DblSeries, fct: PipeOperator[DblSeries, DblSeries], partitioner: Partitioner) extends Controller(xt, fct, partitioner) {
  val workers = List.tabulate(partitioner.numPartitions)(n => 
               context.actorOf(Props(new Worker(n, fct)))) //4
  val aggregator = new ListBuffer[DblVector]  //5

  override def preStart: Unit = {} //6
  override def postStop: Unit = {} //7
  override def receive

The Master class has the following parameters:

  • xt: This is the time series to transform
  • fct: This is the transformation function
  • partitioner: This is the instance of time series partitioning

The worker actors are created through the actorOf factory method of the ActorSystem context (line 4). A list buffer, aggregator, collects and reduces the results from each worker (line 5). The preStart method implements any initialization required to process the messages (line 6). The postStop method releases all the resources allocated to process the messages (line 7).

The receive message handler processes only two types of messages: Start from the client code and Completed from the workers, as shown in the following code:

override def receive = {
   case _: Start => split //8
   case msg: Completed => { //10
     aggregator.append(msg.xt.toArray) //11
     if(aggregator.size >= partitioner.numPartitions) { //12
       //13  workers.foreach( _ ! PoisonPill)
       aggregate //14
       context.stop(self) //15
     }
   }
}

def aggregate: Seq[Double]

def split: Unit = {
   val partIdx = partitioner.split(xt)
   workers.zip(partIdx).foreach(w => 
     w._1 ! Activate(0, xt.slice(w._2-partIdx(0), w._2))) //9
  }

The Start message triggers the split of the input time series into partitions (line 8), which are then dispatched to each worker with the Activate message (line 9). Each worker sends a Completed message back to the master upon completion of its task (line 10). The master collects the result from each worker (line 11). Once every worker has completed its task (line 12), the master terminates all the workers through a PoisonPill message, in case the worker actors do not terminate themselves (line 13). The master aggregates the results (line 14) before it terminates itself through a request to its context to stop it (line 15).

The aggregate method can be defined either in the Master class or in one of its subclasses.

The previous code snippet uses two different approaches to terminate an actor. There are four different methods of shutting down an actor, as listed here (a brief sketch follows the list):

  • actorSystem.shutdown: This method is used by the client to shut down the parent actor system
  • actor ! PoisonPill: This method is used by the client to send a poison pill message to the actor
  • context.stop(self): This method is used by the Actor to shut itself down within its context
  • context.stop(childActorRef): This method is used by a parent Actor to shut down one of its children through its reference
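
The following minimal sketch illustrates the four termination calls; the Parent and Child actors and the actor system name are hypothetical, and system.shutdown reflects the Akka 2.3 API used in this chapter:

import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}

class Child extends Actor { override def receive = { case _ => () } }

class Parent extends Actor {
  val child: ActorRef = context.actorOf(Props[Child])

  override def receive = {
    case "poison" => child ! PoisonPill      // child stops after draining its mailbox
    case "stopChild" => context.stop(child)  // parent stops a child through its reference
    case "stopSelf" => context.stop(self)    // the actor stops itself within its context
  }
}

val system = ActorSystem("shutdownDemo")
val parent = system.actorOf(Props[Parent])
system.shutdown()                            // client code shuts down the entire actor system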

Master with routing

The previous design makes sense only if each worker has a unique characteristic that requires direct communication with the master. This is not the case in most applications. The communication and internal management of the worker can be delegated to a router. The implementation of the master routing capabilities is very similar to the previous design, as shown in the following code:

abstract class MasterWithRouter(xt: DblSeries, fct: PipeOperator[DblSeries, DblSeries], partitioner: Partitioner) extends Controller(xt, fct, partitioner) {
   val router = context.actorOf(Props(new Worker(0, fct))
      .withRouter(RoundRobinPool(partitioner.numPartitions, 
                  supervisorStrategy = this.supervisorStrategy)))  
 …

The only difference is that the context.actorOf factory creates an extra actor, router, along with the workers. This particular implementation relies on round-robin assignment of the message by the router to each worker. Akka supports several routing mechanisms that select a random actor, or the actor with the smallest mailbox, or the first to respond to a broadcast, and so on.
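
As an illustration of these alternatives, the round-robin pool in the previous snippet could be replaced by another Akka pool; the following lines are a hypothetical variation, assumed to sit inside the MasterWithRouter class body:

import akka.routing.{RandomPool, SmallestMailboxPool}

// Route each message to the worker with the fewest queued messages
val balancedRouter = context.actorOf(Props(new Worker(0, fct))
   .withRouter(SmallestMailboxPool(partitioner.numPartitions)))

// Or dispatch messages to randomly selected workers
val randomRouter = context.actorOf(Props(new Worker(0, fct))
   .withRouter(RandomPool(partitioner.numPartitions)))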

Note

Routing supervision

The router actor is a parent of the worker actors. It is by design a supervisor of the worker actors, which are its children actors. Therefore, the router is responsible for the life cycle of the worker actors which includes their creation, restarting, and termination.

The implementation of the receive message handler is almost identical to the message handler in the master without routing capabilities, except that the partitioning (line 1) is delegated to the router instead of being applied to each individual worker, as follows:

override def receive = {
  case msg: Start => split
  case msg: Completed => {
     aggregator.append(msg.xt.toArray)
     if(aggregator.size >= partitioner.numPartitions) {
          aggregate
          context.stop(self)    //2
     }
  }
}
def split: Unit = {
   val indices = partitioner.split(xt)
   indices.foreach(n => 
       router ! Activate(0, xt.slice(n - indices(0), n))) //1
}

The supervising router terminates itself automatically once all its child actors are terminated (line 2).

Distributed discrete Fourier transform

Let's select the discrete Fourier transform (DFT) on a time series, xt, as our data transformation. We discussed it in the Discrete Fourier transform (DFT) section in Chapter 3, Data Preprocessing. The testing code is exactly the same, whether the master has routing capabilities or not.

First, let's define a master controller, DFTMaster, dedicated to the execution of the distributed discrete Fourier transform, as follows:

class DFTMaster(xt: XTSeries[Double], partitioner: Partitioner) extends Master(xt, DFT[Double], partitioner) {
  override def aggregate: Seq[Double] = 
        aggregator.transpose.map( _.sum).toSeq
}

The aggregate method aggregates or reduces the results of the discrete Fourier transform (the frequency distribution) from each worker. In the case of the discrete Fourier transform, the aggregate method transposes the list of frequency distributions and then sums the amplitudes for each frequency. The client code for the test is shown here:

val NUM_WORKERS = 4
val NUM_DATAPOINTS = 1000000
val h = (x:Double) =>2.0*Math.cos(Math.PI*0.005*x) + 
                         Math.cos(Math.PI*0.05*x) + 
                     0.5*Math.cos(Math.PI*0.2*x) +
                     0.3* Random.nextDouble  //1
val xt = XTSeries[Double](Array.tabulate(NUM_DATAPOINTS)(h(_)))
val partitioner = new Partitioner(NUM_WORKERS) //2
   
implicit val actorSystem = ActorSystem("system") //3
val master = actorSystem.actorOf(Props(new DFTMaster(xt, partitioner)), "DFTMaster") //4
master ! Start()  //5
Thread.sleep(15000)
actorSystem.shutdown //6

The input time series is synthetically generated by the noisy function, h (line 1). The function h has three distinct harmonics, 0.005, 0.05, and 0.2, so the results of the transformation can be easily validated. A partitioner instance is created for NUM_WORKERS worker Actors (line 2). The Actor system, ActorSystem, is instantiated (line 3) and the master Actor is generated through the Akka ActorSystem.actorOf factory (line 4). The main program sends a Start message to the master to trigger the distributed computation of the discrete Fourier transform (line 5). The main program has to sleep for a period of time long enough to allow the master to complete its task. Finally, the main program shuts down the actor system (line 6).

Note

Actor instantiation

Although the scala.actors.Actor class can be instantiated using the constructor, akka.actor.Actor is instantiated using a context, ActorSystem; a factory, actorOf; and a configuration object, Props. This second approach has several benefits, including decoupling the deployment of the actor from its functionality and enforcing a default supervisor or parent for the Actor, in this case ActorSystem.

The following sequential diagram illustrates the message exchange between the main program, master, and worker Actors:

Distributed discrete Fourier transform

Sequential diagram for the distributed discrete Fourier transform

The purpose of the test is to evaluate the performance of the computation of the discrete Fourier transform using the Akka framework relative to the original implementation, without actors. As with the Scala parallel collections, the absolute timing for the transformation depends on the host and the configuration, as shown in the following graph:

Distributed discrete Fourier transform

The single-threaded version of the discrete Fourier transform is significantly faster than the implementation using the Akka master-worker model with a single worker actor. The cost of partitioning and aggregating (or reducing) the results adds significant overhead to the execution of the Fourier transform. However, the master-worker model is far more efficient with three or more worker actors.

Limitations

The master-worker implementation has a few problems:

  • In the message handler of the master Actor, there is no guarantee that the poison pill will be consumed by all the workers before the master stops.
  • The main program has to sleep for a period of time long enough to allow the master and workers to complete their tasks. There is no guarantee that the computation will be completed when the main program awakes.
  • There is no mechanism to handle failure in delivering or processing messages.

The culprit is the exclusive use of the fire-and-forget mechanism to exchange data between master and workers. The send-and-receive protocol and futures are remedies to these problems.

Futures

A future is an object, more specifically a monad, used to retrieve the results of concurrent operations in a non-blocking fashion. The concept is very similar to a callback supplied to a worker, which invokes it when the task is completed. Futures hold a value that might or might not become available in the future, when a task is completed, whether successfully or not [12:10].
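
As a brief, standalone illustration of this monadic behavior (independent of the book's example), futures can be composed with map and flatMap, or a for comprehension, without blocking; the helper functions below are hypothetical:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def mean(x: Array[Double]): Future[Double] = Future { x.sum / x.length }
def scale(x: Array[Double], m: Double): Future[Array[Double]] = Future { x.map(_ / m) }

val data = Array(1.0, 2.0, 3.0, 4.0)

// Compose the two asynchronous steps without blocking the caller
val normalized: Future[Array[Double]] = for {
  m <- mean(data)
  scaled <- scale(data, m)
} yield scaled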

There are two options to retrieve results from futures:

  • Blocking execution using scala.concurrent.Await
  • Callback functions, onComplete, onSuccess, and onFailure

Note

Which future?

A Scala environment provides developers with two different Future classes: scala.actors.Future and scala.concurrent.Future. The scala.actors.Future class is used to write continuation-passing style workflows in which the current actor is blocked until the value of the future is available. Instances of type scala.concurrent.Future used in this chapter are the equivalent of java.util.concurrent.Future in Scala.

The Actor life cycle

Let's reimplement the distributed discrete Fourier transform, introduced in the previous section, using futures to support concurrency. The first step is to import the appropriate classes for the execution of the main actor and futures, as follows:

import akka.actor.{Actor, ActorSystem, ActorRef, Props}
import akka.util.Timeout
import scala.concurrent.{Await, Future} 

The Actor classes are provided by the akka.actor package, instead of the scala.actors._ package, because of Akka's extended actor model. The future-related classes, Future and Await, are imported from the scala.concurrent package, which is similar to the java.util.concurrent package. The akka.util.Timeout class is used to specify the maximum duration the actor has to wait for the completion of the futures.

There are two options for a parent actor or the main program to manage the futures it creates:

  • Blocking: The parent actor or main program stops execution until all futures have completed their tasks.
  • Callback: The parent actor or the main program initiates the futures during execution. The future tasks are performed concurrently with the parent actor, that is then notified when each future task is completed.

Blocking on futures

The following design consists of blocking the actor that launches the futures until all the futures have been completed, either returning with a result or throwing an exception. Let's modify the master Actor into a class, TransformFutures, that manages futures instead of workers or routing actors, as follows:

abstract class TransformFutures(xt: DblSeries, 
    fct: PipeOperator[DblSeries, DblSeries], 
    partitioner: Partitioner)(implicit timeout: Timeout) 
  extends Controller(xt, fct, partitioner) { //1

  override def receive = {
    case _: Start => compute(transform) //2
    case _ => Display.error("Message not recognized", logger)
  }
  def aggregate(results: Array[DblSeries]): Seq[Double]
…
}

The TransformFutures class requires the same parameters as the Master actor: a time series, xt; a data transformation, fct; and partitioner. The timeout parameter is an implicit argument of the Await.result method, and therefore, needs to be declared as an argument (line 1). The only message, Start, triggers the computation of the data transformation of each future, and then the aggregation of the results (line 2). The transform and compute methods have the same semantics as those in the master-workers design.

Tip

The generic message handler

You may have read or even written examples of actors that have generic case _ => handlers in the message loop for debugging purposes. The message loop takes a partial function as argument. Therefore, no error or exception is thrown in case the message type is not recognized. There is no need for such a handler aside from one for debugging purposes. Message types should inherit from a sealed abstract class or a sealed trait in order to prevent a new message type from being added by mistake.

Let's have a look at the transform method. Its main purpose is to instantiate, launch, and return an array of futures responsible for the transformation of the partitions, as shown in the following code:

def transform: Array[Future[DblSeries]] = {   
  val partIdx = partitioner.split(xt)
  val partitions = partIdx.map(n => 
     XTSeries[Double](xt.slice(n - partIdx(0), n).toArray)) //3

  val futures = new Array[Future[DblSeries]](partIdx.size) //4
  partitions.zipWithIndex.foreach(pi => {
     futures(pi._2) = Future[DblSeries] { fct |> pi._1 } //5
  })
  futures
}

First, the transform method splits the input time series into several partitions (line 3), similar to the master Actor in the previous section. An array of futures (one future per partition) is created (line 4). Each future applies the data transformation, fct, to the partition assigned to it (line 5), as the worker Actor did in the previous section.

The compute method has the same purpose as the aggregate method in the master-workers design. The execution of the Actor is blocked until the scala.concurrent.Await.result method returns the result of each future computation (line 6). In the case of the discrete Fourier transform, the list of frequencies is transposed before the amplitudes of each frequency are summed (line 7), as follows:

def compute(futures: Array[Future[DblSeries]]): Seq[Double] = {
  val results = futures.map(Await.result(_, timeout.duration)) //6
  aggregate(results) //7
}

The following sequential diagram illustrates the blocking design and the activities performed by the Actor and the futures:

Blocking on futures

Sequential diagram for actor blocking on future results

Handling future callbacks

Callbacks are an excellent alternative to having the actor block on futures, as the actor can execute other functions concurrently with the execution of the futures.

There are two simple ways to implement the callback function:

  • Future.onComplete
  • Future.onSuccess and Future.onFailure

The onComplete callback function takes a function of type Try[T] => U as argument with an implicit reference to the execution context, as shown in the following code:

val f: Future[T] = future { executeSomeTask }
f onComplete {
  case Success(s) => { … }
  case Failure(e) => { … }
}

You can surely recognize the {Try, Success, Failure} monad.

An alternative implementation is to invoke the onSuccess and onFailure methods that use partial functions as arguments to implement the callbacks, as follows:

f onFailure { case e: Exception => { … } }
f onSuccess { case t => { … } }

The only difference between blocking on the future data transformations and handling callbacks is the implementation of the compute method, or reducer. The class definition, message handler, and initialization of futures are identical, as shown in the following code:

def compute(futures: Array[Future[DblSeries]]): Seq[Double] = {
  val aggregation = new ArrayBuffer[DblSeries]
  futures.foreach(f => { 
    f onSuccess {  //1
      case data: DblSeries => aggregation.append(data)
    }
    f onFailure { //2
      case e: Exception => aggregation.append(XTSeries.empty)
    }
  })
  if( aggregation.find( _.isEmpty) == None) //3
     aggregate(aggregation.toArray)//4
  else Seq.empty
}

Each future calls the master Actor back with either the result of the data transformation, the onSuccess callback (line 1), or an exception, the onFailure callback (line 2). If every future succeeds (line 3), the values of every frequency for all the partitions are summed (line 4). The following sequential diagram illustrates the handling of the callback in the master Actor:

Handling future callbacks

Sequential diagram for actor handling future result with Callbacks

Tip

Execution context

Future tasks require that the execution context be implicitly provided by the developer. There are three different ways to define the execution context:

  • Import the context:
    import ExecutionContext.Implicits.global
    
  • Create an instance of the context within the actor (or actor context):
    implicit val ec = ExecutionContext.fromExecutorService( … )
  • Define the context when instantiating the future:
    val f = Future[T] { … }(ec)

Putting all together

Let's reuse the discrete Fourier transform. The client code uses the same synthetically created time series as with the master-worker test model.

The first step is to create a transform future for the discrete Fourier transform, DFTTransformFutures, as follows:

class DFTTransformFutures(xt: DblSeries, partitioner: Partitioner)(implicit timeout: Timeout) 
        extends TransformFutures(xt, DFT[Double], partitioner)  {

  override def aggregate(xt: Array[DblSeries]): Seq[Double] = 
    xt.map(_.toArray).transpose.map(_.sum).toSeq
}

The only purpose of the DFTTransformFutures class is to define the aggregation method, aggregate, for the discrete Fourier transform. The client code that executes the transform is as follows:

import akka.pattern.ask
val duration = Duration(10000, "millis")
implicit val timeout = new Timeout(duration)
implicit val actorSystem = ActorSystem("system")

val xt = XTSeries[Double](Array.tabulate(NUM_DATAPOINTS)(h(_)))
val partitioner = new Partitioner(NUM_WORKERS)
   
val master = actorSystem.actorOf(Props(new DFTTransformFutures(xt, partitioner)), "DFTTransform")  //1
val future = master ? Start() //2
Await.result(future, timeout.duration) //3
actorSystem.shutdown //4

The master Actor is initialized as an instance of DFTTransformFutures (of the TransformFutures type) with the input time series, xt; the discrete Fourier transform, DFT; and the partitioner as arguments (line 1). The program creates a future instance by sending (ask) the Start message to the master (line 2). The program blocks until the completion of the future (line 3), and then shuts down the Akka actor system (line 4).
