Chapter 6: Dispatchers

Now as we turn to the matter of dispatchers, things might get slightly more confusing. It's perhaps best to think of dispatchers as embodying two fundamental concerns: management of the actor mailbox and the threading strategy used to allow actors to do actual work. We'll discuss both of these in a bit more depth, as they are where the primary differences between the various dispatcher choices lie.

Let's look at the mailbox choices first, as they are the simpler part of this picture. There are two primary concerns here: whether each actor gets its own mailbox or all actors on the dispatcher share a single one, and whether mailboxes are unbounded or bounded to some fixed capacity. The default dispatcher gives each actor its own unbounded mailbox. You also have the option of implementing a priority-based mailbox, which requires mapping messages to a numeric priority.

The task management piece is closely related to (and, behind the scenes, uses) Java's java.util.concurrent.ExecutorService facility. Its purpose is to abstract away how threads are managed and how actors are given time and resources, in the form of those threads, in which to perform their work.

The default dispatcher uses the relatively new Fork/Join framework developed by Doug Lea, one of the key masterminds behind Java's java.util.concurrent package. This framework is included with the Java 7 release, but Scala bundles a copy of it, which means you can still use it on a Java 6 JVM. The Fork/Join framework is well suited for actor-based workflows because of the work-stealing approach it uses to keep all threads in the pool busy when possible. This approach tends to work well for systems that spawn lots of small tasks that need to be executed concurrently.

Configuring and using dispatchers

Let's look at how you might typically configure and use the default dispatcher before we dive into other dispatcher choices. The configuration of the default has a number of options (all custom dispatchers do, as well), but we'll focus on a few key points:

 akka.actor {

   default-dispatcher {
     # We'll look at other possible values for this setting later
     type = Dispatcher

     # This is the default already, but you also have the option of
     # using "thread-pool-executor", which corresponds to a
     # java.util.concurrent.ThreadPoolExecutor instance
     executor = "fork-join-executor"

     # Throughput defines the "fairness" of the executor. This works by
     # setting the number of messages that the dispatcher will allow to
     # be processed before the thread is returned to the pool
     throughput = 10

     # Since we defined this type of executor, we can use this section
     # to customize its behavior
     fork-join-executor {

       # The minimum number of threads that should be allocated
       parallelism-min = 4

       # The maximum number of threads that should be allocated
       parallelism-max = 32

       # The executor will use this along with the number of available
       # CPU cores to select how many threads to allocate within the
       # bounds given by the previous settings. The formula used is
       # ceil(cores * factor). For example, if you have 10 cores and
       # the following 4.0 setting, the result will be 40, but since
       # we set an upper bound of 32, only 32 threads will be
       # allocated
       parallelism-factor = 4.0

     }
   }
 }

Let's talk for a moment about how these settings should be used. This is an area that can take a lot of effort to get right, so understanding some basic guidelines will be helpful. The first couple of configuration settings are fairly straightforward, though the executor setting can take a few different values: the fork-join-executor value shown here, thread-pool-executor, which I'll discuss momentarily, or the fully qualified class name (FQCN) of a custom implementation of the akka.dispatch.ExecutorServiceConfigurator abstract class (again, more on this shortly). I'll discuss throughput in a moment. The parallelism-min and parallelism-max values are simple bounds that are used in combination with the parallelism-factor setting to determine how many threads the executor is allowed to allocate.

You will need to evaluate the nature of the work being performed by the actors (and futures, as we'll see in chapter 8) on this executor to determine how best to adjust these settings. For instance, if you are performing a lot of CPU-bound work, it doesn't make much sense to allow the executor to use more threads than you have available CPU cores. On the other hand, if your actors spend a lot of time waiting, whether on IO operations, on other actors within your system, or on anything else, then allocating enough threads to give them processor time to check whether messages are ready for them becomes important.

The throughput setting is very useful: it tells the dispatcher how many messages to drain from a given actor's mailbox before the thread being used is returned to the pool for another actor to use. A setting of 1 allows for maximum fairness, but it also means there may be a lot of context switching as the executor schedules your actors. Setting it too high, though, can leave some of your actors waiting to drain messages from their mailboxes. In general, it doesn't make sense to set this value very high unless you have a very limited number of actors running on this dispatcher.
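
To make these guidelines concrete, here's a hypothetical sketch of how you might tune the default dispatcher for mostly CPU-bound actors, keeping the thread count close to the number of cores and trading a little fairness for fewer context switches. The specific numbers are illustrative, not recommendations; you would adjust them based on profiling your own workload.

 akka.actor {
   default-dispatcher {
     type = Dispatcher
     executor = "fork-join-executor"
     # Drain a few more messages per actor before giving up the thread,
     # reducing context switching at a small cost to fairness
     throughput = 20
     fork-join-executor {
       # Keep the pool close to the number of physical cores for
       # CPU-bound work
       parallelism-min = 2
       parallelism-factor = 1.0
       parallelism-max = 8
     }
   }
 }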

It's perhaps not obvious, but now that we've configured the above settings in akka.actor.default-dispatcher, we don't actually have to modify any code to use it. Any actor for which we don't configure a specific dispatcher will use these settings — that's why it's called the default-dispatcher, of course.
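
For example, any actor created without an explicit dispatcher picks up these settings automatically (MyActor here is simply the same placeholder actor class used in the examples below):

 import akka.actor.Props

 // No withDispatcher call here, so MyActor runs on the default-dispatcher
 val worker = system.actorOf(Props[MyActor])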

To configure a custom dispatcher, you simply need to add a section defining the settings for that dispatcher and give it a suitable name. Here's a minimal configuration for a dispatcher that uses Java's java.util.concurrent.ThreadPoolExecutor (Akka provides a convenient shortcut with the thread-pool-executor name for this dispatcher and a corresponding implementation of an ExecutorServiceConfigurator for it).

 my-thread-pool-dispatcher {

   type = Dispatcher

   executor = "thread-pool-executor"

   thread-pool-executor {

     core-pool-size-min = 4
     core-pool-size-max = 32
     core-pool-size-factor = 2.0

     max-pool-size-min = 8
     max-pool-size-max = 84
     max-pool-size-factor = 3.0

     keep-alive-time = 30ms

     task-queue-type = "linked"
   }
 }

These settings are essentially mirrors of the settings defined for the underlying ThreadPoolExecutor class (see the Javadocs for a lot more detail). In case you're not familiar with this class, the first things we need to cover are the core-pool-size and max-pool-size settings. The core pool is essentially the pool of threads that this executor tries to keep available for work, while the maximum pool settings define an outer bound for the number of threads available. If a new task is submitted and fewer threads than the core pool size are running, an additional thread will be created, even if there are idle threads in the core pool, as long as the maximum pool size is not exceeded. The min, max, and factor settings for each pool provide a way to dynamically size these pools based on the number of CPU cores available. That is, the number of cores is multiplied by the factor and constrained within the specified minimum and maximum sizes, with the result being used to size the pool. As an example, suppose you have set the pool minimum size to 4, the maximum size to 32, and a factor of 3.0. With 4 CPU cores, this results in 12 threads being placed in the pool. But with 16 CPU cores, you would get 32 threads, because the maximum size is less than 16 * 3.0. Finally, for the pool-size-related settings, keep-alive-time sets how long the pool will wait before shutting down additional, non-core threads.

The task-queue-type and task-queue-size settings define the type of queue used to hold incoming tasks that are waiting for an available thread from the pool. Akka provides two implementations for use here: linked, which uses a LinkedBlockingQueue, and array, which uses an ArrayBlockingQueue. The LinkedBlockingQueue is, by default, an unbounded queue that can grow as needed; as such, it allows tasks to wait until a core pool thread is available before allocating a new thread, so no more than core-pool-size-max threads will ever be used in this case. You can also specify an optional task-queue-size setting for the LinkedBlockingQueue to limit the number of tasks that can wait in the queue; tasks submitted when the queue is at capacity will be rejected. The ArrayBlockingQueue is a fixed-size, bounded queue, so it always requires the task-queue-size setting.
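
If you wanted the bounded behavior instead, a hypothetical variant of the thread-pool-executor section from the dispatcher above might look like the following (the capacity of 1000 is purely illustrative):

 thread-pool-executor {
   # A bounded queue requires an explicit capacity
   task-queue-type = "array"
   task-queue-size = 1000
 }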

You would use the my-thread-pool-dispatcher defined above with the following code:

 import akka.actor.Props

 val anActor =
   context.actorOf(Props[MyActor].withDispatcher("my-thread-pool-dispatcher"))

Note that this dispatcher can be defined anywhere in your configuration. In the case of my-thread-pool-dispatcher, we're assuming that it's a top-level configuration entry. If you instead defined it at myapp.dispatchers.my-thread-pool-dispatcher, the code would need to reference that full path to access it:

 context.actorOf(Props[MyActor]
   .withDispatcher("myapp.dispatchers.my-thread-pool-dispatcher"))

You don't need to know what all the values shown above mean unless you find yourself in need of an alternative to the Fork/Join-based dispatcher. If that's the case, I highly recommend reading Java Concurrency in Practice to get a solid understanding of the different executors offered by the Java concurrency libraries. In general, you will usually want to stick with the default, Fork/Join-based dispatcher, as it provides very good performance characteristics for most cleanly designed, non-blocking actor usages.

You should also be aware that these dispatchers can be used as an ExecutionContext, since they implement that trait. This means you can use them for executing futures by creating an implicit value that refers to them.

 import scala.concurrent.ExecutionContext

 implicit val myThreadPoolDispatcher: ExecutionContext =
   system.dispatchers.lookup("myapp.dispatchers.my-thread-pool-dispatcher")
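
As a quick, hypothetical illustration (the expensiveComputation call stands in for whatever work you need to run), a future created with this implicit in scope will be executed on that dispatcher:

 import scala.concurrent.Future

 // Runs on my-thread-pool-dispatcher via the implicit ExecutionContext above
 val result: Future[Int] = Future {
   expensiveComputation()
 }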

Provided dispatchers

Akka provides four standard dispatchers for your use: Dispatcher, PinnedDispatcher, BalancingDispatcher, and CallingThreadDispatcher. We've already implicitly covered the default Dispatcher, but to complete the picture, we should describe its other characteristics. As we've already seen, you can use any of fork-join-executor, thread-pool-executor, or a FQCN pointing to an implementation of akka.dispatch.ExecutorServiceConfigurator to determine how tasks submitted to this dispatcher will be executed. The dispatcher allocates a separate mailbox for each actor that uses it.

The PinnedDispatcher is a special dispatcher that allocates a single thread for each actor assigned to it. Said another way, every actor you assign to this dispatcher is guaranteed to have a dedicated thread available for its use. If you're asking yourself why you would want to do this, think about cases where you have a resource or set of resources that needs to be given priority over most other parts of the system. The intent of this dispatcher is to help ensure that these actors will always have some CPU time available, though it's impossible to truly guarantee this, of course. Also keep in mind that you don't want to overuse this pattern. In fact, use it very sparingly. You can configure a dispatcher to be of type PinnedDispatcher with the following configuration:

 my-pinned-dispatcher {
   executor = "thread-pool-executor"
   type = PinnedDispatcher
 }
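
Assigning an actor to it works just like any other custom dispatcher; ImportantResourceActor here is simply a hypothetical actor that deserves its own thread:

 import akka.actor.Props

 val guarded =
   system.actorOf(Props[ImportantResourceActor]
     .withDispatcher("my-pinned-dispatcher"))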

Another dispatcher you might hear about is BalancingDispatcher. This is what you might be tempted to call a work-stealing dispatcher, though that term is not technically accurate. With this dispatcher, all actors using it share a single mailbox, and the dispatcher hands messages from that mailbox to actors as they become idle. So it's not really work-stealing in the typical sense, but it does attempt to distribute the load fairly across a set of actors.

One caveat to be aware of with this dispatcher is that it's intended to be used with actors that are all of the same type. Nothing will prevent you from assigning it to different actor types, but make sure you understand what you're doing if you ever try to do so; at the very least, every actor sharing it must be prepared to receive the same set of possible messages.

Using this dispatcher in your configuration is similar to the PinnedDispatcher. You simply assign it as the type, but you probably want to use the fork-join-executor here:

 my-balancing-dispatcher {
   # This is not strictly necessary, as it is the default
   executor = "fork-join-executor"
   type = BalancingDispatcher
 }
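
To see the load sharing in action, you might start several identical workers on this dispatcher. WorkerActor and the DoWork message here are hypothetical names for illustration; the point is that every instance pulls from the same shared mailbox:

 import akka.actor.Props

 // Hypothetical message type for work items
 case class DoWork(jobId: Int)

 val workers = (1 to 4).map { i =>
   system.actorOf(Props[WorkerActor]
     .withDispatcher("my-balancing-dispatcher"), s"worker-$i")
 }

 // No matter which worker a message is addressed to, it lands in the
 // shared mailbox and is processed by whichever worker becomes idle first
 (1 to 100).foreach(n => workers.head ! DoWork(n))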

Mailboxes

The subject of mailboxes has come up already, but I haven't really devoted much time to them. As described earlier, mailboxes can be either bounded or unbounded. In the case of bounded mailboxes, you should also be aware that they are typically blocking. That is, the act of adding a message to a bounded mailbox will block for some period of time if the mailbox is currently full. The timeout for this is set in the configuration, as are the size and type of the mailbox. It is also certainly possible to implement a non-blocking, bounded mailbox.

Here are a couple quick configuration examples showing you basic unbounded and bounded configurations.

A typical unbounded mailbox

 my-unbounded-mailbox-dispatcher {
   mailbox-capacity = -1
 }

Here I am just asking for the most plain vanilla unbounded, nonblocking mailbox.

A mailbox with a 100 message limit

 my-bounded-mailbox-dispatcher {
   mailbox-capacity = 100
   mailbox-push-timeout-time = 0
 }

And here I create a simple bounded, blocking mailbox. It will accept up to 100 messages and then block indefinitely when new messages are enqueued.

We can also create PriorityMailbox instances, though this requires a bit of custom code. The idea with these mailbox types is that each incoming message is given a weighted priority that is used to determine in what order the messages in the mailbox are processed. Assuming I wanted to create a mailbox that would handle AddBookmark messages and prioritize them based on the contents of their URL, the following approach would be one possible solution:

 package akkaguide

 import akka.dispatch.{PriorityGenerator, UnboundedPriorityMailbox}
 import akka.actor.ActorSystem
 import com.typesafe.config.Config

 object BookmarkPriorityQueueMailbox {
   import akkaguide.BookmarkStore.AddBookmark

   val priorityGenerator = PriorityGenerator {
     case AddBookmark(title, url) if url.contains("typesafe.com") => 0
     case AddBookmark(title, url) if url.contains("oracle.com") => 2
     case _ => 1
   }
 }

 class BookmarkPriorityQueueMailbox(settings: ActorSystem.Settings, config: Config)
   extends UnboundedPriorityMailbox(BookmarkPriorityQueueMailbox.priorityGenerator)

As you can see, this relies on creating a PriorityGenerator instance that ranks the messages as integer values. The lower the value, the higher the priority the message is given. In this example, URLs containing the value "typesafe.com" are given the highest priority, while those containing "oracle.com" are given the lowest. I assume you can see where my prejudices lie. To use this mailbox, you simply specify the fully qualified class name of the new mailbox class in the mailbox-type setting of the dispatcher configuration:

 boundedBookmarkDispatcher {
   type = Dispatcher
   executor = "fork-join-executor"
   mailbox-type = "akkaguide.BookmarkPriorityQueueMailbox"
 }

Now that we've defined this, we can modify the code that starts the BookmarkStoreGuardian we created previously so that it uses this dispatcher to handle all messages that pass through it:

 val bookmarkStoreGuardian =
   system.actorOf(Props(new BookmarkStoreGuardian(bookmarkStore))
     .withDispatcher("boundedBookmarkDispatcher"))

Other mailboxes

You can also create your own custom mailbox types. While this might seem quite appealing at first, I would advise against it unless you have a very strong reason and are certain you understand the enqueueing and dequeueing semantics of whatever container you choose to hold your messages. Creating a custom mailbox itself is easy, but getting it right is another matter, and the mailboxes supplied with Akka are most often all you need.
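
To give a sense of what's involved, here is a minimal sketch of a custom mailbox type, assuming an Akka 2.x version of the MailboxType API with the (settings, config) constructor and two-argument create method; the SimpleQueueMailbox name and the choice of ConcurrentLinkedQueue are mine, and the result simply mimics the standard unbounded mailbox:

 package akkaguide

 import java.util.concurrent.ConcurrentLinkedQueue

 import akka.actor.{ActorRef, ActorSystem}
 import akka.dispatch.{Envelope, MailboxType, MessageQueue}
 import com.typesafe.config.Config

 // A bare-bones FIFO mailbox, shown only to illustrate the moving parts
 class SimpleQueueMailbox(settings: ActorSystem.Settings, config: Config)
   extends MailboxType {

   override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
     new MessageQueue {
       private val queue = new ConcurrentLinkedQueue[Envelope]()

       def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue.offer(handle)
       def dequeue(): Envelope = queue.poll()
       def numberOfMessages: Int = queue.size
       def hasMessages: Boolean = !queue.isEmpty
       def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
         // Forward anything left over to dead letters when the actor stops
         var envelope = queue.poll()
         while (envelope ne null) {
           deadLetters.enqueue(owner, envelope)
           envelope = queue.poll()
         }
       }
     }
 }

Even in this tiny example, details like draining leftover messages in cleanUp are easy to get wrong, which is exactly why the supplied mailboxes are usually the better choice.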

Finally, you will also likely run across a reference to durable mailboxes, of which there were historically a number of implementations supplied with Akka. For reasons of maintainability, these were all removed from the distribution with the exception of one that is backed by a journaled transaction log on the filesystem. It's easy enough to use this mailbox with the following configuration:

 journaled-file-dispatcher {
   mailbox-type = "akka.actor.mailbox.filebased.FileBasedMailboxType"
 }

Using this mailbox type is just like using any other mailbox (though it provides a wealth of configuration parameters), with the exception that messages will still be retained if the virtual machine running Akka dies for whatever reason. This can be very handy for protecting yourself against failure, particularly when isolating different resources (local or remote) within your system, such as a remote web service or an incoming data feed. There are additional third-party durable mailboxes available, including one based on AMQP, which you can find online.

Wrap-up

This whirlwind tour of dispatchers has hopefully given you an idea of the flexibility Akka gives you for creating robust configurations that can handle very different types of workloads, depending upon your needs. There is a huge range of choices available, so you might feel overwhelmed, but it's generally best to start with the minimum you need to get things working. Then, by watching performance and profiling under real workloads, you can get a better sense of where to apply these different tools and how they might impact your overall system.
