Chapter 6. Concurrency in Scala

In this chapter, we will switch our focus from collections to a different topic: concurrency. Being able to take advantage of all the CPU resources that your hardware provides is critical to writing performant software. Unfortunately, writing concurrent code is difficult because it is easy to write unsafe programs. If you come from Java, you may still have nightmares involving synchronized blocks and locks! The java.util.concurrent package provides numerous tools that make writing concurrent code simpler. However, designing stable and reliable concurrent applications can still be a daunting challenge. In this chapter, we will explore the tools that the Scala standard library provides to take advantage of concurrency. After a short presentation of the main abstraction, Future, we will study its behavior and the usage pitfalls to avoid. We will end this chapter by exploring a possible alternative to Future named Task, which is provided by the Scalaz library. In this chapter, we will explore the following topics:

  • Concurrency versus parallelism
  • Future usage considerations
  • Blocking calls and callbacks
  • Scalaz Task

Parallelizing backtesting strategies

The data scientists are off and running with the data analysis tools that you built for them to research trading strategies. However, they have hit a wall because backtesting strategies is becoming too expensive. As they have built more sophisticated strategies that require more historical data and employ more stateful algorithms, backtesting has taken longer. Once again, you are being called upon to help out at MVT by leveraging Scala and the functional paradigm to deliver performant software.

The data scientists have incrementally built out a backtesting tool that allows the team to determine a strategy's performance by replaying historical data. This works by providing a preset strategy to run, the ticker to test against, and the time interval of historical data to replay. The backtester loads market data and applies the strategy to generate trading decisions. Once the backtester finishes replaying historical data, it summarizes and displays strategy performance results. The backtester is relied upon heavily to determine the efficacy of proposed trading strategies before putting them into production for live trading.

To begin familiarizing yourself with the backtester, you look into the code, as follows:

  // Interval (and, later, DateTime) come from the Joda-Time library
  import org.joda.time.Interval

  sealed trait Strategy
  case class PnL(value: BigDecimal) extends AnyVal
  case class BacktestPerformanceSummary(pnl: PnL)
  case class Ticker(value: String) extends AnyVal

  def backtest(
    strategy: Strategy,
    ticker: Ticker,
    testInterval: Interval): BacktestPerformanceSummary = ???

In the preceding snapshot from the data analysis repository, you see the primary method that drives backtesting. Given a Strategy, a Ticker, and an Interval, it produces a BacktestPerformanceSummary. Scanning the repository, you find a file named CrazyIdeas.scala that shows Dave as the only commit author. Here, you see example invocations of the backtester:

def lastMonths(months: Int): Interval =
  new Interval(new DateTime().minusMonths(months), new DateTime())
backtest(Dave1, Ticker("AAPL"), lastMonths(3)) 
backtest(Dave1, Ticker("GOOG"), lastMonths(3)) 
backtest(Dave2, Ticker("AAPL"), lastMonths(3)) 
backtest(Dave2, Ticker("GOOG"), lastMonths(3)) 

The usage of the backtester gives you a clue to a possible performance improvement. It looks like when Dave has a new idea, he wants to evaluate its performance on multiple symbols and compare it against other strategies. In its current form, backtests are performed sequentially. One way to improve the execution speed of the backtester is to parallelize the execution of all backtesting runs. If each invocation of the backtester is parallelized and if there are spare hardware resources, then backtesting multiple strategies and symbols will finish faster. To understand how to parallelize the backtester, we first need to dive into the topic of asynchronous programming and then see how Scala supports concurrency.

Note

Before diving into the code, we need to enrich our vocabulary to discuss the properties of asynchronous programming. Concurrency and parallelism are often used interchangeably, but there is an important distinction between these two terms. Concurrency involves two (or more) tasks that are started and executed in overlapping time periods. Both tasks are in-progress (that is, they are running) at the same time, but only one task may be performing actual work at any instant in time. This is the case when you write concurrent code on a single-core machine. Only one task can progress at a time, but multiple tasks are ongoing concurrently.

Parallelism exists only when both tasks are truly running at the same time. With a dual-core machine, you can execute two tasks at the same time. From this definition, we see that parallelism depends on the hardware that is available for use. This means that the property of concurrency can be added to a program, but parallelism is outside the control of the software.

To better illustrate these concepts, consider the example of painting a room. If there is only one painter, the painter can paint the first coat on a wall, move on to the next wall, go back to the first wall for the second coat and then finish the second wall. The painter is painting both walls concurrently, but can only spend time on one wall at any given time. If two painters are on the job, they can each focus on one wall and paint them in parallel.

Exploring Future

The primary construct in Scala to drive concurrent programming is Future. Found in the scala.concurrent package, Future can be seen as a container for a value that may not yet exist. Let's look at a simple example to illustrate usage:

scala> import scala.concurrent.Future 
import scala.concurrent.Future 
 
scala> import scala.concurrent.ExecutionContext 
import scala.concurrent.ExecutionContext 
 
scala> val context: ExecutionContext = scala.concurrent.ExecutionContext.global 
context: scala.concurrent.ExecutionContext = scala.concurrent.impl.ExecutionContextImpl@3fce8fd9 
 
scala> def example(): Unit = {
  println("Starting the example")
  Future {
    println("Starting the Future")
    Thread.sleep(1000)  // simulate an expensive computation
    println("Done with the computation")
  }(context)
  println("Ending example")
}

The preceding example shows a short method that creates a Future to simulate an expensive computation, printing a few lines along the way to make the flow of the application easier to follow. When running example, we see the following output:

scala> example() 
Starting the example 
Ending example 
Starting the Future 
// a pause 
Done with the computation 

We can see that the body of the Future was executed after the end of the example method. This is because when a Future is created, it starts its computation concurrently. You may be wondering, "What is this context object of the ExecutionContext type that is used when creating the Future?" We will explore ExecutionContext in depth shortly, but for now, we treat it as the object that is responsible for executing the Future. We use scala.concurrent.ExecutionContext.global, a default context that the standard library provides to execute Future instances.

A Future is a stateful object. It is either not yet completed, while the computation is underway, or completed, once the computation finishes. Furthermore, a completed Future is either a success, if the computation produced a value, or a failure, if an exception was thrown during the computation.
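We can make these states concrete with the successful and failed constructors from the Future companion object. In the following sketch, value is used purely to inspect state; as we discuss later, it should rarely appear in production code:

import scala.concurrent.Future

val completedOk: Future[Int] = Future.successful(42)
val completedBad: Future[Int] = Future.failed(new IllegalStateException("boom"))

completedOk.value   // Some(Success(42))
completedBad.value  // Some(Failure(java.lang.IllegalStateException: boom))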

The Future API provides combinators to compose the Future instances and manipulate the result that they contain:

scala> import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.ExecutionContext.Implicits.global 
 
scala> import scala.concurrent.Future 
import scala.concurrent.Future 
 
scala> Future(1).map(_ + 1).filter(_ % 2 == 0).foreach(println) 
 
2 

This snippet from the Scala console shows construction of a Future that wraps a constant integer value. We see that the integer contained in the Future is transformed using functions that are similar to the ones that we expect to find on Option and collection data types. Each transform is applied once the preceding Future completes, and returns a new Future.

As promised, we now look into ExecutionContext. The ExecutionContext can be thought of as the machinery behind Future that provides runtime asynchrony. In the previous snippet, a Future was created to perform simple addition and a modulus check without explicitly providing an ExecutionContext instance at the call site. Instead, only an import of the global object was provided. The snippet compiles because global is an implicit value and the signature of map accepts an implicit ExecutionContext. Let's look at the following signature of map to deepen our understanding:

def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] 

From the signature of map, we see that, unlike the map transformation on List, Future's map requires an implicit ExecutionContext in a second, curried parameter list. To understand how an ExecutionContext provides runtime asynchrony, we need to first understand its operations:

trait ExecutionContext { 
  def execute(runnable: Runnable): Unit 
  def reportFailure(cause: Throwable): Unit 
  def prepare(): ExecutionContext = this 
} 

execute is a side-effecting method that operates on a java.lang.Runnable. For those familiar with concurrency in Java, you most likely recall that Runnable is the commonly used interface that allows threads and other java.util.concurrent abstractions to execute code concurrently. Although we do not yet know how Future achieves runtime asynchrony, we do know there is a link between Future execution and the creation of a Runnable.
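To see how small the execute contract is, consider the following deliberately naive ExecutionContext. This is a sketch for illustration only: it provides no asynchrony, because each Runnable runs immediately on the calling thread:

import scala.concurrent.ExecutionContext

val sameThreadContext: ExecutionContext = new ExecutionContext {
  // Runs the task synchronously on the caller's thread; no concurrency here
  def execute(runnable: Runnable): Unit = runnable.run()
  // Reports errors by printing the stack trace
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}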

The next question we will answer is, "How do I create an ExecutionContext?" By studying the companion object, we discover the following signatures:

def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService 
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService 
def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor 
def fromExecutor(e: Executor): ExecutionContextExecutor 

The standard library provides convenient ways to create an ExecutionContext from either a java.util.concurrent.Executor or java.util.concurrent.ExecutorService.
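As a sketch of how you might wire a JDK thread pool into Future execution, consider the following. The pool size of four and the stack-trace error reporter are arbitrary choices for illustration:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// A fixed-size JDK thread pool wrapped as an ExecutionContext
val pool = Executors.newFixedThreadPool(4)
val poolContext: ExecutionContext =
  ExecutionContext.fromExecutorService(pool, (t: Throwable) => t.printStackTrace())

// Remember to call pool.shutdown() when the application no longer needs it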

Note

If you are unfamiliar with the machinery that is provided by the java.util.concurrent package and you are looking for a deeper treatment than that provided by the API documentation, we encourage you to read Java Concurrency in Practice by Brian Goetz (http://jcip.net/). Although Java Concurrency in Practice was written around the release of JDK 6, it contains numerous principles that continue to apply today. Reading this book will provide you with a deep understanding of the JDK-provided concurrency primitives that are utilized by the Scala standard library.

The return type of the factory methods is a more specialized version of ExecutionContext. The standard library defines the following inheritance chain for ExecutionContext:

trait ExecutionContextExecutor extends ExecutionContext with java.util.concurrent.Executor 
trait ExecutionContextExecutorService extends ExecutionContextExecutor with java.util.concurrent.ExecutorService 

Also, in the ExecutionContext companion object, we find the implicit context used in our first example, as follows:

def global: ExecutionContextExecutor = Implicits.global 

The documentation for the definition of Implicits.global indicates that this ExecutionContext is backed by a thread pool with a thread count that is equal to the available processor count. Our dive into ExecutionContext shows us how the simple Future example runs. We can illustrate how a Future applies its ExecutionContext to execute on multiple threads:

  Future(1).map(i => { 
    println(Thread.currentThread().getName) 
    i + 1 
  }).filter(i => { 
    println(Thread.currentThread().getName) 
    i % 2 == 0 
  }).foreach(println) 

We extend the original snippet to print the name of the thread performing each transformation. When run on a machine with multiple cores, this snippet yields variable output, depending on which threads pick up the transformations. Here is an example output:

ForkJoinPool-1-worker-3 
ForkJoinPool-1-worker-5 
2 

This example shows that one thread (worker-3) performed the map transformation, while another (worker-5) performed the filter transformation. There are two key insights to draw from this simple example about how Future affects control flow. First, Future is a data type for concurrency that enables us to break the control flow of a program into multiple logical threads of processing. Second, a Future begins execution immediately upon creation, which means that its transformations are applied in a separate flow of the program. We can use these insights to improve the runtime performance of Dave's crazy ideas.

Future and crazy ideas

We apply Future to Dave's set of backtests to improve performance. We believe there is an opportunity for a performance improvement because Dave's laptop has four CPU cores. This means that by adding concurrency to our program, we will be able to benefit from runtime parallelism. Our first attempt utilizes a for-comprehension:

implicit val ec = scala.concurrent.ExecutionContext.Implicits.global
for {
  firstDaveAapl <- Future(backtest(Dave1, Ticker("AAPL"), lastMonths(3)))
  firstDaveGoog <- Future(backtest(Dave1, Ticker("GOOG"), lastMonths(3)))
  secondDaveAapl <- Future(backtest(Dave2, Ticker("AAPL"), lastMonths(3)))
  secondDaveGoog <- Future(backtest(Dave2, Ticker("GOOG"), lastMonths(3)))
} yield (firstDaveAapl, firstDaveGoog, secondDaveAapl, secondDaveGoog)

Each backtest invocation is wrapped with the creation of a Future instance by calling Future.apply. This companion object method uses a by-name parameter to defer evaluation of the argument, which, in this case, is the invocation of backtest:

def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T] 

After running the new version of CrazyIdeas.scala, you are disappointed to see the runtime execution has not improved. You quickly double-check the number of CPUs on your Linux box, as follows:

$ cat /proc/cpuinfo  | grep processor | wc -l 
8 

Having confirmed there are eight cores available on your laptop, you wonder why the execution time matches the original serial execution time. The explanation lies in how the for-comprehension is compiled. The for-comprehension is equivalent to the following simpler example:

Future(1).flatMap(f1 => Future(2).flatMap(f2 => Future(3).map(f3 => (f1, f2, f3)))) 

In this desugared representation of the for-comprehension, we see that the second Future is created and evaluated within the flatMap transformation of the first Future. Any transformation applied to a Future (for example, flatMap) is only invoked once the value provided to the transform has been computed. This means that the Futures in the preceding example and in the for-comprehension are executed sequentially. To achieve the concurrency that we are looking for, we must instead modify CrazyIdeas.scala to look like the following:

    val firstDaveAaplF = Future(backtest(Dave1, Ticker("AAPL"), 
      lastMonths(3))) 
    val firstDaveGoogF = Future(backtest(Dave1, Ticker("GOOG"), 
      lastMonths(3))) 
    val secondDaveAaplF = Future(backtest(Dave2, Ticker("AAPL"), 
      lastMonths(3))) 
    val secondDaveGoogF = Future(backtest(Dave2, Ticker("GOOG"), 
      lastMonths(3))) 
    for { 
      firstDaveAapl <- firstDaveAaplF 
      firstDaveGoog <- firstDaveGoogF 
      secondDaveAapl <- secondDaveAaplF 
      secondDaveGoog <- secondDaveGoogF 
    } yield (firstDaveAapl, firstDaveGoog, secondDaveAapl, secondDaveGoog) 

In this snippet, four backtests are kicked off concurrently and the results are transformed into a Future of a Tuple4 consisting of four BacktestPerformanceSummary values. Seeing is believing, and after showing Dave the faster runtime of his backtests, he is excited to iterate quickly on new backtest ideas. Dave never misses a chance to throw around a pun, exclaiming, "Using all my cores is making my laptop fans really whiz. Not sure I'm a fan of the noise, but I sure do like the performance!"

Future usage considerations

In the previous example, we illustrated the ease of use of the Future API by investigating how to introduce concurrency to the backtester. Like any powerful tool, your usage of Future must be disciplined to ensure correctness and performance. This section evaluates topics that commonly cause confusion and error when using the Future to add concurrency to your program. We will detail performing side-effects, blocking execution, handling failures, choosing an appropriate execution context, and performance considerations.

Performing side-effects

When programming with Future, it is important to remember that Future is inherently a side-effecting construct. Unless the successful or failed factory methods are used to lift an already-computed value into a Future, work is scheduled to be executed on a different thread (part of the ExecutionContext that is used to create the Future). More importantly, once executed, a Future cannot be executed again. Consider the following snippet:

scala> import scala.concurrent.Future 
import scala.concurrent.Future 
 
scala> import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.ExecutionContext.Implicits.global 
 
scala> val f = Future{ println("FOO"); 40 + 2} 
FOO 
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@5575e0df 
 
scala> f.value 
res3: Option[scala.util.Try[Int]] = Some(Success(42)) 

The Future is computed and prints FOO as expected. We can then access the value wrapped in the Future. Note that when accessing the value, nothing is printed to the console. Once completed, the Future is merely a wrapper for a realized value. If you want to perform the computation again, you need to create a new instance of Future.

Note

Note that the preceding example uses Future.value to extract the result of the computation. This is for the sake of simplicity. Production code should rarely, if ever, use this method. Its return type is defined as Option[Try[A]]. An Option is used to represent the case of a completed Future with a Some, and an unrealized Future with a None. Furthermore, remember that a realized Future can have two states: success or failure. This is the purpose of the inner Try. Like Option.get, it is almost never a good idea to use Future.value. To extract a value from a Future, refer to the additional techniques described next.

Blocking execution

When we added concurrency to the backtester, we wrote a for-comprehension returning Future[(BacktestPerformanceSummary, BacktestPerformanceSummary, BacktestPerformanceSummary, BacktestPerformanceSummary)], which may leave you wondering how you access the value wrapped in the Future. Another way of asking the question is, "Given Future[T], how do I return T?" The short answer is, "You don't!" Programming with Future values requires a shift in thinking away from synchronous execution towards asynchronous execution. When programming with an asynchronous model, the goal is to avoid working with T directly because doing so implies a synchronous contract.

In practice, there are situations where it is useful to have a Future[T] => T function. For example, consider the backtester snippet. If the code from the snippet is used to create a program by defining an object extending App, the program will terminate before backtesting completes. As the threads in ExecutionContext.global are daemon threads, the JVM terminates immediately after creating the Future. In this scenario, we need a synchronization mechanism to pause execution until the result is ready. By extending the Awaitable trait, Future is able to provide such facilities. The Await module exposes two methods that achieve this goal:

def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type 
def result[T](awaitable: Awaitable[T], atMost: Duration): T 

As Future extends Awaitable, a Future can be supplied as an argument to either method. ready halts program flow until T is available and returns the completed Future[T]. In practice, ready is rarely used because it is conceptually strange for a synchronous call to return a Future[T] instead of T. You are more likely to use result, which provides the desired transformation from Future[T] to T. For example, CrazyIdeas.scala can be modified to look like the following:

    import scala.concurrent.Await

    val summariesF = for { 
      firstDaveAapl <- firstDaveAaplF 
      firstDaveGoog <- firstDaveGoogF 
      secondDaveAapl <- secondDaveAaplF 
      secondDaveGoog <- secondDaveGoogF 
    } yield (firstDaveAapl, firstDaveGoog, secondDaveAapl, secondDaveGoog) 
 
    Await.result(summariesF,
      scala.concurrent.duration.Duration(1, java.util.concurrent.TimeUnit.SECONDS)) 

In this snippet, the for-comprehension composes the Future values created earlier, so the backtests still run concurrently, and we see the blocking, synchronous invocation of Await.result to return the tuple of four BacktestPerformanceSummary values. This blocking call is parameterized with a timeout to defend against the scenario where the Future is not computed within a certain amount of time. Using a blocking call to retrieve the backtest results means that the JVM will only exit after the backtest completes or when the timeout expires. When the timeout expires and the backtest is incomplete, result and ready throw a TimeoutException.
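The need for this final blocking call is easy to demonstrate. In the following sketch (BacktestRunner is a hypothetical name), removing Await.result would likely let the JVM exit before the Future completes:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object BacktestRunner extends App {
  val resultF = Future { Thread.sleep(500); "backtest done" }
  // Without this blocking call, main would return immediately, and the
  // daemon threads of the global context would not keep the JVM alive
  println(Await.result(resultF, 1.second))
}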

Blocking execution of your program is potentially detrimental to your program's performance, and it should be used with caution. Using the methods on the Await object makes blocking calls easy to recognize. As ready and result throw an exception when timing out, rather than returning a different data type, you must take extra care to handle this scenario. You should treat any synchronous call involving asynchrony that either does not provide a timeout or does not handle the timeout as a bug.

Programming asynchronously requires a mindset shift to write a program that describes what to do when the value appears rather than writing programs that require a value to exist before acting on it. You should be suspicious of any use of Await that interrupts transformation of a to-be-computed value. A set of transformations should be composed by acting upon Future[T] instead of T. Usage of Await should be restricted to scenarios where a program has no other work to perform and requires the result of the transformation, as we saw with the backtester.

Note

As the standard library models timeout with an exception instead of a different return type, it is hard to enforce that a timeout is always handled. One way to improve safety is to write a utility method that returns Option[T] instead of T to account for the timeout scenario:

import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Awaitable}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object SafeAwait { 
  def result[T]( 
    awaitable: Awaitable[T], 
    atMost: Duration): Option[T] =
    Try(Await.result(awaitable, atMost)) match { 
      case Success(t) => Some(t) 
      // A timeout is an expected error, encoded as None
      case Failure(_: TimeoutException) => None 
      // Any other failure is unexpected and is rethrown
      case Failure(e) => throw e 
    } 
}
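Usage might look like the following, assuming the global execution context is in scope:

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val slow = Future { Thread.sleep(5000); 42 }
SafeAwait.result(slow, 100.millis)                 // None: timed out
SafeAwait.result(Future.successful(42), 1.second)  // Some(42)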

With this new method, an entire error class is eliminated. As you encounter other unsafe transformations, consider defining methods that return a data type that encodes expected errors to avoid accidentally mishandling the transformation result. What other examples of unsafe transformations come to mind?

Handling failures

Working with Future requires disciplined handling of error scenarios to avoid writing a set of transformations that are difficult to reason about. When an exception is thrown inside a Future transformation, it bubbles up within the transformation's thread of computation and interrupts downstream transformations. Consider the following motivating example:

Future("not-an-integer").map(_.toInt).map(i => { 
      println("Multiplying") 
      i * 2 
    }) 

What do you expect to occur after the first map transformation? It is clear that the transformation will fail because the provided input cannot be parsed as an integer. In this scenario, the Future is considered to be a failed Future, and downstream transformations operating on the wrapped Int value will not occur. In this simple example, it is obvious that the transformation cannot continue. Imagine a larger code base operating on data more complicated than a single integer, with multiple failure scenarios spread across multiple namespaces and source files. In a real-world setting, it is more challenging to identify where an asynchronous computation broke down.

Future provides facilities for handling failures. It provides recover and recoverWith in order to continue downstream transformations. The signatures are as follows:

def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] 
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] 

The difference between these two recovery methods is that the partial function provided to recover returns U, while recoverWith returns Future[U]. In our previous example, we can use recover to supply a default value to continue a transformation, as follows:

Future("not-an-integer").map(_.toInt).recover { 
  case _: NumberFormatException => -2 
}.map(i => { 
  println("Multiplying") 
  i * 2 
}) 

Running this snippet produces the following output:

Multiplying 
Multiplication result = -4 

This approach allows you to continue a pipeline of transformations when one transform fails, but it suffers from the same shortcoming as the methods on Await. The returned Future[T] data type does not reflect the possibility of failure. Using recovery methods is error-prone because it is impossible to know whether the error conditions have been handled without reading through the code.
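For completeness, here is a sketch of recoverWith for the same scenario. The fallbackParse method is a hypothetical stand-in for an asynchronous fallback, such as a retry or a call to another service:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical asynchronous fallback used when parsing fails
def fallbackParse(s: String): Future[Int] = Future(s.length)

Future("not-an-integer").map(_.toInt).recoverWith {
  case _: NumberFormatException => fallbackParse("not-an-integer")
}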

The error handling that we investigated is appropriate to handle failures during a computation. It is likely that after a series of transformations complete, you will wish to perform special logic. Imagine you are building a web service that submits trading orders to exchanges. Order submission is successful if the order was submitted to the exchange; otherwise, it is considered a failed submission. As order submission involves communication with an external system, the exchange, you modeled this action with a Future. Here is what the method handling order submission looks like:

  def submitOrder( 
    ec: ExecutionContext, 
    sendToExchange: ValidatedOrder => Future[OrderSubmitted], 
    updatePositions: OrderSubmitted => Future[AccountPositions], 
    o: RawOrder): Unit = { 
    implicit val iec = ec 
 
    (for { 
      vo <- ValidatedOrder.fromRawOrder(o).fold(
        Future.failed[ValidatedOrder](new Exception(
        "Order failed validation")))(Future.successful) 
      os <- sendToExchange(vo) 
      ap <- updatePositions(os) 
    } yield (os, ap)).onComplete { 
      case Success((os, ap)) => // Marshal order submission info to caller 
      case Failure(e) =>  // Marshal appropriate error response to caller 
    } 
  } 

An ExecutionContext, a way to submit orders, and a way to update a customer's trading positions after the trade is submitted allow a customer-provided RawOrder to be submitted to the exchange. In the first processing step, the RawOrder is converted into a ValidatedOrder, and then lifted into a Future. Future.failed and Future.successful are convenient ways to lift an already-computed value into a Future. The value is lifted into a Future to allow the entire sequence of steps to be written as a single for-comprehension.
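The fold in the first step captures a reusable pattern: turning an Option into a Future that fails when the value is absent. A small helper of our own devising (not part of the original code) makes the pattern explicit:

import scala.concurrent.Future

// Lifts an Option into a Future, failing with the supplied error when empty
def liftOption[A](oa: Option[A], ifMissing: => Throwable): Future[A] =
  oa.fold(Future.failed[A](ifMissing))(Future.successful)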

Following the completion of all processing steps, onComplete is invoked to asynchronously handle completion of request processing. You can imagine in this context that completing request processing implies creating a serialized version of a response and transmitting it to the caller. Previously, the only mechanism at our disposal to perform work once a value is computed was to block using Await. onComplete is an asynchronously invoked callback that registers a function to be run when the value is completed. As shown in the example, onComplete supports handling success and failure cases, which makes it a general-purpose tool to handle the outcome of a Future transformation. In addition to onComplete, Future provides onFailure specifically for failure cases, and onSuccess and foreach specifically for success cases.

These callback methods return Unit. As a functional programmer, you should be leery of invoking these methods because they are side-effecting. onComplete invocations should only happen at the absolute end of a computation, when a side-effect can no longer be deferred. In the web service example, the side-effect is the transmission of the response to the caller. Another common use case for these side-effecting callbacks is handling cross-cutting concerns, such as application metrics. Coming back to the web service, here is one way to increment an error counter when order submission to the exchange fails:

   (for { 
      vo <- ValidatedOrder.fromRawOrder(o).fold( 
        Future.failed[ValidatedOrder]( 
        new Exception("Order failed validation")))(Future.successful) 
      os <- { 
        val f = sendToExchange(vo) 
        f.onFailure({ case e => incrementExchangeErrorCount() }) 
        f 
      } 
      ap <- updatePositions(os) 
    } yield (os, ap)) 

In this snippet, a side-effect is performed when submission to the exchange fails via the onFailure callback. In this isolated example, it is straightforward to track where the side-effect is happening. However, in a larger system it can be a challenge to identify when and where callbacks were registered. Additionally, from the Future API documentation, we learn that callback execution is unordered, which indicates that all callbacks must be treated independently. This is why you must be disciplined about when and where you apply these side-effects.

An alternative approach to error handling is to use a data type that can encode errors. We have seen this approach applied with Await when Option was the returned data type. Option makes it clear that the computation might fail while remaining convenient to use because its transformations (for example, map) operate on the wrapped value. Unfortunately, Option does not allow us to encode the error. In this case, it is helpful to use another tool from the Scalaz library called disjunction. Disjunction is conceptually similar to Either, which can be used to represent one of two possible types. Disjunction is different from Either because its operations are right-biased. Let's take a look at a simple example to illustrate this idea:

scalaz.\/.right[Throwable, Int](1).map(_ * 2) 

The \/ is the shorthand symbol used by Scalaz to represent a disjunction. In this example, a right disjunction is created by wrapping the integer literal 1. This disjunction holds either a Throwable value or an Int value, and it is analogous to Either[Throwable, Int]. In contrast to Either, the map transformation operates on the right side of the disjunction. In this example, map accepts an Int value as input because the right side of the disjunction is an Int. As disjunction is right-biased, it is a natural fit to represent failure and success values. Using infix notation, it is common to define error handling with Future as Future[Throwable \/ T]. In place of Throwable, one can define an ADT of possible error types to make error handling explicit. This approach is favorable because it enforces handling of failure cases without relying on the author to invoke a recovery method. If you are interested in learning more about how to use disjunction as a tool for error handling, review Eugene Yokota's excellent Scalaz tutorial at http://eed3si9n.com/learning-scalaz/Either.html.
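As a sketch of this style, the order submission service might expose its expected errors through a small ADT. The names below (SubmitError, submitToExchange, and the error cases) are our own illustrations, not code from the repository:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz.{\/, -\/, \/-}

// A hypothetical ADT enumerating the expected order submission failures
sealed trait SubmitError
case object FailedValidation extends SubmitError
final case class ExchangeRejected(reason: String) extends SubmitError

def submitToExchange(order: String): Future[SubmitError \/ String] =
  Future {
    if (order.isEmpty) -\/(FailedValidation) // left side: the error case
    else \/-("order-id-1")                   // right side: the success case
  }

Callers of submitToExchange must inspect the disjunction before they can reach the order identifier, so the expected failure cases cannot be silently ignored.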

Hampering performance through executor submissions

As Future provides an expressive and easy-to-use API, it is common to perform numerous transforms to complete a computation in a large-scale system. Reflecting on the order submission web service mentioned in the previous section, you can imagine multiple application layers operating on a Future. A production-ready web service typically composes together multiple layers to service a single request. An example request flow may contain the following stages: request deserialization, authorization, application service invocation, database lookups and/or third-party service callouts, and response translation to a JSON format. If each of these stages in the workflow is modeled with a Future, then it is common to have five or more transformations to handle a single request.

Decomposing your software system into small areas of responsibility, in a way that is similar to the preceding example, is a good engineering practice that supports testing in isolation and improves maintainability. However, this approach to software design comes with a performance cost when working with Future. As we have seen through our example usage, nearly all transforms on a Future require submitting work to an Executor. In our example workflow, most stages in the transformation are small. In this scenario, the overhead of submitting work to the executor dominates the execution time of the computation. If the order submission web service serves numerous customers with stringent throughput and latency requirements, then it is possible that engineering practices focusing on testability and maintainability will result in poorly performing software.

[Figure: Hampering performance through executor submissions, showing a four-thread pool applying each Future transform as a separate task submission]

If you consider the preceding diagram, you can see a thread pool with four threads being used to apply transforms to a Future. Each transform is submitted to the pool and there is a chance that a different thread is picked for the computation. This diagram visualizes how multiple small transforms may hamper performance due to the overhead of Executor submissions.

Just how large is the overhead of Executor submissions? This question motivates a benchmark to quantify the cost of submitting work to an Executor. The benchmark focuses on adding 1 to an integer N times in two ways. One approach performs all the addition operations within a single Future, while the second performs each addition operation with a new Future transformation. The latter approach is a proxy for the stages of order submission request processing that use multiple Future transformations in a larger software system. Integer addition is the proxy operation because it is an extremely cheap computation, which means that the execution time will be dominated by Executor submissions. The benchmarks look like the following:

  import org.openjdk.jmh.annotations.Benchmark
  import scala.concurrent.{Await, Future}
  import scala.concurrent.duration.Duration

  @Benchmark 
  def manyTransforms(state: TransformFutureState): Int = { 
    import scala.concurrent.ExecutionContext.Implicits._ 
    // One Executor submission per addition, via map
    val init = Future(0) 
    val res = (1 until state.operations).foldLeft(init)((f, _) => f.map(_ + 1)) 
    Await.result(res, Duration("5 minutes")) 
  } 
 
  @Benchmark 
  def oneTransform(state: TransformFutureState): Int = { 
    import scala.concurrent.ExecutionContext.Implicits._ 
    // All additions inside a single Executor submission
    val res = Future { 
      (1 until state.operations).foldLeft(0)((acc, _) => acc + 1) 
    } 
    Await.result(res, Duration("5 minutes")) 
  } 
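The benchmarks reference a TransformFutureState class whose definition is not shown in this excerpt. A plausible sketch of such a JMH state class (an assumption on our part) is:

  import org.openjdk.jmh.annotations.{Param, Scope, State}

  @State(Scope.Benchmark)
  class TransformFutureState {
    // The number of addition operations each benchmark performs
    @Param(Array("5", "10"))
    var operations: Int = 0
  }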

TransformFutureState allows the number of operations to be parameterized. manyTransforms represents each addition operation using a map transformation that involves submitting work to an Executor. oneTransform performs all addition operations using a single Executor submission via Future.apply. In this controlled test, Await.result is used as a blocking mechanism to await the completion of the computation. The results of running this test on a two-core machine with five transformations and ten transformations can be seen in the following table:

Benchmark        Map count   Throughput (ops per second)   Error as percentage of throughput
manyTransforms   5           463,614.88                    ± 1.10
oneTransform     5           412,675.70                    ± 0.81
manyTransforms   10          118,743.55                    ± 2.34
oneTransform     10          316,175.79                    ± 1.79

While both scenarios yield comparable results with five transformations, we can see a clear difference with ten transforms being applied. This benchmark makes it clear that Executor submissions can dominate performance. Although the cost can be high, our advice to you is to model your system without considering this cost up-front. In our experience, it is easier to rework a well-modeled system for performance improvements than it is to extend or to rework a poorly-modeled but performant system. For this reason, we advise against going to great lengths to group Executor submissions when attempting to put together the initial version of a complex system.

Once you have a good design in place, the first step is to benchmark and to profile in order to identify whether Executor submissions are the bottleneck. In the event that you discover that your style of Future usage is causing performance bottlenecks, there are several courses of action you should consider.

The lowest-cost development option is to replace unnecessarily costly Future creation with the use of Future.successful or Future.failed. The order submission web service took advantage of these factory methods to lift values into a Future. As the value is already computed, these factory methods avoid submitting any tasks to the Executor referenced by the provided ExecutionContext. Replacing usages of Future.apply with either Future.successful or Future.failed when the value is already computed can yield cost savings.
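The difference is easy to see side by side; a minimal sketch:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Future.apply schedules a task on the Executor, even though 42 is already known
val eager = Future(42)

// Future.successful completes immediately, with no Executor submission
val settled = Future.successful(42)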

A more expensive alternative in terms of development effort is to rework your implementation to group together Future transformations in a way similar to oneTransform. This tactic involves reviewing each application layer to determine whether transforms within a single layer can be combined. If possible, we recommend that you avoid merging transformations across application layers (for example, between request deserialization or authorization and application service processing) because this weakens your model and increases maintenance cost.

If neither of these options produces acceptable performance, then it may be worthwhile to discuss with the product owners the option of addressing the performance issue with hardware. As your system's design has not been compromised and reflects solid engineering practices, it can likely be horizontally scaled or clustered. Depending on the state tracked by your system, this option might be possible without additional development work. Perhaps product owners value a system that can be easily maintained and extended more than raw performance. If this is the case, adding scale to your system may be a viable way forward.

If you are unable to buy your way out of the performance challenge, then there are three additional possibilities. One option is to investigate an alternative to Future, named Task. This construct, which is provided by the Scalaz library, allows computations to be performed with fewer Executor submissions. This option involves significant development effort because the Future data type will need to be replaced throughout the application with Task. We will explore Task at the end of this chapter and investigate the performance benefits that it can provide.

Independent of using Task, it can be useful to review your application's model to critically question whether or not there is unnecessary work being done on the critical path. As we saw with MVT's reporting infrastructure and the introduction of stream processing, it is sometimes possible to rethink a design to improve performance. Like the introduction of Task, reconsidering your system's architecture is a large-scale change. The last resort option is to merge application layers in order to support grouping Future transformations. We advise against exercising this option, unless all other suggestions have failed. This option results in a code base that is more difficult to reason about because concerns are no longer separated. In the short-run, you may reap performance benefits, but in our experience, these benefits are outweighed in the long-run by the cost of maintaining and extending such a system.
