Chapter 2. Patterns of Actor Usage

Now that we understand the varying types of actor systems that can be created, what are some patterns of usage that we can define so that we can avoid making common mistakes when writing actor-based applications? Let’s look at a few of them.

The Extra Pattern

One of the most difficult tasks in asynchronous programming is trying to capture context so that the state of the world at the time the task was started can be accurately represented at the time the task finishes. However, creating anonymous instances of Akka actors is a very simple and lightweight solution for capturing the context at the time the message was handled to be utilized when the tasks are successfully completed. They are like extras in the cast of a movie—helping provide realistic context to the primary actors who are working around them.

The Problem

A great example is an actor that is sequentially handling messages in its mailbox but performing the tasks based on those messages off-thread with futures. This is a great way to design your actors in that they will not block waiting for responses, allowing them to handle more messages concurrently and increase your application’s performance. However, the state of the actor will likely change with every message.

Let’s define the boilerplate of this example. These are classes that will be reused for each of the iterations of our development process going forward. Note that all of this code is available in my GitHub repository, should you want to clone it and test yourself. First, we have a message telling our actor to retrieve the customer account balances for a particular customer ID:

case class GetCustomerAccountBalances(id: Long)

Next, we have data transfer objects in which we return the requested account information. Because customers may or may not have any accounts of each type, and it is possible they may have more than one of any of the account types, we return Option[List[(Long, BigDecimal)]] in each case, where Long represents an account identifier, and BigDecimal represents a balance:

case class AccountBalances(
  val checking: Option[List[(Long, BigDecimal)]],
  val savings: Option[List[(Long, BigDecimal)]],
  val moneyMarket: Option[List[(Long, BigDecimal)]])
case class CheckingAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])

I promised in the preface of this book that I would show how this ties back to Eric Evans’ concepts with domain-driven design. Look at the classes I have created to perform this work. We can consider the entire AccountService to be a bounded context, where an individual CheckingAccount or SavingsAccount is an entity. The number represented by the balance inside one of those classes is a value. The checkingBalances, savingsBalances, and mmBalances fields are aggregates, and the AccountBalances return type is an aggregate root. Finally, Vaughn Vernon in his excellent “Implementing Domain-Driven Design” points to Akka as a possible implementation for an event-driven bounded context. It is also quite easy to implement command query responsibility segregation (CQRS, per Greg Young’s specification) and event sourcing (using the open source eventsourced library) with Akka.

Finally, we have proxy traits that represent service interfaces. Just like with the Java best practice of exposing interfaces to services rather than the implementations of the classes, we will follow that convention here and define the service interfaces which can then be stubbed out in our tests:

trait SavingsAccountsProxy extends Actor
trait CheckingAccountsProxy extends Actor
trait MoneyMarketAccountsProxy extends Actor

Let’s take an example of an actor that will act as a proxy to get a customer’s account information for a financial services firm from multiple data sources. Further, let’s assume that each of the subsystem proxies for savings, checking and money market account balances will optionally return a list of accounts and their balances of that kind for this customer, and we’ll inject those as dependencies to the retriever class. Let’s write some basic Akka actor code to perform this task:

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout

class AccountBalanceRetriever(savingsAccounts: ActorRef,
                              checkingAccounts: ActorRef,
                              moneyMarketAccounts: ActorRef) extends Actor {
  implicit val timeout: Timeout = 100 milliseconds
  implicit val ec: ExecutionContext = context.dispatcher
  def receive = {
    case GetCustomerAccountBalances(id) =>
      val futSavings = savingsAccounts ? GetCustomerAccountBalances(id)
      val futChecking = checkingAccounts ? GetCustomerAccountBalances(id)
      val futMM = moneyMarketAccounts ? GetCustomerAccountBalances(id)
      val futBalances = for {
        savings <- futSavings.mapTo[Option[List[(Long, BigDecimal)]]]
        checking <- futChecking.mapTo[Option[List[(Long, BigDecimal)]]]
        mm <- futMM.mapTo[Option[List[(Long, BigDecimal)]]]
      } yield AccountBalances(checking, savings, mm)
      futBalances map (sender ! _)
  }
}

This code is fairly concise. The AccountBalanceRetriever actor receives a message to get account balances for a customer, and then it fires off three futures in parallel. The first will get the customer’s savings account balance, the second will get the checking account balance, and the third will get a money market balance. Doing these tasks in parallel allows us to avoid the expensive cost of performing the retrievals sequentially. Also, note that while the futures will return Options of some account balances by account ID, if they return None, they will not short-circuit the for comprehension—if None is returned from futSavings, it will still continue the for comprehension.
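To make the point about None concrete, here is a minimal sketch that uses only the Scala standard library. The three lookup methods are hypothetical stand-ins for the proxies, not part of the example project. Because the for comprehension is over Future, each generator binds the Option itself, so a None is just another value. Only a failed future would short-circuit it.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureOptionSketch {
  type Balances = Option[List[(Long, BigDecimal)]]

  // Hypothetical stand-ins for the three proxy lookups.
  def savings(): Future[Balances]  = Future.successful(None)
  def checking(): Future[Balances] =
    Future.successful(Some(List(3L -> BigDecimal(15000))))
  def moneyMarket(): Future[Balances] = Future.successful(Some(Nil))

  def collect(): (Balances, Balances, Balances) = {
    // Start all three futures before the for comprehension
    // so that they can run in parallel.
    val futSavings  = savings()
    val futChecking = checking()
    val futMM       = moneyMarket()
    val combined = for {
      s <- futSavings   // s is the Option itself, so None is just a value
      c <- futChecking
      m <- futMM
    } yield (c, s, m)
    // Blocking here is only for the sake of the sketch.
    Await.result(combined, 1.second)
  }
}
```

Even though the savings lookup yields None, the checking and money market values still arrive in the combined result.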

However, a couple of things about this actor are not ideal. First of all, it is using futures to ask other actors for responses, which creates a new PromiseActorRef behind the scenes for every message sent. This is a waste of resources. It would be better to have our AccountBalanceRetriever actor send messages out in a “fire and forget” fashion and collect results asynchronously into one actor.

Furthermore, there is a glaring race condition in this code—can you see it? We’re referencing the “sender” in our map operation on the result from futBalances, which may not be the same ActorRef when the future completes, because the AccountBalanceRetriever ActorRef may now be handling another message from a different sender at that point!
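The same hazard can be reproduced without Akka. The following is a rough sketch using only the standard library, where the hypothetical currentSender field plays the role of the actor's sender and a Promise controls when the "off-thread" work finishes. It is an analogy for the race, not a description of Akka's internals.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SenderRaceSketch {
  // Hypothetical stand-in for the actor's mutable notion of "sender".
  @volatile private var currentSender: String = "nobody"

  def run(): (String, String) = {
    val work = Promise[Unit]()        // completes when the work finishes
    currentSender = "clientA"         // a message from clientA is handled
    val captured = currentSender      // stable copy taken during handling
    // Reads the mutable field only when the future completes.
    val racy = work.future.map(_ => currentSender)
    // Closes over the stable local copy instead.
    val safe = work.future.map(_ => captured)
    currentSender = "clientB"         // the actor moves on to the next message
    work.success(())                  // only now does the work finish
    (Await.result(racy, 1.second), Await.result(safe, 1.second))
  }
}
```

The callback that reads the field sees clientB, while the one that closed over the local copy still sees clientA. This is why the state you need has to be captured while the message is being handled.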

Avoiding Ask

Let’s focus on eliminating the need to ask for responses in our actor first. We can send the messages with the ! and collect responses into an optional list of balances by account number. But how would we go about doing that?

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor._

class AccountBalanceRetriever(savingsAccounts: ActorRef,
                              checkingAccounts: ActorRef,
                              moneyMarketAccounts: ActorRef) extends Actor {
  var checkingBalances,
      savingsBalances,
      mmBalances: Option[List[(Long, BigDecimal)]] = None
  var originalSender: Option[ActorRef] = None
  def receive = {
    case GetCustomerAccountBalances(id) =>
      originalSender = Some(sender)
      savingsAccounts ! GetCustomerAccountBalances(id)
      checkingAccounts ! GetCustomerAccountBalances(id)
      moneyMarketAccounts ! GetCustomerAccountBalances(id)
    case CheckingAccountBalances(balances) =>
      checkingBalances = balances
      isDone
    case SavingsAccountBalances(balances) =>
      savingsBalances = balances
      isDone
    case MoneyMarketAccountBalances(balances) =>
      mmBalances = balances
      isDone
  }

  // Reply only once all three proxies have answered
  def isDone =
    (checkingBalances, savingsBalances, mmBalances) match {
      case (Some(c), Some(s), Some(m)) =>
        originalSender.get !
          AccountBalances(checkingBalances, savingsBalances, mmBalances)
      case _ =>
    }
}

This is better but still leaves a lot to be desired. First of all, we’ve created our collection of balances we’ve received back at the instance level, which means we can’t differentiate the aggregation of responses to a single request to get account balances. Worse, we can’t time out a request back to our original requestor. Finally, while we’ve captured the original sender as an instance variable that may or may not have a value (since there is no originalSender when the AccountBalanceRetriever starts up), we have no way of being sure that the originalSender is who we want it to be when we want to send data back!

Capturing Context

The problem is that we’re attempting to take the result of the off-thread operations of retrieving data from multiple sources and return it to whoever sent us the message in the first place. However, the actor will likely have moved on to handling additional messages in its mailbox by the time those results arrive, and the state represented in the AccountBalanceRetriever actor for “sender” at that time could be a completely different actor instance. So how do we get around this?

The trick is to create an anonymous inner actor for each GetCustomerAccountBalances message that is being handled. In doing so, you can capture the state you need to have available when the futures are fulfilled. Let’s see how:

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor._

class AccountBalanceRetriever(savingsAccounts: ActorRef,
                              checkingAccounts: ActorRef,
                              moneyMarketAccounts: ActorRef) extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id) => {
      context.actorOf(Props(new Actor() {
        var checkingBalances,
            savingsBalances,
            mmBalances: Option[List[(Long, BigDecimal)]] = None
        val originalSender = sender
        def receive = {
          case CheckingAccountBalances(balances) =>
            checkingBalances = balances
            isDone
          case SavingsAccountBalances(balances) =>
            savingsBalances = balances
            isDone
          case MoneyMarketAccountBalances(balances) =>
            mmBalances = balances
            isDone
        }

        def isDone =
          (checkingBalances, savingsBalances, mmBalances) match {
            case (Some(c), Some(s), Some(m)) =>
              originalSender ! AccountBalances(checkingBalances,
                                               savingsBalances,
                                               mmBalances)
              context.stop(self)
            case _ =>
          }

        savingsAccounts ! GetCustomerAccountBalances(id)
        checkingAccounts ! GetCustomerAccountBalances(id)
        moneyMarketAccounts ! GetCustomerAccountBalances(id)
      }))
    }
  }
}

This is much better. We’ve captured the state of each receive and only send it back to the originalSender when all three have values. But there are still two issues here. First, we haven’t defined how we can time out on the original request for all of the balances back to whoever requested them. Second, our originalSender is still getting a wrong value: the “sender” from which it is assigned is actually the sender value of the anonymous inner actor, not the one that sent the original GetCustomerAccountBalances message!

Sending Yourself a Timeout Message

We can send ourselves a timeout message to handle our need to time out the original request, by allowing another task to compete for the right to complete the operation with a timeout. This is a very clean way to allow the work to occur, while still enforcing timeout semantics on the request. If the data for all three of the account types is enqueued in the mailbox before the timeout message, the proper response of an AccountBalances type is sent back to the original sender. However, if the timeout message from the scheduled task is enqueued before any one of those three responses, a timeout is returned to the client.
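The ordering rule can be sketched as a pure function over a simulated mailbox. All of the names here are hypothetical and only stand in for the real messages. The sketch models just the decision the actor makes as it dequeues messages one at a time, not Akka itself.

```scala
import scala.annotation.tailrec

object TimeoutRaceSketch {
  sealed trait Msg
  case object CheckingReply extends Msg
  case object SavingsReply extends Msg
  case object MoneyMarketReply extends Msg
  case object Timeout extends Msg

  sealed trait Outcome
  case object Completed extends Outcome     // all three replies arrived first
  case object TimedOut extends Outcome      // the timeout was dequeued first
  case object StillWaiting extends Outcome  // mailbox drained, nothing decided

  // Processes messages strictly in mailbox order, as a single actor would.
  @tailrec
  def process(mailbox: List[Msg], seen: Set[Msg] = Set.empty): Outcome =
    mailbox match {
      case Nil          => StillWaiting
      case Timeout :: _ => TimedOut
      case reply :: rest =>
        val updated = seen + reply
        if (updated.size == 3) Completed else process(rest, updated)
    }
}
```

For example, a mailbox of List(CheckingReply, SavingsReply, MoneyMarketReply, Timeout) completes normally, while List(CheckingReply, Timeout, SavingsReply, MoneyMarketReply) times out, because the timeout was enqueued ahead of two of the replies.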

Note that I am using None to represent only when I don’t have any data returned from one of my specific account type proxies. In the case where a customer is looked up and no data is found, I’m expecting to receive a response of Some(List()), meaning no data was found for that customer in that account type. This way, I can semantically differentiate whether or not I’ve received a response and when no data was found.
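As a small illustration of that convention, the three states can be told apart with a simple pattern match. The describe helper below is hypothetical and exists only for this sketch.

```scala
object ResponseStateSketch {
  type Balances = Option[List[(Long, BigDecimal)]]

  // Hypothetical helper that spells out what each state means.
  def describe(balances: Balances): String = balances match {
    case None           => "no response received yet"
    case Some(Nil)      => "response received: no accounts of this type"
    case Some(accounts) => s"response received: ${accounts.size} account(s)"
  }
}
```

Keeping "no response" and "no data" as distinct values is what lets the actor wait for all three proxies without mistaking an empty result for a missing one.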

For the sake of additional clarity, I am using the LoggingReceive block in this example. This tells Akka to automatically log the handling of each message dequeued from the mailbox. It is a best practice to give yourself as much information as possible at runtime so you can debug your actors, and it can easily be turned off in the configuration file for the application. For more information, see the Akka online documentation.

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import org.jamieallen.effectiveakka.common._
import akka.actor.{ Actor, ActorRef, Props, ActorLogging }
import akka.event.LoggingReceive

object AccountBalanceRetrieverFinal {
  case object AccountRetrievalTimeout
}

class AccountBalanceRetrieverFinal(savingsAccounts: ActorRef,
                                   checkingAccounts: ActorRef,
                                   moneyMarketAccounts: ActorRef)
                                       extends Actor with ActorLogging {
  import AccountBalanceRetrieverFinal._

  def receive = {
    case GetCustomerAccountBalances(id) => {
      log.debug(s"Received GetCustomerAccountBalances for ID: $id from $sender")
      val originalSender = sender

      context.actorOf(Props(new Actor() {
        var checkingBalances,
            savingsBalances,
            mmBalances: Option[List[(Long, BigDecimal)]] = None
        def receive = LoggingReceive {
          case CheckingAccountBalances(balances) =>
            log.debug(s"Received checking account balances: $balances")
            checkingBalances = balances
            collectBalances
          case SavingsAccountBalances(balances) =>
            log.debug(s"Received savings account balances: $balances")
            savingsBalances = balances
            collectBalances
          case MoneyMarketAccountBalances(balances) =>
            log.debug(s"Received money market account balances: $balances")
            mmBalances = balances
            collectBalances
          case AccountRetrievalTimeout =>
            sendResponseAndShutdown(AccountRetrievalTimeout)
        }

        def collectBalances = (checkingBalances,
                               savingsBalances,
                               mmBalances) match {
          case (Some(c), Some(s), Some(m)) =>
            log.debug(s"Values received for all three account types")
            timeoutMessager.cancel
            sendResponseAndShutdown(AccountBalances(checkingBalances,
                                                    savingsBalances,
                                                    mmBalances))
          case _ =>
        }

        def sendResponseAndShutdown(response: Any) = {
          originalSender ! response
          log.debug("Stopping context capturing actor")
          context.stop(self)
        }

        savingsAccounts ! GetCustomerAccountBalances(id)
        checkingAccounts ! GetCustomerAccountBalances(id)
        moneyMarketAccounts ! GetCustomerAccountBalances(id)

        import context.dispatcher
        val timeoutMessager = context.system.scheduler.
            scheduleOnce(250 milliseconds) {
              self ! AccountRetrievalTimeout
            }
      }))
    }
  }
}

Now we can collect our results and check to see if we received the expected values and place them into the AccountBalances result to return to the caller, while also cancelling the scheduled task so that it doesn’t waste resources. Finally, we must remember to stop our anonymous inner actor so that we do not leak memory for every GetCustomerAccountBalances message we receive, regardless of whether we received all three responses or the timeout!

So why do we have to send the AccountRetrievalTimeout message to ourselves, into the queue of our Extra actor, rather than just sending it directly back to the originalSender in our scheduleOnce lambda? The scheduled task will run on another thread! If we perform work relative to cleaning up the actor on that thread, we’re introducing concurrency into the actor. While we are only telling our actor to stop itself after sending the message in this example, it would be very easy to fall into the trap of closing over some state and manipulating it if you do not send a message to yourself. There are other interfaces for scheduling that might make it more apparent for some that the operation is asynchronous, such as the method call style seen here:

val timeoutMessager = context.system.scheduler.
    scheduleOnce(250 milliseconds, self, AccountRetrievalTimeout)

You have to be vigilant about this. Sometimes, it can be very easy to fall into the trap of introducing concurrency into our actors where there never should be any. If you see yourself using curly braces inside of an actor, think about what is happening inside of there and what you might be closing over.

Why not use a promise?

In an earlier version of this example, I tried to use a promise to perform this work, where either the successful AccountBalances result or the timeout failure was used to complete the promise’s future. However, this is unnecessary complexity, because the order in which messages are enqueued in the Extra actor’s mailbox accomplishes the same basic task. Moreover, you cannot hand the future from a promise to another actor, since that actor may or may not be remote and a future cannot be sent across the wire. And thanks to the beauty of location transparency, whether an actor is remote is an implementation detail on which your actors should never focus.
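For comparison, here is roughly what the promise-based race looks like, sketched with the standard library only. This is not the code from that earlier version. It merely shows the "first completion wins" behavior of tryFailure and trySuccess, with a placeholder string standing in for the balances.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

object PromiseRaceSketch {
  def run(): (Boolean, Boolean, Boolean) = {
    val promise = Promise[String]()
    // The timeout gets there first and completes the promise with a failure.
    val timeoutWon =
      promise.tryFailure(new TimeoutException("balances not received in time"))
    // The late result cannot complete an already completed promise.
    val resultWon = promise.trySuccess("balances")
    val outcome = Try(Await.result(promise.future, 1.second))
    (timeoutWon, resultWon, outcome.isFailure)
  }
}
```

The promise does enforce that only one of the two outcomes is kept, but the mailbox ordering already gives us that guarantee for free.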

Warning

Futures should never be passed between actors because you cannot serialize a thread.

How to test this logic

So now that we have some code that we think will work, we need to write tests to prove that it does. If you’re a TDD-adherent, you’re probably mortified that I didn’t do that up front. I’m not dogmatic about when someone writes tests; I just care that the tests get written.

The first thing we have to do is define the test stubs that will be used in our tests and injected as dependencies to the retriever actor. These stubs can be very simple actors—when asked for account information of their type by a specific customer ID, each non-failure test case stub will return an optional list of balances by account ID. Data for each customer to be used in tests needs to be placed into a map to be found, and if no data is returned, we must return a value of Some(List()) to meet our API:

import akka.actor.{ Actor, ActorLogging }
import akka.event.LoggingReceive

class CheckingAccountsProxyStub
    extends CheckingAccountsProxy with ActorLogging {
  val accountData = Map[Long, List[(Long, BigDecimal)]](
    1L -> List((3, 15000)),
    2L -> List((6, 640000), (7, 1125000), (8, 40000)))

  def receive = LoggingReceive {
    case GetCustomerAccountBalances(id: Long) =>
      log.debug(s"Received GetCustomerAccountBalances for ID: $id")
      accountData.get(id) match {
        case Some(data) => sender ! CheckingAccountBalances(Some(data))
        case None => sender ! CheckingAccountBalances(Some(List()))
      }
  }
}

class SavingsAccountsProxyStub
    extends SavingsAccountsProxy with ActorLogging {

  val accountData = Map[Long, List[(Long, BigDecimal)]](
    1L -> (List((1, 150000), (2, 29000))),
    2L -> (List((5, 80000))))

  def receive = LoggingReceive {
    case GetCustomerAccountBalances(id: Long) =>
      log.debug(s"Received GetCustomerAccountBalances for ID: $id")
      accountData.get(id) match {
        case Some(data) => sender ! SavingsAccountBalances(Some(data))
        case None => sender ! SavingsAccountBalances(Some(List()))
      }
  }
}

class MoneyMarketAccountsProxyStub
    extends MoneyMarketAccountsProxy with ActorLogging {

  val accountData = Map[Long, List[(Long, BigDecimal)]](
    2L -> List((9, 640000), (10, 1125000), (11, 40000)))

  def receive = LoggingReceive {
    case GetCustomerAccountBalances(id: Long) =>
      log.debug(s"Received GetCustomerAccountBalances for ID: $id")
      accountData.get(id) match {
        case Some(data) => sender ! MoneyMarketAccountBalances(Some(data))
        case None => sender ! MoneyMarketAccountBalances(Some(List()))
      }
  }
}

In the failure condition (represented by a timeout), a stub will simulate a long-running blocking database call that does not complete in time by never sending a response to the calling actor:

class TimingOutSavingsAccountProxyStub
    extends SavingsAccountsProxy with ActorLogging {
  def receive = LoggingReceive {
    case GetCustomerAccountBalances(id: Long) =>
      log.debug(s"Forcing timeout by not responding!")
  }
}

The following examples show how to write a test case for the successful return of AccountBalances. Since this example uses stubbed-out proxies in place of the services from which it would receive account information, it is trivial to inject test-only stub proxies that will cause the timeout functionality to occur.

We also want to be sure that the integrity of the context of each message handled is maintained by our retriever. To do this, we send multiple messages from different TestProbe instances one after the other, and we verify that the different values were appropriately returned to each.

Note how I use the within block to verify the timing of expected responses. This is a great way to verify that your tests are executing to meet the nonfunctional requirements of your system. Use the within block to specify either a maximum time of execution, or as we see in the failure case, that we didn’t receive a response too early or too late.

Finally, we test the timeout condition by injecting a timing out stub into our retriever and making sure that the timeout response is what our test receives in response:

import akka.testkit.{ TestKit, TestProbe, ImplicitSender }
import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
import org.scalatest.WordSpecLike
import org.scalatest.matchers.MustMatchers
import scala.concurrent.duration._
import org.jamieallen.effectiveakka.common._
import org.jamieallen.effectiveakka.pattern.extra.AccountBalanceRetrieverFinal._

class ExtraFinalSpec extends TestKit(ActorSystem("ExtraTestAS"))
    with ImplicitSender with WordSpecLike with MustMatchers {

  "An AccountBalanceRetriever" should {
    "return a list of account balances" in {
      val probe2 = TestProbe()
      val probe1 = TestProbe()
      val savingsAccountsProxy =
        system.actorOf(Props[SavingsAccountsProxyStub],
          "extra-success-savings")
      val checkingAccountsProxy =
        system.actorOf(Props[CheckingAccountsProxyStub],
          "extra-success-checkings")
      val moneyMarketAccountsProxy = system.actorOf(
        Props[MoneyMarketAccountsProxyStub], "extra-success-money-markets")
      val accountBalanceRetriever = system.actorOf(
        Props(new AccountBalanceRetrieverFinal(savingsAccountsProxy,
                                               checkingAccountsProxy,
                                               moneyMarketAccountsProxy)),
                                               "extra-retriever")

      within(300 milliseconds) {
        probe1.send(accountBalanceRetriever, GetCustomerAccountBalances(1L))
        val result = probe1.expectMsgType[AccountBalances]
        result must equal(AccountBalances(
            Some(List((3, 15000))),
            Some(List((1, 150000), (2, 29000))),
            Some(List())))
      }
      within(300 milliseconds) {
        probe2.send(accountBalanceRetriever, GetCustomerAccountBalances(2L))
        val result = probe2.expectMsgType[AccountBalances]
        result must equal(AccountBalances(
            Some(List((6, 640000), (7, 1125000), (8, 40000))),
            Some(List((5, 80000))),
            Some(List((9, 640000), (10, 1125000), (11, 40000)))))
      }
    }

    "return a TimeoutException when timeout is exceeded" in {
      val savingsAccountsProxy = system.actorOf(
        Props[TimingOutSavingsAccountProxyStub], "extra-timing-out-savings")
      val checkingAccountsProxy = system.actorOf(
        Props[CheckingAccountsProxyStub], "extra-timing-out-checkings")
      val moneyMarketAccountsProxy = system.actorOf(
        Props[MoneyMarketAccountsProxyStub], "extra-timing-out-money-markets")
      val accountBalanceRetriever = system.actorOf(
        Props(new AccountBalanceRetrieverFinal(savingsAccountsProxy,
                                               checkingAccountsProxy,
                                               moneyMarketAccountsProxy)),
                                               "extra-timing-out-retriever")
      val probe = TestProbe()

      within(250 milliseconds, 500 milliseconds) {
        probe.send(accountBalanceRetriever, GetCustomerAccountBalances(1L))
        probe.expectMsg(AccountRetrievalTimeout)
      }
    }
  }
}

Now our test checks the success case and that the failure results in expected behavior. And because the AccountRetrievalTimeout is a case object, it is a “term,” not a “type,” and I therefore can use the expectMsg() method instead of expectMsgType[].
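The distinction between a term and a type also shows up in ordinary pattern matching. In this sketch, the case object and case class are local stand-ins defined only for illustration. The object is matched by equality with its single instance, which is the kind of comparison expectMsg() performs, while the class is matched by type, as expectMsgType[] does.

```scala
object TermVsTypeSketch {
  case object AccountRetrievalTimeout           // a single value (a "term")
  final case class Balance(amount: BigDecimal)  // a type with many values

  def classify(msg: Any): String = msg match {
    case AccountRetrievalTimeout => "term" // compared against the one instance
    case _: Balance              => "type" // any instance of the class matches
    case _                       => "unknown"
  }
}
```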

Tip

Asynchronous programming is simply not easy, even with powerful tools at our disposal. We always must think about the state we need and the context from which we get it at the time we need it.

The Cameo Pattern

The Extra Pattern does help in some scenarios, but it could easily be argued that it muddles your code by putting too many details in one place. It is also similar to lambdas, in that using an anonymous instance gives you less information in stack traces on the JVM, is harder to use with a debugging tool, and is easier to close over state.

The good news is that it is very easily fixed by using pre-defined types. You get information specific to the class that you created in stack traces without an obfuscated, generated name. And you can’t close over external state, because there is none: you have to pass the data into the class for it to be visible.
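You can see the naming difference with a quick experiment that needs nothing but the standard library. The Handler trait and BalanceHandler class here are hypothetical and are not part of the example project.

```scala
object CameoNamingSketch {
  trait Handler { def handle(): Unit }

  // Named type: its own name shows up in stack traces and debuggers.
  class BalanceHandler extends Handler { def handle(): Unit = () }

  val named: Handler = new BalanceHandler
  // Anonymous implementation: the compiler generates a class name for it.
  val anonymous: Handler = new Handler { def handle(): Unit = () }

  def namedClassName: String = named.getClass.getName
  def anonymousClassName: String = anonymous.getClass.getName
}
```

The named instance reports a class name ending in BalanceHandler, whereas the anonymous one reports a compiler-generated name (typically something containing $anon) that says nothing about its purpose.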

There is a time and a place for everything, and that includes lambdas and anonymous implementations of interfaces, just like we did in the Extra Pattern. When the code is trivial, you can generally use these constructs without fear. However, learn to notice when the code in such literals is crossing a threshold of complexity that will make it harder to reason about in production when failure occurs.

Those are good reasons for pulling the type you’re creating with the Extra Pattern into a pre-defined type of actor, where you create an instance of that type for every message handled. To do so, we can move the anonymous implementation of the actor trait out into its own type definition. This results in a type only used for simple interactions between actors, similar to a cameo role in the movies.

Let’s pull the anonymous implementation out and bind it to a type. How would that look?

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import org.jamieallen.effectiveakka.common._
import akka.actor._
import akka.event.LoggingReceive

object AccountBalanceResponseHandler {
  case object AccountRetrievalTimeout

  // Factory method for our actor Props
  def props(savingsAccounts: ActorRef, checkingAccounts: ActorRef,
    moneyMarketAccounts: ActorRef, originalSender: ActorRef): Props = {
    Props(new AccountBalanceResponseHandler(savingsAccounts, checkingAccounts,
      moneyMarketAccounts, originalSender))
  }
}

class AccountBalanceResponseHandler(savingsAccounts: ActorRef,
                                    checkingAccounts: ActorRef,
                                    moneyMarketAccounts: ActorRef,
                                    originalSender: ActorRef)
                                        extends Actor with ActorLogging {

  import AccountBalanceResponseHandler._
  var checkingBalances,
      savingsBalances,
      mmBalances: Option[List[(Long, BigDecimal)]] = None
  def receive = LoggingReceive {
    case CheckingAccountBalances(balances) =>
      log.debug(s"Received checking account balances: $balances")
      checkingBalances = balances
      collectBalances
    case SavingsAccountBalances(balances) =>
      log.debug(s"Received savings account balances: $balances")
      savingsBalances = balances
      collectBalances
    case MoneyMarketAccountBalances(balances) =>
      log.debug(s"Received money market account balances: $balances")
      mmBalances = balances
      collectBalances
    case AccountRetrievalTimeout =>
      log.debug("Timeout occurred")
      sendResponseAndShutdown(AccountRetrievalTimeout)
  }

  def collectBalances = (checkingBalances,
                         savingsBalances,
                         mmBalances) match {
    case (Some(c), Some(s), Some(m)) =>
      log.debug(s"Values received for all three account types")
      timeoutMessager.cancel
      sendResponseAndShutdown(AccountBalances(checkingBalances,
                                              savingsBalances,
                                              mmBalances))
    case _ =>
  }

  def sendResponseAndShutdown(response: Any) = {
    originalSender ! response
    log.debug("Stopping context capturing actor")
    context.stop(self)
  }

  import context.dispatcher
  val timeoutMessager = context.system.scheduler.
    scheduleOnce(250 milliseconds) {
      self ! AccountRetrievalTimeout
    }
}

class AccountBalanceRetriever(savingsAccounts: ActorRef,
                              checkingAccounts: ActorRef,
                              moneyMarketAccounts: ActorRef) extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id) =>
      // Capture the sender now; props expects a plain ActorRef, not an Option
      val originalSender = sender

      // I'm now using a factory method from the companion object above!
      val handler = context.actorOf(
        AccountBalanceResponseHandler.props(savingsAccounts,
                                      checkingAccounts,
                                      moneyMarketAccounts,
                                      originalSender), "cameo-message-handler")
      savingsAccounts.tell(GetCustomerAccountBalances(id), handler)
      checkingAccounts.tell(GetCustomerAccountBalances(id), handler)
      moneyMarketAccounts.tell(GetCustomerAccountBalances(id), handler)
  }
}

Note that now we have to use the tell method on the ActorRefs for the accounts so that we can pass the handler reference as the actor to receive all responses. But the code is much cleaner from having excised the anonymous actor implementation from the body of the AccountBalanceRetriever.

Despite the fact that we’ve created a new instance of the AccountBalanceResponseHandler for every request to get balances, I’m placing the AccountBalanceRetriever’s sender into a local variable in the receive block before passing it to the new instance of the AccountBalanceResponseHandler. Make certain you follow that pattern, since passing the sender ActorRef without first capturing it will expose your handler to the same problem (losing the sender of the message to whom we want to send our response) that we saw earlier where the sender ActorRef changed.

Also note that by using a “named” type, AccountBalanceResponseHandler, we’ll have more useful information when debugging because anonymous types are assigned names in the JVM which aren’t very easy to decipher. In my opinion, it is always preferable to have named types over anonymous actors for this reason.

The Companion Object Factory Method

You may have noticed the comment about the AccountBalanceResponseHandler companion object, where I have now defined a props method as a factory for my actor. See The Companion Object Factory Method for more details about why I have done this.
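As a rough sketch of the idea, with hypothetical ReplyHandler and Requester actors standing in for the ones in this chapter, a companion object factory might look like the following. I am again assuming a later Akka classic release where the sender is read with sender().

```scala
import akka.actor.{ Actor, ActorRef, Props }

// Hypothetical actor, used only to illustrate the factory method.
object ReplyHandler {
  // The Props is built here, in the companion object, so its creator
  // closure captures only the arguments and never the `this` of
  // whichever actor asked for the Props.
  def props(originalSender: ActorRef): Props =
    Props(new ReplyHandler(originalSender))
}

class ReplyHandler(originalSender: ActorRef) extends Actor {
  def receive = {
    case msg =>
      originalSender ! msg // pass the reply on to whoever asked originally
      context.stop(self)   // one-shot handler: stop after the first reply
  }
}

// Hypothetical parent that creates one handler per request.
class Requester(service: ActorRef) extends Actor {
  def receive = {
    case request =>
      val originalSender = sender() // capture before handing it off
      // Writing Props(new ReplyHandler(sender())) inline here would close
      // over this Requester instance and read sender() later.
      val handler = context.actorOf(ReplyHandler.props(originalSender))
      service.tell(request, handler) // replies go to the handler
  }
}
```

Because the arguments to props are evaluated before the Props is constructed, the handler receives plain values and has no way to reach back into the state of the actor that created it.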

How to Test This Logic

Testing this code is virtually identical to how we did it previously, and we can reuse the common stubs that we created as well. After we test the success case, we can inject a stub that will induce timeout to test the failure case:

import akka.testkit.{ TestKit, TestProbe, ImplicitSender }
import akka.actor.{ Actor, ActorLogging, ActorSystem, Props }
import org.scalatest.WordSpecLike
import org.scalatest.matchers.MustMatchers
import scala.concurrent.duration._
import org.jamieallen.effectiveakka.common._
import org.jamieallen.effectiveakka.pattern.cameo.AccountBalanceResponseHandler._

class CameoSpec extends TestKit(ActorSystem("CameoTestAS"))
    with ImplicitSender with WordSpecLike with MustMatchers {

  val checkingAccountsProxy = system.actorOf(
    Props[CheckingAccountsProxyStub], "checkings")
  val moneyMarketAccountsProxy = system.actorOf(
    Props[MoneyMarketAccountsProxyStub], "money-markets")

  "An AccountBalanceRetriever" should {
    "return a list of account balances" in {
      val probe1 = TestProbe()
      val probe2 = TestProbe()
      val savingsAccountsProxy = system.actorOf(
        Props[SavingsAccountsProxyStub], "cameo-success-savings")
      val checkingAccountsProxy = system.actorOf(
        Props[CheckingAccountsProxyStub], "cameo-success-checkings")
      val moneyMarketAccountsProxy = system.actorOf(
        Props[MoneyMarketAccountsProxyStub], "cameo-success-money-markets")
      val accountBalanceRetriever = system.actorOf(
        Props(new AccountBalanceRetriever(savingsAccountsProxy,
                                          checkingAccountsProxy,
                                          moneyMarketAccountsProxy)),
                                          "cameo-retriever1")

      within(300 milliseconds) {
        probe1.send(accountBalanceRetriever, GetCustomerAccountBalances(1L))
        val result = probe1.expectMsgType[AccountBalances]
        result must equal(AccountBalances(
          Some(List((3, 15000))),
          Some(List((1, 150000), (2, 29000))),
          Some(List())))
      }
      within(300 milliseconds) {
        probe2.send(accountBalanceRetriever, GetCustomerAccountBalances(2L))
        val result = probe2.expectMsgType[AccountBalances]
        result must equal(AccountBalances(
          Some(List((6, 640000), (7, 1125000), (8, 40000))),
          Some(List((5, 80000))),
          Some(List((9, 640000), (10, 1125000), (11, 40000)))))
      }
    }

    "return a TimeoutException when timeout is exceeded" in {
      val savingsAccountsProxy = system.actorOf(
        Props[TimingOutSavingsAccountProxyStub], "cameo-timing-out-savings")
      val checkingAccountsProxy = system.actorOf(
        Props[CheckingAccountsProxyStub], "cameo-timing-out-checkings")
      val moneyMarketAccountsProxy = system.actorOf(
        Props[MoneyMarketAccountsProxyStub], "cameo-timing-out-money-markets")
      val accountBalanceRetriever = system.actorOf(
        Props(new AccountBalanceRetriever(savingsAccountsProxy,
                                          checkingAccountsProxy,
                                          moneyMarketAccountsProxy)),
                                          "cameo-timing-out-retriever")
      val probe = TestProbe()

      within(250 milliseconds, 500 milliseconds) {
        probe.send(accountBalanceRetriever, GetCustomerAccountBalances(1L))
        probe.expectMsg(AccountRetrievalTimeout)
      }
    }
  }
}

When creating the stubs I use to inject mocks of services for the tests, I am not using a props() factory method on a companion object for those actors. Can you see why? In this case, the instantiation of the Props instance for each stubbed actor is happening inside the context of a test, not another actor. So I don’t have to worry about closing over “this” from the test context.

The cameo pattern lets you be explicit about the type of the actor that will perform the work for each GetCustomerAccountBalances message sent to the AccountBalanceRetriever, which I generally prefer. I also think it separates concerns nicely, whereas the extra pattern can make your code harder to read because of the amount of additional code it places inside the body of the AccountBalanceRetriever.
