Implementing asynchronous communication with a message broker

For the implementation, we take the example of a burger shop, say "Crispy Bun." It is a drive-through burger shop where customers place their orders at one window and wait in their cars for the orders at the next window. A system takes an order at the first window and pushes it as a message (event) to a queue or topic. Another component, placed in front of the chef, lists all the orders. Our requirement is that an order can be submitted to an intermediate broker (queue/topic) that all the chefs can listen to. The order should not vanish if no chef component is available; the broker should keep orders until a chef component reads them for processing. This example can be expanded. For example, any number of components can listen to this topic, or the same order can be passed to the chef component and also to the packing team so that they can start making boxes for it, or to the beverage team to make a beverage (if one is in the order). Different components can thus listen to this order and take their respective actions. For simplicity, we take one order-taking component and one chef component.
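The requirement that an order must wait for a chef rather than vanish can be pictured with a minimal in-memory sketch. It uses a plain java.util.concurrent queue as a stand-in for the broker; the class and method names (DriveThroughSketch, placeOrder, nextOrder) are hypothetical, and a real broker such as RabbitMQ would additionally keep the orders outside the JVM.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for a broker queue; not the RabbitMQ API.
class DriveThroughSketch {

    // Hypothetical queue shared by the order window and the chef display.
    static final BlockingQueue<String> ORDER_QUEUE = new LinkedBlockingQueue<>();

    // Order window: publish and return immediately, whether or not a chef is listening.
    static void placeOrder(String order) {
        ORDER_QUEUE.offer(order);
    }

    // Chef display: take the oldest waiting order, or null if none is waiting.
    static String nextOrder() {
        return ORDER_QUEUE.poll();
    }

    public static void main(String[] args) {
        placeOrder("order-1: cheeseburger");
        placeOrder("order-2: veggie burger");
        // No chef was listening while the orders were placed; both are still queued.
        System.out.println("Waiting orders: " + ORDER_QUEUE.size());
        System.out.println("Chef picks up: " + nextOrder());
        System.out.println("Chef picks up: " + nextOrder());
    }
}
```

The order window never calls the chef component directly, which is the decoupling the rest of this section builds with RabbitMQ.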

RabbitMQ is one of the most popular message broker solutions and implements the Advanced Message Queuing Protocol (AMQP). It is written in Erlang. It differs from a simple message broker in that messages are not published directly to a queue; instead, the producer sends messages to an exchange. Exchanges are message-routing agents responsible for routing messages to different queues based on attributes, bindings, and routing keys. Many queues can be bound to the same exchange with different routing keys. For instance, in our example, there could be a beverage in the order. The producer can then send the order-created message with either the Binding Chef_key key or the Binding All_key key. A message with Binding All_key will go to both queues, but a message with Binding Chef_key will only go to Chef - Queue. If an order doesn't include a beverage, the message will go with Binding Chef_key. The following diagram describes the concept:
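Independently of the diagram, the routing rule just described can be mimicked in a few lines of plain Java. This is only a toy model of routing-key matching, not the RabbitMQ client API; the key names and Chef - Queue come from the text, while the second queue name (Beverage - Queue) and the class and method names are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an exchange that delivers a message to every queue bound with its routing key.
class ExchangeRoutingSketch {

    // Routing key -> names of the queues bound with that key.
    static final Map<String, List<String>> BINDINGS = new HashMap<>();

    static {
        // The chef queue is bound with both keys; the beverage queue
        // (hypothetical name) is bound with the "all" key only.
        BINDINGS.put("Binding Chef_key", Arrays.asList("Chef - Queue"));
        BINDINGS.put("Binding All_key", Arrays.asList("Chef - Queue", "Beverage - Queue"));
    }

    // Returns the queues that would receive a message sent with the given key.
    static List<String> route(String routingKey) {
        List<String> bound = BINDINGS.get(routingKey);
        if (bound == null) {
            return new ArrayList<>(); // no binding matches: nothing receives the message
        }
        return new ArrayList<>(bound);
    }

    // Producer-side choice of key, as described in the text.
    static String keyFor(boolean orderHasBeverage) {
        return orderHasBeverage ? "Binding All_key" : "Binding Chef_key";
    }

    public static void main(String[] args) {
        System.out.println(route(keyFor(true)));  // prints [Chef - Queue, Beverage - Queue]
        System.out.println(route(keyFor(false))); // prints [Chef - Queue]
    }
}
```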

It is highly recommended that you have knowledge of RabbitMQ. Here is a brief description of how to install RabbitMQ on a Unix-flavored machine:

  1. Run the sudo apt-get update command.
  2. Add the RabbitMQ application repository and key using the following commands:
      echo "deb http://www.rabbitmq.com/debian/ testing main" | sudo tee -a /etc/apt/sources.list
      curl http://www.rabbitmq.com/rabbitmq-signing-key-public.asc | sudo apt-key add -
  3. Update again, using the sudo apt-get update command.
  4. Then, run the sudo apt-get install rabbitmq-server command.

It will install Erlang and RabbitMQ on the machine and start the RabbitMQ service. If the service does not start automatically, you can start it with the service rabbitmq-server start command. Along with this installation, you might also need the admin console for handling the cluster and nodes of RabbitMQ. To get it, enable the RabbitMQ management plugin using the following command:

sudo rabbitmq-plugins enable rabbitmq_management  

Once this is done, you can open the management console at http://<yourip>:15672. It will show a page similar to the following screenshot. The default username and password are both guest:

As we mentioned earlier, there will be two applications: a producer and a consumer. Now that RabbitMQ is up, we will write a small piece of code for the producer that creates the order and submits it to the default exchange. The default exchange has no name; it is the empty string "", and a message sent to it is forwarded to the queue whose name matches the routing key, here crispyBunOrder.

Its pom.xml file will be as follows:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
 
    <groupId>com.practicalMircorservice</groupId> 
    <artifactId>EventProducer</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
    <packaging>jar</packaging> 
 
    <name>EventProducer</name> 
    <description>com.practicalMircorservice </description> 
 
    <parent> 
        <groupId>org.springframework.boot</groupId> 
        <artifactId>spring-boot-starter-parent</artifactId> 
        <version>1.4.1.RELEASE</version> 
        <relativePath /> <!-- lookup parent from repository --> 
    </parent> 
 
    <properties> 
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 
        <java.version>1.8</java.version> 
    </properties> 
 
    <dependencies> 
        <dependency> 
            <groupId>org.springframework.cloud</groupId> 
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 
        </dependency> 
        <dependency> 
            <groupId>org.springframework.boot</groupId> 
            <artifactId>spring-boot-starter-web</artifactId> 
        </dependency> 
    </dependencies> 
 
    <dependencyManagement> 
        <dependencies> 
            <dependency> 
                <groupId>org.springframework.cloud</groupId> 
                <artifactId>spring-cloud-dependencies</artifactId> 
                <version>Camden.SR1</version> 
                <type>pom</type> 
                <scope>import</scope> 
            </dependency> 
        </dependencies> 
    </dependencyManagement> 
 
    <build> 
        <plugins> 
            <plugin> 
                <groupId>org.springframework.boot</groupId> 
                <artifactId>spring-boot-maven-plugin</artifactId> 
            </plugin> 
        </plugins> 
    </build> 
</project>

In the producer application, there will be two Java files:

  • EventProducerApplication.java: This file will be the main application and will have the REST controller in it
  • CrispyBunOrder.java: This is the order object that will be submitted

The following is the code for EventProducerApplication.java:

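package com.practicalMircorservices.eventProducer; 
 
import java.util.Date; 
import java.util.UUID; 
 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.cloud.stream.annotation.EnableBinding; 
import org.springframework.web.bind.annotation.PathVariable; 
import org.springframework.web.bind.annotation.RequestMapping; 
import org.springframework.web.bind.annotation.RequestMethod; 
import org.springframework.web.bind.annotation.RequestParam; 
import org.springframework.web.bind.annotation.RestController; 
 
@SpringBootApplication 
@EnableBinding 
@RestController 
public class EventProducerApplication { 
 
    // Name of the target queue; used as the routing key on the default exchange 
    private final String Queue = "crispyBunOrder"; 
    @Autowired 
    private RabbitTemplate rabbitTemplate; 
 
    public static void main(String[] args) { 
        SpringApplication.run(EventProducerApplication.class, args); 
    } 
 
    @RequestMapping(method = RequestMethod.POST, value = "/orders/{orderId}") 
    public void placeOrder(@PathVariable("orderId") UUID orderId, 
            @RequestParam("itemId") Integer itemId, 
            @RequestParam("userName") String userName) { 
        CrispyBunOrder orderObject = createOrder(orderId, itemId, userName); 
        // Publish the order; the consumer reads it from the crispyBunOrder queue 
        rabbitTemplate.convertAndSend(Queue, orderObject); 
    } 
 
    private CrispyBunOrder createOrder(UUID orderId, Integer itemId, String userName) { 
        CrispyBunOrder order = new CrispyBunOrder(); 
        order.setItemId(itemId); 
        order.setOrderId(orderId); 
        order.setUserName(userName); 
        order.setOrderPlacedTime(new Date()); 
        return order; 
    } 
}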

The preceding class takes orderId, itemId, and userName as parameters and creates an order. It submits the order to the queue named crispyBunOrder. As we have added the Spring Cloud Stream RabbitMQ dependency in the Project Object Model (POM) file, Spring Boot creates a RabbitTemplate automatically for us. With the help of the RabbitTemplate, we can send any object to the given queue name. Strictly speaking, we are submitting to the default exchange, which has no name (it is the empty string ""). Any message submitted to the default exchange goes directly to the queue whose name equals the routing key.

The CrispyBunOrder class will have four fields and will look like this:

package com.practicalMircorservices.eventProducer; 
 
import java.io.Serializable; 
import java.util.Date; 
import java.util.UUID; 
 
public class CrispyBunOrder implements Serializable{ 
 
    /** 
     *  
     */ 
    private static final long serialVersionUID = 6572547218488352566L; 
 
    private UUID orderId; 
    private Integer itemId; 
    private Date orderPlacedTime; 
    private String userName; 
    public UUID getOrderId() { 
        return orderId; 
    } 
    public void setOrderId(UUID orderId) { 
        this.orderId = orderId; 
    } 
    public Integer getItemId() { 
        return itemId; 
    } 
    public void setItemId(Integer itemId) { 
        this.itemId = itemId; 
    } 
    public Date getOrderPlacedTime() { 
        return orderPlacedTime; 
    } 
    public void setOrderPlacedTime(Date orderPlacedTime) { 
        this.orderPlacedTime = orderPlacedTime; 
    } 
    public String getUserName() { 
        return userName; 
    } 
    public void setUserName(String userName) { 
        this.userName = userName; 
    } 
} 
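CrispyBunOrder implements Serializable because the object has to travel through the broker as bytes. Assuming the RabbitTemplate's default message converter is in use (to my understanding, it relies on Java serialization for Serializable payloads), the round trip resembles the following sketch. The Order class is a trimmed-down, hypothetical stand-in for CrispyBunOrder, and toBytes and fromBytes are illustrative helper names, not Spring APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.UUID;

class SerializationSketch {

    // Hypothetical, trimmed-down stand-in for CrispyBunOrder.
    static class Order implements Serializable {
        private static final long serialVersionUID = 1L;
        final UUID orderId;
        final Integer itemId;
        final String userName;

        Order(UUID orderId, Integer itemId, String userName) {
            this.orderId = orderId;
            this.itemId = itemId;
            this.userName = userName;
        }
    }

    // Producer side: turn the object into the bytes that form the message body.
    static byte[] toBytes(Serializable object) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Consumer side: rebuild the object; the class must be on the classpath.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize the order", e);
        }
    }

    public static void main(String[] args) {
        Order sent = new Order(UUID.fromString("02b425c0-da2b-445d-8726-3cf4dcf4326d"), 1, "john");
        Order received = (Order) fromBytes(toBytes(sent));
        System.out.println(received.orderId + " " + received.itemId + " " + received.userName);
    }
}
```

Java serialization identifies a class by its fully qualified name and serialVersionUID, which is why the consumer needs a matching copy of the order class.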

The producer's application.properties will have two properties:

spring.rabbitmq.host=localhost 
spring.rabbitmq.port=5672 

These are the default host and port of RabbitMQ.

Now we come to the consumer side, which is an altogether new application named EventConsumerApplication. This application can be created by downloading the generated code from start.spring.io. Before downloading, make sure you have selected the Stream Rabbit checkbox. In this application, the CrispyBunOrder class will be the same as in the producer, as we have to deserialize it here. Other than that, there will be one listener, which listens to the RabbitMQ queue named crispyBunOrder. The code of this class will be as follows:

package com.practicalMircorservices.eventProducer; 
 
import org.springframework.amqp.core.Queue; 
import org.springframework.amqp.rabbit.annotation.RabbitHandler; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.annotation.Bean; 
import org.springframework.messaging.handler.annotation.Payload; 
 
 
 
@SpringBootApplication 
@RabbitListener(queues = "crispyBunOrder") 
@EnableAutoConfiguration 
public class EventConsumerApplication { 
 
    @Bean 
    public Queue crispyBunOrderQueue() { 
        return new Queue("crispyBunOrder"); 
    } 
     
    @RabbitHandler 
    public void process(@Payload CrispyBunOrder order) { 
        StringBuilder sb = new StringBuilder(); 
        sb.append("New Order Received : \n"); 
        sb.append("OrderId : " + order.getOrderId()); 
        sb.append("\nItemId : " + order.getItemId()); 
        sb.append("\nUserName : " + order.getUserName()); 
        sb.append("\nDate : " + order.getOrderPlacedTime()); 
        System.out.println(sb.toString()); 
    } 
 
    public static void main(String[] args) throws Exception { 
        SpringApplication.run(EventConsumerApplication.class, args); 
    } 
 
} 

The @RabbitListener(queues = "crispyBunOrder") annotation defines which queue to listen to. We can also define further parameters on the listener, such as the exchange name, routing key, and so on. In that case, the annotation will be as follows:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch"),
        key = "orderRoutingKey"))

Start both applications in different consoles with mvn spring-boot:run. Make sure they run on different ports by setting the server.port property in each application's application.properties.
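One possible arrangement is sketched below. It assumes the producer stays on 8080, the port targeted by the curl call in this section, and moves the consumer to 8081, which is an arbitrary free port chosen only for illustration. The two settings belong to two separate files.

```properties
# Producer: application.properties
server.port=8080

# Consumer: application.properties (8081 is an assumed free port)
server.port=8081
```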

Now, you can use the curl command from the command line to hit the producer URL and test the response, for example:

curl -H "Content-Type: application/x-www-form-urlencoded" --data "itemId=1&userName=john" http://localhost:8080/orders/02b425c0-da2b-445d-8726-3cf4dcf4326d

It will result in the order showing on the consumer console. This is the basic understanding of how messaging works. There are more complex real-life examples, such as consuming a user action feed from a website, a stock price feed, and so on. Messaging mostly comes in handy when you have a stream of incoming data and, based on the routing key, the broker can send the data to different queues. Kafka is another good tool for doing this. Spring has inbuilt support for Kafka, RabbitMQ, and some other message brokers. This inbuilt support helps developers set up and start development with the supported brokers pretty quickly.
