Stacking and consuming tasks with RabbitMQ and AMQP

This recipe will demonstrate how to implement a Message-oriented-Middleware (MoM). This is a very popular technique in scalability based on asynchronous communication between components.

Getting ready

We have already introduced the new cloudstreetmarket-shared and cloudstreetmarket-websocket Java projects. WebSockets are now split away from cloudstreetmarket-api, but cloudstreetmarket-websocket and cloudstreetmarket-api will still communicate with each other using messaging.

In order to decouple secondary tasks from the request thread (secondary tasks like event producing), you need to learn how to configure and use AMQP message templates and listeners with RabbitMQ.

How to do it…

  1. Access the RabbitMQ web console at http://localhost:15672.

    Note

    If you cannot reach the web console for some reason, please return to the previous recipe where the download and installation guidance can be found.

  2. In the Queue tab of the web console, create a new queue named AMQP_USER_ACTIVITY. Create it with the parameters Durable and Auto-delete: "No":
    How to do it…

Sender side

When the API is requested to perform operations such as create a transaction or create a like activity, we produce events.

Tip

With very few adjustments changes, we now use the RabbitTemplate rather than the former SimpMessagingTemplate and we target an intermediate AMQP queue instead of the ultimate STOMP client.

In the TransactionController, the POST handler has been updated as follows:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
@RestController
public class TransactionController extends CloudstreetApiWCI<Transaction> {
  @Autowired
  private RabbitTemplate messagingTemplate;

  @RequestMapping(method=POST)
  @ResponseStatus(HttpStatus.CREATED)
  public TransactionResource post(@Valid @RequestBody Transaction transaction, HttpServletResponse response, BindingResult result) {
   ...
   messagingTemplate.convertAndSend("AMQP_USER_ACTIVITY", new UserActivityDTO(transaction));
   ...
   return resource;
  }
}

In the LikeActionController, the POST handler has been updated as follows:

import org.springframework.amqp.rabbit.core.RabbitTemplate;

@RestController
public class LikeActionController extends CloudstreetApiWCI<LikeAction> {
  @Autowired
  private RabbitTemplate messagingTemplate;
  @RequestMapping(method=POST)
  @ResponseStatus(HttpStatus.CREATED)
  public LikeActionResource post(@RequestBody LikeAction likeAction, HttpServletResponse response) {
  ...
   messagingTemplate.convertAndSend("AMQP_USER_ACTIVITY", new UserActivityDTO(likeAction));
   ...
   return resource;
  }
}

Consumer side

As explained previously, the cloudstreetmarket-websocket module now listens to the AMQP_USER_ACTIVITY queue.

  1. The necessary configuration is set in the displatcher-servlet.xml (cloudstreetmarket-websocket). There, we create a rabbitConnectionFactory and a rabbitListenerContainerFactory bean:
    <rabbit:connection-factory id="rabbitConnectionFactory" username="guest" host="localhost" password="guest"/>
    <bean id="rabbitListenerContainerFactory" class="org.sfw.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="rabbitConnectionFactory"/>
        <property name="concurrentConsumers" value="3"/>
        <property name="maxConcurrentConsumers" value="10"/>
        <property name="prefetchCount" value="12"/>
    </bean>
  2. Finally, the listener bean is created as follows with a CSMReceiver class:
    @Component
    public class CSMReceiver {
      @Autowired
      private SimpMessagingTemplate simpMessagingTemplate;
    
      @RabbitListener(queues = "AMQP_USER_ACTIVITY_QUEUE")
      public void handleMessage(UserActivityDTO payload) {
      simpMessagingTemplate.convertAndSend("/topic/actions", payload);
      }
    } 

    Tip

    You can recognize the SimpMessagingTemplate used here to forward incoming message payloads to the final STOMP clients.

  3. A new WebSocketConfig bean has been created in cloudstreetmarket-websocket. This one is very similar to the one we had in cloudstreetmarket-api.

Client-side

We haven't changed many things on the client side (cloudstreetmarket-webapp), as we are still focused on the landing page (home_community_activity.js) at this point.

