Chapter 5. Good Data Flow

In Chapter 4, you saw some of the ways that you can implement individual actors and some of the pitfalls you can encounter when doing so. Now you need to consider how you can use those actors to build up systems. A single actor alone isn’t much help to us. It’s only by combining them that we unlock the real potential of Akka. In this chapter, we take a look at some patterns to follow when using multiple actors. We also introduce the concept of Akka Streams, which provides a domain-specific language for streaming messages.

Throughput Versus Latency

Before we go too far discussing how to combine actors effectively, let’s digress briefly to talk about throughput versus latency.

When we measure our code to determine its performance characteristics, often we are measuring how long a particular action takes. We measure from the moment that action is initiated, to the moment it completes. It might take 10 milliseconds or perhaps 10 seconds. In either case, we call this latency.

Sometimes, instead of measuring how long a particular action takes, we will instead measure how many actions can occur in a specific amount of time. Here we aren’t concerned with how long each individual action takes, just how many of them we can complete within a fixed amount of time. For example, we might measure how many actions we can perform in 10 seconds. In this case, we are measuring throughput.

Whether you measure latency or throughput, you always need to look at the values through the lens of your domain. In some domains, a 10-second latency, or even a 10-hour latency, might be more than sufficient, whereas in other domains it is far too long. You need to consider the domain in which these measurements were taken and whether those measurements represent a good user experience.

But when you build highly concurrent systems, you can’t simply look at one measurement or the other. You need both throughput and latency. If you measure only one or the other, you get an incomplete picture that can be quite misleading. Let’s consider a couple of examples.

Suppose that a user logs into your system. You measure the latency of this login and it takes 500 milliseconds. From a user perspective, 500 milliseconds isn’t that noticeable. If a login takes 500 milliseconds, the user is probably happy and your system is probably working just fine. But that is purely a measure of latency. When you actually turn on the system, you find that you have users waiting as long as 10 seconds to log in! What happened?

Because you measured the latency alone, you failed to account for the fact that the system (for whatever reason) is basically single threaded. Maybe there is a shared resource on which each operation has to block. Maybe you literally allocated only a single thread to do the work. Whatever the case, if each user takes 500 milliseconds and 20 users attempt to log in at the same time, either those users are going to have to wait in line, or they are going to need to fail. So, although the first user was able to log in within 500 milliseconds, the last person in the queue had to wait a full 10 seconds.

What if you measured only throughput, instead? When you do so, you discover that 500 users can log in within 10 seconds. In this case, 500 people logging in within 10 seconds seems reasonable. So you don’t need to do any more than that, right? Or do you?

In this scenario, you have measured the throughput but ignored the latency. You discovered that you can have 500 users logged in within 10 seconds, but you didn’t bother to see how long each user takes. If it takes the full 10 seconds for each user and you are just doing 500 users at the same time, you might still have created a terrible user experience. If an application consistently takes 10 seconds just to log in the user, there is a good chance the users are going to walk away.

It’s only by measuring both throughput and latency that you can truly understand the full picture. You need to understand not just how long events take, but how many events can happen at once. Using this information, you can detect the true bottlenecks in your system and work to resolve them.

Now that you understand the basics of throughput and latency, let’s take a look at how you can build actor systems that optimize both.

Streams

When we build software systems, we often talk about building streams of data. These are processes wherein the data goes through some series of steps, one after the other, to produce some output. Generally, when we talk about streams, there is an assumption of order. If you put two pieces of data into the stream, you expect their transformed output to come back in the order in which the inputs were sent. Of course, when you introduce concurrency into the mix that changes. Suddenly, you can send the two inputs in order but the outputs come back reversed. Sometimes, that’s OK, but often it isn’t. So, how can you use a concurrent system like Akka to build streams in which you can still have a guarantee of order?

Recall that actors in Akka use a mailbox. That mailbox acts like a queue: it operates on a first-in/first-out basis. This means that you can guarantee that whatever you put into a single actor, the outputs will come back in the expected order. It is this concept of an ordered mailbox that allows you to build streams using Akka.

You can use ordered mailboxes to pass messages between a discrete set of actors. If you are careful with how you do it, you can take advantage of concurrency and yet still keep the ordering guarantees. Figure 5-1 shows what this might look like.

Figure 5-1. Pipeline of actors maintaining message ordering

The figure depicts a very basic stream. Some data is read, and then it is validated, transformed, and written. You could implement this easily without actors. You could implement each step as a blocking operation so that you could guarantee that data being read would be written in the same order. Of course, this isn't a particularly fair way to share your resources. If you are waiting for the write to complete, nothing upstream can take place because you are blocked.

If, however, you implement each of these stages as an actor, it opens up some interesting improvements. Because you have no branch points in this stream, you can still guarantee order just as you could without actors. The Read stage cannot process any more messages until it has passed its current message onto the Validate stage. The Validate stage cannot process any more messages until it passes its message to the Transform stage. But because these are actors, the Read, Validate, Transform, and Write stages can all be working on messages concurrently. The Read stage can accept the first message, pass it on to the Validate stage, and immediately start on the next message. It doesn’t need to wait for the message to trickle all the way down to the Write stage. If the Write stage takes a long time to complete, it is possible that messages will back up in the queue until those writes complete, but this also means that the Write stage will be able to begin working on the next message as soon as it completes. It also doesn’t need to wait for messages to flow down through the stream. That already happened while you were waiting.

This is a perfect example of how measuring just latency can be deceiving. If you were to measure the latency of each message going through the entire stream, it would appear that the time to process each message completely is equal to the sum of the times from each stage, as shown here:

Latency = timeToRead + timeToValidate + timeToTransform + timeToWrite

When you measure the values for each of the stages, you might end up with something like the following:

timeToRead    timeToValidate    timeToTransform    timeToWrite
2 ms          2 ms              6 ms               10 ms

Now you could use the equation to determine that the latency was 20 milliseconds. You could then infer that the system can deliver new results every 20 milliseconds, giving you a throughput of 50 messages per second. But if you did, you would be wrong. Although this is true for a single-threaded system, it does not apply to a system built using actors. Using actors, you must account for the fact that each stage can be happening in parallel. The throughput of the system is limited by the slowest stage in the stream—in this case, the Write stage. This means that the throughput could end up being 100 messages per second or one message every 10 milliseconds. By measuring only the latency, you would miss this fact.

One of the major benefits to the actor approach is that it introduces concurrency in the system, without sacrificing determinism. You can still guarantee the message order on both sides of the stream.

Streams like this are a powerful tool for optimizing Akka applications. They allow you to take full advantage of the modern multicore systems to which we have access. But by themselves, they are not going to solve all of our scalability problems. Even though you can see that streams like this can improve throughput, you can’t scale like this forever. And there is a hidden cost here. Although our total time to process messages might be better using actors than it would have been using the blocking functions, the reality is that at each stage you can still process only a single message at a time. That means the other messages must sit waiting in a queue, and the length of time in that queue is potentially unbounded. So, although streams are an important tool to have, you need other tools to reach high scalability.

Routers

Although streams give us the ability to improve throughput, that ability is restricted by the number of stages into which we can break our operation and the time each stage takes. We also saw that in certain use cases it might be more efficient to use futures because they have the potential to yield higher throughput. But what if you want to achieve the same level of throughput by using actors? Can you do it?

To improve throughput beyond the level that a stream can provide, you need to be able to process multiple streams at the same time. This is where routers come in. As Figure 5-2 demonstrates, a router is simply an actor that can take a message and route it to one of many actors. Routers are important because they offer the means to scale beyond what a stream can provide, at the cost of introducing nondeterminism.

Figure 5-2. A router

Akka provides many different types of routers, but at heart they all serve a similar purpose. A router takes a single message and sends it to one or more routee actors. The logic of which routee to send to or how many of them to send to depends on the implementation of the router. Of course, if the existing routers are insufficient for the needs of your application, you can simply create your own actor that encapsulates the routing logic that you desire.
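
As a brief sketch of what this looks like in code (the Worker actor and the system name here are hypothetical, not from the sample project), a round-robin pool router in classic Akka can be created like this:

import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.RoundRobinPool

// Hypothetical routee actor.
class Worker extends Actor {
  override def receive: Receive = {
    case _ => // a real worker would process the message here
  }
}

val system = ActorSystem("routing-example")

// Creates a router actor with five Worker routees. Each message sent to
// workerRouter is forwarded to one of the routees in round-robin order.
val workerRouter = system.actorOf(
  RoundRobinPool(5).props(Props[Worker]),
  "worker-router")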

Because of the branching nature of routers, you lose the determinism that a stream provides. When you send a message through a router, you can’t guarantee that the first message in will be the first message out. Depending on the routees it passes through and how the threads have been allocated, it is possible that the messages might leave the system in any order. The trick then is to ensure that you use routers for cases in which the message order is not important, and use streams for cases in which it is important.

Often, when you build systems for which order is important, it is only important at a certain level of the system. For example, in our sample project management domain, when processing requests, it might be critical that certain requests are processed in order. If we are attempting to commit someone to a project, having messages arrive out of order can be disastrous. We can end up committing someone to a project who is actually unavailable. Or we might end up agreeing to a contract that we can’t possibly fulfill. It might be important that at certain levels of the application we process messages in a very specific order. On the other hand, there might be areas of the system in which that requirement is less critical. A very simple example is that you might need messages for a particular user to be processed in order, but you might not require that messages be in order across multiple users. In this case, it is possible to design a stream that provides the ordering guarantee when processing an individual user, while using a router in order to process multiple users in parallel, as demonstrated in the setup shown in Figure 5-3.

Figure 5-3. Routers and streams

We have created a router that has the job of directing messages to one of three streams. All messages for UserA go to the first stream, all messages for UserB go to the second stream, and all messages for UserC are sent to the third stream. You can create this setup by using a router with consistent hashing logic. You could also do it by using a custom router. This setup is also common when using Akka cluster sharding, which we will discuss later. Because all messages for a particular user will go to the same stream, it’s possible to maintain your ordering guarantee within that user. However, across multiple users, for which the ordering doesn’t matter, you take advantage of the router to allow scaling. You have introduced nondeterminism into the system, but at a level at which it doesn’t violate the system requirements.
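
As a rough sketch of that setup (the UserMessage envelope and UserStream actor are hypothetical stand-ins, and the actor system is the one from the earlier sketch), a consistent hashing pool might be configured like this:

import akka.actor.{Actor, Props}
import akka.routing.ConsistentHashingPool
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping

// Hypothetical message envelope carrying the user identity.
case class UserMessage(userId: String, payload: Any)

// Hypothetical actor representing one per-user stream.
class UserStream extends Actor {
  override def receive: Receive = {
    case UserMessage(_, payload) => // process this user's messages in order
  }
}

// Messages are hashed on userId, so every message for a given user lands on
// the same UserStream routee, preserving per-user ordering.
val hashMapping: ConsistentHashMapping = {
  case UserMessage(userId, _) => userId
}

val userRouter = system.actorOf(
  ConsistentHashingPool(3, hashMapping = hashMapping).props(Props[UserStream]),
  "user-router")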

With this new structure, you can now process more messages concurrently. Even though each stream is capable of handling only three concurrent messages due to the limit of three stages, you can now process more of those streams at the same time. In Figure 5-3, you can now potentially be processing up to nine messages at the same time (three streams multiplied by three stages). And all of this without sacrificing the ordering guarantees that the application might require.

Another benefit to using routers is that within a single stream there can be stages of that stream in which you need to perform multiple tasks, but the order of those tasks doesn’t matter. For example, if you need to check the availability for a person across a longer time period, you might want to check each date individually and aggregate the results. The order in which this happens isn’t really relevant. It would be fine to process each day concurrently. Here is an area in which you use something like a router to “branch out” and perform checks. This in turn can improve your end-to-end latency because some of the operations are running concurrently, so the total time to complete is reduced.

Routers are a critical piece of the puzzle. Whether you are using Akka routers or simply implementing your own actors that route messages to other actors, you need them not only to accommodate scaling out, but also to allow you to scale up. Routers make it possible to use system resources more fully and, more importantly, to reduce bottlenecks that might occur. They are a tool with which you can improve both throughput and latency, and as such they are critical to building highly scalable systems.

Mailboxes

During our discussion about streams and routers, we have been ignoring a glaring issue: the slow consumer. A slow consumer occurs when we are able to produce more messages in one part of the system than can be processed in another part. The natural result is that the mailbox for the consumer grows in size over time—in the worst case, the system begins to experience memory issues if we are not using a queuing mechanism that can page out to disk as needed.

A temporary, small surge of message volume from a producer that results in a growing queue for a short period is not necessarily a problem. The problem comes when this is a normal situation, and the queue is growing faster than it is shrinking most of the time, or when the surge is large enough to overrun the available memory.

At its heart, this is a design problem, not a deployment or configuration issue. You can’t solve it by tuning, and it probably shouldn’t be solved by just having a higher capacity queue between the two parts. There are patterns that can help us here, but before we discuss them, let’s first look at the details of the mailboxes and how they might help.

Mailboxes can generally be placed into one of two categories: bounded mailboxes and unbounded mailboxes. The default is an unbounded mailbox, so we will discuss it first.

Unbounded Mailboxes

Unbounded mailboxes are the default in Akka. When used properly, an unbounded mailbox is sufficient for most use cases. If you use them improperly, however, they can lead to out-of-memory issues. If the actor can’t process messages fast enough, you can end up pushing messages into the mailbox until the system runs out of memory.

This out-of-memory error is problematic. When it happens, it isn’t just going to kill that actor, it is going to bring down the entire application. Obviously, this is not ideal. Application designers have developed a fear of the unbounded mailbox. Using it is deemed dangerous because of the problems it can cause, and yet it is the default mailbox. How can that be? Why would the default choice for a mailbox be one that could potentially bring down the entire system? Shouldn’t we instead choose a mailbox type that prevents this issue?

The reality is that the mailbox isn’t the problem. If you take a pail to the sink to fill with water and walk away while the water is running, what is going to happen to the pail? It will fill up and overflow. But is that the fault of the pail? Or is it a problem with a system that has no checks in place to prevent the overflow? Wouldn’t it be better if someone or some mechanism were there to shut off the flow in the event that the pail reached its capacity? We will talk about how we can implement systems like this, but first let’s consider an alternative mailbox type.

Bounded Mailboxes

Bounded mailboxes seem like the solution to the memory overflow, and in the right use case, they are. However, just as with unbounded mailboxes, you need to be aware of how they work in order to apply them in the proper situation. With a bounded mailbox, you set a limit on how many messages it can hold. When the number of messages exceeds that limit, the mailbox will perform some action to alleviate the problem. What that action is depends on the type of mailbox, which falls into one of two categories: blocking bounded mailboxes and nonblocking bounded mailboxes.

Blocking bounded mailboxes were present in previous versions of Akka, but they have been removed. One of the key reasons for their removal is that they were misleading and potentially dangerous. The idea behind a blocking bounded mailbox is that if the mailbox were full, any further inserts to that mailbox would block until space became available. This meant that the thread attempting to do the insert would be blocked and the system would slow down, which in theory would allow the consumer to catch up. There are two problems here. The first is what happens if that actor is remote. There is no communication back and forth between remote actors and local actors about the size of the mailbox, so it is impossible to block the sender. The result is that when dealing with remote actors, blocking mailboxes just didn't work. A blocking bounded mailbox became an unbounded mailbox, and the out-of-memory issue returned.

The second problem is a bit more subtle. Suppose that you use a blocking mailbox to block the thread. Let's further assume that you have a limit of four threads and all of your actors are running within that thread pool. Now suppose that four actors attempt to send a message to a slow consumer whose mailbox is full. Those four actors are going to have to block. They will block the threads and they will wait. In the meantime, the slow consumer is supposed to catch up. But the slow consumer is operating in the same thread pool, a thread pool that was limited to four threads, all of which are now blocked. This means that there are no threads left for the consumer to operate on, and you have now hit a deadlock.

Speaking more generally, the real problem with a bounded blocking mailbox is that it creates a synchronous dependency between actors. This violates the ideals of the Actor Model, which has actors communicating with one another via asynchronous message passing. Creating a blocking bounded mailbox breaks the model. Therefore, blocking bounded mailboxes have been removed.

So what about nonblocking bounded mailboxes? How do they work? For a nonblocking bounded mailbox to work, you need to know what to do when the mailbox overflows. You can’t block and slow down the producer and you can’t stash the overflow because that would suggest the mailbox is unbounded. Thus, when the mailbox overflows, you would need to throw away the messages. There aren’t really any other feasible options. Of course, this results in message loss, but remember that Akka’s delivery guarantee is at most once. Throwing away messages fits within this delivery guarantee. If a stronger guarantee is required, you need to be prepared to resend messages by using tools like AtLeastOnceDelivery, which we’ll discuss in detail later.
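
If a nonblocking bounded mailbox is appropriate, it is configured rather than coded. The following is a minimal sketch (the mailbox name, capacity, and SlowConsumer actor are arbitrary illustrations): you define the mailbox in configuration and then attach it to an actor's Props.

import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical consumer that cannot keep up with its producers.
class SlowConsumer extends Actor {
  override def receive: Receive = {
    case work => Thread.sleep(100) // simulate slow processing
  }
}

// In application.conf (the name "bounded-mailbox" is arbitrary):
//
//   bounded-mailbox {
//     mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
//     mailbox-capacity = 1000
//   }

val system = ActorSystem("mailbox-example")

// Messages that arrive while the mailbox already holds 1000 entries are
// discarded (sent to dead letters) rather than growing the queue without bound.
val consumer = system.actorOf(
  Props[SlowConsumer].withMailbox("bounded-mailbox"))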

Unbounded mailboxes have the flaw that they can overflow their memory. Bounded mailboxes have the flaw that they can lose messages. So what if a situation arises for which the mailbox absolutely cannot overflow and it’s unacceptable to lose messages? How do you handle that?

Of course, the best solution to the slow-consumer problem is to make the consumer faster. If it is possible to horizontally scale the consumer—perhaps by having many instances of the consumer—the slow consumer issue doesn’t happen at all. When this isn’t possible, you must reevaluate the design and the message guarantees; you might be able to adjust the system (by allowing messages to be processed out of order, for instance) to allow the consumer to be scaled.

If you can’t avoid the slow consumer altogether, you must seek alternative patterns to solve the problem.

Work Pulling

One solution to slow consumers is to, in essence, “reverse” the normal responsibilities of the participants, having the consuming component pull messages, rather than having the producer push them.

In this scenario, you have a master actor to manage the messages and a series of worker actors. The master actor doesn’t do any real work; instead, it just delegates to the worker actors. The flow of messages in this case works as follows (see Figure 5-4):

  1. Master receives new work.

  2. Master notifies one or more workers that work is available (WorkAvailable).

  3. A worker, who is available to process the message, notifies the master that it is ready for more work (GetWork).

  4. Master sends work item to the available worker.

Figure 5-4. Work Pulling pattern

The advantage to this pattern is that it eliminates the possibility of overflow in the worker’s mailbox. The worker’s mailbox will have only a small number of messages in it at any given time.

The key to implementing this pattern is that the master needs to be capable of slowing down the work flow. This might mean pulling work from a file or database when needed, or it might mean discarding incoming messages. If the master cannot control the work flow in this manner, you might have just moved the problem: rather than the workers overflowing their mailboxes, the master ends up with that problem.

When building a distributed system, the master actor is often implemented as a cluster singleton to ensure that there is only one master at any given time. As long as the master can control the rate of flow, this works well, but if it can’t, it becomes a bottleneck, and you might need to look at alternative options.

A big benefit to this pattern is that it is very easy to scale up or down as the load increases or decreases. As new workers come online, they can notify the master of their availability, and the master can begin distributing work to them. This means that if the master begins to back up due to the workers being overloaded, you can simply scale up the number of workers. In a distributed system, this is especially true because the workers can be brought up on new nodes in a cluster, probably running on different hardware.

In our example scheduling domain, if a project is added to the system, that project needs to be scheduled. We could just push a message directly into the system, but scheduling is a time-consuming process. Consequently, if many projects are scheduled simultaneously, we could run into a mailbox problem. To solve this issue, we can use the Work Pulling pattern. When a project is created, we will write a record to a data store. A project scheduler will then pull those records from the database and send a message to a series of worker actors. Each worker actor will then try to schedule that project. Here’s how it looks:

import akka.actor.{Actor, ActorRef, Props}

object ProjectWorker {
  case class ScheduleProject(project: Project)
  case class ProjectScheduled(project: Project)

  def props(projectMaster: ActorRef): Props =
    Props(new ProjectWorker(projectMaster))
}

class ProjectWorker(projectMaster: ActorRef) extends Actor {
  import ProjectWorker._

  // Register with the master as soon as the worker starts.
  projectMaster ! ProjectMaster.RegisterWorker(self)

  override def receive: Actor.Receive = {
    case ScheduleProject(project) =>
      scheduleProject(project)
      projectMaster ! ProjectScheduled(project)
  }

  private def scheduleProject(project: Project) = {
    // perform project scheduling tasks
  }
}

Here is a simple implementation for our ProjectWorker actor. When this actor comes online, it registers itself with the ProjectMaster by using the RegisterWorker message. If it receives a ScheduleProject message, it will perform the necessary scheduling tasks and then send a ProjectScheduled message back to ProjectMaster. This ProjectScheduled message is the cue to the ProjectMaster that the worker has completed its task and it is ready to accept more work, as demonstrated here:

import akka.actor.{Actor, ActorRef, Props}
import scala.concurrent.duration.FiniteDuration

object ProjectMaster {
  case class ProjectAdded(projectId: ProjectId)
  case class RegisterWorker(worker: ActorRef)
  private case class CheckForWork(worker: ActorRef)

  def props(projectRepository:
     ProjectRepository, pollingInterval: FiniteDuration): Props = {
    Props(new ProjectMaster(projectRepository, pollingInterval))
  }
}

class ProjectMaster(projectRepository:
   ProjectRepository, pollingInterval: FiniteDuration) extends Actor {
  import ProjectMaster._
  import context.dispatcher

  override def receive: Receive = {
    case RegisterWorker(worker) => scheduleNextProject(worker)
    case CheckForWork(worker) => scheduleNextProject(worker)
    case ProjectWorker.ProjectScheduled(project) =>
       scheduleNextProject(sender())
  }

  private def scheduleNextProject(worker: ActorRef) = {
    projectRepository.nextUnscheduledProject() match {
      case Some(project) =>
        worker ! ProjectWorker.ScheduleProject(project)
      case None =>
        // No work is available right now; poll again after the interval
        // rather than looping immediately.
        context.system.scheduler.scheduleOnce(pollingInterval, self,
          CheckForWork(worker))
    }
  }
}

In this example, the ProjectMaster can receive a few different messages; however, the behavior in all cases is the same. If it receives a RegisterWorker message, it knows this is a new worker. In that case, it will try to find some work for that new worker to do. If it receives a ProjectScheduled message, it knows that a worker has just completed a task. Again, the ProjectMaster will try to find more for the worker to do. In the event that it is unable to find more work, it will schedule a CheckForWork message to itself to try again later. When the ProjectMaster receives this message, it knows that the worker is idle and will again attempt to find more work.

The ProjectMaster only pulls work when a worker is idle so there is no risk that the worker’s mailbox will overflow. At the same time, the messages for the ProjectMaster are triggered only by idle workers. This means that at any given time, there shouldn’t be more messages in the ProjectMaster’s mailbox than there are workers. So again, there is no risk of mailbox overflow.

A related pattern to work-pulling is to use an external queuing system such as Apache Kafka or RabbitMQ as a means of distributing work to the workers. In this way, you get the safety of a persistent queue or topic, but preserve parallelism. It is possible to write such a queue in Akka directly, but it is frequently more convenient to use an existing tool.

Back Pressure

An alternative to work-pulling is to provide a means for the consumer to provide back pressure. Much like the analogy of a pipe with water flowing in it, the data “fills the pipe,” and the system exerts pressure back on the sender to slow down the flow, as illustrated in Figure 5-5.

Figure 5-5. Back pressure

This can take the form of a special message that the consumer emits when it is reaching a maximum rate of processing, or it can be supplied by the transport mechanism, as we will see soon.

In either case, the message has the same effect: notify the producer to slow down its rate of production so as not to overwhelm the queue and the consumer.

There are several approaches to back pressure.

Acks

A simple means of producing back pressure is to have the consumer send an acknowledgment back to the producer, usually abbreviated, as in networking, to an "ack." This ack could carry an identifier specifying which message was received, or it could simply say "OK, I've processed one, you can send one more." The same technique can be applied for a batch size greater than one; the consumer could, for instance, confirm processing every hundred or every thousand messages. The principle is the same.
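
To make the idea concrete, here is a minimal sketch, with hypothetical Work and Ack messages, of a producer that caps how many unacknowledged messages it keeps in flight; the consumer is expected to reply with Ack as it finishes each message:

import akka.actor.{Actor, ActorRef}

object AckProtocol {
  case class Work(id: Long, payload: String)
  case object Ack
}

// A producer that keeps at most maxInFlight unacknowledged messages
// outstanding. Work received while at the limit is held locally; a real
// producer would instead pull from a source it can pause, so that the
// pending buffer cannot grow without bound.
class AckingProducer(consumer: ActorRef, maxInFlight: Int) extends Actor {
  import AckProtocol._

  private var inFlight = 0
  private var pending = Vector.empty[Work]

  override def receive: Receive = {
    case work: Work =>
      pending :+= work
      drain()
    case Ack =>
      inFlight -= 1
      drain()
  }

  private def drain(): Unit =
    while (inFlight < maxInFlight && pending.nonEmpty) {
      consumer ! pending.head
      pending = pending.tail
      inFlight += 1
    }
}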

High-Water Marks

Instead of acknowledging messages, singly or in groups, another option is to do the reverse: send a message when the producer should slow down or stop, in response to some value exceeding a so-called high-water mark.

Instead of saying, "I've processed X messages, send some more," the high-water mark message says the reverse: "Things are getting busy, slow down."

You should, of course, take care to send these control messages over a separate channel; if they have to share a channel that is already backlogged, the control message itself will be delayed, defeating its purpose.

Queue-Size Monitoring

Tracking the size of the queue between the producer and the consumer is another option; in fact, this can be the high-water mark that is used in the previous technique.

Caution is warranted, however, because monitoring the queue can itself place additional load on a performance-critical part of the system. This is especially true if the producer and consumer are not singular components: there could be a great many producers and consumers, and they might be separated by a network.

Rate Monitoring

If you have a previously benchmarked consumer, you might be able to find the approximate rate at which messages can be consumed and control the flow entirely from the producer end by using this information.

This technique says, “If the consumer can handle X messages in Y time, the producer must send no more than X messages every Y period.” Assuming that all messages take close to the same amount of time to process, this can be a simple technique that requires no direct communication between the producer and consumer.

Like all time-based operations, however, it is subject to variance over time, depending on the load on the overall system, so it is not reliable when conditions change. Suppose that you begin running some new process on the same node as your consumer; suddenly you’re not consuming at the same rate as you were before, so the producer will begin sending faster than can be handled.

Of course, it’s possible to build a self-tuning system wherein the consumer actually sends a message periodically to the producer, updating its rate information from time to time. This is a bit more complex, but very flexible, and still requires comparatively little communication between the consumer and producer.

The problem with all of these techniques—whether it’s work-pulling, back pressure, rate monitoring, or something else—is that they all require extra effort on your part. None of them are built in to the system. When building pure actor-based systems, you need these techniques in order to prevent memory problems, but it would be nice if there were something built in to help you.

Akka Streams

You have seen how streams can improve message throughput and how routers can improve both throughput and latency depending on how they are used. You have also seen how mailboxes can overflow and how you can fix that by using techniques like back pressure. The evolution of these ideas takes the form of Akka Streams. Akka Streams is Akka's implementation of Reactive Streams. It takes the concepts of streams, routers, back pressure, and more and rolls them all into a single consistent domain-specific language (DSL) that allows you to write type-safe streams, including junction points, that support back pressure all the way through. This makes it possible to build complex flows of data without fear of overrunning an actor's mailbox, while providing all the same advantages that we got from creating actor streams with routers as branch points.

Akka Streams are built on a few basic concepts. These include sources, sinks, flows, and junctions. Each of these has a role to play when building up a stream of data. We will explore them each in detail so that you can fully understand how they address the problems we face.

To help visualize the role that each of these pieces plays, imagine an Akka Stream as a flow of water through a set of pipes. As we explore each concept, we will see how this analogy works.

Source

A source in Akka Streams represents a source of data. This is the origin point of your stream, whether it is a file, a database, or some other input to the system. When back pressure needs to be applied and you need to slow down the system, this is where it will need to occur. There are different ways in which you can slow down the flow depending on what the source of the data is. It might mean slowing down the rate you pull from a file, it might mean buffering data to disk, or it might mean dropping data. It all depends on the nature of the source and what level of control you have over its speed.

Continuing with the plumbing analogy, the source is the origin of the water. Whether it’s coming from a river, a lake, or a reservoir, at some point, you need to draw water from that source and pump it through the pipes. Imagine then that there is a giant lake from which you want to draw water. You put a pump in the lake and begin connecting pipes (Figure 5-6). The pump is the source of the water. We can speed up or slow down the rate that water flows through the system by adjusting the flow rate on the pump.

Figure 5-6. Plumbing source

There are many different ways in which you can build a source. A source can be constructed by using a simple iterator like the following:

val positiveIntegers = Source.fromIterator(() => Iterator.from(1))

A simple case like this is not always possible. Sometimes, you need something more complicated. There are a variety of different techniques available to implement custom sources using graph stages. Let’s look at one here:

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.util.Random

class RandomIntegers extends GraphStage[SourceShape[Int]] {
  private val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, Random.nextInt())
        }
      })
    }
  }
}

A source GraphStage consists of one outlet. This outlet is the output from the source. It connects to other elements in the stream. If the source is a pump, the outlet is the connector that attaches to the pipes. This outlet is passed to a shape, in this case a SourceShape. There are other shapes that you could use depending on the circumstances. A SourceShape is a simple shape that has just one outlet, as illustrated in Figure 5-7, but if necessary you can create shapes with multiple outlets.

Figure 5-7. Plumbing

From here, you apply GraphStageLogic. The GraphStageLogic has an OutHandler with a corresponding onPull method. The onPull method will determine what action to take when data is requested from the stream. In this case, you have implemented something very simple (a random number), but this can become as complex as you need it to be. It can read from a file, draw from a buffer, or whatever you need it to do.

One key to implementing this is that any mutable state must be contained within the GraphStageLogic. The GraphStage is just a template for the logic. It is reusable and can potentially be used in multiple streams. In all likelihood, you don’t want to share state across those streams. The GraphStageLogic, on the other hand, will be created each time a stream is created. This means that each time you connect a source to a sink, you will get a new instance of the GraphStageLogic.
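
To use the custom stage, you wrap it in a Source with Source.fromGraph and materialize it. The sketch below assumes a classic ActorSystem and the implicit ActorMaterializer used by the Akka Streams versions this book targets:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

implicit val system = ActorSystem("stream-example")
implicit val materializer = ActorMaterializer()

// Each run of the stream materializes it, creating a fresh GraphStageLogic
// instance for RandomIntegers.
val randomSource = Source.fromGraph(new RandomIntegers)
randomSource.take(5).runForeach(println)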

Sink

A sink in Akka Streams represents the endpoint of the stream. At some point all streams need to end. Without an endpoint, streams don’t really have a purpose. Whether we are writing to a database, producing a file, or displaying something in a user interface, the goal of the stream is to produce this output. In fact, for a stream to be complete it needs only two pieces: the source and the sink.

Carrying on with our plumbing analogy, you can imagine that the pump, which is drawing water from the lake, is going to push that water into the pipes so that when the user in the house opens the faucet, water will flow into a sink, as depicted in Figure 5-8. This is the end destination for the water—its endpoint. Of course, it’s unreasonable to assume that the sink in the house is connected directly to the pump in the lake without anything between them. Plumbing in the real world doesn’t work that way.

Figure 5-8. Plumbing sink

As with a source, there are multiple ways to create a sink. There are very simple examples, like the following:

val printString = Sink.foreach[Any](println)

This creates a sink that will simply call println on any items coming into it. You also can use a fold method:

val sumIntegers = Sink.fold[Int, Int](0) { case (sum, value) => sum + value }

The fold method is going to take each element and fold it into some result—in this case, a sum. The result of the fold will be a Future[Int], which will resolve to the sum, assuming that the stream eventually terminates.

And, as with a source, sometimes these simple methods are too simple and you need something more complex. In this case, you can fall back to implementing a custom GraphStage:

import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}

class Printer extends GraphStage[SinkShape[Int]] {
  private val in: Inlet[Int] = Inlet("NumberSink")
  override val shape: SinkShape[Int] = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      override def preStart(): Unit = {
        pull(in)
      }

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          println(grab(in))
          pull(in)
        }
      })
    }
  }
}

This code implements a custom SinkShape. It contains a single Inlet. Similar to the source example, this inlet represents the input to the stage. It will be connected to the upstream elements. This example uses a SinkShape, which is a shape with just a single inlet (Figure 5-9), but again there are more complex shapes that you can use that provide multiple inlets.

Figure 5-9. A sink

The logic here is very simple: an InHandler and its corresponding onPush method. The onPush method is called when data is pushed into the stream. To access that data, you can call grab, which returns the next element. And when you’re ready to request more data, you can call pull. This signals to the upstream elements that you are ready to accept more data.

To initiate the flow of data, you need to indicate demand. You do that here by overriding the preStart method and calling pull.
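
As with the custom source, the stage can be wrapped with Sink.fromGraph and attached to any upstream. A brief sketch, assuming the same implicit system and materializer as before:

import akka.stream.scaladsl.{Sink, Source}

// Wrap the custom GraphStage in a Sink and drive it with a simple source.
val printerSink = Sink.fromGraph(new Printer)
Source(1 to 3).runWith(printerSink)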

RunnableGraph

A RunnableGraph is the result of connecting a source and a sink together. These two things give you the minimal complete system. After you have a RunnableGraph, the data can begin moving. Until you have both, you can’t do anything. Either you lack a source for the data or you lack a destination. One way or the other, your system is incomplete.

For a graph to be complete, each outlet must be connected to exactly one inlet, as demonstrated in Figure 5-10. If any inlets or outlets remain unconnected, the graph is incomplete and cannot function.

Figure 5-10. RunnableGraph

From the plumbing perspective, a RunnableGraph occurs when you connect your water source to your sink (Figure 5-11). Now the water can flow freely from one place to the other. Until you make that connection, you are either pumping water and spraying it out the end of the pump with no real destination, or you have a sink and no water to put in it.

Figure 5-11. Plumbing graph
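
As a minimal sketch (again assuming the implicit system and materializer from the earlier example), connecting a source to a sink with to yields a RunnableGraph, and nothing flows until run is called:

import akka.NotUsed
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}

// Connecting a source to a sink yields a RunnableGraph. The data does not
// start flowing until the graph is materialized with run().
val graph: RunnableGraph[NotUsed] = Source(1 to 10).to(Sink.foreach(println))
graph.run()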

Flow

As already mentioned, simply connecting your source and your sink is enough to create a RunnableGraph, but it's not really that useful. You are basically just taking the data from one place and putting it into another without any modification. Sometimes, this is desirable, but more commonly you will want to perform an operation on that data. You'll want to transform it in some way. This is the purpose of a flow. A flow is a "connector" in which you can transform the data (Figure 5-12). There are many different kinds of flows. Some might modify the data coming from the source. Others might reduce the data by filtering it or by taking only a portion of it. Flows are connected between the source and the sink. They take your basic RunnableGraph and enhance it, giving it more functionality.

Figure 5-12. Flow

Referring back to our plumbing analogy, a flow is your pipes. You have a source and you have a sink, but it is unrealistic to think that you can simply attach the two with a single length of pipe. Typically, you need to transport the water over some distance. As Figure 5-13 demonstrates, you probably need to change directions a few times. The pipes might become narrower or wider in order to change the pressure. All of these things happen using pipes of various shapes and sizes. Each of these pipes represents a flow.

Figure 5-13. Plumbing flow
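
As a brief sketch (assuming the implicit system and materializer from earlier), flows are defined independently and attached between a source and a sink with via:

import akka.stream.scaladsl.{Flow, Source}

// A flow that doubles each element, and another that keeps only large results.
val double = Flow[Int].map(_ * 2)
val keepLarge = Flow[Int].filter(_ > 10)

Source(1 to 10)
  .via(double)
  .via(keepLarge)
  .runForeach(println)   // prints 12, 14, 16, 18, 20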

Junctions

As your data moves through the system, you might find it necessary to branch out and send portions of it down different paths. Or, it might be necessary to take data from multiple sources and combine it in some way. This is accomplished by using a junction. A junction is basically a branch point. It can be either a fan-in or a fan-out. You can either take a single flow and branch it into many, or combine many flows into one, as shown in Figure 5-14. When you begin using junctions, you are no longer dealing with a simple stream: you are dealing with a graph.

Junctions are created as graph elements with multiple inlets or multiple outlets.

Figure 5-14. A junction

In the plumbing analogy, junctions are represented by things like T-joints and manifolds. They take the water flow coming from the source and branch it to go to multiple different houses, each with its own sink. Or, they might take hot water and cold water and combine them to give us warm water.
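
A sketch of what a simple fan-out/fan-in graph looks like with the graph builder DSL follows (again assuming the implicit system and materializer from earlier; the flows here are arbitrary examples):

import akka.NotUsed
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

// Each number is broadcast to two flows, and their outputs are merged back
// into a single sink.
val junctionGraph: RunnableGraph[NotUsed] =
  RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val broadcast = builder.add(Broadcast[Int](2))
    val merge = builder.add(Merge[Int](2))

    val double = Flow[Int].map(_ * 2)
    val negate = Flow[Int].map(i => -i)

    Source(1 to 5) ~> broadcast
    broadcast ~> double ~> merge
    broadcast ~> negate ~> merge
    merge ~> Sink.foreach[Int](println)

    ClosedShape
  })

junctionGraph.run()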

Akka Streams give us the means to use combinations of sources, sinks, flows, and junctions to create complex systems called graphs, such as the one depicted in Figure 5-15.

Figure 5-15. A complex graph

Back Pressure in Akka Streams

Akka Streams is built on the concept of back pressure. In this case, the back pressure is implemented by using a push-pull mechanism. When a source and a sink are connected to form a RunnableGraph, a message is sent from the sink back through the various flows to the source. The message is basically saying that the sink can accept more data. When the source receives this message, it knows that it can send more data to the sink. When data becomes available, the source will pass it through the flows into the sink.

The source is allowed to send only as much data as the sink has requested. If the source receives data and the sink has not requested any, the source must decide what to do with it. It will need to either stash it somewhere or discard it. This might mean that the source simply doesn’t draw any more data from the database or file, or it may mean that it must stash or drop a request. This sounds a bit like we have simply moved our overflow problem, not solved it; instead of the overflow happening at the end of the pipe, we have just pushed it up to the beginning of the pipe.

Initially that sounds bad, but when you think about it, that’s the best case. At the beginning of the pipe, we are likely far more able to control the data flow and recover from any issues. In most cases, if your system is overloaded, it is likely going to be preferable to block a single user, rather than accepting all requests and then crashing the entire system when it runs out of memory. In the former case, a single user is affected in a very controllable fashion. In the latter case, all users might be affected, and how they are affected could be very nondeterministic.

At the end of the day, with a limited amount of hardware, it is impossible to continue to accept all data no matter how fast it is pushed into the system. At some point, you need to put pressure back on the input to slow down. This is what Akka Streams enables. It forces that pressure back to the entry point into the system. It provides the necessary mechanisms so that when the system experiences excessive load, you can communicate the need to slow down back to the source, which is the only place where we are truly equipped to deal with it.
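
Within a stream, you can also make that overflow decision explicit at any point. As a small sketch (the buffer size and strategy here are arbitrary, and the implicits from earlier are assumed), the buffer operator states exactly what should happen when downstream cannot keep up:

import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}

// Hold up to 100 elements between the source and the rest of the stream.
// dropHead discards the oldest buffered element when the buffer is full;
// backpressure, dropNew, dropBuffer, and fail are alternative strategies.
Source(1 to 1000000)
  .buffer(100, OverflowStrategy.dropHead)
  .runWith(Sink.foreach[Int](println))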

Using Akka Streams

So how do Akka Streams apply in the real world? Where can we use them? What do they allow us to do? Let’s have a look at the scheduling domain for a more concrete example.

Imagine a situation in which someone in the business wants to do some analysis of the data in the system. To do this analysis, she needs to get access to the schedules for a number of different users. She would like those schedules in a JSON format so that the schedules can be fed into another application that will actually perform the analysis. This is a fairly standard Extract, Transform, and Load (ETL) process. We extract the data from one data source, transform it, and then load it into another data source. It’s also an excellent candidate for Akka Streams.

Now, in our use case, the person coming with this request has only the email addresses of the people whose data she wants. She doesn’t have the actual IDs. Those will need to be looked up. This can be done by using a PersonRepository, which looks like this:

trait PersonRepository {
  def findByEmail(emailAddress: EmailAddress): Future[Person]
}

This code is going to live in a separate bounded context from some of our other services. So the PersonApi is going to need to communicate with those other bounded contexts in some manner. This is a scenario for which we might want to use a clustered actor system, or, alternatively, we might choose to use something like Akka HTTP to expose this data through a REST API. In either case, the PersonRepository trait acts as an insulating layer that hides the exact implementation details.

After we have a Person, we can obtain the PersonId, and we can then use that to find the Schedule details. For this, we will make use of a ScheduleRepository, which looks like this:

trait ScheduleRepository {
  def find(personId: PersonId): Future[Schedule]
}

Again, as with the PersonRepository, the ScheduleRepository is communicating with a separate bounded context. That context is likely not the same as the one with which the PersonRepository was communicating. After all, scheduling probably has no need to know about things like email addresses. However, for the report that we are building, we need to have access to information from both contexts. Generating this report is going to involve pulling the data from the necessary sources and then transforming it into the required JSON format.

The input data is going to be just a series of email address strings. However, our trait uses a case class that represents an email address, so we will need to perform a transformation. We can create a flow that will do that for us:

val toEmailAddress = Flow[String].map(str => EmailAddress(str))

After we have the EmailAddress, we’ll need to do a lookup on the PersonRepository to find the person associated with that email. We can use another flow for this:

val findPerson = Flow[EmailAddress].mapAsync(parallelism)(email =>
   personRepository.findByEmail(email))

In this case, because the PersonRepository returns a future, the flow can't be a simple map operation. Instead, we use the mapAsync operation, which applies a function that returns a Future and emits the completed results downstream in their original order.

After we have a person, we will need to obtain the schedule for that person:

val findSchedule = Flow[Person].mapAsync(parallelism)(person =>
  scheduleRepository.find(person.id))

And finally, after we have the Schedule, we need to do one final operation that will serialize the schedule to JSON format. This might look like the following:

val toJson = Flow[Schedule].map(schedule => serializer.toJson(schedule))

When all of the pieces are in place, we can stitch it all together into our stream:

Source(emailStrings)
  .via(toEmailAddress)
  .via(findPerson)
  .via(findSchedule)
  .via(toJson)
  .runForeach(json => println(json))

In this simple example, we are just printing the JSON, but instead of printing, we might send that JSON to another API, or write it to a file, or save it to a database.
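
For instance, if the destination were a file, a sketch might replace the final runForeach with a FileIO sink (this assumes toJson produces a String; the output path is arbitrary):

import java.nio.file.Paths
import akka.stream.scaladsl.FileIO
import akka.util.ByteString

Source(emailStrings)
  .via(toEmailAddress)
  .via(findPerson)
  .via(findSchedule)
  .via(toJson)
  .map(json => ByteString(json + "\n"))   // one JSON document per line
  .runWith(FileIO.toPath(Paths.get("schedules.json")))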

Running this will pull our email address strings and run them through the full stream, finally sending them on to their destination. This will all be done in a way that supports back pressure so that we don’t overload any aspect of the system. If our findSchedule flow happens to be a bottleneck, we won’t allow it to overflow because the back pressure will prevent that. At the same time, we can take advantage of the asynchronous nature of this system. While the findSchedule method is waiting to return, we can continue to process data further upstream or further downstream. We can continue to perform the faster findPerson lookups so that when we are ready to do the next findSchedule, there is already data waiting. We can also continue to perform the toJson operations as long as we have data for them.

At some point, if the findSchedule process is slow enough, something is going to end up waiting. The toJson process will run out of data to process or the back pressure will force us to stop consuming our emailStrings. This is the nature of slow operations. But until that happens, we will make the most of the resources we have available.

Conclusion

Now that you have seen how well-formed actors can be integrated into a system that supports a good flow of data, while maintaining back pressure and reliability, in Chapter 6 we will dig a bit deeper into the specifics of maintaining the right level of data consistency while not impacting the ability to scale.

This is a delicate balance, and relies heavily on the correct construction of actors in the model and a proper integration of those actors, as we have outlined.
