Mono

Creating a Mono is very simple. The following Mono emits one element after a delay of 5 seconds.

    Mono<String> stubMonoWithADelay =
        Mono.just("Ranga").delayElement(Duration.ofSeconds(5));

We want to listen to the events from Mono and log them to the console. We can do that using the statement specified here:

    stubMonoWithADelay.subscribe(System.out::println);

However, if you run the two preceding statements in a method annotated with @Test, as shown in the following code, you will see that nothing is printed to the console:

    @Test
    public void monoExample() throws InterruptedException {
        Mono<String> stubMonoWithADelay =
            Mono.just("Ranga").delayElement(Duration.ofSeconds(5));
        stubMonoWithADelay.subscribe(System.out::println);
    }

Nothing is printed to the console because the Test execution ends before the Mono emits the element after 5 seconds. To prevent this, let's delay the execution of Test using Thread.sleep:

    @Test
    public void monoExample() throws InterruptedException {
        Mono<String> stubMonoWithADelay =
            Mono.just("Ranga").delayElement(Duration.ofSeconds(5));
        stubMonoWithADelay.subscribe(System.out::println);
        Thread.sleep(10000);
    }
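A fixed Thread.sleep always waits the full 10 seconds, even though the element arrives after 5. One common alternative is to wait on a CountDownLatch that the callback releases. The following is a minimal sketch of that idea using only JDK classes, so it runs without Reactor on the classpath. The emitLater helper is a hypothetical stand-in for the delayed Mono, and the class and method names are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class AwaitDelayedElement {

    // Hypothetical stand-in for a delayed Mono: passes the value to the
    // subscriber on another thread once delayMillis has elapsed.
    static void emitLater(ScheduledExecutorService scheduler, String value,
                          long delayMillis, Consumer<String> subscriber) {
        scheduler.schedule(() -> subscriber.accept(value), delayMillis, TimeUnit.MILLISECONDS);
    }

    // Waits until the element arrives or the timeout expires.
    // Returns the element, or null if nothing arrived in time.
    public static String awaitElement(String value, long delayMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();
        try {
            emitLater(scheduler, value, delayMillis, element -> {
                received.set(element);
                done.countDown(); // releases the waiting thread
            });
            // Returns as soon as the callback fires, unlike a fixed Thread.sleep.
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return received.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        } finally {
            scheduler.shutdownNow(); // cancels the pending task on timeout
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitElement("Ranga", 200, 2000));
    }
}
```

In a Reactor test, the same latch could be counted down from the consumer passed to subscribe, so the test finishes as soon as the element is emitted.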

When we create a subscriber using stubMonoWithADelay.subscribe(System.out::println), we are using a functional programming feature introduced in Java 8. System.out::println is a method reference. We are passing the method reference as an argument to the subscribe method.

This is possible because of a specific functional interface called Consumer. A functional interface is an interface with only one abstract method. The Consumer functional interface is used to define an operation that accepts a single input argument and returns no result. An outline of the Consumer interface is shown in the following snippet:

    @FunctionalInterface
    public interface Consumer<T> {
        void accept(T t);
    }
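Because Consumer has a single abstract method, a method reference, a lambda expression, and an anonymous class can all supply it. The following sketch shows the three forms side by side using only the JDK. The class name ConsumerForms and the collect helper are made up for illustration and are not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ConsumerForms {

    // Feeds the same value to three Consumer<String> instances
    // and returns what each of them recorded, in call order.
    public static List<String> collect(String value) {
        List<String> seen = new ArrayList<>();

        // Method reference: List.add is adapted to accept(T);
        // the boolean it returns is discarded.
        Consumer<String> byMethodReference = seen::add;

        // Lambda expression with the same shape: one argument, no result.
        Consumer<String> byLambda = s -> seen.add("lambda:" + s);

        // Anonymous class that implements accept explicitly.
        Consumer<String> byAnonymousClass = new Consumer<String>() {
            @Override
            public void accept(String s) {
                seen.add("anonymous:" + s);
            }
        };

        byMethodReference.accept(value);
        byLambda.accept(value);
        byAnonymousClass.accept(value);
        return seen;
    }

    public static void main(String[] args) {
        // System.out::println is itself usable as a Consumer<String>.
        collect("Ranga").forEach(System.out::println);
    }
}
```

Running main prints Ranga, lambda:Ranga, and anonymous:Ranga on separate lines. Any of the three forms could be passed wherever a Consumer<String> is expected.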

Instead of using a method reference, we can explicitly define a Consumer class as well. The following code snippet shows the important details:

    class SystemOutConsumer implements Consumer<String> {
        @Override
        public void accept(String t) {
            System.out.println("Received " + t + " at " + new Date());
        }
    }

    @Test
    public void monoExample() throws InterruptedException {
        Mono<String> stubMonoWithADelay =
            Mono.just("Ranga").delayElement(Duration.ofSeconds(5));
        stubMonoWithADelay.subscribe(new SystemOutConsumer());
        Thread.sleep(10000);
    }

A few important things to note are as follows:

  • class SystemOutConsumer implements Consumer<String>: We create a SystemOutConsumer class that implements the functional interface Consumer. The type of input is String.
  • public void accept(String t): We define the accept method to print the received string, along with the current date, to the console.
  • stubMonoWithADelay.subscribe(new SystemOutConsumer()): We create an instance of SystemOutConsumer to subscribe to the events.

The output is shown in the following screenshot:

We can have multiple subscribers listening on events from a Mono or Flux. The following snippet shows how we can create an additional subscriber:

    class WelcomeConsumer implements Consumer<String> {
        @Override
        public void accept(String t) {
            System.out.println("Welcome " + t);
        }
    }

    @Test
    public void monoExample() throws InterruptedException {
        Mono<String> stubMonoWithADelay =
            Mono.just("Ranga").delayElement(Duration.ofSeconds(5));
        stubMonoWithADelay.subscribe(new SystemOutConsumer());
        stubMonoWithADelay.subscribe(new WelcomeConsumer());
        Thread.sleep(10000);
    }



A couple of important things to note are as follows:

  • class WelcomeConsumer implements Consumer<String>: We create another Consumer class, WelcomeConsumer.
  • stubMonoWithADelay.subscribe(new WelcomeConsumer()): We add an instance of WelcomeConsumer as a second subscriber to the events from the Mono.

The output is shown in the following screenshot:
