Historical data analysis

You have done great work with the order book and, we hope, have learned valuable skills along the way! It is now time to explore a new facet of MVT's activities. A group of expert traders and data scientists constantly studies historical market data to design performant trading strategies. Until now, the company has not had the luxury of allocating technical resources to this team. As a result, the group has been using clunky, unreliable, and under-performing tools to analyze market data and build elaborate trading strategies. With a performant order book in place, the top priority is to improve the strategies implemented by the company. Your new best friend, Dave, has explicitly asked for you to join the team and help them modernize their infrastructure.

Lagged time series returns

The main tool used by the team is a simple program designed to compute lagged time series returns from historical trade execution data. So far, this tool has been a big disappointment. Not only does it return mostly invalid results, it is also slow and fragile. Before diving into the code, Dave gives you a short presentation of the business rules involved. Return time series are derived from midpoint time series. A midpoint is calculated for each minute, based on the bid and ask prices of the trade executions. Consider the following table as a simple example:

Execution time   | Bid price | Ask price | Midpoint
01/29/16 07:45   | 2.3       | 2.5       | 2.4
01/29/16 07:46   | 2.1       | 2.4       | 2.25
01/29/16 07:47   | 2.9       | 3.4       | 3.15
01/29/16 07:48   | 3.2       | 3.4       | 3.3
01/29/16 07:49   | 3.1       | 3.3       | 3.2

The formula to calculate a midpoint is (bid_price + ask_price) / 2. For example, the midpoint at 01/29/16 07:47 is (2.9 + 3.4) / 2, that is, 3.15.

Note

In the real world, a midpoint would be weighted by the volume of each transaction, and the time series would use a more fine-grained time unit, such as seconds or even milliseconds. To keep the example simple, we disregard the volume dimension by assuming a volume of 1 for all executions, and we calculate a single data point per minute.
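For the curious, a minimal sketch of what a volume-weighted midpoint could look like; the WeightedExecution type and its volume field are hypothetical and not part of the model used in this chapter:

// Hypothetical model for the volume-weighted variant
case class WeightedExecution(bid: Double, ask: Double, volume: Long)

// Volume-weighted midpoint over all the executions of a single minute:
// each execution's midpoint is weighted by its traded volume
def weightedMidpoint(executions: Vector[WeightedExecution]): Double = {
  val totalVolume = executions.map(_.volume).sum
  executions.map(e => (e.bid + e.ask) / 2 * e.volume).sum / totalVolume
}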

A series of midpoints is used to compute a series of returns. A series of returns is defined for a certain rollup value, expressed in minutes. To calculate the three-minute return at time t3, the formula is (midpoint_at_t3 - midpoint_at_t0) / midpoint_at_t0. We also multiply the result by 100 to express the return as a percentage. If we use the previous midpoint series to calculate a three-minute return series, we obtain the following table:

Time             | Midpoint | 3 minute return
01/29/16 07:45   | 2.4      | N/A
01/29/16 07:46   | 2.25     | N/A
01/29/16 07:47   | 3.15     | N/A
01/29/16 07:48   | 3.3      | 37.50
01/29/16 07:49   | 3.2      | 42.22

Note that the first three midpoints do not have a corresponding three-minute return, as there is no midpoint old enough to be used. For example, the return at 01/29/16 07:48 is (3.3 - 2.4) / 2.4 * 100, that is, 37.5.

You are now familiar with the domain and can have a look at the existing code. Starting with this model:

case class TimestampMinutes(value: Int) extends AnyVal {
  def next: TimestampMinutes = TimestampMinutes(value + 1)
}

case class AskPrice(value: Int) extends AnyVal
case class BidPrice(value: Int) extends AnyVal
case class Execution(time: TimestampMinutes, ask: AskPrice, bid: BidPrice)

case class Midpoint(time: TimestampMinutes, value: Double)
object Midpoint {
  def fromAskAndBid(time: TimestampMinutes, askPrice: AskPrice,
    bidPrice: BidPrice): Midpoint =
    Midpoint(time, (bidPrice.value + askPrice.value) / 2D)

  // Used later by MidpointSeries.fromExecution
  def fromExecution(execution: Execution): Midpoint =
    fromAskAndBid(execution.time, execution.ask, execution.bid)
}

case class MinuteRollUp(value: Int) extends AnyVal
case class Return(value: Double) extends AnyVal

object Return {
  def fromMidpoint(start: Midpoint, end: Midpoint): Return =
    Return((end.value - start.value) / start.value * 100)
}

Everything looks straightforward. Note that prices are represented as Int, while midpoints and returns are represented as Double. We assume that our system is able to normalize the prices as integers instead of decimals. This simplifies our code, and also improves the performance of the program, since we use primitive Double instead of, for example, BigDecimal instances. TimestampMinutes is similar to the more commonly used epoch timestamp, but with minute rather than second precision (see https://en.wikipedia.org/wiki/Unix_time).
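To make the relationship with the epoch timestamp concrete, here is a minimal sketch of a conversion helper; fromEpochMillis is hypothetical and not part of the model above:

object TimestampMinutes {
  // Truncate an epoch timestamp in milliseconds to whole minutes
  // (60,000 milliseconds per minute)
  def fromEpochMillis(millis: Long): TimestampMinutes =
    TimestampMinutes((millis / 60000).toInt)
}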

After studying the model, we look at the existing implementation of the computeReturnsWithList method:

def computeReturnsWithList(
  rollUp: MinuteRollUp,
  data: List[Midpoint]): List[Return] =
  for {
    i <- (rollUp.value until data.size).toList
  } yield Return.fromMidpoint(data(i - rollUp.value), data(i))

This method assumes that the list of midpoints received as input is already sorted by execution time. It randomly accesses various indices of the list to read the midpoints required to compute each return. For example, to compute the second return value (index 1 in the returned list) with a rollup value of three minutes, we access the elements at indices 4 and 1 in the input list. The following diagram provides a visual reference for how returns are computed:

Figure: Lagged time series returns
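To make the computation concrete, here is a sketch that applies the method to the midpoint series from the earlier table; the timestamp values are illustrative:

val midpoints = List(
  Midpoint(TimestampMinutes(0), 2.4),
  Midpoint(TimestampMinutes(1), 2.25),
  Midpoint(TimestampMinutes(2), 3.15),
  Midpoint(TimestampMinutes(3), 3.3),
  Midpoint(TimestampMinutes(4), 3.2))

computeReturnsWithList(MinuteRollUp(3), midpoints)
// List(Return(37.5), Return(42.222...))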

You have been warned that this method is slow, but it turns out to be incorrect as well. Dave has verified many times that it returns invalid results. Before tackling the performance issue, you have to handle the correctness problem. Optimizing an incorrect approach would not be a good use of your time and, therefore, of the company's money! You quickly realize that this method puts too much trust in the data that it is fed. For this algorithm to work, the input list of midpoints has to meet the following conditions:

  • It must be sorted by execution time, from the oldest to the newest execution
  • It must contain at most one midpoint per minute
  • It must not contain any minute without a midpoint, that is, there are no missing data points

You bring this up to Dave to better understand how the midpoint series is generated. He explains that it is loaded from sequential logs that are recorded by the order book. It is certain that the list is sorted by execution time. Also, he assures you that considering the large volume of trades handled by the order book, it is impossible to have a minute without a single execution. However, he acknowledges that it is more than likely that more than one midpoint is computed for the same execution time. It looks like you have found the problem causing invalid returns. Fixing it should not be too complicated, and you think that it is now time to reflect on the performance issue.

We spent time studying the structure of a singly linked list in the previous section. You know that it is optimized for operations involving the head and the tail of the list. By contrast, randomly accessing an element by its index is an expensive operation requiring linear time. To improve the performance of midpoint lookups, we turn to a data structure with better random access performance: Vector.

Vector

To improve the performance of our system, we should reconsider the data structure that stores Midpoint values. A good option is to replace List with Vector, another Scala collection provided by the standard library. Vector is an efficient collection that provides effectively constant time random access. The cost of random access operations depends on various assumptions, such as the maximum length of the Vector. Vector is implemented as an ordered tree data structure called a trie, in which the keys are the indices of the values stored in the Vector (to learn more about tries and their use cases, see https://en.wikipedia.org/wiki/Trie). As Vector implements the Seq trait, just like List, modifying the existing method is straightforward:

def computeReturnsWithVector( 
  rollUp: MinuteRollUp, 
  data: Vector[Midpoint]): Vector[Return] = { 
  for { 
    i <- (rollUp.value until data.size).toVector 
  } yield Return.fromMidpoint(data(i - rollUp.value), data(i)) 
} 

Changing the type of the collection is enough to switch to a more performant implementation. To make sure that we actually improved the performance, we devise a simple benchmark that is designed to use a few hours of historical trade executions and measure the throughput of each implementation. The results are as follows:

Benchmark                | Return rollup in minutes | Throughput (ops per second) | Error as percentage of throughput
computeReturnsWithList   | 10                       | 534.12                      | ± 1.69
computeReturnsWithVector | 10                       | 49,016.77                   | ± 0.98
computeReturnsWithList   | 60                       | 621.28                      | ± 0.64
computeReturnsWithVector | 60                       | 51,666.50                   | ± 1.64
computeReturnsWithList   | 120                      | 657.44                      | ± 1.07
computeReturnsWithVector | 120                      | 43,297.88                   | ± 0.99

Not only does Vector yield significantly better performance, it also delivers roughly the same throughput regardless of the size of the rollup. As a general rule, Vector is a good default implementation for immutable indexed sequences. It provides effectively constant time complexity not only for random access by index, but also for head and tail operations, as well as for appending and prepending elements to an existing Vector.
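A quick sketch of these operations, with their results shown as comments:

val v = Vector(1, 2, 3)

v.head   // 1: first element
v.tail   // Vector(2, 3): all elements but the first
v :+ 4   // Vector(1, 2, 3, 4): append
0 +: v   // Vector(0, 1, 2, 3): prepend
v(2)     // 3: random access by index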

The implementation of Vector is a tree structure with a branching factor of 32. Each node is implemented as an array of size 32, and it can store either up to 32 references to child nodes or up to 32 values. This 32-ary tree structure explains why the complexity of Vector is "effectively constant" rather than "constant". The real complexity of the implementation is log32(N), where N is the size of the vector. For example, a Vector holding roughly one billion elements is only six levels deep, as 32^6 is approximately 1.07 billion. This is considered close enough to actual constant time. This collection is a good choice to store very large sequences because the memory is allocated in chunks of 32 elements. These chunks are not preallocated for all levels of the tree, but only allocated as needed.

Until Scala 2.10, one downside of Vector as compared to List was the lack of pattern matching support. This has since been fixed, and you can pattern match an instance of Vector in the same way that you pattern match a List. Consider this short example of a method that pattern matches a Vector to access and return its third element, or returns None if it contains fewer than three elements:

def returnThirdElement[A](v: Vector[A]): Option[A] = v match {
  case _ +: _ +: x +: _ => Some(x)
  case _ => None
}

Invoking this method in the REPL demonstrates that pattern matching can be applied, as follows:

    scala> returnThirdElement(Vector(1,2,3,4,5))
    res1: Option[Int] = Some(3)

Data clean up

The return algorithm is now blazingly fast. That is, blazingly fast to return incorrect results! Remember that we still have to handle some edge cases and clean up the input data. Our algorithm only works if there is exactly one midpoint per minute, and Dave informed us that we are likely to see more than one midpoint computed for the same minute.

To handle this problem, we create a dedicated MidpointSeries module and make sure that an instance of MidpointSeries, wrapping a series of Midpoint instances, is properly created without duplicates:

import scala.annotation.tailrec

class MidpointSeries private (val points: Vector[Midpoint]) extends AnyVal

object MidpointSeries {

  private def removeDuplicates(v: Vector[Midpoint]): Vector[Midpoint] = {
    @tailrec
    def loop(
      current: Midpoint,
      rest: Vector[Midpoint],
      result: Vector[Midpoint]): Vector[Midpoint] = {
      // Group the midpoints sharing the same execution time
      val sameTime = current +: rest.takeWhile(_.time == current.time)
      val average = sameTime.map(_.value).sum / sameTime.size

      val newResult = result :+ Midpoint(current.time, average)
      rest.drop(sameTime.size - 1) match {
        case h +: r => loop(h, r, newResult)
        case _ => newResult
      }
    }

    v match {
      case h +: rest => loop(h, rest, Vector.empty)
      case _ => Vector.empty
    }
  }

  def fromExecution(executions: Vector[Execution]): MidpointSeries =
    new MidpointSeries(removeDuplicates(
      executions.map(Midpoint.fromExecution)))
}

Our removeDuplicates method uses a tail-recursive helper (refer to Chapter 3, Unleashing Scala Performance). It groups all the midpoints that share the same execution time, calculates the average value of these data points, and builds a new series from these averages. Our module provides a fromExecution factory method to build an instance of MidpointSeries from a Vector of Execution. This factory method calls removeDuplicates to clean up the data.
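For example, two executions recorded during the same minute are averaged into a single midpoint. The following is a small sketch using the model defined earlier, with illustrative normalized prices:

val executions = Vector(
  Execution(TimestampMinutes(0), AskPrice(25), BidPrice(23)), // midpoint 24.0
  Execution(TimestampMinutes(0), AskPrice(27), BidPrice(25)), // midpoint 26.0
  Execution(TimestampMinutes(1), AskPrice(24), BidPrice(22))) // midpoint 23.0

MidpointSeries.fromExecution(executions).points
// Vector(Midpoint(TimestampMinutes(0), 25.0),
//        Midpoint(TimestampMinutes(1), 23.0))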

To improve our module, we move our previous return computation into the MidpointSeries class as a returns method. That way, once constructed, an instance of MidpointSeries can be used to compute any return series:

class MidpointSeries private (val points: Vector[Midpoint]) extends AnyVal {

  def returns(rollUp: MinuteRollUp): Vector[Return] =
    for {
      i <- (rollUp.value until points.size).toVector
    } yield Return.fromMidpoint(points(i - rollUp.value), points(i))
}

This is the same code that we previously wrote, but this time, we are confident that points does not contain duplicates. Note that the constructor is marked private, so the only way to create an instance of MidpointSeries is via our factory method. This guarantees that it is impossible to create an instance of MidpointSeries from a "dirty" Vector. You release this new version of the program, wish Dave and his team good luck, and leave for a well-deserved lunch break.

As you return, you are surprised to find Vanessa, one of the data scientists, waiting at your desk. "The return series code still doesn't work," she says. The team was so excited to finally be given a working algorithm that they decided to skip lunch to play with it. Unfortunately, they discovered some inconsistencies in the results. You try to collect as much data as possible, and spend an hour looking at the invalid results that Vanessa is talking about. You notice that they all involve trade executions for two specific symbols: FOO and BAR. A surprisingly small number of trades is recorded for these symbols, and it is not unusual for several minutes to elapse between trade executions. You question Dave about these symbols. He explains that these are thinly traded tickers, and it is expected to see a lower trading volume for them. The problem is now clear to you. The midpoint series recorded for these symbols do not fulfill one of the prerequisites of your algorithm: at least one execution per minute. You refrain from reminding Dave that he assured you this situation was impossible and start working on a fix. The trader is always right!

You are not confident that you can rework the algorithm to make it more robust while preserving the current throughput. A better option would be to find a way to clean up the data to generate the missing data points. You seek advice from Vanessa. She explains that it would not disturb the trading algorithm to perform a linear extrapolation of the missing data points, based on the surrounding existing points. You write a short method to extrapolate a midpoint at a certain time using the previous and following points (respectively, a and b in the following snippet):

private def extrapolate(
  a: Midpoint,
  b: Midpoint,
  time: TimestampMinutes): Midpoint = {
  // Note the toDouble conversion: without it, dividing the two Int time
  // deltas would truncate to zero for any point strictly between a and b
  val price = a.value +
    ((time.value - a.time.value).toDouble / (b.time.value - a.time.value)) *
      (b.value - a.value)
  Midpoint(time, price)
}
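As a sanity check (extrapolate is private, so this sketch is purely illustrative): given midpoints 2.0 at minute 0 and 3.5 at minute 3, the value extrapolated at minute 1 is 2.0 + (1 / 3.0) * 1.5, that is, 2.5:

val a = Midpoint(TimestampMinutes(0), 2.0)
val b = Midpoint(TimestampMinutes(3), 3.5)

extrapolate(a, b, TimestampMinutes(1))
// Midpoint(TimestampMinutes(1), 2.5)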

With this method, we can write a cleanup method that follows the model of the removeDuplicates function to preprocess the data:

private def addMissingDataPoints(
  v: Vector[Midpoint]): Vector[Midpoint] = {
  @tailrec
  def loop(
    previous: Midpoint,
    rest: Vector[Midpoint],
    result: Vector[Midpoint]): Vector[Midpoint] = rest match {
    case current +: mPoints if previous.time.value == current.time.value - 1 =>
      // Nothing to extrapolate, the data points are consecutive
      loop(current, mPoints, result :+ previous)

    case current +: mPoints if previous.time.value < current.time.value - 1 =>
      // Need to generate a data point
      val newPoint = extrapolate(previous, current, previous.time.next)
      loop(newPoint, rest, result :+ previous)

    case _ => result :+ previous
  }

  v match {
    case h +: rest => loop(h, rest, Vector.empty)
    case _ => Vector.empty
  }
}

Our internal tail-recursive method handles the case where two points are already consecutive, and the case where a point is missing. In the latter case, we create a new point with our extrapolate method; the new point becomes the previous point of the next iteration and is appended to the result Vector at that point. Note that this also allows consecutive missing points to be extrapolated one by one. We update our factory method to perform this additional clean up after removing possible duplicates:

def fromExecution(executions: Vector[Execution]): MidpointSeries =
  new MidpointSeries(
    addMissingDataPoints(
      removeDuplicates(
        executions.map(Midpoint.fromExecution))))

We now have the assurance that our input data is clean and ready to be used by our return series algorithm.
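To see both cleanup steps in action, consider this small sketch with a gap at minute 1; the missing midpoint is extrapolated between its two neighbors:

val executions = Vector(
  Execution(TimestampMinutes(0), AskPrice(25), BidPrice(23)), // midpoint 24.0
  Execution(TimestampMinutes(2), AskPrice(31), BidPrice(27))) // midpoint 29.0

MidpointSeries.fromExecution(executions).points
// Vector(Midpoint(TimestampMinutes(0), 24.0),
//        Midpoint(TimestampMinutes(1), 26.5), // extrapolated
//        Midpoint(TimestampMinutes(2), 29.0))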

Handling multiple return series

The team is impressed by the improvements that you implemented, and by how quickly you were able to fix the existing code. They mention a project they have had in mind for a while without knowing how to approach it. A couple of weeks ago, Vanessa designed a machine learning algorithm to evaluate trading strategies over several tickers, based on their return series. This algorithm requires that all the return series involved contain the same amount of data points. Your previous changes already took care of this requirement. However, another condition is that the return values must be normalized or scaled. A feature is a machine learning term for an individual measurable property. In our example, each return data point is a feature. Feature scaling is used to standardize the range of possible values to ensure that broad ranges of values do not distort a learning algorithm. Vanessa explains that scaling features will help her algorithm to deliver better results. Our program will handle a set of return series, compute a scaling vector, and calculate a new set of normalized return series.

Array

For this system, we consider switching from Vector to Array. Array is a mutable, indexed collection of values. It provides true constant time random access, as opposed to Vector, which implements this operation in effectively constant time. However, contrary to Vector, an Array is allocated once as a single, contiguous chunk of memory, and its size is fixed at creation; appending or prepending elements requires allocating a new array. A Scala Array is implemented with a Java array, which is memory optimized. A Scala Array is also more user-friendly than the native Java array: most methods that are available on other Scala collections are made available on Array. Implicit conversions are used to augment Array with ArrayOps and WrappedArray. ArrayOps is a simple wrapper that temporarily enriches Array with all the operations found in indexed sequences; methods called on ArrayOps yield an Array. On the contrary, a conversion from Array to WrappedArray is permanent, and transformer methods called on WrappedArray yield another WrappedArray. We see this in the standard library documentation, as follows:

val arr = Array(1, 2, 3)
val arrReversed = arr.reverse            // arrReversed is an Array[Int]
val seqReversed: Seq[Int] = arr.reverse  // seqReversed is a WrappedArray

Having decided to use Array for our new module, we start working on the code to scale the features of each return series:

class ReturnSeriesFrame(val series: Array[Array[Return]]) {
  // The scaling value of a series is its greatest return value
  val scalingVector: Array[Double] = {
    val v = new Array[Double](series.length)
    for (i <- series.indices) {
      v(i) = series(i).map(_.value).max
    }
    v
  }
}

A scaling vector is computed for a set of series. The first value of the vector is used to scale the first series, the second value for the second series, and so on. The scaling value is simply the greatest value in the series. We can now write the code to use the scaling vector and compute the normalized version of the frame:

object ReturnSeriesFrame {
  def scaleWithMap(frame: ReturnSeriesFrame): ReturnSeriesFrame =
    new ReturnSeriesFrame(
      frame.series.zip(frame.scalingVector).map {
        case (series, scaling) =>
          series.map(point => Return(point.value / scaling))
      })
}
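Before benchmarking, here is a short usage sketch with illustrative values; each series is divided by its greatest value, so the largest return of every series scales to 1.0:

val frame = new ReturnSeriesFrame(Array(
  Array(Return(2.0), Return(4.0)),
  Array(Return(1.0), Return(5.0))))

ReturnSeriesFrame.scaleWithMap(frame).series
// Array(Array(Return(0.5), Return(1.0)),
//       Array(Return(0.2), Return(1.0)))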

In scaleWithMap, we zip each series with its scaling value, and create a new scaled return series. We can compare this Array-based version with another, almost identical, implementation using Vector (this code is omitted here for brevity, but it can be found in the source code attached to the book):

Benchmark           | Series size | Throughput (ops per second) | Error as percentage of throughput
normalizeWithVector | 60          | 101,116.50                  | ± 0.85
normalizeWithArray  | 60          | 176,260.52                  | ± 0.68
normalizeWithVector | 1,440       | 4,077.74                    | ± 0.71
normalizeWithArray  | 1,440       | 7,865.85                    | ± 1.39
normalizeWithVector | 28,800      | 282.90                      | ± 1.06
normalizeWithArray  | 28,800      | 270.36                      | ± 1.85

These results show that Array performs better than Vector for shorter series. As the size of the series increases, their respective performances converge; the throughput is nearly identical for a series containing 20 days of data (28,800 minutes). For larger sequences, the locality provided by Vector's chunked memory allocation model reduces the difference with Array.

Our implementation is idiomatic: it uses higher-order functions and immutable structures. However, using transform functions, such as zip and map, creates new instances of Array. An alternative is to leverage the mutable nature of Array to limit the amount of garbage generated by our program.

Looping with the Spire cfor macro

Scala supports two loop constructs: the for loop and the while loop. The latter, in spite of its good performance characteristics, is usually avoided in functional programming because it requires mutable state, in the form of vars, to keep track of the looping condition. In this section, we show you a technique that takes advantage of while loop performance while preventing mutable references from leaking into application code.
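As a reminder of what we are trying to avoid, here is a minimal sketch of an imperative summation; the two vars are exactly the kind of mutable bookkeeping that we would rather keep out of application code:

// Summing an array with a while loop requires two vars to track state
def sum(xs: Array[Int]): Int = {
  var total = 0
  var i = 0
  while (i < xs.length) {
    total += xs(i)
    i += 1
  }
  total
}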

Spire is a numeric library for Scala that allows developers to write efficient numeric code. Spire leverages patterns such as type classes, macros, and specialization (remember specialization from Chapter 3, Unleashing Scala Performance). You can learn more about Spire at https://github.com/non/spire.

One of the macros made available by Spire is cfor. Its syntax is inspired by the more traditional for loop encountered in Java. In the following implementation of feature scaling, we use the cfor macro to iterate over our series and normalize the values:

def scaleWithSpire(frame: ReturnSeriesFrame): ReturnSeriesFrame = {
  import spire.syntax.cfor._

  val result = new Array[Array[Return]](frame.series.length)

  cfor(0)(_ < frame.series.length, _ + 1) { i =>
    val s = frame.series(i)
    val scaled = new Array[Return](s.length)
    cfor(0)(_ < s.length, _ + 1) { j =>
      val point = s(j)
      scaled(j) = Return(point.value / frame.scalingVector(i))
    }
    result(i) = scaled
  }

  new ReturnSeriesFrame(result)
}

This example highlights that cfor macros can be nested. The macro is essentially syntactic sugar that compiles to a Scala while loop. We can examine the following generated bytecode to prove this:

public highperfscala.dataanalysis.ArrayBasedReturnSeriesFrame scaleWithSpire(highperfscala.dataanalysis.ArrayBasedReturnSeriesFrame); 
    Code: 
       0: aload_1 
       1: invokevirtual #121                // Method highperfscala/dataanalysis/ArrayBasedReturnSeriesFrame.series:()[[Lhighperfscala/dataanalysis/Return; 
       4: arraylength 
       5: anewarray     #170                // class "[Lhighperfscala/dataanalysis/Return;" 
       8: astore_2 
       9: iconst_0 
      10: istore_3 
      11: iload_3 
     [... omitted for brevity] 
      39: iload         6 
      [... omitted for brevity] 
      82: istore        6 
      84: goto          39 
      [... omitted for brevity] 
      95: istore_3 
      96: goto          11 
      99: new           #16                 // class highperfscala/dataanalysis/ArrayBasedReturnSeriesFrame 
     102: dup 
     103: aload_2 
     104: invokespecial #19                 // Method highperfscala/dataanalysis/ArrayBasedReturnSeriesFrame."<init>":([[Lhighperfscala/dataanalysis/Return;)V 
     107: areturn 

We notice the two goto statements, instructions 96 and 84, which are used to loop back respectively to the beginning of the outer loop and the inner loop (which respectively begin with instructions 11 and 39). We can run a benchmark of this new implementation to confirm the performance gain:

Benchmark          | Series size | Throughput (ops per second) | Error as percentage of throughput
normalizeWithArray | 60          | 176,260.52                  | ± 0.68
normalizeWithCfor  | 60          | 256,303.49                  | ± 1.33
normalizeWithArray | 1,440       | 7,865.85                    | ± 1.39
normalizeWithCfor  | 1,440       | 11,446.47                   | ± 0.89
normalizeWithArray | 28,800      | 270.36                      | ± 1.85
normalizeWithCfor  | 28,800      | 463.56                      | ± 1.51

The macro, which is compiled to a while loop, is able to deliver better performance. Using the cfor construct, we are able to retain performance while avoiding the introduction of multiple vars. Although this approach sacrifices immutability, the scope of mutability is limited and less error-prone than an equivalent implementation using an imperative while or for loop.
