After deploying a new version of the web portal that generates the performance report containing your view changes, you begin wondering what else can be done to improve report generation performance. It strikes you that, for a particular time interval, the report is immutable. The computed PnL trend for a particular hour never changes once computed. Although the report is immutable, it is needlessly recomputed each time a client requests it. Given this line of thinking, you wonder how difficult it would be to generate a new report each hour as new execution data becomes available. Order and execution events can be transformed on-the-fly, as they are created, into the inputs required for the client performance trend report. With a pregenerated report, the web portal performance issues should completely disappear because the responsibility of report generation no longer belongs to the web portal.
This new report generation strategy leads us to explore a new design paradigm, called event sourcing. Event sourcing describes an architectural approach to designing systems that relies on processing events over time instead of relying on a model of the current state to answer different questions. The reporting system that we worked on performs significant work to identify the subset of orders that executed because current state, rather than events, is stored. Imagine that, instead of working with data such as Order and Execution, we instead worked with events that represent things that happened in the system over time. One relevant event to report could be the OrderExecuted event, which can be modeled as follows:
case class OrderExecuted(created: CreatedTimestamp, orderId: OrderId, price: Price)
This event describes something that happened instead of representing a snapshot of current state. To extend this example, imagine if Order also included an optional Price to denote execution price:
sealed trait Order {
  def created: CreatedTimestamp
  def id: OrderId
  def ticker: Ticker
  def price: Price
  def clientId: ClientId
  def executionPrice: Option[Price]
}
If this data model is mapped to a relational database, executionPrice would be a nullable database value that is overwritten when an execution occurs. When the domain model only reflects the current state, immutability is lost. As a functional programmer, this should concern you because you understand the reasoning capabilities that immutability provides. Storing only the current state of data may also lead to excessively large objects that are difficult to program with. For example, how would you represent that an Order was canceled? With the current approach, the most expedient method is to add a Boolean flag named isCanceled. Over time, as your system's requirements become more complicated, the Order object will grow as you track more characteristics of the current state. This means that loading a set of Order objects into memory from a database becomes more unwieldy due to growing memory requirements. This is a dilemma that you have likely experienced if you have extensive Object Relational Mapping (ORM) experience.
To avoid bloating Order, you may try to deconstruct the concept of an order to support multiple use cases. For example, if you are only interested in executed orders, the model may change the executionPrice datatype from Option[Price] to Price, and you may no longer require the canceled Boolean flag because, by definition, an executed order could not have been canceled.
Identifying multiple definitions or representations for what you once thought was a single concept is an important step toward addressing the shortcomings that we walked through. Extending this approach, we come back to the topic of event sourcing. We can replay a set of events to build the set of executed orders. Let's slightly modify the events emitted from the order book to look like the following:
sealed trait OrderBookEvent
case class BuyOrderSubmitted(created: CreatedTimestamp, id: OrderId,
  ticker: Ticker, price: Price, clientId: ClientId) extends OrderBookEvent
case class SellOrderSubmitted(created: CreatedTimestamp, id: OrderId,
  ticker: Ticker, price: Price, clientId: ClientId) extends OrderBookEvent
case class OrderCanceled(created: CreatedTimestamp, id: OrderId) extends OrderBookEvent
case class OrderExecuted(created: CreatedTimestamp, id: OrderId, price: Price) extends OrderBookEvent
If all OrderBookEvents were persisted (for example, to disk), it is then possible to write a program that reads all the events and constructs a set of executed orders by correlating BuyOrderSubmitted and SellOrderSubmitted events with OrderExecuted events. An advantage that we see with this approach is that, over time, we are able to ask new questions about what happened in our system and then easily answer them by reading the events. In contrast, if a model built on the current state did not include executions when it was first designed, it is impossible to retroactively answer the question, "Which orders executed last week?"
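To make the replay idea concrete, here is a minimal sketch of reconstructing executed orders from a persisted event log. The simplified types (String IDs, a single Submitted event, BigDecimal prices) are illustrative stand-ins for the domain's OrderId, BuyOrderSubmitted/SellOrderSubmitted, and Price; they are not part of the original model.

```scala
// A sketch of replaying persisted events to rebuild the set of executed
// orders. Pending submissions are tracked in a Map and an ExecutedOrder
// is emitted when an execution correlates with a pending order.
sealed trait Event
case class Submitted(id: String, ticker: String, price: BigDecimal) extends Event
case class Canceled(id: String) extends Event
case class Executed(id: String, price: BigDecimal) extends Event

case class ExecutedOrder(id: String, ticker: String,
  submittedPrice: BigDecimal, executionPrice: BigDecimal)

def replay(events: List[Event]): List[ExecutedOrder] = {
  val (_, executed) =
    events.foldLeft((Map.empty[String, Submitted], List.empty[ExecutedOrder])) {
      case ((pending, acc), sub: Submitted) => (pending + (sub.id -> sub), acc)
      case ((pending, acc), Canceled(id))   => (pending - id, acc)
      case ((pending, acc), Executed(id, px)) =>
        pending.get(id) match {
          case Some(sub) => (pending - id, ExecutedOrder(id, sub.ticker, sub.price, px) :: acc)
          case None      => (pending, acc) // execution without a known submission is ignored
        }
    }
  executed.reverse
}

val history = List(
  Submitted("1", "FOO", BigDecimal("21.07")),
  Submitted("2", "BAR", BigDecimal("10.00")),
  Canceled("2"),
  Executed("1", BigDecimal("21.00")))
println(replay(history)) // a single ExecutedOrder for order "1"
```

Because the full history is retained, a new question (for example, "Which orders were canceled?") is just another fold over the same events.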
Our new idea is exciting, and it has the potential to yield great improvements. However, it comes with a set of challenges. The main difference from the previous section is that our new use case does not load the Order and Execution collections into memory from a data store. Instead, we are planning to process each incoming OrderBookEvent as it is generated by the order book. Conceptually, this approach still involves processing a sequence of data. However, with the previous approach, the entire data set existed prior to beginning any transformations. Processing events on-the-fly requires designing software that handles data that has not yet been generated. Clearly, neither eager collections nor views are a good tool for our new system. Luckily, the standard Scala library provides us with the right abstraction: Stream. Let's take a closer look at this collection type to better understand how Stream can help us implement an event sourcing approach to the client performance reporting architecture.
A stream can be seen as a mix between a list and a view. Like a view, it is lazily evaluated, and transformations are applied only when its elements are accessed or collected. Like a List, the elements of a Stream are only evaluated once. A Stream is sometimes described as an unrealized List, meaning that it is essentially a List that has not yet been fully evaluated, or realized.
Where a List can be constructed with the cons (::) operator, a Stream can be similarly constructed with its own operator:
> val days = "Monday" :: "Tuesday" :: "Wednesday" :: Nil
days: List[String] = List(Monday, Tuesday, Wednesday)

> val months = "January" #:: "February" #:: "March" #:: Stream.empty
months: scala.collection.immutable.Stream[String] = Stream(January, ?)
The syntax to create a Stream is close to the one used to create a List. One difference is the returned value. Where a List is immediately evaluated, a Stream is not. Only the first element ("January") is computed; the remaining values are still unknown (and denoted by a ? character).
Let's observe what happens when we access part of the stream:
scala> println(months.take(2).toList)
List(January, February)

scala> months
res0: scala.collection.immutable.Stream[String] = Stream(January, February, ?)
We forced the evaluation of the first two elements of the Stream by turning it into a List (see the following sidebar). The first two months are printed. We then display the value of months to discover that the second element ("February") is now computed.
scala> months.take(2)
res0: scala.collection.immutable.Stream[String] = Stream(January, ?)
To highlight the evaluation characteristics of a Stream, we look at another example of creating a Stream:
def powerOf2: Stream[Int] = {
  def next(n: Int): Stream[Int] = {
    println(s"Adding $n")
    n #:: next(2 * n)
  }
  1 #:: next(1)
}
This short snippet defines a function that creates a Stream of powers of 2. It is an infinite Stream initialized with the first value 1, and its tail is defined as another Stream. We added a println statement to allow us to study the evaluation of the elements:
scala> val s = powerOf2
s: Stream[Int] = Stream(1, ?)

scala> s.take(8).toList
Adding 1
Adding 2
Adding 4
Adding 8
Adding 16
Adding 32
Adding 64
res0: List[Int] = List(1, 1, 2, 4, 8, 16, 32, 64)

scala> s.take(10).toList
Adding 128
Adding 256
res1: List[Int] = List(1, 1, 2, 4, 8, 16, 32, 64, 128, 256)
Note how the first eight elements are only evaluated when we perform the first conversion to a List. In the second call, only elements 9 and 10 are computed; the first eight are already realized and are part of the Stream.
Based on the previous example, you may wonder whether a Stream is an immutable data structure. Its fully qualified name is scala.collection.immutable.Stream, so this should give you a good hint. It is true that accessing the Stream and realizing some of its elements causes a modification of the Stream. However, the data structure is still considered immutable. The values it contains never change once assigned; even before being evaluated, the values exist and have a definition in the Stream.
The previous example shows an interesting property of Stream: it is possible to create a virtually infinite Stream. The Stream that is created by powerOf2 is unbounded, and it is always possible to create one more element thanks to our next method. Another useful technique is the creation of recursive streams. A recursive Stream refers to itself in its definition. Let's adapt our previous example. Instead of returning the complete sequence of powers of 2, we will allow the caller to set a starting value:
def powerOf2(n: Int): Stream[Int] =
  math.pow(2, n).toInt #:: powerOf2(n+1)
The math.pow method is used to compute 2^n. Note that we calculate the first value and define the rest of the Stream as powerOf2(n+1), that is, the next power of 2:
scala> powerOf2(3).take(10).toList
res0: List[Int] = List(8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096)
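Another well-known recursive stream, not taken from the running example, is the Fibonacci sequence, where the stream is defined in terms of itself zipped with its own tail. Only the demanded prefix is ever computed.

```scala
// A recursive Stream of Fibonacci numbers. Each element after the first
// two is the sum of the stream paired element-wise with its own tail.
// lazy val makes the self-reference safe in any scope.
lazy val fibs: Stream[BigInt] =
  BigInt(0) #:: BigInt(1) #:: fibs.zip(fibs.tail).map { case (a, b) => a + b }

println(fibs.take(8).toList) // List(0, 1, 1, 2, 3, 5, 8, 13)
```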
The companion object of Stream provides several factory methods to instantiate a Stream. Let's look at a few of them:
Stream.apply: This allows us to create a Stream for a finite sequence of values:

scala> Stream(1,2,3,4)
res0: scala.collection.immutable.Stream[Int] = Stream(1, ?)

scala> Stream(List(1,2,3,4):_*)
res1: scala.collection.immutable.Stream[Int] = Stream(1, ?)
Stream.fill[A](n: Int)(a: => A): This produces a Stream containing the element a, n times:

scala> Stream.fill(4)(10)
res0: scala.collection.immutable.Stream[Int] = Stream(10, ?)

scala> res0.toList
res1: List[Int] = List(10, 10, 10, 10)
Stream.from(start: Int): This creates an increasing sequence of integers beginning with start:

scala> Stream.from(4)
res0: scala.collection.immutable.Stream[Int] = Stream(4, ?)

scala> res0.take(3).toList
res1: List[Int] = List(4, 5, 6)
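Two more factory methods from the same companion object are worth a quick look. Stream.iterate repeatedly applies a function to the previous element, and Stream.continually evaluates its by-name argument for each element demanded:

```scala
// Stream.iterate: each element is f applied to the previous one.
val doubled = Stream.iterate(1)(_ * 2)
println(doubled.take(5).toList) // List(1, 2, 4, 8, 16)

// Stream.continually: an endless stream of the same (re-evaluated) value.
val ticks = Stream.continually("tick")
println(ticks.take(3).toList) // List(tick, tick, tick)
```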
We invite you to look at the other methods that are available on the companion object. Note that a Stream can also be constructed from a List directly, as follows:
scala> List(1,2,3,4,5).toStream
res0: scala.collection.immutable.Stream[Int] = Stream(1, ?)
The previous code may be misleading. Turning a List into a Stream does not spare the cost of evaluating the whole List in memory. Similarly, if we were to apply transformations (such as map or filter) to the List before the call to toStream, we would be performing these computations on the entire List.
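We can observe this difference with a pair of counters. The sample data and counter names are, of course, just for illustration:

```scala
// map before toStream runs eagerly over the whole List.
var eagerCount = 0
List(1, 2, 3, 4, 5).map { i => eagerCount += 1; i * 2 }.toStream.take(2).toList
println(eagerCount) // 5: every element was transformed before toStream

// map after toStream is deferred until elements are demanded
// (a Stream's head is strict, so the head is transformed immediately).
var lazyCount = 0
List(1, 2, 3, 4, 5).toStream.map { i => lazyCount += 1; i * 2 }.take(2).toList
println(lazyCount) // 2: only the two demanded elements were transformed
```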
Just like a List, you can pattern match on a Stream, as follows:
scala> val s = Stream(1,2,3,4)
s: scala.collection.immutable.Stream[Int] = Stream(1, ?)

scala> s match {
     |   case _ #:: _ #:: i #:: _ => i
     | }
res0: Int = 3
This pattern match extracts the third element from the s stream. Pattern matching on a stream forces the realization of the elements required to evaluate the match expression. In the preceding case, the first three items are calculated.
To pattern match on an empty stream, you can use the Stream.Empty object. It is a singleton instance that represents an empty Stream, and it works similarly to Nil for List. Note that the Stream object also has an empty method returning this singleton; however, pattern matching requires a stable identifier, so a method call cannot be used in a case pattern.
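A small sketch ties these pattern matching tools together; the describe function and its sample inputs are invented for illustration:

```scala
// Stream.Empty is a stable identifier, so it can appear directly in a
// case pattern, much like Nil for List. Matching the tail against
// Stream.Empty forces the tail's evaluation.
def describe(s: Stream[Int]): String = s match {
  case Stream.Empty          => "empty"
  case head #:: Stream.Empty => s"only $head"
  case head #:: _            => s"starts with $head"
}

println(describe(Stream.empty)) // empty
println(describe(Stream(42)))   // only 42
println(describe(Stream(1, 2))) // starts with 1
```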
Returning to the reporting system, how can we apply the principles of event sourcing and leverage Stream to change how reports are generated? To compute TradingPerformanceTrend for a client, we need to compute PnL trend values for three time periods: each hour, each day, and each seven days. We can write a method with the following signature that gets us closer to identifying the PnL for each trend:
def processPnl(s: TradeState, e: OrderBookEvent): (TradeState, Option[PnlEvent])
The signature of processPnl accepts state in the form of TradeState and an OrderBookEvent to produce a new TradeState and, optionally, a PnlEvent. Let's first inspect PnlEvent to understand the end result of this method before inspecting TradeState:
sealed trait PnlEvent
case class PnlIncreased(created: EventInstant, clientId: ClientId,
  ticker: Ticker, profit: Pnl) extends PnlEvent
case class PnlDecreased(created: EventInstant, clientId: ClientId,
  ticker: Ticker, loss: Pnl) extends PnlEvent

case class Pnl(value: BigDecimal) extends AnyVal {
  def isProfit: Boolean = value.signum >= 0
  def +(other: Pnl): Pnl = Pnl(value + other.value)
}
object Pnl {
  def fromExecution(buy: Price, sell: Price): Pnl = Pnl(sell.value - buy.value)
  val zero: Pnl = Pnl(BigDecimal(0))
}
We see that PnlEvent models an ADT that expresses when a client's PnL increased or decreased. Using the past tense to name the event (for example, increased) makes it clear that this is a fact or a record of something that has completed. We have not yet looked at how TradeState is defined or at the implementation of processPnl, but we can already infer the behavior by studying the emitted events. We display the definition of TradeState, which is needed to correlate submitted orders with executions, as follows:
case class PendingOrder(ticker: Ticker, price: Price, clientId: ClientId)

case class TradeState(
  pendingBuys: Map[OrderId, PendingOrder],
  pendingSells: Map[OrderId, PendingOrder]) {

  def cancelOrder(id: OrderId): TradeState = copy(
    pendingBuys = pendingBuys - id,
    pendingSells = pendingSells - id)

  def addPendingBuy(o: PendingOrder, id: OrderId): TradeState =
    copy(pendingBuys = pendingBuys + (id -> o))

  def addPendingSell(o: PendingOrder, id: OrderId): TradeState =
    copy(pendingSells = pendingSells + (id -> o))
}
object TradeState {
  val empty: TradeState = TradeState(Map.empty, Map.empty)
}
Next, we inspect the implementation of processPnl to see how PnlEvents are created, as follows:
def processPnl(
  s: TradeState,
  e: OrderBookEvent): (TradeState, Option[PnlEvent]) = e match {
  case BuyOrderSubmitted(_, id, t, p, cId) =>
    s.addPendingBuy(PendingOrder(t, p, cId), id) -> None
  case SellOrderSubmitted(_, id, t, p, cId) =>
    s.addPendingSell(PendingOrder(t, p, cId), id) -> None
  case OrderCanceled(_, id) => s.cancelOrder(id) -> None
  case OrderExecuted(ts, id, price) =>
    val (p, o) = (s.pendingBuys.get(id), s.pendingSells.get(id)) match {
      case (Some(order), None) =>
        Pnl.fromExecution(buy = price, sell = order.price) -> order
      case (None, Some(order)) =>
        Pnl.fromExecution(buy = order.price, sell = price) -> order
      case error => sys.error(
        s"Unsupported retrieval of ID = $id returned: $error")
    }
    s.cancelOrder(id) -> Some(
      if (p.isProfit) PnlIncreased(ts, o.clientId, o.ticker, p)
      else PnlDecreased(ts, o.clientId, o.ticker, p))
}
This implementation shows that the incoming OrderBookEvent is pattern matched to determine the event type, which is then handled accordingly. When an order is submitted, TradeState is updated to reflect that there is a new pending order that will be either canceled or executed. When an order is canceled, the pending order is removed from TradeState. When an execution occurs, the pending order is removed and, additionally, a PnlEvent is emitted after computing the trade PnL. The trade PnL compares the execution price to the pending order's original price.
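A tiny runnable sketch of the Pnl arithmetic makes the sign convention explicit. The Price and Pnl definitions mirror the companion object above, except that extends AnyVal is dropped so the snippet runs as a plain script; the sample prices are invented:

```scala
// Simplified Price/Pnl pair to show how a trade's profit or loss
// is derived from the two prices (sell minus buy).
case class Price(value: BigDecimal)
case class Pnl(value: BigDecimal) {
  def isProfit: Boolean = value.signum >= 0
}
object Pnl {
  def fromExecution(buy: Price, sell: Price): Pnl = Pnl(sell.value - buy.value)
  val zero: Pnl = Pnl(BigDecimal(0))
}

// A buy order submitted at 21.07 that executes at 21.00 is a profit
// under this convention; the reverse comparison is a loss.
val profit = Pnl.fromExecution(buy = Price(BigDecimal("21.00")), sell = Price(BigDecimal("21.07")))
println(profit.isProfit) // true
val loss = Pnl.fromExecution(buy = Price(BigDecimal("24.10")), sell = Price(BigDecimal("24.02")))
println(loss.isProfit) // false
```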
PnlEvent provides enough information to compute PnL trend performance for all three time periods (hour, day, and seven days) required by TradingPerformanceTrend. The transformation from OrderBookEvent to PnlEvent is side-effect-free, and the creation of a new event, instead of the replacement of current state, leads to an immutable model. In light of these characteristics, processPnl is easily unit-testable and makes its intent explicit. By making the intent explicit, it is possible to communicate with less technical stakeholders about how the system works.
Using PnlEvent as an input to a method that follows the analogous (State, InputEvent) => (State, Option[OutputEvent]) signature, we can now compute the hourly PnL trend, as follows:
def processHourlyPnl(
  s: HourlyState,
  e: PnlEvent): (HourlyState, Option[HourlyPnlTrendCalculated])
This signature shows that, by maintaining state in HourlyState, it is possible to emit the HourlyPnlTrendCalculated event. The emitted event is defined as follows:
case class HourlyPnlTrendCalculated(
  start: HourInstant,
  clientId: ClientId,
  ticker: Ticker,
  pnl: LastHourPnL)
For a particular hour, client ID, and ticker, HourlyPnlTrendCalculated is a record of whether the last hour's PnL was positive or negative. The HourInstant class is a value class with a companion object method that truncates an instant to the start of the hour:
case class HourInstant(value: Instant) extends AnyVal {
  def isSameHour(h: HourInstant): Boolean =
    h.value.toDateTime.getHourOfDay == value.toDateTime.getHourOfDay
}
object HourInstant {
  def create(i: EventInstant): HourInstant =
    HourInstant(i.value.toDateTime.withMillisOfSecond(0)
      .withSecondOfMinute(0).withMinuteOfHour(0).toInstant)
}
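As an aside, the same truncation can be expressed with java.time instead of the Joda-Time API used above; this variant is our own sketch, not the original code. Note that comparing the truncated instants for equality also distinguishes the same clock hour on different days, which a bare hour-of-day comparison would not:

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Hour truncation with java.time: truncatedTo(ChronoUnit.HOURS) zeroes
// minutes, seconds, and milliseconds in one call. extends AnyVal is
// omitted so the sketch runs as a plain script.
case class HourInstant(value: Instant) {
  // Equality of truncated instants: true only for the very same hour.
  def isSameHour(h: HourInstant): Boolean = h.value == value
}
object HourInstant {
  def create(i: Instant): HourInstant = HourInstant(i.truncatedTo(ChronoUnit.HOURS))
}

val a = HourInstant.create(Instant.parse("2016-02-15T20:14:33.123Z"))
val b = HourInstant.create(Instant.parse("2016-02-15T20:59:59.999Z"))
println(a.value)         // 2016-02-15T20:00:00Z
println(a.isSameHour(b)) // true
```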
Let's have a look at how HourlyState is defined to better understand the state that is needed to yield HourlyPnlTrendCalculated:
case class HourlyState(
  keyToHourlyPnl: Map[(ClientId, Ticker), (HourInstant, Pnl)])
object HourlyState {
  val empty: HourlyState = HourlyState(Map.empty)
}
For a ClientId and a Ticker, the PnL for the current hour is stored in HourlyState. Accumulating the PnL allows processHourlyPnl to determine the PnL trend at the end of an hour. We now inspect the implementation of processHourlyPnl to see how PnlEvent is transformed into HourlyPnlTrendCalculated:
def processHourlyPnl(
  s: HourlyState,
  e: PnlEvent): (HourlyState, Option[HourlyPnlTrendCalculated]) = {
  def processChange(
    ts: EventInstant,
    clientId: ClientId,
    ticker: Ticker,
    pnl: Pnl): (HourlyState, Option[HourlyPnlTrendCalculated]) = {
    val (start, p) = s.keyToHourlyPnl.get((clientId, ticker)).fold(
      (HourInstant.create(ts), Pnl.zero))(identity)
    start.isSameHour(HourInstant.create(ts)) match {
      case true => (s.copy(keyToHourlyPnl = s.keyToHourlyPnl +
        ((clientId, ticker) -> (start, p + pnl))), None)
      case false => (s.copy(keyToHourlyPnl = s.keyToHourlyPnl +
        ((clientId, ticker) -> (HourInstant.create(ts), Pnl.zero + pnl))),
        Some(HourlyPnlTrendCalculated(start, clientId, ticker,
          p.isProfit match {
            case true => LastHourPositive
            case false => LastHourNegative
          })))
    }
  }
  e match {
    case PnlIncreased(ts, clientId, ticker, pnl) => processChange(
      ts, clientId, ticker, pnl)
    case PnlDecreased(ts, clientId, ticker, pnl) => processChange(
      ts, clientId, ticker, pnl)
  }
}
Handling an increased and a decreased PnL follows the same flow, so the inner method named processChange handles the identical processing steps. The processChange method determines whether or not to emit HourlyPnlTrendCalculated by comparing the HourInstant value stored when an entry is first added to the state with the hour of the timestamp provided by the event. When the comparison shows that the hour has changed, the hourly PnL trend can be computed because the hour is complete. When the hour is unchanged, the provided PnL is added to the state's PnL to continue accumulating the hour's PnL.
An obvious shortcoming of this approach is that, when a client or a ticker does not have any executed orders, it will not be possible to determine that the hour is completed. For simplicity, we are not treating time as a first-class event. However, you can imagine how it is possible to model the passing of time as an event that is a second input to processHourlyPnl. For example, the event might be the following:
case class HourElapsed(hour: HourInstant)
To use this event, we could change the signature of processHourlyPnl to receive an event argument of the Either[HourElapsed, PnlEvent] type. Scheduling HourElapsed on a timer enables us to modify the implementation of processHourlyPnl to emit HourlyPnlTrendCalculated as soon as the hour elapses instead of when a trade occurs in the next hour. This simple example shows how you can model time as an explicit part of the domain when you consider your system from an event sourcing point of view.
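A compressed sketch of this idea follows. The names mirror the text, but the simplified fields (a bare Instant and BigDecimal, a single pending hour) are assumptions made to keep the snippet self-contained; the real HourlyState tracks a map keyed by client and ticker:

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Time arrives on the Left, trade PnL on the Right, so an hour can be
// closed out even when no trade occurs in the following hour.
case class HourElapsed(hour: Instant)
case class PnlChanged(at: Instant, pnl: BigDecimal)
case class HourlyState(hour: Option[Instant], pnl: BigDecimal)
case class HourlyPnlTrendCalculated(hour: Instant, positive: Boolean)

def processHourlyPnl(
    s: HourlyState,
    e: Either[HourElapsed, PnlChanged]): (HourlyState, Option[HourlyPnlTrendCalculated]) =
  e match {
    case Left(HourElapsed(_)) =>
      // The timer closed the hour: emit the trend and reset the state.
      (HourlyState(None, BigDecimal(0)),
        s.hour.map(h => HourlyPnlTrendCalculated(h, s.pnl.signum >= 0)))
    case Right(PnlChanged(at, pnl)) =>
      // Accumulate PnL against the hour of the event's timestamp.
      (HourlyState(Some(at.truncatedTo(ChronoUnit.HOURS)), s.pnl + pnl), None)
  }

val start = Instant.parse("2016-02-15T20:00:00Z")
val (s1, _) = processHourlyPnl(HourlyState(None, BigDecimal(0)),
  Right(PnlChanged(start.plusSeconds(60), BigDecimal("0.07"))))
val (_, out) = processHourlyPnl(s1, Left(HourElapsed(start.plusSeconds(3600))))
println(out) // Some(HourlyPnlTrendCalculated(2016-02-15T20:00:00Z,true))
```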
It is straightforward to imagine writing analogous methods that emit events for the daily and seven-day PnL trends, and then a method that awaits all three PnL trend events to produce the TradingPerformanceTrendGenerated event. The final step is to write a side-effecting method that persists TradingPerformanceTrend so that it can be read by the web portal. At this point, we have a collection of methods that perform transformations on events, but they are not yet wired together cohesively. Next, we take a look at how to create a pipeline to transform events.
Note that, in this case study, we do not actually calculate a PnL. Performing a real PnL calculation would involve more complicated algorithms and would force us to introduce more domain concepts. We opted for a simpler approach with a report that is closer to an exposure report. This allows us to focus on the code and the programming practices that we want to illustrate.
We use the term pipeline to refer to an arranged set of transformations that may require multiple steps to yield a desired end result. The term brings to mind an image of a set of pipes spanning multiple directions with twists and turns along the way. Our goal is to write a program that receives OrderBookEvents and prints the HourlyPnlTrendCalculated events to standard output. In a true production environment, you can imagine replacing printing to standard output with writing to a persistent data store. In either case, we are building a pipeline that performs a set of referentially transparent transformations and concludes with a side-effect.
The pipeline must accumulate the intermediate state of each transformation as new events are processed. In the functional programming paradigm, accumulation is often associated with a foldLeft operation. Let's look at a toy example that sums a stream of integers to better understand accumulation:
val sum = Stream(1, 2, 3, 4, 5).foldLeft(0) {
  case (acc, i) => acc + i
}
println(sum) // prints 15
Here, we see foldLeft applied to compute the sum of a stream of integers by providing an initial sum value of zero and a curried function that adds the current element to the accumulated sum. The acc value is an often-used shorthand for 'accumulator'. In this example, the accumulator and the elements share the same data type, integer. This is merely a coincidence and is not a requirement for foldLeft operations; the accumulator can be a different type than the collection element.
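To make that last point concrete, here is a fold whose accumulator type (a Map) differs from the element type (a String); the day names are arbitrary sample data:

```scala
// foldLeft with an accumulator of a different type than the elements:
// a Stream of words is folded into a Map from each word to its length.
val lengths = Stream("Monday", "Tuesday", "Wednesday")
  .foldLeft(Map.empty[String, Int]) {
    case (acc, day) => acc + (day -> day.length)
  }
println(lengths) // Map(Monday -> 6, Tuesday -> 7, Wednesday -> 9)
```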
We can use foldLeft as the basis of our event sourcing pipeline to support processing a stream of OrderBookEvents while accumulating intermediate state. From the implementation of the two processing methods, we saw the need to maintain TradeState and HourlyState. We define PipelineState to encapsulate the required state, as follows:
case class PipelineState(tradeState: TradeState, hourlyState: HourlyState)
object PipelineState {
  val empty: PipelineState = PipelineState(TradeState.empty, HourlyState.empty)
}
PipelineState serves as the accumulator when folding over the OrderBookEvents, allowing us to store the intermediate state for both of the transformation methods. Now, we are ready to define the signature of our pipeline:
def pipeline(
  initial: PipelineState,
  f: HourlyPnlTrendCalculated => Unit,
  xs: Stream[OrderBookEvent]): PipelineState
The pipeline accepts the initial state, a side-effecting function to be invoked when an HourlyPnlTrendCalculated event is generated, and a stream of OrderBookEvents to source. The return value of the pipeline is the state of the pipeline once the events are processed. Let's look at how we can leverage foldLeft to implement pipeline:
def pipeline(
  initial: PipelineState,
  f: HourlyPnlTrendCalculated => Unit,
  xs: Stream[OrderBookEvent]): PipelineState =
  xs.foldLeft(initial) { case (PipelineState(ts, hs), e) =>
    val (tss, pnlEvent) = processPnl(ts, e)
    PipelineState(tss, pnlEvent.map(processHourlyPnl(hs, _)).fold(hs) {
      case (hss, Some(hourlyEvent)) =>
        f(hourlyEvent)
        hss
      case (hss, None) => hss
    })
  }
The implementation of pipeline is based on folding over the provided events using the provided PipelineState as a starting point for accumulation. The function provided to foldLeft is where the wiring of transformations takes place. Stitching together the two transformation methods and the side-effecting event handler requires handling several different scenarios. Let's walk through each of the possible cases to better understand how the pipeline works. First, processPnl is invoked to produce a new TradeState and, optionally, a PnlEvent. If no PnlEvent is generated, then processHourlyPnl is not invoked and the previous HourlyState is returned.
If a PnlEvent is generated, then processHourlyPnl is evaluated to determine whether an HourlyPnlTrendCalculated is created. When HourlyPnlTrendCalculated is generated, the side-effecting event handler is invoked and the new HourlyState is returned. If no HourlyPnlTrendCalculated is generated, the new HourlyState produced by processHourlyPnl is returned without invoking the handler.
We construct a simple example to prove that the pipeline works as intended, as follows:
val now = EventInstant(HourInstant.create(EventInstant(new Instant())).value)
val Foo = Ticker("FOO")

pipeline(PipelineState.empty, println, Stream(
  BuyOrderSubmitted(now, OrderId(1), Foo, Price(21.07), ClientId(1)),
  OrderExecuted(EventInstant(now.value.plus(Duration.standardMinutes(30))),
    OrderId(1), Price(21.00)),
  BuyOrderSubmitted(EventInstant(now.value.plus(Duration.standardMinutes(35))),
    OrderId(2), Foo, Price(24.02), ClientId(1)),
  OrderExecuted(EventInstant(now.value.plus(Duration.standardHours(1))),
    OrderId(2), Price(24.02))))
At the start of the hour, a buy order is submitted for the stock FOO. Within the hour, the buy order is executed at a price lower than the submitted price, indicating that the trade was profitable. As we know, the current implementation relies on executions in the subsequent hour to produce HourlyPnlTrendCalculated. To create this event, a second buy order is submitted within the first hour and executed at the start of the second hour. Running this snippet produces a single HourlyPnlTrendCalculated event that is written to standard output:
HourlyPnlTrendCalculated(HourInstant(2016-02-15T20:00:00.000Z),ClientId(1),Ticker(FOO),LastHourPositive)
Although the wiring together of transformations is somewhat involved, we managed to build a simple event sourcing pipeline using only the Scala standard library and our existing knowledge of Scala collections. This example demonstrated the power of foldLeft to help build an event sourcing pipeline. Using this implementation, we can write a fully-featured program that writes a pregenerated version of the performance report to a persistent data store that can be read by the web portal. This new design shifts the burden of report generation outside the web portal's responsibilities, allowing the web portal to provide a responsive user experience. Another benefit of this approach is that it puts a domain-oriented language at the center of the design. All our events use business terms and focus on modeling domain concepts, making it easier for developers and stakeholders to communicate with each other.
You might be wondering about a data structure that shares some characteristics with Stream that we have not yet mentioned: Iterator. As the name implies, Iterator provides facilities to iterate over a sequence of data. Its simplified definition boils down to the following:
trait Iterator[A] {
  def next: A
  def hasNext: Boolean
}
Like Stream, an Iterator is able to avoid loading an entire dataset into memory, which enables programs to be written with constant memory usage. Unlike Stream, an Iterator is mutable and intended for only a single iteration over a collection (it extends the TraversableOnce trait). It should be noted that, according to the standard library documentation, one should never use an iterator after calling a method on it; the only exceptions to this rule are next and hasNext. For example, calling size on an Iterator returns the size of the sequence, but it also consumes the entire sequence and renders the instance useless. These properties lead to software that is difficult to reason about, which is the antithesis of what we strive for as functional programmers. For this reason, we omit an in-depth discussion of Iterator.
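A short demonstration, with invented sample data, shows the hazard in practice:

```scala
// Calling size consumes the underlying sequence, so the same Iterator
// instance yields different answers over time.
val it = Iterator(1, 2, 3)
println(it.size)    // 3
println(it.hasNext) // false: computing the size exhausted the iterator
println(it.size)    // 0

// A Stream over the same data has no such surprise.
val s = Stream(1, 2, 3)
println(s.size) // 3
println(s.size) // 3
```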
We encourage you to further explore event sourcing by reading the documentation of the Event Store database at http://docs.geteventstore.com/introduction/event-sourcing-basics/. Event Store is a database built around the concept of event sourcing, created by Greg Young, a notable writer on the topic. While enriching your understanding of event sourcing, reflect on when you believe it is appropriate to apply the technique. For CRUD applications with simple behavior, event sourcing may not be a worthwhile time investment. When you model more complex behaviors or consider scenarios involving strict performance and scaling requirements, the time investment may become justified. For example, as we saw with performance trend reporting, considering the performance challenges from an event sourcing point of view exposed an entirely different way of approaching the design.
As you continue exploring the world of stream processing, you will discover that you wish to construct more complex transformations than our event sourcing pipeline example. To dig deeper into the topic of stream processing, we suggest researching two relevant libraries: akka streams and functional streams (formerly, scalaz-stream). These libraries provide tools to build more sophisticated transformation pipelines using different abstractions than Stream. In combination with learning about Event Store, you will deepen your understanding of how event sourcing ties in with stream processing.
With the simple program at the end of the previous section, we demonstrated that we can wire together a pipeline of transformations operating on events. As a well-intentioned engineer, you wish to develop automated tests that prove the pipeline works as intended. One approach is to add a sample of historical production data into the repository to build tests. This is often a good choice, but you are concerned that the sample is not large enough to represent a broad number of scenarios. Another option is to write a generator of events that can create production-like data. This approach requires more up-front effort, but it yields a more dynamic way to exercise the pipeline.
A recent lunchtime conversation with Dave about Markov chains sparked the thought about testing the event sourcing pipeline with generated data. Dave described how a Markov chain is a statistical model of state transitions that only relies on the current state to determine the next state. Dave is representing the states of the stock market as a Markov chain, allowing him to build trading strategies based on whether or not he perceives the stock market to be in an upswing, downswing, or steady state. After reading through the Markov chain Wikipedia page, you envision writing an event generator based on a Markov chain.
Our end goal is to be able to generate an infinite number of OrderBookEvents that follow production-like patterns. For example, we know from previous experience that, proportionally, there are often more cancelations than executions, particularly during volatile markets. The event generator should be able to represent different probabilities of events occurring. As a Markov chain depends only on its current state to identify its next state, a Stream is a natural fit because we only need to inspect the current element to determine the next element. For our representation of a Markov chain, we need to identify the chance of transitioning from the current state to any of the other possible states. The following table illustrates one possible set of probabilities:
Current state | Chance of buy | Chance of sell | Chance of execution | Chance of cancel
Buy order submitted | 10% | 15% | 40% | 40%
Sell order submitted | 25% | 10% | 35% | 25%
Order canceled | 60% | 50% | 40% | 10%
Order executed | 30% | 30% | 55% | 30%
This table defines the likelihood of receiving an OrderBookEvent given the current OrderBookEvent. For example, given that a sell order was submitted, there is a 10% chance of seeing a second sell order next and a 35% chance that an execution occurs next. We can develop state transition probabilities according to the market conditions that we wish to simulate in the pipeline.
We can model the transitions using the following domain:
sealed trait Step
case object GenerateBuy extends Step
case object GenerateSell extends Step
case object GenerateCancel extends Step
case object GenerateExecution extends Step

case class Weight(value: Int) extends AnyVal
case class GeneratedWeight(value: Int) extends AnyVal

case class StepTransitionWeights(
  buy: Weight,
  sell: Weight,
  cancel: Weight,
  execution: Weight)
In this domain, Step is an ADT that models the possible states. For a given Step, we will associate StepTransitionWeights to define the probability of transitioning to different states based on provided weightings. GeneratedWeight is a value class that defines the weight generated for the current Step. We will use GeneratedWeight to drive the transition from one Step to the next.
Our next step, so to speak, is to make use of our domain to generate events according to probabilities that we define. To make use of Step, we define the representation of the Markov chain state that is required, as follows:
case class State(
  pendingOrders: Set[OrderId],
  step: Step)
The Markov chain requires knowledge of the current state, which is represented by step. Additionally, we put a twist on the Markov chain by maintaining, in pendingOrders, the set of submitted orders that are neither canceled nor executed. This additional state is needed for two reasons. First, generating cancel and execution events requires linking to a known order ID. Second, we constrain our representation of a Markov chain by requiring at least one pending order to exist before creating a cancel or an execution. If there are no pending orders, it is invalid to transition to a state that generates either OrderCanceled or OrderExecuted.
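This constraint can be made explicit with a small helper. The following is a hypothetical sketch, not code from the repository; the domain types are repeated, and OrderId is assumed to wrap a Long, so that the snippet stands alone:

```scala
// Domain repeated for self-containment; OrderId is assumed to wrap a Long.
case class OrderId(value: Long) extends AnyVal

sealed trait Step
case object GenerateBuy extends Step
case object GenerateSell extends Step
case object GenerateCancel extends Step
case object GenerateExecution extends Step

case class State(pendingOrders: Set[OrderId], step: Step)

// Hypothetical helper: the steps we may legally transition to from a State.
// Without pending orders, there is nothing to cancel or execute.
def validSteps(s: State): Set[Step] =
  if (s.pendingOrders.isEmpty) Set(GenerateBuy, GenerateSell)
  else Set(GenerateBuy, GenerateSell, GenerateCancel, GenerateExecution)
```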
Using State, we can write a method with the following signature to manage transitions:
def nextState(
  weight: StepTransitionWeights => GeneratedWeight,
  stepToWeights: Map[Step, StepTransitionWeights],
  s: State): (State, OrderBookEvent)
Given a way to generate a weight from the current StepTransitionWeights, a mapping of Step to StepTransitionWeights, and the current State, we are able to produce a new State and an OrderBookEvent. For brevity, we omit the implementation of nextState because we want to focus most intently on stream processing. From the signature, we have enough insight to apply the method, but we encourage you to inspect the repository to fill in any blanks in your understanding.
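While nextState itself lives in the repository, its core is a weighted selection: the generated weight falls into one of four cumulative ranges, one per possible next Step. The following is our own sketch of that selection, with the domain repeated so it compiles standalone; the repository's implementation may differ in detail:

```scala
// Domain repeated from above for self-containment.
sealed trait Step
case object GenerateBuy extends Step
case object GenerateSell extends Step
case object GenerateCancel extends Step
case object GenerateExecution extends Step
case class Weight(value: Int) extends AnyVal
case class GeneratedWeight(value: Int) extends AnyVal
case class StepTransitionWeights(
  buy: Weight, sell: Weight, cancel: Weight, execution: Weight)

// Map a generated weight in [1, total] onto cumulative ranges:
// [1, buy], (buy, buy + sell], (buy + sell, buy + sell + cancel], and the rest.
def pickStep(w: StepTransitionWeights, g: GeneratedWeight): Step = {
  val buyMax    = w.buy.value
  val sellMax   = buyMax + w.sell.value
  val cancelMax = sellMax + w.cancel.value
  if (g.value <= buyMax) GenerateBuy
  else if (g.value <= sellMax) GenerateSell
  else if (g.value <= cancelMax) GenerateCancel
  else GenerateExecution
}
```

Later in the section, a weight is drawn uniformly over the total with Random.nextInt(...) + 1, so larger weights claim proportionally larger ranges.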
The nextState method is the driver of state transitions in our Markov chain representation. We can now generate an infinite Stream of OrderBookEvents based on transition probabilities using the convenience Stream method, iterate. From the Scala documentation, iterate produces an infinite stream by repeatedly applying a function to the start value. Let's see how we can use iterate:
val stepToWeights = Map[Step, StepTransitionWeights](
  GenerateBuy -> StepTransitionWeights(
    Weight(10), Weight(25), Weight(40), Weight(40)),
  GenerateSell -> StepTransitionWeights(
    Weight(25), Weight(10), Weight(40), Weight(25)),
  GenerateCancel -> StepTransitionWeights(
    Weight(60), Weight(50), Weight(40), Weight(10)),
  GenerateExecution -> StepTransitionWeights(
    Weight(30), Weight(30), Weight(60), Weight(25)))

val next = State.nextState(
  t => GeneratedWeight(Random.nextInt(t.weightSum.value) + 1),
  stepToWeights,
  _: State)

println("State Event")
Stream.iterate(State.initialBuy) { case (s, e) => next(s) }
  .take(5)
  .foreach { case (s, e) => println(s"$s $e") }
This snippet creates a Markov chain that generates various OrderBookEvents by providing a mapping of Step to StepTransitionWeights as the basis for invoking State.nextState. State.nextState is partially applied, leaving the current state unapplied, so the resulting next function has the signature State => (State, OrderBookEvent). With the necessary scaffolding in place, Stream.iterate generates an infinite sequence of OrderBookEvents by repeatedly invoking next. Similar to foldLeft, we provide an initial value, initialBuy, to begin the iteration, which is defined as follows:
val initialBuy: (State, OrderBookEvent) = {
  val e = randomBuySubmitted()
  State(Set(e.id), GenerateBuy) -> e
}
Running this snippet produces output that is similar to the following:
State = State(Set(OrderId(1612147067584751204)),GenerateBuy)
Event = BuyOrderSubmitted(EventInstant(2016-02-22T23:52:40.662Z),OrderId(1612147067584751204),Ticker(FOO),Price(32),ClientId(28))
State = State(Set(OrderId(1612147067584751204), OrderId(7606120383704417020)),GenerateBuy)
Event = BuyOrderSubmitted(EventInstant(2016-02-22T23:52:40.722Z),OrderId(7606120383704417020),Ticker(XYZ),Price(18),ClientId(54))
State = State(Set(OrderId(1612147067584751204), OrderId(7606120383704417020), OrderId(5522110701609898973)),GenerateBuy)
Event = BuyOrderSubmitted(EventInstant(2016-02-22T23:52:40.723Z),OrderId(5522110701609898973),Ticker(XYZ),Price(62),ClientId(28))
State = State(Set(OrderId(7606120383704417020), OrderId(5522110701609898973)),GenerateExecution)
Event = OrderExecuted(EventInstant(2016-02-22T23:52:40.725Z),OrderId(1612147067584751204),Price(21))
State = State(Set(OrderId(7606120383704417020), OrderId(5522110701609898973), OrderId(5898687547952369568)),GenerateSell)
Event = SellOrderSubmitted(EventInstant(2016-02-22T23:52:40.725Z),OrderId(5898687547952369568),Ticker(BAR),Price(76),ClientId(45))
Of course, each invocation differs depending upon the random values that are created for GeneratedWeight, which is used to probabilistically select the next transition. This snippet provides a base to compose larger-scale tests for the reporting infrastructure. Through this example, we see an interesting application of Markov chains to support generating representative events from various market conditions without requiring access to volumes of production data. We are now able to write tests to confirm whether or not the reporting infrastructure correctly computes PnL trends in different market conditions.
For all its goodness, Stream should be used with caution. In this section, we mention a few of the main caveats of Stream and how to avoid them.
While views do not cache the results of computations, and therefore recalculate and realize each element every time it is accessed, Stream does save the final form of its elements. An element is only ever realized once, the first time it is accessed. While this is a great characteristic that avoids computing the same result several times, it can also lead to large memory consumption, to the point where your program may eventually run out of memory.
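Memoization is easy to observe by counting how many times the element-producing function runs. The mutable counter in the following sketch exists only for demonstration:

```scala
var evaluations = 0

// Each element of the mapped Stream bumps the counter when it is realized.
// Note that Stream's head is strict, so the first element is realized
// as soon as the val is defined.
val doubled: Stream[Int] = Stream.from(1).map { i =>
  evaluations += 1
  i * 2
}

val first  = doubled.take(3).toList // realizes elements 1 to 3
val second = doubled.take(3).toList // served from the memoized cells
```

After both traversals, evaluations is 3, not 6: the second take reuses the cached elements instead of recomputing them.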
To limit the cost of Stream memoization, it is good practice to avoid storing a Stream in a val. Using a val creates a permanent reference to the head of the Stream, ensuring that every realized element will be cached. If a Stream is defined as a def, it can be garbage collected as soon as it is no longer needed.
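As a quick illustration of the guideline, compare the two declarations below; only the binding differs. This is a sketch of the principle rather than a demonstration of garbage collection itself:

```scala
// A val pins the head: every element realized through cached stays reachable
// for as long as cached itself is in scope.
val cached: Stream[Int] = Stream.from(1)

// A def builds a fresh Stream on each call: once a caller finishes consuming
// the returned Stream, no outer reference keeps its realized cells alive.
def fresh: Stream[Int] = Stream.from(1)
```

Both behave identically to a consumer; the difference lies purely in what remains reachable after consumption.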
Memoization can also be triggered by certain methods that are defined on Stream. For example, drop or dropWhile will evaluate and memoize all the intermediate elements to be dropped. The elements are memoized because these methods are defined on an instance of Stream (and a Stream holds a reference to its own head). We can implement our own drop function to avoid caching the intermediate elements in memory:
@tailrec
def drop[A](s: Stream[A], count: Int): Stream[A] = count match {
  case 0 => s
  case n if n > 0 => drop(s.tail, count - 1)
  case n if n < 0 => throw new Exception("cannot drop negative count")
}
We pattern match on the value of count to know whether we can return the given Stream or need to recurse on the tail. Our method is tail-recursive, which guarantees that we do not keep a reference to the head of the Stream: a tail-recursive function recycles its reference each time it loops, so our s reference only ever points to the remaining part of the Stream, never the head.
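To see the method in action, the sketch below repeats the definition so it stands alone, and then drops a large prefix of an infinite Stream. Because the loop is tail-recursive and no val retains the original head, the skipped elements remain collectible:

```scala
import scala.annotation.tailrec

// Repeated from above so this snippet is self-contained.
@tailrec
def drop[A](s: Stream[A], count: Int): Stream[A] = count match {
  case 0          => s
  case n if n > 0 => drop(s.tail, n - 1)
  case _          => throw new Exception("cannot drop negative count")
}

// Skip one million naturals; only the next head is realized by the caller.
val rest = drop(Stream.from(1), 1000000)
```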
Another example of a problematic method is max. Calling max will memoize all the elements of the Stream to determine which one is the greatest. Let's implement a safe version of max, as follows:
def max(s: => Stream[Int]): Option[Int] = {
  @tailrec
  def loop(ss: Stream[Int], current: Option[Int]): Option[Int] = ss match {
    case Stream.Empty => current
    case h #:: rest if current.exists(_ >= h) => loop(rest, current)
    case h #:: rest => loop(rest, Some(h))
  }
  loop(s, None)
}
This time, we used an internal tail-recursive function to be able to expose a friendly API. We represent the current max value as an Option[Int] to handle the case where the method is called with an empty Stream. Note that max accepts s as a by-name parameter. This is important because, otherwise, we would keep a reference to the head of the Stream before calling the internal tail-recursive loop method. Another possible implementation is as follows:
def max(s: => Stream[Int]): Option[Int] = {
  @tailrec
  def loop(ss: Stream[Int], current: Int): Int = ss match {
    case Stream.Empty => current
    case h #:: hs if h > current => loop(hs, h)
    case h #:: hs if h <= current => loop(hs, current)
  }
  s match {
    case Stream.Empty => None
    case h #:: rest => Some(loop(rest, h))
  }
}
This implementation is arguably simpler. We check in the max function whether the Stream is empty, which allows us to either return right away (with None) or call loop with a valid default value (the first element in the Stream). The loop no longer has to deal with Option[Int]. However, this example does not achieve the goal of avoiding memoization. The pattern match causes rest to keep a reference to the entire tail of the original Stream, which prevents garbage collection of the intermediate elements. A good practice is to only pattern match on a Stream inside a consuming, tail-recursive method.
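The same recipe, a by-name parameter plus an internal tail-recursive loop that pattern matches only as it consumes, transfers to other realizing operations. Here is our own sketch of a sum built that way; like any total fold, it still must not be fed an infinite Stream:

```scala
import scala.annotation.tailrec

// A sum that follows the safe-max recipe: the by-name parameter avoids
// capturing the head before the consuming loop starts.
def sum(s: => Stream[Int]): Long = {
  @tailrec
  def loop(ss: Stream[Int], acc: Long): Long = ss match {
    case Stream.Empty => acc
    case h #:: rest   => loop(rest, acc + h)
  }
  loop(s, 0L)
}
```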
We saw during our overview that it is possible to define an infinite Stream. However, you need to be careful when working with an infinite Stream, because some methods cause the evaluation of the entire Stream, leading to an OutOfMemoryError. Some are obvious, such as toList, which tries to store the entire Stream in a List, forcing the realization of all the elements. Others are more subtle. For example, Stream has a size method similar to the one defined on List; calling size on an infinite Stream will cause the program to run out of memory. Similarly, max and sum will attempt to realize the entire sequence and crash your system. This behavior is particularly dangerous because Stream extends Seq, the base trait for sequences. Consider the following code:
def range(s: Seq[Int]): Int = s.max - s.min
This short method takes a Seq[Int] as its single parameter and returns its range, that is, the difference between the greatest and lowest elements. As Stream extends Seq, the following call is valid:
val s: Stream[Int] = ???
range(s)
The compiler will happily and promptly generate the bytecode for this snippet. However, s could be defined as an infinite Stream:
val s: Stream[Int] = powerOf2(0)
range(s)

java.lang.OutOfMemoryError: GC overhead limit exceeded
  at .powerOf2(<console>:10)
  at $anonfun$powerOf2$1.apply(<console>:10)
  at $anonfun$powerOf2$1.apply(<console>:10)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
  at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
  at scala.collection.immutable.Stream.reduceLeft(Stream.scala:627)
  at scala.collection.TraversableOnce$class.max(TraversableOnce.scala:229)
  at scala.collection.AbstractTraversable.max(Traversable.scala:104)
  at .range(<console>:10)
  ... 23 elided
The call to range never returns normally; it fails with an OutOfMemoryError because the implementations of max and min both attempt to realize the entire Stream. This example illustrates the good practice that we mentioned earlier in this chapter.
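Applying the earlier recipe to range itself yields a version that computes both extremes in one consuming pass. This is our own sketch: it still cannot terminate on an infinite Stream, because a range inherently requires seeing every element, but on large finite Streams it avoids pinning realized elements the way the Seq-based version does:

```scala
import scala.annotation.tailrec

// Range in a single tail-recursive pass, following the chapter's safe-max
// recipe (by-name parameter, consuming loop, Option for the empty case).
def range(s: => Stream[Int]): Option[Int] = {
  @tailrec
  def loop(ss: Stream[Int], acc: Option[(Int, Int)]): Option[Int] = ss match {
    case Stream.Empty => acc.map { case (mn, mx) => mx - mn }
    case h #:: rest =>
      val next = acc match {
        case None           => (h, h)
        case Some((mn, mx)) => (math.min(mn, h), math.max(mx, h))
      }
      loop(rest, Some(next))
  }
  loop(s, None)
}
```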