In this chapter, we will switch our focus from collections to a different topic: concurrency. Being able to take advantage of all the CPU resources that your hardware provides is critical to writing performant software. Unfortunately, writing concurrent code is not easy, because unsafe programs are easy to write. If you come from Java, you may still have nightmares involving synchronized blocks and locks! The java.util.concurrent package provides numerous tools that make writing concurrent code simpler. However, designing stable and reliable concurrent applications can still be a daunting challenge. In this chapter, we will explore the tools that the Scala standard library provides to take advantage of concurrency. After a short presentation of the main abstraction, Future, we will study its behavior and the usage pitfalls that we should avoid. We will end this chapter by exploring a possible alternative to Future named Task, which is provided by the Scalaz library. In this chapter, we will explore the following topics:
The data scientists are off and running with the data analysis tools that you built for them to research trading strategies. However, they have hit a wall because backtesting strategies is becoming too expensive. As they have built more sophisticated strategies that require more historical data, and employ more stateful algorithms, backtesting has taken longer. Once again, you are being called upon to help out at MVT by leveraging Scala and the functional paradigm to deliver performant software.
The data scientists have incrementally built out a backtesting tool that allows the team to determine a strategy's performance by replaying historical data. This works by providing a preset strategy to run, the ticker to test against, and the time interval of historical data to replay. The backtester loads market data and applies the strategy to generate trading decisions. Once the backtester finishes replaying historical data, it summarizes and displays strategy performance results. The team relies heavily on the backtester to determine the efficacy of proposed trading strategies before putting them into production for live trading.
To begin familiarizing yourself with the backtester, you look into the code, as follows:
import org.joda.time.Interval // Joda-Time interval type

sealed trait Strategy
case class PnL(value: BigDecimal) extends AnyVal
case class BacktestPerformanceSummary(pnl: PnL)
case class Ticker(value: String) extends AnyVal

def backtest(
  strategy: Strategy,
  ticker: Ticker,
  testInterval: Interval): BacktestPerformanceSummary = ???
In the preceding snapshot from the data analysis repository, you see the primary method that drives backtesting. Given a Strategy, a Ticker, and an Interval, it produces a BacktestPerformanceSummary. Scanning the repository, you find a file named CrazyIdeas.scala that shows Dave as the only commit author. In it, you see example invocations of the backtester:
def lastMonths(months: Int): Interval =
  new Interval(new DateTime().minusMonths(months), new DateTime())

backtest(Dave1, Ticker("AAPL"), lastMonths(3))
backtest(Dave1, Ticker("GOOG"), lastMonths(3))
backtest(Dave2, Ticker("AAPL"), lastMonths(3))
backtest(Dave2, Ticker("GOOG"), lastMonths(3))
The usage of the backtester gives you a clue to a possible performance improvement. It looks like when Dave has a new idea, he wants to evaluate its performance on multiple symbols and compare it against other strategies. In its current form, backtests are performed sequentially. One way to speed up the backtester is to run all backtests in parallel. If the backtests run concurrently and there are spare hardware resources, then backtesting multiple strategies and symbols will finish faster. To understand how to parallelize the backtester, we first need to dive into the topic of asynchronous programming and then see how Scala supports concurrency.
Before diving into the code, we need to enrich our vocabulary to discuss the properties of asynchronous programming. Concurrency and parallelism are often used interchangeably, but there is an important distinction between these two terms. Concurrency involves two (or more) tasks that are started and executed in overlapping time periods. Both tasks are in progress (that is, they have started and have not yet finished) at the same time, but only one task may be performing actual work at any instant in time. This is the case when you write concurrent code on a single-core machine. Only one task can progress at a time, but multiple tasks are ongoing concurrently.
Parallelism exists only when both tasks are truly running at the same time. With a dual-core machine, you can execute two tasks at the same time. From this definition, we see that parallelism depends on the hardware that is available for use. This means that the property of concurrency can be added to a program, but parallelism is outside the control of the software.
To better illustrate these concepts, consider the example of painting a room. If there is only one painter, the painter can paint the first coat on a wall, move on to the next wall, go back to the first wall for the second coat and then finish the second wall. The painter is painting both walls concurrently, but can only spend time on one wall at any given time. If two painters are on the job, they can each focus on one wall and paint them in parallel.
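The painter analogy can be sketched with two thread pools. This is a rough illustration, assuming a JVM with at least two cores for the parallel case; the names Painters, paintWall, and paint are hypothetical. With one thread, both Futures are in flight together but only one can make progress at a time, so they finish one after the other. With two threads, each wall can get its own painter.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object Painters {
  // Simulates painting one wall and reports which thread did the work.
  private def paintWall(wall: String): String = {
    Thread.sleep(100) // pretend the coat takes a while
    s"$wall by ${Thread.currentThread().getName}"
  }

  // Paints two walls on a pool with `painters` threads. Returns the distinct
  // thread names that did the painting and the elapsed wall-clock time in ms.
  def paint(painters: Int): (Set[String], Long) = {
    val pool = Executors.newFixedThreadPool(painters)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val start = System.nanoTime()
      val walls = Future.sequence(List("wall-1", "wall-2").map(w => Future(paintWall(w))))
      val results = Await.result(walls, 5.seconds)
      val elapsedMs = (System.nanoTime() - start) / 1000000
      (results.map(_.split(" by ").last).toSet, elapsedMs)
    } finally pool.shutdown()
  }
}
```

With one painter, the elapsed time is roughly the sum of both sleeps; with two painters, it can approach the time of a single sleep, but only if the hardware actually runs both threads at once.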
The primary construct in Scala to drive concurrent programming is Future. Found in the scala.concurrent package, Future can be seen as a container for a value that may not yet exist. Let's look at a simple example to illustrate usage:
scala> import scala.concurrent.Future
import scala.concurrent.Future

scala> import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext

scala> val context: ExecutionContext = scala.concurrent.ExecutionContext.global
context: scala.concurrent.ExecutionContext = scala.concurrent.impl.ExecutionContextImpl@3fce8fd9

scala> def example(): Unit = {
         println("Starting the example")
         Future {
           println("Starting the Future")
           Thread.sleep(1000) // simulate computation
           println("Done with the computation")
         }(context)
         println("Ending example")
       }
The preceding example shows a short method that creates a Future value simulating an expensive computation, and prints a couple of lines to make it easier for us to understand the flow of the application. When running example, we see the following output:
scala> example()
Starting the example
Ending example
Starting the Future
// a pause
Done with the computation
We can see that the body of the Future ran only after the example method printed its last line. This is because when a Future is created, its computation starts concurrently with the caller, which does not wait for it to finish. You may be wondering, "What is this context object of the ExecutionContext type that is used when creating the Future?" We will explore ExecutionContext in depth shortly, but for now, we treat it as the object that is responsible for the execution of the Future. We use scala.concurrent.ExecutionContext.global, which is a default ExecutionContext provided by the standard library to execute the Future.
A Future object is stateful. It is either not yet complete, while the computation is underway, or completed once the computation finishes. Furthermore, a completed Future is either a success, when the computation completed normally, or a failure, if an exception was thrown during the computation.
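To make these three states observable without racing a background thread, the following sketch uses Promise, a standard library type that lets us complete a Future by hand. The object and method names are illustrative; isCompleted and value are the real non-blocking accessors on Future.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}

object FutureStates {
  // A Promise that nobody completes yields a Future that stays pending.
  def pending: Future[Int] = Promise[Int]().future

  // Completing the Promise successfully moves its Future to the success state.
  def succeeded: Future[Int] = {
    val p = Promise[Int]()
    p.success(42)
    p.future
  }

  // Future.failed builds an already-failed Future directly.
  def failed: Future[Int] = Future.failed(new IllegalStateException("boom"))

  // Describes a Future's state using only its non-blocking accessor.
  def describe(f: Future[Int]): String = f.value match {
    case None             => "not yet complete"
    case Some(Success(v)) => s"completed successfully with $v"
    case Some(Failure(e)) => s"completed with failure: ${e.getMessage}"
  }
}
```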
The Future API provides combinators to compose Future instances and manipulate the results that they contain:
scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

scala> import scala.concurrent.Future
import scala.concurrent.Future

scala> Future(1).map(_ + 1).filter(_ % 2 == 0).foreach(println)
2
This snippet from the Scala console shows the construction of a Future that wraps a constant integer value. The integer contained in the Future is transformed using functions similar to the ones that we expect to find on Option and collection data types. Each transformation is applied once the preceding Future completes, and returns a new Future.
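The following sketch extends the console example to show two behaviors worth knowing. When the filter predicate does not hold, the result is a failed Future holding a NoSuchElementException rather than an "empty" Future, and flatMap sequences a second asynchronous step that depends on the first. The object name and the five-second timeout are arbitrary choices for this illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Combinators {
  // Same pipeline as the console example: 1 + 1 is even, so filter keeps it.
  val even: Future[Int] = Future(1).map(_ + 1).filter(_ % 2 == 0)

  // 2 + 1 is odd, so filter fails the Future with a NoSuchElementException.
  val odd: Future[Int] = Future(2).map(_ + 1).filter(_ % 2 == 0)

  // flatMap starts a second Future that uses the first Future's result.
  val chained: Future[String] = Future(20).flatMap(n => Future(s"answer: ${n + 22}"))

  // Blocking is used here only so the results can be inspected.
  def await[T](f: Future[T]): T = Await.result(f, 5.seconds)
}
```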
As promised, we now look into ExecutionContext. The ExecutionContext can be thought of as the machinery behind Future that provides runtime asynchrony. In the previous snippet, a Future was created to perform simple addition and modulo division without explicitly providing an ExecutionContext instance at the call site. Instead, only an import of the global object was provided. The snippet executes because global is an implicit value, and Future.apply, map, filter, and foreach each accept an implicit ExecutionContext. Let's look at the following signature of map to deepen our understanding:
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S]
From the signature of map, we see that, unlike the map transformation on List, the map transformation on Future requires a curried, implicit ExecutionContext argument. To understand how an ExecutionContext provides runtime asynchrony, we need to first understand its operations:
trait ExecutionContext {
  def execute(runnable: Runnable): Unit
  def reportFailure(cause: Throwable): Unit
  def prepare(): ExecutionContext = this // deprecated since Scala 2.12
}
The execute method is a side-effecting method that operates on a java.lang.Runnable. For those familiar with concurrency in Java, you most likely recall that Runnable is the commonly used interface that allows threads and other java.util.concurrent abstractions to execute code concurrently. Although we do not know how Future achieves runtime asynchrony yet, we do know there is a link between Future execution and the creation of a Runnable.
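One way to see the link between Future and Runnable is to implement ExecutionContext ourselves. The following is a deliberately naive sketch, not a production design: the hypothetical ThreadPerTaskContext starts a new thread for every Runnable it receives and counts submissions, so we can observe that creating a Future hands a Runnable to execute.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Starts one new thread per Runnable. There is no bound on the number of
// threads, which is exactly why real applications use thread pools instead.
final class ThreadPerTaskContext(prefix: String) extends ExecutionContext {
  private val counter = new AtomicInteger(0)

  override def execute(runnable: Runnable): Unit = {
    val t = new Thread(runnable, s"$prefix-${counter.incrementAndGet()}")
    t.setDaemon(true)
    t.start()
  }

  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()

  def submitted: Int = counter.get()
}

object ThreadPerTaskDemo {
  // Returns the name of the thread that ran the Future body and the number
  // of Runnables the context received.
  def run(): (String, Int) = {
    val ec = new ThreadPerTaskContext("demo")
    val f = Future(Thread.currentThread().getName)(ec)
    val name = Await.result(f, 5.seconds)
    (name, ec.submitted)
  }
}
```

The body runs on a thread named demo-1 (or similar), which shows that the Future's work arrived through execute as a Runnable.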
The next question we will answer is, "How do I create an ExecutionContext?" By studying the companion object, we discover the following signatures:
def fromExecutorService(e: ExecutorService, reporter: Throwable => Unit): ExecutionContextExecutorService
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService
def fromExecutor(e: Executor, reporter: Throwable => Unit): ExecutionContextExecutor
def fromExecutor(e: Executor): ExecutionContextExecutor
The standard library provides convenient ways to create an ExecutionContext from either a java.util.concurrent.Executor or a java.util.concurrent.ExecutorService.
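As a minimal sketch of the factory methods above, the following builds an ExecutionContext from a fixed-size JDK thread pool, supplies a reporter function, runs a few small Futures, and shuts the pool down. The object name, pool size, and workload are illustrative assumptions. Unlike the global context, a pool you create yourself is yours to shut down.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object DedicatedPool {
  // Computes 1*1 + 2*2 + ... + n*n with one Future per term on a pool of
  // `threads` threads.
  def sumOfSquares(n: Int, threads: Int): Int = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(
        Executors.newFixedThreadPool(threads),
        (t: Throwable) => System.err.println(s"Uncaught failure: $t") // reporter
      )
    try {
      val squares = (1 to n).toList.map(i => Future(i * i))
      Await.result(Future.sequence(squares), 5.seconds).sum
    } finally {
      ec.shutdown() // release the threads we created
      ec.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```

Because ExecutionContextExecutorService also extends ExecutorService, the same value offers both the Scala execute method and the JDK lifecycle methods such as shutdown.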
If you are unfamiliar with the machinery provided by the java.util.concurrent package and are looking for a deeper treatment than the API documentation provides, we encourage you to read Java Concurrency in Practice by Brian Goetz (http://jcip.net/). Although Java Concurrency in Practice was written around the release of JDK 6, it contains numerous principles that continue to apply today. Reading this book will give you a deep understanding of the JDK-provided concurrency primitives that the Scala standard library builds on.
The return type of the factory methods is a more specialized version of ExecutionContext. The standard library defines the following inheritance chain for ExecutionContext:
trait ExecutionContextExecutor extends ExecutionContext with java.util.concurrent.Executor
trait ExecutionContextExecutorService extends ExecutionContextExecutor with java.util.concurrent.ExecutorService
Also, in the ExecutionContext companion object, we find the implicit context used in our first example, as follows:
def global: ExecutionContextExecutor = Implicits.global
The documentation for the definition of Implicits.global indicates that this ExecutionContext is backed by a thread pool with a thread count equal to the available processor count. Our dive into ExecutionContext shows us how the simple Future example runs. We can illustrate how a Future applies its ExecutionContext to execute on multiple threads:
Future(1).map(i => {
  println(Thread.currentThread().getName)
  i + 1
}).filter(i => {
  println(Thread.currentThread().getName)
  i % 2 == 0
}).foreach(println)
We extend the original snippet to print the name of the thread performing each transformation. When run on a machine with multiple cores, this snippet yields variable output, depending on which threads pick up the transformations. Here is an example output:
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-5
2
This example shows that the worker-3 thread performed the map transformation while the worker-5 thread performed the filter transformation. There are two key insights to draw from this simple example about how Future affects control flow. First, Future is a data type for concurrency that enables us to break the control flow of a program into multiple logical threads of processing. Second, our example shows that Future begins execution immediately upon creation. This means that the computation, and each transformation as soon as its input is available, runs in a separate flow of the program instead of waiting for the caller. We can use these insights to improve the runtime performance of Dave's crazy ideas.
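The claim that a Future starts as soon as it is created can be checked directly. In this sketch (the names are illustrative), the Future's body releases a CountDownLatch, and the caller waits on the latch without ever calling map, foreach, or Await on the Future itself. If Future were lazy, the wait would time out.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

object EagerStart {
  // Returns true if the Future body ran even though nothing consumed the Future.
  def bodyRanWithoutBeingAsked(): Boolean = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val started = new CountDownLatch(1)
    val unused: Future[Unit] = Future(started.countDown())
    started.await(5, TimeUnit.SECONDS) // false only if the body never ran
  }
}
```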
We apply Future to Dave's set of backtests to improve performance. We believe there is an opportunity for a performance improvement because Dave's laptop has four CPU cores. This means that by adding concurrency to our program, we will be able to benefit from runtime parallelism. Our first attempt utilizes a for-comprehension:
implicit val ec: scala.concurrent.ExecutionContext =
  scala.concurrent.ExecutionContext.Implicits.global

for {
  firstDaveAapl <- Future(backtest(Dave1, Ticker("AAPL"), lastMonths(3)))
  firstDaveGoog <- Future(backtest(Dave1, Ticker("GOOG"), lastMonths(3)))
  secondDaveAapl <- Future(backtest(Dave2, Ticker("AAPL"), lastMonths(3)))
  secondDaveGoog <- Future(backtest(Dave2, Ticker("GOOG"), lastMonths(3)))
} yield (firstDaveAapl, firstDaveGoog, secondDaveAapl, secondDaveGoog)
Each backtest invocation is wrapped in the creation of a Future instance by calling Future.apply. This companion object method uses a by-name parameter to defer evaluation of its argument, which, in this case, is the invocation of backtest:
def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]
After running the new version of CrazyIdeas.scala, you are disappointed to see that the runtime has not improved. You quickly double-check the number of CPUs on your Linux box, as follows:
$ cat /proc/cpuinfo | grep processor | wc -l
8
Having confirmed that eight logical processors are available on your laptop, you wonder why the execution time matches the original serial execution time. The answer lies in how the for-comprehension is compiled. A for-comprehension desugars into nested flatMap calls ending in a map, as in the following simpler example with three values:
Future(1).flatMap(f1 =>
  Future(2).flatMap(f2 =>
    Future(3).map(f3 => (f1, f2, f3))))
In this desugared representation of the for-comprehension, we see that the second Future is created and evaluated within the flatMap transformation of the first Future. Any transformation applied to a Future (for example, flatMap) is only invoked once the value provided to the transformation has been computed. This means that the Future instances in the preceding example, and in the for-comprehension, are executed sequentially. To achieve the concurrency that we are looking for, we must instead modify CrazyIdeas.scala to look like the following:
val firstDaveAaplF = Future(backtest(Dave1, Ticker("AAPL"), lastMonths(3)))
val firstDaveGoogF = Future(backtest(Dave1, Ticker("GOOG"), lastMonths(3)))
val secondDaveAaplF = Future(backtest(Dave2, Ticker("AAPL"), lastMonths(3)))
val secondDaveGoogF = Future(backtest(Dave2, Ticker("GOOG"), lastMonths(3)))

for {
  firstDaveAapl <- firstDaveAaplF
  firstDaveGoog <- firstDaveGoogF
  secondDaveAapl <- secondDaveAaplF
  secondDaveGoog <- secondDaveGoogF
} yield (firstDaveAapl, firstDaveGoog, secondDaveAapl, secondDaveGoog)
In this snippet, the four backtests are kicked off concurrently and their results are combined into a Future of a Tuple4 consisting of four BacktestPerformanceSummary values. Seeing is believing: after you show Dave the faster runtime of his backtests, he is excited to iterate quickly on new backtest ideas. Dave never misses a chance to throw around a pun, exclaiming, "Using all my cores is making my laptop fans really whiz. Not sure I'm a fan of the noise, but I sure do like the performance!"
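The difference between the two versions of CrazyIdeas.scala can be measured with a small timing sketch. Here, slowTask is a hypothetical stand-in for backtest that simply sleeps for 200 ms, and a dedicated four-thread pool is assumed so that four tasks can run at once. Creating the Futures inside the for-comprehension takes roughly four times as long as creating them up front.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ForComprehensionTiming {
  // Hypothetical stand-in for backtest: sleeps, then returns its input.
  private def slowTask(id: Int): Int = { Thread.sleep(200); id }

  private def timed[T](body: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000)
  }

  // Returns (sequential elapsed ms, concurrent elapsed ms).
  def compare(): (Long, Long) = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Each Future is only created inside the previous flatMap callback.
      val (_, sequentialMs) = timed {
        val all = for {
          a <- Future(slowTask(1))
          b <- Future(slowTask(2))
          c <- Future(slowTask(3))
          d <- Future(slowTask(4))
        } yield (a, b, c, d)
        Await.result(all, 10.seconds)
      }
      // All four Futures are created, and therefore started, before composing.
      val (_, concurrentMs) = timed {
        val fa = Future(slowTask(1))
        val fb = Future(slowTask(2))
        val fc = Future(slowTask(3))
        val fd = Future(slowTask(4))
        val all = for { a <- fa; b <- fb; c <- fc; d <- fd } yield (a, b, c, d)
        Await.result(all, 10.seconds)
      }
      (sequentialMs, concurrentMs)
    } finally pool.shutdown()
  }
}
```

The exact numbers depend on the machine, but the sequential version cannot finish faster than the sum of the four sleeps.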
In the previous example, we illustrated the ease of use of the Future API by investigating how to introduce concurrency to the backtester. Like any powerful tool, your usage of Future must be disciplined to ensure correctness and performance. This section evaluates topics that commonly cause confusion and errors when using Future to add concurrency to your program. We will detail performing side-effects, blocking execution, handling failures, choosing an appropriate execution context, and performance considerations.
When programming with Future, it is important to remember that Future is inherently a side-effecting construct. Unless the Future.successful or Future.failed factory methods are used to lift a value into a Future, work is scheduled to be executed on a different thread (part of the ExecutionContext that is used to create the Future). More importantly, once executed, a Future cannot be executed again. Consider the following snippet:
scala> import scala.concurrent.Future
import scala.concurrent.Future

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

scala> val f = Future{ println("FOO"); 40 + 2}
FOO
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@5575e0df

scala> f.value
res3: Option[scala.util.Try[Int]] = Some(Success(42))
The Future is computed and prints FOO as expected. We can then access the value wrapped in the Future. Note that when accessing the value, nothing is printed on the console. Once completed, the Future is merely a wrapper for a realized value. If you want to perform the computation again, you need to create a new instance of Future.
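The following sketch counts how many times a Future body actually runs. The names are illustrative. A Future stored in a val is evaluated once, no matter how many transformations read it, while a def that builds a new Future on each call re-runs the body every time.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RunsOnce {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Returns (runs caused by the shared val, runs caused by two calls of the def).
  def evaluations(): (Int, Int) = {
    val runs = new AtomicInteger(0)

    // One Future instance: two transformations reuse its single result.
    val shared: Future[Int] = Future { runs.incrementAndGet(); 40 + 2 }
    Await.result(shared.map(_ + 1).zip(shared.map(_ - 1)), 5.seconds)
    val afterVal = runs.get()

    // A new Future per call: the body runs once per call.
    def fresh: Future[Int] = Future { runs.incrementAndGet(); 40 + 2 }
    Await.result(fresh.zip(fresh), 5.seconds)

    (afterVal, runs.get() - afterVal)
  }
}
```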
Note that the preceding example uses Future.value to extract the result of the computation. This is for the sake of simplicity. Production code should rarely, if ever, use this method. Its return type is defined as Option[Try[A]]. An Option is used to represent a completed Future with a Some, and an unrealized Future with a None. Furthermore, remember that a realized Future can be in one of two states: success or failure. This is the purpose of the inner Try. Like Option.get, Future.value is almost never a good idea. To extract a value from a Future, refer to the additional techniques described next.
When we added concurrency to the backtester, we wrote a for-comprehension returning Future[(BacktestPerformanceSummary, BacktestPerformanceSummary, BacktestPerformanceSummary, BacktestPerformanceSummary)], which may leave you wondering how you access the value wrapped in the Future. Another way of asking the question is, "Given Future[T], how do I return T?" The short answer is, "You don't!" Programming with Future requires a shift in thinking away from synchronous execution towards asynchronous execution. When programming with an asynchronous model, the goal is to avoid working with T directly, because doing so implies a synchronous contract.
In practice, there are situations where it is useful to have a Future[T] => T function. For example, consider the backtester snippet. If the code from the snippet is used to create a program by defining an object extending App, the program will terminate before backtesting completes. As the threads in the global ExecutionContext are daemon threads, they do not keep the JVM alive: the JVM exits as soon as the main thread finishes, which happens right after the Future instances are created. In this scenario, we need a synchronization mechanism to pause execution until the result is ready. By extending the Awaitable trait, Future is able to provide such facilities. The Await object exposes two methods that achieve this goal:
def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type
def result[T](awaitable: Awaitable[T], atMost: Duration): T
As Future extends Awaitable, a Future can be supplied as an argument to either method. The ready method halts program flow until T is available and returns the completed Future[T]. In practice, ready is rarely used because it is conceptually strange to return a Future[T] from a synchronous call instead of T. You are more likely to use result, which provides the desired transformation, returning T given Future[T]. For example, CrazyIdeas.scala can be modified to look like the following:
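import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

// Create the Futures first so that the four backtests run concurrently
val firstDaveAaplF = Future(backtest(Dave1, Ticker("AAPL"), lastMonths(3)))
val firstDaveGoogF = Future(backtest(Dave1, Ticker("GOOG"), lastMonths(3)))
val secondDaveAaplF = Future(backtest(Dave2, Ticker("AAPL"), lastMonths(3)))
val secondDaveGoogF = Future(backtest(Dave2, Ticker("GOOG"), lastMonths(3)))

val summariesF = for {
  firstDaveAapl <- firstDaveAaplF
  firstDaveGoog <- firstDaveGoogF
  secondDaveAapl <- secondDaveAaplF
  secondDaveGoog <- secondDaveGoogF
} yield (firstDaveAapl, firstDaveGoog, secondDaveAapl, secondDaveGoog)

// Block the main thread for at most one second while waiting for the results
Await.result(summariesF, Duration(1, TimeUnit.SECONDS))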
In this snippet, we see the blocking, synchronous invocation of Await.result, which returns a Tuple4 of BacktestPerformanceSummary values. This blocking call is parameterized with a timeout to defend against the scenario where the Future is not computed within a certain amount of time. Using a blocking call to retrieve the backtest results means that the JVM will only exit after the backtests complete or when the timeout expires. When the timeout expires and the backtests are incomplete, result and ready throw a TimeoutException.
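Both Await behaviors described above can be checked with a Future that never completes, built from a Promise that nobody fulfills. In this sketch (names illustrative), result throws a TimeoutException after the given duration, and ready hands back the same, now completed, Future instance.

```scala
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.concurrent.duration._

object AwaitTimeouts {
  // Never completed, so any Await on it must time out.
  val never: Future[Int] = Promise[Int]().future

  def resultTimesOut(): Boolean =
    try { Await.result(never, 50.millis); false }
    catch { case _: TimeoutException => true }

  def readyReturnsSameFuture(): Boolean = {
    val done = Future.successful(7)
    // ready blocks until completion and returns its argument, not the value.
    (Await.ready(done, 1.second) eq done) && done.value.contains(scala.util.Success(7))
  }
}
```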
Blocking execution of your program is potentially detrimental to your program's performance, and it should be used with caution. Using the methods on the Await object makes blocking calls easy to recognize. As ready and result throw an exception when timing out, rather than returning a different data type, you must take extra care to handle this scenario. You should treat any synchronous call involving asynchrony that either does not provide a timeout or does not handle the timeout as a bug.
Programming asynchronously requires a mindset shift: you write a program that describes what to do when the value appears, rather than a program that requires a value to exist before acting on it. You should be suspicious of any use of Await that interrupts transformation of a to-be-computed value. A set of transformations should be composed by acting upon Future[T] instead of T. Usage of Await should be restricted to scenarios where a program has no other work to perform and requires the result of the transformation, as we saw with the backtester.
As the standard library models a timeout with an exception instead of a different return type, it is hard to enforce that a timeout is always handled. One way to improve safety is to write a utility method that returns Option[T] instead of T to account for the timeout scenario:
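import scala.concurrent.{Await, Awaitable, TimeoutException}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object SafeAwait {
  def result[T](
    awaitable: Awaitable[T],
    atMost: Duration): Option[T] = Try(Await.result(awaitable, atMost)) match {
    case Success(t) => Some(t)
    case Failure(_: TimeoutException) => None
    case Failure(e) => throw e
  }
}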
With this new method, an entire class of errors, the unhandled timeout, is eliminated. As you encounter other unsafe transformations, consider defining methods that return a data type that encodes expected errors, to avoid accidentally mishandling the transformation result. What other examples of unsafe transformations come to mind?
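To show the three outcomes of SafeAwait.result, the helper is repeated below so that the snippet compiles on its own, followed by a hypothetical SafeAwaitExamples object. A completed Future yields Some, a timeout yields None, and any other failure is still rethrown, so callers must keep handling non-timeout errors.

```scala
import scala.concurrent.{Await, Awaitable, Future, Promise, TimeoutException}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object SafeAwait {
  def result[T](awaitable: Awaitable[T], atMost: Duration): Option[T] =
    Try(Await.result(awaitable, atMost)) match {
      case Success(t)                   => Some(t)
      case Failure(_: TimeoutException) => None
      case Failure(e)                   => throw e
    }
}

object SafeAwaitExamples {
  // Already completed: the value is returned in Some.
  val done: Option[Int] = SafeAwait.result(Future.successful(42), 1.second)

  // Never completed: the timeout becomes None instead of an exception.
  val timedOut: Option[Int] = SafeAwait.result(Promise[Int]().future, 50.millis)

  // Failed for another reason: the original exception propagates.
  val failed: Try[Option[Int]] =
    Try(SafeAwait.result(Future.failed[Int](new IllegalArgumentException("bad input")), 1.second))
}
```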
Working with Future requires disciplined handling of error scenarios to avoid writing a set of transformations that are difficult to reason about. When an exception is thrown inside a Future transformation, it bubbles up within the transformation's thread of computation and interrupts downstream transformations. Consider the following motivating example:
Future("not-an-integer").map(_.toInt).map(i => {
  println("Multiplying")
  i * 2
})
What do you expect to occur after the first map transformation? The transformation will clearly fail, because the provided input cannot be parsed as an integer (toInt throws a NumberFormatException). In this scenario, the Future is considered to be a failed Future, and downstream transformations operating on the wrapped Int value, in this example, will not occur. In this simple example, it is obvious that the transformation cannot continue. Imagine a larger code base operating on data more complicated than a single integer, with multiple failure scenarios across multiple namespaces and source files. In a real-world setting, it is more challenging to identify where an asynchronous computation broke down.
Future provides facilities for handling failures. It provides recover and recoverWith to allow downstream transformations to continue. The signatures are as follows:
def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U]
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U]
The difference between these two recovery methods is that the partial function provided to recover returns U, while the one provided to recoverWith returns Future[U]. In our previous example, we can use recover to supply a default value to continue a transformation, as follows:
Future("not-an-integer").map(_.toInt).recover {
  case _: NumberFormatException => -2
}.map(i => {
  println("Multiplying")
  i * 2
}).foreach(result => println(s"Multiplication result = $result"))
Running this snippet produces the following output:
Multiplying
Multiplication result = -4
This approach allows you to continue a pipeline of transformations when one transformation fails, but it suffers from the same shortcoming as the methods on Await. The returned Future[T] data type does not reflect the possibility of failure. Using recovery methods is error-prone, because it is impossible to know whether the error conditions have been handled without reading through the code.
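The difference between recover and recoverWith can be sketched side by side. In this illustration, parse, withDefault, and withRetry are hypothetical names; the trimmed retry simply stands in for any second asynchronous attempt, which is the kind of fallback recoverWith is suited to because its partial function returns a Future.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object Recovery {
  implicit val ec: ExecutionContext = ExecutionContext.global

  private def parse(s: String): Future[Int] = Future(s.toInt)

  // recover: the partial function supplies a plain fallback value.
  def withDefault(s: String): Future[Int] =
    parse(s).recover { case _: NumberFormatException => -2 }

  // recoverWith: the partial function supplies another Future, here a
  // hypothetical second attempt on the trimmed input.
  def withRetry(s: String): Future[Int] =
    parse(s).recoverWith { case _: NumberFormatException => parse(s.trim) }

  def await[T](f: Future[T]): T = Await.result(f, 5.seconds)
}
```

If the fallback Future in recoverWith also fails, the result is a failed Future, so the recovery itself can still fail.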
The error handling that we investigated is appropriate to handle failures during a computation. It is likely that after a series of transformations completes, you will wish to perform special logic. Imagine you are building a web service that submits trading orders to exchanges. Order submission is successful if the order was submitted to the exchange; otherwise, it is considered a failed submission. As order submission involves communication with an external system, the exchange, you model this action with a Future. Here is what the method handling order submission looks like:
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

def submitOrder(
  ec: ExecutionContext,
  sendToExchange: ValidatedOrder => Future[OrderSubmitted],
  updatePositions: OrderSubmitted => Future[AccountPositions],
  o: RawOrder): Unit = {
  implicit val iec: ExecutionContext = ec

  (for {
    vo <- ValidatedOrder.fromRawOrder(o).fold(
      Future.failed[ValidatedOrder](new Exception(
        "Order failed validation")))(Future.successful)
    os <- sendToExchange(vo)
    ap <- updatePositions(os)
  } yield (os, ap)).onComplete {
    case Success((os, ap)) => // Marshal order submission info to caller
    case Failure(e) => // Marshal appropriate error response to caller
  }
}
An ExecutionContext, a way to submit orders, and a way to update a customer's trading positions after the trade is submitted allow a customer-provided RawOrder to be submitted to the exchange. In the first processing step, the RawOrder is converted into a ValidatedOrder, and then lifted into a Future. Future.failed and Future.successful are convenient ways to lift, or wrap, a computed value into a Future. The value is lifted into a Future to allow the entire sequence of steps to be written as a single for-comprehension.
Following the completion of all processing steps, onComplete is invoked to asynchronously handle completion of request processing. You can imagine in this context that completing request processing implies creating a serialized version of a response and transmitting it to the caller. Previously, the only mechanism at our disposal to perform work once a value was computed was to block using Await. onComplete registers a callback that is invoked asynchronously when the Future completes. As shown in the example, onComplete supports handling both success and failure cases, which makes it a general-purpose tool to handle the outcome of a Future transformation. In addition to onComplete, Future provides onFailure specifically for failure cases, and onSuccess and foreach specifically for success cases. Note that onSuccess and onFailure have been deprecated since Scala 2.12 in favor of foreach and onComplete.
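The following sketch registers the callbacks discussed above on one successful and one failed Future. It uses failed.foreach, which projects a failure into a successful Future[Throwable], as the non-deprecated way to react only to failures. The names are illustrative, and a CountDownLatch is used only so that the caller can wait for all three callbacks before inspecting what they recorded.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object Callbacks {
  implicit val ec: ExecutionContext = ExecutionContext.global

  def observe(): List[String] = {
    val seen = ListBuffer.empty[String]
    val latch = new CountDownLatch(3)
    def record(entry: String): Unit = {
      seen.synchronized { seen += entry }
      latch.countDown()
    }

    val ok = Future(21 * 2)
    val ko = Future[Int](throw new IllegalStateException("exchange down"))

    // General-purpose callback: handles both outcomes.
    ok.onComplete {
      case Success(v) => record(s"onComplete success $v")
      case Failure(e) => record(s"onComplete failure ${e.getMessage}")
    }
    // Success-only callback.
    ok.foreach(v => record(s"foreach $v"))
    // Failure-only callback via the failed projection.
    ko.failed.foreach(e => record(s"failed ${e.getMessage}"))

    latch.await(5, TimeUnit.SECONDS)
    // Callback order is not guaranteed, so sort before returning.
    seen.synchronized(seen.toList).sorted
  }
}
```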
These callback methods return Unit. As a functional programmer, you should be leery of invoking these methods, because they are side-effecting. The onComplete invocations should only happen at the absolute end of a computation, when a side-effect can no longer be deferred. In the web service example, the side-effect is transmission of the response to the caller. Another common use case for these side-effecting callbacks is handling cross-cutting concerns, such as application metrics. Coming back to the web service, here is one way to increment an error counter when order submission to the exchange fails:
(for {
  vo <- ValidatedOrder.fromRawOrder(o).fold(
    Future.failed[ValidatedOrder](
      new Exception("Order failed validation")))(Future.successful)
  os <- {
    val f = sendToExchange(vo)
    f.onFailure({ case e => incrementExchangeErrorCount() })
    f
  }
  ap <- updatePositions(os)
} yield (os, ap))
In this snippet, a side-effect is performed via the onFailure callback when submission to the exchange fails. In this isolated example, it is straightforward to track where the side-effect is happening. However, in a larger system, it can be a challenge to identify when and where callbacks were registered. Additionally, from the Future API documentation, we learn that callback execution is unordered, which means that all callbacks must be treated independently. This is why you must be disciplined about when and where you apply these side-effects.
An alternative approach to error handling is to use a data type that can encode errors. We have seen this approach applied with Await when Option was the returned data type. Option makes it clear that the computation might fail while remaining convenient to use, because its transformations (for example, map) operate on the wrapped value. Unfortunately, Option does not allow us to encode the error. In this case, it is helpful to use another tool from the Scalaz library called disjunction. Disjunction is conceptually similar to Either, which can be used to represent one of two possible types. Disjunction differs from Either in Scala 2.11 and earlier because its operations are right-biased; the standard library's Either only became right-biased in Scala 2.12. Let's take a look at a simple example to illustrate this idea:
scalaz.\/.right[Throwable, Int](1).map(_ * 2)
The \/ is the shorthand symbol used by Scalaz to represent a disjunction. In this example, a right disjunction is created by wrapping the integer literal 1. This disjunction holds either a Throwable value or an Int value, and it is analogous to Either[Throwable, Int]. In contrast to Either, the map transformation operates on the right side of the disjunction. In this example, map accepts an Int value as input because the right side of the disjunction is an Int value. As disjunction is right-biased, it is a natural fit to represent failure and success values. Using infix notation, it is common to define error handling with Future as Future[Throwable \/ T]. In place of Throwable, one can define an ADT of possible error types to make error handling explicit. This approach is favorable because it enforces handling of failure cases without relying on the author to invoke a recovery method. If you are interested in learning more about how to use disjunction as a tool for error handling, review Eugene Yokota's excellent Scalaz tutorial at http://eed3si9n.com/learning-scalaz/Either.html.
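The Future-of-disjunction pattern can be sketched without any dependency by swapping in the standard library's Either, which is right-biased from Scala 2.12 onward. This is a substitute for Scalaz's \/, not the Scalaz API. The OrderError ADT, the validation rule, and the size limit are all hypothetical. Expected business failures live in the Left side of the result type, while the Future itself only sequences the asynchronous steps.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical error ADT for order submission.
sealed trait OrderError
case object FailedValidation extends OrderError
final case class ExchangeRejected(reason: String) extends OrderError

object EitherInFuture {
  implicit val ec: ExecutionContext = ExecutionContext.global

  def validate(qty: Int): Future[Either[OrderError, Int]] =
    Future.successful(if (qty > 0) Right(qty) else Left(FailedValidation))

  def send(qty: Int): Future[Either[OrderError, String]] =
    Future(if (qty <= 100) Right(s"order-$qty") else Left(ExchangeRejected("size limit")))

  // The signature tells callers which failures to expect; a Left short-circuits.
  def submit(qty: Int): Future[Either[OrderError, String]] =
    validate(qty).flatMap[Either[OrderError, String]] {
      case Right(valid) => send(valid)
      case Left(err)    => Future.successful(Left(err))
    }
}
```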
As Future provides an expressive and easy-to-use API, it is common to perform numerous transforms to complete a computation in a large-scale system. Reflecting on the order submission web service mentioned in the previous section, you can imagine multiple application layers operating on a Future. A production-ready web service typically composes together multiple layers to service a single request. An example request flow may contain the following stages: request deserialization, authorization, application service invocation, database lookups and/or third-party service callouts, and response translation to a JSON format. If each of these stages in the workflow is modeled with a Future, then it is common to have five or more transformations to handle a single request.
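The following sketch shows what such a flow might look like when every layer is modeled as a step on a Future. The stage functions, the comma-separated request format, and the hard-coded order ID are hypothetical stand-ins; only the shape of the chain matters: five stages produce a chain of five Future operations for a single request.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical order-submission flow; each application layer is one Future step.
object OrderPipeline {
  final case class OrderRequest(user: String, ticker: String, quantity: Int)
  final case class Order(id: Long, request: OrderRequest)

  def handle(raw: String)(implicit ec: ExecutionContext): Future[String] =
    Future(raw)                         // 1. accept the raw request
      .map(deserialize)                 // 2. request deserialization
      .map(authorize)                   // 3. authorization
      .flatMap(req => submitOrder(req)) // 4. service call, database lookup, or callout
      .map(toJson)                      // 5. response translation to JSON

  // Expects "user, ticker, quantity"; malformed input fails the Future.
  private def deserialize(raw: String): OrderRequest = {
    val fields = raw.split(",").map(_.trim)
    OrderRequest(fields(0), fields(1), fields(2).toInt)
  }

  // Rejects anonymous requests by failing the Future with an exception.
  private def authorize(req: OrderRequest): OrderRequest =
    if (req.user.nonEmpty) req else throw new SecurityException("anonymous user")

  // Stand-in for a database write or remote call; the order ID is hard-coded.
  private def submitOrder(req: OrderRequest)(implicit ec: ExecutionContext): Future[Order] =
    Future(Order(id = 1L, request = req))

  private def toJson(order: Order): String =
    s"""{"id":${order.id},"ticker":"${order.request.ticker}","quantity":${order.request.quantity}}"""
}
```

Each stage stays small and testable on its own, which is exactly the property that makes the per-transform overhead discussed next worth measuring.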
Decomposing your software system into small areas of responsibility, as in the preceding example, is a good engineering practice that supports testing in isolation and improves maintainability. However, this approach to software design comes with a performance cost when working with Future. As we have seen through our example usage, nearly all transforms on a Future require submitting work to an Executor. In our example workflow, most stages in the transformation are small. In this scenario, the overhead of submitting work to the executor dominates the execution time of the computation. If the order submission web service serves numerous customers with stringent throughput and latency requirements, then engineering practices that focus on testability and maintainability may result in poorly performing software.
If you consider the preceding diagram, you can see a thread pool with four threads being used to apply transforms to a Future. Each transform is submitted to the pool, and there is a chance that a different thread is picked for the computation. This diagram visualizes how multiple small transforms may hamper performance due to the overhead of Executor submissions.
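To make this overhead visible, the following sketch wraps the global ExecutionContext in a hypothetical CountingExecutionContext that counts every task handed to it, then compares a chain of small map calls against the same additions performed inside a single Future.apply. The helper names are illustrative, and the counts reflect how the Scala standard library implements Future (one submission for Future.apply and one per map) rather than a guarantee of the API contract.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Illustrative helper: counts every task submitted through it, then
// delegates the actual execution to the wrapped context.
final class CountingExecutionContext(underlying: ExecutionContext) extends ExecutionContext {
  private val counter = new AtomicInteger(0)
  def submissions: Int = counter.get()

  override def execute(runnable: Runnable): Unit = {
    counter.incrementAndGet()
    underlying.execute(runnable)
  }

  override def reportFailure(cause: Throwable): Unit = underlying.reportFailure(cause)
}

object SubmissionCount {
  // One Future.apply followed by (steps - 1) small map transforms.
  def chained(steps: Int)(implicit ec: ExecutionContext): Int = {
    val res = (1 until steps).foldLeft(Future(0))((f, _) => f.map(_ + 1))
    Await.result(res, 5.seconds)
  }

  // The same additions performed inside a single Future.apply.
  def single(steps: Int)(implicit ec: ExecutionContext): Int = {
    val res = Future((1 until steps).foldLeft(0)((acc, _) => acc + 1))
    Await.result(res, 5.seconds)
  }

  // Runs body on a fresh counting context and returns (result, submissions).
  def measure(body: ExecutionContext => Int): (Int, Int) = {
    val ec = new CountingExecutionContext(ExecutionContext.global)
    val result = body(ec)
    (result, ec.submissions)
  }
}
```

With the standard library's Future, SubmissionCount.measure(ec => SubmissionCount.chained(10)(ec)) returns (9, 10), while the single variant returns (9, 1): the same result with a tenth of the Executor submissions.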
Just how large is the overhead of Executor submissions? This is the motivating question for a benchmark that quantifies the overhead of submitting work to an Executor. The benchmark focuses on adding 1 to an integer N times in two ways. One approach is to perform all of the addition operations within a single Future, while the second approach is to perform each addition operation with a new Future transformation. The latter approach is a proxy for the stages of order submission request processing that use multiple Future transformations in a larger software system. Integer addition serves as the proxy operation because it is an extremely cheap computation, which means that the execution time will be dominated by Executor submissions. The benchmarks look like the following:
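import org.openjdk.jmh.annotations.Benchmark
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

// Each addition is a separate map transformation, hence a separate Executor submission.
@Benchmark
def manyTransforms(state: TransformFutureState): Int = {
  import scala.concurrent.ExecutionContext.Implicits._
  val init = Future(0)
  val res = (1 until state.operations).foldLeft(init)((f, _) => f.map(_ + 1))
  Await.result(res, Duration("5 minutes"))
}

// All additions run inside a single Future.apply, hence a single Executor submission.
@Benchmark
def oneTransform(state: TransformFutureState): Int = {
  import scala.concurrent.ExecutionContext.Implicits._
  val res = Future {
    (1 until state.operations).foldLeft(0)((acc, _) => acc + 1)
  }
  Await.result(res, Duration("5 minutes"))
}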
TransformFutureState allows the number of operations to be parameterized. manyTransforms represents each addition operation using a map transformation that involves submitting work to an Executor. oneTransform performs all addition operations using a single Executor submission via Future.apply. In this controlled test, Await.result is used as a blocking mechanism to await the completion of the computation. The results of running this test on a two-core machine with five transformations and ten transformations can be seen in the following table:
Benchmark | Map count | Throughput (ops per second) | Error as percentage of throughput
manyTransforms | 5 | 463,614.88 | ± 1.10
oneTransform | 5 | 412,675.70 | ± 0.81
manyTransforms | 10 | 118,743.55 | ± 2.34
oneTransform | 10 | 316,175.79 | ± 1.79
While both scenarios yield comparable results with five transformations, we can see a clear difference with ten transforms being applied: the two throughputs differ by a factor of roughly 2.7 (316,175.79 versus 118,743.55 ops per second). This benchmark makes it clear that Executor submissions can dominate performance. Although the cost can be high, our advice to you is to model your system without considering this cost up-front. In our experience, it is easier to rework a well-modeled system for performance improvements than it is to extend or to rework a poorly-modeled but performant system. For this reason, we advise against going to great lengths to group Executor submissions when attempting to put together the initial version of a complex system.
Once you have a good design in place, the first step is to benchmark and to profile in order to identify whether Executor submissions are the bottleneck. In the event that you discover that your style of Future usage is causing performance bottlenecks, there are several courses of action you should consider.
The lowest-cost development option is to replace unnecessarily costly Future creation with the use of Future.successful or Future.failed. The order submission web service took advantage of these factory methods to lift values into a Future. As the value is already computed, these factory methods do not submit any task to the Executor behind an ExecutionContext; in fact, neither method takes an ExecutionContext parameter. Replacing usages of Future.apply with either Future.successful or Future.failed when the value is already computed can yield cost savings.
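The sketch below contrasts the three construction styles using a hypothetical PriceLookup backed by a hypothetical in-memory cache of prices in cents. Cache hits and already-known failures are lifted with Future.successful and Future.failed, which return a completed Future without touching the ExecutionContext; only the remaining case, which still has work to do, goes through Future.apply.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical price lookup used to contrast the ways of creating a Future.
object PriceLookup {
  // Hypothetical cache of already-computed prices, in cents.
  private val cache = Map("AAPL" -> 18925L)

  def price(ticker: String)(implicit ec: ExecutionContext): Future[Long] =
    cache.get(ticker) match {
      // Value already known: Future.successful needs no ExecutionContext.
      case Some(cents) => Future.successful(cents)
      // Failure already known: Future.failed needs no ExecutionContext either.
      case None if ticker.isEmpty => Future.failed(new IllegalArgumentException("empty ticker"))
      // Real work remains: Future.apply submits it to the Executor behind ec.
      case None => Future(fetchRemotely(ticker))
    }

  // Stand-in for an expensive computation or remote call.
  private def fetchRemotely(ticker: String): Long = ticker.length * 100L
}
```

Because the first two branches return futures that are already completed, callers observe isCompleted == true immediately, with no thread pool involved.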
A more expensive alternative in terms of development effort is to rework your implementation to group together Future transformations in a way similar to oneTransform. This tactic involves reviewing each application layer to determine whether transforms within a single layer can be combined. If possible, we recommend that you avoid merging transformations across application layers (for example, between request deserialization or authorization and application service processing) because this weakens your model and increases maintenance cost.
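As a minimal sketch of grouping within one layer, assume a hypothetical authorization layer made of two small steps. The two map calls can be fused into a single map over the composed function; both versions compute the same result, but following the submission behavior described earlier, the fused version should cost one Executor submission instead of two. The layer boundary is untouched: it still accepts and returns a Future.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical authorization layer with two small steps.
object AuthorizationLayer {
  final case class Request(user: String, roles: Set[String])

  private def normalize(r: Request): Request = r.copy(user = r.user.trim.toLowerCase)

  private def checkRole(r: Request): Request =
    if (r.roles.contains("trader")) r
    else throw new SecurityException(s"${r.user} may not trade")

  // Two transformations: two Executor submissions.
  def separate(req: Future[Request])(implicit ec: ExecutionContext): Future[Request] =
    req.map(normalize).map(checkRole)

  // The same steps fused into one function: one Executor submission.
  def grouped(req: Future[Request])(implicit ec: ExecutionContext): Future[Request] =
    req.map(r => checkRole(normalize(r)))
}
```

The fusion stays inside AuthorizationLayer; merging these steps with, say, deserialization would save another submission but would blur the layer boundaries the text recommends keeping.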
If neither of these options produces acceptable performance, then it may be worthwhile to discuss with the product owners the option of addressing the performance issue with hardware. Because your system's design has not been compromised and reflects solid engineering practices, it can likely be horizontally scaled or clustered. Depending on the state tracked by your system, this option might be possible without additional development work. Perhaps the product owners value a system that can be easily maintained and extended more than performance. If this is the case, adding scale to your system may be a viable way forward.
If you are unable to buy your way out of the performance challenge, there are three additional possibilities. One option is to investigate an alternative to Future named Task. This construct, which is provided by the Scalaz library, allows computations to be performed with fewer Executor submissions. This option involves significant development effort because the Future data type will need to be replaced throughout the application with Task. We will explore Task at the end of this chapter and investigate the performance benefits that it can provide.
Independent of using Task, it can be useful to review your application's model to critically question whether unnecessary work is being done on the critical path. As we saw with MVT's reporting infrastructure and the introduction of stream processing, it is sometimes possible to rethink a design to improve performance. Like the introduction of Task, reconsidering your system's architecture is a large-scale change. The last-resort option is to merge application layers in order to support grouping Future transformations. We advise against exercising this option unless all other suggestions have failed. This option results in a code base that is more difficult to reason about because concerns are no longer separated. In the short run, you may reap performance benefits, but in our experience, these benefits are outweighed in the long run by the cost of maintaining and extending such a system.