Chapter 15: Reactive Programming

In this chapter, you will be introduced to the Reactive Manifesto and the world of reactive programming. We start by defining and discussing the main concepts of reactive programming: asynchronous, non-blocking, and responsive. Using them, we then define reactive programming, discuss the main reactive frameworks, and look at RxJava in more detail.

In this chapter, we will cover the following topics:

  • Asynchronous processing
  • Non-blocking APIs
  • Reactive – responsive, resilient, elastic, and message-driven systems
  • Reactive streams
  • RxJava

By the end of the chapter, you will be able to write code for asynchronous processing using reactive programming.

Technical requirements

To be able to execute the code examples that are provided in this chapter, you will need the following:

  • A computer with an operating system: Microsoft Windows, Apple macOS, or Linux
  • Java SE version 17 or later
  • Any IDE or code editor you prefer

The instructions for how to set up Java SE and the IntelliJ IDEA editor were provided in Chapter 1, Getting Started with Java 17, of this book. The files and the code examples for this chapter are available from the GitHub repository at https://github.com/PacktPublishing/Learn-Java-17-Programming.git. You can locate them in the examples/src/main/java/com/packt/learnjava/ch15_reactive folder.

Asynchronous processing

Asynchronous means that the requestor gets a response immediately, but the response does not contain the result yet. Instead, the requestor waits until the result is sent to them or saved in the database, or receives, for example, an object that allows them to check whether the result is ready. In the latter case, the requestor periodically calls a certain method on this object and, when the result is ready, retrieves it using another method on the same object. The advantage of asynchronous processing is that the requestor can do other things while waiting.

In Chapter 8, Multithreading and Concurrent Processing, we demonstrated how a child thread can be created. Such a child thread can send a synchronous (blocking) request and wait, doing nothing else, until it returns. Meanwhile, the main thread continues executing and periodically checks the child thread object to see whether the result is ready. That is the most basic implementation of asynchronous processing. In fact, we already used it when we used parallel streams.
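A minimal sketch of this polling approach follows. It uses an ExecutorService and a Future instead of a hand-made thread; the class name PollingDemo, the slowRequest() stand-in, and the 20 ms polling interval are illustrative assumptions, not code from the book's repository:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PollingDemo {
    // The result of the request plus how many times the main thread checked in the meantime.
    record Outcome(int result, int polls) {}

    // Hypothetical stand-in for a slow blocking call (for example, to a remote service).
    static int slowRequest() throws InterruptedException {
        TimeUnit.MILLISECONDS.sleep(200);
        return 42;
    }

    static Outcome requestAndPoll() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = PollingDemo::slowRequest;
            Future<Integer> future = executor.submit(task);   // the child thread makes the blocking call
            int polls = 0;
            while (!future.isDone()) {         // check periodically whether the result is ready
                polls++;                       // the main thread is free to do other work here
                TimeUnit.MILLISECONDS.sleep(20);
            }
            return new Outcome(future.get(), polls);   // the result is ready, so get() returns at once
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Outcome outcome = requestAndPoll();
        System.out.println(outcome.result() + " after " + outcome.polls() + " checks");
    }
}
```

The number of checks depends on timing; what matters is that the main thread kept running while the child thread was blocked.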

Behind the scenes, parallel stream operations create child threads, break the stream into segments, assign each segment to a dedicated thread for processing, and then aggregate the partial results from all the segments into the final result. In the previous chapter, we even wrote functions that did the aggregating job. As a reminder, such a function is called a combiner.
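As a reminder of how such aggregation is expressed, here is a small sketch (not the previous chapter's code) that uses the three-argument reduce() operation on a parallel stream; the class name CombinerDemo and the word-length example are assumptions:

```java
import java.util.List;

public class CombinerDemo {
    // Sums the lengths of the words using reduce(identity, accumulator, combiner).
    static int totalLength(List<String> words) {
        return words.parallelStream()
                .reduce(0,
                        (partial, word) -> partial + word.length(),   // accumulator: folds one element into a segment's partial result
                        Integer::sum);                                // combiner: merges partial results from different threads
    }

    public static void main(String[] args) {
        System.out.println(totalLength(List.of("one", "two", "three", "four")));   //prints: 15
    }
}
```

How the runtime splits the list into segments is up to the stream implementation; the combiner guarantees the same total either way.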

Let’s compare the performance of sequential and parallel streams using an example.

Sequential and parallel streams

To demonstrate the difference between sequential and parallel processing, let’s imagine a system that collects data from 10 physical devices (such as sensors) and calculates an average. The following is the get() method, which collects a measurement from a sensor identified by its ID:

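import java.util.concurrent.TimeUnit;

class MeasuringSystem {
    double get(int id){
        try{
            TimeUnit.MILLISECONDS.sleep(100);  //imitates the sensor's response time
        } catch(InterruptedException ex){
            ex.printStackTrace();
        }
        return id * Math.random();             //simulated measurement
    }
}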

We have included a delay of 100 ms to imitate the time it takes to collect the measurement from the sensor. To simulate the measured value, we multiply the ID by the result of the Math.random() method. We are going to call this get() method using an object of the MeasuringSystem class, which is where the method belongs.

Then, we are going to calculate an average to offset the errors and other idiosyncrasies of individual devices:

void getAverage(Stream<Integer> ids) {
    LocalTime start = LocalTime.now();
    double a = ids.mapToDouble(id -> new MeasuringSystem().get(id))
                  .average()           //returns OptionalDouble
                  .orElse(0);          //0 if the stream is empty
    System.out.println((Math.round(a * 100.) / 100.) + " in " +
       Duration.between(start, LocalTime.now()).toMillis() + " ms");
}

Notice how we convert the stream of IDs into a DoubleStream using the mapToDouble() operation so that we can apply the average() operation. The average() operation returns an OptionalDouble object, and we call its orElse(0) method, which returns either the calculated value or zero (for example, if the measuring system could not connect to any of its sensors and returned an empty stream).

The last line of the getAverage() method prints the result and the time it took to calculate it. In real code, we would return the result and use it for other calculations. However, for demonstration purposes, we will just print it.

Now we can compare the performance of sequential stream processing with the performance of parallel processing (see the MeasuringSystem class and the compareSequentialAndParallelProcessing() method):

List<Integer> ids = IntStream.range(1, 11)
                             .mapToObj(i -> i)
                             .collect(Collectors.toList());
getAverage(ids.stream());          //prints: 2.99 in 1030 ms
getAverage(ids.parallelStream());  //prints: 2.34 in  214 ms

The results might be different if you run this example because, as you might recall, we simulate the collected measurements as random values. For the same reason, the two averages differ from each other.

As you can see, the parallel stream was processed about five times faster than the sequential one.

Although the parallel stream uses asynchronous processing behind the scenes, this is not what programmers have in mind when talking about the asynchronous processing of requests. From the application’s perspective, it is just parallel (also called concurrent) processing. It is faster than sequential processing, but the main thread has to wait until all the calls are made and the data has been retrieved. If each call takes at least 100 ms (as in our case), then the processing of all the calls cannot be completed in less time, even when each call is made by a dedicated thread.

Of course, we can create a service that uses a child thread to make all the calls, while the main thread does something else. Later, the main thread can call the service again and get the result or pick it up from a previously agreed location. That truly would be the asynchronous processing programmers are talking about.

But before writing such code, let’s look at the CompletableFuture class located in the java.util.concurrent package. It does everything described and more.

Using the CompletableFuture object

Using the CompletableFuture object, we can separate sending the request to the measuring system from getting the result from the CompletableFuture object. That is exactly the scenario we described while explaining what asynchronous processing is. Let’s demonstrate it in code (see the MeasuringSystem class and the completableFuture() method):

List<CompletableFuture<Double>> list = ids.stream()
        .map(id -> CompletableFuture.supplyAsync(() ->
                             new MeasuringSystem().get(id)))
        .collect(Collectors.toList());

The supplyAsync() method does not wait for the call to the measuring system to return. Instead, it immediately creates a CompletableFuture object and returns it. This is so that a client can use this object any time later on to retrieve the result returned by the measuring system. The following code takes the list of CompletableFuture objects and iterates over it, retrieving the result from each object and calculating the average value:

LocalTime start = LocalTime.now();
double a = list.stream()
               .mapToDouble(cf -> cf.join().doubleValue())
               .average()
               .orElse(0);
System.out.println((Math.round(a * 100.) / 100.) + " in " +
  Duration.between(start, LocalTime.now()).toMillis() + " ms"); 
                                         //prints: 2.92 in 6 ms

CompletableFuture also has methods, such as isDone(), that allow you to check whether the value has been returned at all, but that is not the point of this demonstration, which is to show how the CompletableFuture class can be used to organize asynchronous processing.

The created list of CompletableFuture objects can be stored anywhere and processed very quickly (in our case, in 6 ms), provided that the measurements have been received already (all the get() methods were invoked and returned values). After creating the list of CompletableFuture objects and before processing it, the system is not blocked and can do something else. That is the advantage of asynchronous processing.
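The demonstration above still calls join() on each future when it computes the average. A minimal sketch of a fully non-blocking alternative combines the futures with CompletableFuture.allOf() and computes the average in a callback. The class name, the averageAsync() method, and the measure() stand-in (which returns the ID itself instead of a random value, so that the output is predictable) are assumptions:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AllOfDemo {
    // Hypothetical stand-in for MeasuringSystem.get(): the same 100 ms delay,
    // but it returns the ID itself so that the average is predictable.
    static double measure(int id) {
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return id;
    }

    // Returns immediately; the returned future completes with the average
    // once every measurement has arrived.
    static CompletableFuture<Double> averageAsync(List<Integer> ids) {
        List<CompletableFuture<Double>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> measure(id)))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .mapToDouble(cf -> cf.join())   // all futures are complete at this point
                        .average()
                        .orElse(0));
    }

    public static void main(String[] args) {
        List<Integer> ids = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        CompletableFuture<Double> average = averageAsync(ids);
        System.out.println("The main thread is free to do something else");
        System.out.println(average.join());             //prints: 5.5
    }
}
```

Only the final join() in main() waits; everything before it returns immediately.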

The CompletableFuture class has many methods and is supported by several other classes and interfaces. For example, a fixed-size thread pool can be added to limit the number of threads (see the MeasuringSystem class and the threadPool() method):

ExecutorService pool = Executors.newFixedThreadPool(3);
List<CompletableFuture<Double>> list = ids.stream()
        .map(id -> CompletableFuture.supplyAsync(() -> 
                         new MeasuringSystem().get(id), pool))
        .collect(Collectors.toList());

There is a variety of such pools for different purposes and with different performance characteristics. But using a pool does not change the overall system design, so we omit such details.
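To see that such a pool really limits the number of threads, the following sketch records the name of each thread that runs a task. The class and method names are hypothetical, and the doubled ID is only a placeholder for a real measurement. Unlike the fragment above, it also shuts the pool down:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PoolDemo {
    // Runs ten tasks on a fixed pool and returns the names of the threads that ran them.
    static Set<String> threadsUsed(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();   // thread-safe set
        try {
            List<CompletableFuture<Integer>> list = IntStream.rangeClosed(1, 10).boxed()
                    .map(id -> CompletableFuture.supplyAsync(() -> {
                        names.add(Thread.currentThread().getName());
                        return id * 2;                       // placeholder for a real measurement
                    }, pool))
                    .collect(Collectors.toList());
            list.forEach(CompletableFuture::join);           // wait until all tasks are done
            return names;
        } finally {
            pool.shutdown();   // the pool's threads are non-daemon; without shutdown() the JVM would not exit
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed(3));   // at most three names, such as pool-1-thread-1
    }
}
```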

As you can see, asynchronous processing is a powerful tool. There is also a variation of the asynchronous API called a non-blocking API, which we are going to discuss in the next section.

Non-blocking APIs

The client of a non-blocking API gets the results without being blocked for a significant amount of time, thus allowing the client to do something else during the period when the results are being prepared. So, the notion of a non-blocking API implies a highly responsive application. The processing of the request (that is, getting the results) can be done synchronously or asynchronously – it does not matter to the client. In practice, though, the application typically uses asynchronous processing to increase the throughput and improve the performance of the API.

The term non-blocking came into use with the java.nio package. The non-blocking input/output (NIO) provides support for intensive input/output (I/O) operations. It describes how the application is implemented: it does not dedicate an execution thread to each of the requests but provides several lightweight worker threads that do the processing asynchronously and concurrently.

The java.io package versus the java.nio package

Writing and reading data to and from external memory (for example, a hard drive) is a much slower operation than processing in memory only. Initially, the already-existing classes and interfaces of the java.io package worked well, but once in a while, they would create a performance bottleneck. The new java.nio package was created to provide more effective I/O support.

The java.io implementation is based on I/O stream processing. As we saw in the previous section, this is essentially a blocking operation, even if some kind of concurrency is happening behind the scenes. To increase speed, the java.nio implementation was introduced, based on reading/writing to/from a buffer in memory. Such a design allowed it to separate the slow process of filling/emptying the buffer from the fast process of reading/writing from/to it.

In a way, it is similar to what we have done in our example of CompletableFuture usage. The additional advantage of having data in a buffer is that it is possible to inspect the data by moving back and forth within the buffer, which is impossible while reading sequentially from the stream. This provides more flexibility during data processing. In addition, the java.nio implementation introduced another intermediary, called a channel, for bulk data transfers to and from a buffer.
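A minimal sketch of moving back and forth within a java.nio.ByteBuffer follows. The class name and the readTwice() method are hypothetical, and the text is assumed to be ASCII and no longer than the 64-byte buffer:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferDemo {
    // Reads the buffer forward, then moves back and reads again,
    // which a sequentially read InputStream would not allow.
    static String readTwice(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.put(text.getBytes(StandardCharsets.US_ASCII));  // fill the buffer
        buffer.flip();                                          // switch from writing to reading
        StringBuilder sb = new StringBuilder();
        while (buffer.hasRemaining()) {
            sb.append((char) buffer.get());                     // relative get() advances the position
        }
        buffer.rewind();                                        // move the position back to 0
        sb.append('|');
        sb.append((char) buffer.get(buffer.limit() - 1));       // absolute get() does not move the position
        sb.append((char) buffer.get());                         // so this reads index 0 again
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(readTwice("abc"));                   //prints: abc|ca
    }
}
```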

The reading thread gets data from a channel and receives only what is currently available, or nothing at all (if there is no data in the channel). If no data is available, the thread, instead of remaining blocked, can do something else, for example, read/write to/from other channels, in the same way the main thread in our CompletableFuture example was free to do whatever had to be done while the measuring system was getting data from its sensors.
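The following sketch illustrates such a read with a java.nio.channels.Pipe whose source channel is switched to non-blocking mode; the class and method names are hypothetical. The first read() returns 0 immediately because nothing has been written yet, instead of waiting for data:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

public class NonBlockingReadDemo {
    // firstRead: the number of bytes returned by the read attempted before any data was written.
    record Result(int firstRead, String data) {}

    static Result readNonBlocking(String message) {
        byte[] bytes = message.getBytes(StandardCharsets.US_ASCII);
        try {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source(); Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);          // read() never waits for data
                ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
                int firstRead = source.read(buffer);      // the channel is empty: returns 0 immediately
                sink.write(ByteBuffer.wrap(bytes));       // now some data becomes available
                while (buffer.hasRemaining()) {
                    if (source.read(buffer) == 0) {
                        Thread.onSpinWait();              // nothing yet: a real program would serve other channels here
                    }
                }
                buffer.flip();
                return new Result(firstRead, StandardCharsets.US_ASCII.decode(buffer).toString());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(readNonBlocking("hi"));        //prints: Result[firstRead=0, data=hi]
    }
}
```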

This way, instead of dedicating a thread to one I/O process, a few worker threads can serve many I/O processes. Such a solution was eventually called NIO and was later applied to other processes, the most prominent being the event processing in an event loop, which is also called a run loop.

The event/run loop

Many non-blocking systems are based on the event (or run) loop – a thread that is continually executed. It receives events (requests and messages) and then dispatches them to the corresponding event handlers (workers). There is nothing special about event handlers. They are just methods (functions) dedicated by the programmer for the processing of the particular event type.
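The core of an event loop can be sketched in plain Java. In this hypothetical example, events are queued in a BlockingQueue, a single loop thread takes them in arrival order and dispatches each one to the handler registered for its type, and a "stop" event ends the loop so that the demo terminates. The Event record, the handler names, and the demo() method are assumptions; real event loops (for example, in Node.js or Vert.x) are far more elaborate:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class EventLoopDemo {
    // Hypothetical event: a type used for dispatching, plus a payload.
    record Event(String type, String payload) {}

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Map<String, Consumer<Event>> handlers;

    EventLoopDemo(Map<String, Consumer<Event>> handlers) {
        this.handlers = handlers;
    }

    // Producers only enqueue the event; they do not run the handler themselves.
    void submit(Event event) {
        queue.add(event);
    }

    // The loop: one thread takes the events in arrival order and dispatches
    // each one to the handler registered for its type.
    void run() {
        try {
            while (true) {
                Event event = queue.take();              // waits until an event arrives
                if (event.type().equals("stop")) {
                    return;
                }
                Consumer<Event> handler = handlers.get(event.type());
                if (handler != null) {                   // events without a handler are ignored
                    handler.accept(event);
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();            // written only by the loop thread
        EventLoopDemo loop = new EventLoopDemo(Map.<String, Consumer<Event>>of(
                "greet", e -> log.add("hello " + e.payload()),
                "count", e -> log.add("length " + e.payload().length())));
        Thread loopThread = new Thread(loop::run);
        loopThread.start();
        loop.submit(new Event("greet", "Ann"));
        loop.submit(new Event("count", "abcd"));
        loop.submit(new Event("unknown", "x"));
        loop.submit(new Event("stop", ""));
        try {
            loopThread.join();                           // after join(), reading the log is safe
        } catch (InterruptedException ex) {
            throw new IllegalStateException(ex);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());                      //prints: [hello Ann, length 4]
    }
}
```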

Such a design is called the reactor design pattern. It is built around processing events and service requests concurrently. It also gave its name to reactive programming and reactive systems, which react to events and process them concurrently.

Event loop-based design is widely used in operating systems and graphical user interfaces. It has been available in Spring WebFlux since Spring 5 and can be implemented in JavaScript and its popular runtime environment, Node.js, which uses an event loop as its processing backbone. The Vert.x toolkit is built around the event loop, too.

Before the adoption of an event loop, a dedicated thread was assigned to each incoming request – much like in our demonstration of stream processing. Each of the threads required the allocation of a certain amount of resources that were not request-specific, so some of the resources – mostly memory allocation – were wasted. Then, as the number of requests grew, the CPU needed to switch its context from one thread to another more frequently to allow more or less concurrent processing of all the requests. Under the load, the overhead of switching the context is substantial enough to affect the performance of an application.

Implementing an event loop has addressed these two issues. It has eliminated the waste of resources by avoiding the creation of a thread for each request, and it has reduced the overhead of context switching. With an event loop in place, a much smaller memory allocation is needed for each request to capture its specifics, which makes it possible to keep many more requests in memory so that they can be processed concurrently. The overhead of CPU context switching has become far smaller too, because the context is much smaller.

The non-blocking API is a way of processing requests so that systems are able to handle a much bigger load while remaining highly responsive and resilient.

Reactive

Usually, the term reactive is used in the context of reactive programming and reactive systems. Reactive programming (also called Rx programming) is based on asynchronous data streams (also called reactive streams). In Java, it was first popularized by RxJava (http://reactivex.io), the Java implementation of Reactive Extensions (RX). Later, Java 9 added the reactive streams API to the java.util.concurrent package. It allows a Publisher to generate a stream of data to which a Subscriber can asynchronously subscribe.

One principal difference between reactive streams and standard streams (which are also called Java 8 streams and are located in the java.util.stream package) is that a source (publisher) of the reactive stream pushes elements to subscribers at its own rate, while in standard streams, a new element is pulled and emitted only after the previous one has been processed (in fact, it acts like a for loop).

As you have seen, we were able to process data asynchronously even without this new API by using CompletableFuture. But after writing such code a few times, you might notice that most of the code is just plumbing, so you get the feeling that there has to be an even simpler and more convenient solution. That’s how the reactive streams initiative (http://www.reactive-streams.org) was born. The scope of the effort was defined as follows:

The scope of Reactive Streams is to find a minimal set of interfaces, methods, and protocols that will describe the necessary operations and entities to achieve the goal – asynchronous streams of data with non-blocking back pressure.

The term non-blocking backpressure refers to one of the problems of asynchronous processing: coordinating the rate of the incoming data with the ability of the system to process it without the need for stopping (blocking) the data input. The solution is to inform the source that the consumer has difficulty keeping up with the input. Also, processing should react to changes in the rate of the incoming data in a more flexible manner than just blocking the flow, hence the name reactive.

Several libraries already implement the reactive streams API: RxJava (http://reactivex.io), Reactor (https://projectreactor.io), Akka Streams (https://akka.io/docs), and Vert.x (https://vertx.io/) are among the most well known. Writing code using RxJava or another library of asynchronous streams constitutes reactive programming. It realizes the goal declared in the Reactive Manifesto (https://www.reactivemanifesto.org) by building reactive systems that are responsive, resilient, elastic, and message-driven.

Responsive

This term is relatively self-explanatory. The ability to respond in a timely manner is one of the primary qualities of any system. There are many ways to achieve it. Even a traditional blocking API supported by enough servers and other infrastructure can achieve decent responsiveness under a growing load.

Reactive programming helps to do this using less hardware. It comes at a price, as reactive code requires changing the way we think about control flow. But after some time, this new way of thinking becomes as natural as any other familiar skill.

In the following sections, we will see quite a few examples of reactive programming.

Resilient

Failures are inevitable. The hardware crashes, the software has defects, unexpected data is received, or an untested execution path has been taken – any of these events, or a combination of them, can happen at any time. Resilience is the ability of a system to continue delivering the expected results under unexpected circumstances.

For example, resilience can be achieved using redundancy of the deployable components and hardware, by isolating parts of the system so that a domino effect becomes less probable, by designing the system with automatically replaceable parts, or by raising an alarm so that qualified personnel can intervene. Additionally, we have talked about distributed systems as a good example of systems that are resilient by design.

A distributed architecture eliminates a single point of failure. Also, breaking the system into many specialized components that talk to one another using messages allows better tuning for the duplication of the most critical parts and creates more opportunities for their isolation and potential failure containment.

Elastic

Usually, the ability to sustain the biggest possible load is associated with scalability. But the ability to preserve the same performance characteristics under a varying load, not just under the growing one, is called elasticity.

The client of an elastic system should not notice any difference between the idle periods and the periods of peak load. A non-blocking reactive style of implementation facilitates this quality. Also, breaking the program into smaller parts and converting them into services that can be deployed and managed independently allows for the fine-tuning of resource allocation.

Such small services are called microservices, and many of them together can comprise a reactive system that can be both scalable and elastic. We will talk about such architecture, in more detail, in the following sections and the next chapter.

Message-driven

We have already established that component isolation and system distribution are two aspects that help to keep the system responsive, resilient, and elastic. Loose and flexible connections are important conditions that support these qualities, too. And the asynchronous nature of a reactive system simply leaves the designer no choice but to base the communication between the components on messages.

Messaging creates breathing space around each component, without which the system would become a tightly coupled monolith susceptible to all kinds of problems, not to mention a maintenance nightmare.

In the next chapter, we are going to look at an architectural style that can be used to build an application as a collection of loosely coupled microservices that communicate using messages.

Reactive streams

The reactive streams API, which was introduced in Java 9, consists of the following four interfaces nested in the java.util.concurrent.Flow class (shown here in simplified form):

public final class Flow {
    @FunctionalInterface
    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }
    public static interface Subscriber<T> {
        public void onSubscribe(Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }
    public static interface Subscription {
        public void request(long numberOfItems);
        public void cancel();
    }
    public static interface Processor<T,R>
                   extends Subscriber<T>, Publisher<R> {
    }
}

A Flow.Subscriber object can be passed, as a parameter, into the subscribe() method of Flow.Publisher<T>. Then, the publisher calls the subscriber’s onSubscribe() method and passes to it a Flow.Subscription object as a parameter. Now, the subscriber can call request(long numberOfItems) on the subscription object to request data from the publisher. That is the way the pull model can be implemented, which leaves it up to a subscriber to decide when to request another item for processing. The subscriber can unsubscribe from the publisher services by calling the cancel() method on the subscription.

In return, the publisher can pass a new item to the subscriber by calling the subscriber’s onNext() method. When no more data will be coming (that is, all the data from the source was emitted) the publisher calls the subscriber’s onComplete() method. Also, by calling the subscriber’s onError() method, the publisher can tell the subscriber that it has encountered a problem.
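The following sketch shows this protocol using the JDK's SubmissionPublisher class and a hand-written subscriber that pulls one item at a time. The class names and the 5-second wait are illustrative assumptions, not code from the book's repository:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowDemo {
    // A subscriber that pulls the items one at a time by calling request(1).
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);                 // ask for the first item only
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1);                 // ready for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();                        // the publisher has no more items
        }
    }

    static List<Integer> publish(List<Integer> items) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);        // buffered until the subscriber requests them
        }                                            // close() signals onComplete() after the buffered items
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publish(List.of(1, 2, 3, 4, 5)));   //prints: [1, 2, 3, 4, 5]
    }
}
```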

The Flow.Processor interface describes an entity that can act as both a subscriber and a publisher. It allows you to create chains (or pipelines) of such processors, so a subscriber can receive an item from a publisher, transform it, and then pass the result to the next subscriber or processor.
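A processor can be sketched by extending SubmissionPublisher (for the publishing side) and implementing the Flow.Subscriber methods (for the receiving side). The names TransformProcessor and squareAndLabel() are hypothetical; consume() is a SubmissionPublisher method that subscribes a plain Consumer and returns a CompletableFuture that completes when onComplete() is signaled:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ProcessorDemo {
    // Subscribes to items of type T, applies a function, and republishes the results of type R.
    static class TransformProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> function;
        private Flow.Subscription subscription;

        TransformProcessor(Function<? super T, ? extends R> function) {
            this.function = function;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(function.apply(item));            // pass the transformed item downstream
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable);           // propagate the error downstream
        }

        @Override
        public void onComplete() {
            close();                                 // propagate completion downstream
        }
    }

    static List<String> squareAndLabel(List<Integer> items) {
        List<String> received = new CopyOnWriteArrayList<>();
        TransformProcessor<Integer, String> processor =
                new TransformProcessor<>(i -> "#" + (i * i));
        CompletableFuture<Void> consumed = processor.consume(received::add);
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(processor);             // source -> processor -> consumer
            items.forEach(source::submit);
        }
        consumed.orTimeout(5, TimeUnit.SECONDS).join();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(squareAndLabel(List.of(1, 2, 3)));   //prints: [#1, #4, #9]
    }
}
```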

In a push model, the subscriber requests an unbounded number of items (for example, by calling request(Long.MAX_VALUE)), so the publisher can call onNext() whenever a new item is ready, without waiting for further requests. If items are processed more slowly than they are published, the subscriber can use various strategies to relieve the pressure. For example, it can skip items or create a buffer for temporary storage, in the hope that item production will slow down and the subscriber will be able to catch up.
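With SubmissionPublisher, the skipping strategy can be sketched using its offer() method, which drops an item instead of waiting for space when a subscriber's buffer is full. In this hypothetical example, the subscriber requests Long.MAX_VALUE items (push model) and then processes them slowly; the buffer capacity of 4 and the 5 ms delay are arbitrary values chosen for illustration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DropDemo {
    record Counts(int delivered, int dropped) {}

    static Counts publish(int total) {
        AtomicInteger delivered = new AtomicInteger();
        int dropped = 0;
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Each subscriber's buffer holds at most 4 items (the capacity may be rounded up to a power of two).
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>(executor, 4)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    s.request(Long.MAX_VALUE);           // unbounded demand: push model
                }
                @Override public void onNext(Integer item) {
                    delivered.incrementAndGet();
                    try {
                        TimeUnit.MILLISECONDS.sleep(5);  // a slow consumer
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            for (int i = 0; i < total; i++) {
                // With a null drop handler, an item that does not fit into the full buffer
                // is dropped, and offer() returns a negative value.
                if (publisher.offer(i, null) < 0) {
                    dropped++;
                }
            }
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return new Counts(delivered.get(), dropped);
    }

    public static void main(String[] args) {
        System.out.println(publish(20));   // the two counts vary between runs
    }
}
```

How many items are dropped depends on timing, but every item is either delivered or dropped, so the two counts always add up to the total.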

This is the minimal set of interfaces that the reactive streams initiative has defined in support of the asynchronous data streams with non-blocking backpressure. As you can see, it allows the subscriber and publisher to talk to each other and coordinate the rate of incoming data; therefore, it makes possible a variety of solutions for the backpressure problem that we discussed in the Reactive section.

There are many ways to implement these interfaces. Currently, the JDK offers only one general-purpose public implementation: the SubmissionPublisher class, which implements Flow.Publisher. The reason for this is that these interfaces are not supposed to be used by an application developer directly. They form a Service Provider Interface (SPI) that is used by the developers of the reactive streams libraries. If needed, use one of the existing toolkits that implement the reactive streams API that we mentioned earlier: RxJava, Reactor, Akka Streams, Vert.x, or any other library of your preference.

RxJava

In our examples, we will use RxJava 2.2.21 (http://reactivex.io). It can be added to the project using the following dependency:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.21</version>
</dependency>

First, let’s compare two implementations of the same functionality using the java.util.stream package and the io.reactivex package. The sample program is going to be very simple:

  • Create a stream of integers: 1, 2, 3, 4, and 5.
  • Keep only the even numbers (that is, 2 and 4).
  • Calculate the square root of each of the filtered numbers.
  • Calculate the sum of all the square roots.

Here is how it can be implemented using the java.util.stream package (see the ObservableIntro class and the squareRootSum() method):

double a = IntStream.rangeClosed(1, 5)
                    .filter(i -> i % 2 == 0)
                    .mapToDouble(Double::valueOf)
                    .map(Math::sqrt)
                    .sum();
System.out.println(a);          //prints: 3.414213562373095

The same functionality implemented with RxJava looks like this:

Observable.range(1, 5)
      .filter(i -> i % 2 == 0)
      .map(Math::sqrt)
      .reduce((r, d) -> r + d)
      .subscribe(System.out::println); 
                                    //prints: 3.414213562373095

RxJava is based on the Observable object (which plays the role of a publisher) and the Observer object, which subscribes to the Observable object and waits for the data to be emitted.

In contrast to the Stream functionality, Observable has significantly different capabilities. For example, a stream, once consumed by a terminal operation, cannot be reused, while an Observable object can be subscribed to again. Here is an example (see the ObservableIntro class and the reuseObservable() method):

Observable<Double> observable = Observable.range(1, 5)
     .filter(i -> i % 2 == 0)
     .doOnNext(System.out::println)    //prints 2 and 4 twice
     .map(Math::sqrt);
observable
     .reduce((r, d) -> r + d)
     .subscribe(System.out::println);  
                                    //prints: 3.414213562373095
observable
     .reduce((r, d) -> r + d)
     .map(r -> r / 2)
     .subscribe(System.out::println);  
                                   //prints: 1.7071067811865475

In the preceding example, as you can see from the comments, the doOnNext() operation was called twice, which means the observable object also emitted values twice, once for each processing pipeline.
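For comparison, here is a minimal sketch (the class name is hypothetical) of what happens when a java.util.stream.Stream is used a second time: the second terminal operation throws IllegalStateException instead of running the pipeline again:

```java
import java.util.stream.Stream;

public class StreamReuseDemo {
    // Returns true if a second terminal operation on the same Stream fails.
    static boolean secondUseFails() {
        Stream<Integer> evens = Stream.of(1, 2, 3, 4, 5).filter(i -> i % 2 == 0);
        System.out.println(evens.count());         //prints: 2
        try {
            evens.count();                          // the stream has already been consumed
            return false;
        } catch (IllegalStateException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails());      // prints 2, then true
    }
}
```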

If we do not want the Observable object to run twice, we can cache its data by adding the cache() operation (see the ObservableIntro class and the cacheObservableData() method):

Observable<Double> observable = Observable.range(1,5)
     .filter(i -> i % 2 == 0)
     .doOnNext(System.out::println)  //prints 2 and 4 only once
     .map(Math::sqrt)
     .cache();
observable
     .reduce((r, d) -> r + d)
     .subscribe(System.out::println); 
                                    //prints: 3.414213562373095
observable
     .reduce((r, d) -> r + d)
     .map(r -> r / 2)
     .subscribe(System.out::println);  
                                   //prints: 1.7071067811865475

As you can see from the comments (2 and 4 are printed only once), the second usage of the same Observable object took advantage of the cached data, thus allowing for better performance.

RxJava provides such rich functionality that there is no way we can review it all in this book. Instead, we will try to cover the most popular functionality. The API describes the methods available for invocation using an Observable object. Such methods are also called operations (as in the case of the standard Java 8 streams) or operators (this term is mostly used in connection with reactive streams). We will use these three terms – methods, operations, and operators – interchangeably as synonyms.

Observable types

Talking about the RxJava 2 API (notice that it is quite different from RxJava 1), we will use the online documentation, which can be found at http://reactivex.io/RxJava/2.x/javadoc/index.html.

An observer subscribes to receive values from an observable object, which can behave as one of the following types:

  • Blocking: This waits until the result is returned.
  • Non-blocking: This processes the emitted elements asynchronously.
  • Cold: This starts emitting elements only when an observer subscribes, and each observer receives the whole sequence.
  • Hot: This emits elements whether an observer has subscribed or not.

An observable object can be an object of one of the following classes of the io.reactivex package:

  • Observable<T>: This can emit none, one, or many elements; it does not support backpressure.
  • Flowable<T>: This can emit none, one, or many elements; it supports backpressure.
  • Single<T>: This can emit either one element or an error; the notion of backpressure does not apply.
  • Maybe<T>: This represents a deferred computation. It can emit either no value, one value, or an error; the notion of backpressure does not apply.
  • Completable: This represents a deferred computation without any value. This indicates the completion of a task or an error; the notion of backpressure does not apply.

An object of each of these classes can behave as a blocking, non-blocking, cold, or hot observable. They differ from each other by the number of values that can be emitted, their ability to defer the returning of the result or returning the flag of the task completion only, and their ability to handle backpressure.

Blocking versus non-blocking

To demonstrate this behavior, we create an observable that emits five sequential integers, starting with 1 (see the BlockingOperators class and the observableBlocking1() method):

Observable<Integer> obs = Observable.range(1,5);

All the blocking methods (operators) of Observable start with the word blocking. For example, the blockingLast() operator blocks until the last element is emitted and then returns it:

Double d2 = obs.filter(i -> i % 2 == 0)
               .doOnNext(System.out::println)  //prints 2 and 4
               .map(Math::sqrt)
               .delay(100, TimeUnit.MILLISECONDS)
               .blockingLast();
System.out.println(d2);                        //prints: 2.0

In this example, we only select even numbers, print each selected element, calculate its square root, and delay its emission by 100 ms (imitating a long-running calculation). The printed results are shown in the comments of the preceding code.

The non-blocking version of the same functionality is as follows (see the BlockingOperators class and the second half of the observableBlocking1() method):

List<Double> list = new ArrayList<>();
obs.filter(i -> i % 2 == 0)
   .doOnNext(System.out::println)  //prints 2 and 4
   .map(Math::sqrt)
   .delay(100, TimeUnit.MILLISECONDS)
   .subscribe(d -> {
        if(list.size() == 1){
            list.remove(0);
        }
        list.add(d);
   });
System.out.println(list);          //prints: []

We use the List object to capture the result because, as you might remember, a lambda expression cannot assign a new value to a local variable that is not final or effectively final.

As you can see, the resulting list is empty. That is because the pipeline calculations are performed without blocking (asynchronously). We set a delay of 100 ms (to simulate processing, which takes a long time), but there is no blocking operation, so the control goes down to the next line that prints the list content, which is still empty.

To prevent the control from going to this line too early, we can set a delay in front of it (see the BlockingOperators class and the observableBlocking2() method):

try {
    TimeUnit.MILLISECONDS.sleep(250);
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println(list);   //prints: [2.0]

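Note that the delay has to be longer than 100 ms. The delay() operator shifts each emission forward in time by 100 ms, and because range() emits both even numbers almost at once, both results arrive shortly after 100 ms; 250 ms leaves a comfortable margin. Now you can see that the list contains the expected value of 2.0 (the lambda keeps only the last result).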
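The same contrast between blocking and non-blocking styles can be reproduced without RxJava. The following sketch is only an analogy built on the JDK's CompletableFuture (it is not how RxJava works internally): join() plays the role of blockingLast(), while thenAccept() registers a callback and returns immediately, like subscribe(). The class and method names are illustrative.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class BlockingVsNonBlockingSketch {

    // Same computation as the pipeline above: even numbers from 1..5,
    // square root, keep the last result, delivered after a 100 ms delay
    static CompletableFuture<Double> lastEvenSqrt() {
        Executor delayed =
                CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> IntStream.rangeClosed(1, 5)
                .filter(i -> i % 2 == 0)
                .mapToDouble(Math::sqrt)
                .reduce((first, second) -> second)   // keep the last value
                .orElseThrow(), delayed);
    }

    // Blocking style: join() waits until the result is available
    static double blocking() {
        return lastEvenSqrt().join();
    }

    // Non-blocking style: register a callback and continue immediately.
    // Returns the list content right away and again after 250 ms.
    static List<List<Double>> nonBlocking() {
        List<Double> list = new CopyOnWriteArrayList<>();
        lastEvenSqrt().thenAccept(list::add);
        List<Double> rightAway = List.copyOf(list);   // result not ready yet
        pauseMs(250);
        List<Double> later = List.copyOf(list);       // callback has run by now
        return List.of(rightAway, later);
    }

    static void pauseMs(long ms) {
        try {
            TimeUnit.MILLISECONDS.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(blocking());      // prints: 2.0
        System.out.println(nonBlocking());   // an empty snapshot, then one with 2.0
    }
}
```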

Essentially, that is the difference between blocking and non-blocking operators. Other classes that represent an observable have similar blocking operators. Here are some examples of blocking Flowable, Single, and Maybe (see the BlockingOperators class and the flowableBlocking(), singleBlocking(), and maybeBlocking() methods):

Flowable<Integer> obs = Flowable.range(1,5);
Double d2 = obs.filter(i -> i % 2 == 0)
        .doOnNext(System.out::println)  //prints 2 and 4
        .map(Math::sqrt)
        .delay(100, TimeUnit.MILLISECONDS)
        .blockingLast();
System.out.println(d2);                 //prints: 2.0
Single<Integer> obs2 = Single.just(42);
int i2 = obs2.delay(100, TimeUnit.MILLISECONDS).blockingGet();
System.out.println(i2);                 //prints: 42
Maybe<Integer> obs3 = Maybe.just(42); 
int i3 = obs3.delay(100, TimeUnit.MILLISECONDS).blockingGet(); 
System.out.println(i3);                 //prints: 42 

The Completable class has blocking operators that allow us to set a timeout (see the BlockingOperators class and the second half of the completableBlocking() method):

(1) Completable obs = Completable.fromRunnable(() -> {
         System.out.println("Run");           //prints: Run
         try {
               TimeUnit.MILLISECONDS.sleep(100);
         } catch (InterruptedException e) {
              e.printStackTrace();
         }
    });                                           
(2) Throwable ex = obs.blockingGet();
(3) System.out.println(ex);                   //prints: null
//(4) ex = obs.blockingGet(15, TimeUnit.MILLISECONDS);
// java.util.concurrent.TimeoutException: 
//      The source did not signal an event for 15 milliseconds.
(5) ex = obs.blockingGet(150, TimeUnit.MILLISECONDS);
(6) System.out.println(ex);                   //prints: null
(7) obs.blockingAwait();
(8) obs.blockingAwait(15, TimeUnit.MILLISECONDS);

The result of the preceding code is presented in the following screenshot:

 

The first Run message comes from line 2 in response to the call of the blocking blockingGet() method. The first null message comes from line 3. Line 4 throws an exception because the timeout was set to 15 ms, while the actual processing was set to a delay of 100 ms. The second Run message comes from line 5 in response to the blockingGet() method call. This time, the timeout is set to 150 ms, which is more than 100 ms, so the method is able to return before the timeout was up.

The last two lines, 7 and 8, demonstrate the usage of the blockingAwait() method without and with a timeout. This method does not return the result of the pipeline; it only waits for the pipeline to run its course. Notice that line 8 does not break with an exception even though the timeout is smaller than the time the pipeline takes to finish. Unlike blockingGet(), the blockingAwait(long timeout, TimeUnit unit) version does not throw on timeout: it returns true if the Completable completed within the time limit and false if the timeout elapsed, so the returned boolean value has to be checked when the timeout matters.
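The JDK has the same two styles of timed waiting, which may help to remember the difference. In the following sketch (an analogy, not RxJava code; the class and method names are illustrative), Future.get() with a timeout throws TimeoutException, like blockingGet(timeout, unit), while CountDownLatch.await() with a timeout returns false, like blockingAwait(timeout, unit). The 100 ms of work and the 15 ms timeout mirror the values used above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutStylesSketch {

    static void sleepMs(long ms) {
        try {
            TimeUnit.MILLISECONDS.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Style 1: the wait throws on timeout (like blockingGet(timeout, unit))
    static boolean getThrowsOnTimeout() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> work = executor.submit(() -> sleepMs(100));
            work.get(15, TimeUnit.MILLISECONDS);
            return false;                 // finished in time
        } catch (TimeoutException e) {
            return true;                  // timeout reported as an exception
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    // Style 2: the wait returns false on timeout (like blockingAwait(timeout, unit))
    static boolean awaitReturnsInTime() {
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            sleepMs(100);
            done.countDown();
        });
        worker.start();
        try {
            boolean inTime = done.await(15, TimeUnit.MILLISECONDS);
            worker.join();
            return inTime;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(getThrowsOnTimeout());   // prints: true
        System.out.println(awaitReturnsInTime());   // prints: false
    }
}
```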

Although blocking operations do exist (and we will review more of them while talking about each observable type in the following sections), they should be used only when the required functionality cannot be implemented with non-blocking operations alone. The main thrust of reactive programming is to strive to process all requests asynchronously in a non-blocking style.

Cold versus hot

So far, all the examples we have seen have only demonstrated a cold observable, which only provides the next value at the request of the processing pipeline after the previous value has been processed. Here is another example (see the ColdObservable class and the main() method):

Observable<Long> cold = 
        Observable.interval(10, TimeUnit.MILLISECONDS);
cold.subscribe(i -> System.out.println("First: " + i));
pauseMs(25);
cold.subscribe(i -> System.out.println("Second: " + i));
pauseMs(55);

We have used the interval() method to create an Observable object that represents a stream of sequential numbers emitted at the specified interval (in our case, every 10 ms). Then, we subscribe to the created object, wait 25 ms, subscribe again, and wait another 55 ms. The pauseMs() method is as follows:

void pauseMs(long ms){
    try {
        TimeUnit.MILLISECONDS.sleep(ms);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

If we run the preceding example, the output will look similar to the following:

As you can see, each of the pipelines processed every value emitted by the cold observable.

To convert the cold observable into a hot one, we use the publish() method, which converts the observable into a ConnectableObservable object, an instance of a class that extends Observable (see the HotObservable class and the hot1() method):

ConnectableObservable<Long> hot = 
      Observable.interval(10, TimeUnit.MILLISECONDS).publish();
hot.connect();
hot.subscribe(i -> System.out.println("First: " + i));
pauseMs(25);
hot.subscribe(i -> System.out.println("Second: " + i));
pauseMs(55);

As you can see, we have to call the connect() method so that the ConnectableObservable object starts emitting values. The output looks similar to the following:

The preceding output shows that the second pipeline did not receive the first few values because it subscribed to the observable after they had been emitted. So, the observable emits values independently of the ability of the observers to process them. If the processing falls behind, and new values keep coming while the previous ones are not fully processed yet, the Observable class puts them into a buffer. If this buffer grows large enough, the JVM can run out of memory because, as we mentioned earlier, the Observable class is not capable of backpressure management.
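For comparison, the JDK's own Reactive Streams implementation, java.util.concurrent.SubmissionPublisher, behaves like a hot source: submit() delivers an item only to the subscribers registered at that moment. The sketch below (class and method names are illustrative) shows a late subscriber missing the earlier items, as the second pipeline above does. It is not an exact stand-in for ConnectableObservable: SubmissionPublisher keeps a bounded buffer per subscriber, and its submit() blocks while a buffer is full.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

public class HotPublisherSketch {

    // Returns the values seen by the first and by the late subscriber
    static List<List<Integer>> run() {
        List<Integer> first = new CopyOnWriteArrayList<>();
        List<Integer> second = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> hot = new SubmissionPublisher<>();

        // consume() subscribes a consumer that requests all items
        CompletableFuture<Void> firstDone = hot.consume(first::add);
        hot.submit(0);
        hot.submit(1);
        // The late subscriber only sees items submitted after it subscribes
        CompletableFuture<Void> secondDone = hot.consume(second::add);
        hot.submit(2);
        hot.submit(3);

        hot.close();          // signals onComplete to current subscribers
        firstDone.join();     // wait until each consumer has seen onComplete
        secondDone.join();
        return List.of(first, second);
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints: [[0, 1, 2, 3], [2, 3]]
    }
}
```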

For such cases, the Flowable class is a better candidate for the observable because it does have the ability to handle backpressure. Here is an example (see the HotObservable class and the hot2() method):

PublishProcessor<Integer> hot = PublishProcessor.create();
hot.observeOn(Schedulers.io(), true)
   .subscribe(System.out::println, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
    hot.onNext(i);
}

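The PublishProcessor class extends Flowable and has an onNext(Object o) method that forces it to emit the passed-in object. Before calling it, we subscribed to the processor, with the observeOn() operator moving the processing to a thread provided by Schedulers.io() (the second argument, true, tells observeOn() to deliver the already buffered values before an error). We will talk about schedulers in the Multithreading (scheduler) section.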

The subscribe() method has several overloaded versions. We decided to use the one that accepts two Consumer functions: the first one processes each passed-in value, and the second one processes an exception if one is thrown by any of the pipeline operations (it works similarly to a catch block).

If we run the preceding example, it will successfully print the first 127 values and then throw MissingBackpressureException, as shown in the following screenshot:

The message in the exception provides a clue: Could not emit value due to lack of requests. Apparently, the rate of emitting values is higher than the rate of consuming them, while an internal buffer can only keep 128 elements. If we add a delay (to simulate a longer processing time), the result will be even worse (see the HotObservable class and the hot3() method):

PublishProcessor<Integer> hot = PublishProcessor.create();
hot.observeOn(Schedulers.io(), true)
   .delay(10, TimeUnit.MILLISECONDS)
   .subscribe(System.out::println, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
    hot.onNext(i);
}

Even the first 128 elements will not get through and the output will only have MissingBackpressureException.

To address this issue, a backpressure strategy has to be set. For example, let’s drop every value that the pipeline did not manage to process (see the HotObservable class and the hot4() method):

PublishProcessor<Integer> hot = PublishProcessor.create();
hot.onBackpressureDrop(v -> System.out.println("Dropped: "+ v))
   .observeOn(Schedulers.io(), true)
   .subscribe(System.out::println, Throwable::printStackTrace);
for (int i = 0; i < 1_000_000; i++) {
    hot.onNext(i);
}

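Notice that the strategy has to be set before the observeOn() operator. This way, onBackpressureDrop() sits between the fast-emitting PublishProcessor and observeOn(), which requests values only as fast as its bounded buffer is drained on the Schedulers.io() thread, so every value that arrives without an outstanding request is dropped instead of causing MissingBackpressureException.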
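A similar drop strategy can be sketched with the JDK's SubmissionPublisher, assuming a deliberately slow subscriber that requests a single item and a per-subscriber buffer capped at 16 items. Its offer() method accepts a handler that is called for each item a full buffer cannot accept; returning false from the handler drops the item, much like onBackpressureDrop() with its callback. This is an analogy, not RxJava code, and the exact number of dropped items depends on timing, so the sketch only reports the counts.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class DropOnOverflowSketch {

    // Returns {droppedCount, receivedCount}
    static int[] run(int total) {
        AtomicInteger dropped = new AtomicInteger();
        AtomicInteger received = new AtomicInteger();
        // Small per-subscriber buffer (16) so the overflow happens quickly
        SubmissionPublisher<Integer> hot =
                new SubmissionPublisher<>(ForkJoinPool.commonPool(), 16);
        hot.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(1);   // a very slow consumer: one item only
            }

            @Override
            public void onNext(Integer item) {
                received.incrementAndGet();
            }

            @Override
            public void onError(Throwable throwable) { }

            @Override
            public void onComplete() { }
        });
        for (int i = 0; i < total; i++) {
            // Called for every item the full buffer cannot accept;
            // returning false means "do not retry", so the item is dropped
            hot.offer(i, (subscriber, item) -> {
                dropped.incrementAndGet();
                return false;
            });
        }
        hot.close();
        return new int[]{dropped.get(), received.get()};
    }

    public static void main(String[] args) {
        int[] r = run(1_000);
        System.out.println("Dropped: " + r[0] + ", received so far: " + r[1]);
    }
}
```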

The output shows that many of the emitted values were dropped. Here is an output fragment:

We will talk about other backpressure strategies in the Operators section when we overview the corresponding operators.

Disposable

Notice that the subscribe() method actually returns a Disposable object that can be queried to check whether the pipeline processing has been completed and disposed of (see the DisposableUsage class and the disposable1() method):

Observable<Integer> obs = Observable.range(1,5);
List<Double> list = new ArrayList<>();
Disposable disposable =
     obs.filter(i -> i % 2 == 0)
        .doOnNext(System.out::println)     //prints 2 and 4
        .map(Math::sqrt)
        .delay(100, TimeUnit.MILLISECONDS)
        .subscribe(d -> {
            if(list.size() == 1){
                list.remove(0);
            }
            list.add(d);
        });
System.out.println(disposable.isDisposed()); //prints: false
System.out.println(list);                    //prints: []
try {
    TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println(disposable.isDisposed());  //prints: true
System.out.println(list);                     //prints: [2.0]

It is also possible to force the disposal of a pipeline, thus effectively canceling the processing (see the DisposableUsage class and the disposable2() method):

Observable<Integer> obs = Observable.range(1,5);
List<Double> list = new ArrayList<>();
Disposable disposable =
     obs.filter(i -> i % 2 == 0)
        .doOnNext(System.out::println)       //prints 2 and 4
        .map(Math::sqrt)
        .delay(100, TimeUnit.MILLISECONDS)
        .subscribe(d -> {
            if(list.size() == 1){
                list.remove(0);
            }
            list.add(d);
        });
System.out.println(disposable.isDisposed()); //prints: false
System.out.println(list);                    //prints: []
disposable.dispose();
try {
    TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
    e.printStackTrace();
}
System.out.println(disposable.isDisposed()); //prints: true
System.out.println(list);                    //prints: []

As you can see, by adding the call to disposable.dispose(), we have stopped processing, so even after a delay of 200 ms, the list remains empty (see the last line of the preceding example).
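Canceling a pending task in the JDK follows the same idea as disposable.dispose(). In the following sketch, which is an analogy rather than RxJava code, a ScheduledFuture stands in for the Disposable object: cancel() stops the delayed work, isDone() reports true afterward, and the list stays empty. The class name is illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class CancelSketch {

    // Returns {doneBeforeCancel, doneAfterCancel, listStillEmpty}
    static boolean[] run() {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        List<Double> list = new CopyOnWriteArrayList<>();
        // Delayed work, comparable to delay(100, TimeUnit.MILLISECONDS)
        ScheduledFuture<?> task = scheduler.schedule(
                () -> { list.add(Math.sqrt(4)); }, 100, TimeUnit.MILLISECONDS);
        boolean doneBefore = task.isDone();   // still pending
        task.cancel(false);                   // comparable to disposable.dispose()
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdown();
        return new boolean[]{doneBefore, task.isDone(), list.isEmpty()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));   // prints: [false, true, true]
    }
}
```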

When an onComplete or onError event happens, the pipeline is disposed of automatically. Forced disposal can be used to make sure that there are no runaway threads: each created Disposable object can be disposed of in the same way that resources are released in a finally block.

The CompositeDisposable class helps us to handle multiple Disposable objects in a coordinated manner. For example, you can use the add() method to add each newly created Disposable object to the CompositeDisposable object. Then, when necessary, the clear() method can be invoked on the CompositeDisposable object. It removes the collected Disposable objects and calls the dispose() method on each of them.
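The add()/clear() contract of CompositeDisposable can be illustrated with plain Future objects. FutureGroup below is a hypothetical helper written only for this sketch (it is neither a JDK nor an RxJava class): it collects futures and cancels all of them in clear(), just as CompositeDisposable.clear() calls dispose() on each collected Disposable object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FutureGroupSketch {

    // Hypothetical helper (not a JDK or RxJava class) that mirrors the
    // add()/clear() contract of CompositeDisposable for plain Futures
    static final class FutureGroup {
        private final List<Future<?>> futures = new ArrayList<>();

        synchronized void add(Future<?> future) {
            futures.add(future);
        }

        // Cancels every collected task and forgets about it
        synchronized void clear() {
            futures.forEach(f -> f.cancel(true));
            futures.clear();
        }
    }

    // Returns how many of the three delayed tasks actually ran
    static int run() {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        AtomicInteger ran = new AtomicInteger();
        FutureGroup group = new FutureGroup();
        for (int i = 0; i < 3; i++) {
            group.add(scheduler.schedule(
                    () -> { ran.incrementAndGet(); }, 100, TimeUnit.MILLISECONDS));
        }
        group.clear();                        // cancel all of them at once
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdown();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints: 0
    }
}
```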

Creating an observable

You have already seen a few ways to create an observable in our examples. There are many other factory methods in the Observable, Flowable, Single, Maybe, and Completable classes. However, not all of the following methods are available in each of these classes (see the comments; all means that all of the listed classes have it):

  • create(): This creates an Observable object by providing the full implementation (all).
  • defer(): This creates a new Observable object every time a new Observer subscribes (all).
  • empty(): This creates an empty Observable object that completes immediately upon subscription (all, except for Single).
  • never(): This creates an Observable object that does not emit anything and does nothing at all; it does not even complete (all).
  • error(): This creates an Observable object that emits an exception immediately upon subscription (all).
  • fromXXX(): This creates an Observable object, where XXX can be Callable, Future (all), Iterable, Array, Publisher (Observable and Flowable), Action, or Runnable (Maybe and Completable); this means it creates an Observable object based on the provided function or object.
  • generate(): This creates a cold Observable object that generates values based on the provided function or object (Observable and Flowable only).
  • range(), rangeLong(), interval(), intervalRange(): These create an Observable object that emits sequential int or long values, which may or may not be limited by the specified range and spaced by the specified time interval (Observable and Flowable only).
  • just(): This creates an Observable object based on the provided object or a set of objects (all, except for Completable).
  • timer(): This creates an Observable object that, after the specified time, emits a 0L value and then completes (all; the Completable version only completes, without emitting a value).

There are also many other helpful methods, such as repeat(), startWith(), and more. We just do not have enough space to list all of them. Refer to the online documentation (http://reactivex.io/RxJava/2.x/javadoc/index.html).

Let’s look at an example of the create() method usage. The create() method of Observable is as follows:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source)

The passed-in object has to be an implementation of the ObservableOnSubscribe<T> functional interface, which only has one abstract method, subscribe():

void subscribe(ObservableEmitter<T> emitter)

The ObservableEmitter<T> interface contains the following methods:

  • boolean isDisposed(): This returns true if the processing pipeline was disposed of or the emitter was terminated.
  • ObservableEmitter<T> serialize(): This returns an emitter that serializes the calls to onNext(), onError(), and onComplete() (declared in the Emitter base interface), so that they can safely be made from multiple threads.
  • void setCancellable(Cancellable c): This sets, on this emitter, a Cancellable implementation (that is, a functional interface that has only one method, cancel()).
  • void setDisposable(Disposable d): This sets, on this emitter, a Disposable implementation (which is an interface that has two methods: isDisposed() and dispose()).
  • boolean tryOnError(Throwable t): This handles the error condition, attempts to emit the provided exception, and returns false if the emission is not allowed.

The following example implements the ObservableOnSubscribe interface as a lambda expression that uses the emitter to create an observable (see the CreateObservable class and the main() method):

ObservableOnSubscribe<String> source = emitter -> {
    emitter.onNext("One");
    emitter.onNext("Two");
    emitter.onComplete();
};
Observable.create(source)
          .filter(s -> s.contains("w"))
          .subscribe(v -> System.out.println(v),
                     e -> e.printStackTrace(),
                    () -> System.out.println("Completed"));
pauseMs(100); 

Let’s take a closer look at the preceding example. We created an ObservableOnSubscribe function as source and, in its implementation, used the emitter: we told the emitter to emit One at the first call to onNext(), to emit Two at the second call to onNext(), and then to call onComplete(). We passed the source function to the create() method and built the pipeline to process all of the emitted values.

To make it more interesting, we added the filter() operator, which allows only the values containing the w character to propagate further. Additionally, we chose the version of the subscribe() method with three parameters: the Consumer onNext, Consumer onError, and Action onComplete functions. The first is called every time the next value reaches the method, the second is called when an exception is emitted, and the third is called when the source emits an onComplete() signal. After creating the pipeline, we paused for 100 ms to give the asynchronous process a chance to finish. The output consists of two lines: Two and Completed.

If we remove the emitter.onComplete() line from the emitter implementation, only the message Two will be displayed.

So, those are the basics of how the create() method can be used. As you can see, it allows for full customization. In practice, it is rarely used because there are far simpler ways to create an observable. We will review them in the following sections.
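To see what the emitter contract amounts to, here is a deliberately simplified, hypothetical model of it. The Emitter and Source interfaces below are invented for illustration and are not the RxJava types (the real ObservableEmitter also handles errors, disposal, and threading). The sketch reproduces the behavior described above: with onComplete() the subscriber sees Two and Completed; without it, only Two.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class EmitterSketch {

    // Hypothetical, heavily simplified contracts loosely modeled on
    // ObservableEmitter and ObservableOnSubscribe (not the RxJava types)
    interface Emitter<T> {
        void onNext(T value);
        void onComplete();
    }

    interface Source<T> {
        void subscribe(Emitter<T> emitter);
    }

    // Runs the source, applies the filter, and records what the subscriber sees
    static List<String> run(Source<String> source, Predicate<String> filter) {
        List<String> seen = new ArrayList<>();
        source.subscribe(new Emitter<String>() {
            private boolean completed;   // nothing is delivered after completion

            @Override
            public void onNext(String value) {
                if (!completed && filter.test(value)) {
                    seen.add(value);
                }
            }

            @Override
            public void onComplete() {
                if (!completed) {
                    completed = true;
                    seen.add("Completed");
                }
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        Source<String> withComplete = emitter -> {
            emitter.onNext("One");
            emitter.onNext("Two");
            emitter.onComplete();
        };
        Source<String> withoutComplete = emitter -> {
            emitter.onNext("One");
            emitter.onNext("Two");
        };
        System.out.println(run(withComplete, s -> s.contains("w")));    // prints: [Two, Completed]
        System.out.println(run(withoutComplete, s -> s.contains("w"))); // prints: [Two]
    }
}
```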

You will also see examples of other factory methods used throughout the other sections of this chapter.

Operators

There are literally hundreds (if we count all of the overloaded versions) of operators available in each of the observable classes: Observable, Flowable, Single, Maybe, and Completable.

In the Observable and Flowable classes, the number of methods goes beyond 500. That is why, in this section, we are going to provide just an overview and a few examples that will help you to navigate the maze of possible options.

We have grouped all the operators into 10 categories: transforming, filtering, combining, converting from XXX, exceptions handling, life cycle events handling, utilities, conditionals and Booleans, backpressure, and connectable.

Please note that these are not all of the operators that are available. You can see more in the online documentation (http://reactivex.io/RxJava/2.x/javadoc/index.html).

Transforming

The following operators transform the values emitted by an observable:

  • buffer(): This collects the emitted values into bundles according to the provided parameters or by using the provided functions. It periodically emits these bundles one at a time.
  • flatMap(): This produces observables based on the current observable and inserts them into the current flow; it is one of the most popular operators.
  • groupBy(): This divides the current Observable object into groups of observables (GroupedObservable objects).
  • map(): This transforms the emitted value using the provided function.
  • scan(): This applies the provided function to each value in combination with the value produced as the result of the previous application of the same function to the previous value.
  • window(): This emits groups of values similar to buffer() but as observables, each of which emits a subset of values from the original observable and then terminates with onComplete().

The following code demonstrates the use of map(), flatMap(), and groupBy() (see the NonBlockingOperators class and the transforming() method):

Observable<String> obs = Observable.fromArray("one", "two");
obs.map(s -> s.contains("w") ? 1 : 0)
   .forEach(System.out::print);              //prints: 01
System.out.println();
List<String> os = new ArrayList<>();
List<String> noto = new ArrayList<>();
obs.flatMap(s -> Observable.fromArray(s.split("")))
        .groupBy(s -> "o".equals(s) ? "o" : "noto")
        .subscribe(g -> g.subscribe(s -> {
            if (g.getKey().equals("o")) {
                os.add(s);
            } else {
                noto.add(s);
            }
        }));
System.out.println(os);                  //prints: [o, o]
System.out.println(noto);                //prints: [n, e, t, w]
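The Stream API offers operators with similar names, which can help to build intuition about map(), flatMap(), and grouping. The sketch below reproduces the results of the preceding example with streams (class and method names are illustrative). Keep in mind the difference: a Stream is pulled synchronously and can be used only once, whereas an observable pushes values to its observers, possibly asynchronously.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TransformSketch {

    // map(): one output value per input value
    static String mapped() {
        return Stream.of("one", "two")
                .map(s -> s.contains("w") ? 1 : 0)
                .map(Object::toString)
                .collect(Collectors.joining());
    }

    // flatMap() + grouping: split words into letters, then group the letters
    static Map<String, List<String>> grouped() {
        return Stream.of("one", "two")
                .flatMap(s -> Arrays.stream(s.split("")))
                .collect(Collectors.groupingBy(s -> "o".equals(s) ? "o" : "noto"));
    }

    public static void main(String[] args) {
        System.out.println(mapped());                 // prints: 01
        System.out.println(grouped().get("o"));       // prints: [o, o]
        System.out.println(grouped().get("noto"));    // prints: [n, e, t, w]
    }
}
```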

Filtering 

The following operators (and their multiple overloaded versions) select which of the values will continue to flow through the pipeline:

  • debounce(): This emits a value only when a specified span of time has passed without the observable emitting another value.
  • distinct(): This selects unique values only.
  • elementAt(long n): This emits only the value at the specified (zero-based) position n in the stream.
  • filter(): This emits only the values that match the specified criteria.
  • firstElement(): This emits the first value only.
  • ignoreElements(): This does not emit values; only the onComplete() signal goes through.
  • lastElement(): This emits the last value only.
  • sample(): This emits the most recent value emitted within the specified time interval.
  • skip(long n): This skips the first n values.
  • take(long n): This only emits the first n values.

The following code showcases examples of some of the uses of the preceding operators (see the NonBlockingOperators class and the filtering() method):

Observable<String> obs = Observable.just("onetwo")
        .flatMap(s -> Observable.fromArray(s.split("")));
// obs emits "onetwo" as characters           
obs.map(s -> {
            if("t".equals(s)){
               NonBlockingOperators.pauseMs(15);
            }
            return s;
        })
        .debounce(10, TimeUnit.MILLISECONDS)
        .forEach(System.out::print);               //prints: eo
obs.distinct().forEach(System.out::print);      //prints: onetw
obs.elementAt(3).subscribe(System.out::println);   //prints: t
obs.filter(s -> s.equals("o"))
   .forEach(System.out::print);                    //prints: oo
obs.firstElement().subscribe(System.out::println); //prints: o
obs.ignoreElements().subscribe(() -> 
       System.out.println("Completed!"));  //prints: Completed!
Observable.interval(5, TimeUnit.MILLISECONDS)
   .sample(10, TimeUnit.MILLISECONDS)
   .subscribe(v -> System.out.print(v + " ")); 
                                            //prints: 1 3 4 6 8 
pauseMs(50);
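The debounce() result eo is easier to follow with explicit timestamps. The following sketch applies the debounce rule to a list of timestamped values instead of a live stream: a value is kept only if the next one arrives at least windowMs later, and the last value is kept when the source completes. The Event record and the timeline (a 15 ms gap before t) are assumptions chosen to mirror the pauseMs(15) call in the example above.

```java
import java.util.List;

public class DebounceSketch {

    // A value together with the time (in ms) at which it was emitted
    record Event(long timeMs, String value) {}

    // Keeps a value only if no other value follows within windowMs;
    // the last value is emitted when the source completes
    static String debounce(List<Event> events, long windowMs) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < events.size(); i++) {
            boolean isLast = i == events.size() - 1;
            if (isLast
                    || events.get(i + 1).timeMs() - events.get(i).timeMs() >= windowMs) {
                out.append(events.get(i).value());
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Assumed timeline for "onetwo" with a 15 ms pause before "t"
        List<Event> events = List.of(
                new Event(0, "o"), new Event(1, "n"), new Event(2, "e"),
                new Event(17, "t"), new Event(18, "w"), new Event(19, "o"));
        System.out.println(debounce(events, 10));   // prints: eo
    }
}
```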

Combining

The following operators (and their multiple overloaded versions) create a new observable using multiple source observables:

  • concat(src1, src2): This creates an Observable object that emits all values of src1 and then all values of src2.
  • combineLatest(src1, src2, combiner): This creates an Observable object that, whenever either of the two sources emits a value, combines the latest value emitted by each source using the provided combiner function.
  • join(src2, leftWin, rightWin, combiner): This combines the values emitted by two observables during the leftWin and rightWin time windows according to the combiner function.
  • merge(): This combines multiple observables into one; in contrast to concat(), it might interleave them, whereas concat() never interleaves the emitted values from different observables.
  • startWith(T item): This adds the specified value before emitting values from the source observable.
  • startWith(Observable<T> other): This adds the values from the specified observable before emitting values from the source observable.
  • switchOnNext(Observable<Observable> observables): This creates a new Observable object that emits the most-recently emitted values of the specified observables.
  • zip(): This combines the values of the specified observables using the provided function.

The following code demonstrates the use of some of these operators (see the NonBlockingOperators class and the combined() method):

Observable<String> obs1 = Observable.just("one")
             .flatMap(s -> Observable.fromArray(s.split("")));
Observable<String> obs2 = Observable.just("two")
             .flatMap(s -> Observable.fromArray(s.split("")));
Observable.concat(obs2, obs1, obs2)
          .subscribe(System.out::print);    //prints: twoonetwo
Observable.combineLatest(obs2, obs1, (x,y) -> "("+x+y+")")
          .subscribe(System.out::print); //prints: (oo)(on)(oe)
System.out.println();
obs1.join(obs2, i -> Observable.timer(5, 
                TimeUnit.MILLISECONDS),i -> Observable.timer(5, 
                TimeUnit.MILLISECONDS),(x,y) -> "("+x+y+")")
                                 .subscribe(System.out::print); 
                 //prints: (ot)(nt)(et)(ow)(nw)(ew)(oo)(no)(eo)
Observable.merge(obs2, obs1, obs2)
          .subscribe(System.out::print);  //prints: twoonetwo
obs1.startWith("42")
    .subscribe(System.out::print);         //prints: 42one
Observable.zip(obs1, obs2, obs1,  (x,y,z) -> "("+x+y+z+")")
          .subscribe(System.out::print); 
                                      //prints: (oto)(nwn)(eoe) 
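The difference between concat() and zip() does not depend on timing, so it can be shown with plain lists. In the following sketch (an illustration with lists, not RxJava code; names are illustrative), concatenation flattens the sources one after another, while zipping combines the values at the same position and stops at the shortest source.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class CombineSketch {

    static final List<String> ONE = List.of("o", "n", "e");
    static final List<String> TWO = List.of("t", "w", "o");

    // concat(): all values of the first source, then all values of the next one
    static String concat() {
        return Stream.of(TWO, ONE, TWO)
                .flatMap(List::stream)
                .collect(Collectors.joining());
    }

    // zip(): the i-th values of all sources are combined; stops at the shortest
    static String zip(List<String> a, List<String> b, List<String> c) {
        int n = Math.min(a.size(), Math.min(b.size(), c.size()));
        return IntStream.range(0, n)
                .mapToObj(i -> "(" + a.get(i) + b.get(i) + c.get(i) + ")")
                .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        System.out.println(concat());             // prints: twoonetwo
        System.out.println(zip(ONE, TWO, ONE));   // prints: (oto)(nwn)(eoe)
    }
}
```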

Converting from XXX

These operators are pretty straightforward. Here is a list of from-XXX operators of the Observable class:

  • fromArray(T... items): This creates an Observable object from varargs (an array of items).
  • fromCallable(Callable<T> supplier): This creates an Observable object from a Callable function.
  • fromFuture(Future<T> future): This creates an Observable object from a Future object.
  • fromFuture(Future<T> future, long timeout, TimeUnit unit): This creates an Observable object from a Future object with the timeout parameters applied to the future.
  • fromFuture(Future<T> future, long timeout, TimeUnit unit, Scheduler scheduler): This creates an Observable object from a Future object with the timeout parameters applied to the future and the scheduler (note that Schedulers.io() is recommended; please see the Multithreading (scheduler) section).
  • fromFuture(Future<T> future, Scheduler scheduler): This creates an Observable object from a Future object on the specified scheduler (note that Schedulers.io() is recommended; please see the Multithreading (scheduler) section).
  • fromIterable(Iterable<T> source): This creates an Observable object from an iterable object (for example, List).
  • fromPublisher(Publisher<T> publisher): This creates an Observable object from a Publisher object.

Exceptions handling

The subscribe() operator has an overloaded version that accepts the Consumer<Throwable> function, which handles exceptions raised anywhere in the pipeline. It works in a similar way to an all-embracing try-catch block. If you pass this function into the subscribe() operator, you can be sure that it is the only place where all exceptions end up.

However, you may need to handle an exception in the middle of the pipeline, so that the flow of values can be recovered and processed by the rest of the operators after an operator has thrown the exception. The following operators (and their multiple overloaded versions) can help with that:

  • onErrorXXX(): This switches to the provided fallback when an exception is caught; XXX indicates what the operator does: onErrorResumeNext() continues with another observable, onErrorReturn() emits the value produced by the provided function, and onErrorReturnItem() emits the provided item.
  • retry(): This resubscribes to the source Observable when it calls onError(), so the emissions from the source are repeated.

The demo code appears as follows (see the NonBlockingOperators class and the exceptions() method):

Observable<String> obs = Observable.just("one")
              .flatMap(s -> Observable.fromArray(s.split("")));
Observable.error(new RuntimeException("MyException"))
  .flatMap(x -> Observable.fromArray("two".split("")))
  .subscribe(System.out::print,
      e -> System.out.println(e.getMessage()) 
                                          //prints: MyException
  );
Observable.error(new RuntimeException("MyException"))
          .flatMap(y -> Observable.fromArray("two".split("")))
          .onErrorResumeNext(obs)
          .subscribe(System.out::print);          //prints: one
Observable.error(new RuntimeException("MyException"))
          .flatMap(z -> Observable.fromArray("two".split("")))
          .onErrorReturnItem("42")
          .subscribe(System.out::print);          //prints: 42
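The JDK's CompletableFuture offers recovery methods that roughly correspond to these operators, which may help to see the pattern. In the sketch below (class and method names are illustrative), handle() observes the error like the second argument of subscribe(), exceptionally() plays the role of onErrorReturnItem(), and exceptionallyCompose() (available since Java 12) plays the role of onErrorResumeNext(). The correspondence is approximate: a CompletableFuture carries a single value, not a stream of values.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class RecoverSketch {

    // A failed source followed by a step that is skipped because of the failure
    static CompletableFuture<String> failing() {
        return CompletableFuture.<String>failedFuture(new RuntimeException("MyException"))
                .thenApply(x -> "two");
    }

    // Like the error consumer of subscribe(): observe the exception message
    static String errorMessage() {
        return failing().handle((value, e) -> {
            if (e == null) {
                return value;
            }
            // Dependent stages wrap the original exception in CompletionException
            Throwable cause = e instanceof CompletionException && e.getCause() != null
                    ? e.getCause() : e;
            return cause.getMessage();
        }).join();
    }

    // Like onErrorReturnItem("42"): replace the error with a value
    static String returnItem() {
        return failing().exceptionally(e -> "42").join();
    }

    // Like onErrorResumeNext(obs): switch to another asynchronous source
    static String resumeNext() {
        return failing()
                .exceptionallyCompose(e -> CompletableFuture.supplyAsync(() -> "one"))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(errorMessage());   // prints: MyException
        System.out.println(returnItem());     // prints: 42
        System.out.println(resumeNext());     // prints: one
    }
}
```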

Life cycle events handling

These operators are each invoked on a certain event that happened anywhere in the pipeline. They work similarly to the operators described in the Exceptions handling section.

The format of these operators is doXXX(), where XXX is the name of the event: onComplete, onNext, onError, and similar. Not all of them are available in all the classes, and some of them are slightly different in Observable, Flowable, Single, Maybe, or Completable. However, we do not have space to list all the variations of all these classes and will limit our overview to a few examples of the life cycle events-handling operators of the Observable class:

  • doOnSubscribe(Consumer<Disposable> onSubscribe): This executes when an observer subscribes.
  • doOnNext(Consumer<T> onNext): This applies the provided Consumer function when the source observable calls onNext.
  • doAfterNext(Consumer<T> onAfterNext): This applies the provided Consumer function to the current value after it is pushed downstream.
  • doOnEach(Consumer<Notification<T>> onNotification): This executes the Consumer function for each emitted value.
  • doOnEach(Observer<T> observer): This notifies an Observer object for each emitted value and the terminal event it emits.
  • doOnComplete(Action onComplete): This executes the provided Action function after the source observable generates the onComplete event.
  • doOnDispose(Action onDispose): This executes the provided Action function after the pipeline was disposed of downstream.
  • doOnError(Consumer<Throwable> onError): This executes when the onError event is sent.
  • doOnLifecycle(Consumer<Disposable> onSubscribe, Action onDispose): This calls the corresponding onSubscribe or onDispose function for the corresponding event.
  • doOnTerminate(Action onTerminate): This executes the provided Action function when the source observable generates the onComplete event or an exception (the onError event) is raised.
  • doAfterTerminate(Action onFinally): This executes the provided Action function after the source observable generates the onComplete event or an exception (the onError event) is raised.
  • doFinally(Action onFinally): This executes the provided Action function after the source observable generates the onComplete event or an exception (the onError event) is raised, or the pipeline was disposed of downstream.

Here is the demo code (see the NonBlockingOperators class and the events() method):

Observable<String> obs = Observable.just("one")
            .flatMap(s -> Observable.fromArray(s.split("")));
obs.doOnComplete(() -> System.out.println("Completed!")) 
        .subscribe(v -> {
            System.out.println("Subscribe onComplete: " + v);
        });        
pauseMs(25);

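If we run this code, the three values are printed first (Subscribe onComplete: o, Subscribe onComplete: n, and Subscribe onComplete: e), followed by Completed!.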

You will also see other examples of these operators’ usage in the Multithreading (scheduler) section.
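A comparable completion hook exists in the JDK's SubmissionPublisher: its consume() method returns a CompletableFuture that completes when the subscriber receives onComplete, so an action chained with thenRun() runs after all the values have been processed. This is only a loose analogy to doOnComplete(), sketched under that assumption; the class name is illustrative.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

public class LifecycleSketch {

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        CompletableFuture<Void> done = publisher
                .consume(v -> log.add("Subscribe onComplete: " + v))  // per-value callback
                .thenRun(() -> log.add("Completed!"));                // runs after onComplete
        for (String s : "one".split("")) {
            publisher.submit(s);
        }
        publisher.close();   // signals onComplete after the submitted values
        done.join();         // wait for the completion hook as well
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```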

Utilities

Various useful operators (and their multiple overloaded versions) can be used for controlling the pipeline behavior:

  • delay(): This delays the emission for a specified period.
  • materialize(): This creates an Observable object that represents both the emitted values and the notifications sent.
  • dematerialize(): This reverses the result of the materialize() operator.
  • observeOn(): This specifies the Scheduler (thread) on which the Observer should observe the Observable object (see the Multithreading (scheduler) section).
  • serialize(): This forces the serialization of the emitted values and notifications.
  • subscribe(): This subscribes to the emissions and notifications from an observable; various overloaded versions accept callbacks used for a variety of events, including onComplete and onError; only after subscribe() is invoked do the values start flowing through the pipeline.
  • subscribeOn(): This subscribes the Observer to the Observable object asynchronously using the specified Scheduler (see the Multithreading (scheduler) section).
  • timeInterval(), timestamp(): These convert an Observable<T> object that emits values into an Observable<Timed<T>> object, which, in turn, emits the amount of time elapsed between the emissions or the timestamp, respectively.
  • timeout(): This mirrors the emissions of the source Observable but generates an error if no emission happens within the specified period of time.
  • using(): This creates a resource that is disposed of automatically along with the Observable object; it works similarly to the try-with-resources construct.

The following code contains examples of some of these operators being used in a pipeline (see the NonBlockingOperators class and the utilities() method):

Observable<String> obs = Observable.just("one")
          .flatMap(s -> Observable.fromArray(s.split("")));
obs.delay(5, TimeUnit.MILLISECONDS)
   .subscribe(System.out::print);           //prints: one
pauseMs(10);
System.out.println(); //used here just to break the line
Observable<Integer> source = Observable.range(1,5);
Disposable disposable = source.subscribe();
Observable.using(
  () -> disposable,
  x -> source,
  y -> System.out.println("Disposed: " + y) 
                               //prints: Disposed: DISPOSED
)
.delay(10, TimeUnit.MILLISECONDS)
.subscribe(System.out::print);              //prints: 12345
pauseMs(25);

If we run all these examples, the output will appear as follows:

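As you can see, when the pipeline completes, the using() operator calls the Consumer function passed as its third parameter with the resource created by its first parameter, so that function can dispose of the resources used by the pipeline. In this example, the resource is the Disposable object returned by subscribe(), which is already disposed, so it prints as DISPOSED.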
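To isolate what using() does, here is a minimal sketch with a hypothetical resource: a plain String standing in for something like a database connection. The class name UsingSketch and the resource are illustrative assumptions, and RxJava 3 is assumed. Because the source is synchronous and the three-argument using() disposes of the resource eagerly by default, the cleanup runs after the last value and before the completion signal reaches the subscriber.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;

class UsingSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.using(
                () -> "connection",                             // 1: create the (hypothetical) resource
                res -> Observable.just(res + ":1", res + ":2"), // 2: build the pipeline that uses it
                res -> log.add("released " + res))              // 3: clean up the resource
            .subscribe(log::add);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints: [connection:1, connection:2, released connection]
    }
}
```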

Conditional and Boolean

The following operators (and their multiple overloaded versions) allow you to evaluate one or more observables or emitted values and change the logic of the processing accordingly:

  • all(Predicate criteria): This returns Single<Boolean> with a true value if all the emitted values match the provided criteria.
  • amb(): This accepts two or more source observables and emits values from only the first of them that starts emitting.
  • contains(Object value): This returns Single<Boolean> with a true value if the observable emits the provided value.
  • defaultIfEmpty(T value): This emits the provided value if the source Observable does not emit anything.
  • sequenceEqual(): This returns Single<Boolean> with a true value if the provided sources emit the same sequence; an overloaded version allows us to provide the equality function used for comparison.
  • skipUntil(Observable other): This discards emitted values until the provided Observable other emits a value.
  • skipWhile(Predicate condition): This discards emitted values as long as the provided condition remains true.
  • takeUntil(Observable other): This discards emitted values after the provided Observable other emits a value.
  • takeWhile(Predicate condition): This discards emitted values after the provided condition becomes false.

The following code contains a few demo examples (see the NonBlockingOperators class and the conditional() method):

Observable<String> obs = Observable.just("one")
              .flatMap(s -> Observable.fromArray(s.split("")));
Single<Boolean> cont = obs.contains("n");
System.out.println(cont.blockingGet());          //prints: true
obs.defaultIfEmpty("two")
   .subscribe(System.out::print);                 //prints: one
Observable.empty().defaultIfEmpty("two")
          .subscribe(System.out::print);          //prints: two
Single<Boolean> equal = Observable.sequenceEqual(obs, 
                                 Observable.just("one"));
System.out.println(equal.blockingGet());        //prints: false
equal = Observable.sequenceEqual(Observable.just("one"), 
                                 Observable.just("one"));
System.out.println(equal.blockingGet());         //prints: true
equal = Observable.sequenceEqual(Observable.just("one"), 
                                 Observable.just("two"));
System.out.println(equal.blockingGet());        //prints: false
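The example above does not cover all(), skipWhile(), or takeWhile(). A minimal sketch of those three follows, assuming RxJava 3; the class name ConditionalSketch is hypothetical. Note that skipWhile() and takeWhile() stop evaluating the condition once it becomes false, so the trailing 1 is kept by one operator and never reached by the other.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;

class ConditionalSketch {

    // all(): true only if every emitted value satisfies the predicate
    static boolean allPositive() {
        return Observable.just(1, 2, 3).all(i -> i > 0).blockingGet();
    }

    // skipWhile(): drops values until the condition first becomes false,
    // then passes everything through (including the trailing 1)
    static List<Integer> skipWhileBelowThree() {
        return Observable.just(1, 2, 5, 1).skipWhile(i -> i < 3).toList().blockingGet();
    }

    // takeWhile(): passes values until the condition first becomes false,
    // then completes (the trailing 1 is never emitted)
    static List<Integer> takeWhileBelowThree() {
        return Observable.just(1, 2, 5, 1).takeWhile(i -> i < 3).toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(allPositive());          // prints: true
        System.out.println(skipWhileBelowThree());  // prints: [5, 1]
        System.out.println(takeWhileBelowThree());  // prints: [1, 2]
    }
}
```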

Backpressure

We discussed and demonstrated the backpressure effect and the drop strategy in the Cold versus hot section. Other strategies can be applied as follows:

Flowable<Double> obs = Flowable.fromArray(1.,2.,3.);
obs.onBackpressureBuffer().subscribe();
//or
obs.onBackpressureLatest().subscribe();

The buffering strategy allows you to define the buffer size and provide a function that is executed if the buffer overflows. The latest strategy keeps only the most recent value while the consumer cannot keep up, dropping the older ones, and emits that latest value when the consumer requests the next one.

Note that the backpressure operators are available only in the Flowable class.
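The difference between the two strategies is easiest to see with a consumer that has not requested anything yet. The sketch below simulates such a consumer with TestSubscriber and test(0), that is, an initial request of zero. It assumes RxJava 3; the class name BackpressureSketch and the buffer size of 2 are arbitrary choices for illustration.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class BackpressureSketch {

    // test(0) subscribes without requesting anything, so the consumer is
    // "not ready" while range() pushes 1..10 as fast as it can.
    static List<Integer> latest() {
        TestSubscriber<Integer> consumer = Flowable.range(1, 10)
                .onBackpressureLatest()   // keep only the most recent value
                .test(0);
        consumer.request(1);              // the consumer is finally ready for one value
        return consumer.values();
    }

    // A deliberately tiny buffer: 10 values do not fit, so the overflow action
    // runs and the stream fails with MissingBackpressureException.
    static boolean bufferOverflows() {
        AtomicBoolean overflowActionRan = new AtomicBoolean();
        TestSubscriber<Integer> consumer = Flowable.range(1, 10)
                .onBackpressureBuffer(2, () -> overflowActionRan.set(true))
                .test(0);
        consumer.assertError(MissingBackpressureException.class);
        return overflowActionRan.get();
    }

    public static void main(String[] args) {
        System.out.println(latest());
        System.out.println(bufferOverflows());
    }
}
```

With the latest strategy, the consumer receives only the value 10; values 1 to 9 were overwritten while nobody was asking for them.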

Connectable 

The operators of this category allow us to connect observables and, thus, achieve more precisely controlled subscription dynamics:

  • publish(): This converts an Observable object into a ConnectableObservable object.
  • replay(): This returns a ConnectableObservable object that repeats all the emitted values and notifications every time a new Observer subscribes.
  • connect(): This instructs a ConnectableObservable object to begin emitting values to the subscribers.
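  • refCount(): This converts a ConnectableObservable object into an Observable object that connects automatically when the first Observer subscribes and disconnects when all the Observers have unsubscribed.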

We have demonstrated how ConnectableObservable works in the Cold versus hot section. One principal difference between ConnectableObservable and Observable is that ConnectableObservable does not start emitting values until its connect operator has been called.
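As a compact reminder of how publish(), replay(), and connect() behave, here is a minimal sketch assuming RxJava 3 (the class name ConnectableSketch is hypothetical). With publish(), nothing reaches the subscribers until connect() is called; with replay(), a subscriber that arrives after connect() still receives every value.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import java.util.ArrayList;
import java.util.List;

class ConnectableSketch {

    // publish(): subscribers A and B are registered, but no value flows
    // until connect() is called.
    static List<String> publishThenConnect() {
        List<String> log = new ArrayList<>();
        ConnectableObservable<Integer> source = Observable.just(1, 2).publish();
        source.subscribe(i -> log.add("A" + i));
        source.subscribe(i -> log.add("B" + i));
        log.add("before connect");
        source.connect();                 // now both A and B receive 1 and 2
        return log;
    }

    // replay(): the values are cached, so a subscriber that arrives after
    // connect() (and after the source completed) still gets all of them.
    static List<Integer> replayToLateSubscriber() {
        ConnectableObservable<Integer> source = Observable.just(1, 2).replay();
        source.connect();
        return source.toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(publishThenConnect());
        System.out.println(replayToLateSubscriber());
    }
}
```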

Multithreading (scheduler)

By default, RxJava is single-threaded. This means that the source observable and all its operators notify the observers on the same thread on which the subscribe() operator is called.

There are two operators, observeOn() and subscribeOn(), that allow you to move the execution of individual actions to a different thread. These methods take a Scheduler object as an argument that, well, schedules the individual actions to be executed on a different thread.

The subscribeOn() operator declares which scheduler should emit the values.

The observeOn() operator declares which scheduler should observe and process values.
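The following sketch records thread names to make these rules visible. It assumes RxJava 3, and the class name ThreadsSketch is hypothetical. Without any scheduler, map() runs on the thread that subscribes; with subscribeOn(Schedulers.single()) and observeOn(Schedulers.computation()) (two of the schedulers described next), emission and observation happen on different threads. The thread-name prefixes RxSingleScheduler and RxComputationThreadPool come from RxJava 3's default thread factories; treat them as implementation details rather than a contract.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ThreadsSketch {

    // No scheduler: map() runs on the thread that subscribes
    // (here, the thread that calls blockingFirst()).
    static String defaultThread() {
        return Observable.just("x")
                .map(s -> Thread.currentThread().getName())
                .blockingFirst();
    }

    // subscribeOn() picks the emitting thread; observeOn() moves everything
    // downstream of it to another scheduler.
    static List<String> threadsUsed() {
        List<String> log = new CopyOnWriteArrayList<>();
        Observable.just("a")
                .subscribeOn(Schedulers.single())
                .doOnNext(s -> log.add("emit: " + Thread.currentThread().getName()))
                .observeOn(Schedulers.computation())
                .doOnNext(s -> log.add("observe: " + Thread.currentThread().getName()))
                .blockingSubscribe();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName() + " / " + defaultThread());
        threadsUsed().forEach(System.out::println);
    }
}
```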

The Schedulers class contains factory methods that create Scheduler objects with different life cycles and performance configurations:

  • computation(): This creates a scheduler based on a bounded thread pool whose size is limited to the number of available processors (as reported by Runtime.getRuntime().availableProcessors()); it should be used for CPU-intensive computations. Running more CPU-bound threads than there are processors would degrade performance because of the overhead of thread-context switching.
  • io(): This creates a scheduler based on an unbounded thread pool used for I/O-related work, such as working with files and databases in general when the interaction with the source is blocking by nature; avoid using it otherwise because it might spin up too many threads and negatively affect performance and memory usage.
  • newThread(): This creates a new thread every time and does not use any pool; it is an expensive way to create a thread, so you are expected to know exactly what the reason is for using it.
  • single(): This creates a scheduler based on a single thread that executes all the tasks sequentially; this is useful when the sequence of the execution matters.
  • trampoline(): This creates a scheduler that queues tasks on the current thread and executes them in a first-in-first-out manner; this is useful for executing recursive algorithms.
  • from(Executor executor): This creates a scheduler based on the provided executor (thread pool), which allows for better control over the max number of threads and their life cycles.
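The from(Executor) factory is handy when you already manage a thread pool. Here is a minimal sketch, assuming RxJava 3; the pool, its thread name my-pool-thread, and the class name SchedulerFromSketch are all illustrative. Since the pool is created by our code, our code is also responsible for shutting it down.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SchedulerFromSketch {

    // Reports the name of the thread on which the source emits.
    static String emittingThread(Scheduler scheduler) {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .blockingFirst();
    }

    // Wraps our own fixed-size pool (threads named "my-pool-thread") in a Scheduler.
    static String customPoolThread() {
        ExecutorService pool = Executors.newFixedThreadPool(2,
                task -> new Thread(task, "my-pool-thread"));
        try {
            return emittingThread(Schedulers.from(pool));
        } finally {
            pool.shutdown();   // the pool is ours, so we must shut it down
        }
    }

    public static void main(String[] args) {
        System.out.println(customPoolThread());                        // prints: my-pool-thread
        System.out.println(emittingThread(Schedulers.computation()));
    }
}
```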

In Chapter 8, Multithreading and Concurrent Processing, we talked about thread pools. To remind you, here are the pools we discussed:

          Executors.newCachedThreadPool();
          Executors.newSingleThreadExecutor();
          Executors.newFixedThreadPool(int nThreads);
          Executors.newScheduledThreadPool(int poolSize);
          Executors.newWorkStealingPool(int parallelism);

As you can see, some of the other factory methods of the Schedulers class are backed by one of these thread pools, and they serve as just a simpler and shorter expression of a thread pool declaration. To make the examples simpler and more comparable, we are only going to use a computation() scheduler. Let’s look at the basics of parallel/concurrent processing in RxJava.

The following code is an example of delegating CPU-intensive calculations to dedicated threads (see the Scheduler class and the parallel1() method):

Observable.fromArray("one","two","three")
          .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
          .flatMap(w -> Observable.fromArray(w.split(""))
                           .observeOn(Schedulers.computation())
              //.flatMap(s -> {             
              //      CPU-intensive calculations go here
              // }  
                .doAfterNext(s -> System.out.println("2: " + 
                Thread.currentThread().getName() + " => " + s))
          )
          .subscribe(s -> System.out.println("3: " + s));
pauseMs(100);

In this example, we decided to create a sub-flow of characters from each emitted word and let a dedicated thread process the characters of each word. The output of this example appears as follows:

As you can see, the main thread was used to emit the words, and the characters of each word were processed by a dedicated thread. Please note that although in this example the sequence of the results coming to the subscribe() operation corresponds to the sequence the words and characters were emitted, in real-life cases, the calculation time of each value will not be the same. So, there is no guarantee that the results will come in the same sequence.
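To see why ordering cannot be relied on, and what to do about it, consider this sketch (assuming RxJava 3; the class name ParallelOrderSketch is hypothetical). It uses the common flatMap() plus subscribeOn() idiom to square the numbers 1 to 100 on computation threads. The list it produces may come out in a different order from run to run, while an order-insensitive aggregation such as reduce() with addition always yields the same result, 338350.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;

class ParallelOrderSketch {

    // Each value gets its own inner Observable that runs on a computation
    // thread; flatMap() merges the results in whatever order they finish.
    static Observable<Integer> squaresInParallel() {
        return Observable.range(1, 100)
                .flatMap(i -> Observable.just(i)
                        .subscribeOn(Schedulers.computation())
                        .map(x -> x * x));
    }

    // The arrival order may differ from run to run...
    static List<Integer> squares() {
        return squaresInParallel().toList().blockingGet();
    }

    // ...but an order-insensitive aggregation always gives the same answer.
    static int sumOfSquares() {
        return squaresInParallel().reduce(0, Integer::sum).blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(squares());
        System.out.println(sumOfSquares());   // prints: 338350
    }
}
```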

If needed, we can put each word emission on a dedicated non-main thread too, so the main thread can be free to do anything else. For example, note the following (see the Scheduler class and the parallel2() method):

Observable.fromArray("one","two","three")
        .observeOn(Schedulers.computation())
        .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
        .flatMap(w -> Observable.fromArray(w.split(""))
                .observeOn(Schedulers.computation())
                .doAfterNext(s -> System.out.println("2: " + 
                Thread.currentThread().getName() + " => " + s))
        )
        .subscribe(s -> System.out.println("3: " + s));
pauseMs(100);

The output of this example is as follows:

As you can see, the main thread no longer emits the words.

In RxJava 2.0.5, a new, simpler way of parallel processing was introduced, similar to parallel processing in the standard Java 8 streams. Using ParallelFlowable, the same functionality can be achieved as follows (see the Scheduler class and the parallel3() method):

ParallelFlowable<String> src = 
            Flowable.fromArray("one","two","three").parallel();
src.runOn(Schedulers.computation())
   .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
   .flatMap(w -> Flowable.fromArray(w.split("")))
   .runOn(Schedulers.computation())
   .doAfterNext(s -> System.out.println("2: " + 
                Thread.currentThread().getName() + " => " + s))
   .sequential()
   .subscribe(s -> System.out.println("3: " + s));
pauseMs(100);

As you can see, the ParallelFlowable object is created by applying the parallel() operator to the regular Flowable object. Then, the runOn() operator tells the created observable to use the computation() scheduler to emit the values. Please note that there is no need to set another scheduler (for processing the characters) inside the flatMap() operator. It can be set outside it, in the main pipeline, which makes the code simpler. The result looks like this:


As for the subscribeOn() operator, its location in the pipeline does not play any role. Wherever it is placed, it still tells the observable which scheduler should emit the values. Here is an example (see the Scheduler class and the subscribeOn1() method):

Observable.just("a", "b", "c")
          .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
          .subscribeOn(Schedulers.computation())
          .subscribe(s -> System.out.println("2: " + 
               Thread.currentThread().getName() + " => " + s));
pauseMs(100);

The result looks like this:

Even if we change the location of the subscribeOn() operator, as shown in the following example, the result does not change (see the Scheduler class and the subscribeOn2() method):

Observable.just("a", "b", "c")
          .subscribeOn(Schedulers.computation())
          .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
          .subscribe(s -> System.out.println("2: " + 
               Thread.currentThread().getName() + " => " + s));
pauseMs(100);

Finally, here is the example with both operators (see the Scheduler class and the subscribeOnAndObserveOn() method):

Observable.just("a", "b", "c")
          .subscribeOn(Schedulers.computation())
          .doAfterNext(s -> System.out.println("1: " + 
                Thread.currentThread().getName() + " => " + s))
          .observeOn(Schedulers.computation())
          .subscribe(s -> System.out.println("2: " + 
               Thread.currentThread().getName() + " => " + s));
pauseMs(100);

Now the result shows that two threads are used: one for subscribing and another for observing:

This concludes our short overview of RxJava, which is a big and still-growing library with a lot of possibilities, many of which we just did not have space in this book to review. We encourage you to try and learn it because it seems that reactive programming is the way modern data processing is heading.

In the following chapters, we will demonstrate how to build reactive applications (microservices) using Spring Boot and Vert.x.

Summary

In this chapter, you learned what reactive programming is and what its main concepts are: asynchronous, non-blocking, responsive, and more. Reactive streams were introduced and explained in simple terms, along with the RxJava library, which is the first solid implementation that supports reactive programming principles.

Now you can write code for asynchronous processing using reactive programming.

In the next chapter, we will talk about microservices as the foundation for creating reactive systems, and we will review another library that successfully supports reactive programming: Vert.x. We will use it to demonstrate how various microservices can be built.

Quiz

  1. Select all the correct statements:
    1. Asynchronous processing always provides results later.
    2. Asynchronous processing always provides responses quickly.
    3. Asynchronous processing can use parallel processing.
    4. Asynchronous processing always provides results faster than a blocking call.
  2. Can CompletableFuture be used without using a thread pool?
  3. What does the nio in java.nio stand for?
  4. Is an event loop the only design that supports a non-blocking API?
  5. What does the Rx in RxJava stand for?
  6. Which Java package of the Java Class Library (JCL) supports reactive streams?
  7. Select all classes from the following list that can represent an observable in a reactive stream:
    1. Flowable
    2. Probably
    3. CompletableFuture
    4. Single
  8. How do you know that a particular method (operator) of the Observable class is blocking?
  9. What is the difference between a cold and a hot observable?
  10. The subscribe() method of Observable returns a Disposable object. What happens when the dispose() method is called on this object?
  11. Select all the names of the methods that create an Observable object:
    1. interval()
    2. new()
    3. generate()
    4. defer()
  12. Name two transforming Observable operators.
  13. Name two filtering Observable operators.
  14. Name two backpressure-processing strategies.
  15. Name two Observable operators that allow you to add threads to the pipeline processing.