Chapter 6. Consistency and Scalability

Consistency is a complex attribute of a system, and the need for it varies depending on the system in question. It is very frequently misunderstood, so let’s first look at what we mean by consistency before we examine ways to control it.

A system can be said to be consistent if different pieces of related data throughout the system are in agreement with one another about the state of the world.

For a simple example, if I calculate the total amount of deposits I’ve made to a bank account as well as the total number of withdrawals, the system would be consistent if it reports a bank balance that agrees with the delta between these two numbers.
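To make this concrete, here is a minimal sketch (with hypothetical names) of what that consistency check amounts to:

// Hypothetical model of the account view: the system is consistent if the
// reported balance agrees with the delta between deposits and withdrawals.
case class AccountView(
    deposits: Seq[BigDecimal],
    withdrawals: Seq[BigDecimal],
    reportedBalance: BigDecimal) {

  def isConsistent: Boolean =
    reportedBalance == deposits.sum - withdrawals.sum
}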

On a classic single-processor, single-memory-space von Neumann system, this kind of consistency is not difficult to achieve, in general. Of course, this is assuming that our system doesn’t have any bugs.

As soon as a system has any aspect of parallelism, however, consistency becomes a bit more difficult. If we introduce multiple cores, we have some parallelism. If we make the system distributed, the situation becomes even more complex.

As we explained in Chapter 1, the real world doesn’t actually have a globally consistent state, so perhaps it’s acceptable if our software systems don’t either.

In this chapter, we talk about what a transaction means in the context of a distributed system, what the tradeoffs are on consistency, and why global consistency is not the friend of a scalable system. We’ll talk about message delivery guarantees and how to avoid the enemies of scalability.

Transactions and Consistency

A transaction in the sense of consistency is considered a single atomic change. This means that the values involved in the change are all updated at once or not at all—there is no point at which some values are changed and some others are not, at least not externally. Of course, in a distributed system this can be a challenge.

The classic example of this is a database transaction, during which several tables are updated in a single inviolable unit, but there are many other examples.

As we discuss consistency, we’ll see how it applies in Akka’s situation.

Strong Versus Eventual Consistency

Data updated via a transaction, in a single atomic instant, is considered to be immediately consistent; that is, the overall system goes from one point at which it is fully consistent to another state in which it is also fully consistent (but with the change). There is no period of time when different parts of the system might have a differing view on the state of the world.

Eventual consistency, on the other hand, implies an update that happens over a span of time, no matter how short that span might be, whereby two different parts of the system might, for a moment, have different opinions on the state of the world. They must come into agreement at some point in the future, but that’s the “eventually” part. Consistency is attained, just not instantaneously.

Strangely enough, eventual consistency is usually much easier to achieve than immediate consistency, especially in a distributed environment.

Concurrency Versus Parallelism

Concurrency means that two processes start at some points in time, and their progress overlaps at other points during their processing. This could mean that we start one process, then stop, then start the other, work for a while, stop and start the first again, and so forth. It does not necessarily require the processes to be happening at the same time, just to overlap.

Parallelism, however, introduces the idea of simultaneous progress between two processes; in other words, at least a part of the two (or more) processes are happening in exactly the same time interval. Parallelism implies concurrency, but not the reverse: you can be concurrent without being parallel.

A distributed system is, by definition, parallel, whereas even a single-core, single-processor system can be concurrent by switching which task it’s working on over time until both are complete.

Why Globally Consistent Distributed State Doesn’t Scale

Aside from the violation of the law of causality in the real world, globally consistent distributed state is very difficult to even approach, much less achieve.

Setting aside whether it is even desirable, let’s assume that we want globally consistent distributed state. When we update that state, we need all nodes that are influenced by it (e.g., they have a copy of the data or some data derived from it, such as our bank balance) to have exactly the same opinion as to the state of the world at the moment the update is done.

This effectively means that we need a “stop the world” situation when state is updated, during which some relevant portion of each system is examined and potentially updated to the correct state, and then we can resume processing on all the nodes.

What if one node becomes unavailable during this process? How do we even agree on “now” between nodes (and, no, wall-clock time won’t do it)? These are difficult problems that, in and of themselves, could take a separate book to explore.

The mechanisms to do this in a distributed environment begin to consume a significant part of the resources of the system as a whole, and this gets worse as the size of the distributed group grows. At the same time, during the periods when we are updating state, each node must avoid computing with that state until they all agree on its new value, severely limiting our ability to do processing in parallel, and thus our scalability.

Location Transparency

Location transparency is the feature of a system by which we can perform computations without being concerned with the location (e.g., the node) at which the computation occurs.

In Akka, the flow of messages between actors should be, to the developer, location transparent; that is, we should not care when we send a message whether the recipient is on the same node as the sender. The system itself moves messages across the network as needed to the correct location, without our direct involvement.

Delivery Guarantees

Delivery guarantees are an often-overlooked part of building distributed systems. We frequently consider them when using some communication mechanisms, but we ignore them when using others. When sending a message via an event bus we are careful to consider the delivery guarantee provided by that message bus, but when we send via an HTTP request, we often don’t think too much about it. We assume that the message will be delivered, and if it is delivered we will get a successful response. No response means no delivery. And yet, that is not the case.

The delivery guarantees provided by a system are important because they can have a dramatic impact on the consistency of that system. There are three basic types of delivery guarantees that can be defined, but in reality only two of them are actually achievable. The third can be approximated only by using various techniques, which we’ll explore later—in the meantime, let’s look at the other two.

At Most Once

At Most Once delivery is the simplest guarantee to achieve. It requires no storage, either in memory or on disk, of the messages on either end of the pipeline. This delivery guarantee means that when you send a message, you have no idea whether it will arrive, but that’s OK. With this delivery guarantee you must accept the possibility that the message might be lost and thus design around that.

At Most Once delivery simply means that you send the message and then move on. There is no waiting for an acknowledgment. This is the default delivery guarantee in Akka. When a message is sent to an actor using Akka, there is no guarantee that the message will be received. If the actor is local, you can probably assume that it will be delivered, as long as the system doesn’t fail, but if the system does fail before the actor can remove the message from the mailbox, that message will be lost. Remember, message processing is asynchronous, so just because the message has been sent does not mean it has been delivered.
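In code, this is just the ordinary tell: fire the message and carry on (a minimal sketch, assuming receiver is an ActorRef and Message is defined elsewhere):

// At Most Once: send and move on. No storage, no acknowledgment, no retry;
// the message may simply be lost.
receiver ! Message("Hello")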

Things become even more complicated if the actor is a remote or clustered actor. In this case, you need to deal with the possibility of the remote actor system crashing and losing the message, but you must also consider the possibility that the message might be lost due to a network problem. Regardless of whether the actor is local or remote, the message might not be delivered. If the delivery of the message is critical, you will need to try to work toward an alternative delivery guarantee.

At Least Once

At Least Once delivery is more difficult to achieve in any system. It requires storage of the message on the sender as well as an acknowledgment from the receiver. This storage might be in memory or it might be on disk depending on how important that message delivery is. If the delivery absolutely must succeed, you need to ensure that it is stored somewhere reliable; otherwise you could lose it if the sender fails.

Whether you are storing in memory or on disk, the basic process is to store the message, and then send it and wait for an acknowledgment. If no acknowledgment is received, you resend the message, and continue to do so until you successfully receive an acknowledgment.

With an At Least Once delivery guarantee, there is the possibility of receiving the message twice. If the acknowledgment is lost, when you send the message again, you will have a duplicate delivery. However, because you continue to send until you receive the acknowledgment, you are always guaranteed to get the message at least once, eventually. This delivery mechanism is reliable.

In Akka, you can implement At Least Once delivery a few ways. The first way is to do it manually. In this case, you simply store the message in a queue somewhere, send it, and then expect a response. When the response comes back, you remove the message from the queue. You also need a recovery mechanism so that if the response is never received, you can resend the message.

You can implement this easily in memory by using the Ask pattern. In this case, it might look something like the following:

import akka.actor.{Actor, ActorRef}
import akka.pattern.{ask, AskTimeoutException}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

// Message and Ack are assumed to be defined elsewhere in the protocol.
class MySender(receiver: ActorRef) extends Actor {
  import context.dispatcher
  implicit val askTimeout = Timeout(5.seconds)

  sendMessage(Message("Hello"))

  // Ask the receiver and resend on timeout until an Ack comes back.
  private def sendMessage(message: Message): Future[Ack] = {
    (receiver ? message).mapTo[Ack].recoverWith {
      case ex: AskTimeoutException => sendMessage(message)
    }
  }

  override def receive: Receive = Actor.emptyBehavior
}

This is a very trivial example—you send a message, and then in the event of an AskTimeoutException, you try resending it. Of course, this type of delivery is only reliable as long as the sender doesn’t crash. If it does, this is not going to give you the At Least Once delivery guarantee.

You could adapt the solution that we just described, introducing some database or reliable disk storage. However, it turns out that Akka Persistence has all of this logic built in and provides an AtLeastOnceDelivery trait that you can use for this exact purpose, as shown in the code that follows:

import akka.actor.{ActorRef, Props}
import akka.persistence.{AtLeastOnceDelivery, PersistentActor}

case class SendMessage(message: String)
case class MessageSent(message: String)

case class AcknowledgeDelivery(deliveryId: Long, message: String)
case class DeliveryAcknowledged(deliveryId: Long)

object MySender {
  def props(receiver: ActorRef) = Props(new MySender(receiver))
}

class MySender(receiver: ActorRef) extends PersistentActor
  with AtLeastOnceDelivery {
  override def persistenceId: String = "PersistenceId"

  override def receiveRecover: Receive = {
    case MessageSent(message) =>
      deliver(receiver.path)(deliveryId =>
        AcknowledgeDelivery(deliveryId, message))
    case DeliveryAcknowledged(deliveryId) =>
      confirmDelivery(deliveryId)
  }

  override def receiveCommand: Receive = {
    case SendMessage(message) => persist(MessageSent(message)) { request =>
      deliver(receiver.path)(deliveryId =>
        AcknowledgeDelivery(deliveryId, message))
    }
    case ack: DeliveryAcknowledged => persist(ack) { ack =>
      confirmDelivery(ack.deliveryId)
    }
  }
}

This is a simple implementation of an actor that uses the AtLeastOnceDelivery mechanism. Note that it extends PersistentActor with AtLeastOnceDelivery. This actor will receive a command in the form of SendMessage. When the actor receives this command, it sends the message to another actor, but here you want to guarantee delivery. Delivery is guaranteed by persisting the message prior to sending it, and then using the deliver method rather than the standard tell. When you use the deliver method, a delivery ID is generated, which you need to include in the outgoing message (in this case, AcknowledgeDelivery).

When the receiving actor gets the resulting message, including the delivery ID, it responds to the sender with another message that contains that same delivery ID (in this case, DeliveryAcknowledged):

sender() ! DeliveryAcknowledged(deliveryId)
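The receiving actor itself is not shown here. A minimal sketch of what it might look like, assuming the message types defined above, is:

import akka.actor.Actor

class MyReceiver extends Actor {
  override def receive: Receive = {
    case AcknowledgeDelivery(deliveryId, message) =>
      // ... handle the message, ideally idempotently ...
      sender() ! DeliveryAcknowledged(deliveryId)
  }
}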

The sender receives and persists the acknowledgment, and simultaneously confirms delivery, as shown in Figure 6-1.

In Figure 6-1, you can see how the sender stores outbound messages and then checks them against DeliveryAcknowledged, at which point the outbound message is confirmed to be delivered and no longer needs to be stored.

Figure 6-1. Message flow for At Least Once delivery

Under the hood, the actor has a configurable timer. If it does not receive the acknowledgment within the allotted time, it will automatically attempt redelivery.
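The redelivery interval comes from the Akka Persistence configuration, but the AtLeastOnceDelivery trait also exposes it as an overridable member. Here is a sketch of tuning it directly inside the MySender class shown earlier; the values are illustrative only:

// Illustrative overrides inside the MySender class. By default these
// values come from the akka.persistence.at-least-once-delivery section
// of the configuration. Requires import scala.concurrent.duration._
override def redeliverInterval: FiniteDuration = 10.seconds
override def warnAfterNumberOfUnconfirmedAttempts: Int = 10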

So, what happens if the sender fails? In the event of a failure, the sender will replay the persisted messages and their corresponding acknowledgments when it is restarted. However, unlike with a standard persistent actor, there is a buffering mechanism in place. During the replay, the messages are not resent immediately. Instead, they are buffered. After all messages have been replayed, the sender can then go through the buffer and compare the delivery requests against the confirmations. It will cross off all the messages that have a corresponding acknowledgment. Then, the sender will resend any messages that are left.

This means that if the receiver fails, or the acknowledgment is lost, the timeout will ensure that the message is redelivered. If the sender fails before it can receive the acknowledgment, when it is restored it will see that no acknowledgment was received and will resend the message. This guarantees that the message will be delivered despite any failures.

Exactly Once Doesn’t Exist (But Can Be Approximated)

In many cases, the delivery guarantee that you want is an Exactly Once guarantee. That is, you want each message to be delivered to the destination exactly once, no more, no less. Unfortunately, this type of delivery is impossible to achieve in any system. Instead, you need to strike a balance that uses the delivery guarantees that you can achieve.

Let’s consider a very simple example. In our domain, there are multiple bounded contexts. There’s a bounded context responsible for updating information on the people in the system, and there’s the scheduling bounded context that is responsible for managing the allocation of those people. If you need to delete or deactivate a person (perhaps they have moved to another company), you will need to update both systems. Of course, you want both systems to agree on whether the person is in the system and what the person’s allocation looks like. The simple solution seems to be to update one system and have it send a message to the other to perform the appropriate action.

So, the people information system is updated to remove a specific person. That system then sends a message to the scheduling system to remove that person. It is important that this message is delivered, so you wait for an acknowledgment. But what if the acknowledgment never comes? What if there is a network partition while the people information system is trying to send the message? How do you recover? You could assume the message never made it through and send it again. But if the message did get through and it was the acknowledgment that was lost, resending the message would result in a duplicate delivery. On the other hand, if you assume that the message has been received and don’t resend it, you run the risk of never delivering the message. Any way you look at it, there is no way to guarantee that the message was delivered exactly once.

But Exactly Once delivery is often what you want. You need to find a way to simulate it.

How Do You Approximate Exactly Once Delivery?

The guarantee that many developers and designers think they want is Exactly Once; that is, a message is delivered reliably from the sender to the receiver exactly one time, with no duplications.

In practice, Exactly Once delivery remains impossible, but it is possible to achieve something close to Exactly Once processing by means of an underlying mechanism that abstracts away the problem for us. Under the covers, that solution uses At Least Once delivery to make it appear at the high level that you have Exactly Once processing, at least from the programmer’s perspective.

The developer can then send a message via this Exactly Once abstraction and rely on the underlying mechanism to transmit the message, receive an acknowledgment, retransmit if needed, and so forth until the message has been confirmed as received, deduplicated for any repetition on the receiving side, and finally, delivered to the recipient. Of course, this process can affect performance, especially if many retransmissions must occur before the confirmation can be made. It also definitely requires persistence, at least on the sender’s side, and often on both the sender and receiver’s side.
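As a rough illustration of the receiving side of such an abstraction (the message types here are hypothetical), the receiver can track the delivery IDs it has already handled and discard repeats, acknowledging every delivery so that the sender stops retransmitting:

import akka.actor.Actor

// Hypothetical envelope carrying the At Least Once delivery ID.
case class ReliableEnvelope(deliveryId: Long, payload: Any)
case class Confirm(deliveryId: Long)

class DeduplicatingReceiver extends Actor {
  // IDs already processed. In a real system this set would itself need to
  // be persisted (and pruned) to survive a restart of the receiver.
  private var processed = Set.empty[Long]

  override def receive: Receive = {
    case ReliableEnvelope(deliveryId, payload) =>
      if (!processed.contains(deliveryId)) {
        processed += deliveryId
        // ... hand the payload to the business logic exactly once ...
      }
      // Acknowledge even duplicates so the sender stops resending.
      sender() ! Confirm(deliveryId)
  }
}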

It is often desirable to have more visibility into the underlying mechanism, so it is fairly uncommon in Akka systems to see an abstraction that simulates Exactly Once.

Cluster Singleton

Sometimes, there are areas within a system that by necessity must be unique. Creating multiple copies is unacceptable because these areas of the system might have state that must be very tightly controlled. An example of this would be a unique ID generator. A unique ID generator might need to keep track of previously used IDs. Or, it might use a monotonically increasing numeric value. In either case, it might be critical that you ensure that only a single call to generate an ID happens at any given time. Trying to generate IDs in parallel could result in duplicates.

An ID generator such as the one in this example is a bottleneck in the system and should be avoided whenever possible. But sometimes there are situations for which it is necessary. In this case, Akka provides a facility for ensuring that only a single instance of an actor is available throughout a clustered system. This is done through an Akka cluster singleton.

A cluster singleton works by having the singleton always run on the oldest available node in the cluster. Determining which node is the oldest is done via the actor system’s gossip mechanism. After the cluster has established the oldest node, the singleton instance of the actor can be instantiated on that node. All of this is handled behind the scenes through the use of a cluster singleton manager. This manager will take care of the details of ensuring the singleton is running.

Communication with the singleton happens through another actor, called the cluster singleton proxy. All messages are passed through the proxy, and it is the job of the proxy to determine which node to send the message to. Again, where the singleton lives is transparent to clients. They need to know only about an instance of the proxy, and the details are handled under the hood.

But what happens if the node hosting the singleton is lost? In this case, there will be a period of time during which that singleton is unavailable. In the meantime, the cluster will need to reestablish which node is the oldest and reinitialize the singleton on that node. During this period of unavailability, any messages sent to the cluster singleton proxy will be buffered. As soon as the singleton becomes available again, the messages will be delivered. If those messages had a timeout associated with them, it is possible that they will time out before the singleton is reestablished.

One of the big disadvantages of the singleton pattern is that you cannot guarantee availability. There is simply no way to ensure that there is always a node available. There will always be a transition period from when the singleton is lost to when it is reestablished. Even though that period might be short in many cases, it still represents a potential failure in the system.

Another disadvantage to the singleton pattern is that it is a bottleneck. Because there can be only one instance of the actor, it can process only one message at a time. This means that the system could be stuck waiting for long periods while it works its way through a backlog.

Another potential issue is that when the singleton is migrated from one node to the next, any state stored in the singleton is lost. It will then be necessary to re-create that state when the singleton is reestablished. One way to do this would be to use Akka Persistence to maintain that state. In that case, it will be automatically restored when the singleton is re-created.

Because of the disadvantages of the cluster singleton pattern, it is recommended that you avoid it, except when absolutely necessary. And when it is necessary, you should keep it as small as possible. For example, if you were using the singleton to generate unique user IDs, it would be best if that was all it did. Although it might be tempting to have the singleton create the users in the database and initialize their state, each additional operation the singleton needs to perform creates a bigger bottleneck. If you can limit it to a single operation, ideally in memory, you can ensure that you are keeping it as available as possible.

Here is an example of what a very simple ID generator might look like if it were implemented using Akka Persistence:

import akka.actor.Props
import akka.persistence.PersistentActor

object IdProvider {
  case object GenerateId
  case class IdGenerated(id: Int)

  def props() = Props(new IdProvider)
}

class IdProvider extends PersistentActor {
  import IdProvider._

  override val persistenceId: String = "IdProvider"

  private var currentId = 0

  override def receiveRecover: Receive = {
    case IdGenerated(id) =>
      currentId = id
  }

  override def receiveCommand: Receive = {
    case GenerateId =>
      persist(IdGenerated(currentId + 1)) { evt =>
        currentId = evt.id
        sender() ! evt
      }
  }
}

The IdProvider class is very simple. It takes a single message, GenerateId, and returns a result IdGenerated. The actor itself, upon receiving a GenerateId command, will create an IdGenerated event, persist that event, update the current ID, and then send the event back to the sender.

When the IdProvider is re-created, in the event of a failure, it will play back all the previously persisted events using the receiveRecover behavior, restoring its state in the process, so nothing will have been lost. It will then continue to generate IDs on the recovered node as though there were no interruption.

This IdProvider is in no way related to Akka Cluster. This code could be used in any actor system, whether it is clustered or not. This is an example of how location transparency can be a benefit. You could build your system under the assumption that this will be a local actor, and later, when you decide to make it a clustered actor, this code doesn’t need to change.
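For example, before any clustering is introduced, the IdProvider can be created and used as an ordinary local actor (a sketch, assuming an existing ActorSystem named system):

// Create the IdProvider as a plain local actor and request an ID. Clients
// only hold the ActorRef; nothing about it reveals where the actor lives.
val idProvider = system.actorOf(IdProvider.props(), "IdProvider")
idProvider ! IdProvider.GenerateId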

After you have decided that you want to cluster this actor and in fact make it into a singleton, you can do that with very little actual code. To cluster this actor, use the following:

import akka.actor.PoisonPill
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}

val singletonManager = system.actorOf(ClusterSingletonManager.props(
    singletonProps = IdProvider.props(),
    terminationMessage = PoisonPill,
    settings = ClusterSingletonManagerSettings(system)),
  name = "IdProvider"
)

This creates an instance of the ClusterSingletonManager. This manager must be created on any nodes on which you might want to host the singleton. It will then ensure that the singleton is created on one of those nodes.

You don’t send messages through the manager; to send messages to the actor, you need to create a ClusterSingletonProxy. To create a proxy, use the following:

import akka.cluster.singleton.{ClusterSingletonProxy, ClusterSingletonProxySettings}

// The proxy is given a different name than the manager so that the two
// can coexist on the same node without an actor-name clash.
val idProvider = system.actorOf(ClusterSingletonProxy.props(
    singletonManagerPath = "/user/IdProvider",
    settings = ClusterSingletonProxySettings(system)),
  name = "IdProviderProxy")

The proxy takes the path of the singleton manager. What you get back is an ActorRef, but that ActorRef will now proxy to the singleton, wherever it might be. You can use this ActorRef just like any other ActorRef in the system. Again, this demonstrates the power of location transparency. Any actors that previously were sending messages to a local ActorRef can now be given this proxy instead. They don’t need to know that the proxy is now a cluster singleton actor. They can continue to operate as though the actor were local, and the bulk of the code remains unchanged. The only place in the code where you do need to make changes is in the construction of the actor. Rather than constructing a local actor, you are now constructing the cluster singleton proxy. That’s it.

Scalability

Scalability refers to the ability of a system to handle higher load without failure or sharp degradation in performance. It is only tangentially related to performance, and optimizing performance is no guarantee of scalability (and vice versa).

Increasing performance means that your system can respond faster to the same load, whereas scalability is more concerned with the system’s reaction to higher load, whether that is more simultaneous requests, a larger dataset, or simply a higher rate of requests.

Figure 6-2 graphs the response time under increasing load for a system. The graph shows that a system that fails to scale will tend to have an inflection point where it either fails (wholly or partially) and some requests are not handled, or where it begins to rapidly decline in performance; that is, the response time for each request goes up rapidly with increasing load. The latter will usually also result in failure at some point—the client will often time out when the response time is too long.

Figure 6-2. Response time versus number of requests

A system with increased performance will move the response-time line down (indicating a faster response time) without moving anything else, as depicted in Figure 6-3.

Figure 6-3. Effect of performance on response time

A system that increases its scalability, however, moves the entire curve to the right; that is, the point of diminishing returns indicated by the response times increasing sharply toward an unacceptable amount doesn’t occur until the system is under higher load. As Figure 6-4 demonstrates, the performance need not change at all (although some degree of coupling between these two is normal).

Figure 6-4. Effect of scalability on the number of requests

Systems tend to scale along two axes: horizontal and vertical. Vertical scalability means if you run your system on a larger machine with more resources (CPU capability, memory, and so on), you can handle a higher load. This is usually easier, but is not a given—some systems don’t benefit much by this, because they become bound by constraints other than CPU and memory, so increasing them doesn’t help beyond a certain point.

The horizontal axis, on the other hand, is of much more interest. This is when you add additional systems or nodes to your group of nodes that comprise the entire system. By doing so, you gain the ability to handle more load. Often, scaling out this way doesn’t result in any performance gain: the system continues to handle each request on average in about the same amount of time—it’s just that you have the ability to handle more of them.

Let’s take a very simple compute problem, adding two integers, and think about its scalability.

On a single system, adding more compute power and memory might buy you a bit, until you hit the maximum rate of incoming requests—let’s assume, as is often the case, that requests come in over the network in some fashion. As the volume of requests keeps increasing, you will eventually run out of network bandwidth, no matter how many integers the system can add and no matter how quickly it can add them.

This problem, however, scales out horizontally really well. By adding a second machine, the system can handle roughly twice as many requests in the same period. In this super-simple example of adding two integers, the only potential point of commonality in the group of servers you are building would be in request routing—for example, a network switch or load balancer, perhaps—and such units can be scaled quite effectively.

We are starting with what is known as a share-nothing architecture; that is, our nodes don’t share any information at all with each other—they just add integers, return the result, and are ready for the next request.

As the computational demands on the system grow, however, the picture of course changes. As soon as there is a need for the nodes to share state, no matter how straightforward, the possibility of introducing a bottleneck arises. Let’s say, to keep it simple, that you want to count how many requests the system has processed in total and make this number available via some kind of API.

Computing the total number of requests for the cluster is a surprisingly complex problem. For example, when is this total incremented? Is it at the end of the request or at the beginning? If the total must be continually accurate, you have a need for global state, which, as we’ve already discussed, is the root of many problems in a distributed system. It’s easy enough to think about computing the requests on a single node (although even this has a microcosm of the same issues), but as soon as you add a second machine, you add a difficult problem.

In distributed systems, and in Akka in particular, eventual consistency and the Actor Model are the way that you can approach this problem. Instead of trying to have a single up-to-the-instant total of the number of requests, you should opt for an eventually consistent number on each node and an eventually consistent total for the cluster. In the Actor Model, you would likely send a message to an accumulator actor on each node for each request, and then have that accumulator report the totals in turn to a cluster singleton accumulator. The cluster singleton accumulator would then have a view of the number of requests that is updated constantly, but that always lags reality by a short (and likely irrelevant) interval.

Handling such a count is actually a good use case for monitoring, which is a different flavor of computation designed to allow near-real-time visualization of the operation of a system—in this case, a distributed system.

In cases of high load, you likely wouldn’t have each computing actor (e.g., the ones adding the two numbers) report every request. Instead, you would probably hold a count for a few requests and report every 100 or so (unless there are no requests for a short interval) to keep the message volume more manageable. You might want to consider allowing the “rollup” count to vary, depending on load. This way, when the load is low, the system reports every request or two, but under higher volume, it reports only every 100 or every 1,000.
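A sketch of such a node-local accumulator, with hypothetical message names, that rolls up every 100 requests before reporting to the cluster singleton accumulator, might look like this:

import akka.actor.{Actor, ActorRef}

// Hypothetical messages for the counting protocol.
case object RequestHandled
case class CountDelta(count: Int)

// Node-local accumulator: counts requests and reports a rolled-up delta to
// the cluster singleton accumulator every `rollup` requests, keeping the
// message volume to the singleton manageable. A production version would
// also flush on a timer so that counts are not held indefinitely under
// low load.
class NodeAccumulator(clusterAccumulator: ActorRef, rollup: Int = 100)
  extends Actor {

  private var pending = 0

  override def receive: Receive = {
    case RequestHandled =>
      pending += 1
      if (pending >= rollup) {
        clusterAccumulator ! CountDelta(pending)
        pending = 0
      }
  }
}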

As you can see, it can become surprisingly complicated in a short period of time, but following the pattern of the Actor Model in general prevents you from doing things that are simply not suitable to high scale.

The key portions of the definition of scalability are “without failure” and “degradation in performance,” so let’s examine each of these in turn.

The “without failure” portion implies that even as the load increases, the system won’t suddenly give up and fail to handle requests. It should “degrade in performance,” ideally in a nonabrupt manner, as resources become fully consumed on the cluster.

These two go hand in hand, because when a portion of an Actor Model system fails, it should not affect other parts. This means that from the client’s point of view—the client being the component that is sending all these numbers—nothing broke, even though one or more of your actors or nodes did in fact have a problem.

If you have enough nodes (and figuring out “enough” is another whole conversation, although it should generally be an absolute minimum of three), the system’s performance should decrease slowly as load increases, as some nodes (ideally evenly distributed) become saturated.

A strongly related concept to scalability is elasticity. This is the ability to add resources to a running system, thus increasing its ability to scale, all without the clients being affected.

In this scenario, if you can add new nodes to your load-balancer, and those new nodes begin sending statistics to the cluster singleton accumulator, you can achieve this. As the cluster grows, you might get to the point at which the messages to the cluster singleton become the limiting factor, but we discussed the tactic of sending accumulated messages less frequently as a way to compensate.

Even in this relatively trivial example, you can see that monitoring can be very helpful: if you can see, for instance, that the queue length of unprocessed messages for the actor that is rolling up your counts is becoming higher over time, you can use this information to adjust the accumulation factor, providing some simple self-tuning in the system. If you also use the totals as a means to detect when new nodes should be added as load increases, you have achieved one half of elasticity. The other half is the reverse: you also need to be able to scale down your cluster when load decreases.

If compute resources are a cost, this scaling down can be as important as scaling up, and an elastic system is a significant tool for saving money.

The patterns for building a scalable system generally revolve around a few basic principles, although the application of these principles can be quite sophisticated. Let’s take a look at them in the following subsections.

Avoid Global State

If you can avoid global state, you will have bypassed the biggest single limiting factor on the scalability of a system. If you require such state, you can achieve it using a cluster singleton actor as a wrapper; however, you will be sacrificing scalability by doing this.

Avoid Shared State

Global state is, of course, just the extreme case of any shared state; that is, information shared between the nodes of your cluster. If you minimize or ideally eliminate such shared state, many of the obstacles to scalability go away, as well. Failing that, you again isolate the shared state behind an actor, and carefully monitor its impact.

Follow the Actor Model

Building your system entirely in the Actor Model, and not just using actors here and there, allows the maximum flexibility in adapting to the changing load because any portion of the system can be distributed further with the location transparency of actors.

Avoid Sequential Operations

We did not encounter this in our simple example, but many systems with a goal of scalability have the flaw of requiring sequential operations. This is a form of state, and it spans time, as well, which is a double hazard.

If a certain operation cannot be performed until some other operation is complete, you are forced to make your actors use something like a finite-state machine (FSM) model to accommodate this, which in many cases severely limits scalability.

Although it is often very difficult to take a design and make its messages not depend on sequence, you can achieve it with some careful design, and it is always worth it from the perspective of scalability (not to mention the simplicity of the system itself usually increases—a pleasant side effect).

Isolate Blocking Operations

Our example didn’t include I/O, but this is the usual culprit when introducing blocking operations into any actor-based system. If you must have such operations, again let the Actor Model be your guide, and with Akka, isolate the dispatcher for blocking operations from the dispatcher used for nonblocking operations. This way, you are able to tune them independently. Isolating these operations to another thread pool is key to keeping your application responsive. We will discuss how and where to do this in more detail in Chapter 9 when we discuss dispatchers.
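As a minimal sketch of this isolation, assuming a dispatcher named "blocking-dispatcher" has been configured in application.conf, blocking work can be pushed onto that dedicated dispatcher:

import akka.actor.Actor
import scala.concurrent.{ExecutionContext, Future}

class FileReader extends Actor {
  // Run blocking work on a dedicated dispatcher so that the default
  // dispatcher's threads stay free for nonblocking message processing.
  // "blocking-dispatcher" is assumed to be defined in application.conf.
  private implicit val blockingDispatcher: ExecutionContext =
    context.system.dispatchers.lookup("blocking-dispatcher")

  override def receive: Receive = {
    case path: String =>
      Future {
        // ... blocking I/O, such as reading the file at `path` ...
      }
  }
}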

Monitor and Tune

Monitoring a running actor system under load is the only way to truly gauge its behavior because it is by nature nondeterministic, especially when distributed.

Knowing what to monitor and how this should affect your tuning decisions is discussed in other sections of this book, and indeed in many other books. Monitoring is an art worthy of study in and of itself because it can yield significant gains in scalability.

Cluster Sharding and Consistency

In many systems, consistency and scalability are enemies. If you want consistency, you must sacrifice scalability. Consistency requires that nodes in a cluster share information among themselves. This in turn reduces scalability. But it doesn’t need to be that way. Akka cluster sharding provides a way to bridge the gap between consistency and scalability within a single concept. It does this by allowing you to create boundaries that control consistency and scalability. But before we delve deeply into how it does that, let’s first talk about what sharding is.

Sharding

The concept of sharding has been in use in database systems for a long time. It is a powerful tool that makes it possible to scale and distribute databases, while keeping them consistent. The basic idea is that every record in your database has a shard key attached to it. This shard key is used to determine the distribution of data. Rather than storing all records for the database on a single node in the cluster, the records are distributed according to this shard key.

Figure 6-5 depicts a very simple two-shard setup. In this example, you could use a numeric key: all odd-numbered entries would be on one shard (node), whereas all even-numbered entries would be on the other shard (node).

Figure 6-5. Effect of sharding a database

By distributing the records in this way, you reduce contention for the shared resource. It gives you the ability to distribute the load across the nodes in the cluster rather than requiring all requests to go to the same node. This also distributes the consistency requirements. All requests for a specific record will always go to the shard that contains that record. You need to maintain consistency of that record within that shard. However, maintaining consistency among different shards is not required, because they don’t share any data.

To enable sharding, there needs to be a reliable way to determine on which shard the data resides. This is often done by using a special node that routes traffic to the appropriate shard. This router doesn’t need to do a lot of work; it just keeps track of the shards and routes traffic as necessary. Although it is still a bottleneck in the system, the work it does is trivial, so the effect of that bottleneck is minimized.

Sharding in Akka

Akka cluster sharding takes this idea a step further. Rather than sharding the data, you shard live actors across the cluster. Each actor is assigned an entity ID. This ID is unique within the cluster. It usually represents the identifier for the domain entity that the actor is modeling. You provide a function that will extract that entity ID from the message being sent. You also provide a function that takes the message and computes the ID for the shard on which the actor will reside.

When a message is sent, the aforementioned functions are applied to locate the appropriate shard using the shard ID, and then to locate the actor within that shard using the entity ID (see Figure 6-6). This makes it possible for you to locate the unique instance of that actor within the cluster. In the event that no actor currently exists, the actor will be created. The sharding system will ensure that only one instance of each actor exists in the cluster at any time.

Figure 6-6. Message flow in a sharded system

Shards are distributed across the cluster in what are known as shard regions. These regions act as hosts for the shards. Each node that participates in sharding will host a single shard region for each type of sharded actor. Each region can in turn host multiple shards. All entities within a shard region are represented by the same type of actor.

Each individual shard can host many entities. These entities are distributed across the shards according to the computation of the shard ID that you provided.

Internally, a shard coordinator is used to manage where the shards are located. The coordinator informs shard regions as to the location of individual shards. Those shard regions can in turn be used to send a message to the entities hosted by the shards. This coordinator is implemented as a cluster singleton. However, its interaction within the actual message flow is minimized. It only participates in the messaging if the location of the shard is not known. In this case, the shard region can communicate with the shard coordinator to locate the shard. That information is then cached. Going forward, messages can be sent directly without the need to communicate with the coordinator. The caching mechanisms mean that the communication with the coordinator is minimal and as a result it rarely, if ever, becomes a bottleneck. Figure 6-7 presents an overview.

Figure 6-7. Components of cluster sharding

Shard Key Generation

For sharding to be efficient, the function that is used to determine the shard ID must provide a fairly even distribution. If the distribution is poor, you can end up with a situation in which all of the traffic is routed to the same shard, which defeats the entire purpose of sharding. Let’s consider a quick example.

In Chapter 2, we talked about the possibility of using the first letter in a person’s name as the shard ID. This accommodates 26 shards with actors distributed across these shards. However, this strategy has some serious drawbacks. Certain letters are more common for names. T, S, and A, for example, might be very popular, whereas Q or X are much less popular. This means that the distribution of shards won’t be even. Some shards will have many actors, whereas others will have very few. This can lead to certain nodes in the cluster receiving the bulk of the traffic while other nodes sit idle.

The goal is to have the entity actors be evenly distributed among the shards. A common way to achieve this is to take your entity’s unique identifier and compute a numeric hash. This gives you something fairly random, which should allow for a fairly even distribution. This by itself is insufficient, however. Simply computing a numeric hash will result in a very large number of shards. This is usually undesirable. Too many shards can increase the maintenance of the cluster. Thus, you want to reduce the number of possible shards. You can do this by computing the modulo of the hash (taking its absolute value first, because hashCode can be negative) by the desired number of shards, as shown here:

(math.abs(entityId.hashCode) % maxShards).toString

Shard Distribution

So, how do you determine what the appropriate number of shards is? Having too many shards creates a large maintenance burden on the sharding system; having too few can create other issues.

If there are too few shards, it can become impossible to distribute them evenly across the cluster. For example, if you have only two shards in a cluster of three nodes, you will have one node that is doing nothing. This obviously is undesirable. Conversely, if you have four shards in a cluster of three nodes, one node in the cluster will be required to do roughly twice as much work as the others. This, too, is a situation that you want to avoid.

A good rule of thumb is to have at least 10 times as many shards as the maximum number of cluster nodes. This means that each node in the cluster will be hosting 10 or more shards. This is a good balance. It means that as nodes are added or removed from the cluster, the rebalance of the cluster won’t put a significant burden on any single node. The shards can be redistributed evenly across the cluster.

Consistency Boundary

With this basic understanding of how sharding works, how does it help with consistency specifically? Sharding provides a consistency boundary in the form of a single consistent actor in the cluster. All communication that is done with a specific entity ID always goes through that actor. The Akka cluster sharding mechanism ensures that only one actor with the given ID is running in the cluster at any time. This means that you can use the single-threaded illusion to your advantage. You know that any requests will always go to that actor. You also know that actor can process only one message at a time. This means that you can guarantee message ordering and consistent state within the bounds of that actor.

This does create a potential bottleneck, though. As Figure 6-8 illustrates, all requests for a single ID go through the single entity actor assigned to that ID. Thus, if many requests are coming into that actor, the actor becomes a bottleneck. Messages might become backed up in the actor’s mailbox if the load is large enough. In practice, though, this almost never manifests as a real problem. As long as you have been careful in deciding which actors to shard, you should be able to easily avoid this bottleneck. And if it should ever manifest itself, it will be isolated to a single entity. All requests for that entity go through the same actor, but all requests for all other entities go through different actors. So, in the event that a flood of messages comes into that actor, it will slow only that ID. All other IDs will not experience issues. In a real-world scenario, if we sharded based on User ID, only a single user would experience the bottleneck; all other users could continue to operate without noticing any issues.

Figure 6-8. The consistency boundary of an entity

This approach yields total consistency for the given entity within the boundary of that actor, but there is a cost to maintaining this consistency that you need to be aware of. According to the CAP theorem, you must choose two characteristics from among the three that you’d ideally like to have: consistency, availability, and partition tolerance. You can never have all three. Partition tolerance is not usually something that production systems are willing to sacrifice, which leaves you selecting between consistency and availability. In our case, we have opted for consistency, and as a result we have lost availability. In the event that the node hosting the shard is lost, or a rebalance occurs, there will be a period of time during which the entity actors are unavailable while the shard is being moved to another node. Even though the migration itself is fast, taking only a few seconds, the decision as to whether to migrate is largely dependent on your configured failure-detection mechanisms. Depending on how you have set up failure detection, the migration could take significantly longer.

Scalability Boundary

We have used sharding to enable consistency, but if consistency is the enemy of scalability, how can sharding help? By creating a boundary around the consistency, isolating it to a single entity, you can use that boundary to allow scalability. Consistency is limited to a single entity, and therefore you can scale the system across multiple entities.

When you need to scale the system, you can create new nodes and redistribute the shards across those new nodes, as demonstrated in Figure 6-9. This means that the work that was previously handled by one node can potentially be handled by multiple nodes. This reduces the overall amount of work that any individual node needs to perform, yielding great scalability potential.

Figure 6-9. Scaling multiple entities but maintaining consistency within those entities

Sharding Aggregate Roots

Sharding relies heavily on our ability to correctly select the proper entities to shard. This requires an understanding of where consistency is required within your domain. So how then do you decide what is a suitable candidate for sharding?

As usual, when deciding where to apply sharding, you should go back to the principles of domain-driven design (DDD). You have already established certain criteria for sharding. The actors must be uniquely identifiable. This means that if you have modeled your actor system after your domain, the actors will be entities in the domain. But we can take it a step further than that.

When sharding, it is often desirable to branch out to the sharded actors based on the entity ID, but then within that entity ID, you often want to keep all operations on the local machine. That is, you use the cluster to locate the sharded actor, but after that you minimize any remote communication. Minimizing the remote communication improves performance because you eliminate the network latency from the equation. It also eliminates the need to serialize and deserialize messages.

Often, a good place to begin looking at sharding is the system’s aggregate roots. Aggregate roots often give us a natural consistency boundary within a domain. We usually care that the data within that aggregate is consistent, but across multiple aggregates it is not critical. When performing operations in the domain, you typically touch only a single aggregate root, which means that you can potentially minimize network communication. The bulk of the operation can be performed by the aggregate root, its children, or local services, calling out over the network only when it becomes necessary. By reducing the amount of communication that happens over the network, you improve the efficiency of the operation.

In our project management domain, one of the aggregate roots is a person. This will be a good place for us to investigate sharding. From the perspective of consistency, this provides a lot of benefits. You can guarantee all messages go through a single actor for a single person. This means that you never need to worry about two projects trying to schedule the same person at the same time. This single-entity actor provides a consistency boundary that prevents this from happening. It also gives you a very nice place to provide scalability. You can process multiple people across many different machines, which allows you to distribute the load. The asynchronous, distributed nature of actor systems is a natural fit to provide the consistency and scalability that you need.

Persistence

Distributing the actors in this way means that the sharding system must occasionally perform rebalances. These rebalances mean that any actors that were in memory need to be unloaded from memory. When this happens, you lose any state. The actors might eventually be reestablished in their new home, but you will have lost everything.

There are a number of ways to reestablish this state in the new location, but perhaps the most convenient way is to take advantage of Akka Persistence. Akka Persistence requires that actors be uniquely identifiable. Cluster sharding has the same requirement. This makes it very natural to pair the two. The Persistence ID for the actor becomes the Entity ID.

Akka Persistence provides built-in support for event sourcing. Commands that are sent to the actor are persisted as events in a journal. When the actor is re-created later, it can replay those events to reestablish the previous state. This means that the actor can be unloaded from memory at any time because you can always rebuild the state later from the event journal. You can think of commands as the thing you are asking your actor to do, and the events are the things that the actor has done. There is not necessarily a one-to-one relationship between the commands and events. It is possible for a command to be rejected, resulting in no events. It is also possible for a command to issue multiple events.

With your actor established as persistent, you can ensure that if a rebalance occurs and the actor needs to migrate to another node, when it does so it will be reestablished with all of its state intact. This means that the location of that actor doesn’t need to be fixed. It can move as necessary.

Passivation

It is unreasonable to expect that a system will maintain all actors in memory at all times. The presence of persistence means that you no longer need to keep your actors in memory. You can load them and unload them as required, and because they are persistent, when they are loaded again, they will reestablish their state. However, if you simply stop the actor, any messages waiting in that actor’s mailbox will be lost. A PoisonPill allows the mailbox to be drained but doesn’t prevent new messages from entering the mailbox after the PoisonPill.

To alleviate this, sharding introduces the concept of passivation. Rather than simply stopping, an actor can instead passivate. In this case, a message is sent to the shard region, informing it that this specific actor is passivating. This signals the shard region to begin buffering any new messages to that actor. In the meantime, the actor is sent a custom message that is placed in its mailbox. The actor will eventually receive that message and can then shut itself down. If no new messages were buffered by the shard region, the actor will remain shut down. If new messages were buffered, or arrive after the shutdown, the actor will be re-created and those messages will be delivered.

Often this is achieved by using the setReceiveTimeout operation within an actor. This operation makes it possible for you to trigger a ReceiveTimeout message if no new messages are received within a given amount of time. When this idle period has elapsed, the actor can then choose to passivate.

Passivation means that when an actor is working hard, all of its state can be kept in memory and it can process new requests very quickly and efficiently. However, when that actor becomes idle, it can be unloaded from memory. When more requests come into the actor, it can be re-created on the first message and then, again, every message after that can operate on the in-memory state, allowing for huge efficiency gains. This basically turns your actor into a write-through cache for the state of that entity.

Using Cluster Sharding for Consistency

Cluster sharding requires a minimal amount of code to use in its most basic form. Without persistence or passivation, any actor can be sharded with a few very simple changes. The main change is in the way that you send a message to the actor. Rather than sending directly to the actor, you need to send it to the shard region. You do this by creating the shard region:

import akka.actor.ActorRef
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}

val shardRegion: ActorRef = ClusterSharding(system).start(
    typeName = "People",
    entityProps = Person.props(),
    settings = ClusterShardingSettings(system),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)

This example creates a shard region called People. This shard region is going to host actors of type Person. You can see where we have passed the Props to construct a Person so that the shard region will know how to create them. This code also passes the extractEntityId function and the extractShardId function. These allow you to take the incoming message and parse it to extract the necessary shard ID, entity ID, and the actual message.

Again, in the absence of Akka Persistence and passivation, this will work as is. No other changes are necessary. However, a nonpersistent sharded actor is not that useful due to the loss of state that might happen during a rebalance. So what does the actor look like if we want to include persistence and passivation? Let’s take a look at it in the following example:

import akka.actor.{PoisonPill, ReceiveTimeout}
import akka.cluster.sharding.ShardRegion.Passivate
import akka.persistence.PersistentActor

// AddToProject (the command), AddedToProject (the event), and the timeout
// value are assumed to be defined elsewhere in the domain.
class Person extends PersistentActor {

  override def persistenceId: String = s"Person-${self.path.name}"

  context.setReceiveTimeout(timeout)

  override def receiveRecover: Receive = {
    case AddedToProject(project) =>
      // Update State
  }

  override def receiveCommand: Receive = {
    case AddToProject(project) =>
      persist(AddedToProject(project)) { addedToProject =>
        // Update State
      }
    case ReceiveTimeout =>
      context.parent ! Passivate(PoisonPill)
  }
}

This is a very simple stub of what such an actor might look like. It is a very simple persistent actor. The only difference from any other persistent actor is the presence of the context.setReceiveTimeout(timeout). This timeout will cause the actor to receive the ReceiveTimeout message if it hasn’t seen any new messages within the given timeout. When this occurs, we send the Passivate(PoisonPill) to the parent actor. This in turn instructs the host shard region to shut down the actor using a PoisonPill. If you want to, you can use a custom shutdown message, which would be delivered to the actor in order to shut it down.
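A sketch of what that might look like in the Person actor, using a hypothetical Stop message in place of the PoisonPill:

case object Stop // hypothetical custom shutdown message

override def receiveCommand: Receive = {
  case ReceiveTimeout =>
    // Ask the shard region to passivate us, delivering Stop instead of a
    // PoisonPill once it has started buffering new messages.
    context.parent ! Passivate(stopMessage = Stop)
  case Stop =>
    context.stop(self)
}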

So, what do the extractEntityId and extractShardId functions look like? This will vary depending on your implementation, but let’s have a look at a very simple example.

A common practice when using cluster sharding is to create an envelope message to contain the various pieces necessary for sharding. This envelope is not required. If all of the information is present in the existing message, you can just set up the extractors to pull that information as required. However, if pieces of information are not present, you can wrap the message in an envelope:

case class Envelope(entityId: EntityId, message: Message)

This envelope can take whatever form is best for your particular use case. For this case, the envelope simply gives you the entityId and message as separate pieces.

The extractor functions are then trivial to implement:

import akka.cluster.sharding.ShardRegion.{ExtractEntityId, ExtractShardId}

def extractEntityId: ExtractEntityId = {
  case Envelope(id, message) => (id.toString, message)
}

def extractShardId: ExtractShardId = {
  // math.abs guards against negative hash codes producing invalid shard IDs
  case Envelope(id, _) => (math.abs(id.hashCode) % maxShards).toString
}

ExtractEntityId is actually an alias for a partial function with the signature PartialFunction[Any, (String, Any)]. These and other type aliases make the code more readable. You can see that this function is just extracting the components of the envelope and converting id to a string.

ExtractShardId is a similar alias for a function that takes the message and returns the shard ID as a string. In this case, it computes the shard ID and then converts the numeric value to a string.

Again, we emphasize that the envelope and the extractor functions can be as simple or as complex as your use case requires. If your use case requires additional information to be stored in the envelope to compute the shard key, you can add it in. There is nothing special about the structure of that envelope or the extractor functions. You can define them however you like.

In general, though, it is probably best to start simple. Use a very simple envelope (or no envelope at all) and do a minimal amount of work in your extractors. As the complexity of the application grows, you can look at enriching this functionality if necessary.

Using cluster sharding, you can create systems that provide a good balance between consistency and scalability. It can allow you to scale a system to many nodes with relative ease within some clearly defined boundaries. Combined with Akka Persistence and passivation, you can guard against the loss of state or the loss of messages in the event of a rebalance or failure. But you still need to accept the fact that failures can, and will, occur, and although these techniques can help with certain types of failures, they don’t cover everything. In Chapter 7, we will look in more detail at the different types of failures our application can experience, and how we can use the principles of Akka to mitigate those failures.

Conclusion

Now that we have discussed the balance of consistency and scalability, and given examples of the appropriate levels of consistency in a distributed system, we will consider the failure cases more closely. What happens when portions of your system fail, and how should your system react?

A highly distributed system is more likely to have some portion of the system fail than a single-node system—there are simply more moving parts that can potentially go wrong.

At the same time, however, a properly formed distributed system is far less likely to have the system as a whole fail, as we will see in Chapter 7.