The main difference is that the STOMP endpoint now targets the /ws context path. WebSockets are initiated from the init() function after a 5-second delay. Also, the SockJS socket and the STOMP client are now centralized in global variables (using the Window object) to simplify the WebSockets lifecycle during user navigation:

var timer = $timeout( function(){ 
  window.socket = new SockJS('/ws/channels/users/broadcast');
  window.stompClient = Stomp.over(window.socket);
    window.socket.onclose = function() {
        window.stompClient.disconnect();
      };
  window.stompClient.connect({}, function(frame) {
    window.stompClient.subscribe('/topic/actions', function(message){
        var newActivity = $this.prepareActivity(JSON.parse(message.body));
        $this.addAsyncActivityToFeed(newActivity);
        $scope.$apply();
      });
    });
     $scope.$on(
      "$destroy",
        function( event ) {
          $timeout.cancel( timer );
          window.stompClient.disconnect();
          }
      );
                }, 5000);

How it works...

This type of infrastructure couples application components together in a loose but reliable way.

Messaging architecture overview

In this recipe, we have given our application a MoM. The main idea was to decouple processes as much as possible from the client-request lifecycle.

In an effort to keep our REST API focused on resource handling, some business logic clearly appeared secondary, such as:

  • Notifying the community that a new user has registered an account
  • Notifying the community that a user has performed a specific transaction
  • Notifying the community that a user has liked another user's action

We have decided to create a new webapp dedicated to handle WebSockets. Our API now communicates with the ws web app by sending messages to it.

The message payloads are community Action objects (from the Action.java superclass). From the cloudstreetmarket-api web app to the cloudstreetmarket-websocket webapp, these action objects are serialized and wrapped in AMQP messages. Once sent, they are stacked in one single RabbitMQ queue (AMQP_USER_ACTIVITY).

Both the sender and the receiver parts are AMQP implementations (RabbitTemplate and RabbitListener). This logic will now be processed at the pace that the websocket web app can afford without having an impact on the user experience. When received (on the cloudstreetmarket-websocket side), message payloads are sent on the fly to WebSocket clients as STOMP messages.

The benefit in direct performance here is arguable (in this example). We have after all mostly deferred the publishing of secondary events with an extra messaging layer. However, the benefits in design clarity and business components separation are priceless.

A scalable model

We have talked much about the benefits of keeping web apps stateless. This is what we have tried to do so far with the API and we have been proud of it!

Without HTTP sessions, it would be pretty easy for us to react to traffic surges on the api web app, or on the portal web app. Without too much hassle, we would be able to set up a load balancer on the Apache HTTP proxy with mod_proxy_balancer for HTTP connections.

You can read more about this in the Apache HTTP doc: http://httpd.apache.org/docs/2.2/mod/mod_proxy_balancer.html

A scalable model

For WebSocket web apps, it would work basically the same in stateless. In the Apache HTTP configuration, a configured mod_proxy_wstunnel should handle load balancing over WebSockets and provide an application failover.

AMQP or JMS?

Advanced Message Queuing Protocol (AMQP) defines a wire-level protocol and guarantees interoperability between senders and consumers. Any party compliant with this protocol can create and interpret messages, and thus interoperate with any other compliant component regardless of the underlying technologies.

In comparison, JMS is part of Java platform Enterprise Edition (EE). Coming with the JSR-914, JMS is a standard for APIs that defines how APIs should create, send, receive, and read messages. JMS does not provide wire-level guidance, and it doesn't guarantee interoperability between parties either.

AMQP controls the format of the messages and the flow these messages go through, while JMS controls the technical implementations of the boundaries (operators). When we look for communication consistency within a potentially complex environment, AMQP appears to be a good choice for MoM protocols.

There's more…

This section provides external resources to extend your knowledge about AMQP and about event publishing methods.

A great introduction to AMQP by pivotal

If you want to get a better understanding of AMQP and its differences with JMS, check out the following article on the spring.io website:

https://spring.io/understanding/AMQP

A better way to publish application events

Right now, we didn't implement a proper pattern to publish events. The article accessible from the link below comes from the spring.io blog. It introduces best practices for event publishing with Spring 4.2+:

https://spring.io/blog/2015/02/11/better-application-events-in-spring-framework-4-2

See also

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset