Chapter 12. Scalable and distributed applications using Akka

This chapter covers

  • The philosophy behind Akka
  • Simpler concurrency with actors, STM, agents, and dataflow
  • Building a large scalable application with Akka called Akkaoogle

This chapter introduces an exciting Scala toolkit called Akka, which allows you to build next-generation, event-based, fault-tolerant, scalable, and distributed applications for the JVM. Akka provides multiple concurrency abstractions (mentioned in section 9.2.3), and this chapter explores each one of them. So far, you’ve only seen how actors can be used to build message-oriented concurrency. Here we’ll go beyond actors and look into concurrency abstractions like STM, Agent, and Dataflow.

To understand how the pieces of Akka fit together, you’re going to build a real-time product search application using Akka called Akkaoogle. This application is similar to Froogle (www.google.com/products), Google’s service that finds the lowest price on products you search for. You’ll build this product incrementally so you can see which Akka features you can use in which situations.

Note

All of the Akka features covered in this chapter are also available as a Java API. I can’t cover the Java side of things in this chapter, but you can check out the latest documentation at http://akka.io/docs/ for details.

Akka is written in Scala but exposes all its features through both the Java and Scala APIs. Because this is a Scala book, in this chapter I mainly discuss the Scala API, but I include Java examples as well. You can build the Akkaoogle application in Java by following the Scala examples because both APIs look almost the same. First I’ll talk about the philosophy behind Akka so you understand the goal behind the Akka project and the problems it tries to solve.

12.1. The philosophy behind Akka

The philosophy behind Akka is simple: make it easier for developers to build correct, concurrent, scalable, and fault-tolerant applications. To that end, Akka provides a higher level of abstractions to deal with concurrency, scalability, and faults. Figure 12.1 shows the three core modules provided by Akka for concurrency, scalability, and fault tolerance.

Figure 12.1. Akka core modules

The concurrency module provides options to solve concurrency-related problems. By now I’m sure you’re comfortable with actors (message-oriented concurrency), but actors aren’t a be-all and end-all solution for concurrency. You need to understand the alternative concurrency models available in Akka, and in the next section you’ll explore all of them. At its core, Akka is an event-based platform and relies on actors for message passing and scalability. Akka puts both local and remote actors at your disposal: using local actors with routing (the ability to send work to multiple instances of an actor) you can scale up, and using remote actors you can scale out. We’ll look into this in more detail when you build a sample application at the end of the chapter.

12.2. Simple concurrency with Akka

To scale up your applications, use concurrency. In chapter 9 you learned that threads are a difficult and error-prone way to implement concurrency and should be the tool you choose last. The question then becomes what are your first, second, or third options? This section introduces you to those options and helps you decide which are appropriate. Table 12.1 describes all the concurrency techniques available in Akka. The good news is you can combine all these concurrency techniques, which is what most Akka developers end up doing.

Table 12.1. Concurrency alternatives available in Akka

  • Actors: An actor is an object that processes messages asynchronously and encapsulates state. Actors implement message-passing concurrency. We explored actors in chapter 9.
  • Software transactional memory (STM): Software transactional memory is a concurrency model analogous to database transactions for controlling access to shared state. It’s a better alternative to locks and provides composability.
  • Agents: Agents provide an abstraction over mutable data. They only allow you to mutate the data through an asynchronous write action.
  • Dataflow: Dataflow concurrency is deterministic. This means that it behaves the same every time you execute it. So if your problem deadlocks the first time, it will always deadlock, helping you to debug the problem. Akka implements Oz-style[a] dataflow concurrency using Future.

a “Concurrency,” Oz documentation, www.mozart-oz.org/documentation/tutorial/node8.html.

These options provide the flexibility you need to design your concurrent applications correctly. For example, you can model an application using actors, handle mutable state with STM or agents, and use dataflow concurrency to compose multiple concurrent processes. The possibilities are endless.

Note

Akka no longer includes the STM module but instead supports Scala STM.[1]

1 Information on Scala STM, http://nbronson.github.com/scala-stm/.

Let’s begin the journey into the world of Akka concurrency—it will be a fun ride.

12.2.1. Remote actors

In chapter 9 we explored actors in detail. Actor programming is not restricted to only a single JVM, so actors can communicate with each other across multiple JVMs (figure 12.2). Akka remote actors allow you to deploy actors in remote machines and send messages back and forth transparently. Remote actors are a great way to make your application scalable and distributed. The messages are automatically serialized using the Google protocol buffer (http://code.google.com/p/protobuf/), and communication between the two nodes is handled using JBoss Netty (www.jboss.org/netty). Think of the Google protocol buffer as XML but smaller and faster, and Netty as a nonblocking I/O (NIO) implementation, which allows Akka to efficiently use threads for I/O operations.

Figure 12.2. Two actor systems running in two different nodes

Akka implements transparent remoting, where the remoteness of the actor is completely configured at deployment time. You can work with local actors while building the solution and configure remote details of each individual actor during deployment.

Note

In a future version of Akka, Netty will be replaced with an actor-based I/O library called Actor I/O.

Before we proceed further, let’s add dependencies for remote actors. Akka is modular, so instead of pulling in the entire Akka library, you depend only on the modules you need: the actor and remote modules. You can find the complete build.sbt file in the accompanying codebase for this chapter.

resolvers ++= Seq(
  "Akka Repo" at "http://akka.io/repository",
  "Typesafe Repo" at "http://repo.typesafe.com/typesafe/repo"
)
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.1.0",
  "com.typesafe.akka" %% "akka-remote" % "2.1.0"
)

The resolvers define the location of the dependencies, and the libraryDependencies add the Akka actor and remote modules.

We’ll take the same word count example we built in chapter 9, reimplement the worker actor in Java, and work with a list of URLs instead of files. The goal is to connect to each URL and count all the words on the page. To create an Akka actor in Java you have to extend the akka.actor.UntypedActor class and override the onReceive method:

import akka.actor.UntypedActor;
public class WordCountWorker extends UntypedActor {
    @Override
    public void onReceive(Object message) {
    }
}

The class is called UntypedActor because Akka includes the concept of a TypedActor. The typed actors implement the active object[2] pattern, which turns any POJO interface into an asynchronous API using actors.

2 “Active object,” http://en.wikipedia.org/wiki/Active_object.

Note

Akka typed actors are an implementation of the active object pattern: they turn synchronous method invocations into asynchronous dispatches. The one advantage typed actors have over untyped actors is that you get a static compile-time contract and you don’t have to define messages. Read more about Akka typed actors in the Akka documentation.

Because your WordCountWorker needs to handle the FileToCount message, you need to typecast the message received as a parameter to FileToCount:

if (message instanceof FileToCount) {
  FileToCount c = (FileToCount)message;
} else {
  throw new IllegalArgumentException("Unknown message: " + message);
}

The code checks the type of the message received using the instanceof operator, and if the message isn’t of type FileToCount, an exception is thrown. Because you want to write most, if not all, of your code in Scala, add a countWords method to the FileToCount case class that counts all the words in the resource the URL points to:

case class FileToCount(url:String) {
  def countWords = {
    Source.fromURL(new URL(url))
            .getLines.foldRight(0)(_.split(" ").size + _)
  }
}
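Because countWords reaches out over the network, it helps to see the fold on its own. The following sketch applies the same foldRight to in-memory lines instead of a URL (the sample lines are made up):

```scala
// The fold used by countWords, applied to plain strings so the
// word-counting logic can be checked without network access.
val lines = List("hello world", "scala in action")
val wordCount = lines.foldRight(0)(_.split(" ").size + _)
println(wordCount) // 5
```

Each line contributes its word count (via split on spaces), and foldRight sums them starting from 0.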

The countWords method counts the number of words in a resource using the scala.io.Source class provided in the Scala library. From the WordCountWorker actor, you can invoke the countWords method to count words:

FileToCount c = (FileToCount)message;
Integer count = c.countWords();

To reply with a response to the sender, use the getSender().tell(...) method. The tell method allows actors to reply to the sender. To reply to the master, the worker actor needs to construct the WordCount message by passing the filename and the word count:

FileToCount c = (FileToCount)message;
Integer count = c.countWords();
getSender().tell(new WordCount(c.url(), count), getSelf());

The getSelf method returns the actor reference of the current actor and is passed along as the sender of the reply. The following listing shows the complete WordCountWorker actor in Java. Save this in the src/main/java folder of your SBT project.

Listing 12.1. WordCountWorker Akka actor in Java

To take advantage of the remote actors, we run all the worker actors in a JVM separate from the master actor. To achieve that, let’s create two actor systems with different properties. The easiest way to configure Akka actors is by providing a configuration file in the classpath. You can find the details of all the configuration properties in the Akka documentation.[3] The following example defines two actor systems: the main actor system and the worker actor system:

3 Configuration version 2.0.3, http://mng.bz/vmsQ.

Separating the configuration by actor systems provides the flexibility to define settings for each actor system. The following elements add remoteness to your actor system:

  • Change the actor provider to akka.remote.RemoteActorRefProvider.
  • Add the hostname of the machine on which the actor system will be running. Make sure this IP address is reachable.
  • Set the port number on which the remote actor system should listen.
  • Map the actor name to the actor system in which it will be deployed.
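A sketch of what such a configuration might look like, assuming Akka 2.1’s remote configuration keys; the system names, actor name, host, and ports are taken from the surrounding text or assumed:

```
MainSystem {
  akka {
    actor {
      provider = "akka.remote.RemoteActorRefProvider"
      deployment {
        /WordCountWorker {
          remote = "akka://workersystem@127.0.0.1:2560"
        }
      }
    }
    remote.netty.hostname = "127.0.0.1"
    remote.netty.port = 2552
  }
}

workersystem {
  akka {
    actor.provider = "akka.remote.RemoteActorRefProvider"
    remote.netty.hostname = "127.0.0.1"
    remote.netty.port = 2560
  }
}
```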

Now save the preceding configuration into the application.conf file under the src/main/resources folder of the project. This will make the application.conf file available in the classpath. To make the workersystem run on a different JVM, run the following code in a different terminal:

This will start the "workersystem" running and listening for messages on port number 2560. Now let’s create a new actor that will tie all the pieces together. It will run on the main actor system:

Listing 12.2. MainActor running on a main actor system

Now if you start the WorkerSystem and the MainSystem in two different JVM instances you will have the workers running on one JVM and the main actor running on another. This opens up myriad possibilities to scale, because now you can distribute work to multiple machines.

12.2.2. Making mutable data safe with STM

Software transactional memory (STM) turns a Java heap into a transactional dataset. STM is similar to database transactions, but is used for memory instead. Because memory isn’t durable with STM, you only get the first three properties of ACID (atomicity, consistency, isolation, durability):

  • Atomicity: This property states that all modifications follow the “all or nothing” rule. In STM, all modifications are done through an atomic transaction, and if one change fails, all the other changes are rolled back.
  • Consistency: This property ensures that an STM transaction takes the system from one consistent state to another. If you delete one element from a Map and insert it into another Map, at the end of the STM transaction both Maps will be modified appropriately.
  • Isolation: This property requires that no other STM transaction sees partial changes from other transactions.
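The all-or-nothing idea can be illustrated without any STM library. The following toy sketch (a single-threaded stand-in, not real STM) keeps an immutable snapshot and restores it if an update fails:

```scala
// All-or-nothing updates over an immutable map (illustration only, not STM):
// a failed update must leave no trace behind.
var state = Map("a" -> 1, "b" -> 2)

def atomically(update: Map[String, Int] => Map[String, Int]): Unit = {
  val snapshot = state               // remember the old value
  try state = update(state)          // attempt the whole change at once
  catch { case _: Exception => state = snapshot } // roll back on failure
}

atomically(_ + ("c" -> 3))                        // succeeds
atomically(m => throw new RuntimeException("boom")) // fails, rolls back
println(state contains "c")  // true: the successful update survived
```

Real STM additionally gives you consistency and isolation across threads; this sketch only shows the rollback discipline.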

The best part of STM is freedom from locks. It rolls back on exceptions and is composable: you can take two smaller STM operations and combine them to create a bigger STM operation. Before I show you STM examples, let’s step back to understand what state is and how it’s represented in STM.

How State is Defined in STM

Let’s look at how state is handled in imperative programming. Figure 12.3 shows how state is handled in an imperative world. You directly access the data in memory and mutate it. In the figure, an object, A, is directly accessing the data represented by B and C. The problem with this approach is that it doesn’t work in the concurrent world. What will happen when some other thread or process tries to access the data residing in B or C when A is trying to mutate that data? The result is unexpected behavior.

Figure 12.3. State represented in imperative programming

To solve the problem with this approach, STM defines mutable state differently. In STM, state is defined as the value that an entity with a specific identity has at a particular point. A value is something that doesn’t change (it’s immutable). And identity is a stable reference to a value at a given point in time. Figure 12.4 shows how the previous structure would be represented in STM. The mutable part here is the identity, which gets associated with a series of values. And STM makes the mutation of reference from one value to another atomic. What will happen in this case when some other thread or process tries to access the data residing in B or C when A is trying to mutate it? You’ll see the value associated with B or C, because STM transactions are isolated and no partial change is visible outside the transaction.

Figure 12.4. State represented in STM
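The identity/value split in figure 12.4 can be sketched without STM at all, using a plain AtomicReference over immutable maps (an illustration only; Akka’s STM layers transactions on top of this idea):

```scala
import java.util.concurrent.atomic.AtomicReference

// The identity is the stable reference; the values it points to are
// immutable maps that are never mutated in place.
val ref = new AtomicReference(Map("B" -> 1, "C" -> 2))
val before = ref.get                               // a value: immutable
ref.compareAndSet(before, before + ("C" -> 3))     // atomically point at a new value
println(before("C"))     // 2: the old value is untouched
println(ref.get()("C"))  // 3: the identity now refers to the new value
```

A concurrent reader holding `before` keeps seeing a consistent old value; it never observes a half-mutated map.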

This idea of defining state in terms of identities and values is derived from the programming language Clojure (http://clojure.org). Now let’s see how STM works in Akka through examples.

Handling Mutable Data in Akka Using STM

Akka uses the Scala STM library for its STM. To add the library to your SBT project use the following snippet:

resolvers += ("Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/")

libraryDependencies ++= Seq(
  "org.scala-stm" %% "scala-stm" % "0.7",
  "org.specs2" %% "specs2" % "1.13" % "test"
)

To demonstrate how STM works let’s take a simple example in which you create atomic operations for deleting and inserting elements into an immutable Map. To manage this mutability, wrap the value (in this case, the immutable HashMap) in the scala.concurrent.stm.Ref object as follows:

val ref1 = Ref(HashMap[String, Any](
     "service1" -> "10",
     "service2" -> "20",
     "service3" -> null))
val ref2 = Ref(HashMap[String, Int]())

Refs are nothing but mutable references to values that you can share safely with multiple concurrent participants. The preceding snippet creates two refs pointing to immutable HashMaps. To perform any operation on a Ref you have to use the atomic method defined in the STM package, passing a function that takes an in-transaction (InTxn) parameter. The Scala STM library creates the transaction object and grants the caller permission to perform transactional reads and writes. Any changes you make to refs inside the closure are done in a single STM transaction. For example, in the following code you’re adding a new element to the Map managed by ref2:

def atomicInsert(key: String, value: Int) = atomic { implicit txn =>
   val oldMap = ref2.get
   val newMap = oldMap + ( key -> value)
   ref2.swap(newMap)
}

By invoking ref2.get you’re getting the value currently associated with the Ref and using swap to replace the old value with the new value. If the operation fails, the changes will be rolled back. The transaction parameter is marked as implicit so you don’t have to pass it around.

To implement atomic deletion of key from ref1, you can use the transform method defined in Ref. The transform method allows you to transform the value referenced by Ref by applying the given function:

def atomicDelete(key: String): Option[Any] = atomic { implicit txn =>
    val oldMap = ref1.get
    val value = oldMap.get(key)
    ref1.transform(_ - key)
    value
}

The atomicDelete function returns the value that’s deleted. Why return the old value from the function? I have a plan to use it later, so hang in there.

I keep talking about the composability of STM, but haven’t yet shown you an example. Your wait is over. Imagine you have to build an atomic swap function that moves one item from one Map to another. With STM, it’s easy: all you have to do is wrap both the atomicDelete and atomicInsert functions in an atomic function, as in the following:

def atomicSwap(key: String) = atomic { implicit txn =>
    val value: Option[Any] = atomicDelete(key)
    atomicInsert(key, Integer.parseInt(value.get.toString))
}

Because ref2 only holds an Int type value, you have to parse it to Int before insertion. To fully understand the beauty of the swap function, look at the following specification:

"Atomic operations in composition" should {
    "rollback on exception" in {
      atomicSwap("service3") must throwA[Exception]
      ref1.single().contains("service3") must beEqualTo(true)
      ref2.single().contains("service3") must beEqualTo(false)
    }
}

The single method of STM lets you access the contents of a Ref without requiring an explicit transaction. When you try to swap "service3" (which maps to a null value), the conversion to Int will throw an exception. At that point the delete has already succeeded, but thanks to STM the entire transaction is rolled back. Can your locks do that? No.

STM is a great way to build smaller atomic operations and large ones by composition, similar to how functions are composed in functional programming. To learn more about STM, consult the Scala STM documentation.[4] Let’s move our attention to another concurrency abstraction called Agent.

4 Scala STM Expert Group, http://nbronson.github.com/scala-stm/index.html.

12.2.3. Agents

Agents provide asynchronous changes to an individual storage location. An agent only lets you mutate the location by applying an action. Actions are functions that are applied asynchronously to the state of the Agent, and the return value of the function becomes the new value of the Agent. Reading a value from an Agent, however, is synchronous and instantaneous. The difference between Ref and Agent is that reads and writes on a Ref are synchronous, whereas an Agent is reactive. To apply an action asynchronously, Akka provides two methods: send and sendOff. The send method uses the reactive thread pool allocated for agents, and sendOff uses a dedicated thread, which is ideal for long-running processes. Here’s an example of an Agent associated with a file writer that logs messages to the log file through send actions:

The agent keeps running until you invoke the close method. An actor system is created for the agent because, behind the scenes, agents are implemented using actors. If you have to do more than log to a file (something that will take time), use the sendOff method:

a.sendOff { someLongRunningProcess }

Note that at any time, only one send action is executed per agent. Even if actions are sent from multiple concurrent processes, they’re executed in sequential order, though they might not all run on the same thread.
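The semantics just described (asynchronous, ordered writes with synchronous reads) can be sketched in plain Scala. This MiniAgent is a toy stand-in built on a single-threaded executor, not Akka’s Agent API:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// A toy agent: actions are applied asynchronously, one at a time,
// in the order they were sent; reads are synchronous and instantaneous.
class MiniAgent[T](initial: T) {
  private val executor = Executors.newSingleThreadExecutor()
  @volatile private var value = initial
  def send(f: T => T): Unit = executor.submit(new Runnable {
    def run(): Unit = value = f(value)
  })
  def apply(): T = value                 // synchronous read
  def close(): Unit = {
    executor.shutdown()                  // apply pending actions, then stop
    executor.awaitTermination(1, TimeUnit.SECONDS)
  }
}

val log = new MiniAgent(Vector.empty[String])
log.send(_ :+ "message 1")               // asynchronous write actions
log.send(_ :+ "message 2")
log.close()                              // wait until pending sends are applied
println(log())                           // Vector(message 1, message 2)
```

The single-threaded executor is what guarantees that only one action runs at a time and that actions apply in submission order.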

To use Agents in your project you must add the following to your SBT libraryDependencies:

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.1.0",
  "com.typesafe.akka" %% "akka-agent" % "2.1.0",
  "org.specs2" %% "specs2" % "1.13" % "test"
)

Agents also participate in STM transactions when used in an atomic block, and their sends are held until the transaction completes. This is important because if you have a side-effecting action, like logging to a file, you don’t want to run it directly inside STM. Why? Because if an STM transaction fails, it retries automatically, meaning your side-effecting operation would execute multiple times. This might not be what you want, so combining agents with STM is a great pattern for executing side-effecting actions alongside STM transactions. Sometimes the asynchronous nature of Agent confuses people into thinking that agents are similar to actors, but they’re completely different in the way you design them. An Agent is associated with data, and you send behavior to the Agent from outside, in the form of a function. In the case of actors, the behavior is defined inside the actor, and you send data in the form of a message.

You’ll revisit agents when using Akkaoogle to log transactions, but now let’s continue with our next concurrency model: dataflow. Dataflow is a great way to hide concurrency behind code that reads sequentially.

12.2.4. Dataflow

Dataflow concurrency is a deterministic concurrency model: if you run your code and it works, it will always work without deadlock, and if it deadlocks the first time, it will always deadlock. This is a powerful guarantee to have in a concurrent application because it makes the code easy to reason about. Dataflow concurrency allows you to write sequential-looking code that performs parallel operations. The limitation is that your code must be completely side-effect free; you can’t have deterministic behavior if your code performs side-effecting operations.
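You can get a feel for single-assignment variables with the standard library’s scala.concurrent.Promise (plain Scala futures here, not Akka’s dataflow API):

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// A Promise can be completed exactly once; everything reading its
// future unblocks when the value is bound.
import scala.concurrent.ExecutionContext.Implicits.global

val p = Promise[Int]()
val doubled: Future[Int] = p.future.map(_ * 2) // composes; runs once bound
p.success(21)                                  // bind the value exactly once
println(Await.result(doubled, 1.second))       // 42
println(p.trySuccess(99))                      // false: a second bind is ignored
```

The read side never sees a partially bound value, which is what makes this style deterministic.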

Dataflow is implemented in Akka using Scala’s delimited continuations compiler plug-in. To enable the plug-in within your SBT project, add the following lines to the build.sbt file:

autoCompilerPlugins := true

libraryDependencies <+= scalaVersion { v => compilerPlugin(
     "org.scala-lang.plugins" % "continuations" % v) }

scalacOptions += "-P:continuations:enable"

libraryDependencies += "com.typesafe.akka" %% "akka-dataflow" % "2.1.0"

To work with dataflow concurrency, you have to work with dataflow variables. A dataflow variable is like a single-assignment variable. Once the value is bound, it won’t change, and any subsequent attempt to bind a new value will be ignored. The following example defines a dataflow variable:

val messageFromFuture = Promise[String]()

Here Akka Promise is used to create a dataflow variable. A Promise is a read handle to a value that will be available at some point in the future. Any dataflow operation is performed in the Future.flow block:

Future.flow {
     messageFromFuture()
}

The preceding call will block a thread until a value is bound to messageFromFuture. Future.flow returns a Future, so you can perform other operations without blocking the main thread of execution. Think of a Future as a data structure for retrieving the result of some concurrent operation. To assign a value to a dataflow variable, use the << method as in the following:

Future.flow {
   messageFromFuture << "Future looks very cool"
}

Once a value is bound to a dataflow variable, all the Futures that are waiting on the value will be unblocked and able to continue with execution. The following listing shows a complete example of using the dataflow variable.

Listing 12.3. Complete dataflow concurrency example
import akka.actor._
import akka.dispatch._
import Future.flow

object Main extends App {
     implicit val system = ActorSystem("dataflow")
     val messageFromFuture, rawMessage, parsedMessage = Promise[String]()
     flow {
          messageFromFuture << parsedMessage()
          println("z = " + messageFromFuture())
     }
     flow { rawMessage << "olleh" }
     flow { parsedMessage << toPresentFormat(rawMessage()) }


     def toPresentFormat(s: String) = s.reverse
}

The next section dives into building an application using some of the Akka concepts you have learned so that you can see practical use cases of Akka concurrency.

12.3. Building a real-time pricing system: Akkaoogle

You’ve covered a lot of ground so far in this chapter: you’ve learned new concepts and seen several examples. But now it’s time to see how these concepts are applied in a large application. In this section you’ll build a large web-based product search site called Akkaoogle (see figure 12.4). It will be similar to Google’s product search application (www.google.com/products) except that, instead of returning all products matching your criteria, your application will return only the cheapest deal found on the web.

Figure 12.4. Homepage of the Akkaoogle application

So how does this application work? It gets the product price from two types of vendors that offer the product. You can pay money to Akkaoogle and become an internal vendor; in this case, the product information is stored in Akkaoogle, and you pay a small service charge. You can also sign up as an external vendor, in which case Akkaoogle makes a RESTful web service call to fetch the price, but the downside is you pay a bigger service charge. When the user is looking for the cheapest deal, Akkaoogle checks with all the vendors (internal and external) and finds the lowest price for the user. Because this all has to be done in real time and you want your Akkaoogle users to have a good experience, you have to find the cheapest deal in no more than 200 to 300 milliseconds. That’s a challenge if you have to implement it using primitive concurrency constructs. But with Akka? Let’s see how you can implement this.

12.3.1. The high-level architecture of Akkaoogle

Figure 12.5 shows you the high-level view of how Akkaoogle will be implemented. At first glance it may look complicated, but don’t worry—you’re going to build it in small pieces. Keep this figure in mind when building the application so you know where you’re heading.

Figure 12.5. Akkaoogle architecture

Here are the descriptions for each important component in figure 12.5 that you’ll now start building:

  • Request handler: This is an actor that handles HTTP requests from the user. You’ll use an asynchronous HTTP library called Mist, provided by Akka, to implement this actor.
  • Search cheapest product: This is the main entry point to execute a search to find the cheapest deal. This actor will search both internal and external vendors.
  • Internal load balancer: This is a load-balancing actor that sends messages to worker actors to find the cheapest product available in the internal database.
  • External load balancer: This actor invokes all the external vendor services and finds the cheapest price among them.
  • Find product price and find vendor price: These worker actors do the work of finding the price.
  • Monitor: A simple monitor actor that logs the failures that happen in external vendor services.
  • Data loader: An actor that loads data into the database. This could be used to load product data for internal vendors.

You’re also going to build a supervisor hierarchy to handle failures. You don’t want Akkaoogle going down because you’ll lose new business and money. Let’s begin the fun by setting up the project with all the dependencies you need.

12.3.2. Setting up the project for Akkaoogle

Create an SBT project as shown in figure 12.6. Use the Build.scala file as your definition file and build.properties to configure the version of SBT. This project uses SBT 0.11.2, but you can easily upgrade to a newer version by modifying the build.properties file.

Figure 12.6. Akkaoogle project structure

The first thing to configure is the database, and for that you’ll use our old friends Squeryl (http://squeryl.org) and H2 (www.h2database.com). You used them extensively in chapter 7. To make your life easier, use the custom tasks in the following listing to start and stop the H2 database.

Listing 12.4. H2 start and stop actions
object H2TaskManager {
  var process: Option[Process] = None
  lazy val H2 = config("h2") extend(Compile)

  val startH2 = TaskKey[Unit]("start", "Starts H2 database")
  val startH2Task =
    startH2 in H2 <<= (fullClasspath in Compile) map { cp =>
      startDatabase(
        cp.map(_.data).map(_.getAbsolutePath()).filter(_.contains("h2database")))
    }

  def startDatabase(paths: Seq[String]) = {
    process match {
      case None =>
        val cp = paths.mkString(System.getProperty("path.separator"))
        val command = "java -cp " + cp + " org.h2.tools.Server"
        println("Starting Database with command: " + command)
        process = Some(Process(command).run())
        println("Database started!")
      case Some(_) =>
        println("H2 Database already started")
    }
  }

  val stopH2 = TaskKey[Unit]("stop", "Stops H2 database")
  val stopH2Task = stopH2 in H2 := {
    process match {
      case None => println("Database already stopped")
      case Some(_) =>
        println("Stopping database...")
        process.foreach(_.destroy())
        process = None
        println("Database stopped...")
    }
  }
}

The detailed explanation of how these tasks are implemented is in chapter 7, but in a nutshell, the h2:start and h2:stop tasks allow you to start and stop the H2 in-memory database from the SBT command prompt. The following listing shows the complete project definition file with all the dependencies you need for the Akkaoogle project.

Listing 12.5. Akkaoogle build.scala file

The build.scala file declares all the dependencies and settings you need to start working on the Akkaoogle project. Once all the dependencies are downloaded you’re ready to implement the Akkaoogle application. The next section builds the domain models you need to implement the internal vendor service. I test drove most of the application, but I won’t show the test cases here. I encourage you to go through the test cases in this chapter’s accompanying codebase. Even better, write tests for the code used throughout the rest of the chapter.

12.3.3. Implementing the domain models

You need a way to implement the products provided by internal vendors and also a model that represents external vendors. To reduce duplication between domain models, create a common trait called Model that extends the KeyedEntity trait provided by Squeryl. This trait provides an id field that acts as a primary key in the database for all the domain models:

trait Model[A] extends KeyedEntity[Long] { this: A =>
   val id: Long = 0
}

Here the Model also declares that it will get mixed in with a type represented by A. You’re declaring a self type here because later on it will let you add generic methods that work on all the model objects (more about this later). Now you can create model classes that represent products and external vendors by extending the Model trait:

class Product(val description: String,
                val vendorName: String,
                val basePrice: Double,
                val plusPercent: Double)
    extends Model[Product]

class ExternalVendor(val name: String, val url: String)
     extends Model[ExternalVendor]
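The self-type constraint on Model can be seen in a stripped-down sketch (hypothetical names, no Squeryl involved):

```scala
// The self type `this: A =>` restricts SelfTypedModel[A] to classes
// that are themselves A, exactly as Model[A] does above.
trait SelfTypedModel[A] { this: A =>
  val id: Long = 0
}

class Item extends SelfTypedModel[Item]       // compiles: Item is the A
// class Other extends SelfTypedModel[Item]   // would NOT compile: Other is not Item

println(new Item().id) // 0
```

This is what later lets generic methods on the trait (like save) know that `this` really is an A.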

You’re keeping these models simple so you can focus more on the Akka-related features you’ll implement. The url property of the external vendor specifies the URL you’ll use to invoke the remote RESTful service. Add a method to the Product class to calculate the price using the basePrice and plusPercent fields:

def calculatePrice = basePrice + (basePrice * plusPercent / 100)
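A quick worked example of the formula, with made-up numbers:

```scala
// A product with a base price of 100.0 and a 5% markup:
val basePrice = 100.0
val plusPercent = 5.0
val price = basePrice + (basePrice * plusPercent / 100)
println(price) // 105.0
```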

You’ll need this method to determine the price of products offered by internal vendors. Because Akkaoogle cares about quality, you need to track the service availability of all the external vendors so you can rate them quarterly. You’ll log (to the database) every time a call to an external vendor service fails and you’ll need a domain model to represent it:

class TransactionFailure(val vendorId: String,
                         val message: String,
                         val timestamp: Date)
    extends Model[TransactionFailure]

The following listing creates an Akkaoogle schema object that will create the necessary tables in the database and provide helper methods to work with domain models.

Listing 12.6. Akkaoogle schema

The listing defines the schema you'll use for Akkaoogle. The schema defines three tables: products, vendors, and transactionFailures. The init method makes the database connection (in this case to the H2 database) and is used by the tx method to make sure the application is connected before initiating the transaction.

To fetch values back from the database, you can define finder methods in the companion objects of the domain models:

The TransactionFailure, Product, and ExternalVendor companion objects define finder methods that fetch values from the database. TransactionFailure.findAll returns all the transaction failures stored in the database (for large tables, you should define finders that take criteria to filter the data). Product.findByDescription returns the first matching product, and ExternalVendor.findAll returns all the vendors registered with Akkaoogle.

Using the same technique you used in chapter 7, you’ll add a save method to each domain model. The save method for all the domain objects is almost identical, except for the Squeryl table object. Create a generalized version of the save method that works with all the domain models that extend the Model trait:

trait Model[A] extends KeyedEntity[Long] { this: A =>
  val id: Long = 0

  def save(implicit table: Table[A]): Either[Throwable, String] = {
    tx {
      try {
        table.insert(this)
        Right("Domain object is saved successfully")
      } catch {
        case exception: Throwable => Left(exception)
      }
    }
  }
}

The save method takes an implicit parameter that’s of type org.squeryl.Table, which can handle type A. For example, when the Model is an instance of Product, then A is going to be of type Product, and Table[A] is going to be a table that knows how to save Product, which is the products property defined in the AkkaoogleSchema object. Because the parameter is defined as an implicit parameter, the compiler will now look for a value that matches the parameter type within the scope. This way, the caller won’t have to specify the parameter when saving an instance of a domain model. The following listing shows the complete models package.

Listing 12.7. Complete models package of Akkaoogle

The models package defines all the classes you need to make Akkaoogle work with the database. It defines a parameterized version of save that works with all the table objects created inside the package. To make save work for all these table types, you have defined implicit values for each table so that the Scala compiler picks the appropriate one when saving your domain objects.
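The implicit-resolution mechanics can be seen in isolation with a toy model (FakeTable, Order, and Customer are illustrative names, not Squeryl's API): one generic save method, one implicit "table" value per model type.

```scala
// Toy model of the implicit-table technique. The compiler picks the
// FakeTable[A] in scope that matches each model's type.
class FakeTable[A](val name: String) {
  def insert(a: A): String = "inserted into " + name
}

trait Saveable[A] { this: A =>
  // The matching FakeTable[A] is supplied implicitly by the compiler
  def save(implicit table: FakeTable[A]): String = table.insert(this)
}

class Order extends Saveable[Order]
class Customer extends Saveable[Customer]

object Schema {
  implicit val orders: FakeTable[Order] = new FakeTable[Order]("orders")
  implicit val customers: FakeTable[Customer] =
    new FakeTable[Customer]("customers")
}

object SaveDemo extends App {
  import Schema._
  println(new Order().save)    // prints "inserted into orders"
  println(new Customer().save) // prints "inserted into customers"
}
```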

It’s time to move to the core of the application: implementing the price lookup actors for both the internal and external vendors.

12.3.4. Implementing the core with actors

The core of Akkaoogle is to find the cheapest deal on the web and track the availability of the external services for quality purposes. To support these functionalities, the Akkaoogle application needs to handle the message types shown in the following listing.

Listing 12.8. List of message types supported by Akkaoogle (messages.scala)
package com.akkaoogle.calculators

object messages {
  case class FindPrice(productDescription: String, quantity: Int)
  case class LowestPrice(vendorName: String,
                         productDescription: String,
                         price: Double)

  case class LogTimeout(actorId: String, msg: String)

  case class FindStats(actorId: String)
  case class Stats(actorId: String, timeouts: Int)
}

The FindPrice message type represents a request triggered by a user looking for the cheapest deal. Needless to say, it’s the most important message type in the application. The response of the FindPrice message is represented by the LowestPrice message, and it contains all the information the user needs about the cheapest deal. Akkaoogle internally uses the rest of the message types to track the availability of external services. Every time an external service times out, the LogTimeout message is sent to an actor to log the details. The FindStats and Stats messages are used for administration purposes.

First, implement a way to find the cheapest price for the products offered by the internal providers. Remember: products offered by internal providers are stored in a database. The InternalPriceCalculator actor calculates the lowest price by looking up the product by description, shown in the following listing.

Listing 12.9. Actor that calculates lowest price for internal products
package com.akkaoogle.calculators

import messages._
import com.akkaoogle.db.models._
import akka.actor._

class InternalPriceCalculator extends Actor {
  def receive = {
    case FindPrice(productDescription, quantity) =>
      val price = calculatePrice(productDescription, quantity)
      sender ! price
  }
 
  def calculatePrice(productDescription: String, qty: Int):
      Option[LowestPrice] = {
    Product.findByDescription(productDescription) map { product =>
      LowestPrice(product.vendorName,
        product.description,
        product.calculatePrice * qty)
    }
  }
}

This actor is simple. When it receives the FindPrice message, it uses the product description provided to find the product in the database and calculate the price using the calculatePrice method, as defined in the Product domain class. At the end it returns the LowestPrice response. The current implementation only considers the first matching product, which in some cases isn’t the right behavior. It should match all the products with a description (even a partial description) and derive the lowest price. I leave it to you to make the necessary changes to the calculation. For now, let’s move on to the external providers.

Because you can have many external vendors for your application, you can’t make the remote service calls sequentially because it would increase the response time and affect your users. You have to invoke the service calls in parallel and find a way to set a timeout for each service so you can respond to the user within a reasonable time. Actors are a nice and easy way to achieve parallelism. You’ll next create an actor for each external vendor and broadcast the FindPrice message to these actors. These actors will act as a proxy client to the remote service. The following listing shows how the proxy actor for each external vendor is implemented.

Listing 12.10. The external vendor proxy actor

Source.fromURL makes the REST call to the vendor web service. Because the web service might take some time to respond, the call is wrapped in a Future. A Future is a data structure for retrieving the result of a concurrent operation without blocking: typically, some other actor does the work and completes the Future so you aren't blocked. Futures are a great and easy way to execute code asynchronously, and an Akka Future can also be composed monadically with other Futures (using map and flatMap).

The reference to the Future is then piped to the sender. This is a common pattern in Akka, called the pipeTo pattern: instead of waiting for the Future to complete and then sending the result to the sender, you pipe the Future to the sender. pipeTo adds an onComplete callback to the Future so that when it completes, its output is sent to the sender. If the Future fails with an exception, the recover method returns the empty option instead. The Future construct in Akka is powerful; make sure you explore it in the Akka documentation (http://mng.bz/wc7D).
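To see the shape of this without Akka, here's a stdlib-only sketch of the wrap-and-recover idea using scala.concurrent.Future (fetchPrice and quote are hypothetical helpers, and the pipeTo machinery itself is not shown):

```scala
// Stdlib-only sketch: wrap a possibly slow or failing call in a Future,
// and recover any failure to None instead of propagating the exception.
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureRecoverDemo extends App {
  // Hypothetical stand-in for the real Source.fromURL call
  def fetchPrice(url: String): Double =
    if (url.startsWith("http")) 9.99 else sys.error("bad vendor URL")

  def quote(url: String): Future[Option[Double]] =
    Future(Some(fetchPrice(url))) recover { case _ => None }

  println(Await.result(quote("http://vendor"), 2.seconds)) // Some(9.99)
  println(Await.result(quote("oops"), 2.seconds))          // None
}
```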

You need the actor in the following listing to broadcast the FindPrice message to each proxy actor and, at the end, find the lowest price out of all the responses.

Listing 12.11. External vendor lowest price calculator

The ExternalPriceCalculator actor is created with references to the ExternalVendorProxyActor instances. The FindPrice message is broadcast to all the proxy actors using the ? method (the ask pattern). The ask method implements the request-reply pattern, but instead of waiting for the reply, it returns a Future. This way you're even-handed in dispatching messages to each proxy, so all the external vendors get their fair share of time to respond with prices. The findLowestPrice method determines the lowest price out of all the responses, and here's how it's implemented:

The findLowestPrice method uses the fold operation over all the Futures to find the lowest price. The beauty of this fold is it’s performed in a nonblocking fashion. This is an important criterion—otherwise, the entire operation would take more time. The fold method creates a new Future that wraps the entire operation and is performed on the thread of the Future that completes last. If any of the Futures throws an exception, the result of the fold becomes that exception.
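The combine-many-futures-into-one idea can be sketched with the standard library alone, using Future.sequence plus an ordinary fold over the responses (a simplification of Akka's nonblocking Future.fold; the LowestPrice shape here mirrors the message class above):

```scala
// Stdlib sketch: collapse many price futures into one future that
// holds the cheapest quote, ignoring vendors that returned None.
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LowestPriceDemo extends App {
  case class LowestPrice(vendorName: String,
                         description: String,
                         price: Double)

  def lowest(quotes: Seq[Future[Option[LowestPrice]]])
      : Future[Option[LowestPrice]] =
    Future.sequence(quotes).map { responses =>
      responses.flatten match {
        case Seq() => None
        case ps    => Some(ps.minBy(_.price))
      }
    }

  val quotes = Seq(
    Future(Some(LowestPrice("acme", "camera", 320.0))),
    Future(None), // a vendor that timed out
    Future(Some(LowestPrice("bmart", "camera", 299.0))))

  println(Await.result(lowest(quotes), 2.seconds))
  // Some(LowestPrice(bmart,camera,299.0))
}
```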

So far, you’ve implemented actors to get both the internal and external lowest price. The following listing shows one more actor you need; it can find the lowest price from both internal and external vendors and return the result.

Listing 12.12. Cheapest deal finder actor

The actor uses AkkaoogleActorServer (you’ll implement this in section 12.3.6) to look up the "internal-load-balancer" actor in order to calculate the internal price. This actor acts as a router for InternalPriceCalculator (you’ll learn about Akka routers in the next section). Similarly, "external-load-balancer" finds the router actor for the ExternalPriceCalculator actor. The CheapestDealFinder actor finds the lowest price from both internal and external vendors and then finds the cheapest price among them.

You use the router actors to increase the scalability of the application because they help in routing messages to multiple instances of actors, based on an algorithm. The next section discusses routers and dispatchers, two of the neat features of Akka.

12.3.5. Increase scalability with remote actors, dispatchers, and routers

I haven’t discussed Akka dispatchers and message routing on purpose, because to truly understand them you need a context where they’re valuable—like our current example. What will happen if you deploy the current application in production?

Setting up Message Routing

For one user at a time, the current setup would work fine, but with multiple concurrent users it won't scale. When the CheapestDealFinder actor is processing a message, other messages wait in its mailbox. In some cases you may want that behavior, but not here. If you could create multiple instances of the CheapestDealFinder actor, you could process messages in parallel, but then you'd have to route messages to these actors effectively so that no actor is overloaded. How will the caller know which actor instance has the fewest messages to process?

The good news is that Akka comes with special kinds of actors called routers, which can effectively route messages between multiple instances of actors. A router actor acts as a gateway to a collection of actors: you send a message to the router, and the router forwards it to one of the actors based on some routing policy. For example, SmallestMailboxRouter routes messages based on mailbox size; the actor with the fewest messages in its mailbox wins. The following code snippet creates 10 instances of the CheapestDealFinder actor and a SmallestMailboxRouter to route messages to them:

val cheapestDealFinderLoadBalancer = system.actorOf(
  Props[CheapestDealFinder]
    .withRouter(SmallestMailboxRouter(nrOfInstances = 10)),
  name = "cheapest-deal-finder-balancer")

Here the CheapestDealFinder actor is created with SmallestMailboxRouter by passing the number of instances that this router will manage. Note that the router will automatically create CheapestDealFinder actors. To create your own routing logic, you need to extend RouterConfig.

Similarly, the following code example creates routers for both InternalPriceCalculator and ExternalPriceCalculator using RoundRobinRouter:

val internalPriceCalculators: List[ActorRef] =
  createInternalPriceCalculators(10)

val internalLoadBalancer = system.actorOf(
  Props[InternalPriceCalculator]
    .withRouter(RoundRobinRouter(routees = internalPriceCalculators)),
  name = "internal-load-balancer")

val proxies = createExternalProxyActors(ExternalVendor.findAll)

val externalPriceCalculators: List[ActorRef] =
  createExternalPriceCalculators(10, proxies)

val externalLoadBalancer = system.actorOf(
  Props[ExternalPriceCalculator]
    .withRouter(RoundRobinRouter(routees = externalPriceCalculators)),
  name = "external-load-balancer")

RoundRobinRouter routes messages to actors in round-robin fashion. Instead of letting the router create the actor instances, here the instances are passed in as a parameter (they're called routees). The reason is that you want to customize them with additional parameters, as discussed in the next section.
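The round-robin policy itself is easy to picture; a toy selector (not Akka's RoundRobinRouter implementation) might look like this:

```scala
// Toy model of round-robin routing: each message goes to the next
// routee in the list, wrapping around when the end is reached.
import java.util.concurrent.atomic.AtomicLong

class RoundRobin[A](routees: Vector[A]) {
  private val counter = new AtomicLong(0)
  def pick(): A = routees((counter.getAndIncrement % routees.size).toInt)
}

object RoundRobinDemo extends App {
  val router = new RoundRobin(Vector("calc-0", "calc-1", "calc-2"))
  println((1 to 5).map(_ => router.pick()).mkString(", "))
  // calc-0, calc-1, calc-2, calc-0, calc-1
}
```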

These routers let you scale and handle multiple users at a time. But what about performance? You still need the underlying threads to run all the event-based actors you’ve created. The next section explores how to customize Akka to allocate dedicated threads for each actor type.

Improve Performance with Dispatchers

Every actor system has a default dispatcher that's used if nothing else is configured. In Akka, message dispatchers are the engine behind the actors: think of a dispatcher as a service with a thread pool that knows how to execute actors when a message is received. In most scenarios the default settings work best; in fact, when you're building an Akka application, I recommend starting with them and not creating a special configuration.

But if you notice some contention on a single dispatcher, you can start creating dedicated dispatchers for a group of actors. Assume you’re in that situation. You notice that your InternalPriceCalculatorActor and ExternalVendorProxyActor actors aren’t getting executed as quickly as you want, and it’s because of the contention in the default dispatcher. Remember, all the actors are created from the same actor system. You can easily configure the default dispatcher by adding more threads to it, but for learning purposes you’re going to use different dispatchers. Akka comes with four types of message dispatchers:

  • Dispatcher: The default dispatcher used by the actor system. It's an event-based dispatcher that binds actors to a thread pool. It creates one mailbox per actor.
  • PinnedDispatcher: Dedicates one thread per actor. It's like creating thread-based actors.
  • BalancingDispatcher: An event-driven dispatcher that redistributes work from busy actors to idle actors. All the actors of the same type share one mailbox.
  • CallingThreadDispatcher: Runs the actor on the calling thread; it doesn't create any new threads. Great for unit testing.

Using dispatchers in Akka is a simple two-step process: first, specify them in the configuration file, then set up the actor with the dispatcher. The following configuration snippet declares the dispatcher you’ll use for the ExternalPriceCalculator actor:

akkaoogle {
  dispatchers {
     external-price-calculator-actor-dispatcher {
       # Dispatcher is the name of the event-based dispatcher
       type = Dispatcher
       # What kind of ExecutionService to use
       executor = "fork-join-executor"
       # Configuration for the fork-join pool
       fork-join-executor {
          # Min number of threads to cap factor-based parallelism number to
          parallelism-min = 2
          # Parallelism (threads) ... ceil(available processors * factor)
          parallelism-factor = 2.0
          # Max number of threads to cap factor-based parallelism number to
          parallelism-max = 100
       }
       # Throughput defines the maximum number of messages to be
       # processed per actor before the thread jumps to the next actor.
       # Set to 1 for as fair as possible.
       throughput = 100
     }
   }
}

The external-price-calculator-actor-dispatcher uses a Dispatcher (the default event-based dispatcher) with a fork-join thread pool. The fork-join thread pool is configured with additional properties. Akka dispatchers are configurable (read the Akka documentation for details). Similarly, the following dispatcher could be used for the InternalPriceCalculator actor:
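The sizing rule spelled out in the config comments, ceil(available processors * parallelism-factor) clamped between parallelism-min and parallelism-max, can be written out as a quick check (a sketch of the documented formula, not Akka's internal code):

```scala
// The documented fork-join sizing rule:
// pool size = ceil(cores * parallelism-factor), clamped to [min, max].
object ParallelismDemo extends App {
  def poolSize(cores: Int, factor: Double, min: Int, max: Int): Int =
    math.min(max, math.max(min, math.ceil(cores * factor).toInt))

  // With the settings above (factor = 2.0, min = 2, max = 100):
  println(poolSize(cores = 8, factor = 2.0, min = 2, max = 100))  // 16
  println(poolSize(cores = 64, factor = 2.0, min = 2, max = 100)) // 100
}
```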

akkaoogle {
  dispatchers {
    ...
    ...
    internal-price-calculator-actor-dispatcher {
       # Dispatcher is the name of the event-based dispatcher
       type = BalancingDispatcher
       # What kind of ExecutionService to use
       executor = "thread-pool-executor"
       thread-pool-executor {
         # Min number of threads to cap factor-based core number to
         core-pool-size-min = 5
       }
     }
     ...
  }
}

This time the internal-price-calculator-actor-dispatcher uses BalancingDispatcher with the thread pool executor, with the minimum number of threads set to 5. In the real world, you should do performance testing before choosing a configuration; no single configuration works for everybody.

To attach these dispatchers to actors, use the withDispatcher method of Props, as in the following:

private def createInternalPriceCalculators(initialLoad: Int)(
  implicit system: ActorSystem) = {
  (for (i <- 0 until initialLoad) yield
    system.actorOf(Props[InternalPriceCalculator]
      .withDispatcher(
        "dispatchers.internal-price-calculator-actor-dispatcher"),
      name = "internal-price-calculator" + i)).toList
}

private def createExternalPriceCalculators(initialLoad: Int,
  proxies: List[ActorRef])(implicit system: ActorSystem) = {
  (for (i <- 0 until initialLoad) yield
    system.actorOf(Props(new ExternalPriceCalculator(proxies))
      .withDispatcher(
        "dispatchers.external-price-calculator-actor-dispatcher"),
      name = "external-price-calculator" + i)).toList
}

The createInternalPriceCalculators method creates all the InternalPriceCalculator actors and configures them with the dispatchers.internal-price-calculator-actor-dispatcher. These actors will no longer use the default dispatcher that comes with the ActorSystem but rather the one that's configured. Similarly, createExternalPriceCalculators configures the ExternalPriceCalculator actors. The following listing shows the completed AkkaoogleActorServer, which creates and configures all the actors used in the Akkaoogle application.

Listing 12.13. Akkaoogle actor server

Where do the remote actors fit in? Making an actor remote is a matter of changing the configuration at deployment time. There’s no special API for remote actors. This lets you scale further by deploying them into multiple remote machines, if required. Refer to the codebase for this chapter for an example. So far, you’ve implemented the FindPrice message. The next section implements the LogTimeout message using Agents.

12.3.6. Handling shared resources with Agent

To build the monitoring piece for the Akkaoogle application, you have to rely on a shared mutable state, and this section shows you how to put Agent to use.

The monitor actor needs to log any transaction failure with external vendors. You can always extend its functionality for internal use, but for now it needs to handle the following two message types:

case class LogTimeout(actorId: String, msg: String)
case class FindStats(actorId: String)

On receiving a LogTimeout message, it needs to save the transaction failure information to the database and also keep track of the number of times a particular service failed. Agent fits the requirement well. It can store shared data and, if required, participate in the STM transaction.
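Before wiring the Agent into an actor, its update model is worth picturing on its own: you send the Agent functions of type State => State, and it applies them one at a time, in order, so increments to the shared error-count map never interleave. The following is a toy model of those semantics (a plain fold, not Akka's implementation):

```scala
// Toy model of Agent update semantics: queued updates are applied
// sequentially, so each increment sees the previous one's result.
object AgentModelDemo extends App {
  type ErrorLog = Map[String, Int]

  // One update: bump the failure count for an actor id
  def logTimeout(actorId: String): ErrorLog => ErrorLog =
    log => log + (actorId -> (log.getOrElse(actorId, 0) + 1))

  val updates = List(
    logTimeout("vendor-1"), logTimeout("vendor-2"), logTimeout("vendor-1"))
  val finalLog = updates.foldLeft(Map.empty[String, Int])((log, f) => f(log))

  println(finalLog) // vendor-1 -> 2, vendor-2 -> 1
}
```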

To keep things simple, you'll track the failure counts in an immutable Map held by an Agent. The side effect of saving information to the database can't be done safely within an STM transaction, because an STM transaction may retry its operations multiple times if there's any read/write inconsistency; if you saved to the database inside the transaction, the row might be saved more than once. An Agent, on the other hand, can participate in an STM transaction and is executed only when the transaction completes successfully. Here's how to increment the failure counter and save the information to the database in an atomic operation:

val errorLogger = Agent(Map.empty[String, Int])
...
...
private def logTimeout(actorId: String, msg: String): Unit = {
  errorLogger send { errorLog =>
    val current = errorLog.getOrElse(actorId, 0)
    val newErrorLog = errorLog + (actorId -> (current + 1))
    val l = new TransactionFailure(actorId, msg,
      new Date(System.currentTimeMillis))
    l.save
    newErrorLog
  }
}

The logTimeout method takes the actorId and the message that needs to be logged in the database. The send method of Agent takes a function that increments the failure count and saves the message to the database. With this setup, implementing the FindStats message is easy:

case FindStats(actorId) =>
  val timeouts = errorLogger().getOrElse(actorId, 0)
  sender ! Stats(actorId, timeouts = timeouts)

Get the latest count from the map and return it. In the real world, you’ll monitor other information, but as of now you’re done with the monitor actor. The following is the complete code.

Listing 12.14. Complete monitor actor

MonitorActor checks the health of external vendor services and provides stats. preRestart is a callback method defined by the Akka Actor trait that's invoked when the actor is about to be restarted. In preRestart you're clearing the log counts, but ideally you'd save the existing error counts to persistent storage so you can fetch them later. Now let's hook all these actors up to a simple UI.

12.4. Adding asynchronous HTTP support with Play2-mini

Play2-mini is a lightweight REST framework on top of the Play2 framework. It maps an HTTP request to a function that takes the request and returns a response; behind the scenes, all requests are handled by actors. It also exposes the nonblocking, asynchronous HTTP support of the Play2 framework, which uses the Netty server built on the Java NIO API. The next section sets up the Akkaoogle project to use the Play2-mini framework.

12.4.1. Setting up Play2-mini

To make your Akkaoogle project aware of the Play2-mini project, you need to add the necessary libraryDependencies to the SBT build definition. Because you’re also going to use the Netty server built into Play2-mini as your web server, you’ll configure play.core.server.NettyServer as the main entry point. To run the Akkaoogle application, all you have to do is enter "sbt run". The following trait captures the necessary settings to convert the Akkaoogle project to Play2-mini:

trait ConfigureScalaBuild {
  lazy val typesafe =
    "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

  lazy val typesafeSnapshot =
    "Typesafe Snapshots Repository" at "http://repo.typesafe.com/typesafe/snapshots/"

  val netty = Some("play.core.server.NettyServer")

  def scalaMiniProject(org: String, name: String, buildVersion: String,
    baseFile: java.io.File = file(".")) =
    Project(id = name, base = baseFile,
      settings = Project.defaultSettings).settings(
      version := buildVersion,
      organization := org,
      resolvers += typesafe,
      resolvers += typesafeSnapshot,
      libraryDependencies += "com.typesafe" %% "play-mini" % "2.1-RC2",
      mainClass in (Compile, run) := netty,
      ivyXML :=
        <dependencies> <exclude org="org.springframework"/> </dependencies>
    )
}

The scalaMiniProject method creates an SBT project with all the Play2-mini dependencies. For Akkaoogle, you’ll mix in this trait and use the scalaMiniProject method to create the project, as in the following listing.

Listing 12.15. Akkaoogle SBT project with Play2-mini
import sbt._
import Keys._

object AkkaoogleBuild extends Build with ConfigureScalaBuild {

  import H2TaskManager._

  lazy val root = scalaMiniProject("com.akkaoogle", "akkaoogle", "1.0")
    .settings(startH2Task, stopH2Task)
    .settings(
      organization := "scalainaction",
      scalaVersion := "2.10.0",
      scalacOptions ++= Seq("-unchecked", "-deprecation"),
      resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/repo",
      parallelExecution in Test := false
    )
    .settings(
      libraryDependencies ++= Seq(
        "com.typesafe.akka" %% "akka-actor" % "2.1.0",
        "com.typesafe.akka" %% "akka-remote" % "2.1.0",
        "com.typesafe.akka" %% "akka-agent" % "2.1.0",
        "com.h2database" % "h2" % "1.2.127",
        "org.squeryl" % "squeryl_2.10-RC5" % "0.9.5-5",
        "org.specs2" %% "specs2" % "1.13" % "test",
        "org.eclipse.jetty" % "jetty-distribution" % "8.0.0.M2" % "test"
      ))
}

After you save and reload the build definition, you should have everything you need to give a UI look to your application. In the next section, you’ll build your first Play2-mini action, which can take the HTTP request and send messages to the actors.

12.4.2. Running with Play2-mini

When you start a Play2-mini–based application, the first thing it does is look for an implementation of com.typesafe.play.mini.Setup. Every Play2-mini–based application needs to extend this class:

package com.typesafe.play.mini

class Setup(a: Application) extends GlobalSettings {
  ...
}

This class takes an instance of Application as a parameter. Think of Application as the controller of the MVC model that handles all the requests. In the case of Play2-mini, the only abstract method you have to implement is the route method:

package com.typesafe.play.mini
trait Application {
  def route: PartialFunction[RequestHeader, Handler]
}

For your application to work with Play2-mini, you need to implement the Setup by passing a concrete implementation of the Application trait. Here’s the implementation of the Setup:

import com.akkaoogle.infrastructure._
import org.h2.tools.Server
import com.akkaoogle.db.AkkaoogleSchema._


object Global extends
    com.typesafe.play.mini.Setup(com.akkaoogle.http.App) {
  println("initializing the Akkaoogle schema")
  createSchema()
  AkkaoogleActorServer.run()
}

The Global object extends the Setup class by passing com.akkaoogle.http.App as an implementation of the Application trait. com.akkaoogle.http.App will handle all the HTTP requests for the Akkaoogle application. Global is also a great place to initialize the various parts of the system. In this case, you’re creating the schema, and AkkaoogleActorServer starts all the actors. Here’s the complete implementation of the com.akkaoogle.http.App Play2-mini Application:

Listing 12.16. Akkaoogle Play2-mini Application

The route method is a partial function that matches an HTTP URL to an action. An Action is nothing but a function object that takes an HTTP request and returns a Result. For example, case GET(Path("/")) matches an HTTP GET on the "/" URL. The Play2-mini framework provides a nice DSL to parse the HTTP URL and verb. The following action returns Ok (HTTP 200 response code) with the output of the views.index() method:

Action { request =>
  Ok(views.index()).as("text/html")
}

views.index returns the HTML code to render the Akkaoogle homepage shown in figure 12.4. The following GET pattern match for product search is more interesting:

GET(Path("/akkaoogle/search")) & QueryString(qs)

In this case, you're using the QueryString helper of Play2-mini to parse the query parameters and extract the value mapped to a parameter. In the action, you take the description given by the user to create the FindPrice message, which in turn is sent to the cheapest-deal-finder-balancer to find the cheapest price available for the product. You can find the complete working version of the Akkaoogle project in the chapter's codebase. The Akkaoogle application isn't complete yet, so take over and add more features to make it better; that's the best way to learn and explore Akka.
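The route-as-partial-function idea at the heart of this section can be modeled in plain Scala (a toy dispatcher, not Play2-mini's actual API): requests the function isn't defined for fall through to a 404.

```scala
// Toy model of routing via a PartialFunction, mirroring the shape of
// Play2-mini's route method. Request is a simplified (method, path) pair.
object Routes {
  type Request = (String, String) // (HTTP method, path)

  def route: PartialFunction[Request, String] = {
    case ("GET", "/")                 => "200 homepage"
    case ("GET", "/akkaoogle/search") => "200 search results"
  }

  def handle(req: Request): String =
    if (route.isDefinedAt(req)) route(req) else "404 not found"
}

object RouteDemo extends App {
  println(Routes.handle(("GET", "/")))        // 200 homepage
  println(Routes.handle(("GET", "/missing"))) // 404 not found
}
```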

12.5. Summary

Akka is a powerful toolkit that you can use to build frameworks or applications. Akka makes concurrency easy for programmers by raising the abstraction level. Akka is a concurrency framework built on actors, but it provides all the popular concurrency abstractions available on the market. It provides the flexibility you need to build your next enterprise application. Akka’s STM support lets you safely operate on mutable data structures without leaving the comfort of actor-based concurrency. Most importantly, you learned that STM composes, so you can build smaller atomic operations and compose them to solve problems. You also explored agents as another concurrency model that lets you send behavior from outside to manipulate data in a safe manner. Exploring dataflow concurrency was also interesting because it lets you write sequential programs without worrying about concurrency. And dataflow concurrency code is very easy to understand and follow.

By building Akkaoogle, you explored various considerations, constraints, and design decisions that typically arise when building large concurrent applications. I’m sure that the insights you gained will enable you to build your next Akka application. Always remember that Akka provides lots of options and configuration flexibility, and you should pick the features and options that work best for your application requirements. Akka is already used in various real-world applications,[5] and now you can use it too. From here on, let’s keep all our CPU cores busy.

5 Akka use cases, http://akka.io/docs/akka/1.1.2/intro/use-cases.html.
