Creating a Mono is very simple. The following Mono emits one element after a delay of 5 seconds.
Mono<String> stubMonoWithADelay =
Mono.just("Ranga").delayElement(Duration.ofSeconds(5));
We want to listen to the events from Mono and log them to the console. We can do that using the statement specified here:
stubMonoWithADelay.subscribe(System.out::println);
However, if you run the program with the two preceding statements in a Test annotation as shown in the following code, you would see that nothing is printed to the console:
@Test
public void monoExample() throws InterruptedException {
Mono<String> stubMonoWithADelay =
Mono.just("Ranga").delayElement(Duration.ofSeconds(5));
stubMonoWithADelay.subscribe(System.out::println);
}
Nothing is printed to the console because the Test execution ends before the Mono emits the element after 5 seconds. To prevent this, let's delay the execution of Test using Thread.sleep:
@Test
public void monoExample() throws InterruptedException {
Mono<String> stubMonoWithADelay =
Mono.just("Ranga").delayElement(Duration.ofSeconds(5));
stubMonoWithADelay.subscribe(System.out::println);
Thread.sleep(10000);
}
When we create a subscriber using stubMonoWithADelay.subscribe(System.out::println), we are using the functional programming feature introduced in Java 8. System.out::println is a method definition. We are passing the method definition as a parameter to a method.
This is possible because of a specific functional interface called Consumer. A functional interface is an interface with only one method. The Consumer functional interface is used to define an operation that accepts a single input argument and returns no result. An outline of the Consumer interface is shown in the following snippet:
@FunctionalInterface
public interface Consumer<T> {
void accept(T t);
}
Instead of using a lambda expression, we can explicitly define Consumer as well. The following code snippet shows the important details:
class SystemOutConsumer implements Consumer<String> {
@Override
public void accept(String t) {
System.out.println("Received " + t + " at " + new Date());
}
}
@Test
public void monoExample() throws InterruptedException {
Mono<String> stubMonoWithADelay =
Mono.just("Ranga").delayElement(Duration.ofSeconds(5));
stubMonoWithADelay.subscribe(new SystemOutConsumer());
Thread.sleep(10000);
}
A couple of important things to note are as follows:
- class SystemOutConsumer implements Consumer<String>: We create a SystemOutConsumer class that implements the functional interface Consumer. The type of input is String.
- public void accept(String t): We define the accept method to print the content of the string to the console.
- stubMonoWithADelay.subscribe(new SystemOutConsumer()): We create an instance of SystemOutConsumer to subscribe the events.
The output is shown in the following screenshot:
We can have multiple subscribers listening on events from a Mono or Flux. The following snippet shows how we can create an additional subscriber:
class WelcomeConsumer implements Consumer<String> {
@Override
public void accept(String t) {
System.out.println("Welcome " + t);
}
}
@Test
public void monoExample() throws InterruptedException {
Mono<String> stubMonoWithADelay =
Mono.just("Ranga").delayElement(Duration.ofSeconds(5));
stubMonoWithADelay.subscribe(new SystemOutConsumer());
stubMonoWithADelay.subscribe(new WelcomeConsumer());
Thread.sleep(10000);
}
A couple of important things to note are as follows:
- class WelcomeConsumer implements Consumer<String>: We are creating another Consumer class, WelcomeConsumer
- stubMonoWithADelay.subscribe(new WelcomeConsumer()): We are adding an instance of WelcomeConsumer as a subscriber to the events from Mono
The output is shown in the following screenshot: