Streaming social events with STOMP over SockJS

In this recipe, we broadcast user activities (events) with STOMP over SockJS. SockJS provides a custom implementation of WebSocket.

Getting ready

There is some configuration work to be done beforehand, especially on the Apache HTTP proxy. After that, we will see how to initiate a WebSocket with SockJS and with AngularJS on the client side.

Our WebSocket will subscribe to a topic (for broadcasting) published via Spring from the cloudstreetmarket-api module.

How to do it…

  1. From the Git Perspective in Eclipse, check out the latest version of the branch v8.1.x.
  2. Run the Maven clean and Maven install commands on the zipcloud-parent project (right-click on the project, select Run as… | Maven Clean, then select Run as… | Maven Install). After this, run a Maven | Update Project to synchronize Eclipse with the Maven configuration (right-click on the project and then click Maven | Update Project…).
  3. Similarly, run the Maven clean and Maven install commands on cloudstreetmarket-parent followed by a Maven | Update Project… (in order to update all cloudstreetmarket-parent modules).

Apache HTTP Proxy configuration

  1. In the Apache httpd.conf file, change the VirtualHost definition to:
    <VirtualHost cloudstreetmarket.com:80>
      ProxyPass        /portal http://localhost:8080/portal
      ProxyPassReverse /portal http://localhost:8080/portal
      ProxyPass        /api http://localhost:8080/api
      ProxyPassReverse /api http://localhost:8080/api
      RewriteEngine on
      RewriteCond %{HTTP:UPGRADE} ^WebSocket$ [NC]
      RewriteCond %{HTTP:CONNECTION} ^Upgrade$ [NC]
      RewriteRule .* ws://localhost:8080%{REQUEST_URI} [P]
      RedirectMatch ^/$ /portal/index
    </VirtualHost>
  2. Still in httpd.conf, uncomment the line:
    LoadModule proxy_wstunnel_module modules/mod_proxy_wstunnel.so

Frontend

  1. In the index.jsp file (in the cloudstreetmarket-webapp module), two extra JavaScript files are imported:
    <script src="js/util/sockjs-1.0.2.min.js"></script>
    <script src="js/util/stomp-2.3.3.js"></script> 

    Note

    These two files have been copied locally, but originally, both were found online at:

    https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.0.2/sockjs.min.js

    https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.js

  2. For this recipe, all the changes on the client side are related to the file src/main/webapp/js/home/home_community_activity.js (which drives the feed of User Activities on the landing page). This file is associated with the template /src/main/webapp/html/home.html.
  3. As part of the init() function of homeCommunityActivityController, the following section was added:
    cloudStreetMarketApp.controller('homeCommunityActivityController', function ($scope, $rootScope, httpAuth, modalService, communityFactory, genericAPIFactory, $filter){
      var $this = this,
          socket = new SockJS('/api/users/feed/add'),
          stompClient = Stomp.over(socket),
          pageNumber = 0;
      $scope.communityActivities = {};
      $scope.pageSize = 10;
      $scope.init = function () {
        $scope.loadMore();
        // Close the STOMP session when the underlying socket closes
        socket.onclose = function() {
          stompClient.disconnect();
        };
        // Subscribe to the broadcast topic once connected
        stompClient.connect({}, function(frame) {
          stompClient.subscribe('/topic/actions', function(message){
            var newActivity = $this.prepareActivity(JSON.parse(message.body));
            $this.addAsyncActivityToFeed(newActivity);
            // Notify AngularJS of a change made outside its digest cycle
            $scope.$apply();
          });
        });
      ...
      }
    ...
  4. The loadMore() function is still invoked to pull new activities when the bottom of the scroll is reached. However, because new activities can now be inserted asynchronously, the communityActivities variable is no longer an array but an object used as a map with activity IDs as keys. Doing so allows us to merge the synchronous results with the asynchronous ones:
      $scope.loadMore = function () {
        communityFactory.getUsersActivity(pageNumber, $scope.pageSize).then(function(response) {
          var usersData = response.data,
          status = response.status,
          headers  = response.headers,
          config = response.config;
          $this.handleHeaders(headers);
          if(usersData.content){
            if(usersData.content.length > 0){
              pageNumber++;
            }
            $this.addActivitiesToFeed(usersData.content);
          }
        });
      };
  5. As before (since Chapter 4, Building a REST API for a Stateless Architecture), we loop over the community activities to build the activity feed. Each activity now carries a number of likes and comments. Currently, an authenticated user can see the number of likes.
  6. The Angularized HTML bound to the thumb-up image is the following:
      <span ng-if="userAuthenticated() && value.amountOfLikes == 0">
        <img ng-src="{{image}}" class="like-img"
          ng-init="image='img/icon-finder/1441189591_1_like.png'"
          ng-mouseover="image='img/icon-finder/1441188631_4_like.png'"
          ng-mouseleave="image='img/icon-finder/1441189591_1_like.png'"
          ng-click="like(value.id)"/>
      </span>
  7. In the controller, the like() scope function backs this DOM element. It creates a new like activity that targets the original activity:
      $scope.like = function (targetActionId){
        var likeAction = {
          id: null,
          type: 'LIKE',
          date: null,
          targetActionId: targetActionId,
          userId: httpAuth.getLoggedInUser()
        };
        genericAPIFactory.post("/api/actions/likes", likeAction);
      }
  8. The opposite logic can also be found to unlike an activity.
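
The map-based merge described in step 4 can be sketched as follows. This is a minimal illustration, not the actual implementation of addActivitiesToFeed or addAsyncActivityToFeed: the helper name mergeActivities and the sample activities are assumptions made for the example.

```javascript
// Hypothetical helper: merges a list of activities into a map keyed by activity ID.
function mergeActivities(feed, activities) {
  activities.forEach(function (activity) {
    // An activity already present (same ID) is overwritten rather than duplicated.
    feed[activity.id] = activity;
  });
  return feed;
}

var feed = {};
// A first page, as it could be returned synchronously by loadMore() (sample data)
mergeActivities(feed, [{ id: 1, type: 'BUY' }, { id: 2, type: 'SELL' }]);
// An activity pushed asynchronously over the WebSocket (sample data)
mergeActivities(feed, [{ id: 3, type: 'LIKE' }]);
// A later page that contains activities already received
mergeActivities(feed, [{ id: 2, type: 'SELL' }, { id: 3, type: 'LIKE' }]);

console.log(Object.keys(feed).length); // 3
```

Because the activity ID is the key, an activity received both through the WebSocket and through a later page appears only once in the feed.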

Backend

  1. The following Maven dependencies have been added to cloudstreetmarket-api:
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-websocket</artifactId>
          <version>${spring.version}</version>
       </dependency>
       <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-messaging</artifactId>
          <version>${spring.version}</version>
       </dependency>
  2. In the web.xml file (the one from cloudstreetmarket-api), the following attribute must be added to our servlet and to each of its filters:
    <async-supported>true</async-supported>
  3. The following dedicated configuration bean has been created:
    @Configuration
    @ComponentScan("edu.zipcloud.cloudstreetmarket.api")
    @EnableWebSocketMessageBroker
    public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

        @Override
        public void registerStompEndpoints(final StompEndpointRegistry registry) {
            registry.addEndpoint("/users/feed/add")
                .withSockJS();
        }

        @Override
        public void configureMessageBroker(final MessageBrokerRegistry registry) {
            registry.setApplicationDestinationPrefixes("/app");
            registry.enableSimpleBroker("/topic");
        }
    }

    A new controller, ActivityFeedWSController, has been added as follows:

    @RestController
    public class ActivityFeedWSController extends CloudstreetApiWCI{
        @MessageMapping("/users/feed/add")
        @SendTo("/topic/actions")
        public UserActivityDTO handle(UserActivityDTO message) throws Exception{
            return message;
        }
        @RequestMapping(value="/users/feed/info", method=GET)
        public String infoWS(){
            return "v0";
        }
    }
  4. As Spring configuration, we have added the following bean to the dispatcher-servlet.xml:
    <bean
      class="org.springframework.web.socket.server.support.OriginHandshakeInterceptor">
        <property name="allowedOrigins">
          <list>
            <value>http://cloudstreetmarket.com</value>
          </list>
        </property>
    </bean>

    In security-config.xml, the following configuration has been added to the http Spring Security namespace:

        <security:http create-session="stateless" 
            entry-point-ref="authenticationEntryPoint" authentication-manager-ref="authenticationManager">
        ...
        <security:headers>
          <security:frame-options policy="SAMEORIGIN"/>
        </security:headers>
        ...
        </security:http>

Now let's see how events are generated.

  1. When a new financial transaction is created, a message is sent to the topic /topic/actions. This is done in the TransactionController:
    @RestController
    @ExposesResourceFor(Transaction.class)
    @RequestMapping(value=ACTIONS_PATH + TRANSACTIONS_PATH, produces={"application/xml", "application/json"})
    public class TransactionController extends CloudstreetApiWCI<Transaction> {
      @Autowired
      private SimpMessagingTemplate messagingTemplate;
      @RequestMapping(method=POST)
      @ResponseStatus(HttpStatus.CREATED)
      public TransactionResource post(@Valid @RequestBody Transaction transaction, HttpServletResponse response, BindingResult result) {
        ...
       messagingTemplate.convertAndSend("/topic/actions", new UserActivityDTO(transaction));
        ...
      }
    }

    Similarly, when a like activity is created, a message is also sent to the /topic/actions topic in LikeActionController:

    @RequestMapping(method=POST)
    @ResponseStatus(HttpStatus.CREATED)
    public LikeActionResource post(@RequestBody LikeAction likeAction, HttpServletResponse response) {
        ...
        likeAction = likeActionService.create(likeAction);
        messagingTemplate.convertAndSend("/topic/actions", new UserActivityDTO(likeAction));
        ...
    }
  2. Now start the Tomcat server. Log in to the application using Yahoo! OAuth2 and your personal Yahoo! account (if you don't have one yet, please create one). Register a new user for the Cloudstreet Market application.
  3. In your web browser, open two different tabs in the application with your logged-in user. Keep one of these tabs on the landing page.
  4. With the other tab, navigate to the Prices and market | All prices search menu. Search for a ticker, let's say Facebook, and buy three shares of it.
  5. Wait to receive the information message, then check the first tab of the browser (the tab you were not using). You will notice that the activity feed has received a new element at the top!
  6. In the console, you should also see a corresponding log trace.
  7. Similarly, like events are refreshed in real time.

How it works...

Here, we are going to look at a couple of general concepts about WebSockets, STOMP, and SockJS before introducing the Spring-WebSocket support tools.

An introduction to WebSockets

WebSocket is a full-duplex communication protocol based on TCP. A full-duplex communication system allows two parties to speak and to be heard simultaneously through a bidirectional channel. A conversation by telephone is probably the best example of a full-duplex system.

This technology is particularly useful for applications that need to avoid the overhead induced by opening new HTTP connections. The WebSocket protocol was standardized by the IETF in 2011 as RFC 6455 (https://tools.ietf.org/html/rfc6455).

WebSocket Lifecycle

Before the WebSocket connection is established, the client initiates an HTTP handshake to which the server responds. The handshake request is also a protocol upgrade request (from HTTP to WebSocket), formalized with an Upgrade header. The server confirms this protocol upgrade with the same Upgrade header (and value) in its response. In addition to the Upgrade header, and to protect against caching-proxy attacks, the client also sends a Base64-encoded random key in a Sec-WebSocket-Key header. The server sends back a hash of this key (the Base64-encoded SHA-1 digest of the key concatenated with a fixed GUID) in a Sec-WebSocket-Accept header.
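
To make the key exchange concrete, here is a minimal sketch of how a server derives the Sec-WebSocket-Accept value, as specified by RFC 6455. It runs on Node.js and is only an illustration: in our application, this computation is done by the server and by the browser, never by our own code. The function name computeAcceptValue is an assumption.

```javascript
var nodeCrypto = require('crypto');

// GUID fixed by RFC 6455; it is appended to the client key before hashing.
var WEBSOCKET_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

// Returns the Base64-encoded SHA-1 digest of the key concatenated with the GUID.
function computeAcceptValue(secWebSocketKey) {
  return nodeCrypto.createHash('sha1')
    .update(secWebSocketKey + WEBSOCKET_GUID)
    .digest('base64');
}

// Sample key taken from RFC 6455, section 1.3
console.log(computeAcceptValue('dGhlIHNhbXBsZSBub25jZQ=='));
// prints "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
```

The client performs the same computation on its side and rejects the connection if the value returned by the server does not match.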

[Figure: an example of a handshake occurring in our application]

[Figure: sequence diagram summarizing the protocol lifecycle]

Two dedicated URI schemes

The protocol defines two URI schemes for WebSockets: ws:// and wss:// (with wss allowing encrypted connections).
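
The correspondence between the HTTP and WebSocket schemes can be sketched with a small helper. The function toWebSocketUrl is hypothetical and is not needed in our recipe: we pass a relative path to the SockJS constructor and never build a ws:// URL ourselves.

```javascript
// Hypothetical helper: maps an http(s) URL to the matching WebSocket URL.
function toWebSocketUrl(httpUrl) {
  // http -> ws, https -> wss (encrypted)
  return httpUrl.replace(/^http(s?):\/\//i, function (match, secure) {
    return 'ws' + secure.toLowerCase() + '://';
  });
}

console.log(toWebSocketUrl('http://cloudstreetmarket.com/api/users/feed/add'));
// prints "ws://cloudstreetmarket.com/api/users/feed/add"
console.log(toWebSocketUrl('https://cloudstreetmarket.com/api/users/feed/add'));
// prints "wss://cloudstreetmarket.com/api/users/feed/add"
```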

The STOMP protocol

STOMP stands for Simple Text Oriented Messaging Protocol. This protocol provides a frame-based interoperable format that allows STOMP clients to communicate with STOMP message brokers.

It is a messaging protocol that relies on an existing two-way streaming network protocol at a lower level. WebSocket provides frame-based data transfer, and the WebSocket frames can indeed be STOMP-formatted frames.

Here is an example of a STOMP frame:

CONNECTED
session:session-4F_y4UhJTEjabe0LfFH2kg
heart-beat:10000,10000
server:RabbitMQ/3.2.4
version:1.1
user-name:marcus

A frame is made of a command line, zero or more header lines in the form name:value, a blank line, and an optional body terminated by a NULL octet.

The STOMP protocol specification defines a set of client commands (SEND, SUBSCRIBE, UNSUBSCRIBE, BEGIN, COMMIT, ABORT, ACK, NACK, DISCONNECT, CONNECT, and STOMP) and server commands (CONNECTED, MESSAGE, RECEIPT, and ERROR).

Only SEND, MESSAGE, and ERROR frames can have a body. The protocol specification can be found online at: http://stomp.github.io/stomp-specification-1.2.html.
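
As an illustration of the frame layout (command, headers, blank line, body, NULL octet), here is a simplified sketch of a frame serializer and parser. It is not the code of stomp.js: the function names buildFrame and parseFrame are assumptions, and the sketch deliberately ignores header escaping and the content-length header defined by the specification.

```javascript
// Serializes a STOMP frame: command, headers, blank line, body, NULL octet.
function buildFrame(command, headers, body) {
  var lines = [command];
  Object.keys(headers).forEach(function (name) {
    lines.push(name + ':' + headers[name]);
  });
  return lines.join('\n') + '\n\n' + (body || '') + '\0';
}

// Parses a single frame back into its command, headers, and body.
function parseFrame(raw) {
  var text = raw.replace(/\0$/, '');
  var divider = text.indexOf('\n\n');
  var headerLines = text.substring(0, divider).split('\n');
  var command = headerLines.shift();
  var headers = {};
  headerLines.forEach(function (line) {
    var idx = line.indexOf(':');
    headers[line.substring(0, idx)] = line.substring(idx + 1);
  });
  return { command: command, headers: headers, body: text.substring(divider + 2) };
}

// Sample SEND frame; the header values and the body are illustrative
var frame = buildFrame(
  'SEND',
  { destination: '/app/users/feed/add', 'content-type': 'application/json' },
  '{"type":"LIKE"}'
);
var parsed = parseFrame(frame);
console.log(parsed.command);             // SEND
console.log(parsed.headers.destination); // /app/users/feed/add
console.log(parsed.body);                // {"type":"LIKE"}
```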

On the client side, we have used the JavaScript library STOMP Over WebSocket identified with the file stomp.js. This library maps STOMP formatted frames to WebSocket frames. By default, it looks up the web browser WebSocket class to make the STOMP client create the WebSocket.

The library can also create STOMP clients from custom WebSocket implementations. From the SockJS WebSockets, we create STOMP clients like so:

    var socket = new SockJS('/api/users/feed/add');
    var stompClient = Stomp.over(socket);
    stompClient.connect({}, function(frame) {
      ...
    });
    socket.onclose = function() {
      stompClient.disconnect();
    };

SockJS

WebSockets are supported by almost all browsers nowadays. Still, we don't have control over the versions that our customers are using. In many cases, hiding such a technology from 7 to 15% of the audience is simply not an option.

On the client side, SockJS provides a custom implementation that can be seen as a decorator around the browser-native WebSocket implementation. With a simple and handy library, SockJS ensures cross-browser compatibility. With a list of fallback transport options (xhr-streaming, xdr-streaming, iframe-eventsource, iframe-htmlfile, xhr-polling, and so on), it emulates WebSockets as much as possible.

For server implementations, to match the clients' fallback behaviors, SockJS also defines its own protocol:

http://sockjs.github.io/sockjs-protocol/sockjs-protocol-0.3.3.html

Spring WebSocket support

Spring 4+ supports the Java WebSocket API specification (JSR-356) with a solution packaged in the spring-websocket and spring-messaging modules. But Spring provides more than just an implementation of JSR-356, for two reasons:

  • WebSockets without a messaging protocol are too low level to be used directly in applications without a custom handling framework, so the Spring team chose to provide and support a messaging protocol implementation (STOMP).
  • WebSockets are not supported by all browsers yet, so Spring also provides WebSocket fallback support with its implementation of the SockJS protocol.

All-in-one configuration

We have enabled the WebSocket engine and configured it for SockJS and STOMP from only one configuration bean, WebSocketConfig:

@Configuration
@ComponentScan("edu.zipcloud.cloudstreetmarket.api")
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void registerStompEndpoints(final StompEndpointRegistry registry) {
    registry.addEndpoint("/users/feed/add")
      .withSockJS();
  }

  @Override
  public void configureMessageBroker(final MessageBrokerRegistry registry) {
    registry.setApplicationDestinationPrefixes("/app");
    registry.enableSimpleBroker("/topic");
  }
}

The WebSocket endpoint is defined for the path /users/feed/add. On the client side, it matches the argument passed to the SockJS constructor:

var socket = new SockJS('/api/users/feed/add');

From the endpoint (clientInboundChannel), the WebSocket engine must decide where to route each message, and there are two options. Depending on what we want to achieve, we can target an in-app consumer (a message handler) or send the message directly to the message broker, which dispatches it to the subscribed clients.

This split is configured by defining two different destination prefixes. In our case, we decided to use the /app prefix to route messages to the corresponding message handlers and the /topic prefix to identify messages that are ready to be dispatched to clients.
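
The routing decision based on destination prefixes can be pictured with the following sketch. It is a conceptual model only, not Spring's implementation: the function routeDestination and the shape of its return value are assumptions, while the two prefixes are those configured in WebSocketConfig.

```javascript
// Prefixes taken from WebSocketConfig; the routing function itself is illustrative.
var APP_PREFIX = '/app';
var BROKER_PREFIX = '/topic';

function routeDestination(destination) {
  if (destination.indexOf(APP_PREFIX + '/') === 0) {
    // The application prefix is stripped before looking up the handler mapping
    return { target: 'handler', mapping: destination.substring(APP_PREFIX.length) };
  }
  if (destination.indexOf(BROKER_PREFIX + '/') === 0) {
    // Broker destinations are kept as they are
    return { target: 'broker', topic: destination };
  }
  return { target: 'none' };
}

console.log(routeDestination('/app/users/feed/add'));
// { target: 'handler', mapping: '/users/feed/add' }
console.log(routeDestination('/topic/actions'));
// { target: 'broker', topic: '/topic/actions' }
```

With this model, a message sent to /app/users/feed/add reaches the handler mapped on /users/feed/add, whereas a message sent to /topic/actions goes straight to the broker.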

Let's see now how message handlers can be defined and how they can be used.

Defining message handlers via @MessageMapping

@MessageMapping annotations are used in Spring MVC controller methods to mark them available as message handler methods.

To route a message from the clientInboundChannel to a message handler, the WebSocket engine selects the right @MessageMapping method based on its configured value.

As usual in Spring MVC, this value can be an Ant-style pattern (such as /targets/**). Also, in the same way as with the @RequestParam and @PathVariable annotations, template variables can be passed to method arguments annotated with @DestinationVariable (destination templates are defined like so: /targets/{target}).
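
To show what a destination template resolves to, here is a simplified matcher. It is only a sketch of the idea behind template variables and not the matching code used by Spring: the function matchTemplate is hypothetical, it supports {name} placeholders only (no Ant-style wildcards), and it does not escape regular-expression characters in the template.

```javascript
// Hypothetical matcher: returns the template variables, or null when
// the destination does not match the template.
function matchTemplate(template, destination) {
  var names = [];
  // Each {name} placeholder becomes a capture group matching one path segment
  var pattern = template.replace(/\{([^}]+)\}/g, function (match, name) {
    names.push(name);
    return '([^/]+)';
  });
  var result = new RegExp('^' + pattern + '$').exec(destination);
  if (!result) {
    return null;
  }
  var variables = {};
  names.forEach(function (name, i) {
    variables[name] = result[i + 1];
  });
  return variables;
}

console.log(matchTemplate('/targets/{target}', '/targets/42')); // { target: '42' }
console.log(matchTemplate('/targets/{target}', '/other/42'));   // null
```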

Sending a message to dispatch

A message broker must be configured. In this recipe, we use a simple message broker (simpMessageBroker) that we have enabled from the MessageBrokerRegistry. This type of in-memory broker is suited to handling STOMP messages when there is no need for an external broker (RabbitMQ, ActiveMQ, and so on). When messages are ready to be dispatched to WebSocket clients, they are sent to the clientOutboundChannel.
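
The principle of an in-memory broker can be sketched in a few lines. This is an illustration of the publish/subscribe idea only, not the Spring simple broker: the SimpleBroker constructor, its methods, and the sample payloads are assumptions.

```javascript
// Illustrative in-memory broker: maps each destination to its subscriber callbacks.
function SimpleBroker() {
  this.subscriptions = {};
}

SimpleBroker.prototype.subscribe = function (destination, callback) {
  if (!this.subscriptions[destination]) {
    this.subscriptions[destination] = [];
  }
  this.subscriptions[destination].push(callback);
};

SimpleBroker.prototype.publish = function (destination, payload) {
  // The payload is serialized once, then delivered to every subscriber
  var body = JSON.stringify(payload);
  (this.subscriptions[destination] || []).forEach(function (callback) {
    callback({ destination: destination, body: body });
  });
};

var broker = new SimpleBroker();
var received = [];
broker.subscribe('/topic/actions', function (message) {
  received.push(JSON.parse(message.body));
});
broker.publish('/topic/actions', { type: 'LIKE', targetActionId: 12 });
broker.publish('/topic/other', { type: 'IGNORED' }); // no subscriber for this destination

console.log(received.length); // 1
```

Nothing is persisted: a message published to a destination without subscribers is simply dropped, which is acceptable for a live activity feed.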

We have seen that when message destinations are prefixed with /topic (as in our case), messages are sent directly to the message broker. But what about sending messages for dispatch when we are in a message handler method or elsewhere in the backend code? We can use the SimpMessagingTemplate for this, as described in the next section.

SimpMessagingTemplate

We auto-wired a SimpMessagingTemplate in the CSMReceiver class and we will use it later to forward the payload of AMQP messages to WebSocket clients.

A SimpMessagingTemplate serves the same purpose as the Spring JmsTemplate (if you are familiar with it), but it suits simple messaging protocols (such as STOMP).

A handy and well-known inherited method is convertAndSend, which tries to identify and use a MessageConverter to serialize an object and put it into a new message before sending this message to the specified destination:

simpMessagingTemplate.convertAndSend(String destination, Object message);

The idea is to target an identified destination (with a /topic prefix in our case) for a message broker.

The @SendTo annotation

This annotation saves us from having to explicitly use the SimpMessagingTemplate. The destination is specified as the annotation value. This method will also handle the conversion from payload to message:

@RestController
public class ActivityFeedWSController extends CloudstreetApiWCI{

  @MessageMapping("/users/feed/add")
  @SendTo("/topic/actions")
  public UserActivityDTO handle(UserActivityDTO payload) throws Exception{
    return payload;
  }
}

There's more…

In this section, we provide an extra source of information related to the SockJS fallback options.

As introduced earlier, Spring provides a SockJS protocol implementation. SockJS is easy to configure in Spring: calling withSockJS() during the STOMP endpoint registration is enough to tell Spring to activate the SockJS fallback options on our endpoint.

The very first call of the SockJS client to the server is an HTTP request to the endpoint path concatenated with /info to assess the server configuration. If this HTTP request does not succeed, no other transport is attempted (not even WebSocket).
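
The behavior described above can be modeled with the following sketch. It is not the code of the SockJS client: the functions infoUrl and transportsToTry are hypothetical, the list of fallbacks is a sample, and the info object assumes the websocket flag described by the SockJS protocol.

```javascript
// Builds the URL of the initial SockJS info request for a given endpoint.
function infoUrl(endpoint) {
  return endpoint.replace(/\/+$/, '') + '/info';
}

// Returns the transports worth trying, given the parsed info response
// (null when the info request did not succeed).
function transportsToTry(info, fallbacks) {
  if (!info) {
    // Without a successful info request, no transport is attempted
    return [];
  }
  return (info.websocket ? ['websocket'] : []).concat(fallbacks);
}

var fallbacks = ['xhr-streaming', 'xhr-polling']; // sample subset of the fallback options

console.log(infoUrl('/api/users/feed/add'));
// prints "/api/users/feed/add/info"
console.log(transportsToTry({ websocket: true }, fallbacks));
// [ 'websocket', 'xhr-streaming', 'xhr-polling' ]
console.log(transportsToTry(null, fallbacks));
// []
```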

You can read more in the Spring reference guide if you want to understand how a SockJS client queries the server for a suitable fallback option:

http://docs.spring.io/spring/docs/current/spring-framework-reference/html/websocket.html#websocket-server-handshake

See also

  • JSR-356: You can find the specification document online to read more about the Java API for WebSocket specification that spring-websocket is complying with: https://jcp.org/en/jsr/detail?id=356