Chapter 7. Architecting for Performance

We have come a long way in our exploration of Scala and various techniques for writing performant code. In this final chapter, we look at more open-ended topics that are largely applicable beyond Scala and the JVM, and we dive into various tools and practices that improve the architecture and design of an application. In this chapter, we explore the following topics:

  • Conflict-free replicated data types (CRDTs)
  • Throughput and latency impact of queueing
  • The Free monad

Distributed automated traders

Thanks to our hard work, MVT is thriving. The sales department is signing contracts like there is no tomorrow, and the sales bell is ringing from sunrise to sunset. The order book is able to handle more orders, and, as a result of the increase in traffic, another product offered by MVT is experiencing performance issues: the automated trading system. The automated trader receives orders from the order book and applies various trading strategies in real time to automatically place orders on behalf of the customers. As the order book is processing an order of magnitude more trade orders, the automated trading system is unable to keep up and, therefore, cannot efficiently apply its strategies. Several big customers recently lost a lot of money due to bad decisions made by the algorithm and the high latency of execution. The engineering team needs to solve this performance issue. Alice, your technical lead, has tasked you with finding a solution and preventing the company from losing newly-acquired customers.

In the previous chapter, we studied and took advantage of concurrency. We learned how to design code to leverage the power of multicore hardware. The automated trader is already optimized to run concurrent code and utilize all the CPU resources on the machine. The truth is, there is only so much one machine can handle, even with several cores. To scale the system and keep up with the traffic coming from the order book, we will have to start implementing a distributed system.

A glimpse into distributed architectures

Distributed computing is a rich topic, and we cannot pretend to address it entirely in a single chapter. This short section gives a brief and incomplete description of distributed computing. We will try to give you an overview of the paradigm and point to some of the main benefits and challenges of distributed systems.

The idea behind distributed computing is to design a system involving several components, which runs on different machines and communicates with each other (for example, over a network) to achieve a task or provide a service. A distributed system can involve components of different natures, each component providing a specific service and participating in the realization of the task. For example, a web server can be deployed to receive HTTP requests. To service a request, the web server may communicate over the network to query an authentication service to validate credentials and a database server in order to store and retrieve data and complete the request. Together, the web server, the authentication service, and the database form a distributed system.

A distributed system can also involve several instances of the same component. These instances form a cluster of nodes, and they can be used to divide the work among them. This topology allows a system to scale out and support a higher load by adding more instances to the cluster. As an example, if a web server is able to handle 20,000 requests per second, it may be possible to run a cluster of three identical servers to handle 60,000 requests per second (assuming that your architecture allows your application to scale linearly). Distributed clusters also help achieve high availability. If one of the nodes crashes, the others are still up and able to fulfill requests while the crashed instance is restarted or recovers. As there is no single point of failure, there is no interruption of service.

For all their benefits, distributed systems come with their drawbacks and challenges. The communication between components is subject to failures and network disruptions. The application needs to implement retry mechanisms and error handling to deal with lost messages. Another challenge is managing shared state. For example, if all the nodes use a single database server to save and retrieve information, the database has to implement some form of locking mechanism to ensure that concurrent modifications do not collide. It is also possible that, once the cluster node count grows sufficiently large, the database will not be able to serve all the nodes efficiently and will become a bottleneck.

Now that you have been briefly introduced to distributed systems, we will go back to MVT. The team has decided to turn the automated trader into a distributed application to be able to scale the platform. You have been tasked with the design of the system. Time to go to the whiteboard.

The first attempt at a distributed automated trader

Your first strategy is simple. You plan to deploy several instances of the automated trader to form a cluster of nodes. These nodes can share the work and handle each part of the incoming orders. A load balancer in front of the cluster can distribute the load evenly among the nodes. This new architecture helps scale out the automated trader. However, you are facing a common problem with distributed systems: the nodes have to share a common state to operate. To understand this requirement, we explore one of the features of the automated trader. To be able to use MVT's automated trading system, customers have to open an account with MVT and provision it with enough money to cover their trades. This is used as a safety net by MVT to execute orders on behalf of its clients without running the risk of a customer being unable to honor their transactions. To ensure that the automated strategies do not overspend, the automated trader keeps track of the current balance of each customer and checks the balance of a customer before placing an automated order on their behalf.

Your plan consists of deploying several instances of the automated trading system. Each instance receives a portion of the orders processed by the order book, runs a strategy, and places matching orders on behalf of customers. Now that the system consists of several identical instances running in parallel, each instance can place orders on behalf of the same customer. To be able to perform the balance validation, they all need to be aware of the current balance of all customers. Customer balances become a shared state that has to be synchronized across the cluster. To solve this problem, you envision a balance monitor server deployed as an independent component that holds the state of each customer's balance. When a trade order is received by a node of the automated trading cluster, the node queries the balance monitor server to verify that the customer's account has enough funds to place an automated trade. Similarly, when a trade is executed, the node instructs the balance monitor server to update the balance of the customer.

[Figure: The first attempt at a distributed automated trader]

The preceding diagram describes various interactions between the components of your architecture. Automated Trader 1 receives an incoming trade and queries the balance monitor server to check whether the client has enough funds to perform a trade. The balance monitor server either authorizes or rejects the order. At the same time, Automated Trader 3 sends an order that was previously approved by the balance monitor server and updates the client's balance.

Note

You probably spotted a flaw in this design. It is possible to run into a race condition where two different instances of the automated trader validate the balance of the same customer, receive an authorization from the Balance Monitor Server, place both trades in parallel, and go over the limit of the client's account. This is comparable to a race condition that you can encounter with a concurrent system running on a single machine. In practice, the risk is low and is accepted by companies that are similar to MVT. The limit used to cut off a client is usually set lower than the actual balance to account for this risk. Designing a platform to handle this case would increase the latency of the system because we would have to introduce more drastic synchronization across the nodes. This is a good example of the business and technical domains working together to optimize the solution.

At the end of this design session, you take a short walk to clear your mind while drinking a bottle of carbonated water. As you return to the whiteboard, the crude reality hits you. Like a flat bottle of carbonated water, your idea has fizzled out. You realize that all these arrows linking rectangles are in reality messages that are traveling over the network. Currently, while a single automated trader relies on its internal state to execute strategies and place orders, this new design requires the automated trader to query an external system over the network and wait for the answer. This query happens on the critical path. This is another common issue with distributed systems: components with focused roles need to communicate with each other to accomplish their tasks. This communication comes at a cost. It involves serialization, I/O operations, and transfer over a network. You share your reflections with Alice, who confirms that this is a problem. The automated trader has to keep the internal latency as low as possible for its decisions to be relevant. After a short discussion, you agree that it would endanger performance for the automated trader to perform a remote call on the critical path. You are now left with the task of implementing a distributed system with components sharing a common state without communicating with each other on the critical path. This is where we can start talking about CRDTs.

Introducing CRDTs

CRDT stands for Conflict-free Replicated Data Type. CRDTs were formally defined by Marc Shapiro and Nuno Preguiça in their paper, Designing a commutative replicated data type (refer to https://hal.inria.fr/inria-00177693/document). A CRDT is a data structure that is specifically designed to ensure eventual consistency across multiple components without the need for synchronization. Eventual consistency is a well-known concept in distributed systems, and it is not exclusive to CRDTs. This model guarantees that, if a piece of data is no longer modified, all nodes in a cluster will eventually end up with the same value for this piece of data. Nodes send each other update notifications to keep their state synchronized. The difference from strong consistency is that, at a given time, some nodes may see a slightly outdated state until they receive the update notice:

[Figure: Eventual consistency - an update to A propagating to every node of the cluster]

The preceding diagram shows an example of eventual consistency. All the nodes of the cluster hold the same piece of data (A = 0). Node 1 receives an update to set the value of A to 1. After updating its internal state, it broadcasts the update to the rest of the cluster. The messages reach their targets at different instants, which means that until we reach step 4, A has a different value depending on the node. If a client queries node 4 for the value of A at step 3, they receive an older value as the change has not yet been reflected in node 4.

A problem that may arise with eventual consistency is the resolution of conflicts. Imagine a simple example where the nodes in a cluster share the state of an array of integers. The following table describes a sequence of events that update the state of this array:

Instant | Event | State change
T0 | Initialization of the cluster | Nodes 1 and 2 hold the same value for the array of integers: [1,2,3]
T1 | Node 1 receives a request to update the value at index 1 from 2 to 4 | Node 1 updates its internal state to [1,4,3] and sends an update message to node 2
T2 | Node 2 receives a request to update the value at index 1 from 2 to 5 | Node 2 updates its internal state to [1,5,3] and sends an update message to node 1
T3 | Node 1 receives the update from node 2 | Node 1 needs to decide whether it should ignore or take into account the update message

Our cluster now needs to resolve the conflict. Should node 1 apply the update from node 2 or ignore it? If node 2 faces the same dilemma and both nodes keep their own value, we end up with two nodes holding different states. What about the other nodes? Some may receive the broadcast from node 2 before the one from node 1, and vice versa.

Various strategies exist to deal with this problem. Some protocols use timestamps or vector clocks to determine which update was performed later in time and should take precedence. Others simply assume that the last writer wins. This is not a simple problem, and CRDTs are designed to avoid conflicts altogether. In fact, CRDTs are defined in such a way that conflicts are mathematically impossible. To be defined as a CRDT, a data structure has to support only commutative updates. That is, regardless of the order in which the update operations are applied, the end state must always be the same. This is the secret of eventual consistency without merge conflicts. When a system uses CRDTs, all the nodes can send each other update messages without the need for strict synchronization. The messages can be received in any order, and all the local states will eventually converge to the same value.
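
To make this commutativity requirement concrete, here is a small standalone sketch (an illustration of the idea only, not part of the automated trader code) that contrasts increments, which can be applied in any order, with the index overwrites from the preceding table, which cannot:

// Increments commute: applying them in any order yields the same end state
val increments = List(2, 3)
val leftToRight = increments.foldLeft(0)(_ + _)          // 0 + 2 + 3 = 5
val rightToLeft = increments.reverse.foldLeft(0)(_ + _)  // 0 + 3 + 2 = 5
assert(leftToRight == rightToLeft)

// Overwriting an index does not commute: the end state depends on the order
val initial = Vector(1, 2, 3)
val fromNode1 = (v: Vector[Int]) => v.updated(1, 4) // set index 1 to 4
val fromNode2 = (v: Vector[Int]) => v.updated(1, 5) // set index 1 to 5
assert(fromNode2(fromNode1(initial)) != fromNode1(fromNode2(initial)))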

[Figure: Commutative updates broadcast by different nodes converging to the same state]

In the preceding diagram, we see that node 3 and node 1 receive two different changes. They send this update information to all the other nodes. Note that we are not concerned with the order in which the updates are received by the other nodes. As the updates are commutative, their order has no impact on the final state that will be computed by each node. They are guaranteed to hold the same piece of data once all of them have received all the update broadcasts.

There exist two types of CRDT:

  • Operation-based
  • State-based

They are equivalent in the sense that it is always possible to define a state-based CRDT for each operation-based CRDT, and vice versa. However, their implementations differ and provide different guarantees in terms of error recovery and performance. We define each type and consider its characteristics. As an example, we implement each version of the simplest CRDT: an increase-only counter.

The state-based increase-only counter

With this model, when a CRDT receives an operation to perform from a client, it updates its state accordingly and sends an update message to all the other CRDTs in the cluster. This update message contains the full state of the CRDT. When the other CRDTs receive this message, they perform a merge of their state with the received new state. This merge operation has to guarantee that the end state will always be the same. It has to be commutative, associative, and idempotent. Let's look at a possible implementation of this data type:

case class CounterUpdate(i: Int)
case class GCounterState(uid: Int, counter: Int)

class StateBasedGCounter(
  val uid: Int, val count: Int, otherCounters: Map[Int, Int]) {

  // The counter value is the local count plus the latest known count of
  // every other node in the cluster
  def value: Int = count + otherCounters.values.sum

  // Applies a local increment and returns the new counter along with the
  // state message to broadcast to the other nodes
  def update(change: CounterUpdate): (StateBasedGCounter, GCounterState) =
    (new StateBasedGCounter(uid, count + change.i, otherCounters),
      GCounterState(uid, count + change.i))

  // Merges the state received from another node. Keeping the greater of the
  // two values makes the merge commutative, associative, and idempotent
  def merge(other: GCounterState): StateBasedGCounter = {
    val newValue = other.counter max otherCounters.getOrElse(other.uid, 0)
    new StateBasedGCounter(uid, count, otherCounters + (other.uid -> newValue))
  }
}

The update method can be used by clients to increase the value of the counter. It returns a new state-based counter containing an updated count, along with a GCounterState object that can be sent to all the other CRDTs in the cluster. The merge method is used to handle these GCounterState messages and merge the state of the other counters into the local state. A counter has a unique ID in the cluster. The internal state is composed of the local state (that is, count) and the last known states of all the other counters in the cluster. We keep these counters in a map that we update in the merge method when receiving state information from a different counter. Merging is a simple operation. We compare the incoming value with the one that we have in the map and keep the greater of the two. This ensures that if we receive two update messages out of order, we do not override the latest state (that is, the greatest number) with an older, delayed update message.
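
As a quick illustration of how two nodes could use this counter, here is a hand-driven sketch (the message exchange is simulated locally; a real cluster would broadcast the GCounterState objects over the network):

// Two nodes of a hypothetical two-node cluster, each starting at zero
val nodeA = new StateBasedGCounter(1, 0, Map.empty)
val nodeB = new StateBasedGCounter(2, 0, Map.empty)

// Each node applies a local increment and produces a state message
val (a1, stateA) = nodeA.update(CounterUpdate(3))
val (b1, stateB) = nodeB.update(CounterUpdate(5))

// The nodes exchange state messages and merge them into their local state
val aAfterMerge = a1.merge(stateB)
val bAfterMerge = b1.merge(stateA)

// Both nodes converge to the same value
assert(aAfterMerge.value == 8 && bAfterMerge.value == 8)

// Merging is idempotent: receiving the same state twice changes nothing
assert(aAfterMerge.merge(stateB).value == 8)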

The operation-based increase-only counter

Operation-based CRDTs are similar to state-based CRDTs, with the difference that update messages only contain a description of the operation that was just performed. These CRDTs do not send their full state in an update message; instead, they send a copy of the operation that they just applied to update their own state. This ensures that all the other CRDTs in the cluster perform the same operation and keep their state in sync. The updates can be received in a different order by each node of the cluster. To guarantee that the end state is the same for all the nodes, the updates have to be commutative. You can see an example of this data structure, as follows:

class OperationBasedCounter(count: Int) {

  def value: Int = count

  // Applies a local increment and returns the new counter along with the
  // operation to broadcast to the other nodes
  def update(change: CounterUpdate): (OperationBasedCounter, CounterUpdate) =
    new OperationBasedCounter(count + change.i) -> change

  // Applying a remote operation is the same as applying a local update
  def merge(operation: CounterUpdate): OperationBasedCounter =
    update(operation)._1
}

This implementation is shorter than the state-based example. The update method still returns an updated instance of the counter and the CounterUpdate object that was applied. For an operation-based counter, it is enough to broadcast the operation that was applied. This update is received by the merge method of the other instances, which apply the same operation to their own internal state. Note that update and merge are equivalent; merge is even implemented in terms of update. In this model, there is no need for a unique ID per counter.
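
The same kind of hand-driven sketch (again, the message exchange is simulated locally) shows that the order in which the broadcast operations are merged does not matter:

// Two nodes, each starting at zero
val nodeA = new OperationBasedCounter(0)
val nodeB = new OperationBasedCounter(0)

// Each node applies a local update and broadcasts the operation
val (a1, opA) = nodeA.update(CounterUpdate(3))
val (b1, opB) = nodeB.update(CounterUpdate(5))

// Applying the remote operations yields the same value on both nodes
assert(a1.merge(opB).value == 8)
assert(b1.merge(opA).value == 8)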

Operation-based CRDTs use potentially smaller messages because they only send each discrete operation as opposed to their full internal state. In our example, the state-based update contains two integers, as opposed to only one for the operation-based update. Smaller messages can help reduce bandwidth usage and improve the throughput of your system. However, they are sensitive to communication failures. If an update message is lost during the transmission and does not reach a node, this node will be out of sync with the rest of the cluster with no way of recovering. If you decide to use operation-based CRDTs, you have to be able to trust your communication protocol and be confident that all update messages reach their destination and are properly processed. State-based CRDTs do not suffer from this issue because they always send their entire state in an update message. If a message is lost and does not reach a node, this node will only be out of sync until it receives the next update message. It is possible to make this model even more robust by implementing a periodic broadcast of the node's state, even when no updates are performed. This would force all nodes to regularly send their current state and ensure that the cluster is always eventually consistent.

CRDTs and automated traders

Based on the requirements of our system, it seems that CRDTs are a good fit for our implementation. Each node can keep the current state of each customer's balance in memory as a counter, update it when placing orders, and broadcast update messages to the rest of the system. This broadcast can be done outside the critical path, and we do not have to worry about handling conflicts, as this is what CRDTs are designed for. Eventually, all nodes will have in memory the same value for each balance, and they will be able to locally check for trade authorization. The balance monitor server can be removed entirely.

To implement the state of the balance as a CRDT, we need a more sophisticated counter than the one we previously explored. The balance cannot be represented as an increase-only counter because, occasionally, orders are canceled and the system must credit the customer's account. The counter has to be able to handle both increment and decrement operations. Luckily, such a counter exists: the PN counter. Let's look at a simple implementation of a state-based PN counter:

case class PNCounterState(incState: GCounterState, decState: GCounterState)

class StateBasedPNCounter private(
  incCounter: StateBasedGCounter,
  decCounter: StateBasedGCounter) {

  // The counter value is the total of the increments minus the total of
  // the decrements
  def value: Int = incCounter.value - decCounter.value

  def update(change: CounterUpdate): (StateBasedPNCounter, PNCounterState) = {
    val (newIncCounter, newDecCounter, stateUpdate) =
      change match {
        case CounterUpdate(c) if c >= 0 =>
          // A positive delta only grows the increment counter
          val (iC, iState) = incCounter.update(change)
          val dState = GCounterState(decCounter.uid, decCounter.count)
          (iC, decCounter, PNCounterState(iState, dState))
        case CounterUpdate(c) if c < 0 =>
          // A negative delta grows the decrement counter by its magnitude
          val (dC, dState) = decCounter.update(CounterUpdate(-c))
          val iState = GCounterState(incCounter.uid, incCounter.count)
          (incCounter, dC, PNCounterState(iState, dState))
      }

    (new StateBasedPNCounter(newIncCounter, newDecCounter), stateUpdate)
  }

  def merge(other: PNCounterState): StateBasedPNCounter =
    new StateBasedPNCounter(
      incCounter.merge(other.incState),
      decCounter.merge(other.decState)
    )
}

The PN counter leverages our previous implementation of an increase-only counter to provide the decrement capability. To be able to represent a counter as a state-based CRDT, we need to keep track of the state of both increment and decrement operations. This is necessary to guarantee that we do not lose information if our update messages are received in the wrong order by other nodes.

Tip

Remember that the increase-only counter guarantees conflict resolution by assuming that the highest value of the counter is necessarily the most up-to-date. This invariant does not hold true for the PN counter.

This implementation shows you another interesting property of CRDTs: simple, basic structures can be composed to create more complex and feature-rich CRDTs. Should we proceed to demonstrate the implementation of an operation-based PN counter? As it turns out, and we are sure you spotted this earlier, our previous operation-based increase-only counter already supports decrement operations: applying a positive or a negative delta is handled in exactly the same way.
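
A short sketch confirms this; a negative delta flows through the same update and merge methods as a positive one:

// One node holds a balance of 100 and processes a cancellation of 30
val nodeA = new OperationBasedCounter(100)
val (a1, op) = nodeA.update(CounterUpdate(-30))
assert(a1.value == 70)

// Another node holding the same balance applies the broadcast operation
val nodeB = new OperationBasedCounter(100)
assert(nodeB.merge(op).value == 70)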

When the balance is not enough

You have finished the implementation of the proof-of-concept and call Alice to get some feedback. She spends a few minutes studying your new design and your code. "Looks good to me. Do not forget to synchronize the account blacklist as well." What is she talking about? "Checking the account balance is only one of the criteria to allow or block an automated trade. Other attributes of the client need to be taken into consideration. Today, the automated trader runs a trust algorithm in the background, and it calculates a score for each customer. If the score falls below a certain threshold, the account is blacklisted until the end of the trading day, and all automated orders are denied. I like your design, but you need to incorporate this blacklist into the new system." Faced with this new challenge, you think that the best solution would be to implement the blacklist as a CRDT as well, provided that it fits your current design.

A new CRDT - the grow-only set

A specific CRDT is designed to handle our new use case. The grow-only set data type implements a set that only supports adding new elements, without duplicates. We can implement the blacklist as a grow-only set. Each node can run its own trust algorithm and decide whether a client should be blacklisted and denied automated trading for the rest of the day. At the end of the day, the system can clear the set. A possible implementation of a state-based grow-only set follows:

case class AddElement[A](a: A)
case class GSetState[A](set: Set[A])

class StateBasedGSet[A](val value: Set[A]) {

  def contains(a: A): Boolean = value.contains(a)

  // Adds an element and returns the new set along with the full state to
  // broadcast to the other nodes
  def update(a: AddElement[A]): (StateBasedGSet[A], GSetState[A]) = {
    val newSet = new StateBasedGSet(value + a.a)
    (newSet, GSetState(newSet.value))
  }

  // Merging is a set union: commutative, associative, and idempotent
  def merge(other: GSetState[A]): StateBasedGSet[A] =
    new StateBasedGSet(value ++ other.set)
}

Our implementation supports adding an element by calling the update method. It returns a new instance of StateBasedGSet with an updated set, as well as a GSetState instance to be broadcast to the other nodes. This update contains the entire state of the set. An operation-based implementation is trivial and left as an exercise for the reader (a possible solution is provided in the code repository). Similar to the increment-decrement counter explored earlier, it is possible to create a set that supports both adding and removing elements. There is one caveat, though: as adding and removing an element are not commutative operations, one must take precedence over the other. In practice, a 2P-set can be created to support adding and removing items, but once removed, an element cannot be added again. The remove operation takes precedence, which guarantees that the operations are commutative and can be handled without conflicts. A possible implementation is to combine two grow-only sets, one recording the added elements and the other recording the removed elements. Again, we see the power of simple CRDTs that can be combined to create more powerful data types.
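
As an illustration of this composition, here is a minimal sketch of a state-based 2P-set built from two of our StateBasedGSet instances (the TwoPSetState and StateBasedTwoPSet names are ours, chosen for this example):

case class TwoPSetState[A](added: GSetState[A], removed: GSetState[A])

class StateBasedTwoPSet[A](
  addSet: StateBasedGSet[A],
  removeSet: StateBasedGSet[A]) {

  // An element is present if it was added and never removed; removal wins
  def contains(a: A): Boolean = addSet.contains(a) && !removeSet.contains(a)

  def add(a: A): (StateBasedTwoPSet[A], TwoPSetState[A]) = {
    val (newAdd, addState) = addSet.update(AddElement(a))
    (new StateBasedTwoPSet(newAdd, removeSet),
      TwoPSetState(addState, GSetState(removeSet.value)))
  }

  def remove(a: A): (StateBasedTwoPSet[A], TwoPSetState[A]) = {
    val (newRemove, removeState) = removeSet.update(AddElement(a))
    (new StateBasedTwoPSet(addSet, newRemove),
      TwoPSetState(GSetState(addSet.value), removeState))
  }

  // Merging merges both underlying grow-only sets
  def merge(other: TwoPSetState[A]): StateBasedTwoPSet[A] =
    new StateBasedTwoPSet(
      addSet.merge(other.added),
      removeSet.merge(other.removed))
}

Note that removal wins: once an element appears in the remove set, contains returns false for it, even if the element is added again later.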
