Rethinking reporting architecture

After deploying a new version of the web portal that generates the performance report containing your view changes, you begin wondering what else can be done to improve report generation performance. It strikes you that, for a particular time interval, the report is immutable: the computed PnL trend for a particular hour never changes once computed. Yet the report is needlessly recomputed each time a client requests it. Given this line of thinking, you wonder how difficult it would be to generate a new report each hour as new execution data becomes available. Order and execution events can be transformed on-the-fly, as they are created, into the inputs that are required for the client performance trend report. With a pregenerated report, the web portal performance issues should completely disappear because the responsibility of report generation no longer belongs to the web portal.

This new report generation strategy leads us to explore a new design paradigm, called event sourcing. Event sourcing describes an architectural approach to designing systems that relies on processing events over time instead of relying on a model of the current state to answer different questions. The reporting system that we worked on performs significant work to identify the subset of orders that executed because current state rather than events is stored. Imagine that, instead of working with data, such as Order and Execution, we instead worked with events that represent things that happened in the system over time. One relevant event to report could be the OrderExecuted event that can be modeled, as follows:

case class OrderExecuted(created: CreatedTimestamp, orderId: OrderId, price: Price) 

This event describes something that happened instead of representing a snapshot of current state. To extend this example, imagine if Order also included an optional Price to denote execution price:

sealed trait Order { 
  def created: CreatedTimestamp 
  def id: OrderId 
  def ticker: Ticker 
  def price: Price 
  def clientId: ClientId 
  def executionPrice: Option[Price] 
} 

If this data model is mapped to a relational database, executionPrice would be a nullable database value that is overwritten when an execution occurs. When the domain model only reflects the current state, immutability is lost. As a functional programmer, this should concern you because you understand the reasoning capabilities that immutability provides. Storing only the current state of data may also lead to excessively large objects that are difficult to program with. For example, how would you represent that an Order was canceled? With the current approach, the most expedient method is to add a Boolean flag named isCanceled. Over time, as your system's requirements become more complicated, the Order object will grow as you track more characteristics of the current state. This means that loading a set of Order objects into memory from a database will grow more unwieldy due to growing memory requirements. This is a dilemma that you have likely experienced if you have extensive Object Relational Mapping (ORM) experience.

To avoid bloating Order, you may try to deconstruct the concept of an order to support multiple use cases. For example, if you are only interested in executed orders, the model may change the executionPrice datatype from Option[Price] to Price, and you may no longer require the isCanceled Boolean flag because, by definition, an executed order could not have been canceled.
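
As a sketch of this deconstruction, a dedicated representation of an executed order might look like the following. The ExecutedOrder name and shape are our own illustration, not part of the existing model:

case class ExecutedOrder( 
  created: CreatedTimestamp, 
  id: OrderId, 
  ticker: Ticker, 
  price: Price, 
  executionPrice: Price, // no longer optional: every executed order has one 
  clientId: ClientId) 
// no isCanceled flag is needed: an executed order was, by definition, not canceled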

Identifying multiple definitions or representations for what you once thought was a single concept is an important step toward addressing the shortcomings that we walked through. Extending this approach, we come back to the topic of event sourcing. We can replay a set of events to build a representation of executed orders. Let's slightly modify the events emitted from the order book to look like the following:

sealed trait OrderBookEvent 
case class BuyOrderSubmitted(created: EventInstant, 
  id: OrderId, ticker: Ticker, price: Price, clientId: ClientId) 
  extends OrderBookEvent 
case class SellOrderSubmitted(created: EventInstant, 
  id: OrderId, ticker: Ticker, price: Price, clientId: ClientId) 
  extends OrderBookEvent 
case class OrderCanceled(created: EventInstant, id: OrderId) 
  extends OrderBookEvent 
case class OrderExecuted(created: EventInstant, 
  id: OrderId, price: Price) extends OrderBookEvent 

If all OrderBookEvents were persisted (for example, to disk), it is then possible to write a program that reads all the events and constructs a set of ExecutedOrders by correlating BuyOrderSubmitted and SellOrderSubmitted events with OrderExecuted events. An advantage that we see with this approach is that, over time, we are able to ask new questions about what happened in our system and then easily answer them by reading the events. In contrast, if a model built on the current state did not include executions when it was first designed, it is impossible to retroactively answer the question, "Which orders executed last week?"

Our new idea is exciting, and it has the potential to yield great improvements. However, it comes with a set of challenges. The main difference from the previous section is that our new use case does not load the Order and Execution collections into memory from a data store. Instead, we plan to process each incoming OrderBookEvent as it is generated by the order book. Conceptually, this approach still involves processing a sequence of data. However, with the previous approach, the entire data set existed prior to beginning any transformations. Processing events on-the-fly requires designing software that handles data that has not yet been generated. Clearly, neither eager collections nor views are a good tool for our new system. Luckily, the standard Scala library provides us with the right abstraction: Stream. Let's take a closer look at this collection type to better understand how Stream can help us implement an event sourcing approach to the client performance reporting architecture.

An overview of Stream

A stream can be seen as a mix between a list and a view. Like a view, it is lazily evaluated and transformations are applied only when its elements are accessed or collected. Like a List, the elements of a Stream are only evaluated once. A Stream is sometimes described as an unrealized List, meaning that it is essentially a List that has not yet been fully evaluated, or realized.

Where a List can be constructed with the cons (::) operator, a Stream can be similarly constructed with its own operator:

scala> val days = "Monday" :: "Tuesday" :: "Wednesday" :: Nil 
days: List[String] = List(Monday, Tuesday, Wednesday) 
 
scala> val months = "January" #:: "February" #:: "March" #:: Stream.empty 
months: scala.collection.immutable.Stream[String] = Stream(January, ?) 

The syntax to create a Stream is close to the one used to create a List. One difference is the returned value. Where a List is immediately evaluated, a Stream is not. Only the first element ("January") is computed; the remaining values are still unknown (and denoted by a ? character).

Let's observe what happens when we access part of the stream:

scala> println(months.take(2).toList) 
List(January, February) 
scala> months 
res0: scala.collection.immutable.Stream[String] = Stream(January, February, ?) 

We forced the evaluation of the first two elements of the Stream by turning it into a List (see the following note). The first two months are printed. We then display the value of months to discover that the second element ("February") is now computed.

Note

In the preceding example, toList is the call that forces the evaluation of the Stream. take(2) is a lazily applied transformer that also returns an unevaluated Stream:

scala> months.take(2) 
res0: scala.collection.immutable.Stream[String] = Stream(January, ?) 

To highlight the evaluation characteristics of a Stream, we look at another example of creating a Stream:

def powerOf2: Stream[Int] = { 
  def next(n: Int): Stream[Int] = { 
    println(s"Adding $n") 
    n #:: next(2 * n) 
  } 
  1 #:: next(1) 
} 

This short snippet defines a function that creates a Stream of powers of 2. It is an infinite Stream initialized with the first value 1 and the tail is defined as another Stream. We added a println statement to allow us to study the evaluation of the elements:

scala> val s = powerOf2 
s: Stream[Int] = Stream(1, ?) 
 
scala> s.take(8).toList 
Adding 1 
Adding 2 
Adding 4 
Adding 8 
Adding 16 
Adding 32 
Adding 64 
res0: List[Int] = List(1, 1, 2, 4, 8, 16, 32, 64) 
 
scala> s.take(10).toList 
Adding 128 
Adding 256 
res1: List[Int] = List(1, 1, 2, 4, 8, 16, 32, 64, 128, 256) 

Note how the elements after the head are only evaluated when we perform the first conversion to a List (the head, 1, was already realized when the Stream was created). In the second call, only elements 9 and 10 are computed; the first eight are already realized and are part of the Stream.

Note

Based on the previous example, you may wonder if a Stream is an immutable data structure. Its fully qualified name is scala.collection.immutable.Stream, so this should give you a good hint. It is true that accessing the Stream and realizing some of its elements causes a modification of the Stream. However, the data structure is still considered immutable. The values it contains never change once assigned; even before being evaluated, the values exist and have a definition in the Stream.

The previous example shows an interesting property of Stream: it is possible to create a virtually infinite Stream. The Stream that is created by powerOf2 is unbounded and it is always possible to create one more element thanks to our next method. Another useful technique is the creation of recursive streams. A recursive Stream refers to itself in its definition. Let's adapt our previous example. Instead of returning the complete sequence of powers of 2, we will allow the caller to set a starting value:

def powerOf2(n: Int): Stream[Int] = math.pow(2, n).toInt #:: powerOf2(n+1) 

The math.pow method is used to compute 2^n. Note that we calculate the first value and define the rest of the Stream as powerOf2(n+1), that is, the next power of 2:

scala> powerOf2(3).take(10).toList 
res0: List[Int] = List(8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096) 

The companion object of Stream provides several factory methods to instantiate a Stream. Let's look at a few of them:

  • Stream.apply: This allows us to create a Stream for a finite sequence of values:
       scala> Stream(1,2,3,4) 
       res0: scala.collection.immutable.Stream[Int] = Stream(1, ?) 
       scala> Stream(List(1,2,3,4):_*) 
       res1: scala.collection.immutable.Stream[Int] = Stream(1, ?) 
  • Stream.fill[A](n: Int)(a: => A): This produces a Stream containing the element a, n times:
       scala> Stream.fill(4)(10) 
       res0: scala.collection.immutable.Stream[Int] = Stream(10, ?) 
       scala> res0.toList 
       res1: List[Int] = List(10, 10, 10, 10) 
  • Stream.from(start: Int): This creates an increasing sequence of integers beginning with start:
       scala> Stream.from(4) 
       res0: scala.collection.immutable.Stream[Int] = Stream(4, ?) 
       scala> res0.take(3).toList 
       res1: List[Int] = List(4, 5, 6) 
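  • Stream.iterate[A](start: A)(f: A => A): This produces a Stream beginning with start, where each subsequent element is obtained by applying f to the previous one. We will take advantage of this method later in this chapter:
       scala> Stream.iterate(1)(_ * 3) 
       res0: scala.collection.immutable.Stream[Int] = Stream(1, ?) 
       scala> res0.take(4).toList 
       res1: List[Int] = List(1, 3, 9, 27) 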

We invite you to look at the other methods that are available on the companion object. Note that a Stream can also be constructed from a List directly, as follows:

scala> List(1,2,3,4,5).toStream 
res0: scala.collection.immutable.Stream[Int] = Stream(1, ?) 

The previous code may be misleading. Turning a List into a Stream does not spare you the cost of evaluating the whole List in memory: by the time toStream is called, every element already exists. Similarly, if we were to apply transformations (such as map or filter) to the List before the call to toStream, we would perform these computations eagerly on the entire List.
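
The following REPL session illustrates this point; the println call is only there to trace evaluation, and it shows that map runs eagerly on every element before the Stream is created:

scala> val doubled = List(1, 2, 3).map { i => println(s"Doubling $i"); i * 2 }.toStream 
Doubling 1 
Doubling 2 
Doubling 3 
doubled: scala.collection.immutable.Stream[Int] = Stream(2, ?) 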

Just like a List, you can pattern match on a Stream, as follows:

scala> val s = Stream(1,2,3,4) 
s: scala.collection.immutable.Stream[Int] = Stream(1, ?) 
scala> s match { 
     | case _ #:: _ #:: i #:: _ => i 
     | } 
res0: Int = 3 

This pattern matching extracts the third element from the s stream. Pattern matching on a stream forces the realization of the elements required to evaluate the match expression. In the preceding case, the first three items are calculated.

Note

To pattern match on an empty stream, you can use the Stream.Empty object. It is a singleton instance to represent an empty Stream. It works similarly to Nil for List. Note that the object Stream contains an empty method returning this singleton; however, pattern matching requires a stable identifier, and it cannot use calls to a method as a valid case.
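
For example, the following REPL session matches against an empty Stream:

scala> Stream.empty[Int] match { 
     |   case Stream.Empty => "no elements" 
     |   case h #:: _ => s"first element: $h" 
     | } 
res0: String = no elements 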

Transforming events

Returning to the reporting system, how can we apply the principles of event sourcing and leverage Stream to change how reports are generated? To compute TradingPerformanceTrend for a client, we need to compute PnL trend values for three time periods: the last hour, the last day, and the last seven days. We can write a method with the following signature that gets us closer to identifying the PnL for each trend:

def processPnl(s: TradeState, e: OrderBookEvent): (TradeState, Option[PnlEvent]) 

The signature of processPnl accepts an OrderBookEvent and state in the form of TradeState to produce a new TradeState and, optionally, a PnlEvent. Let's first inspect PnlEvent to understand the end result of this method before inspecting TradeState:

sealed trait PnlEvent 
case class PnlIncreased(created: EventInstant, clientId: ClientId, 
  ticker: Ticker, profit: Pnl) extends PnlEvent 
case class PnlDecreased(created: EventInstant, clientId: ClientId, 
  ticker: Ticker, loss: Pnl) extends PnlEvent 
 
case class Pnl(value: BigDecimal) extends AnyVal { 
  def +(that: Pnl): Pnl = Pnl(value + that.value) 
  def isProfit: Boolean = value.signum >= 0 
} 
object Pnl { 
  // A buy profits when it executes below the submitted price 
  def fromBidExecution(order: Price, executed: Price): Pnl = 
    Pnl(order.value - executed.value) 
 
  // A sell profits when it executes above the submitted price 
  def fromOfferExecution(executed: Price, order: Price): Pnl = 
    Pnl(executed.value - order.value) 
 
  val zero: Pnl = Pnl(BigDecimal(0)) 
} 

We see that PnlEvent models an ADT that expresses when a client's PnL increased or decreased. Using the past tense to name the event (for example, increased) makes it clear that this is a fact or a record of something that has completed. We have not yet looked at how TradeState is defined or the implementation of processPnl, but we can already infer the behavior by studying the emitted events. We display the definition of TradeState, which is needed to correlate submitted orders with executions, as follows:

case class PendingOrder(ticker: Ticker, price: Price, 
  clientId: ClientId) 
 
case class TradeState( 
  pendingBuys: Map[OrderId, PendingOrder], 
  pendingSells: Map[OrderId, PendingOrder]) { 
  def cancelOrder(id: OrderId): TradeState = copy( 
    pendingBuys = pendingBuys - id, pendingSells = pendingSells - id) 
  def addPendingBuy(o: PendingOrder, id: OrderId): TradeState = 
    copy(pendingBuys = pendingBuys + (id -> o)) 
  def addPendingSell(o: PendingOrder, id: OrderId): TradeState = 
    copy(pendingSells = pendingSells + (id -> o)) 
} 
object TradeState { 
  val empty: TradeState = TradeState(Map.empty, Map.empty) 
} 

Next, we inspect the implementation of processPnl to view how PnlEvents are created, as follows:

  def processPnl( 
    s: TradeState, 
    e: OrderBookEvent): (TradeState, Option[PnlEvent]) = e match { 
    case BuyOrderSubmitted(_, id, t, p, cId) => 
      s.addPendingBuy(PendingOrder(t, p, cId), id) -> None 
    case SellOrderSubmitted(_, id, t, p, cId) => 
      s.addPendingSell(PendingOrder(t, p, cId), id) -> None 
    case OrderCanceled(_, id) => s.cancelOrder(id) -> None 
    case OrderExecuted(ts, id, price) => 
      val (p, o) = (s.pendingBuys.get(id), s.pendingSells.get(id)) match { 
        case (Some(order), None) => 
          Pnl.fromBidExecution(order.price, price) -> order 
        case (None, Some(order)) => 
          Pnl.fromOfferExecution(price, order.price) -> order 
        case error => sys.error( 
          s"Unsupported retrieval of ID = $id returned: $error") 
      } 
      s.cancelOrder(id) -> Some( 
        if (p.isProfit) PnlIncreased(ts, o.clientId, o.ticker, p) 
        else PnlDecreased(ts, o.clientId, o.ticker, p)) 
  } 

This implementation shows that the incoming OrderBookEvent is pattern matched to determine the event type, and each event is handled accordingly. When an order is submitted, TradeState is updated to reflect that there is a new pending order that will be either canceled or executed. When an order is canceled, the pending order is removed from TradeState. When an execution occurs, the pending order is removed and, additionally, a PnlEvent is emitted after computing the trade PnL. The trade PnL compares the execution price to the pending order's original price.

PnlEvent provides enough information to compute PnL trend performance for all three time periods (hour, day, and seven days) required by TradingPerformanceTrend. The transformation from OrderBookEvent to PnlEvent is side-effect-free, and the creation of a new event, instead of the replacement of current state, leads to an immutable model. In light of these characteristics, processPnl is easy to unit test, and it makes the intent explicit. By making the intent explicit, it is possible to communicate with less technical stakeholders about how the system works.
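
As a sketch of how a unit test might exercise processPnl (assuming Joda-Time's Instant, which the later examples in this section also use):

import org.joda.time.Instant 
 
val ts = EventInstant(new Instant()) 
val (s1, none) = processPnl(TradeState.empty, 
  BuyOrderSubmitted(ts, OrderId(1), Ticker("FOO"), Price(21.07), ClientId(1))) 
// none is None: submitting an order emits no PnlEvent 
 
val (s2, event) = processPnl(s1, 
  OrderExecuted(ts, OrderId(1), Price(21.00))) 
// the buy executed below the submitted price, so event is 
// Some(PnlIncreased(ts, ClientId(1), Ticker(FOO), Pnl(0.07))) 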

Using PnlEvent as an input to a method that follows the analogous (State, InputEvent) => (State, Option[OutputEvent]) signature, we can now compute hourly PnL trend, as follows:

def processHourlyPnl(s: HourlyState, e: PnlEvent): (HourlyState, Option[HourlyPnlTrendCalculated]) 

This signature shows that, by maintaining state in HourlyState, it is possible to emit the HourlyPnlTrendCalculated event. The emitted event is defined, as follows:

case class HourlyPnlTrendCalculated( 
  start: HourInstant, 
  clientId: ClientId, 
  ticker: Ticker, 
  pnl: LastHourPnL) 

For a particular hour, client ID, and ticker, HourlyPnlTrendCalculated is a record of whether the last hour PnL is positive or negative. The HourInstant class is a value class with a companion object method that transforms an instant to the start of the hour:

case class HourInstant(value: Instant) extends AnyVal { 
  def isSameHour(h: HourInstant): Boolean = 
    h.value.toDateTime.getHourOfDay == value.toDateTime.getHourOfDay 
} 
object HourInstant { 
  def create(i: EventInstant): HourInstant = 
    HourInstant(i.value.toDateTime.withMillisOfSecond(0) 
     .withSecondOfMinute(0).withMinuteOfHour(0).toInstant) 
} 
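
The other type referenced by HourlyPnlTrendCalculated, LastHourPnL, is not shown in this section; from its usage in the code that follows, it can be modeled as a simple two-state ADT, along these lines:

sealed trait LastHourPnL 
case object LastHourPositive extends LastHourPnL 
case object LastHourNegative extends LastHourPnL 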

Let's have a look at how HourlyState is defined to better understand the state that is needed to yield HourlyPnlTrendCalculated:

case class HourlyState( 
  keyToHourlyPnl: Map[(ClientId, Ticker), (HourInstant, Pnl)]) 
object HourlyState { 
  val empty: HourlyState = HourlyState(Map.empty) 
} 

For a ClientId and a Ticker, the PnL for the current hour is stored in HourlyState. Accumulating the PnL allows processHourlyPnl to determine the PnL trend at the end of an hour. We now inspect the implementation of processHourlyPnl to see how PnlEvent is transformed into HourlyPnlTrendCalculated:

def processHourlyPnl( 
  s: HourlyState, 
  e: PnlEvent): (HourlyState, Option[HourlyPnlTrendCalculated]) = { 
  def processChange( 
    ts: EventInstant, 
    clientId: ClientId, 
    ticker: Ticker, 
    pnl: Pnl): (HourlyState, Option[HourlyPnlTrendCalculated]) = { 
    val (start, p) = s.keyToHourlyPnl.get((clientId, ticker)).fold( 
      (HourInstant.create(ts), Pnl.zero))(identity) 
    start.isSameHour(HourInstant.create(ts)) match { 
      case true => (s.copy(keyToHourlyPnl = s.keyToHourlyPnl + 
        ((clientId, ticker) -> ((start, p + pnl)))), None) 
      case false => (s.copy(keyToHourlyPnl = 
        s.keyToHourlyPnl + ((clientId, ticker) -> 
          ((HourInstant.create(ts), Pnl.zero + pnl)))), 
        Some(HourlyPnlTrendCalculated(start, clientId, ticker, 
          p.isProfit match { 
            case true => LastHourPositive 
            case false => LastHourNegative 
          }))) 
    } 
  } 
 
  e match { 
    case PnlIncreased(ts, clientId, ticker, pnl) => processChange( 
      ts, clientId, ticker, pnl) 
    case PnlDecreased(ts, clientId, ticker, pnl) => processChange( 
      ts, clientId, ticker, pnl) 
  } 
} 

Handling an increased and a decreased PnL follows the same flow, so the inner method named processChange handles the identical processing steps. processChange determines whether or not to emit HourlyPnlTrendCalculated by comparing the HourInstant that was stored when the entry was first added to the state with the hour of the timestamp provided by the event. When the comparison shows that the hour has changed, the hourly PnL trend is emitted because the hour is completed. When the hour is unchanged, the provided PnL is added to the state's PnL to continue accumulating the hour's total.

Note

An obvious shortcoming of this approach is that, when a client or a ticker has no further executed orders, it is impossible to determine that the hour is completed. For simplicity, we are not treating time as a first-class event. However, you can imagine how it is possible to model the passing of time as an event that is a second input to processHourlyPnl. For example, the event might be the following:

case class HourElapsed(hour: HourInstant)

To use this event, we could change the signature of processHourlyPnl to receive an event argument that is of the Either[HourElapsed, PnlEvent] type. Scheduling HourElapsed on a timer enables us to modify the implementation of processHourlyPnl to emit HourlyPnlTrendCalculated as soon as the hour elapses instead of when a trade occurs in the next hour. This simple example shows how you can model time as an explicit part of the domain when you consider your system from an event sourcing point of view.
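
A sketch of this modified signature could look like the following; the implementation bodies are elided because only the shape matters here:

def processHourlyPnl( 
  s: HourlyState, 
  e: Either[HourElapsed, PnlEvent]): (HourlyState, Option[HourlyPnlTrendCalculated]) = 
  e match { 
    case Left(HourElapsed(hour)) => 
      ??? // flush and emit the entries accumulated for the elapsed hour 
    case Right(pnlEvent) => 
      ??? // accumulate PnL, as in the current implementation 
  } 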

It is straightforward to imagine writing analogous methods that emit events for the daily and seven-day PnL trends, and then a method that awaits all three PnL trend events to produce the TradingPerformanceTrendGenerated event. The final step is to write a side-effecting method that persists TradingPerformanceTrend so that it can be read by the web portal. At this point, we have a collection of methods that perform transformations on events, but they are not yet wired together cohesively. Next, we take a look at how to create a pipeline to transform events.

Note

Note that, in this case study, we do not actually calculate a PnL. Performing a real PnL calculation would involve more complicated algorithms and would force us to introduce more domain concepts. We opted for a simpler approach with a report that is closer to an exposure report. This allows us to focus on the code and the programming practices that we want to illustrate.

Building the event sourcing pipeline

We use the term pipeline to refer to an arranged set of transformations that may require multiple steps to yield a desired end result. This term brings to mind an image of a set of pipes spanning multiple directions with twists and turns along the way. Our goal is to write a program that receives OrderBookEvents and prints the resulting HourlyPnlTrendCalculated events to standard output. In a true production environment, you can imagine replacing printing to standard output with writing to a persistent data store. In either case, we are building a pipeline that performs a set of referentially transparent transformations and concludes with a side-effect.

The pipeline must accumulate the intermediate state of each transformation as new events are processed. In the functional programming paradigm, accumulation is often associated with a foldLeft operation. Let's look at a toy example that sums a list of integers to better understand accumulation:

val sum = Stream(1, 2, 3, 4, 5).foldLeft(0) { case (acc, i) => acc + i } 
println(sum) // prints 15  

Here, we see foldLeft applied to compute the sum of a sequence of integers by providing an initial sum value of zero and a function that adds the current element to the accumulated sum. The name acc is a commonly used shorthand for 'accumulator'. In this example, the accumulator and the elements share the same data type, Int. This is merely a coincidence rather than a requirement of foldLeft: the accumulator can have a different type than the collection's elements.
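
For instance, in this small sketch, the accumulator is a String while the elements are integers:

val csv = Stream(1, 2, 3).foldLeft("") { 
  case ("", i) => i.toString 
  case (acc, i) => s"$acc,$i" 
} 
println(csv) // prints 1,2,3 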

We can use foldLeft as the basis of our event sourcing pipeline to support processing a list of OrderBookEvents while accumulating intermediate state. From the implementation of the two processing methods, we saw the need to maintain TradeState and HourlyState. We define PipelineState to encapsulate the required state, as follows:

case class PipelineState(tradeState: TradeState, hourlyState: HourlyState) 
object PipelineState { 
  val empty: PipelineState = PipelineState(TradeState.empty, HourlyState.empty) 
} 

PipelineState serves as the accumulator when folding over the OrderBookEvent, allowing us to store the intermediate state for both of the transformation methods. Now, we are ready to define the signature of our pipeline:

def pipeline(initial: PipelineState, f: HourlyPnlTrendCalculated => Unit, xs: Stream[OrderBookEvent]): PipelineState 

The pipeline accepts the initial state, a side-effecting function to be invoked when an HourlyPnlTrendCalculated event is generated, and a set of OrderBookEvents to source. The return value of the pipeline is the state of the pipeline once the events are processed. Let's look at how we can leverage foldLeft to implement pipeline:

def pipeline( 
    initial: PipelineState, 
    f: HourlyPnlTrendCalculated => Unit, 
    xs: Stream[OrderBookEvent]): PipelineState = xs.foldLeft(initial) { 
    case (PipelineState(ts, hs), e) => 
      val (tss, pnlEvent) = processPnl(ts, e) 
      PipelineState(tss, 
        pnlEvent.map(processHourlyPnl(hs, _)).fold(hs) { 
          case (hss, Some(hourlyEvent)) => 
            f(hourlyEvent) 
            hss 
          case (hss, None) => hss 
        }) 
  } 

The implementation of pipeline is based on folding over the provided events, using the provided PipelineState as a starting point for accumulation. The function supplied in foldLeft's second parameter list is where the wiring of transformations takes place. Stitching together the two transformation methods and the side-effecting event handler requires handling several different scenarios. Let's walk through each of the possible cases to better understand how the pipeline works. processPnl is invoked to produce a new TradeState and, optionally, a PnlEvent. If no PnlEvent is generated, then processHourlyPnl is not invoked and the previous HourlyState is returned.

If a PnlEvent is generated, then processHourlyPnl is evaluated to determine whether an HourlyPnlTrendCalculated is created. When HourlyPnlTrendCalculated is generated, then the side-effecting HourlyPnlTrendCalculated event handler is invoked and the new HourlyState is returned. If no HourlyPnlTrendCalculated is generated, then the existing HourlyState is returned.

We construct a simple example to prove that the pipeline works as intended, as follows:

import org.joda.time.{Duration, Instant} 
 
val now = EventInstant(HourInstant.create(EventInstant( 
  new Instant())).value) 
val Foo = Ticker("FOO") 
 
pipeline(PipelineState.empty, println, Stream( 
  BuyOrderSubmitted(now, OrderId(1), Foo, Price(21.07), ClientId(1)), 
  OrderExecuted(EventInstant(now.value.plus(Duration.standardMinutes(30))), 
    OrderId(1), Price(21.00)), 
  BuyOrderSubmitted(EventInstant(now.value.plus( 
    Duration.standardMinutes(35))), 
    OrderId(2), Foo, Price(24.02), ClientId(1)), 
  OrderExecuted(EventInstant(now.value.plus(Duration.standardHours(1))), 
    OrderId(2), Price(24.02)))) 

At the start of the hour, a buy order is submitted for the stock, FOO. Within the hour, the buy order is executed at a price lower than the buying price, indicating that the trade was profitable. As we know, the current implementation relies on an execution in the subsequent hour in order to produce HourlyPnlTrendCalculated. To create this event, a second buy order is submitted within the first hour and executed at the start of the second hour. Running this snippet produces a single HourlyPnlTrendCalculated event that is written to standard output:

HourlyPnlTrendCalculated(HourInstant(2016-02-15T20:00:00.000Z),ClientId(1),Ticker(FOO),LastHourPositive) 

Although wiring the transformations together is somewhat involved, we managed to build a simple event sourcing pipeline using only the Scala standard library and our existing knowledge of Scala collections. This example demonstrated the power of foldLeft to help build an event sourcing pipeline. Using this implementation, we can write a fully featured program that writes a pregenerated version of the performance report to a persistent data store that can be read by the web portal. This new design shifts the burden of report generation outside the web portal's responsibilities, allowing the web portal to provide a responsive user experience. Another benefit of this approach is that it puts a domain-oriented language at the center of the design. All our events use business terms and focus on modeling domain concepts, making it easier for developers and stakeholders to communicate with each other.

Note

You might be wondering about a data structure that shares some characteristics with Stream and that we have not yet mentioned: Iterator. As the name implies, Iterator provides facilities to iterate over a sequence of data. Its simplified definition boils down to the following:

trait Iterator[A] { 
  def next: A 
  def hasNext: Boolean 
}

Like Stream, an Iterator is able to avoid loading an entire dataset into memory, which enables programs to be written with constant memory usage. Unlike Stream, an Iterator is mutable and intended for only a single iteration over a collection (it extends the TraversableOnce trait). It should be noted that, according to the standard library documentation, one should never use an iterator after calling a method on it; the only exceptions to this rule are next and hasNext. For example, calling size on an Iterator returns the size of the sequence, but it also consumes the entire sequence and renders the instance of Iterator useless. These properties lead to software that is difficult to reason about, which is the antithesis of what we strive for as functional programmers. For this reason, we omit an in-depth discussion of Iterator.
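
A quick REPL session demonstrates the danger:

scala> val it = Iterator(1, 2, 3) 
it: Iterator[Int] = non-empty iterator 
 
scala> it.size 
res0: Int = 3 
 
scala> it.hasNext 
res1: Boolean = false 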

We encourage you to further explore event sourcing by reading the documentation of the Event Store database at http://docs.geteventstore.com/introduction/event-sourcing-basics/. Event Store is a database that is developed around the concept of event sourcing. It was created by Greg Young, a notable writer on the topic of event sourcing. While enriching your understanding of event sourcing, reflect on when you believe it is appropriate to apply the technique. For CRUD applications that have simple behavior, event sourcing may not be a worthwhile time investment. When you model more complex behaviors or consider scenarios involving strict performance and scaling requirements, the time investment for event sourcing may become justified. For example, as we saw with performance trend reporting, considering the performance challenges from the event sourcing paradigm exposed an entirely different way of approaching the design.

As you continue exploring the world of stream processing, you will discover that you wish to construct more complex transformations than our event sourcing pipeline example. To continue digging deeper into the topic of stream processing, we suggest researching two relevant libraries: Akka Streams and Functional Streams for Scala (formerly, scalaz-stream). These libraries provide tools to build more sophisticated transformation pipelines using different abstractions than Stream. In combination with learning about Event Store, you will deepen your understanding of how event sourcing ties in with stream processing.

Streaming Markov chains

With the simple program at the end of the previous section, we demonstrated that we can wire together a pipeline of transformations operating on events. As a well-intentioned engineer, you wish to develop automated tests that prove the pipeline works as intended. One approach is to add a sample of historical production data into the repository to build tests. This is often a good choice, but you are concerned that the sample is not large enough to represent a broad number of scenarios. Another option is to write a generator of events that can create production-like data. This approach requires more up-front effort, but it yields a more dynamic way to exercise the pipeline.

A recent lunchtime conversation with Dave about Markov chains sparked the thought about testing the event sourcing pipeline with generated data. Dave described how a Markov chain is a statistical model of state transitions that only relies on the current state to determine the next state. Dave is representing the states of the stock market as a Markov chain, allowing him to build trading strategies based on whether or not he perceives the stock market to be in an upswing, downswing, or steady state. After reading through the Markov chain Wikipedia page, you envision writing an event generator based on a Markov chain.

Our end goal is to be able to generate an infinite number of OrderBookEvents that follow production-like patterns. For example, we know from previous experience that, proportionally, there are often more cancelations than executions, particularly during volatile markets. The event generator should be able to represent different probabilities of events occurring. As a Markov chain only depends on its current state to identify its next state, a Stream is a natural fit because we only need to inspect the current element to determine the next element. For our representation of a Markov chain, we need to identify the chance of transitioning from the current state to any of the other possible states. The following table illustrates one possible set of probabilities:

Current state        Chance of buy   Chance of sell   Chance of execution   Chance of cancel
BuyOrderSubmitted    10%             15%              40%                   40%
SellOrderSubmitted   25%             10%              35%                   25%
OrderCanceled        60%             50%              40%                   10%
OrderExecuted        30%             30%              55%                   30%

This table defines the likelihood of receiving an OrderBookEvent given the current OrderBookEvent. For example, given that a sell order was submitted, there is a 10% chance of seeing a second sell order next and a 35% chance that an execution occurs next. We can develop state transition probabilities according to the market conditions that we wish to simulate in the pipeline.

We can model the transitions using the following domain:

  sealed trait Step 
  case object GenerateBuy extends Step 
  case object GenerateSell extends Step 
  case object GenerateCancel extends Step 
  case object GenerateExecution extends Step 
 
  case class Weight(value: Int) extends AnyVal 
  case class GeneratedWeight(value: Int) extends AnyVal 
  case class StepTransitionWeights( 
    buy: Weight, 
    sell: Weight, 
    cancel: Weight, 
    execution: Weight) { 
    // The sum of all weights; used later to bound random weight generation 
    def weightSum: Weight = 
      Weight(buy.value + sell.value + cancel.value + execution.value) 
  } 

In this domain, Step is an ADT that models the possible states. For a given Step, we will associate StepTransitionWeights to define the probability of transitioning to different states based on the provided weightings. GeneratedWeight is a value class that wraps a weight that is randomly generated for the current Step; we will use it to drive the transition from one Step to the next.

Our next step, so to speak, is to make use of our domain to generate events according to probabilities that we define. To make use of Step, we define a representation of the Markov chain state that is required, as follows:

  case class State( 
    pendingOrders: Set[OrderId], 
    step: Step) 

The Markov chain requires knowledge of the current state, which is represented by step. Additionally, we put a twist on the Markov chain by maintaining, in pendingOrders, the set of submitted orders that are neither canceled nor executed. This additional state is needed for two reasons. First, generating cancel and execution events requires linking to a known order ID. Second, we constrain our representation of a Markov chain by requiring at least one pending order to exist before creating a cancel or an execution. If there are no pending orders, it is invalid to transition to a state that generates either OrderCanceled or OrderExecuted.

Using State, we can write a method with the following signature to manage transitions:

def nextState( 
      weight: StepTransitionWeights => GeneratedWeight, 
      stepToWeights: Map[Step, StepTransitionWeights], 
      s: State): (State, OrderBookEvent) 

Given a way to generate a weight from the current StepTransitionWeights, a mapping of Step to StepTransitionWeights, and the current State, we are able to produce a new State and an OrderBookEvent. For brevity, we omit the implementation of nextState because we want to focus most intently on stream processing. From the signature, we have enough insight to apply the method, but we encourage you to inspect the repository to fill in any blanks in your understanding.
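
To give a flavor of what the repository implementation might look like, here is one possible way to turn a GeneratedWeight into the next Step. This is our own sketch: it assumes the generated weight falls in the range [1, weightSum] and ignores the pendingOrders constraint described earlier:

def selectStep(w: GeneratedWeight, t: StepTransitionWeights): Step = { 
  // Partition [1, weightSum] into contiguous bands, one per Step 
  val buyMax = t.buy.value 
  val sellMax = buyMax + t.sell.value 
  val cancelMax = sellMax + t.cancel.value 
  if (w.value <= buyMax) GenerateBuy 
  else if (w.value <= sellMax) GenerateSell 
  else if (w.value <= cancelMax) GenerateCancel 
  else GenerateExecution 
} 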

The nextState method is the driver of state transitions in our Markov chain representation. We can now generate an infinite Stream of OrderBookEvents based on transition probabilities using the convenience Stream method, iterate. From the Scala documentation, iterate produces an infinite stream by repeatedly applying a function to the start value. Let's see how we can use iterate:

val stepToWeights = Map[Step, StepTransitionWeights]( 
      GenerateBuy -> StepTransitionWeights( 
        Weight(10), Weight(25), Weight(40), Weight(40)), 
      GenerateSell -> StepTransitionWeights( 
        Weight(25), Weight(10), Weight(40), Weight(25)), 
      GenerateCancel -> StepTransitionWeights( 
        Weight(60), Weight(50), Weight(40), Weight(10)), 
      GenerateExecution -> StepTransitionWeights( 
        Weight(30), Weight(30), Weight(60), Weight(25))) 
 
    import scala.util.Random 
 
    val next = State.nextState( 
      t => GeneratedWeight(Random.nextInt(t.weightSum.value) + 1), 
      stepToWeights, _: State) 
 
    println("State	Event") 
    Stream.iterate(State.initialBuy) { case (s, e) => next(s) } 
      .take(5) 
      .foreach { case (s, e) => println(s"$s	$e") } 

This snippet creates a Markov chain to generate various OrderBookEvents by providing a mapping of Step to StepTransitionWeights as the basis to invoke State.nextState. State.nextState is partially applied, leaving the current state unapplied; the resulting next function has the State => (State, OrderBookEvent) signature. With the necessary scaffolding in place, Stream.iterate is used to generate an infinite sequence of OrderBookEvents by invoking next. Similar to foldLeft, we provide an initial value, initialBuy, to begin the iteration; it is defined as follows:

val initialBuy: (State, OrderBookEvent) = { 
  val e = randomBuySubmitted() // defined in the accompanying repository 
  State(Set(e.id), GenerateBuy) -> e 
} 

Running this snippet produces output that is similar to the following:

    State = State(Set(OrderId(1612147067584751204)),GenerateBuy)
    Event = BuyOrderSubmitted(EventInstant(2016-02-22T23:52:40.662Z),OrderId(1612147067584751204),Ticker(FOO),Price(32),ClientId(28))
    State = State(Set(OrderId(1612147067584751204), OrderId(7606120383704417020)),GenerateBuy)
    Event = BuyOrderSubmitted(EventInstant(2016-02-22T23:52:40.722Z),OrderId(7606120383704417020),Ticker(XYZ),Price(18),ClientId(54))
    State = State(Set(OrderId(1612147067584751204), OrderId(7606120383704417020), OrderId(5522110701609898973)),GenerateBuy)
    Event = BuyOrderSubmitted(EventInstant(2016-02-22T23:52:40.723Z),OrderId(5522110701609898973),Ticker(XYZ),Price(62),ClientId(28))
    State = State(Set(OrderId(7606120383704417020), OrderId(5522110701609898973)),GenerateExecution)
    Event = OrderExecuted(EventInstant(2016-02-22T23:52:40.725Z),OrderId(1612147067584751204),Price(21))
    State = State(Set(OrderId(7606120383704417020), OrderId(5522110701609898973), OrderId(5898687547952369568)),GenerateSell)
    Event = SellOrderSubmitted(EventInstant(2016-02-22T23:52:40.725Z),OrderId(5898687547952369568),Ticker(BAR),Price(76),ClientId(45))

Of course, each invocation differs depending upon the random values that are created for GeneratedWeight, which is used to probabilistically select the next transition. This snippet provides a base to compose larger-scale tests for the reporting infrastructure. Through this example, we see an interesting application of Markov chains to support generating representative events from various market conditions without requiring access to volumes of production data. We are now able to write tests to confirm whether or not the reporting infrastructure correctly computes PnL trends in different market conditions.

Stream caveats

For all its goodness, Stream should be used with caution. In this section, we mention a few of the main caveats of Stream, and how to avoid them.

Streams are memoizers

While views do not cache the result of a computation and, therefore, recalculate and realize each element each time it is accessed, Stream does save the final form of its elements. An element is only ever realized once, the first time it is accessed. While this is a great characteristic to avoid computing the same result several times, it can also lead to a large consumption of memory, to the point where your program may eventually run out of memory.

To avoid Stream memoization, it is good practice to avoid storing a Stream in a val. Using a val creates a permanent reference to the head of the Stream, ensuring that every element that is realized will be cached. If a Stream is defined as a def, it can be garbage collected as soon as it is no longer needed.
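
As a quick sketch of the difference, using the powerOf2 function defined earlier:

// The val pins the Stream's head: every element realized through 
// cachedPowers stays in memory for as long as the val is reachable 
val cachedPowers: Stream[Int] = powerOf2(0) 
 
// Each call returns a fresh Stream whose realized elements can be 
// garbage collected once they are no longer referenced 
def freshPowers: Stream[Int] = powerOf2(0) 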

Memoization can happen when calling certain methods that are defined on Stream. For example, drop or dropWhile will evaluate and memoize all the intermediate elements to be dropped. The elements are memoized because these methods are defined on an instance of Stream (and a Stream holds a reference to its own head). We can implement our own drop function to avoid caching the intermediate elements in memory:

import scala.annotation.tailrec 
 
@tailrec 
def drop[A](s: Stream[A], count: Int): Stream[A] = count match { 
  case 0 => s 
  case n if n > 0 => drop(s.tail, n - 1) 
  case n if n < 0 => throw new Exception("cannot drop negative count") 
} 

We pattern match on the value of count to know whether we can return the given Stream or need to perform a recursive call on the tail. Our method is tail-recursive. This makes sure that we do not keep a reference to the head of the Stream, since a tail-recursive function recycles its reference each time that it loops. Our s reference will only point to the remaining part of the Stream, not the head.

Another example of a problematic method is max. Calling max will memoize all the elements of the Stream to determine which one is the greatest. Let's implement a safe version of max, as follows:

def max(s: => Stream[Int]): Option[Int] = { 
 @tailrec 
 def loop(ss: Stream[Int], current: Option[Int]): Option[Int] = ss match { 
   case Stream.Empty => current 
   case h #:: rest if current.exists(_ >= h) => loop(rest, current) 
   case h #:: rest => loop(rest, Some(h)) 
 } 
 loop(s, None) 
} 

This time, we used an internal tail recursive function to be able to expose a friendly API. We represent the current max value as an Option[Int] to handle the case where the method is called with an empty Stream. Note that max accepts s as a by-name parameter. This is important because, otherwise, we would be keeping a reference to the head of the Stream before calling the internal tail-recursive loop method. Another possible implementation is as follows:

def max(s: => Stream[Int]): Option[Int] = { 
 @tailrec 
 def loop(ss: Stream[Int], current: Int): Int = ss match { 
   case Stream.Empty => current 
   case h #:: hs if h > current => loop(hs, h) 
   case h #:: hs if h <= current => loop(hs, current) 
 } 
 
 s match { 
   case Stream.Empty => None 
   case h #:: rest => Some(loop(rest, h)) 
 } 
} 

This implementation is arguably simpler. We check in the max function whether the Stream is empty or not; this allows us to either return right away (with None), or call loop with a valid default value (the first element in the Stream). The loop does not have to deal with Option[Int] anymore. However, this example does not achieve the goal of avoiding memoization. The pattern matching causes rest to keep a reference to the entire tail of the original Stream, which prevents garbage collection of the intermediate elements. A good practice is to only pattern match on a Stream inside a consuming, tail-recursive method.

Stream can be infinite

We saw during our overview that it is possible to define an infinite Stream. However, you need to be careful when working with an infinite Stream. Some methods may cause the evaluation of the entire Stream, leading to OutOfMemoryError. Some are obvious, such as toList, which will try to store the entire Stream into a List, causing the realization of all the elements. Others are more subtle. For example, Stream has a size method that is similar to the one defined on List. Calling size on an infinite Stream will cause the program to run out of memory. Similarly, max and sum will attempt to realize the entire sequence and crash your system. This behavior is particularly dangerous as Stream extends Seq, the base trait for sequences. Consider the following code:

def range(s: Seq[Int]): Int = s.max - s.min 

This short method takes a Seq[Int] as its single parameter and returns its range, that is, the difference between the greatest and smallest elements. As Stream extends Seq, the following call is valid:

val s: Stream[Int] = ??? 
range(s) 

The compiler will happily and promptly generate the bytecode for this snippet. However, s could be defined as an infinite Stream:

val s: Stream[Int] = powerOf2(0) 
range(s) 
java.lang.OutOfMemoryError: GC overhead limit exceeded 
  at .powerOf2(<console>:10) 
  at $anonfun$powerOf2$1.apply(<console>:10) 
  at $anonfun$powerOf2$1.apply(<console>:10) 
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233) 
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223) 
  at scala.collection.immutable.Stream.reduceLeft(Stream.scala:627) 
  at scala.collection.TraversableOnce$class.max(TraversableOnce.scala:229) 
  at scala.collection.AbstractTraversable.max(Traversable.scala:104) 
  at .range(<console>:10) 
  ... 23 elided 

The call to range never returns normally: it crashes with OutOfMemoryError because the implementations of max and min attempt to realize the entire, infinite Stream. This example illustrates the good practice that we mentioned earlier in this chapter: be careful when handing a Stream to an API that expects a general Seq, because innocuous-looking methods can force the realization of every element.
