Using Mono to emit one element

Creating a Mono is simple. The following Mono emits one element after a delay of 5 seconds:

    Mono<String> stubMonoWithADelay = Mono.just("Ranga")
        .delayElement(Duration.ofSeconds(5));

We want to listen to the events from Mono and log them to the console. We can do that using the statement specified here:

    stubMonoWithADelay.subscribe(System.out::println);

However, if you run the two preceding statements in a method annotated with @Test, as shown in the following code, you will see that nothing is printed to the console:

    @Test
    public void monoExample() throws InterruptedException {

        Mono<String> stubMonoWithADelay = Mono.just("Ranga")
            .delayElement(Duration.ofSeconds(5));

        stubMonoWithADelay.subscribe(System.out::println);

    }

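Nothing is printed to the console because the test method returns before the Mono emits its element. The call to subscribe does not block, and the delayed element is delivered on a different thread 5 seconds later. To prevent this, let's keep the test thread alive using Thread.sleep: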

    @Test
    public void monoExample() throws InterruptedException {

        Mono<String> stubMonoWithADelay = Mono.just("Ranga")
            .delayElement(Duration.ofSeconds(5));

        stubMonoWithADelay.subscribe(System.out::println);

        Thread.sleep(10000);

    }
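A fixed Thread.sleep works, but the test always waits the full 10 seconds even though the element arrives after 5. The following is a minimal sketch of an alternative, written with only the JDK so that it runs without Reactor on the classpath. It is not Reactor code: emitLater is a hypothetical stand-in for a delayed Mono, and awaitElement is a hypothetical helper that waits on a CountDownLatch until the element arrives or a timeout elapses. In Reactor itself, Mono.block() serves a similar purpose of waiting for the element.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class DelayedEmissionSketch {

    // Hypothetical stand-in for a delayed Mono: hands the value to the
    // subscriber on another thread after the delay, and returns immediately.
    static void emitLater(String value, long delayMillis,
                          Consumer<String> subscriber,
                          ScheduledExecutorService scheduler) {
        scheduler.schedule(() -> subscriber.accept(value),
                delayMillis, TimeUnit.MILLISECONDS);
    }

    // Returns the received element, or null if it did not arrive in time.
    static String awaitElement(String value, long delayMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        AtomicReference<String> received = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            emitLater(value, delayMillis, element -> {
                received.set(element);
                done.countDown(); // signal the waiting thread
            }, scheduler);
            // Wait only as long as needed, up to the timeout
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        } finally {
            scheduler.shutdownNow();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(awaitElement("Ranga", 200, 2000));
    }
}
```

With a latch, the waiting thread resumes as soon as the element is delivered, and the timeout only bounds the worst case.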

When we create a subscriber using stubMonoWithADelay.subscribe(System.out::println), we are using a functional programming feature introduced in Java 8. System.out::println is a method reference. We are passing a reference to the method as an argument to another method.

This is possible because of a functional interface called Consumer. A functional interface is an interface with exactly one abstract method. The Consumer functional interface is used to define an operation that accepts a single input argument and returns no result. An outline of the Consumer interface is shown in the following snippet:

    @FunctionalInterface
    public interface Consumer<T> {

        void accept(T t);

    }
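To see how this outline relates to the subscribe call, here is a small sketch that uses only the JDK. The class ConsumerForms and its methods deliver and collect are hypothetical names chosen for illustration. deliver plays the role of a method that, like subscribe, accepts a Consumer<String>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ConsumerForms {

    // Hypothetical helper: passes one value to a consumer, much as a
    // publisher hands its element to a subscriber.
    static void deliver(String value, Consumer<String> consumer) {
        consumer.accept(value);
    }

    // Collects what each consumer form receives so the result can be inspected.
    static List<String> collect(String value) {
        List<String> received = new ArrayList<>();
        // Method reference: the compiler adapts received::add to Consumer<String>
        deliver(value, received::add);
        // A lambda expression works the same way; the prefix marks which form ran
        deliver(value, t -> received.add("lambda:" + t));
        return received;
    }

    public static void main(String[] args) {
        // The same call shape as subscribe(System.out::println)
        deliver("Ranga", System.out::println);
        System.out.println(collect("Ranga"));
    }
}
```

In both cases the compiler turns the expression into an object whose accept method runs the referenced code, which is why either form can be passed wherever a Consumer<String> is expected.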

Instead of using a method reference, we can explicitly define a Consumer as well. The following code snippet shows the important details:

    class SystemOutConsumer implements Consumer<String> {

        @Override
        public void accept(String t) {
            System.out.println("Received " + t + " at " + new Date());
        }

    }

    @Test
    public void monoExample() throws InterruptedException {

        Mono<String> stubMonoWithADelay = Mono.just("Ranga")
            .delayElement(Duration.ofSeconds(5));

        stubMonoWithADelay.subscribe(new SystemOutConsumer());

        Thread.sleep(10000);

    }

Bear in mind the following:

  • class SystemOutConsumer implements Consumer<String>: We create a SystemOutConsumer class that implements the Consumer functional interface. The type of input is String.
  • public void accept(String t): We define the accept method to print the received string to the console, along with the time at which it was received.
  • stubMonoWithADelay.subscribe(new SystemOutConsumer()): We create an instance of SystemOutConsumer to subscribe to the events.

The output is shown in the following screenshot:

We can have multiple subscribers listening to events from a Mono or Flux. The following snippet shows how we can create an additional subscriber:

    class WelcomeConsumer implements Consumer<String> {

        @Override
        public void accept(String t) {
            System.out.println("Welcome " + t);
        }

    }

    @Test
    public void monoExample() throws InterruptedException {

        Mono<String> stubMonoWithADelay = Mono.just("Ranga")
            .delayElement(Duration.ofSeconds(5));

        stubMonoWithADelay.subscribe(new SystemOutConsumer());

        stubMonoWithADelay.subscribe(new WelcomeConsumer());

        Thread.sleep(10000);

    }

A couple of important things to note are as follows:

  • class WelcomeConsumer implements Consumer<String>: We are creating another Consumer class, WelcomeConsumer.
  • stubMonoWithADelay.subscribe(new WelcomeConsumer()): We are adding an instance of WelcomeConsumer as a subscriber to the events from Mono.

The output is shown in the following screenshot:
