This recipe demonstrates how to implement Message-oriented Middleware (MoM), a popular scalability technique based on asynchronous communication between components.
We have already introduced the new cloudstreetmarket-shared and cloudstreetmarket-websocket Java projects. WebSockets are now split away from cloudstreetmarket-api, but cloudstreetmarket-websocket and cloudstreetmarket-api will still communicate with each other using messaging.
In order to decouple secondary tasks (such as event producing) from the request thread, you need to learn how to configure and use AMQP message templates and listeners with RabbitMQ.
When the API is asked to perform operations such as creating a transaction or creating a like activity, we produce events.
In the TransactionController, the POST handler has been updated as follows:

    import org.springframework.amqp.rabbit.core.RabbitTemplate;

    @RestController
    public class TransactionController extends CloudstreetApiWCI<Transaction> {

        @Autowired
        private RabbitTemplate messagingTemplate;

        @RequestMapping(method=POST)
        @ResponseStatus(HttpStatus.CREATED)
        public TransactionResource post(@Valid @RequestBody Transaction transaction,
                HttpServletResponse response, BindingResult result) {
            ...
            messagingTemplate.convertAndSend("AMQP_USER_ACTIVITY", new UserActivityDTO(transaction));
            ...
            return resource;
        }
    }
In the LikeActionController, the POST handler has been updated as follows:

    import org.springframework.amqp.rabbit.core.RabbitTemplate;

    @RestController
    public class LikeActionController extends CloudstreetApiWCI<LikeAction> {

        @Autowired
        private RabbitTemplate messagingTemplate;

        @RequestMapping(method=POST)
        @ResponseStatus(HttpStatus.CREATED)
        public LikeActionResource post(@RequestBody LikeAction likeAction,
                HttpServletResponse response) {
            ...
            messagingTemplate.convertAndSend("AMQP_USER_ACTIVITY", new UserActivityDTO(likeAction));
            ...
            return resource;
        }
    }
As explained previously, the cloudstreetmarket-websocket module now listens to the AMQP_USER_ACTIVITY queue.
In dispatcher-servlet.xml (cloudstreetmarket-websocket), we create a rabbitConnectionFactory and a rabbitListenerContainerFactory bean:

    <rabbit:connection-factory id="rabbitConnectionFactory" username="guest" host="localhost" password="guest"/>

    <bean id="rabbitListenerContainerFactory"
          class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="rabbitConnectionFactory"/>
        <property name="concurrentConsumers" value="3"/>
        <property name="maxConcurrentConsumers" value="10"/>
        <property name="prefetchCount" value="12"/>
    </bean>
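The messages are then consumed by the CSMReceiver class:

    @Component
    public class CSMReceiver {

        @Autowired
        private SimpMessagingTemplate simpMessagingTemplate;

        // The queue name matches the routing key used by the API controllers.
        @RabbitListener(queues = "AMQP_USER_ACTIVITY")
        public void handleMessage(UserActivityDTO payload) {
            // Relay each received activity to the STOMP subscribers.
            simpMessagingTemplate.convertAndSend("/topic/actions", payload);
        }
    }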
A WebSocketConfig bean has also been created in cloudstreetmarket-websocket. This one is very similar to the one we had in cloudstreetmarket-api.
We haven't changed many things on the client side (cloudstreetmarket-webapp), as we are still focused on the landing page (home_community_activity.js) at this point.
The main difference is that the STOMP endpoint now targets the /ws context path. WebSockets are initiated from the init() function after a 5-second delay. Also, the SockJS socket and the STOMP client are now centralized in global variables (using the Window object) to simplify the WebSockets lifecycle during user navigation:

    var timer = $timeout(function() {
        window.socket = new SockJS('/ws/channels/users/broadcast');
        window.stompClient = Stomp.over(window.socket);
        window.socket.onclose = function() {
            window.stompClient.disconnect();
        };
        window.stompClient.connect({}, function(frame) {
            window.stompClient.subscribe('/topic/actions', function(message) {
                var newActivity = $this.prepareActivity(JSON.parse(message.body));
                $this.addAsyncActivityToFeed(newActivity);
                $scope.$apply();
            });
        });
        $scope.$on("$destroy", function(event) {
            $timeout.cancel(timer);
            window.stompClient.disconnect();
        });
    }, 5000);

This type of infrastructure couples application components together in a loose but reliable way.
In this recipe, we have given our application a MoM. The main idea was to decouple processes as much as possible from the client-request lifecycle.
In an effort to keep our REST API focused on resource handling, some business logic clearly appeared secondary, such as publishing an event when a user creates a transaction or a like activity.
We have decided to create a new web app dedicated to handling WebSockets. Our API now communicates with the ws web app by sending messages to it.
The message payloads are community Action objects (from the Action.java superclass). From the cloudstreetmarket-api web app to the cloudstreetmarket-websocket web app, these action objects are serialized and wrapped in AMQP messages. Once sent, they are stacked in one single RabbitMQ queue (AMQP_USER_ACTIVITY).
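The routing step described above can be pictured with a small in-memory model. This is only a sketch, assuming the RabbitTemplate publishes to the broker's default exchange (its behavior when no exchange is configured), where every declared queue is reachable through a routing key equal to its own name. The DefaultExchangeSketch class, its method names, the sample payload, and the SOME_OTHER_QUEUE name are hypothetical and exist only for illustration; no RabbitMQ client code is involved.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** In-memory model of a default (direct) exchange; illustrative only, not a broker. */
class DefaultExchangeSketch {
    // Queue name -> pending message bodies.
    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Declaring a queue makes it reachable under a routing key equal to its name. */
    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Routes the body to the queue named like the routing key; false if none matches. */
    boolean publish(String routingKey, String body) {
        Queue<String> target = queues.get(routingKey);
        if (target == null) {
            return false; // unroutable: no queue carries that name
        }
        target.add(body);
        return true;
    }

    /** Returns the oldest pending message, or null if the queue is empty or unknown. */
    String poll(String queueName) {
        Queue<String> source = queues.get(queueName);
        return source == null ? null : source.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("AMQP_USER_ACTIVITY");
        // Hypothetical payload; the real one is a serialized DTO.
        System.out.println(broker.publish("AMQP_USER_ACTIVITY", "{\"type\":\"transaction\"}"));
        // A routing key that matches no declared queue is not delivered anywhere.
        System.out.println(broker.publish("SOME_OTHER_QUEUE", "{}"));
        System.out.println(broker.poll("AMQP_USER_ACTIVITY"));
    }
}
```

The point of the model is that the string passed to convertAndSend and the queue name used by the listener have to agree for messages to reach the consumer.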
Both the sender and the receiver parts are AMQP implementations (RabbitTemplate and RabbitListener). This logic will now be processed at the pace that the websocket web app can afford without having an impact on the user experience. When received (on the cloudstreetmarket-websocket side), message payloads are sent on the fly to WebSocket clients as STOMP messages.
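To make the decoupling concrete, here is a minimal in-process sketch of the same flow using only java.util.concurrent. It is an assumption-laden stand-in, not the recipe's implementation: a BlockingQueue plays the role of the RabbitMQ queue, a daemon thread plays the role of the listener, and a list records what would have been pushed to /topic/actions. The ActivityRelaySketch class, its method names, and the sample message strings are hypothetical.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-process stand-in for the API -> queue -> WebSocket relay (no real broker). */
class ActivityRelaySketch {
    // Plays the role of the AMQP_USER_ACTIVITY queue.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Records what would have been pushed to /topic/actions subscribers.
    private final List<String> delivered = new CopyOnWriteArrayList<>();
    private final CountDownLatch done;

    /** Starts a daemon listener thread that relays exactly 'expected' messages. */
    ActivityRelaySketch(int expected, long relayDelayMillis) {
        done = new CountDownLatch(expected);
        Thread listener = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    String activity = queue.take();  // blocks until a message is available
                    Thread.sleep(relayDelayMillis);  // simulated slow consumer
                    delivered.add(activity);
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listener.setDaemon(true);
        listener.start();
    }

    /** Request-thread side: enqueue and return immediately. */
    void publish(String activity) {
        queue.add(activity);
    }

    /** Waits until every expected message was relayed; false on timeout or interrupt. */
    boolean awaitRelayed(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        ActivityRelaySketch relay = new ActivityRelaySketch(2, 50);
        relay.publish("transaction-created"); // returns before the relay has run
        relay.publish("like-created");
        System.out.println("relayed in time: " + relay.awaitRelayed(2000));
        System.out.println(relay.delivered());
    }
}
```

The publisher never waits for the simulated relay delay, which is the property the recipe relies on. A real broker additionally gives durability and cross-process delivery, which this sketch does not attempt to model.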
The direct performance benefit is debatable in this example. We have, after all, mostly deferred the publishing of secondary events with an extra messaging layer. However, the benefits in design clarity and business components separation are priceless.
We have talked much about the benefits of keeping web apps stateless. This is what we have tried to do so far with the API and we have been proud of it!
Without HTTP sessions, it would be pretty easy for us to react to traffic surges on the api web app, or on the portal web app. Without too much hassle, we would be able to set up a load balancer on the Apache HTTP proxy with mod_proxy_balancer for HTTP connections.
You can read more about this in the Apache HTTP doc: http://httpd.apache.org/docs/2.2/mod/mod_proxy_balancer.html
For WebSocket web apps, the same approach would work as long as they remain stateless. In the Apache HTTP configuration, a configured mod_proxy_wstunnel should handle load balancing over WebSockets and provide application failover.
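The reason statelessness makes balancing easy is that any instance can serve any request, so the proxy is free to pick a backend without consulting session data. The following sketch illustrates that idea with a plain round-robin picker. It is not how mod_proxy_balancer is implemented or configured (its scheduling methods are set in the Apache configuration); the RoundRobinSketch class and the backend addresses are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Round-robin choice among interchangeable (stateless) backends; illustrative only. */
class RoundRobinSketch {
    private final List<String> backends;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> backends) {
        if (backends.isEmpty()) {
            throw new IllegalArgumentException("at least one backend is required");
        }
        this.backends = new ArrayList<>(backends); // defensive copy
    }

    /** Returns the next backend; floorMod keeps the index valid if the counter overflows. */
    String pick() {
        int index = Math.floorMod(counter.getAndIncrement(), backends.size());
        return backends.get(index);
    }

    public static void main(String[] args) {
        // Hypothetical instance addresses.
        RoundRobinSketch balancer = new RoundRobinSketch(
                Arrays.asList("api-1:8080", "api-2:8080", "api-3:8080"));
        for (int request = 0; request < 4; request++) {
            System.out.println("request " + request + " -> " + balancer.pick());
        }
    }
}
```

With session state held on a specific instance, this kind of free choice would no longer be safe, and the balancer would need sticky routing instead.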
Advanced Message Queuing Protocol (AMQP) defines a wire-level protocol and guarantees interoperability between senders and consumers. Any party compliant with this protocol can create and interpret messages, and thus interoperate with any other compliant component regardless of the underlying technologies.
In comparison, JMS is part of the Java Platform, Enterprise Edition (Java EE). Specified by JSR 914, JMS is an API standard that defines how Java clients create, send, receive, and read messages. JMS does not provide wire-level guidance, and it doesn't guarantee interoperability between parties either.
AMQP controls the format of the messages and the flow these messages go through, while JMS controls the technical implementations of the boundaries (operators). When we look for communication consistency within a potentially complex environment, AMQP appears to be a good choice for MoM protocols.
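To give a feel for what "wire-level" means, the sketch below builds two byte sequences by hand, assuming the AMQP 0-9-1 revision that RabbitMQ implements: the protocol header a client sends when it opens a connection, and a heartbeat frame. In that revision a frame is laid out as type (1 byte), channel (2 bytes), payload size (4 bytes), the payload, and an end marker (0xCE), all in network byte order. The AmqpFrameSketch class and its helper names are hypothetical; real applications leave this to a client library.

```java
import java.nio.ByteBuffer;

/** Hand-built AMQP 0-9-1 byte sequences; illustrative only, not a client. */
class AmqpFrameSketch {
    static final int FRAME_HEARTBEAT = 8; // frame type for heartbeats
    static final int FRAME_END = 0xCE;    // marker that terminates every frame

    /** Protocol header sent first on a new connection: "AMQP" followed by 0, 0, 9, 1. */
    static byte[] protocolHeader() {
        return new byte[] {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
    }

    /** Encodes one frame: type, channel, payload size, payload, end marker. */
    static byte[] frame(int type, int channel, byte[] payload) {
        // ByteBuffer is big-endian by default, which matches network byte order.
        ByteBuffer buffer = ByteBuffer.allocate(1 + 2 + 4 + payload.length + 1);
        buffer.put((byte) type);
        buffer.putShort((short) channel);
        buffer.putInt(payload.length);
        buffer.put(payload);
        buffer.put((byte) FRAME_END);
        return buffer.array();
    }

    /** Formats bytes as space-separated hexadecimal pairs for display. */
    static String hex(byte[] bytes) {
        StringBuilder out = new StringBuilder();
        for (byte b : bytes) {
            out.append(String.format("%02x ", b));
        }
        return out.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println(hex(protocolHeader()));
        // Heartbeats travel on channel 0 and carry an empty payload.
        System.out.println(hex(frame(FRAME_HEARTBEAT, 0, new byte[0])));
    }
}
```

Because this layout is fixed by the protocol rather than by a Java API, a client written in any language that emits the same bytes can talk to the same broker, which is the interoperability argument made above.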
This section provides external resources to extend your knowledge about AMQP and about event publishing methods.
If you want to get a better understanding of AMQP and its differences with JMS, check out the following article on the spring.io website:
So far, we haven't implemented a proper pattern to publish events. The article linked below comes from the spring.io blog. It introduces best practices for event publishing with Spring 4.2+:
https://spring.io/blog/2015/02/11/better-application-events-in-spring-framework-4-2
http://blog.arungupta.me/load-balance-websockets-apache-httpd-techtip48