Processor

The processor application picks up the message from the input channel on the message broker, processes it, and puts the result on the output channel of the message broker. In this specific example, processing means adding our current holdings of the stock to the message.

Let's use Spring Initializr (https://start.spring.io) to set up the application. Provide the details listed here and click on Generate Project:

  • Group: com.mastering.spring.cloud.data.flow
  • Artifact: stock-intelligence-processor
  • Dependencies: Stream Rabbit

Update the SpringBootApplication file with the following code:

    import java.util.HashMap;
    import java.util.Map;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.integration.annotation.Transformer;

    @EnableBinding(Processor.class)
    @SpringBootApplication
    public class StockIntelligenceProcessorApplication {

        private static Logger logger = LoggerFactory.getLogger(
            StockIntelligenceProcessorApplication.class);

        // Holdings per ticker, loaded once when the class is initialized
        private static Map<StockTicker, Integer> holdings =
            getHoldingsFromDatabase();

        // Stands in for a database lookup; the values are hardcoded here
        private static Map<StockTicker, Integer> getHoldingsFromDatabase() {
            final Map<StockTicker, Integer> holdings = new HashMap<>();
            holdings.put(StockTicker.FACEBOOK, 10);
            holdings.put(StockTicker.GOOGLE, 0);
            holdings.put(StockTicker.IBM, 15);
            holdings.put(StockTicker.MICROSOFT, 30);
            holdings.put(StockTicker.TWITTER, 50);
            return holdings;
        }

        // Enriches each incoming event with our holding for its ticker
        @Transformer(inputChannel = Processor.INPUT,
            outputChannel = Processor.OUTPUT)
        public Object addOurInventory(StockPriceChangeEvent event) {
            logger.info("started processing event " + event);
            Integer holding = holdings.get(
                StockTicker.valueOf(event.getStockTicker()));
            StockPriceChangeEventWithHoldings eventWithHoldings =
                new StockPriceChangeEventWithHoldings(event, holding);
            logger.info("ended processing eventWithHoldings "
                + eventWithHoldings);
            return eventWithHoldings;
        }

        public static void main(String[] args) {
            SpringApplication.run(
                StockIntelligenceProcessorApplication.class, args);
        }
    }

A few important things to note are as follows:

  • @EnableBinding(Processor.class): The EnableBinding annotation binds a class to the channels it needs: an input, an output, or both. The Processor interface is used to register a Cloud Stream with one input channel and one output channel.
  • private static Map<StockTicker, Integer> getHoldingsFromDatabase(): This method returns our current holdings for each stock ticker. In this example, the values are hardcoded in a map instead of being read from a database.
  • public Object addOurInventory(StockPriceChangeEvent event): This method processes a message, looks up the holdings for its ticker, and returns a new object, which is put as a new message on the output channel.
  • @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT): The Transformer annotation is used to indicate a method that is capable of transforming/enhancing one message format into another.
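
The enrichment step itself does not depend on Spring, so it can be tried out in isolation. The following is a minimal sketch under stated assumptions: the three domain types are hypothetical stand-ins reduced to the fields this method touches (the real classes may carry more), and HoldingsEnricher is an illustrative name, not a class from the application.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-ins for the domain classes, reduced to what is used here.
enum StockTicker { FACEBOOK, GOOGLE, IBM, MICROSOFT, TWITTER }

final class StockPriceChangeEvent {
    private final String stockTicker;
    StockPriceChangeEvent(String stockTicker) { this.stockTicker = stockTicker; }
    String getStockTicker() { return stockTicker; }
}

final class StockPriceChangeEventWithHoldings {
    private final StockPriceChangeEvent event;
    private final Integer holdings;
    StockPriceChangeEventWithHoldings(StockPriceChangeEvent event, Integer holdings) {
        this.event = event;
        this.holdings = holdings;
    }
    StockPriceChangeEvent getEvent() { return event; }
    Integer getHoldings() { return holdings; }
}

// Illustrative name: the same lookup as addOurInventory, without Spring.
final class HoldingsEnricher {
    private static final Map<StockTicker, Integer> HOLDINGS =
        new EnumMap<>(StockTicker.class);
    static {
        HOLDINGS.put(StockTicker.FACEBOOK, 10);
        HOLDINGS.put(StockTicker.GOOGLE, 0);
        HOLDINGS.put(StockTicker.IBM, 15);
        HOLDINGS.put(StockTicker.MICROSOFT, 30);
        HOLDINGS.put(StockTicker.TWITTER, 50);
    }

    static StockPriceChangeEventWithHoldings addOurInventory(
            StockPriceChangeEvent event) {
        // valueOf throws IllegalArgumentException for an unknown ticker name
        Integer holding = HOLDINGS.get(StockTicker.valueOf(event.getStockTicker()));
        return new StockPriceChangeEventWithHoldings(event, holding);
    }

    public static void main(String[] args) {
        StockPriceChangeEventWithHoldings enriched =
            addOurInventory(new StockPriceChangeEvent("IBM"));
        System.out.println(enriched.getHoldings()); // prints 15
    }
}
```

Note that StockTicker.valueOf rejects any ticker name that is not an enum constant, so an event for an unlisted stock fails with an IllegalArgumentException rather than producing a message with a null holding.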

As shown in the following code, the Processor interface extends the Source and Sink interfaces. Hence, it defines both the output and input channels:

    public abstract interface org.springframework.cloud.stream.messaging.Processor
        extends org.springframework.cloud.stream.messaging.Source,
                org.springframework.cloud.stream.messaging.Sink {
    }
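
This inheritance is also why Processor.INPUT and Processor.OUTPUT resolve in the @Transformer annotation: the channel-name constants are declared on Sink and Source and inherited by Processor. The following sketch illustrates this with simplified stand-in interfaces. It assumes the constant values "input" and "output" and leaves out the channel accessor methods and the @Input/@Output annotations that the real Spring Cloud Stream interfaces carry; ProcessorChannels is an illustrative name.

```java
// Simplified stand-ins that mirror the shape of the Spring Cloud Stream
// interfaces; only the channel-name constants are reproduced here.
interface Source {
    String OUTPUT = "output";
}

interface Sink {
    String INPUT = "input";
}

// Processor declares nothing itself; it inherits both constants.
interface Processor extends Source, Sink {
}

final class ProcessorChannels {
    public static void main(String[] args) {
        System.out.println(Processor.INPUT);  // prints input (inherited from Sink)
        System.out.println(Processor.OUTPUT); // prints output (inherited from Source)
    }
}
```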