Discovering Future and adopting an asynchronous mindset helped you better utilize your computing resources to test multiple strategies and tickers faster. You improved performance by treating the backtest as a black box: without changing the implementation of the backtest, there were straightforward performance wins. Identifying logical sequences of transformations as candidates for concurrency is a good strategy to apply when considering how to speed up your software.
Let's extend this idea to a smaller logical unit of processing within the backtester. A backtest exercises a strategy for a ticker across a time period. After speaking with Dave, you discover that MVT does not maintain positions overnight. At the end of each trading day, MVT trading systems mitigate risk by ensuring that all stock positions are liquidated. This is done to defend against volatile overnight price moves after the market closes, which the company is unable to react to by trading. As positions are not held overnight, each trading day can be simulated independently of the previous trading day. Returning to our asynchronous mindset, this insight implies that trading day simulations can be performed concurrently.
Before jumping into the implementation using Future
, we will share an alternative abstraction, named Task
, which is provided by the Scalaz library. Task
provides compelling usage reasons for our proposed backtest modifications. We introduce Task
next, provided that you are up to the task!
Scalaz Task provides a different approach to achieve concurrency. Although Task can be used in a way that mimics Future, there are important conceptual differences between these two abstractions. Task allows fine-grained control over asynchronous execution, which provides performance benefits. Task maintains referential transparency as well, which provides stronger reasoning abilities. Referential transparency is a property of expressions that are side-effect free. To better understand this principle, consider the following simple sum method:
def sum(x: Int, y: Int): Int = x + y
Imagine that we are performing two summations:
sum(sum(2, 3), 4)
As sum is side-effect free, we can replace sum(2, 3) with its result, as follows:

sum(5, 4)

This expression will always evaluate to 9, which satisfies referential transparency. Now imagine a twist in the implementation of sum:
```scala
class SumService(updateDatabase: () => Unit) {
  def sum(x: Int, y: Int): Int = {
    updateDatabase()
    x + y
  }
}
```
Now, sum includes a side-effect of writing to a database that breaks referential transparency. We can no longer replace sum(2, 3) with the value 5 because then the database will not be updated. Referential transparency is a concept at the heart of the functional programming paradigm because it provides strong reasoning guarantees. The Haskell wiki provides additional commentary and examples worth reviewing at https://wiki.haskell.org/Referential_transparency.
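The substitution argument can be sketched directly in plain Scala. The names impureSum and dbWrites below are illustrative stand-ins (a mutable counter replaces a real database write):

```scala
// A pure function: any call can be replaced by its result.
def sum(x: Int, y: Int): Int = x + y

// Substituting sum(2, 3) with its value, 5, cannot change the outcome.
val direct = sum(sum(2, 3), 4)
val substituted = sum(5, 4)
println(direct == substituted) // true: both are 9

// An impure variant: substitution now changes observable behavior.
var dbWrites = 0 // stand-in for the database side-effect
def impureSum(x: Int, y: Int): Int = {
  dbWrites += 1
  x + y
}
val nested = impureSum(impureSum(2, 3), 4)
println(nested)   // 9
println(dbWrites) // 2: replacing the inner call with 5 would leave it at 1
```

The result is the same, but the observable effect count differs, which is exactly why the substitution is no longer valid.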
Let's take a look at common Task API usage to better understand how Task works.
The methods provided by the Task companion object are the main entry points to the API, and the best ways to create an instance of Task. Task.apply is the first method to inspect. It takes a computation returning an instance of A (that is, a by-name parameter of type A) and an implicit ExecutorService to run the computation. Contrary to Future, which uses ExecutionContext as an abstraction for a thread pool, Task uses ExecutorService, which is defined in the Java standard library:
```scala
scala> val t = Task {
     |   println("Starting task")
     |   40 + 2
     | }
t: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@300555a9
```
The first thing that you may have noticed is that, even though we instantiated a new Task, nothing is printed on the screen. This is an important difference when comparing Task and Future; while Future is eagerly evaluated, a Task is not computed until you explicitly ask for it:
```scala
scala> t.unsafePerformSync
Starting task
res0: Int = 42
```
The preceding example calls the unsafePerformSync instance method to execute the task. We can see the println output as well as the returned result 42. Note that unsafePerformSync is an unsafe call: if the computation throws an exception, the exception is re-thrown by unsafePerformSync. To avoid this side-effect, calling unsafePerformSyncAttempt is preferred. The unsafePerformSyncAttempt method catches the exception and has a return type of Throwable \/ A, which allows you to cleanly handle the failure case. Note that when creating the task t, we did not provide an ExecutorService. By default, apply creates a Task to be run on DefaultExecutorService, a fixed thread pool, supplied as a default parameter, whose size is based on the count of available processors on the machine. The DefaultExecutorService is analogous to the global ExecutionContext that we explored with Future. It is CPU-bound and sized based on the available cores on the machine. We can also supply a different ExecutorService at creation time:
```scala
scala> val es = Executors.newFixedThreadPool(4)
es: java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPoolExecutor@4c50cd8c[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]

scala> val t = Task {
     |   println("Starting task on thread " + Thread.currentThread.getName)
     |   40 + 2
     | }(es)
t: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@497db010

scala> println("Calling run from " + Thread.currentThread.getName)
Calling run from run-main-1

scala> t.unsafePerformSync
Starting task on thread pool-8-thread-2
res2: Int = 42
```
The output shows that the Task is executed on the supplied ExecutorService, not on the main thread.

Speaking of Task execution, let's perform a little experiment. We will create an instance of Task and call unsafePerformSync twice in a row:
```scala
scala> val t = Task {
     |   println("Starting task")
     |   40 + 2
     | }
t: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@300555a9

scala> t.unsafePerformSync
Starting task
res0: Int = 42

scala> t.unsafePerformSync
Starting task
res1: Int = 42
```
We observe that Starting task prints after each call to unsafePerformSync. This indicates that the full computation is executed each time we call unsafePerformSync. That is another difference from Future. While a Future memoizes its result after the computation, a Task performs its computation each time we call unsafePerformSync. In other words, Task is referentially transparent and, therefore, closer to the functional programming paradigm than Future.
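The eager-versus-recompute distinction can be reproduced with the standard library alone. In this sketch, Deferred is a hypothetical, minimal stand-in for Task (not the Scalaz implementation); the counters are illustrative:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

var futureRuns = 0
var deferredRuns = 0

// Eager: a Future begins executing as soon as it is constructed,
// and memoizes its result.
val f = Future { futureRuns += 1; 40 + 2 }
Await.result(f, 5.seconds)
Await.result(f, 5.seconds) // the second await reuses the memoized result

// Lazy stand-in for Task: nothing runs until explicitly forced,
// and the by-name body re-runs on every invocation.
class Deferred[A](body: => A) { def run(): A = body }
val d = new Deferred({ deferredRuns += 1; 40 + 2 })

println(deferredRuns) // 0: not yet computed
println(d.run())      // 42
println(d.run())      // 42, computed again
println(futureRuns)   // 1
println(deferredRuns) // 2
```

The by-name constructor parameter is what gives Deferred its Task-like behavior: the body is a description of work, evaluated only when run() is called.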
Like Future, it is possible (and even recommended) to use Task in an asynchronous way. An instance of Task can be executed asynchronously by calling unsafePerformAsync. This method takes a callback of type (Throwable \/ A) => Unit that is called at the end of the computation. Observe the following snippet:
```scala
def createAndRunTask(): Unit = {
  val t = Task {
    println("Computing the answer...")
    Thread.sleep(2000)
    40 + 2
  }
  t.unsafePerformAsync {
    case \/-(answer) => println("The answer is " + answer)
    case -\/(ex) => println("Failed to compute the answer: " + ex)
  }
  println("Waiting for the answer")
}
```
We create our Task, adding a Thread.sleep to simulate an expensive computation. We call unsafePerformAsync and use a simple callback to print the answer (or an exception, if the computation fails). We call createAndRunTask and observe the following output:
```scala
scala> TaskExample.createAndRunTask()
Waiting for the answer

scala> Computing the answer...
The answer is 42
```
We can see that our last statement, "Waiting for the answer", was printed first. This is because unsafePerformAsync returns immediately. We then see the statement from our computation, as well as the answer printed in our callback. This method is a rough equivalent of onComplete, which is defined on Scala's Future.
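For comparison, here is the analogous callback style with the standard library's Future and onComplete. The Promise is used only so this sketch can observe the callback's result:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

val done = Promise[String]()

// The Future starts computing immediately on the global ExecutionContext.
val f = Future { 40 + 2 }

// onComplete registers a callback, much like unsafePerformAsync:
// registration returns immediately and the callback fires on completion.
f.onComplete {
  case Success(answer) => done.success("The answer is " + answer)
  case Failure(ex)     => done.success("Failed: " + ex)
}

val message = Await.result(done.future, 5.seconds)
println(message) // The answer is 42
```

The key difference remains evaluation: onComplete attaches to a computation that is already running, while unsafePerformAsync is what starts the Task at all.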
Another useful method provided by the companion object of Task is async. Remember how we previously used Promise to turn a callback-based API into an API returning an instance of Future? It is possible to achieve the same goal with Task; that is, we can turn a callback-based API into a more monadic API returning a Task, as follows:
```scala
object CallbackAPI {
  def doCoolThings[A](a: => A, f: (Throwable \/ A) => Unit): Unit = ???
}

def doCoolThingsToTask[A](a: => A): Task[A] =
  Task.async { f =>
    CallbackAPI.doCoolThings[A](a, res => f(res))
  }
```
Evaluating this method in the REPL yields the following:
```scala
scala> val t = doCoolThingsToTask(40 + 2)

scala> t.map(res => res / 2).unsafePerformSync
res2: Int = 21
```
Our doCoolThingsToTask method uses Task.async to create a Task instance from a callback-based API that is defined in CallbackAPI. Task.async can even be used to turn a Scala Future into a Scalaz Task:
```scala
def futureToTask[A](future: Future[A])(implicit ec: ExecutionContext): Task[A] =
  Task.async { f =>
    future.onComplete {
      case Success(res) => f(\/-(res))
      case Failure(ex) => f(-\/(ex))
    }
  }
```
Note that we have to supply an ExecutionContext to be able to call onComplete on Future. This is due to Future's eager evaluation: almost all methods that are defined on Future submit a computation to a thread pool immediately.
Understanding the Task execution model requires understanding the Scalaz Future execution model, because Task composes a Scalaz Future and adds error handling. This is visible from the definition of Task:
```scala
class Task[+A](val get: Future[Throwable \/ A])
```
In this definition, Future is not the Scala standard library version; instead, it is an alternative version that is provided by Scalaz. The Scalaz Future decouples defining transformations from the execution strategy, providing us with fine-grained control over Executor submissions. Scalaz Future accomplishes this by defining itself as a trampolining computation. Trampolining is a technique that describes a computation as a discrete series of chunks that are run using constant stack space. To dive into the details of how a trampoline works, we recommend reading Rúnar Bjarnason's paper, Stackless Scala With Free Monads, available at http://blog.higher-order.com/assets/trampolines.pdf.
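To make trampolining concrete, here is a minimal, illustrative trampoline in plain Scala. It is a sketch, not the Scalaz implementation: each step either completes with a value or returns a thunk describing the next chunk, and a plain loop runs the chunks in constant stack space:

```scala
// A step either finishes with a value or describes the next chunk of work.
sealed trait Trampoline[A]
case class Done[A](value: A) extends Trampoline[A]
case class More[A](next: () => Trampoline[A]) extends Trampoline[A]

// The interpreter is a while loop, so no stack frames accumulate.
def run[A](t: Trampoline[A]): A = {
  var current = t
  while (true) {
    current match {
      case Done(v)    => return v
      case More(next) => current = next()
    }
  }
  sys.error("unreachable")
}

// Mutual recursion that would overflow the stack if written directly.
def even(n: Int): Trampoline[Boolean] =
  if (n == 0) Done(true) else More(() => odd(n - 1))
def odd(n: Int): Trampoline[Boolean] =
  if (n == 0) Done(false) else More(() => even(n - 1))

println(run(even(1000000))) // true, computed in constant stack space
```

Because each recursive step is reified as a More value rather than a stack frame, one million alternating calls complete without a StackOverflowError.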
Task builds on Scalaz Future by providing error handling with the Scalaz \/ disjunction. Task is the description of a computation. Transformations add to the description of the computation that will eventually be executed by a thread pool. To begin evaluation, a Task must be explicitly started. This behavior is interesting because when a Task is finally executed, we can limit computation execution to a single thread. This improves thread reuse and reduces context switching.
In the preceding diagram, we see various calls to apply and map. These calls merely modify the definition of the task to be performed. It is only when we call unsafePerformAsync that the computation is realized in a different thread. Note that all the transforms are applied by the same thread.
We can exercise Future and Task performance in a short microbenchmark comparing their throughput based on the transform (for example, map and flatMap) and the count of transformations applied. A snippet of the benchmark follows:
```scala
@Benchmark
def mapWithFuture(state: TaskFutureState): Int = {
  implicit val ec = state.context
  val init = Future(0)
  val res = (1 until state.operations).foldLeft(init)((f, _) => f.map(_ + 1))
  Await.result(res, Duration("5 minutes"))
}

@Benchmark
def mapWithTask(state: TaskFutureState): Int = {
  val init = Task(0)(state.es)
  val res = (1 until state.operations).foldLeft(init)((t, _) => t.map(_ + 1))
  res.unsafePerformSync
}
```
Both scenarios run similar computations. We create an initial instance of Future or Task containing 0, and we apply several consecutive map operations to add 1 to the accumulator. Two other scenarios perform the same computation, but with flatMap instead. The results for flatMap are displayed in the following table:
| Benchmark | Operation count | Throughput (ops per second) | Error as percentage of throughput |
| --- | --- | --- | --- |
| flatMapWithFuture | 5 | 41,602.33 | ± 0.69 |
| flatMapWithTask | 5 | 59,478.50 | ± 2.14 |
| flatMapWithFuture | 10 | 31,738.80 | ± 0.52 |
| flatMapWithTask | 10 | 43,811.15 | ± 0.47 |
| flatMapWithFuture | 100 | 4,390.11 | ± 1.91 |
| flatMapWithTask | 100 | 13,415.30 | ± 0.60 |
The results for map operations can be found in the following table:
| Benchmark | Operation count | Throughput (ops per second) | Error as percentage of throughput |
| --- | --- | --- | --- |
| mapWithFuture | 5 | 45,710.02 | ± 1.30 |
| mapWithTask | 5 | 93,666.73 | ± 0.57 |
| mapWithFuture | 10 | 44,860.44 | ± 1.80 |
| mapWithTask | 10 | 91,932.14 | ± 0.88 |
| mapWithFuture | 100 | 19,974.24 | ± 0.55 |
| mapWithTask | 100 | 46,288.17 | ± 0.46 |
This benchmark highlights the performance gain due to the different execution model of Task. Even for a small number of transforms, the throughput is better with deferred evaluation.
Equipped with our understanding of Task, we now have the knowledge necessary to add concurrency to the execution of a single backtest run. You may recall that we discovered from Dave that MVT closes its positions at the end of each trading day. This insight allows us to model each trading day independently. Let's familiarize ourselves with the current implementation by beginning with the model, as follows:
```scala
case class PnL(value: BigDecimal) extends AnyVal

object PnL {
  def merge(x: PnL, y: PnL): PnL = PnL(x.value + y.value)
  val zero: PnL = PnL(0)
}

case class BacktestPerformanceSummary(pnl: PnL)

case class DecisionDelayMillis(value: Long) extends AnyVal
```
The profit-and-loss is the output of each simulated trading day. PnL provides a convenient method to add together two PnL instances, which can be used to sum the simulation PnL across multiple trading days. Once all the trading days are simulated, a BacktestPerformanceSummary is created to capture the simulation profit-and-loss. For our work on the backtester, we will use a Thread.sleep to simulate computationally expensive work in place of an actual decisioning strategy. The length of the sleep is parameterized by DecisionDelayMillis.
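As a quick sketch of how the model combines daily results, here is the merge-and-fold pattern in isolation. The model is redefined locally (with the AnyVal wrapper elided for this script-style sketch), and the daily profit values are illustrative:

```scala
// Local redefinition of the model for this sketch.
case class PnL(value: BigDecimal)

object PnL {
  def merge(x: PnL, y: PnL): PnL = PnL(x.value + y.value)
  val zero: PnL = PnL(0)
}

// Summing daily results is a fold over PnL.merge; reduceOption handles
// the empty case by falling back to PnL.zero.
val dailyPnls = List(PnL(10), PnL(-4), PnL(6))
val total = dailyPnls.reduceOption(PnL.merge).getOrElse(PnL.zero)
println(total.value) // 12

val emptyTotal = List.empty[PnL].reduceOption(PnL.merge).getOrElse(PnL.zero)
println(emptyTotal.value) // 0
```

This is exactly the reduction each backtest variant performs after the trading days are simulated.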
The following simplified version of the backtester shows how DecisionDelayMillis is used to simulate a trading day:
```scala
def originalBacktest(
  testDays: List[MonthDay],
  decisionDelay: DecisionDelayMillis): BacktestPerformanceSummary = {
  val pnls = for {
    d <- testDays
    _ = Thread.sleep(decisionDelay.value)
  } yield PnL(10)
  BacktestPerformanceSummary(pnls.reduceOption(PnL.merge).getOrElse(PnL.zero))
}
```
The original backtest displays how a list of days is simulated in a synchronous fashion. For reproducibility, we substitute a constant profit-and-loss of $10 in place of a dynamic value. This backtest ignores the application of a ticker and a strategy to focus on the core of our dilemma: how can we use Task to add concurrency to a backtest?
From our examples, we saw that Task introduces concurrency through submission of multiple Tasks to an ExecutorService, and by performing the side-effect of running a Task with unsafePerformAsync to avoid a blocking wait for the result. As a first step, let's implement a version of the backtest that uses Task without introducing concurrency:
```scala
def backtestWithoutConcurrency(
  testDays: List[MonthDay],
  decisionDelay: DecisionDelayMillis): Task[BacktestPerformanceSummary] = {
  val ts = for (d <- testDays) yield Task.delay {
    Thread.sleep(decisionDelay.value)
    PnL(10)
  }
  Task.gatherUnordered(ts).map(pnls => BacktestPerformanceSummary(
    pnls.reduceOption(PnL.merge).getOrElse(PnL.zero)))
}
```
This implementation changes the return type to Task[BacktestPerformanceSummary]. Since the Task is not run, referential transparency is maintained within this method. Each trading day is simulated using Task.delay. delay is a lazy variant of Task.now that defers evaluation of the provided value. Let's look at the following signature to confirm:
def delay[A](a: => A): Task[A]
If we had instead used Task.now in place of Task.delay, the sleep (that is, the simulation) would have taken effect before running the Task. We also see the use of another new capability, Task.gatherUnordered. gatherUnordered is useful when you wish to make the following transformation:
List[Task[A]] => Task[List[A]]
Although List is used here, this relationship exists for any Seq. gatherUnordered provides a way to take a collection of Tasks and instead operate on a single Task that wraps a collection of the underlying type. Let's look at the following signature to make our understanding more concrete:
```scala
def gatherUnordered[A](
  tasks: Seq[Task[A]],
  exceptionCancels: Boolean = false): Task[List[A]]
```
This signature closely matches the previous function that we defined, with the addition of an optional Boolean parameter. When exceptionCancels is set to true, any pending Task will not be evaluated. gatherUnordered allows us to merge together the results of each trading day's profit-and-loss and return a single Task wrapping BacktestPerformanceSummary. The Scala Future companion object provides an analogous method, named sequence, that performs the same operation on a sequence of Futures.
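The standard-library analogue is easy to demonstrate directly; this sketch shows the same List[F[A]] => F[List[A]] shape with Future.sequence:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Turning a List of Futures into a single Future of a List.
val futures: List[Future[Int]] = List(Future(1), Future(2), Future(3))
val combined: Future[List[Int]] = Future.sequence(futures)

val results = Await.result(combined, 5.seconds)
println(results) // List(1, 2, 3)
```

One difference worth noting: Future.sequence preserves the order of the input collection, while gatherUnordered, as its name suggests, makes no ordering guarantee.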
This is a functioning implementation of the backtest, but it does not add concurrency to the simulation of historical trading days. For our next iteration, we take advantage of a new part of the Task API, Task.fork. Let's see how it is used, and then we will explain how it works:
```scala
def backtestWithAllForked(
  testDays: List[MonthDay],
  decisionDelay: DecisionDelayMillis): Task[BacktestPerformanceSummary] = {
  val ts = for (d <- testDays) yield Task.fork {
    Thread.sleep(decisionDelay.value)
    Task.now(PnL(10))
  }
  Task.gatherUnordered(ts).map(pnls => BacktestPerformanceSummary(
    pnls.reduceOption(PnL.merge).getOrElse(PnL.zero)))
}
```
This implementation gathers trading day PnL in the same way as before, but uses a combination of Task.fork and Task.now to simulate the trading day. Let's look at the signature of Task.fork to understand how the runtime behavior changes:
```scala
def fork[A](a: => Task[A])(implicit pool: ExecutorService =
  Strategy.DefaultExecutorService): Task[A]
```
fork accepts a Task as a by-name parameter and an implicit ExecutorService that defaults to the CPU-bound executor. The signature shows that fork submits the provided Task to pool in order to fork the computation into a different thread. fork is an explicit way to control concurrency with Task. Conceptually, fork is analogous to any Future transformation (for example, map) that involves submission to an executor. As fork lazily evaluates its argument, Task.now can be used to lift the trading day's profit-and-loss into a Task. With this implementation, the Task that represents each trading day is submitted to an executor. If we assume that 30 trading days are being backtested and the computer used has two cores, then this implementation allows each core to simulate 15 trading days instead of a single core simulating 30 days.
As we saw in earlier benchmarks, submitting a high volume of small computations to an executor is expensive. As we have explicit control over concurrency with Task using fork, we can improve our performance by optimizing the frequency of executor submissions. In our third attempt, we take advantage of knowing the number of trading days to be simulated to control executor submissions. The implementation now looks like the following:
```scala
def backtestWithBatchedForking(
  testDays: List[MonthDay],
  decisionDelay: DecisionDelayMillis): Task[BacktestPerformanceSummary] = {
  val ts = for (d <- testDays) yield Task.delay {
    Thread.sleep(decisionDelay.value)
    PnL(10)
  }
  Task.gatherUnordered(ts.sliding(30, 30).toList.map(xs =>
    Task.fork(Task.gatherUnordered(xs)))).map(pnls =>
    BacktestPerformanceSummary(
      pnls.flatten.reduceOption(PnL.merge).getOrElse(PnL.zero)))
}
```
This implementation returns to representing the simulation of each trading day without any concurrency, using Task.delay. In contrast to the previous implementations, the list of trading day simulation Tasks is divided into chunks of 30 using sliding. Each chunk of 30 Tasks is wrapped with an invocation of Task.fork to execute concurrently. This approach allows us to balance the benefits of concurrency with the overhead of executor submissions.
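The batching itself is plain standard-library Scala. This small sketch shows what sliding(30, 30) does to the list of per-day Tasks, using a batch size of 3 and plain integers for brevity:

```scala
// Illustrative stand-ins for simulated trading days.
val days = (1 to 8).toList

// sliding(n, n) produces non-overlapping chunks (equivalent to grouped(n)).
val batches = days.sliding(3, 3).toList

println(batches)             // List(List(1, 2, 3), List(4, 5, 6), List(7, 8))
println(batches.map(_.size)) // List(3, 3, 2): the final batch may be partial
```

Each inner list corresponds to one fork in the batched backtest, so 720 trading days yield 24 executor submissions rather than 720.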
Of these three implementations, which is most performant? The answer is not straightforward because it depends on the number of simulation trading days and the computational cost of simulating a trading day. To better understand the tradeoffs, we write a microbenchmark that tests each of the three backtest implementations. We show the state required to run the benchmark, as follows:
```scala
@State(Scope.Benchmark)
class BenchmarkState {
  @Param(Array("1", "10"))
  var decisionDelayMillis: Long = 0
  @Param(Array("1", "12", "24"))
  var backtestIntervalMonths: Int = 0

  var decisionDelay: DecisionDelayMillis = DecisionDelayMillis(-1)
  var backtestDays: List[MonthDay] = Nil

  @Setup
  def setup(): Unit = {
    decisionDelay = DecisionDelayMillis(decisionDelayMillis)
    backtestDays = daysWithin(trailingMonths(backtestIntervalMonths))
  }
}
```
This benchmark allows us to sweep different backtest interval and decision delay combinations. Using a daysWithin method, which is omitted from the snippet, a count representing the number of months is converted into the list of simulation trading days. We display the implementation of only one benchmark because the other two are identical, as follows:
```scala
@Benchmark
def withBatchedForking(state: BenchmarkState): BacktestPerformanceSummary =
  Backtest.backtestWithBatchedForking(state.backtestDays, state.decisionDelay)
    .unsafePerformSync
```
To accurately time how long it takes to complete the Task computation, we start the computation with the blocking unsafePerformSync method. This is a rare example where it is acceptable to make a blocking call without a timeout; in this controlled test, we are confident that all invocations will return. For this test, we sweep the month count, leaving the decision delay fixed at 1 ms. Running this benchmark on a machine with four cores produces the following results:
| Benchmark | Months | Decision delay milliseconds | Throughput (ops per second) | Error as percentage of throughput |
| --- | --- | --- | --- | --- |
| withoutConcurrency | 1 | 1 | 25.96 | ± 0.46 |
| withAllForked | 1 | 1 | 104.89 | ± 0.36 |
| withBatchedForking | 1 | 1 | 27.71 | ± 0.70 |
| withoutConcurrency | 12 | 1 | 1.96 | ± 0.41 |
| withAllForked | 12 | 1 | 7.25 | ± 0.22 |
| withBatchedForking | 12 | 1 | 8.60 | ± 0.49 |
| withoutConcurrency | 24 | 1 | 0.76 | ± 2.09 |
| withAllForked | 24 | 1 | 1.98 | ± 1.46 |
| withBatchedForking | 24 | 1 | 4.32 | ± 0.88 |
The results make the tradeoff between the overhead and the benefits of batching clearer. Batching is a clear win as the number of months increases with a short 1 ms computational delay. Consider the scenario of backtesting 24 months with a 1 ms decision delay. Assuming 30-day months, there are 720 trading days to simulate. Split into batches of 30, there are 24 invocations of fork instead of 720. The overhead of splitting the Tasks into batches, and gathering each batch's results, is overshadowed by the order-of-magnitude reduction in executor submissions. Our explicit control over forking yielded a doubling of throughput in this scenario.
As the number of months decreases, the overhead of creating Task batches becomes a dominating factor. In a 12-month backtest, there are 360 trading days, yielding 12 batches. Here, batching yields about a 20% throughput improvement over forking all Tasks. Cutting the number of trading days in half from the 24-month test reduced the performance advantage by more than half. In the worst-case scenario, when there is one month to simulate, the batching strategy fails to take advantage of all the cores on the machine. In this scenario, only one batch is created, leaving CPU resources underutilized.
As we have seen, there are a number of variables at play here. Accounting for computational costs, the number of available cores, the expected number of Task executor submissions, and batching overhead can be challenging. To extend our work, we can investigate a more dynamic batching strategy that takes better advantage of CPU resources with smaller backtest intervals. Using this benchmark, we got a taste of the additional tools that Task provides, and of how explicit control of executor submissions can affect throughput.
The insights that we gleaned by working on the backtester can be applied to larger-scale software systems as well. We focused on analyzing results with a short 1 ms decision delay. As the cost of executing each Task increases (for example, a 10 ms decision delay), diminishing marginal performance improvements are gained from batching. This is because the cost of executor submissions becomes overshadowed by the cost of the computation. While 1 ms appears to be a small amount of time, there is a potentially surprising number of computations that can be completed in this time frame. Consider that a throughput of 1,000 operations per second translates to 1 operation per millisecond. Reflecting on benchmarks that we have performed in our earlier efforts and through your own work, you can find numerous examples of operations that have a throughput higher than 1 operation per millisecond. The takeaway from this thought experiment is that a large number of use cases fit within the definition of a short computation (that is, 1 ms), which means that there are significant opportunities to optimize concurrency through the judicious use of fork.
The backtester is a prime candidate for batching because the amount of work, namely the number of days to simulate, is known at the start of processing. In a stream processing environment, the amount of work is unknown. For example, consider the order book receiving events on-the-fly. How can you implement batching in a streaming environment?
We hope that the backtester provided an illustrative example to give you a feeling for Task. There are additional tools provided by Task that we did not explore, and we invite you to read the documentation for Task in the Scalaz library. In Functional Programming in Scala, a book written by two Scalaz contributors, Rúnar Bjarnason and Paul Chiusano, there is an excellent chapter describing the implementation of a simplified version of Scalaz Task. This is a great resource for understanding the design of the API.