Tasked with more backtest performance improvements

Discovering Future and adopting an asynchronous mindset helped you better utilize your computing resources to test multiple strategies and tickers faster. You improved performance by treating the backtest as a black box. Without changing the implementation of the backtest, there were straightforward performance wins. Identifying logical sequences of transformations as candidates for concurrency is a good strategy to apply when considering how to speed up your software.

Let's extend this idea to a smaller logical unit of processing within the backtester. A backtest exercises a strategy for a ticker across a time period. After speaking with Dave, you discover that MVT does not maintain positions overnight. At the end of each trading day, MVT trading systems mitigate risk by ensuring that all stock positions are liquidated. This is done to defend against volatile overnight price moves after the market closes, which the company is unable to react to by trading. As positions are not held overnight, each trading day can be simulated independently of the previous trading day. Returning to our asynchronous mindset, this insight implies that trading day simulations can be performed concurrently.

Before jumping into an implementation using Future, we will share an alternative abstraction, named Task, which is provided by the Scalaz library. Task offers compelling advantages for our proposed backtest modifications. We introduce Task next, provided that you are up to the task!

Introducing Scalaz Task

Scalaz Task provides a different approach to achieve concurrency. Although Task can be used in a way that mimics Future, there are important conceptual differences between these two abstractions. Task allows fine-grained control over asynchronous execution, which provides performance benefits. Task maintains referential transparency as well, which provides stronger reasoning abilities. Referential transparency is a property of expressions that are side-effect free. To better understand this principle, consider the following simple sum method:

def sum(x: Int, y: Int): Int = x + y 

Imagine that we are performing two summations:

sum(sum(2, 3), 4)  

As sum is side-effect free, we can replace sum(2, 3) with its result, as follows:

sum(5, 4) 

This expression will always evaluate to 9, which satisfies referential transparency. Now imagine a twist in the implementation of sum:

class SumService(updateDatabase: () => Unit) { 
  def sum(x: Int, y: Int): Int = { 
    updateDatabase() 
    x + y 
  } 
} 

Now, sum includes a side-effect of writing to a database, which breaks referential transparency. We can no longer replace sum(2, 3) with the value 5 because then the database will not be updated. Referential transparency is a concept at the heart of the functional programming paradigm because it provides strong reasoning guarantees. The Haskell wiki provides additional commentary and examples worth reviewing at https://wiki.haskell.org/Referential_transparency.
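To make the broken substitution concrete, here is a small sketch of our own that reuses the SumService defined above; the updates counter stands in for a real database write:

var updates = 0 
val service = new SumService(() => updates += 1) 
 
service.sum(service.sum(2, 3), 4) // evaluates to 9 and performs two database writes 
service.sum(5, 4)                 // also evaluates to 9 but performs only one write 
 
// The two expressions produce the same value but different side effects, 
// so sum(2, 3) cannot be replaced by 5 without changing program behavior. 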

Let's take a look at common Task API usage to better understand how Task works.

Creating and executing Task

The methods provided by the Task companion object are the main entry points to the API and the best way to create an instance of Task. Task.apply is the first method to inspect. It takes a computation returning an instance of A (that is, a by-name parameter of type A) and an implicit ExecutorService to run the computation. Contrary to Future, which uses ExecutionContext as an abstraction for a thread pool, Task uses ExecutorService, which is defined in the Java standard library:

scala> val t = Task { 
     |   println("Starting task") 
     |   40 + 2 
     | } 
t: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@300555a9 

The first thing that you may have noticed is that, even though we instantiated a new Task, nothing is printed on the screen. This is an important difference when comparing Task and Future; while Future is eagerly evaluated, a Task is not computed until you explicitly ask for it:

scala> t.unsafePerformSync 
Starting task 
res0: Int = 42 

The preceding example calls the unsafePerformSync instance method to execute the task. We can see the println output as well as the returned result 42. Note that unsafePerformSync is an unsafe call: if the computation throws an exception, the exception is rethrown by unsafePerformSync. To handle failures more safely, calling unsafePerformSyncAttempt is preferred. The unsafePerformSyncAttempt instance method catches the exception and has a return type of Throwable \/ A, which allows you to cleanly handle the failure case.

Note that when creating the task t, we did not provide an ExecutorService. By default, apply creates a Task to be run on DefaultExecutorService, a fixed-size thread pool supplied as a default parameter and sized to the number of available processors on the machine. The DefaultExecutorService is analogous to the global ExecutionContext that we explored with Future: it is CPU-bound and sized to the available cores. We can also supply a different ExecutorService at creation time:

scala> val es = Executors.newFixedThreadPool(4) 
es: java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPoolExecutor@4c50cd8c[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 
 
scala> val t = Task { 
     |   println("Starting task on thread " + Thread.currentThread.getName) 
     |   40 + 2 
     | }(es) 
t: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@497db010 
 
scala> println("Calling run from " + Thread.currentThread.getName) 
Calling run from run-main-1 
 
scala> t.unsafePerformSync 
Starting task on thread pool-8-thread-2 
res2: Int = 42 

The output shows that the Task is executed on the supplied ExecutorService, not on the main thread.
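Before moving on, here is a sketch of the safer execution call mentioned earlier, unsafePerformSyncAttempt; the REPL output shown (result numbering and object hash) is illustrative:

scala> val failing = Task[Int] { throw new IllegalStateException("boom") } 
failing: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@5a8e6209 
 
scala> failing.unsafePerformSyncAttempt 
res3: scalaz.\/[Throwable,Int] = -\/(java.lang.IllegalStateException: boom) 

The exception is captured in the left side of the disjunction rather than being rethrown.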

Speaking of Task execution, let's perform a little experiment. We will create an instance of Task and call unsafePerformSync twice in a row:

scala> val t = Task { 
     |   println("Starting task") 
     |   40 + 2 
     | } 
t: scalaz.concurrent.Task[Int] = scalaz.concurrent.Task@300555a9 
 
scala> t.unsafePerformSync 
Starting task 
res0: Int = 42 
 
scala> t.unsafePerformSync 
Starting task 
res1: Int = 42 

We observe that Starting task prints after each call to unsafePerformSync. This indicates that the full computation is executed each time we call unsafePerformSync. That is another difference with Future. While a Future memoizes its result after the computation, a Task performs its computation each time we call unsafePerformSync. In other words, Task is referentially transparent and, therefore, closer to the functional programming paradigm than Future.
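For contrast, here is a sketch of our own showing the same experiment with the standard library Future; the side-effect runs only once, when the Future is constructed, and the memoized result is returned on every subsequent access:

import scala.concurrent.{Await, Future} 
import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext.Implicits.global 
 
val f = Future { 
  println("Starting future") // printed once, when the Future is created 
  40 + 2 
} 
 
Await.result(f, 1.second) // 42 
Await.result(f, 1.second) // 42 again; "Starting future" does not print a second time 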

Asynchronous behavior

Like Future, it is possible (and even recommended) to use Task in an asynchronous way. An instance of Task can be executed asynchronously by calling unsafePerformAsync. This method takes a callback of type (Throwable \/ A) => Unit that is called at the end of the computation. Observe the following snippet:

def createAndRunTask(): Unit = { 
 val t = Task { 
   println("Computing the answer...") 
   Thread.sleep(2000) 
   40 + 2 
 } 
 
 t.unsafePerformAsync { 
    case \/-(answer) => println("The answer is " + answer) 
    case -\/(ex) => println("Failed to compute the answer: " + ex) 
 } 
 
 println("Waiting for the answer") 
} 

We create our Task and add a Thread.sleep to simulate an expensive computation. We call unsafePerformAsync and use a simple callback to print the answer (or an exception, if the computation fails). We call createAndRunTask and observe the following output:

scala> TaskExample.createAndRunTask() 
Waiting for the answer 
 
scala> Computing the answer... 
The answer is 42 

We can see that our last statement, "Waiting for the answer", was printed first. This is because unsafePerformAsync returns immediately. We then see the statement from our computation, as well as the answer printed in our callback. This method is roughly equivalent to onComplete, which is defined on Scala's Future.

Another useful method provided by the companion object of Task is async. Remember how we previously used Promise to turn a callback-based API into an API returning an instance of Future? It is possible to achieve the same goal with Task; that is, we can turn a callback-based API into a more monadic API returning a Task, as follows:

object CallbackAPI { 
  def doCoolThings[A](a: => A, f: (Throwable \/ A) => Unit): Unit = ??? 
} 
 
def doCoolThingsToTask[A](a: => A): Task[A] = 
 Task.async { f => 
   CallbackAPI.doCoolThings[A](a, res => f(res)) 
 } 

Evaluating this method in the REPL yields the following:

> val t = doCoolThingsToTask(40+2) 
> t.map(res => res / 2).unsafePerformSync 
res2: Int = 21 

Our doCoolThingsToTask method uses Task.async to create a Task instance from the callback-based API that is defined in CallbackAPI. Task.async can even be used to turn a Scala Future into a Scalaz Task:

def futureToTask[A](future: Future[A])(implicit ec: ExecutionContext): Task[A] = 
 Task.async { f => 
   future.onComplete { 
      case Success(res) => f(\/-(res)) 
      case Failure(ex) => f(-\/(ex)) 
   } 
 } 

Note that we have to supply an ExecutionContext to be able to call onComplete on Future. This is due to Future's eager evaluation: almost all methods that are defined on Future immediately submit a computation to a thread pool.

Note

It is also possible to convert a Task to a Future:

def taskToFuture[A](t: Task[A]): Future[A] = { 
  val p = Promise[A]() 
  t.unsafePerformAsync { 
    case \/-(a) => p.success(a) 
    case -\/(ex) => p.failure(ex) 
  } 
  p.future 
}

The execution model

Understanding the Task execution model requires understanding the Scalaz Future execution model because Task composes a Scalaz Future and adds error handling. This is visible from the definition of Task:

class Task[+A](val get: Future[Throwable \/ A]) 

In this definition, Future is not the Scala standard library version; it is an alternative version that is provided by Scalaz. The Scalaz Future decouples the definition of transformations from the execution strategy, giving us fine-grained control over executor submissions. Scalaz Future accomplishes this by defining itself as a trampolined computation. Trampolining is a technique that describes a computation as a discrete series of chunks that are run using constant stack space. To dive into the details of how a trampoline works, we recommend reading Rúnar Bjarnason's paper, Stackless Scala With Free Monads, available at http://blog.higher-order.com/assets/trampolines.pdf.

Task builds on Scalaz Future by providing error handling with the Scalaz \/ disjunction. Task is the description of a computation. Transformations add to the description of the computation that will eventually be executed by a thread pool. To begin evaluation, a Task must be explicitly started. This behavior is interesting because, when a Task is finally executed, the entire chain of transformations can be confined to a single thread. This improves thread reuse and reduces context switching.

(Diagram: The execution model)

In the preceding diagram, we see various calls to apply and map. These calls are merely modifying the definition of the task to be performed. It is only when we call unsafePerformAsync that the computation is realized in a different thread. Note that all the transforms are applied by the same thread.
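A small sketch of our own makes this visible by recording the thread name when the body is evaluated and when the map is applied; both names refer to the same pool thread:

import scalaz.concurrent.Task 
import scalaz.{-\/, \/-} 
 
val t = Task { 
  Thread.currentThread.getName                  // thread that evaluates the body 
}.map { applyThread => 
  (applyThread, Thread.currentThread.getName)   // thread that applies the map 
} 
 
t.unsafePerformAsync { 
  case \/-((applyThread, mapThread)) => 
    println(s"apply ran on $applyThread, map ran on $mapThread") 
  case -\/(ex) => println("Failed: " + ex) 
} 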

We can exercise Future and Task performance in a short microbenchmark comparing their throughput based on the transformation applied (for example, map or flatMap) and the number of transformations applied. A snippet of the benchmark follows:

@Benchmark 
def mapWithFuture(state: TaskFutureState): Int = { 
  implicit val ec = state.context 
  val init = Future(0) 
  val res = (1 until state.operations).foldLeft(init)((f, _) => f.map(_ + 1)) 
  Await.result(res, Duration("5 minutes")) 
} 
 
@Benchmark 
def mapWithTask(state: TaskFutureState): Int = { 
  val init = Task(0)(state.es) 
  val res = (1 until state.operations).foldLeft(init)((t, _) => t.map(_ + 1)) 
  res.unsafePerformSync 
} 

Both scenarios run similar computations. We create an initial instance of Future or Task containing 0, and we apply several consecutive map operations, each adding 1 to the accumulator. Two other scenarios perform the same computation with flatMap instead. The results for flatMap are displayed in the following table:

Benchmark           Operation count   Throughput (ops per second)   Error as percentage of throughput
flatMapWithFuture   5                 41,602.33                     ± 0.69
flatMapWithTask     5                 59,478.50                     ± 2.14
flatMapWithFuture   10                31,738.80                     ± 0.52
flatMapWithTask     10                43,811.15                     ± 0.47
flatMapWithFuture   100               4,390.11                      ± 1.91
flatMapWithTask     100               13,415.30                     ± 0.60
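For reference, the following is our own sketch of how the flatMap scenario for Task might look; it is modeled on the map benchmark shown earlier rather than taken from the actual benchmark code:

@Benchmark 
def flatMapWithTask(state: TaskFutureState): Int = { 
  val init = Task(0)(state.es) 
  val res = (1 until state.operations).foldLeft(init)( 
    (t, _) => t.flatMap(acc => Task.now(acc + 1))) 
  res.unsafePerformSync 
} 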

The results for map operations can be found in the following table:

Benchmark       Operation count   Throughput (ops per second)   Error as percentage of throughput
mapWithFuture   5                 45,710.02                     ± 1.30
mapWithTask     5                 93,666.73                     ± 0.57
mapWithFuture   10                44,860.44                     ± 1.80
mapWithTask     10                91,932.14                     ± 0.88
mapWithFuture   100               19,974.24                     ± 0.55
mapWithTask     100               46,288.17                     ± 0.46

This benchmark highlights the performance gain that comes from the different execution model of Task. Even for a small number of transformations, throughput is better with deferred evaluation.

Modeling trading day simulations with Task

Equipped with our understanding of Task, we now have the knowledge necessary to add concurrency to the execution of a single backtest run. You may recall that we discovered from Dave that MVT closes its positions at the end of each trading day. This insight allows us to model each trading day independently. Let's familiarize ourselves with the current implementation by beginning with the model, as follows:

  case class PnL(value: BigDecimal) extends AnyVal 
  object PnL { 
    def merge(x: PnL, y: PnL): PnL = PnL(x.value + y.value) 
    val zero: PnL = PnL(0) 
  } 
  case class BacktestPerformanceSummary(pnl: PnL) 
  case class DecisionDelayMillis(value: Long) extends AnyVal 

The profit-and-loss is the output of each simulated trading day. PnL provides a convenient method to add together two PnL instances, which can be used to sum the simulation PnL across multiple trading days. Once all the trading days are simulated, a BacktestPerformanceSummary is created to capture the simulation profit-and-loss. For our work on the backtester, we will use a Thread.sleep to simulate computationally expensive work in place of an actual decisioning strategy. The length of the sleep is parameterized by DecisionDelayMillis.

The following simplified version of the backtester shows how DecisionDelayMillis is used to simulate a trading day:

  def originalBacktest( 
    testDays: List[MonthDay], 
    decisionDelay: DecisionDelayMillis): BacktestPerformanceSummary = { 
    val pnls = for { 
      d <- testDays 
      _ = Thread.sleep(decisionDelay.value) 
    } yield PnL(10) 
    BacktestPerformanceSummary(pnls.reduceOption(PnL.merge).getOrElse(PnL.zero)) 
  } 

The original backtest displays how a list of days is simulated in a synchronous fashion. For reproducibility, we substitute a constant profit-and-loss of $10 in place of a dynamic value. This backtest ignores the application of a ticker and a strategy to focus on the core of our dilemma: How can we use Task to add concurrency to a backtest?

From our examples, we saw that Task introduces concurrency through submission of multiple Tasks to an ExecutorService and by performing the side-effect of running a Task with unsafePerformAsync to avoid a blocking wait for the result. As a first step, let's implement a version of the backtest that uses Task without introducing concurrency:

  def backtestWithoutConcurrency( 
    testDays: List[MonthDay], 
    decisionDelay: DecisionDelayMillis): Task[BacktestPerformanceSummary] = { 
    val ts = for (d <- testDays) yield Task.delay { 
      Thread.sleep(decisionDelay.value) 
      PnL(10) 
    } 
    Task.gatherUnordered(ts).map(pnls => 
      BacktestPerformanceSummary(pnls.reduceOption(PnL.merge).getOrElse(PnL.zero))) 
  } 

This implementation changes the return type to Task[BacktestPerformanceSummary]. Since the Task is not run, referential transparency is maintained within this method. Each trading day is simulated using Task.delay. delay is a lazy variant of Task.now that defers evaluation of the provided value. Let's look at the following signature to confirm:

def delay[A](a: => A): Task[A] 

If we had instead used Task.now in place of Task.delay, the sleep (that is, the simulation) would have been evaluated eagerly, before the Task is run. We also see the use of another new capability, Task.gatherUnordered. gatherUnordered is useful when you wish to make the following transformation:

List[Task[A]] => Task[List[A]] 

Although List is used here, this relationship exists for any Seq. gatherUnordered provides a way to take a collection of Task and instead operate on a single Task that wraps a collection of the underlying type. Let's look at the following signature to make our understanding more concrete:

def gatherUnordered[A](tasks: Seq[Task[A]], exceptionCancels: Boolean = false): Task[List[A]] 

This signature closely matches the transformation that we just described, with the addition of an optional Boolean parameter. When exceptionCancels is set to true, an exception in one Task prevents any still-pending Task from being evaluated. gatherUnordered allows us to merge together the results of each trading day's profit-and-loss and return a single Task wrapping BacktestPerformanceSummary. The Scala Future companion object provides an analogous method, named sequence, that performs the same operation on a sequence of Futures.
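As a quick illustration, separate from the backtester, here is a sketch of ours showing the transformation in action:

val perDay: List[Task[Int]] = List(1, 2, 3).map(i => Task.delay(i * 10)) 
 
// One Task that, when run, evaluates all of the input Tasks 
val gathered: Task[List[Int]] = Task.gatherUnordered(perDay) 
 
gathered.unsafePerformSync // List(10, 20, 30), though ordering is not guaranteed 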

backtestWithoutConcurrency is a functioning implementation of the backtest, but it does not add concurrency to the simulation of historical trading days. For our next iteration, we take advantage of a new part of the Task API, Task.fork. Let's see how it is used, and then we will explain how it works:

  def backtestWithAllForked( 
    testDays: List[MonthDay], 
    decisionDelay: DecisionDelayMillis): Task[BacktestPerformanceSummary] = { 
    val ts = for (d <- testDays) yield Task.fork { 
      Thread.sleep(decisionDelay.value) 
      Task.now(PnL(10)) 
    } 
    Task.gatherUnordered(ts).map(pnls => 
      BacktestPerformanceSummary(pnls.reduceOption(PnL.merge).getOrElse(PnL.zero))) 
  } 

This implementation gathers trading day PnL in the same way as before, but instead this uses a combination of Task.fork and Task.now to simulate the trading day. Let's look at the signature of Task.fork to understand how runtime behavior changes:

def fork[A](a: => Task[A])(implicit pool: ExecutorService = Strategy.DefaultExecutorService): Task[A] 

fork accepts a Task as a by-name parameter and an implicit ExecutorService that defaults to the CPU-bound executor. The signature shows that fork submits the provided Task to pool in order to fork the computation into a different thread. fork is an explicit way to control concurrency with Task. Conceptually, fork is analogous to any Future transformation (for example, map) that involves submission to an executor. As fork lazily evaluates its argument, Task.now can be used to lift the trading day's profit-and-loss into a Task. With this implementation, the Task that represents each trading day is submitted to an executor. If we assume 30 trading days are being backtested and the computer used has two cores, then this implementation allows each core to simulate 15 trading days instead of a single core simulating 30 days.

As we saw in earlier benchmarks, submitting a high volume of small computations to an executor is expensive. As we have explicit control over concurrency with Task using fork, we can improve our performance by optimizing the frequency of executor submissions. In our third attempt, we take advantage of knowing the number of trading days to be simulated to control executor submissions. The implementation now looks like the following:

  def backtestWithBatchedForking( 
    testDays: List[MonthDay], 
    decisionDelay: DecisionDelayMillis): Task[BacktestPerformanceSummary] = { 
    val ts = for (d <- testDays) yield Task.delay { 
      Thread.sleep(decisionDelay.value) 
      PnL(10) 
    } 
    Task.gatherUnordered(ts.sliding(30, 30).toList.map(xs => 
      Task.fork(Task.gatherUnordered(xs)))).map(pnls => 
      BacktestPerformanceSummary( 
        pnls.flatten.reduceOption(PnL.merge).getOrElse(PnL.zero))) 
  } 

This implementation returns to representing the simulation of each trading day without any concurrency using Task.delay. In contrast to the previous implementations, the list of trading day simulation Tasks is divided into chunks of 30 using sliding. Each chunk of 30 Tasks is wrapped with an invocation of Task.fork to execute concurrently. This approach allows us to balance the benefits of concurrency with the overhead of executor submissions.
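As a quick aside, a sketch of ours using plain collections shows how this chunking behaves:

// sliding(30, 30) produces non-overlapping chunks of 30 elements, 
// with a smaller final chunk when the total is not a multiple of 30 
val days = (1 to 70).toList 
days.sliding(30, 30).toList.map(_.size) // List(30, 30, 10) 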

Of these three implementations, which is most performant? The answer is not straightforward because it depends on the number of simulation trading days and the computational cost of simulating a trading day. To better understand the tradeoffs, we write a microbenchmark that tests each of the three backtest implementations. We show the state required to run the benchmark, as follows:

  @State(Scope.Benchmark) 
  class BenchmarkState { 
    @Param(Array("1", "10")) 
    var decisionDelayMillis: Long = 0 
    @Param(Array("1", "12", "24" )) 
    var backtestIntervalMonths: Int = 0 
 
    var decisionDelay: DecisionDelayMillis = DecisionDelayMillis(-1) 
    var backtestDays: List[MonthDay] = Nil 
 
    @Setup 
    def setup(): Unit = { 
      decisionDelay = DecisionDelayMillis(decisionDelayMillis) 
      backtestDays = daysWithin(trailingMonths(backtestIntervalMonths)) 
    } 
  } 

This benchmark allows us to sweep different combinations of backtest interval and decision delay. Using a daysWithin method, which is omitted from the snippet, a count of months is converted into the list of simulation trading days. We display the implementation of only one benchmark because the other two are nearly identical:

@Benchmark 
def withBatchedForking(state: BenchmarkState): BacktestPerformanceSummary = 
  Backtest.backtestWithBatchedForking(state.backtestDays, state.decisionDelay) 
    .unsafePerformSync 

To accurately time how long it takes to complete the Task computation, we start the computation with the blocking unsafePerformSync method. This is a rare example where it is acceptable to make a blocking call without a timeout. In this controlled test, we are confident that all invocations will return. For this test, we sweep the month count, leaving the decision delay fixed at 1 ms. Running this benchmark on a machine with four cores produces the following results:

Benchmark            Months   Decision delay (milliseconds)   Throughput (ops per second)   Error as percentage of throughput
withoutConcurrency   1        1                               25.96                         ± 0.46
withAllForked        1        1                               104.89                        ± 0.36
withBatchedForking   1        1                               27.71                         ± 0.70
withoutConcurrency   12       1                               1.96                          ± 0.41
withAllForked        12       1                               7.25                          ± 0.22
withBatchedForking   12       1                               8.60                          ± 0.49
withoutConcurrency   24       1                               0.76                          ± 2.09
withAllForked        24       1                               1.98                          ± 1.46
withBatchedForking   24       1                               4.32                          ± 0.88

The results make the tradeoff between the overhead and the benefits of batching clearer. Batching is a clear win as the number of months increases with a short 1 ms computational delay. Consider the scenario of backtesting 24 months with a 1 ms decision delay. Assuming 30-day months, there are 720 trading days to simulate. Split into batches of 30, there are 24 invocations of fork instead of 720. The overhead of splitting the Tasks into batches and gathering each batch's results is overshadowed by the order-of-magnitude reduction in executor submissions. Our explicit control over forking yielded a doubling of throughput in this scenario.

As the number of months decreases, the overhead of creating Task batches becomes a dominating factor. In a 12-month backtest, there are 360 trading days, yielding 12 batches. Here, batching yields about a 20% throughput improvement over forking every Task. Cutting the number of trading days in half from the 24-month test reduced the performance advantage by more than half. In the worst-case scenario, when there is one month to simulate, the batching strategy fails to take advantage of all the cores on the machine. In this scenario, only one batch is created, leaving CPU resources underutilized.

Wrapping up the backtester

As we have seen, there are a number of variables at play here. Accounting for computational costs, the number of available cores, the expected number of Task executor submissions, and batching overhead can be challenging. To extend our work, we can investigate a more dynamic batching strategy that takes better advantage of CPU resources with smaller backtest intervals. Using this benchmark, we got a taste for the additional tools that Task provides, and how explicit control of executor submissions can affect throughput.
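As one possible direction, the following sketch, built on our own assumptions rather than taken from the backtester, derives the batch size from the number of trading days and the available cores so that short backtests still use the whole machine:

  def backtestWithDynamicBatching( 
    testDays: List[MonthDay], 
    decisionDelay: DecisionDelayMillis): Task[BacktestPerformanceSummary] = { 
    val ts = for (d <- testDays) yield Task.delay { 
      Thread.sleep(decisionDelay.value) 
      PnL(10) 
    } 
    // Aim for roughly one batch per core instead of a fixed chunk of 30 
    val cores = Runtime.getRuntime.availableProcessors 
    val batchSize = math.max(1, math.ceil(testDays.size.toDouble / cores).toInt) 
    Task.gatherUnordered( 
      ts.sliding(batchSize, batchSize).toList.map(xs => Task.fork(Task.gatherUnordered(xs))) 
    ).map(pnls => BacktestPerformanceSummary( 
      pnls.flatten.reduceOption(PnL.merge).getOrElse(PnL.zero))) 
  } 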

The insights that we gleaned by working on the backtester can be applied to larger-scale software systems as well. We focused on analyzing results with a short 1 ms decision delay. As the cost of executing each Task increases (for example, a 10 ms decision delay), batching yields diminishing marginal performance improvements because the cost of executor submissions is overshadowed by the cost of the computation itself. While 1 ms appears to be a small amount of time, a potentially surprising number of computations can be completed in this time frame. Consider that a throughput of 1,000 operations per second translates to 1 operation per millisecond. Reflecting on the benchmarks from our earlier efforts, and on your own work, you can find numerous examples of operations with throughput well above 1 operation per millisecond. The takeaway from this thought experiment is that a large number of use cases fit the definition of a short computation (that is, about 1 ms), which means there are many opportunities to optimize concurrency through the judicious use of fork.

Note

The backtester is a prime candidate for batching because the amount of work, namely the number of days to simulate, is known at the start of processing. In a stream processing environment, the amount of work is unknown. For example, consider the order book receiving events on-the-fly. How can you implement batching in a streaming environment?

We hope that the backtester provided an illustrative example to give you a feel for Task. Task provides additional tools that we did not explore, and we invite you to read the documentation for Task in the Scalaz library. The book Functional Programming in Scala, written by two Scalaz contributors, Rúnar Bjarnason and Paul Chiusano, contains an excellent chapter describing the implementation of a simplified version of Scalaz Task. It is a great resource for understanding the design of the API.
