The source application

The source application will be the producer of stock price change events. It defines an output channel and puts messages on the message broker.

Let's use Spring Initializr (https://start.spring.io) to set up the application. Provide the details listed here and click on Generate Project:

  • Group: com.mastering.spring.cloud.data.flow
  • Artifact: significant-stock-change-source
  • Dependencies: Stream Rabbit

Listed here are some of the important dependencies from the pom.xml file:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

Update the SpringBootApplication file with the following code:

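    import java.math.BigDecimal;
    import java.util.concurrent.ThreadLocalRandom;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.InboundChannelAdapter;
    import org.springframework.integration.annotation.Poller;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.messaging.support.MessageBuilder;

    @EnableBinding(Source.class)
    @SpringBootApplication
    public class SignificantStockChangeSourceApplication {

        private static final Logger logger =
            LoggerFactory.getLogger(SignificantStockChangeSourceApplication.class);

        // psvm - main method

        // Polled once a minute; each poll puts one event on the output channel.
        @Bean
        @InboundChannelAdapter(value = Source.OUTPUT,
            poller = @Poller(fixedDelay = "60000", maxMessagesPerPoll = "1"))
        public MessageSource<StockPriceChangeEvent> stockPriceChangeEvent() {
            StockTicker[] tickers = StockTicker.values();
            return () -> {
                // Pick the ticker inside the lambda so that each poll selects a new one.
                String randomStockTicker =
                    tickers[ThreadLocalRandom.current().nextInt(tickers.length)].name();
                StockPriceChangeEvent event = new StockPriceChangeEvent(
                    randomStockTicker,
                    new BigDecimal(getRandomNumber(10, 20)),
                    new BigDecimal(getRandomNumber(10, 20)));
                logger.info("sending " + event);
                return MessageBuilder.withPayload(event).build();
            };
        }

        // Returns a random int between min and max, both inclusive.
        private int getRandomNumber(int min, int max) {
            return ThreadLocalRandom.current().nextInt(min, max + 1);
        }
    }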
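
The listing above refers to two types that are not shown in this section: StockTicker and StockPriceChangeEvent. The following is a minimal sketch of what they might look like, so that the listing can be read end to end. The ticker symbols and the field names (stockTicker, oldPrice, newPrice) are assumptions made for illustration; only the constructor shape (a String followed by two BigDecimal values) is taken from the call in the listing.

```java
import java.math.BigDecimal;

// Hypothetical enum; the actual ticker symbols are an assumption.
enum StockTicker {
    FACEBOOK, GOOGLE, IBM, MICROSOFT, TWITTER
}

// Hypothetical event payload. The field names are assumptions; the constructor
// matches the call new StockPriceChangeEvent(String, BigDecimal, BigDecimal).
class StockPriceChangeEvent {
    private final String stockTicker;
    private final BigDecimal oldPrice;
    private final BigDecimal newPrice;

    StockPriceChangeEvent(String stockTicker, BigDecimal oldPrice, BigDecimal newPrice) {
        this.stockTicker = stockTicker;
        this.oldPrice = oldPrice;
        this.newPrice = newPrice;
    }

    public String getStockTicker() { return stockTicker; }
    public BigDecimal getOldPrice() { return oldPrice; }
    public BigDecimal getNewPrice() { return newPrice; }

    // The source application logs the event, so a readable toString helps.
    @Override
    public String toString() {
        return "StockPriceChangeEvent [stockTicker=" + stockTicker
            + ", oldPrice=" + oldPrice + ", newPrice=" + newPrice + "]";
    }
}
```

Depending on how the payload is converted for the broker, a no-argument constructor and setters may also be needed; that detail is left out of this sketch.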

A few important things to note are as follows:

  • @EnableBinding(Source.class): The EnableBinding annotation binds the application to the channels declared by the given interface: an input, an output, or both. The Source interface registers a single output channel with Spring Cloud Stream.
  • @Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "60000", maxMessagesPerPoll = "1")): The InboundChannelAdapter annotation indicates that the messages created by this method are to be put on a message broker. The value attribute names the channel on which the message is put. Poller schedules the generation of messages. In this example, we are using fixedDelay to generate a message every minute (60 * 1000 ms); the delay is counted from the end of one poll to the start of the next.
  • private int getRandomNumber(int min, int max): This method creates a random number in the range passed as parameters, with both bounds included.
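
To make the poller idea concrete, here is a plain-Java analogy that uses only the JDK. It is not how Spring Integration implements @Poller; it is a sketch, under that assumption, of the same pattern: a source is asked for one message after each fixed delay, and the message is handed on. The class name FixedDelayPollerSketch and the method pollFixedDelay are hypothetical, and the delay is given in milliseconds so that the example finishes quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: an analogy for a fixed-delay poller, not Spring's implementation.
class FixedDelayPollerSketch {

    // Polls the source `polls` times, waiting delayMillis between the end of one
    // poll and the start of the next, and returns the messages in order.
    static <T> List<T> pollFixedDelay(Supplier<T> source, long delayMillis, int polls) {
        List<T> sent = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(polls);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        scheduler.scheduleWithFixedDelay(() -> {
            // One message per poll, like maxMessagesPerPoll = "1".
            if (done.getCount() > 0) {
                sent.add(source.get());
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(sent);
    }
}
```

For example, FixedDelayPollerSketch.pollFixedDelay(counter::incrementAndGet, 10, 3) with an AtomicInteger named counter collects three values, one per poll. In the source application the framework plays the role of this loop, and the message ends up on the output channel instead of in a list.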

The Source interface defines an output channel, as shown in the following code:

    public abstract interface org.springframework.cloud.stream.messaging.Source {
        public static final java.lang.String OUTPUT = "output";

        @org.springframework.cloud.stream.annotation.Output(value="output")
        public abstract org.springframework.messaging.MessageChannel output();
    }