Handling blocking calls and callbacks

As described in the first part of this chapter, the Future API provides an elegant way to write concurrent programs. Because blocking on a Future is considered bad practice, it is not unusual to see Future used pervasively across an entire code base. However, it is unlikely that your system is composed only of your own code. Most real-world applications leverage existing libraries and third-party software to avoid re-implementing solutions to common problems (such as data encoding and decoding, communication over HTTP, database drivers, and so on). Unfortunately, not all libraries use the Future API, and it can be a challenge to integrate them gracefully into your system. In this section, we will examine some common pitfalls that you may encounter and mention possible workarounds.

ExecutionContext and blocking calls

While working on the backtester, you noticed that one module of the code is used to load some historical buy orders from a relational database. Since you started rewriting the application to take advantage of Future, the module API is fully asynchronous:

def findBuyOrders( 
 client: ClientId, 
 ticker: Ticker)(implicit ec: ExecutionContext): Future[List[Order]] = ??? 

However, after profiling the application, you noticed that this part of the code performs quite poorly. You attempted to increase the database connection count, first doubling it, then tripling it, both without success. To understand the cause of the problem, you looked at all the locations where the method is called and noticed the following pattern:

import scala.concurrent.ExecutionContext.Implicits.global 
findBuyOrders(clientId, tickerFoo)   

All the callers import the global ExecutionContext to be used implicitly by the method. The default thread pool is backed by a ForkJoinPool, and it is sized based on the number of cores available on the machine. As such, it is designed to handle nonblocking, CPU-intensive operations. This is a good choice for applications that do not perform blocking calls. However, if your application runs blocking calls asynchronously (that is, inside a Future execution), relying on the default ExecutionContext will quickly degrade performance.
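To see why, consider the following sketch of the starvation effect. This is an illustrative example rather than code from the backtester, and the figures in the comments assume an eight-core machine:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// On an eight-core machine, the global pool has roughly eight threads.
// These 16 one-second "queries" therefore run in two sequential waves,
// and the whole batch takes about two seconds instead of one. With
// 1,000 queries, the degradation becomes dramatic.
val queries: Future[List[Int]] =
  Future.traverse((1 to 16).toList) { i =>
    Future {
      Thread.sleep(1000) // simulates a blocking JDBC call occupying a thread
      i
    }
  }

Await.result(queries, 1.minute) // blocking here only to observe the timing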

Asynchronous versus nonblocking

Before going further, we want to clarify some of the terms used in this section. Nonblocking can be a confusing term in the context of concurrency. When using Future, we perform asynchronous operations, meaning that we start a computation in the background while the program proceeds with its flow. The computation will eventually yield a result. This behavior is sometimes called nonblocking, meaning that the API call returns immediately. However, blocking and nonblocking most often refer to I/O operations and how they are performed, especially how the thread performing the operation is used. For example, writing a sequence of bytes to a local file can be a blocking operation because the thread calling write has to wait (block) until the I/O operation completes. When using nonblocking constructs, such as the ones provided in the java.nio package, it is possible to perform I/O operations without blocking a thread.

It is possible to implement an API with a combination of the following behaviors:

API characteristics      | Returns                       | Blocks a thread?
-------------------------|-------------------------------|------------------------------------------------
Synchronous/blocking     | At the end of the computation | Yes, the calling thread executes the operation
Asynchronous/blocking    | Immediately                   | Yes, this blocks a thread from a dedicated pool
Asynchronous/nonblocking | Immediately                   | No, the thread is freed up while the operation is performed
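To make the asynchronous/nonblocking row concrete, here is a sketch using the asynchronous channel API from java.nio; the file path is hypothetical:

import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Paths, StandardOpenOption}

// read returns immediately; an I/O thread invokes the handler when the
// bytes are available, and no thread blocks while waiting for the disk
val channel = AsynchronousFileChannel.open(
  Paths.get("orders.csv"), StandardOpenOption.READ)
val buffer = ByteBuffer.allocate(1024)

channel.read(buffer, 0L, buffer,
  new CompletionHandler[Integer, ByteBuffer] {
    def completed(bytesRead: Integer, buf: ByteBuffer): Unit =
      println(s"Read $bytesRead bytes without blocking the caller")
    def failed(e: Throwable, buf: ByteBuffer): Unit =
      e.printStackTrace()
  })
// The calling thread is free to perform other work here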

Using a dedicated ExecutionContext for blocking calls

Clearly, our problem is that we are using the global ExecutionContext to perform blocking calls. We are querying a relational database, and most JDBC drivers are implemented to perform blocking calls. The pooled threads call the driver and block while the query and its response travel over the network, making them unusable by other computations. One option is to create a dedicated ExecutionContext to execute Futures that wrap blocking operations. This ExecutionContext is sized with more threads, in anticipation that many of them will be blocked while performing their computation:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

val context = ExecutionContext.fromExecutorService(
  Executors.newFixedThreadPool(20)
)
findBuyOrders(clientId, tickerFoo)(context)

The first benefit is that we have more threads available, meaning that we can initiate more queries concurrently. The second benefit is that the other asynchronous computations performed in our system are done on a separate pool (for example, the global context), and they will avoid starvation since no threads are blocked.

We write a short benchmark to evaluate the performance of our new system. In this example, we use a mock implementation of findBuyOrders to simulate querying the database:

def findBuyOrders(
  client: ClientId,
  ticker: Ticker)(ec: ExecutionContext): Future[List[Order]] = Future {
  Thread.sleep(100) // simulates the latency of a blocking database query
  Order.staticList.filter(o =>
    o.clientId == client && o.ticker == ticker)
}(ec)

We pass the ExecutionContext as a parameter. Our benchmark compares the throughput of an application relying on the default ExecutionContext against one using a dedicated ExecutionContext for blocking operations; the latter is initialized with twenty times more threads. The results are as follows:

Benchmark            | Operation count | Throughput (ops per second) | Error as percentage of throughput
---------------------|-----------------|-----------------------------|----------------------------------
withDefaultContext   | 10              | 3.21                        | ± 0.65
withDedicatedContext | 10              | 9.34                        | ± 1.00
withDefaultContext   | 1,000           | 0.04                        | ± 2.56
withDedicatedContext | 1,000           | 0.73                        | ± 0.41

The results confirm our intuition. The dedicated pool is bigger than the default context, in anticipation of threads being blocked while waiting for a blocking operation to finish. With more threads available, it is able to start more blocking operations concurrently, thus achieving better throughput. Creating a dedicated ExecutionContext is a good way to isolate blocking operations and make sure that they do not slow down CPU-bound computations. When designing your dedicated thread pool, make sure that you understand how the underlying resources (for example, connections, file handles, and so on) are used. For example, when dealing with a relational database, we know that one connection can only be used to perform one query at a time. A good rule of thumb is to create a thread pool with as many threads as the number of connections that you want to open with your database server. If there are fewer connections than threads, some threads will wait for a connection and remain unused. If there are more connections than threads, the opposite occurs and some connections remain unused.
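A minimal sketch of this rule of thumb, assuming a hypothetical maxDbConnections value read from your configuration:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// One thread per connection: no thread waits for a connection,
// and no connection sits idle for lack of a thread
val maxDbConnections = 20
val dbContext = ExecutionContext.fromExecutorService(
  Executors.newFixedThreadPool(maxDbConnections))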

A good strategy is to rely on the type system and the compiler to ensure that you are not mixing up different ExecutionContext instances. Unless the type is differentiated, you may accidentally use a CPU-bound context when performing blocking operations. You can create your own DatabaseOperationsExecutionContext type wrapping an ExecutionContext, and accept this type when creating your database access module. Another idea is to use tagged types that are provided by Scalaz. Refer to Chapter 3, Unleashing Scala Performance, for a refresher on tagged types. Consider the following example:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scalaz.{@@, Tag}

object DatabaseAccess {

  sealed trait BlockingExecutionContextTag

  type BlockingExecutionContext =
    ExecutionContext @@ BlockingExecutionContextTag

  object BlockingExecutionContext {
    def fromContext(ec: ExecutionContext): BlockingExecutionContext =
      Tag[ExecutionContext, BlockingExecutionContextTag](ec)

    def withSize(size: Int): BlockingExecutionContext =
      fromContext(ExecutionContext.fromExecutor(
        Executors.newFixedThreadPool(size)))
  }
}

class DatabaseAccess(ec: DatabaseAccess.BlockingExecutionContext) {
  // Implementation elided
}

Using a tagged type for our ExecutionContext gives us additional safety. It is easy to make a mistake in the main method while wiring up your application and inadvertently use the wrong ExecutionContext when creating your modules; with a tagged type, the compiler rejects such a mistake.
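As a brief illustration, here is a hypothetical wiring sketch building on the DatabaseAccess module defined above:

val dbContext: DatabaseAccess.BlockingExecutionContext =
  DatabaseAccess.BlockingExecutionContext.withSize(20)
val databaseAccess = new DatabaseAccess(dbContext)

// This would not compile, because the global context is not tagged
// as a BlockingExecutionContext:
// val wrong = new DatabaseAccess(ExecutionContext.Implicits.global)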

Using the blocking construct

The standard library provides a blocking construct that can be used to signal blocking operations executed inside a Future. We can modify our previous example to leverage blocking instead of a dedicated ExecutionContext:

import scala.concurrent.ExecutionContext.Implicits.global

def findBuyOrders(
  client: ClientId,
  ticker: Ticker): Future[List[Order]] = Future {
  scala.concurrent.blocking {
    Thread.sleep(100)
    Order.staticList.filter(o => o.clientId == client && o.ticker == ticker)
  }
}

Note that in the preceding implementation, we use the default ExecutionContext to execute the Future. The blocking construct is used to notify the ExecutionContext that a computation is blocking. This allows the ExecutionContext to adapt its execution strategy. For example, the default global ExecutionContext will temporarily increase the number of threads in the pool when it performs a computation wrapped with blocking. A dedicated thread is created in the pool to execute the blocking computation, making sure that the rest of the pool remains available for CPU-bound computations.

You should use blocking cautiously. The blocking construct merely notifies the ExecutionContext that the wrapped operation is blocking. It is the responsibility of the ExecutionContext to implement a specific behavior or to ignore the notification. In the standard library, the only implementation that actually takes it into account and implements a special behavior is the default global ExecutionContext.
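The following illustrative sketch, using nothing beyond the standard library, shows the effect of the notification on the global context:

import java.util.concurrent.CountDownLatch
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

// Wrapped in blocking, all 50 sleeps run concurrently because the global
// pool spawns extra threads, and the latch opens after roughly one second.
// Without blocking, it would take about 50 / numberOfCores seconds.
val latch = new CountDownLatch(50)
for (_ <- 1 to 50) Future {
  blocking {
    Thread.sleep(1000)
    latch.countDown()
  }
}
latch.await()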

Translating callbacks with Promise

While Future is the main construct of the scala.concurrent API, another useful abstraction is Promise. Promise is another way to create and complete a Future. A Future is a read-only container for a result that will eventually be computed; a Promise is a writable handle that allows you to explicitly set the value contained in a Future. A Promise is always associated with exactly one Future, and this Future is specific to the Promise. It is possible to complete the Future of a Promise with a successful result or with an exception (which fails the Future).

Let's look at a short example to understand how Promise works:

scala> val p = Promise[Int]  // this promise will provide an Int 
p: scala.concurrent.Promise[Int] = scala.concurrent.impl.Promise$DefaultPromise@d343a81 
 
scala> p.future.value 
res3: Option[scala.util.Try[Int]] = None 
// The future associated to this Promise is not yet completed 
 
scala> p.success(42) 
res4: p.type = scala.concurrent.impl.Promise$DefaultPromise@d343a81 
 
scala> p.future.value 
res5: Option[scala.util.Try[Int]] = Some(Success(42))  

A Promise can only be used once to complete its associated Future, either with a success or a failure. Attempting to complete an already realized Promise throws an exception, unless you use trySuccess, tryFailure, or tryComplete. These three methods attempt to complete the Future linked to the Promise, returning true if this call completed the Future or false if it was already completed.
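An illustrative REPL session, continuing the previous one, shows this behavior:

scala> val p = Promise[Int]
p: scala.concurrent.Promise[Int] = scala.concurrent.impl.Promise$DefaultPromise@5b3c4d2

scala> p.trySuccess(42)
res6: Boolean = true

scala> p.trySuccess(0)
res7: Boolean = false
// The second attempt leaves the original result untouched

scala> p.future.value
res8: Option[scala.util.Try[Int]] = Some(Success(42))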

At this point, you may be wondering in what circumstances you would really take advantage of Promise. In the previous example especially, wouldn't it be simpler to return the internal Future instead of relying on a Promise? Keep in mind that the preceding snippet is meant to demonstrate a simple workflow illustrating the Promise API. Still, it is a fair question. In practice, we see two common use cases for Promise.

From callbacks to a Future-based API

The first use case is to turn a callback-based API into a Future-based API. Imagine having to integrate with a third-party product, such as the proprietary database that MVT obtained recently by purchasing usage licenses. This is a great product that is used to store historical quotes per timestamp and ticker. It comes with a library to be used by a client application. Unfortunately, this library, while fully asynchronous and nonblocking, is callback-oriented, as follows:

object DatabaseClient { 
  def findQuote(instant: Instant, ticker: Ticker,  
    f: (Quote) => Unit): Unit = ??? 
 
  def findAllQuotes(from: Instant, to: Instant, ticker: Ticker,
    f: (List[Quote]) => Unit, h: Exception => Unit): Unit = ??? 
} 

There is no doubt that the client works fine; after all, MVT paid a lot of money for it! However, it will not be easy to integrate it with your own application. Your program relies heavily on Future. This is where Promise can help us, as follows:

object DatabaseAdapter {

  def findQuote(instant: Instant, ticker: Ticker): Future[Quote] = {
    val result = Promise[Quote]

    DatabaseClient.findQuote(instant, ticker, { q: Quote =>
      result.success(q)
    })

    result.future
  }

  def findAllQuotes(
    from: Instant,
    to: Instant,
    ticker: Ticker): Future[List[Quote]] = {
    val result = Promise[List[Quote]]

    DatabaseClient.findAllQuotes(from, to, ticker, {
      quotes: List[Quote] => result.success(quotes)
    }, {
      ex: Exception => result.failure(ex)
    })

    result.future
  }
}

Thanks to the Promise abstraction, we are able to return a Future. We simply call success and failure in the respective callbacks passed to the proprietary client. This use case often arises in production when you have to integrate with a Java library. Even though Java 8 introduced a significant improvement to the Java concurrency package, many Java libraries still rely on callbacks to implement asynchronous behavior. Using Promise, you can fully leverage the existing Java ecosystem in your program without giving up on Scala's support for concurrent programming.
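The same technique adapts Java 8's CompletableFuture to a Scala Future; fromCompletableFuture is a hypothetical helper written for this sketch:

import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Future, Promise}

def fromCompletableFuture[A](cf: CompletableFuture[A]): Future[A] = {
  val p = Promise[A]
  // whenComplete invokes the consumer with either a value or an error
  cf.whenComplete(new BiConsumer[A, Throwable] {
    def accept(value: A, error: Throwable): Unit =
      if (error != null) p.failure(error) else p.success(value)
  })
  p.future
}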

Combining Future with Promise

Promise can also be used to combine instances of Future. For example, let's add a timeout capability to Future:

import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

def run[A](f: => Future[A], timeout: Duration): Future[A] = {
  val res = Promise[A]

  Future {
    Thread.sleep(timeout.toMillis)
    res.tryFailure(new Exception("Timed out"))
  }

  res.tryCompleteWith(f)

  res.future
}

Our method takes a by-name Future (that is, a Future that has not started its execution yet) as well as the timeout value to apply. In the method, we use a Promise as a container for the result. We start an internal Future that blocks for the timeout duration before failing the Promise with an Exception. We also start the main Future by passing it to tryCompleteWith, which registers a callback to complete the Promise with the result of the computation. The first of the two Futures to terminate effectively completes the Promise with its result. Because f is a by-name parameter, we reference it only once; referencing it twice would start two separate computations. Note also that we use tryFailure and tryCompleteWith. It is likely that both Futures will eventually terminate and try to complete the Promise. We are only interested in the result of the first one that completes, but we also want to avoid throwing an Exception when attempting to complete an already realized Promise.

Note

The preceding example is a naive implementation of a timeout. It is mostly a prototype used to demonstrate how Promise can be leveraged to enrich Future and implement complex behavior. A more realistic implementation would probably involve a ScheduledExecutorService, which allows you to schedule the execution of a computation after a certain delay. It lets us schedule the call to tryFailure without blocking a thread in a call to Thread.sleep. We chose to keep this example simple and not introduce a new type, but we encourage you to explore an implementation based on ScheduledExecutorService.
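A possible sketch of that variant, assuming a shared, application-wide scheduler; runWithScheduler is a hypothetical name:

import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.Duration

// A single timer thread fires all timeouts; it never sleeps through them
val scheduler = Executors.newSingleThreadScheduledExecutor()

def runWithScheduler[A](f: => Future[A], timeout: Duration): Future[A] = {
  val res = Promise[A]
  scheduler.schedule(new Runnable {
    def run(): Unit = res.tryFailure(new Exception("Timed out"))
  }, timeout.toMillis, TimeUnit.MILLISECONDS)
  res.tryCompleteWith(f)
  res.future
}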

In practice, you may occasionally have to write your own custom combinators for Future. Promise is a useful abstraction in your toolbox if you need to do this. However, Future and its companion object already provide a number of built-in combinators and methods that you should try to leverage as much as possible.
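For instance, Future.firstCompletedOf from the companion object expresses the same race as our hand-written run method; runBuiltIn is a hypothetical name for this sketch:

import scala.concurrent.{Future, blocking}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext.Implicits.global

def runBuiltIn[A](f: => Future[A], timeout: FiniteDuration): Future[A] = {
  val timer = Future {
    blocking(Thread.sleep(timeout.toMillis)) // naive timer, as before
    throw new Exception("Timed out")
  }
  // Completes with whichever Future finishes first
  Future.firstCompletedOf(List(f, timer))
}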
