As described in the first part of this chapter, the Future API provides an elegant way to write concurrent programs. As it is considered bad practice to block on a Future, it is not unusual to see Future used widely across an entire code base. However, it is unlikely that your system is composed only of your own code. Most real-world applications leverage existing libraries and third-party software to avoid re-implementing solutions to common problems (such as data encoding and decoding, communication over HTTP, database drivers, and so on). Unfortunately, not all libraries use the Future API, and it can be a challenge to integrate them gracefully into your system. In this section, we examine some common pitfalls that you may encounter and mention possible workarounds.
While working on the backtester, you noticed that one module of the code is used to load historical buy orders from a relational database. Since you started rewriting the application to take advantage of Future, the module's API is fully asynchronous:
```scala
def findBuyOrders(
  client: ClientId,
  ticker: Ticker)(implicit ec: ExecutionContext): Future[List[Order]] = ???
```
However, after profiling the application, you notice that this part of the code performs quite poorly. You attempt to increase the database connection count, first doubling it, then tripling it, both without success. Trying to understand the cause of the problem, you look at all the locations where the method is called, and you notice the following pattern:
```scala
import scala.concurrent.ExecutionContext.Implicits.global

findBuyOrders(clientId, tickerFoo)
```
All the callers import the global ExecutionContext to be used implicitly by the method. This default thread pool is backed by a ForkJoinPool, and it is sized based on the number of cores available on the machine. As such, it is designed to handle nonblocking, CPU-intensive operations. This is a good choice for applications that do not perform blocking calls. However, if your application runs blocking calls asynchronously (that is, inside a Future execution), relying on the default ExecutionContext will most likely degrade performance quickly.
Before going further, we want to clarify some of the terms used in this section. Nonblocking can be a confusing term in the context of concurrency. When using Future, we perform asynchronous operations, meaning that we start a computation and allow the rest of the program to proceed. The computation is executed in the background and will eventually yield a result. This behavior is sometimes called nonblocking, meaning that the API call returns immediately. However, blocking and nonblocking most often refer to I/O operations and how they are performed, especially how the thread performing the operation is used. For example, writing a sequence of bytes to a local file can be a blocking operation, because the thread calling write has to wait (block) until the I/O operation is completed. When using nonblocking constructs, such as the ones provided in the java.nio package, it is possible to perform I/O operations without blocking a thread.
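As a concrete illustration, the following sketch performs a nonblocking read with java.nio: the call to read returns immediately, and a CompletionHandler is invoked by the channel's internal thread pool once the bytes are available. The temporary file and the latch used to keep the demo alive are our own additions:

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Files, StandardOpenOption}
import java.util.concurrent.CountDownLatch

object NioReadExample {
  def main(args: Array[String]): Unit = {
    val path = Files.createTempFile("quotes", ".txt")
    Files.write(path, "hello".getBytes("UTF-8"))

    val channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)
    val buffer = ByteBuffer.allocate(64)
    val done = new CountDownLatch(1)

    // read returns immediately; no thread of ours blocks on the I/O
    channel.read(buffer, 0L, null: Void, new CompletionHandler[Integer, Void] {
      def completed(bytesRead: Integer, attachment: Void): Unit = {
        buffer.flip()
        val content = new String(buffer.array(), 0, bytesRead, "UTF-8")
        println(s"Read: $content")
        done.countDown()
      }
      def failed(ex: Throwable, attachment: Void): Unit = {
        ex.printStackTrace()
        done.countDown()
      }
    })

    done.await() // only the demo waits; the I/O itself never blocked us
    channel.close()
  }
}
```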
It is possible to implement an API with a combination of the following behaviors:

| API characteristics | Returns | Blocks a thread? |
| --- | --- | --- |
| Synchronous/blocking | At the end of the computation | Yes, the calling thread executes the operation |
| Asynchronous/blocking | Immediately | Yes, this blocks a thread from a dedicated pool |
| Asynchronous/nonblocking | Immediately | No, the thread is freed up while the operation is performed |
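To make the first two rows concrete, here is a small sketch (the names and sleep-based workload are ours) contrasting a synchronous/blocking call with an asynchronous/blocking one; the asynchronous/nonblocking style additionally requires OS-level support, such as the java.nio channels:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ApiStyles {
  // Synchronous/blocking: the calling thread performs the work and waits.
  def loadSync(): String = { Thread.sleep(50); "data" }

  // Asynchronous/blocking: the call returns a Future immediately,
  // but a thread from this dedicated pool blocks until the work is done.
  val blockingPool =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
  def loadAsyncBlocking(): Future[String] =
    Future { Thread.sleep(50); "data" }(blockingPool)

  def main(args: Array[String]): Unit = {
    println(loadSync())
    println(Await.result(loadAsyncBlocking(), 1.second))
    blockingPool.shutdown()
  }
}
```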
Clearly, our problem is that we are using the global ExecutionContext to perform blocking calls. We are querying a relational database, and most JDBC drivers are implemented with blocking calls. The pooled threads call the driver and block while the query and its response travel over the network, making them unusable by other computations. One option is to create a dedicated ExecutionContext to execute the Futures that include blocking operations. This ExecutionContext is sized with more threads in anticipation that they will be blocked while performing their computation:
```scala
val context = ExecutionContext.fromExecutorService(
  Executors.newFixedThreadPool(20)
)

findBuyOrders(clientId, tickerFoo)(context)
```
The first benefit is that we have more threads available, meaning that we can initiate more queries concurrently. The second benefit is that the other asynchronous computations in our system run on a separate pool (for example, the global context) and avoid starvation, since none of that pool's threads are blocked.
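The wiring can be sketched as follows; the query and computation bodies are hypothetical stand-ins for real work, with blocking calls isolated on the dedicated pool and CPU-bound work on the global context:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TwoPools {
  // Dedicated, generously sized pool for blocking calls only
  val blockingEc =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(20))

  // Stand-in for a blocking database query
  def query(): Future[Int] = Future { Thread.sleep(100); 42 }(blockingEc)

  // Stand-in for a CPU-bound computation, run on the global context
  def compute(): Future[Int] = Future { (1 to 1000).sum }

  def main(args: Array[String]): Unit = {
    val combined = query().flatMap(q => compute().map(_ + q))
    println(Await.result(combined, 2.seconds)) // 500500 + 42
    blockingEc.shutdown()
  }
}
```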
We write a short benchmark to evaluate the performance of our new system. In this example, we use a mock implementation of findBuyOrders to simulate querying the database:
```scala
def findBuyOrders(
  client: ClientId,
  ticker: Ticker)(ec: ExecutionContext): Future[List[Order]] = Future {
  Thread.sleep(100)
  Order.staticList.filter(o => o.clientId == client && o.ticker == ticker)
}(ec)
```
We pass the ExecutionContext as an explicit parameter. Our benchmark compares the throughput of an application relying on the default ExecutionContext with one using an ExecutionContext dedicated to blocking operations; the latter is initialized with twenty times more threads. The results are as follows:
| Benchmark | Operation count | Throughput (ops per second) | Error as percentage of throughput |
| --- | --- | --- | --- |
| Default ExecutionContext | 10 | 3.21 | ± 0.65 |
| Dedicated ExecutionContext | 10 | 9.34 | ± 1.00 |
| Default ExecutionContext | 1,000 | 0.04 | ± 2.56 |
| Dedicated ExecutionContext | 1,000 | 0.73 | ± 0.41 |
The results confirm our intuition. The dedicated pool is bigger than the default context, in anticipation of threads being blocked while waiting for a blocking operation to finish. Having more threads available, it is able to start more blocking operations concurrently, thus achieving better throughput. Creating a dedicated ExecutionContext is a good way to isolate blocking operations and make sure that they do not slow down CPU-bound computations. When designing your dedicated thread pool, make sure that you understand how the underlying resources (for example, connections, file handles, and so on) are used. For example, when dealing with a relational database, we know that one connection can only be used to perform one query at a time. A good rule of thumb is to create a thread pool with as many threads as the number of connections that you want to open with your database server. If the number of connections is less than the thread count, some threads may be waiting for a connection and remain unused. If you have more connections than threads, the opposite situation may occur and some connections may remain unused.
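This rule of thumb can be sketched as follows; the connection count of 10 is an assumed configuration value, not something derived from a real driver:

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object DbPoolSizing {
  // Assumption: the JDBC connection pool is configured with 10 connections
  val connectionCount = 10

  // One thread per connection: every thread that blocks on a query holds
  // exactly one connection, so neither threads nor connections sit idle
  val databaseEc: ExecutionContext =
    ExecutionContext.fromExecutorService(
      Executors.newFixedThreadPool(connectionCount))

  def main(args: Array[String]): Unit =
    println(s"database pool sized to $connectionCount threads")
}
```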
A good strategy is to rely on the type system and the compiler to ensure that you are not mixing up different ExecutionContext instances. Unless the type is differentiated, you may accidentally use a CPU-bound context when performing blocking operations. You can create your own DatabaseOperationsExecutionContext type wrapping an ExecutionContext, and accept this type when creating your database access module. Another idea is to use the tagged types provided by Scalaz. Refer to Chapter 3, Unleashing Scala Performance, for a refresher on tagged types. Consider the following example:
```scala
object DatabaseAccess {
  sealed trait BlockingExecutionContextTag
  type BlockingExecutionContext = ExecutionContext @@ BlockingExecutionContextTag

  object BlockingExecutionContext {
    def fromContext(ec: ExecutionContext): BlockingExecutionContext =
      Tag[ExecutionContext, BlockingExecutionContextTag](ec)

    def withSize(size: Int): BlockingExecutionContext =
      fromContext(ExecutionContext.fromExecutor(Executors.newFixedThreadPool(size)))
  }
}

class DatabaseAccess(ec: BlockingExecutionContext) {
  // Implementation elided
}
```
Using a tagged type for our ExecutionContext gives us additional safety. It is easy to make a mistake in the main method while wiring up your application and inadvertently use the wrong ExecutionContext when creating your modules.
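If you prefer not to depend on Scalaz, a plain wrapper type achieves a similar compile-time distinction. This is a sketch under our own names (TypedContexts, BlockingExecutionContext, DatabaseAccess are illustrative, not from a library):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object TypedContexts {
  // A distinct type that can only be built through the factory below
  final case class BlockingExecutionContext(underlying: ExecutionContext)

  object BlockingExecutionContext {
    def withSize(size: Int): BlockingExecutionContext =
      BlockingExecutionContext(
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(size)))
  }

  class DatabaseAccess(ec: BlockingExecutionContext) {
    // queries would run on ec.underlying
  }

  def main(args: Array[String]): Unit = {
    val db = new DatabaseAccess(BlockingExecutionContext.withSize(4))
    // new DatabaseAccess(ExecutionContext.global) // does not compile: wrong type
    println("wired with a BlockingExecutionContext")
  }
}
```

The trade-off versus tagged types is a small allocation per wrapper; in exchange, the compiler rejects any attempt to pass a CPU-bound context where a blocking one is required.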
The standard library provides a blocking construct that can be used to signal blocking operations executed inside a Future. We can modify our previous example to leverage blocking instead of a dedicated ExecutionContext:
```scala
import scala.concurrent.ExecutionContext.Implicits.global

def findBuyOrders(
  client: ClientId,
  ticker: Ticker): Future[List[Order]] = Future {
  scala.concurrent.blocking {
    Thread.sleep(100)
    Order.staticList.filter(o => o.clientId == client && o.ticker == ticker)
  }
}
```
Note that in the preceding implementation, we use the default ExecutionContext to execute the Future. The blocking construct notifies the ExecutionContext that a computation is blocking, which allows the ExecutionContext to adapt its execution strategy. For example, the default global ExecutionContext temporarily increases the number of threads in the pool when it performs a computation wrapped with blocking. A dedicated thread is created in the pool to execute the blocking computation, making sure that the rest of the pool remains available for CPU-bound computations.
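This behavior can be observed with a small experiment (the task count and sleep duration are our own choices). Without the blocking hint, an N-core pool would need roughly 32/N sequential rounds of 200 ms sleeps; with it, the global pool grows temporarily and the sleeps overlap:

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingDemo {
  def main(args: Array[String]): Unit = {
    val start = System.nanoTime()
    // 32 blocking sleeps; the blocking wrapper lets the global pool
    // spawn extra threads so they can run largely in parallel
    val fs = (1 to 32).map(_ => Future { blocking { Thread.sleep(200); 1 } })
    val total = Await.result(Future.sequence(fs), 30.seconds).sum
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println(s"completed $total tasks in ${elapsedMs}ms")
  }
}
```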
You should use blocking cautiously. The blocking construct merely notifies the ExecutionContext that the wrapped operation is blocking; it is the responsibility of the ExecutionContext to implement a specific behavior or to ignore the notification. In the standard library, the only implementation that actually takes the notification into account is the global default ExecutionContext.
While Future is the main construct of the scala.concurrent API, another useful abstraction is Promise. A Promise is another way to create and complete a Future. A Future is a read-only container for a result that will eventually be computed, whereas a Promise is a handle that allows you to explicitly set the value contained in a Future. A Promise is always associated with exactly one Future, and this Future is specific to the Promise. It is possible to complete the Future of a Promise with either a successful result or an exception (which fails the Future).
Let's look at a short example to understand how Promise works:
```scala
scala> val p = Promise[Int] // this promise will provide an Int
p: scala.concurrent.Promise[Int] = scala.concurrent.impl.Promise$DefaultPromise@d343a81

scala> p.future.value // the Future associated with this Promise is not yet completed
res3: Option[scala.util.Try[Int]] = None

scala> p.success(42)
res4: p.type = scala.concurrent.impl.Promise$DefaultPromise@d343a81

scala> p.future.value
res5: Option[scala.util.Try[Int]] = Some(Success(42))
```
A Promise can only be used once to complete its associated Future, either with a success or a failure. Attempting to complete an already realized Promise throws an exception, unless you use trySuccess, tryFailure, or tryComplete. These three methods attempt to complete the Future linked to the Promise and return true if this call completed the Future, or false if it had already been completed.
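The contrast between the two completion styles can be seen in a few lines:

```scala
import scala.concurrent.Promise

object TryCompleteDemo {
  def main(args: Array[String]): Unit = {
    val p = Promise[Int]()
    println(p.trySuccess(42)) // true: this call completes the Future
    println(p.trySuccess(0))  // false: already completed, but no exception
    println(p.future.value)   // the first result won
  }
}
```

Calling p.success(0) instead of the second trySuccess would have thrown an IllegalStateException.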
At this point, you may be wondering in what circumstances you would really take advantage of Promise. Especially considering the previous example, would it not be simpler to return the internal Future instead of relying on a Promise? Keep in mind that the preceding snippet is meant to demonstrate a simple workflow illustrating the Promise API. However, we understand the question. In practice, we see two common use cases for Promise.
The first use case is turning a callback-based API into a Future-based API. Imagine having to integrate with a third-party product, such as the proprietary database that MVT recently obtained by purchasing usage licenses. This is a great product that is used to store historical quotes per timestamp and ticker, and it comes with a library to be used by client applications. Unfortunately, this library, while fully asynchronous and nonblocking, is callback-oriented, as follows:
```scala
object DatabaseClient {
  def findQuote(instant: Instant, ticker: Ticker, f: (Quote) => Unit): Unit = ???

  def findAllQuotes(
    from: Instant,
    to: Instant,
    ticker: Ticker,
    f: (List[Quote]) => Unit,
    h: Exception => Unit): Unit = ???
}
```
There is no doubt that the client works fine; after all, MVT paid a lot of money for it! However, it will not be easy to integrate it with your own application, which relies heavily on Future. This is where Promise can help us, as follows:
```scala
object DatabaseAdapter {
  def findQuote(instant: Instant, ticker: Ticker): Future[Quote] = {
    val result = Promise[Quote]
    DatabaseClient.findQuote(instant, ticker, { q: Quote =>
      result.success(q)
    })
    result.future
  }

  def findAllQuotes(
    from: Instant,
    to: Instant,
    ticker: Ticker): Future[List[Quote]] = {
    val result = Promise[List[Quote]]
    DatabaseClient.findAllQuotes(from, to, ticker,
      { quotes: List[Quote] => result.success(quotes) },
      { ex: Exception => result.failure(ex) })
    result.future
  }
}
```
Thanks to the Promise abstraction, we are able to return a Future. We simply call success and failure in the respective callbacks passed to the proprietary client. This use case often arises in production when you have to integrate with a Java library. Even though Java 8 introduced significant improvements to the Java concurrent package, many Java libraries still rely on callbacks to implement asynchronous behavior. Using Promise, you can fully leverage the existing Java ecosystem in your program without giving up Scala's support for concurrent programming.
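The same pattern applies to Java's own asynchronous types. As a sketch, here is how a Promise can bridge a java.util.concurrent.CompletableFuture to a Scala Future (the toScala name and the trivial workload are ours):

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.{BiConsumer, Supplier}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object JavaInterop {
  // Complete a Promise from the CompletableFuture's completion callback
  def toScala[A](cf: CompletableFuture[A]): Future[A] = {
    val p = Promise[A]
    cf.whenComplete(new BiConsumer[A, Throwable] {
      def accept(value: A, error: Throwable): Unit =
        if (error != null) p.failure(error) else p.success(value)
    })
    p.future
  }

  def main(args: Array[String]): Unit = {
    val cf = CompletableFuture.supplyAsync(new Supplier[Integer] {
      def get(): Integer = 21 * 2
    })
    println(Await.result(toScala(cf), 1.second))
  }
}
```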
Promise can also be used to combine instances of Future. For example, let's add a timeout capability to Future:
```scala
def run[A](f: => Future[A], timeout: Duration)(
  implicit ec: ExecutionContext): Future[A] = {
  val res = Promise[A]
  Future {
    Thread.sleep(timeout.toMillis)
    res.tryFailure(new Exception("Timed out"))
  }
  // Evaluate the by-name parameter once to start the main computation
  val computation = f
  res.tryCompleteWith(computation)
  res.future
}
```
Our method takes a by-name Future (that is, a Future whose execution has not yet started) as well as the timeout value to apply. In the method, we use a Promise as a container for the result. We start an internal Future that blocks for the timeout duration before failing the Promise with an Exception. We also start the main Future and register it to complete the Promise with the result of the computation. Whichever of the two Futures terminates first effectively completes the Promise with its result. Note that in this example, we use tryFailure and tryCompleteWith. It is likely that both Futures will eventually terminate and try to complete the Promise. We are only interested in the result of the first one that completes, but we also want to avoid throwing an Exception when attempting to complete an already realized Promise.
The preceding example is a naive implementation of a timeout. It is mostly a prototype used to demonstrate how Promise can be leveraged to enrich Future and implement complex behavior. A more realistic implementation would probably involve a ScheduledExecutorService, which allows you to schedule the execution of a computation after a certain delay. This would let us schedule the call to tryFailure without blocking a thread in a call to Thread.sleep. We made the choice to keep this example simple and not introduce a new type, but we encourage you to explore an implementation based on ScheduledExecutorService.
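Such an implementation might look like the following sketch. It keeps the same Promise-based contract, but schedules the tryFailure call instead of holding a pooled thread in Thread.sleep (the runWithTimeout name and single-thread scheduler are our own choices):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ScheduledTimeout {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def runWithTimeout[A](f: => Future[A], timeout: FiniteDuration): Future[A] = {
    val res = Promise[A]
    // Schedule the failure; no thread sleeps while waiting
    scheduler.schedule(new Runnable {
      def run(): Unit = { res.tryFailure(new Exception("Timed out")); () }
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    f.onComplete(res.tryComplete)
    res.future
  }

  def main(args: Array[String]): Unit = {
    val fast = runWithTimeout(Future { 1 }, 1.second)
    println(Await.result(fast, 5.seconds))

    val slow = runWithTimeout(Future { Thread.sleep(500); 2 }, 50.millis)
    println(Await.ready(slow, 5.seconds).value.get.isFailure)

    scheduler.shutdown()
  }
}
```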
In practice, you may occasionally have to write your own custom combinators for Future. Promise is a useful abstraction in your toolbox if you need to do this. However, Future and its companion object already provide a number of built-in combinators and methods that you should leverage as much as possible.
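For instance, the race between two Futures that we hand-rolled above is already provided by the standard library as Future.firstCompletedOf:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BuiltInCombinators {
  def main(args: Array[String]): Unit = {
    val slow = Future { Thread.sleep(500); "slow" }
    val fast = Future { "fast" }
    // firstCompletedOf is itself implemented with a Promise that is
    // completed by whichever Future finishes first
    println(Await.result(Future.firstCompletedOf(Seq(slow, fast)), 5.seconds))
  }
}
```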