Chapter 11
Concurrency

There is a good chance that you've heard of the old Moore's law, which is popularly summarized as processing power doubling every two years. It was handy for quite some time: you could develop your application, and if its performance was poor, you could just wait a few months until everyone had a better computer to handle your application's appetite. At that time concurrency was mostly used for multitasking, as in a web browser: when you click on a download link, the download should proceed in parallel, without blocking the main window.

Sadly, the “free lunch” is over, and today, to increase the fluidity of your software, it is necessary to know how to use more than one CPU core at the same time. This task is not easy, and concurrency is considered one of the most difficult problems in a developer's craft today.

But wait, there is more! The long reign of mainframes is nearly over, and we now deal with many small servers instead of a single large one. Distributed systems are no longer a purely academic subject. Today they are used to solve complex problems by many of the world's top 500 technology companies. All of this is due to the fact that building one great computer (“vertical scaling”) is much, much more expensive than connecting a cluster of cheap ones (“horizontal scaling”). That's why developers who understand distributed (or, at least, multi-core) programming are highly valued today.

Java was created in 1995, when there were no widespread multi-core or distributed applications. Even if it is true that Java has evolved over the years and nearly everything is available as a linkable library, Scala ships with some of these tools directly out of the box. Things like the monadic Future, parallel collections, Akka, and Spark were specifically made to be as readable and expressive as possible when used from Scala.

So why bother learning concurrency? As an example, consider that you need to query a remote database. Why not just wait for the reply from the server and then proceed with the computations? Take a look at the schema in Figure 11.1.

Illustration depicting two processes concurrently fighting for CPU’s time.

Figure 11.1

Here you have two processes concurrently fighting for the CPU's time. That's normal at a low level: the scheduler handles it by giving each process some time according to its priority. Now let's see what happens when a process makes a call to a remote MongoDB database (Figure 11.2).

Illustration depicting a process that makes a call to a remote MongoDB database.

Figure 11.2

The second process took some of its time to make a request, but after that it did nothing but wait for the reply. If the MongoDB instance is located on a separate server, this can mean blocking the thread for quite some time, as the request latency is an order of magnitude larger than the time needed to prepare the request.

Now here is what it looks like when using a non-blocking, reactive MongoDB driver (Figure 11.3).

Schematic for a non-blocking, reactive MongoDB driver.

Figure 11.3

Quite an improvement, don't you think? You are using all of the power of the processor without blocking it. Now imagine for a second that you have thousands of threads doing asynchronous requests on many cores. This is where reactive (or asynchronous) programming may separate a system with non-viable performance on an expensive server from one that handles nearly any number of requests on a few cheap machines.

In this chapter we discuss the main ways to handle concurrency in Scala. We start with a small reminder of how this problem was solved originally, and how the concurrency model looks on a low level. Then we gradually proceed to more advanced techniques available in which the concurrency is handled behind powerful abstractions.

Each of the topics in this chapter could take a whole book to describe in detail, so instead we will analyze where each described tool should be used and where it should be avoided at all costs. With this in mind, to get the best results from this chapter, don't be afraid to experiment and launch the examples yourself, since that's one of the best ways to feel the unpredictability and the power of concurrent programming.

SYNCHRONIZE/ATOMIC VARIABLES

Java 1.5 introduced the concurrency model that became the building blocks of today's concurrent abstractions in JVM languages. Even if it is rarely used directly in Scala (as there are better alternatives), you should still understand how things work under the hood. As you may already know, every time you need to execute a concurrent computation, you need to create a thread:

  class CustomThread extends Thread {
    override def run(): Unit = {
      println("Custom thread is running.")
    }
  }
  val thread = new CustomThread
  thread.start() // executes run() on a separate system thread
  thread.join()  // blocks the main thread until CustomThread finishes
  println("Custom thread has joined.")

In this code you create a custom class that extends Thread. The start() function effectively executes the run() method of the CustomThread class on a different system thread. It is the OS's job to find an available CPU core to run it on. The join() method, on the other hand, tells the main thread (the one on which the program is executed) to wait for the end of the thread's execution. As a consequence, the string “Custom thread has joined” will always be shown after “Custom thread is running.” It's worth noting that the main thread is not busy-waiting during join(), so no additional CPU cycles are consumed.

Now, let's say you have a variable that two threads should modify at the same time:

  var a = 0
  class CustomThread extends Thread {
    override def run(): Unit = {
      a += 1
    }
  }
  val thread1 = new CustomThread
  val thread2 = new CustomThread
  thread1.start()
  thread2.start()

  thread1.join()
  thread2.join()
  println(a)

After executing this code several times, you may notice that the last line does not always show “2”; sometimes it's “1” instead. What is happening is that two threads are modifying the variable at the same time: thread1 reads the value “0” and, before it writes the modified value back, thread2 also reads “0”. So both threads see “0”, both add one to it, and both save “1” as the final value. This phenomenon is called a “race condition”: the result of a concurrent program depends on the order of execution of its statements.

Luckily there is a way to handle this situation, and it's called synchronization. Let's explain it with an analogy.

A while back there was a version control system called RCS. It worked as follows: when you wanted to modify a file, you had to lock it, so that nobody but you could modify it until you were done. What happened when you wanted to modify a file that was locked by another person? Well, you had to wait, probably doing nothing in the meantime. Here is how the same idea looks applied to the code with an incrementing variable:

  var a = 0
  val obj = new Object
  class CustomThread extends Thread {
    override def run(): Unit = {
      obj.synchronized {
        a += 1
      }
    }
  }
  val thread1 = new CustomThread
  val thread2 = new CustomThread
  thread1.start()
  thread2.start()

  thread1.join()
  thread2.join()
  println(a)

In this case the last line will always print “2”, no matter how many times you execute the program. It is not possible to call synchronized on a itself, as it is a plain integer and not an object reference, which is why we introduced the new obj value to serve as a lock for the modifications of a.

The problem with this model is that you could easily stumble upon a deadlock. Consider this code:

var a = 0
val obj1 = new Object
val obj2 = new Object
class CustomThread1 extends Thread {
  override def run(): Unit = {
    obj1.synchronized {
      obj2.synchronized {
        a += 1
      }
    }
  }
}
class CustomThread2 extends Thread {
  override def run(): Unit = {
    obj2.synchronized {
      obj1.synchronized {
        a += 1
      }
    }
  }
}
(1 to 100).foreach(i => {
  println(s"current iteration: $i")
  val thread1 = new CustomThread1
  val thread2 = new CustomThread2
  thread1.start()
  thread2.start()
  thread1.join()
  thread2.join()
})
println(a)

If you execute this code, there is a very high chance that it will just get stuck on some iteration (re-launch it a few times if that's not the case for you), and you will have to halt the execution. What happens is that at some moment thread1 synchronizes on obj1 while, at the same time, thread2 synchronizes on obj2. Then thread1 needs to synchronize on obj2, which is locked, and thread2 needs obj1. Neither can move: we are in a deadlock!
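The classic remedy for this particular deadlock is lock ordering: if every thread acquires the locks in the same global order, no circular wait can form. Here is a minimal sketch of that idea (the fix itself is not shown in the original example):

class SafeThread extends Thread {
  override def run(): Unit = {
    obj1.synchronized {   // every thread class locks obj1 first...
      obj2.synchronized { // ...and obj2 second, so a circular wait is impossible
        a += 1
      }
    }
  }
}

Enforcing such an ordering across a whole codebase is error-prone, though, which is part of the problem with this model.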

This is why the low-level model is better avoided. But especially for cases like the counter, Sun introduced so-called atomic variables that may help avoid these problems with concurrent access. As an example, let's rewrite the “counter” example where each thread tries to increment the variable:

import java.util.concurrent.atomic.AtomicInteger
val a = new AtomicInteger()
class CustomThread extends Thread {
  override def run(): Unit = {
    a.getAndIncrement()
  }
}
val thread1 = new CustomThread
val thread2 = new CustomThread
thread1.start()
thread2.start()

thread1.join()
thread2.join()
println(a)

Now, no matter how many times you execute this code, it will always print “2”. Sure, you may no longer use symbolic methods like + or - when working with AtomicInteger, but that's what the low-level concurrent integer primitive looks like. There are other atomic variables like AtomicLong, AtomicBoolean, and AtomicReference, and they all have their specific use cases, but these are outside of the scope of this chapter.
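All of these classes are built on top of a compare-and-set (CAS) operation. As a small illustration, here is a sketch of compareAndSet() on an AtomicReference (this example is our own and not from the original text):

import java.util.concurrent.atomic.AtomicReference

val ref = new AtomicReference[String]("initial")
// Atomically replaces the value only if it still equals the expected one:
val first = ref.compareAndSet("initial", "updated")  // true: the value was "initial", now "updated"
val second = ref.compareAndSet("initial", "other")   // false: the value is no longer "initial"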

A small word should also be said about “volatile” variables, as they are quite common in Java concurrency programming. Take a look at this application:

class CustomThread extends Thread {
  var flag = true
  override def run(): Unit = {
    while(flag) { }
    println("Thread terminated")
  }
}
val thread = new CustomThread
thread.start()
Thread.sleep(2000)
thread.flag = false
println("App terminated")

Here you have a main application thread that launches another, custom thread. In the run() method of the custom thread you may notice that it loops infinitely until someone tells it to stop. That is called “busy-waiting,” and it also hogs a CPU core, so never use a while loop in a thread to wait for something!

The problem in this application is that when you execute thread.flag = false, the thread may never see the change and will continue to busy-wait forever. It will work as expected if, instead, you mark the variable with the @volatile annotation, like the following:

@volatile var flag = true

In this case, the custom thread will see the change of the flag variable and will terminate its execution.
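For completeness, here is the whole corrected example, assembled from the code above:

class CustomThread extends Thread {
  @volatile var flag = true // writes by other threads are now visible here
  override def run(): Unit = {
    while (flag) { }
    println("Thread terminated")
  }
}
val thread = new CustomThread
thread.start()
Thread.sleep(2000)
thread.flag = false // the busy-waiting thread observes this and exits the loop
println("App terminated")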

To conclude, in this section you saw why low-level concurrent programming is considered hard and dangerous. In the following sections of the chapter you will see how this problem can be solved with the powerful abstractions that Scala brings to the table.

FUTURE COMPOSITION

Nearly all modern languages have some means of making an asynchronous call. For example, promises exist in JavaScript, goroutines exist in Go, and fibers exist in Ruby. Scala has its own asynchronous structure called Future that is included directly in the standard library. This way, libraries that need it don't have to pull in external implementations, as is common in Java.

Future is a container for asynchronous computations. It's like a magic box that promises you an object, but you must wait for some time until you open it. Consider the following code:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
Future {
    print("World ")
}
print("Hello ")

Here you import the Future class that will hold your computation, and you also import the global execution context that will be discussed in a few moments. If you execute this, you will most likely see “Hello World” printed. The order is not guaranteed, and you could also see “World Hello,” but an asynchronous computation needs a separate thread from a thread pool as well as some CPU time to be scheduled. Since this setup takes some time, “Hello” usually gets printed first. Let's spice it up a little bit:

for(i <- 1 to 30){
    Future{
        print("World ")
    }
    print("Hello ")
}

Now you can't predict anything about the resulting string, except, maybe, that the first word will be “Hello.” That word continues to be printed on every loop iteration, but each “World” is printed whenever its Future happens to execute, which isn't predictable. For an even more unpredictable output, wrap the second print() in a Future as well, but be careful: your application may terminate before all of the Futures are finished, leaving some of them uncompleted.
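Here is a minimal sketch of that fully asynchronous variant (it reuses the imports from the snippet above; the Thread.sleep() at the end is our own crude addition to keep the application alive long enough for the Futures to finish):

for (i <- 1 to 30) {
  Future { print("World ") }
  Future { print("Hello ") } // both prints now race each other
}
Thread.sleep(1000) // a crude wait so the app doesn't exit with Futures still pending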

Now let's talk about why we imported the global ExecutionContext. Execution contexts manage a thread pool in your application, whose size typically depends on the number of cores your CPU has. You may create your own ExecutionContext to handle threads in another manner, or to manage just your database threads, but for these examples the default global one will be just fine. Where is it used? Take a look at the definition of the Future's factory method:

object Future {
  def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]
}

It needs an implicit executor somewhere in scope, and import scala.concurrent.ExecutionContext.Implicits.global provides exactly that. Note that this context holds a pool with a finite number of threads, so if there are, for example, 8 threads available and you create 8 Futures that block all of them, subsequent Futures will not be executed. This is called “thread starvation.” To avoid it, when you know that you will be creating many blocking Futures, wrap their contents in a blocking statement (available via import scala.concurrent.blocking):

for (i <- 1 to 30) {
  Future{
    blocking {
      Thread.sleep(10000)
      println("Done")
    }
  }
}
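As mentioned earlier, you can also create your own ExecutionContext, for example a dedicated pool for blocking database calls. A minimal sketch, assuming a fixed pool of 4 threads (the pool size and the names here are our own illustration):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// A dedicated pool, isolated from the global one:
val dbExecutionContext: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

Future {
  // a blocking database call would go here
}(dbExecutionContext) // passed explicitly instead of the implicit global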

Now let's discuss how to work with a method that returns a Future. Imagine you have a DAO layer in your application:

object Dao {
  def findUserById(id: Int): Future[User] = Future {
    User("Alex", 26)
  }
}

We defined a DAO with a single method returning a user by id. For the sake of simplicity, it returns the same user every time. Normally, this method should have the type Future[Option[User]], since a user with the given id may not exist, but, again, to avoid complications let's imagine that it always returns a user. (For the examples to compile, assume a simple case class User(name: String, age: Int).)

OK, now we have a task to show a user's name for a specific id. Here's how to do it:

import scala.util.{Success, Failure}
Dao.findUserById(1).onComplete {
  case Success(user) => println(s"User's name is ${user.name}")
  case Failure(ex) => println("An error has occurred!")
}

A callback is attached to the Future, and when it completes, we immediately execute either the “success” case or the “failure” one (the latter happens if something throws an exception inside the Future). As you may notice, the onComplete() method returns Unit, which means that the result of the computation in the Success case stays there and nobody else can use it. But what if you need to create a “service layer” with a method having the following signature:

def retrieveUserNameById(id: Int): Future[String]

In this case you won't do the fetch in the database yourself, because that's the Dao's job. But it means that you need a way to transform a Future[User] into a Future[String]. You already know what a monad is from the previous chapter on advanced types, and Future has all of those useful methods like foreach(), map(), fold(), and others. The transformation from Future[User] into Future[String] can be handled by the map() method:

object UserService {
  def retrieveUserNameById(id: Int): Future[String] =
    Dao.findUserById(id).map { user =>
      user.name
    }
}

Just as with collections, the map() method transforms the element inside the Future container and returns the wrapper with the new content. Now the top-level function is able to use the service layer without knowing how the DAO is implemented:

def sayHello(): Unit = UserService.retrieveUserNameById(1).onComplete {
  case Success(name) => println(s"Hello $name !")
  case Failure(ex) => println("An error has occurred!")
}

Or its more functional version:

def sayHello(): Unit = UserService.retrieveUserNameById(1).foreach { name =>
  println(s"Hello $name !")
}

The filter() method returns a Future with the same element if it satisfies the predicate. If the element does not satisfy the predicate, the resulting Future is a failed one:

Dao.findUserById(1).filter(_.age > 18) // Successful, with the user
Dao.findUserById(1).filter(_.name == "Sam") // Failed Future

You may be sometimes tempted to use Await.result to wait for the Future's result like this:

val userName = Await.result(UserService.retrieveUserNameById(1), 1 second)

Don't do it: this effectively blocks the current thread until the computation finishes, losing all of the benefits of the Future! (Note that this snippet also needs import scala.concurrent.Await and import scala.concurrent.duration._ for the 1 second literal.) The only place where it may be appropriate is in your unit tests, where you need to make sure the execution order is predictable.

And last but not least, consider using the library called “scala-async,” because it may make your code more readable. So instead of:

def calculate: Future[Int] = {
  val future1 = Future(1 + 2)
  val future2 = Future(3 + 4)
  for {
    result1 <- future1
    result2 <- future2
  } yield result1 + result2
}

You may write an arguably more readable version:

import scala.async.Async.{async, await}

def calculate: Future[Int] = async {
  val future1 = Future(1 + 2)
  val future2 = Future(3 + 4)

  await(future1) + await(future2)
}

But don't forget: these are just macros that are transformed into a Future-based version at compilation. In addition, scala-async has some limitations: for example, you may not call await inside a closure nested within an async{} block.

This all just barely scratches the surface of the work with the Future structure. You should now have a sufficient knowledge of how to work with functions that compute an asynchronous result, or how to initialize an asynchronous computation yourself.

PARALLEL COLLECTIONS

So far you've learned about ways to launch an asynchronous task. Now let's talk about data structures that have parallelism built into their bones. Java 8 introduced parallel streams, but parallel collections existed in Scala even before that (since version 2.9). Parallel collections may be a great way to improve the performance of your application, but they may also make it worse. You may think, “Well, our CPU is multi-core, so let's make all of the collection operations parallel! What can go wrong?” Now consider the following code:

(1 to 10000000).filter(_ % 2 == 0) 

Would it improve the performance if you transform it into parallel computation?

(1 to 10000000).par.filter(_ % 2 == 0)

The small .par there does exactly that: it transforms a collection into its “parallel” counterpart. It's easy to see an improvement if the operation takes several dozen minutes (although in that case parallel or normal collections may not be the best tool, and you may want to consider learning about Spark). But here the execution takes somewhere near 250 milliseconds. That's too fast for a human to judge, so we will need to measure it with micro benchmarks.

Usually, to measure how fast an expression executes, you may consider using System.nanoTime:

val start = System.nanoTime
val result = (1 to 10000000).filter(_ % 2 == 0)
println(((System.nanoTime - start) / 1000000) + " milliseconds")

Let's see the results. Ah, a whole 4 seconds and 293 milliseconds! Quite a long time for such a small application, so there must be an error somewhere. Let's execute the microbenchmark again: 2 seconds 759 milliseconds! That's strange: the code is the same, but the execution time differs by 30% between two runs. Let's try the version with .par; maybe it will be considerably faster:

val start = System.nanoTime
val result = (1 to 10000000).par.filter(_ % 2 == 0)
println(((System.nanoTime - start) / 1000000) + " milliseconds")

And now it's 6 seconds and 438 milliseconds. At this stage you may say that the parallel collection is clearly slower and thus, useless in this situation. But instead, let's explain why there is such a difference between the results for three separate runs.

Scala code is not compiled directly into machine code, as is done for C. Instead, it is compiled into an intermediate form called “bytecode.” That bytecode is then executed by a JVM, which is implemented for each platform, effectively making Scala (and Java) code cross-platform. You should know that the JVM is intelligent and notices when you execute the same code repeatedly; in that case, it compiles and optimizes it so that subsequent executions don't take as long. This is called “Just In Time” compilation, or simply JIT. To take this optimization into account, use “warmup” cycles:

var start = 0L

for (i <- 1 to 10){
  start = System.nanoTime
  (1 to 10000000).par.filter(_ % 2 == 0)
  println(((System.nanoTime - start) / 1000000) + " milliseconds")
}

And the execution will output:

$ scala chap11.scala
1891 milliseconds
628 milliseconds
1262 milliseconds
393 milliseconds
348 milliseconds
330 milliseconds
361 milliseconds
324 milliseconds
332 milliseconds
347 milliseconds

You may clearly see that the first execution is not representative at all. It also takes some time for the code to get warmed up.

There are numerous other things to keep in mind when doing micro benchmarks: run-specific JVM optimizations that may not be representative at all, the garbage collector kicking in during execution and affecting the results, shared-state problems, and so on. But it is possible to avoid some of these problems by using a specialized library.

At the time of writing, JDK 9 is not officially released yet, but under the OpenJDK umbrella there is already a micro benchmark tool called JMH (Java Microbenchmark Harness). You may find a description with quite a few examples for Java here: http://openjdk.java.net/projects/code-tools/jmh/. But we obviously want to use it with Scala, and there is a handy SBT plugin for that called sbt-jmh: https://github.com/ktoso/sbt-jmh. Consider learning the basics of this tool so that you won't do optimizations without some real performance numbers.

With this in mind, let's see if the parallel version of the filter() method is better (Figure 11.4). Both versions were executed in JMH with 20 warmup iterations, 20 measurement iterations, and 20 JVM forks (so that no run-specific JVM optimization sneaks in). Here are my results on an i7-4700HQ quad-core processor with JVM version 1.8.0_66.

Illustration depicting the parallel version of the filter() method.

Figure 11.4

So now you know that the parallel version is faster, but not by much, and it's up to you to decide whether the optimization is worthwhile. It is faster because the filter() operation is parallelizable: the collection is split into several chunks, each chunk is filtered separately, and the results are then concatenated. Now, imagine you have a list of elements (integers in our case) and you need to apply another parallelizable operation to it (finding the maximum):

import scala.util.Random

val list = Random.shuffle(Vector.tabulate(5000000)(i => i)).toList
val max = list.max

This produces a list with the elements from 0 to 4999999 in a random order and finds the maximum value. You might think that this is great, so why not throw a .par into it? This way you can use the power of the multi-core CPU to suit your needs:

val max = list.par.max

With happy thoughts about gained optimizations, let's launch the same micro benchmark used for the filter() operation (Figure 11.5).

Illustration depicting micro benchmark used for the filter() operation.

Figure 11.5

Now that is very strange: the parallel operation is nearly six times slower than the iterative one! Here is why: to execute methods in parallel, the collection, as mentioned before, needs to be split into several chunks. If the collection cannot be split in constant time, O(1) (as is the case with a List), it is first converted into a collection that can be (Array, Vector, Range, etc.). The important thing is that this conversion is not parallelized, so it takes quite some time to convert a list of five million elements. Let's see what performance you get with a Vector type:

val vector = Random.shuffle(Vector.tabulate(5000000)(i => i))
val max = vector.max
val maxPar = vector.par.max

The results are shown in Figure 11.6.

Illustration depicting the outcome of max () performance with Vector.

Figure 11.6

Notice that the parallel version is almost two times faster than the iterative one, but the version with the non-parallelized list is even two times faster than that. Always micro benchmark your code before doing .par optimizations!

Let's talk about another pitfall. As you know, parallel operations are executed more or less at the same time, so consider the following code:

var a = 0

(1 to 100).par.foreach(_ => a += 1)

println(a)

Can you predict what number this code will show on a multi-core CPU? It may be “100,” but that is highly improbable; you could bet that the number will be anything but “100” and be fairly sure to win. This example is almost the same as the one from the beginning of the chapter, when we talked about threads: two or more threads read the same value of a (for example, 41), each increments it by 1 (to 42), and each stores 42 back into a. In the end, several threads applied an increment, but the counter only advanced by one. That's why it is very important to keep the functions you pass to parallel methods side-effect free (such functions are described in detail in Chapter 2).
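A side-effect-free way to get the same count is to let the collection combine the results itself. A minimal sketch (our own, not from the original text):

// Each element contributes a 1; the parallel framework combines them safely:
val count = (1 to 100).par.map(_ => 1).sum
println(count) // deterministically 100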

As a rule of thumb, don't blindly add .par to a collection operation in hopes that it will improve performance. Instead, create a micro benchmark with a tool like JMH to measure the potential gains. Also, be aware of the problem with side effects when working with parallel collections.

REACTIVE STREAMS

In a chapter about concurrency it would be a shame not to talk about reactive programming. What is it? Imagine a simple operation:

var a = 1
var b = a + 1

In normal programming languages, b will be evaluated to 2, but what if we change the code a little bit by adding another operation:

var a = 1
var b = a + 1
a = 2

In non-reactive programming, the value of b is still 2: no matter how you change a afterwards, b remains the same, because b does not depend on a. But in reactive programming, the value of b does depend on the value of a and reacts to each one of its changes. It's more or less like an Excel spreadsheet, where changing the value in one cell may update everything that depends on it.

One of the most mature libraries for reactive programming is RxJava. It is maintained by Netflix and often considered the de facto standard implementation of reactive extensions. One of the alternatives is Akka Streams, but it is still in an experimental state at the time of writing. We will use RxJava's Scala adaptation, RxScala (https://github.com/ReactiveX/RxScala). To include the library in your project, add this line to the SBT file:

libraryDependencies += "io.reactivex" %% "rxscala" % "0.25.1"

Here is an example of a trivial application:

println("Start")
val observable = Observable.from(1 to 100)
observable.subscribe(println(_))
println("End")

Here we witness a simple implementation of the “Publisher/Subscriber” pattern. The code simply prints the numbers from 1 to 100. In this case the “publisher” is the Observable and the “subscriber” is the println() function. The interesting thing is that this output is printed between the words “Start” and “End,” which is not as asynchronous as we would expect. This is because an Observable created with from() is synchronous by default. To make it execute on another thread, you should provide a scheduler:

println("Start")
val observable = Observable.from(1 to 100).observeOn(IOScheduler())
observable.subscribe(println(_))
println("End")

And now the “End” word should be printed long before the number “100.”
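To connect this back to the a and b example from the beginning of this section: with RxScala you can model a as a stream of values and derive b from it, so that b reacts to every change. A minimal sketch, assuming RxScala's BehaviorSubject (the variable names mirror our earlier example):

import rx.lang.scala.subjects.BehaviorSubject

val a = BehaviorSubject(1)            // "a" starts at 1
val b = a.map(_ + 1)                  // "b" is defined as a + 1, reactively
b.subscribe(v => println(s"b = $v"))  // prints "b = 2" for the current value
a.onNext(2)                           // changing "a" prints "b = 3"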

By now you may say that observables look like a way to process a collection of elements asynchronously. But is it better? As we learned in the previous part about parallel collections, we should never trust our feelings when dealing with parallel computations; instead we should be wise and create a micro benchmark that tests our assumptions. As test subjects we will take the filter() and sum() operations:

val observable = Observable.from(1 to 1000000)
observable.filter(_ % 2 == 0).sum.toBlocking.first

Here we transform a Range from one to a million into an Observable, then we filter the even numbers from it, and finally we calculate their sum. toBlocking and first are needed here to get the result of the computation without creating a separate subscription on the Observable.

In the other corner you have a standard List operation that you have already seen countless times:

val list = (1 to 1000000).toList
list.filter(_ % 2 == 0).sum

It is the same operation as with the Observable, but in a simpler, more readable language. We will now benchmark these two blocks of code with JMH, using 20 warmup iterations, 20 measurement iterations, and 20 JVM forks to get results that are as representative as possible. Here they are, as shown in Figure 11.7.

Schematic for an operation with filter + sum performance.

Figure 11.7

What a result! Observable is nearly 2.5 times slower than a List! So, with performance like that, why should anyone consider using RxScala? In fact, there are a few things that are possible with observables but impossible with collections or streams. For instance, with Observable you may do the following:

import scala.concurrent.duration._
println(s"(Thread: ${Thread.currentThread().getId}) Start")
val observable = Observable.interval(1 second).observeOn(IOScheduler()).take(5)
observable.subscribe(x => println(s"(Thread: ${Thread.currentThread().getId}) $x"))

Thread.sleep(6000)
println(s"(Thread: ${Thread.currentThread().getId}) End")

In this code the created Observable emits a number every second so that a subscriber can print it. Here is what the output looks like (the thread ids may be different for you):

(Thread: 70) Start
(Thread: 74) 0
(Thread: 74) 1
(Thread: 74) 2
(Thread: 74) 3
(Thread: 74) 4
(Thread: 70) End

As you can see, the subscription operations are done on a separate thread. The take(5) is necessary because otherwise the subscriber would keep printing numbers even after the main thread has terminated. As for the intervals, you are not limited to seconds; they may be anything from nanoseconds to hours. With RxScala it becomes easy to manipulate events that are timed in a certain manner.

The other area where RxScala shines is merging streams. Let's say you have several observables coming from different sources, and you need to merge them into one, so that you can create a single observer instead of a separate observer for each of the Observables:

import scala.concurrent.duration._
println(s"(Thread: ${Thread.currentThread().getId}) Start")
val observable1 = Observable.interval(1 second).observeOn(IOScheduler()).take(3)
val observable2 = Observable.interval(700 millis).observeOn(IOScheduler()).take(4)
val observable3 = Observable.interval(300 millis).observeOn(IOScheduler()).take(6)
val mainObservable = Observable.from(Array(observable1, observable2,
  observable3)).flatten
mainObservable.subscribe(x => println(s"(Thread: ${Thread.currentThread()
  .getId}) $x"))
Thread.sleep(4000)
println(s"(Thread: ${Thread.currentThread().getId}) End")

And here is its output:

(Thread: 100) Start
(Thread: 108) 0
(Thread: 108) 1
(Thread: 106) 0
(Thread: 108) 2
(Thread: 104) 0
(Thread: 108) 3
(Thread: 106) 1
(Thread: 108) 4
(Thread: 108) 5
(Thread: 104) 1
(Thread: 106) 2
(Thread: 106) 3
(Thread: 104) 2
(Thread: 100) End

This is not readable at all! But there is a way to make it more explicit: use a “marble diagram” to express what is happening (Figure 11.8).

Schematic representation of marble diagram.

Figure 11.8

The small marbles from the observables are placed on the “time arrow” according to the moment when they were observed. After the merging operation (Observable.from(...).flatten), you are subscribed only to the mainObservable value that is the combination of the three separate Observables. It's like combining three lists into one, but with the elements placed according to their emission time. You may familiarize yourself with this kind of diagram by visiting this site: http://rxmarbles.com/. Nearly all of Observable's methods are described in the form of a marble diagram.

There are quite a few more things to say about reactive programming with RxScala, but we are here to decide whether it is the right tool for your application. If the logic of your application needs timed executions, use this library instead of Thread.sleep(). Also, when you need to merge a few streams into one, or when you can imagine a marble diagram that represents the events in your application, consider trying the RxScala library, because it may make your application's domain logic more natural and readable.

STM

It would be strange not to talk about Software Transactional Memory in a chapter about concurrency. This technique is less known than others, but it is worth mentioning because it may solve a particular problem you may have in your application.

Software Transactional Memory has much in common with database transactions. It is an alternative to synchronization that “commits” a block of code potentially accessed concurrently and rolls back the execution if two threads try to change the variables in the “transaction” at the same time. The history of STM started in 1986, when Tom Knight proposed transactions on the hardware level. In 1995 STM began its life as a software-only technique. Today it is still an area of research, but practical implementations are widely used in frameworks such as Akka.

In this section of the chapter we will talk about a particular implementation called ScalaSTM. This library was created by experts in STM, and it is the one currently used in the Akka framework; there are also plans to introduce it into Scala's standard library. Do you remember the example of AtomicInteger from the beginning of the chapter? All operations on a single instance are atomic and, thus, work well in concurrent environments. But if you introduce another variable, things get complicated. And it doesn't matter if the other variable is also an AtomicInteger: a sequence of operations that uses them both is not itself atomic without proper synchronization. Take a look at this code:

import java.util.concurrent.atomic.AtomicInteger
val a = new AtomicInteger()
val b = new AtomicInteger()
class CustomThread extends Thread {
  override def run(): Unit = {
    for (_ <- 1 to 30) {
      a.getAndIncrement()
      b.addAndGet(a.get())
    }
  }
}
val thread1 = new CustomThread
val thread2 = new CustomThread
thread1.start()
thread2.start()
thread1.join()
thread2.join()
println(a)
println(b)

It will print “60” for a and “1840” for b. Or “1834,” or “1844,” or even “1839.” You can't predict the result, because our computations are not atomic! One way to handle this situation is the synchronized method used earlier in this chapter. But that, as you already saw, can easily lead to a deadlock; besides, programming with synchronized is quite difficult and offers no substantial advantages. A better way to handle the situation is to use ScalaSTM. To include ScalaSTM in your project, add the following line to your dependencies:

"org.scala-stm" %% "scala-stm" % "0.7"

With that done, you now can modify your code so that it works as expected:

import scala.concurrent.stm._
val a = Ref(0)
val b = Ref(0)
class CustomThread extends Thread {
  override def run(): Unit = {
    for (_ <- 1 to 30) {
      atomic{ implicit txn =>
        a() = a() + 1
        b() = b() + a()
      }
    }
  }
}
val thread1 = new CustomThread
val thread2 = new CustomThread
thread1.start()
thread2.start()
thread1.join()
thread2.join()
println(a.single())
println(b.single())

Great! Now, in the end, you always get “60” for a and “1830” for b. So, how does it work? Notice that instead of a plain Integer or an atomic one, the variables are defined as Ref, which stands for “transactional reference.” A Ref variable has to be used inside a transaction (the atomic statement), as its apply() and update() methods require an implicit InTxn value:

override def apply()(implicit txn: InTxn): T = impl.get(handle)
override def update(v: T)(implicit txn: InTxn) { impl.set(handle, v) }

The way ScalaSTM implements atomic transactions is quite complex: it keeps a log of every write and read inside the atomic statement, and once it gets to the end of the block, it “commits” the result. But it is entirely possible that someone else tries to read or write the same values at the same time. In that case there is a “transactional conflict,” and when this happens, both transactions are canceled and re-executed in sequential order (you can't predict in which exact order). Doesn't that sound just like database transactions?

On the last lines, println(a.single()) prints the value of a without any atomic statement around it. This is called a “single-operation transaction,” and it doesn't need to be inside a transaction block.
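The same single view also supports atomic single-operation updates. A small sketch (our own illustration, assuming ScalaSTM's Ref.View API):

val counter = Ref(0)
counter.single.transform(_ + 1) // an atomic increment, no atomic { } block needed
println(counter.single())       // prints 1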

Be aware that the transaction may be rolled back and re-executed, so any side effects inside of the atomic statement will be re-executed as well. Change the contents of the CustomThread's run() method to the following:

override def run(): Unit = {
  for (_ <- 1 to 10) {
    atomic{ implicit txn =>
      println(s"The value of a is ${a()}")
      a() = a() + 1
      b() = b() + a()
    }
  }
}

Here you have a println() statement that is, obviously, a side-effect. In the output of your application you may find lines that are repeated:

The value of a is 2
The value of a is 2
The value of a is 3

This means that both threads accessed the values at the same time, so one transaction was rolled back and the two were executed in sequential order. In this particular case the side effect is not critical, but it may not always be so harmless. If you really need side effects inside atomic statements, you may use the Txn.afterCommit or Txn.afterRollback handlers:

override def run(): Unit = {
  for (_ <- 1 to 10) {
    atomic { implicit txn =>
      Txn.afterRollback { _ => println(s"Rollback!") }
      a() = a() + 1
      b() = b() + a()
      val aValue = a()
      Txn.afterCommit { _ => println(s"The value of a after commit is $aValue") }
    }
  }
}

The output may contain the following lines:

The value of a after commit is 10
The value of a after commit is 11
Rollback!
The value of a after commit is 12
Rollback!
The value of a after commit is 13

Notice that you didn't use a() directly inside the println() because you can't use references outside of a transaction. Instead, you create an intermediate value inside the transaction and use it in the println().

To conclude, you saw how it is possible to create a thread-safe, deadlock-free mutable state shared between threads. This is a great replacement for the synchronized statement, not only because it is safer, but also because it is more readable. The developers of the framework did some benchmarking here: https://nbronson.github.io/scala-stm/benchmark.html, which shows that even if STM is a tiny bit slower than carefully tuned locks, it is much safer to use.

ACTORS (AKKA)

Many Scala developers come to the language because they need to use Akka or Spark in their project and Java is just not good enough for the job. Only a few of us are in this domain because of Scala itself, and not because of its ecosystem. So, let's see why those frameworks are so popular that they make people learn a new language just to use them efficiently.

Akka was first created in 2009, and today it is more popular than the Play framework, the Typesafe company, or even Scala itself (judging by the number of Akka stickers taken at a typical programming conference where Typesafe participates). The framework may be used from Java, but it quickly becomes apparent that Akka was not created for that language and merely provides adapters that make it usable by Java-only developers.

Akka is one of the most popular frameworks for actor programming and, by chance, it's made in Scala. Actor programming is a more natural, human way to approach concurrency problems. Imagine you are a manager and you are given the task of calculating the number Pi to the 1,000th decimal place. You also have 10 people reporting to you who are eager to do some work. One option is to give all the work to your favorite co-worker and wait quite some time until the result is done. Another option is to use an algorithm that calculates Pi as a sum (see Figure 11.9):

A formula expressing Pi as an infinite sum (the Leibniz series): π = 4 × Σ (n = 0 to ∞) of (−1)^n / (2n + 1).

Figure 11.9

As you can see, the fact that Pi can be calculated as a simple sum lets you distribute the work in individual chunks. “Worker #1” will calculate the terms from n=0 to n=19, “worker #2” from n=20 to n=39, and so on. In the end you just get the result from each of the workers, add them up, and multiply the sum by four! You may even employ a worker who is not in your office, whose result is sent back remotely (via TCP/IP or any other protocol).
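As a small illustration of the chunking idea, here is a sketch of ours, assuming the series shown in Figure 11.9 (calculatePiFor is a hypothetical helper, not the book's code):

// One worker's chunk of the series: the sum of (-1)^n / (2n + 1) over its range of n
def calculatePiFor(start: Int, count: Int): Double =
  (start until start + count).map(n => math.pow(-1, n) / (2 * n + 1)).sum

val chunks = (0 until 10).map(worker => calculatePiFor(worker * 20, 20))
println(4 * chunks.sum) // roughly 3.14; more terms give more precision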

How do Akka's actors work? Each actor receives messages in its mailbox and processes them strictly in the order they were received. Not all messages are processed, only the ones that match a pattern known to the receiver. Let's start with an elementary application that uses Akka to solve a problem we had at the beginning of this chapter: a concurrent counter. First you need to include the dependency in your build.sbt:

"com.typesafe.akka" %% "akka-actor" % "2.4.1"

Following that, you will need to add two files, one for the actor:

import akka.actor.{Actor, ActorLogging}

class CountingActor extends Actor with ActorLogging {
  var counter = 0
  def receive = {
    case "+1" =>
      counter += 1
      log.info(s"Current count is $counter")
      if (counter == 3) context.system.terminate()
  }
}

and one for the Application that will manipulate the Actor:

import akka.actor.{ActorSystem, Props}
import scala.concurrent.Await
import scala.concurrent.duration._

object ApplicationMain extends App {
  val system = ActorSystem("MyActorSystem")
  val countingActor = system.actorOf(Props[CountingActor], "pingActor")
  countingActor ! "+1"
  countingActor ! "+1"
  countingActor ! "+1"
  Await.result(system.whenTerminated, 10 seconds)
}

Pretty simple, so let's start with the actor's file. The CountingActor has a mutable field counter and an overridden receive method. The method is of type PartialFunction[Any, Unit], which means you may send any type of message to the actor: a String, an Integer, a case class, or anything else, and it will still receive it and try to react. In this case, the only message it reacts to is the string “+1”; after receiving it, the actor increments the counter by one, logs the result, and checks whether the counter has reached 3, in which case it shuts down the actor system. Notice that the actor won't react to a “+2” message, as it obviously does not understand it.

In the application class we create a new actor system, initialize an instance of CountingActor, and send it a few messages. The actor system terminates either when the actor has incremented the counter three times, or after 10 seconds. Note that countingActor is not actually an instance, but a reference to the actor, because the actor may be located on a remote machine, much as an IP address refers to an online server.

In this example, to keep it simple, we send messages to the actor from the main application (! means “send”; the notation comes from the actor model in the Erlang language). In real life it's mostly actors that communicate with other actors. What's important here is that all communication is asynchronous: an actor doesn't wait for a reply to its message. If the other actor wants to reply, the reply is just a normal message with the reply data. The whole system is designed to be scalable, so you can initialize thousands of actors located on the same machine or distributed over a cluster of servers. It's amazing!
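For instance, here is a sketch of an actor that replies to whoever sent the message (EchoActor is our own illustration, not from the book's example):

import akka.actor.Actor

class EchoActor extends Actor {
  def receive = {
    case msg: String => sender() ! s"echo: $msg" // the reply is an ordinary asynchronous message
  }
}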

When working with actors, there are a few things to keep in mind, so take a look at the following schema (Figure 11.10).

Schema of a cluster of two messages A and B and two actors 1 and 2.

Figure 11.10

In this case, if message “A” was sent before message “B,” it is guaranteed that actor “2” will receive the messages in that exact order. Now take a look at another schema, shown in Figure 11.11.

Schema of a cluster of three messages A, B, and C and three actors 1, 2, and 3.

Figure 11.11

In this case, even if message “A” was sent before message “B,” there is no guarantee that message “A” will arrive before message “C”; it may well be the other way around. The system thus becomes nondeterministic, and you may need to add some code to make it more predictable.

The Akka framework is a great way to hide mutable state inside an isolated entity called an actor. Everything that happens inside an actor is sequential, and information is shared through messages, asynchronously. If your application can be imagined as a group of separate workers, Akka may be a great way to improve performance as well as add the possibility of easy horizontal scaling.

SPARK

It's worth mentioning Spark, a wonderful framework that is very helpful for those working with Big Data. As you may know, terabytes of data can't be processed with a simple, well-known “for loop”: you need either a very powerful mainframe (vertical scaling) or a bunch of low-cost servers connected together (horizontal scaling), as well as some specific tools to work with them. The former approach is often too expensive in the Big Data context, which is why the instruments for horizontal scaling are so popular today.

Spark was created as an alternative to Hadoop's map-reduce. From a high-level standpoint, the framework looks just like a collection of methods that are executed in a distributed environment. Applied properly, projects using this framework are much faster and need less code than projects built with plain Hadoop. For the first example you will need the library to be included in the build.sbt file:

"org.apache.spark" %% "spark-core" % "1.6.0"

and this code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]")
val sc = new SparkContext(conf)
val readme: RDD[String] = sc.textFile("README.md")
val numAs = readme.filter(line => line.contains("a")).count()
val numBs = readme.filter(line => line.contains("b")).count()
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

First, you need to initialize your Spark instance with the provided configuration. “local[4]” means that it will be executed locally with four parallel threads. The SparkContext is now ready, and you need to provide it with something to compute. For this example you read the README file from the root of the project and feed it to some filters. But before that, let's talk a little bit about the RDD[String] type.

“RDD” stands for “resilient distributed dataset,” and you work with it just as with a simple collection of lines from the file. The distribution is transparent to you, just as parallelism is in parallel collections. If you apply an operation like filter() to it, it will still return an RDD. The important thing is that the result of the operation is not computed until you need to return something that isn't an RDD. Such operations are called “actions”; by contrast, the operations that return an RDD are called “transformations.”

As you know, in the case of Scala's collections, even simple “transformation” operations like filter() or map() need a full traversal. If the collection contained a terabyte worth of elements, it would be a disaster to traverse it each time you add a new operation. Instead, Spark postpones all of the transformations until an “action” operation is applied, such as count(), collect(), or reduce().
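A small sketch of this laziness, reusing the readme value from the example above (the filter string is our own arbitrary choice):

val spark = readme.filter(_.contains("Spark")) // a transformation: returns instantly, nothing is read yet
val n = spark.count()                          // an action: only now is the file read and filtered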

By default, each time you apply an “action” operation, the RDD is recomputed (in this case, re-read from the file). We use the value two times here, so we need some way to speed it up. One way is to cache the RDD and keep the data in memory. Don't worry: if the available memory is not sufficient, Spark will handle it by spilling the remaining data to disk:

val readme: RDD[String] = sc.textFile("README.md").cache()

This way the data is kept on the cluster (in this case, in local memory) for faster access. Note that the type stays the same, but the operations become quicker. Remember, though, to only use it when the RDD is manipulated more than once, as caching has its own memory-management and execution-time costs.

This is just a simple example of how Spark works. The data file may be located on an HDFS filesystem, in a database, as a CSV file, or even as a JSON extract from your MongoDB. In addition to the problems we talked about in the section in this chapter about Parallel Collections, now you need to worry about data locality, so that transformations don't need to transfer a lot of data between the servers in the cluster.

Another way of doing distributed data analysis with Spark is the Spark SQL package. If your data can be represented as a table (it was read from an actual database table, a CSV file, a JSON file, etc.), you may query it as you would a relational database: the SQL statement is automatically translated into a sequence of Spark operations. For example, to analyze the following .csv file:

name,age
Alex,26
Sam,24
Bob,15

It will look like the following:

import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("Simple Application").setMaster("local[4]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true") // Use first line of all files as header
  .option("inferSchema", "true") // Automatically infer data types
  .load("src/main/resources/users.csv")
df.show() // will show current DataFrame as a table
df.registerTempTable("users")
val adults = sqlContext.sql("SELECT name FROM users WHERE age > 18")
adults.show()

The first few lines are just Spark context initialization and file reading (note that the com.databricks.spark.csv format comes from the separate spark-csv package, which must be added as its own dependency). The show() method applied to a DataFrame prints its content in a nice table-like format. registerTempTable() makes it possible to apply SQL queries to your dataset, so you can select the adults among your users.

This was a fraction of Spark's capabilities, but it should give you an idea of whether it is the tool you need for your application. It is easy to use, fast, and reliable, and it has a wonderful ecosystem that, in addition to traditional batches, lets you work with streams of data, graphs, or even machine learning algorithms.

SUMMARY

Concurrent, parallel, and even distributed programming are considered to be among the most complex topics in today's programming. In this chapter you saw what tools Scala has up its sleeve to deal with them and in what situations each can be applied. Even though we only scratched the surface of all of these topics, you should be able to choose the right tools to work on your concurrency problems.

We started from the basics, discussing the roots of concurrent programming in Java introduced in version 1.5. You saw why it is considered complicated and why it is better to use higher-level abstractions instead of raw threads and synchronization. You also learned that if your needs are limited to a single variable, there is no need to use synchronized: use atomic variables instead!

Following that, we covered how to work with an asynchronous monad called Future and how convenient it is to work with methods that return one. You also saw Scala's parallel collections and how hard it is to predict whether adding .par will improve performance. Always benchmark the optimization before adding it!

We covered more exotic libraries like RxScala and ScalaSTM, both of which are useful in some specific situations.

Last but not least, we covered the basics of developing with the Akka and Spark frameworks. Akka can improve your application with asynchronous messaging and easier horizontal scaling, as long as your domain logic can be imagined as a group of actors communicating with messages. And Spark is the tool of choice when dealing with large-scale, distributed data.

All in all, this chapter has provided you with the fundamentals of how to choose the right tool for the right problem. If you are not sure whether the concurrent optimization helps, pass it through a JMH micro benchmark so you can be certain that there are some actual benefits.
