Chapter 4. Publish-and-Subscribe Messaging

This chapter focuses on the publish-and-subscribe (pub/sub) messaging model that was introduced in Chapter 2. The pub/sub messaging model allows a message producer (also called a publisher) to broadcast a message to one or more consumers (called subscribers). There are three important aspects of the pub/sub model:

  • Messages are pushed to consumers, which means that consumers are delivered messages without having to request them. Messages are exchanged through a virtual channel called a topic. A topic is a destination where producers can publish, and subscribers can consume, messages. Messages delivered to a topic are automatically pushed to all qualified consumers.

  • As in enterprise messaging in general, there is no coupling of the producers to the consumers. Subscribers and publishers can be added dynamically at runtime, which allows the system to grow or shrink in complexity over time.

  • Every client that subscribes to a topic receives its own copy of messages published to that topic. A single message produced by one publisher may be copied and distributed to hundreds, or even thousands, of subscribers.
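
The three aspects above can be illustrated without a JMS provider. The following is a minimal sketch in plain Java, not JMS code: the SimpleTopic class and its subscribe( ) and publish( ) methods are hypothetical names invented for this illustration. It shows messages being pushed to subscribers, a subscriber being added at runtime, and each subscriber receiving its own delivery of every message published after it subscribed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a topic. A real JMS provider does
// this work across processes and machines.
class SimpleTopic {
    // CopyOnWriteArrayList allows subscribers to be added during a publish.
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    // Subscribers can be added at any time, without the publisher knowing.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Pushes the message to every current subscriber and returns the
    // number of deliveries made.
    int publish(String message) {
        int deliveries = 0;
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
            deliveries++;
        }
        return deliveries;
    }
}

class SimpleTopicDemo {
    public static void main(String[] args) {
        SimpleTopic hotDeals = new SimpleTopic();
        List<String> retailerA = new ArrayList<>();
        List<String> retailerB = new ArrayList<>();

        hotDeals.subscribe(retailerA::add);
        hotDeals.publish("Bowling Shoes, 100.00, 55.00");  // only A is subscribed

        hotDeals.subscribe(retailerB::add);                 // B joins at runtime
        hotDeals.publish("Bowling Balls, 80.00, 60.00");   // A and B both receive it

        System.out.println("A received " + retailerA.size() + " message(s)");
        System.out.println("B received " + retailerB.size() + " message(s)");
    }
}
```

The publisher never refers to a particular subscriber; it only knows the topic. That is the decoupling the second bullet describes.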

In Chapter 2 you learned the basics of the pub/sub model by developing a simple chat client. In this chapter we will build on those lessons and examine more advanced features of this model, including guaranteed messaging, topic-based addressing, durable subscriptions, request-reply, and temporary topics.

Getting Started with the B2B Application

In this chapter we abandon the simple chat example for a more complex and real-world Business-to-Business (B2B) scenario. In our new example, a wholesaler wants to distribute price information to retailers, and the retailers want to respond by generating orders. We'll implement this scenario using the publish-and-subscribe model: the wholesaler will publish messages containing new prices and hot deals, and the retailers will respond by creating their own messages to order stock.

This scenario is typical of many Business-to-Business operations. We call the clients retailers and wholesalers, but these names are really only for convenience. There's little difference between our wholesaler/retailer scenario and a stock broker broadcasting stock prices to investors, or a manufacturer broadcasting bid requests to multiple suppliers. The fact that we use a retailer and a wholesaler to illustrate our example is much less important than the way we apply JMS.

Our simple trading system is implemented by two classes, both of which are JMS clients: Wholesaler and Retailer. In the interest of keeping the code simple, we won't implement a fancy user interface; our application has a rudimentary command-line user interface.

Running the B2B Application

Before looking at the code, let's look at how the application works. As with the Chat application, the Wholesaler class includes a main( ) method so it can be run as a standalone Java application. It's executed from the command line as follows:

java chap4.B2B.Wholesaler localhost username password

username and password are the authentication information for the client. The Retailer class can be executed in the same manner:

java chap4.B2B.Retailer localhost username password

Start your JMS server, then run one instance of a Wholesaler client and a Retailer client in separate command windows. In the Wholesaler client you are prompted to enter an item description, an old price, and a new price. Enter the following as shown:

Bowling Shoes, 100.00, 55.00

Upon hitting the Enter key, you should see the Retailer application display information on the screen indicating that it has received a price change notice. You should then see the Wholesaler indicating that it has received a "buy" order from the Retailer. Here's the complete interaction with the Wholesaler and the Retailer:[1]

java chap4.B2B.Wholesaler localhost WHOLESALER passwd1
Enter: Item, Old Price, New Price
e.g., Bowling Shoes, 100.00, 55.00
Bowling Shoes, 100.00, 55.00
Order received - 1000 Bowling Shoes from DurableRetailer
-----------------------
java chap4.B2B.Retailer localhost RETAILER passwd2
Retailer application started.
Received Hot Buy: Bowling Shoes, 100.00, 55.00
Buying 1000 Bowling Shoes

Here's what happened. The Wholesaler publishes a price quotation on a topic, "Hot Deals," which is intended for one or more Retailers. The Retailers subscribe to the "Hot Deals" topic in order to receive price quotes. The Retailer application has no interaction with a live user. Instead, it has an autoBuy( ) method that examines the old price and the new price. If the new price represents a reduction of more than ten percent, the Retailer sends a message back to the Wholesaler on the "Buy Order" topic, ordering 1,000 items. In JMS terms, the Wholesaler is a producer of the "Hot Deals" topic and a consumer of the "Buy Order" topic. Conversely, the Retailer is a consumer of the "Hot Deals" topic and a producer of the "Buy Order" topic, as illustrated in Figure 4.1.

Figure 4.1. Producers and consumers in the B2B example
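
The buy decision described above can be isolated as a small pure function. This is a sketch under stated assumptions rather than the Retailer's actual code: AutoBuyRule and shouldBuy( ) are hypothetical names, and the rule is read literally as "the new price is more than ten percent below the old price."

```java
// Hypothetical helper that isolates the Retailer's buy decision.
class AutoBuyRule {
    // Returns true when newPrice is more than 10 percent below oldPrice.
    static boolean shouldBuy(float oldPrice, float newPrice) {
        if (oldPrice <= 0) {
            return false;  // no meaningful reduction can be computed
        }
        float reduction = (oldPrice - newPrice) / oldPrice;
        return reduction > 0.10f;
    }

    public static void main(String[] args) {
        System.out.println(shouldBuy(100.00f, 55.00f));  // 45 percent reduction
        System.out.println(shouldBuy(100.00f, 95.00f));  // 5 percent reduction
    }
}
```

Keeping the rule separate from the messaging code makes it easy to check on its own, without a running JMS server.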

The B2B Source Code

The rest of this chapter examines the source code for the Wholesaler and Retailer classes, and covers several advanced subjects related to the pub/sub messaging model.

The Wholesaler class

Here is the complete definition of the Wholesaler class, which publishes items to the "Hot Deals" topic and receives "Buy Orders" on those deals from retailers. A brief tour of its methods and their responsibilities follows the listing; the implementation is covered in detail later in this chapter.

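import java.util.Properties;
import java.util.StringTokenizer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;

public class Wholesaler implements javax.jms.MessageListener{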

   private javax.jms.TopicConnection connect = null;
   private javax.jms.TopicSession pubSession = null;
   private javax.jms.TopicSession subSession = null;
   private javax.jms.TopicPublisher publisher = null;
   private javax.jms.TopicSubscriber subscriber = null;
   private javax.jms.Topic hotDealsTopic = null;
   private javax.jms.TemporaryTopic buyOrdersTopic = null;

   public Wholesaler(String broker, String username, String password){
      try {
         Properties env = new Properties( );
         // ... specify the JNDI properties specific to the vendor
         
         InitialContext jndi = new InitialContext(env);
                  
         TopicConnectionFactory factory = 
          (TopicConnectionFactory)jndi.lookup(broker);
         connect = factory.createTopicConnection (username, password);

         pubSession = 
          connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
         subSession = 
          connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
         
         hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
         publisher = pubSession.createPublisher(hotDealsTopic);

         buyOrdersTopic = subSession.createTemporaryTopic( );

         subscriber = subSession.createSubscriber(buyOrdersTopic);
         subscriber.setMessageListener(this);
         
         connect.start( );
         
      } catch (javax.jms.JMSException jmse){
         jmse.printStackTrace( ); System.exit(1);
      } catch (javax.naming.NamingException jne){
         jne.printStackTrace( ); System.exit(1);
      }
   }
   private void publishPriceQuotes(String dealDesc, String username, 
                                   String itemDesc,  float oldPrice, 
                                   float newPrice){
      try {
        javax.jms.StreamMessage message = 
           pubSession.createStreamMessage( );
        message.writeString(dealDesc);
        message.writeString(itemDesc);
        message.writeFloat(oldPrice);
        message.writeFloat(newPrice);
                   
        message.setStringProperty("Username", username);
        message.setStringProperty("Itemdesc", itemDesc);
                   
        message.setJMSReplyTo(buyOrdersTopic);               
                   
        publisher.publish(
            message,
            javax.jms.DeliveryMode.PERSISTENT,
            javax.jms.Message.DEFAULT_PRIORITY,
            1800000);
            
      } catch ( javax.jms.JMSException jmse ){
         jmse.printStackTrace( );
      }      
   }
   public void onMessage( javax.jms.Message message){
      try {
         TextMessage textMessage = (TextMessage) message;
         String text = textMessage.getText( );
         System.out.println("\nOrder received - "+text+
                            " from " + message.getJMSCorrelationID( ));
      } catch (java.lang.Exception rte){
         rte.printStackTrace( );
      }         
   }
   public void exit( ){
      try {
        connect.close( );
      } catch (javax.jms.JMSException jmse){
        jmse.printStackTrace( );
      }
      System.exit(0);
   }
   public static void main(String argv[]) {
      String broker, username, password;
      if  (argv.length == 3){
         broker = argv[0];
         username = argv[1];
         password = argv[2];
      } else  {
         System.out.println("Invalid arguments. Should be: ");
         System.out.println("java Wholesaler broker username password");
         return;
      }
      
      Wholesaler wholesaler = new Wholesaler(broker,username,password);
      
      try {
         // Read all standard input and send it as a message.
         java.io.BufferedReader stdin = new java.io.BufferedReader
            (new java.io.InputStreamReader( System.in ) );
         System.out.println ("Enter: Item, Old Price, New Price");
         System.out.println("\ne.g., Bowling Shoes, 100.00, 55.00");

         while ( true ){
            String dealDesc = stdin.readLine( );
            if  (dealDesc != null && dealDesc.length( ) > 0){
               // Parse the deal description
               StringTokenizer tokenizer = 
               new StringTokenizer(dealDesc,",") ;
                  String itemDesc = tokenizer.nextToken( );
                  String temp = tokenizer.nextToken( );
                  float oldPrice = 
                    Float.valueOf(temp.trim()).floatValue( );
                  temp = tokenizer.nextToken( );
                  float newPrice = 
                    Float.valueOf(temp.trim()).floatValue( );
                  
               wholesaler.publishPriceQuotes(dealDesc,username,
                                           itemDesc,oldPrice,newPrice);
            } else  {
                wholesaler.exit( );
            }
         }
      } catch ( java.io.IOException ioe ){
         ioe.printStackTrace( );
      }
   }
}

The main( ) method creates an instance of the Wholesaler class, passing it the information it needs to set up its publishers and subscribers.

In the Wholesaler class's constructor, JNDI is used to obtain the "Hot Deals" topic identifier, which is then used to create a publisher. Most of this should look familiar to you; it's similar in many ways to the Chat application, except for the creation of a temporary topic, which is discussed in more detail later in this section.

Once the Wholesaler is instantiated, the main( ) method continues to monitor the command line for new "Hot Deals." When a "Hot Deal" is entered at the command prompt, the main( ) method parses the information and passes it to the Wholesaler instance via the publishPriceQuotes( ) method.
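
The parsing in main( ) assumes well-formed input: a line with fewer than three fields makes nextToken( ) throw a NoSuchElementException, and a non-numeric price makes Float.valueOf( ) throw a NumberFormatException. The following sketch shows one way the parsing step could be made defensive. DealLine and its parse( ) method are hypothetical names and are not part of the Wholesaler listing.

```java
import java.util.StringTokenizer;

// Hypothetical value holder and parser for an input line such as
// "Bowling Shoes, 100.00, 55.00".
class DealLine {
    final String itemDesc;
    final float oldPrice;
    final float newPrice;

    DealLine(String itemDesc, float oldPrice, float newPrice) {
        this.itemDesc = itemDesc;
        this.oldPrice = oldPrice;
        this.newPrice = newPrice;
    }

    // Returns null when the line does not contain exactly three
    // comma-separated fields or when a price is not a valid number.
    static DealLine parse(String line) {
        StringTokenizer tokenizer = new StringTokenizer(line, ",");
        if (tokenizer.countTokens() != 3) {
            return null;
        }
        String itemDesc = tokenizer.nextToken().trim();
        try {
            float oldPrice = Float.parseFloat(tokenizer.nextToken().trim());
            float newPrice = Float.parseFloat(tokenizer.nextToken().trim());
            return new DealLine(itemDesc, oldPrice, newPrice);
        } catch (NumberFormatException nfe) {
            return null;
        }
    }

    public static void main(String[] args) {
        DealLine deal = parse("Bowling Shoes, 100.00, 55.00");
        System.out.println(deal.itemDesc + ": " + deal.oldPrice + " -> " + deal.newPrice);
    }
}
```

A caller could print a usage hint and continue reading input when parse( ) returns null, instead of letting an exception end the program.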

The publishPriceQuotes( ) method is responsible for publishing messages containing information about price quotes to the "Hot Deals" topic.
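
In the listing, the last argument to publish( ) is the message's time-to-live, which JMS expresses in milliseconds. The literal 1800000 is therefore 30 minutes. The short sketch below derives that value instead of hard-coding it; the class and constant names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder for a named time-to-live constant.
class TimeToLive {
    // 30 minutes expressed in milliseconds, the unit used for time-to-live.
    static final long THIRTY_MINUTES_MS = TimeUnit.MINUTES.toMillis(30);

    public static void main(String[] args) {
        System.out.println(THIRTY_MINUTES_MS);  // prints 1800000
    }
}
```

A named constant of this kind could be passed to publish( ) in place of the bare number, which makes the intended expiration easier to read.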

The onMessage( ) method receives messages from clients responding to deals published on the "Hot Deals" topic. The contents of these messages are simply printed to the command line.

The Retailer class

Here is the complete definition of the Retailer class, which subscribes to the "Hot Deals" topic and responds with "Buy Orders" on attractive deals:

import java.util.Properties;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;

public class Retailer implements javax.jms.MessageListener{

    private javax.jms.TopicConnection connect = null;
    private javax.jms.TopicSession session = null;
    private javax.jms.TopicPublisher publisher = null;
    // Kept as a field so that exit( ) can close the durable subscriber
    private javax.jms.TopicSubscriber subscriber = null;
    private javax.jms.Topic hotDealsTopic = null;

    public Retailer( String broker, String username, String password){
        try {
            Properties env = new Properties( );
            // ... specify the JNDI properties specific to the vendor
                 
            InitialContext jndi = new InitialContext(env);
                          
            TopicConnectionFactory factory = 
            (TopicConnectionFactory)jndi.lookup(broker);
                
            connect = factory.createTopicConnection(username, password);
            connect.setClientID("DurableRetailer");

            session = 
            connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
                
            hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
                
            subscriber = 
                session.createDurableSubscriber(hotDealsTopic,
                   "Hot Deals Subscription");
            subscriber.setMessageListener(this);
            connect.start( );
            
        } catch (javax.jms.JMSException jmse){
            jmse.printStackTrace( );
            System.exit(1);
        } catch (javax.naming.NamingException jne){
         jne.printStackTrace( ); System.exit(1);
        }
    }
    public void onMessage(javax.jms.Message aMessage){
        try {
            autoBuy(aMessage);
        } catch (java.lang.RuntimeException rte){
            rte.printStackTrace( );
        }            
    }
    
    private void autoBuy (javax.jms.Message message){
        int count = 1000;
        try {
            StreamMessage strmMsg = (StreamMessage)message;
            String dealDesc = strmMsg.readString( );
            String itemDesc = strmMsg.readString( );
            float oldPrice = strmMsg.readFloat( );
            float newPrice = strmMsg.readFloat( );
            System.out.println("Received Hot Buy: "+dealDesc);
            
            // If price reduction is greater than 10 percent, buy
            if (oldPrice > 0 && (oldPrice - newPrice) / oldPrice > 0.10f){
                System.out.println("\nBuying " + count +" "+ itemDesc);

                TextMessage textMsg = session.createTextMessage( );
                textMsg.setText(count + " " + itemDesc );

                javax.jms.Topic buytopic = 
                    (javax.jms.Topic)message.getJMSReplyTo( );
                    
                publisher = session.createPublisher(buytopic);
                
                textMsg.setJMSCorrelationID("DurableRetailer");
                
                publisher.publish(
                    textMsg,
                    javax.jms.DeliveryMode.PERSISTENT,
                    javax.jms.Message.DEFAULT_PRIORITY,
                    1800000);
            } else  {
                System.out.println ("\nBad Deal- Not buying.");
            }
        } catch (javax.jms.JMSException jmse){
            jmse.printStackTrace( );
        }
    }
    private void exit(String s){
        try {
            if ( s != null && 
                s.equalsIgnoreCase("unsubscribe"))
            {
                subscriber.close( );
                session.unsubscribe("Hot Deals Subscription");
            }
            connect.close( );
        } catch (javax.jms.JMSException jmse){
            jmse.printStackTrace( );
        }
        System.exit(0);
    }
    public static void main(String argv[]) {
        String broker, username, password;
        if  (argv.length == 3){
            broker = argv[0];
            username = argv[1];
            password = argv[2];
        } else  {
            System.out.println("Invalid arguments. Should be: ");
            System.out.println
            ("java Retailer broker username password");
            return;
        }
        
        Retailer retailer  = new Retailer(broker, username, password);
        
        try {
            System.out.println("\nRetailer application started.\n");
            // Read all standard input and send it as a message.
            java.io.BufferedReader stdin =
                new java.io.BufferedReader
                ( new java.io.InputStreamReader( System.in ) );
            while ( true ){
                String s = stdin.readLine( );
                if ( s == null )retailer.exit(null);
                else if ( s.equalsIgnoreCase("unsubscribe") )
                    retailer.exit ( s );
            }
        } catch ( java.io.IOException ioe ){
            ioe.printStackTrace( );
        }
    }
}

The main( ) method of Retailer is much like the main( ) method of Wholesaler. It creates an instance of the Retailer class and passes it the information it needs to set up its publishers and subscribers.

The constructor of the Retailer class is also similar to that of the Wholesaler class, except that it creates a durable subscription using the "Hot Deals" topic. Durable subscriptions will be discussed in more detail later in this section.
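
As a rough mental model before the detailed discussion, a durable subscription lets the provider retain messages published while the subscriber is disconnected and deliver them when it reconnects. The sketch below models that behavior in plain Java. It is not JMS code, the class and method names are hypothetical, and it ignores details such as message expiration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a single durable subscription.
class DurableSubscriptionSketch {
    private final Queue<String> retained = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private boolean connected = false;

    // Delivers immediately when the subscriber is connected, otherwise
    // retains the message for later delivery.
    void publish(String message) {
        if (connected) {
            delivered.add(message);
        } else {
            retained.add(message);
        }
    }

    // On reconnect, retained messages are delivered in publication order.
    void connect() {
        connected = true;
        while (!retained.isEmpty()) {
            delivered.add(retained.remove());
        }
    }

    void disconnect() {
        connected = false;
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        DurableSubscriptionSketch subscription = new DurableSubscriptionSketch();
        subscription.connect();
        subscription.publish("deal 1");
        subscription.disconnect();
        subscription.publish("deal 2");  // retained while disconnected
        subscription.connect();          // "deal 2" is delivered now
        System.out.println(subscription.delivered());
    }
}
```

With an ordinary, nondurable subscription, the message published during the disconnected period would simply be missed.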

Once the Retailer is instantiated, the main( ) method uses the readLine( ) method as a way of blocking program execution in order to monitor for message input.

The onMessage( ) method receives messages from the Wholesaler client, then delegates its work to the autoBuy( ) method. The autoBuy( ) method examines the message and determines whether the price reduction is significant; if it is, the method orders an arbitrary quantity of 1,000 items. It places the order by publishing a persistent message back to the Wholesaler client's temporary topic, using the JMSCorrelationID as a way of identifying itself. We will examine persistent publishing and temporary topics in the next section.
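
The reply path just described can be sketched without a JMS provider. In the plain-Java model below, the Msg class is hypothetical: its replyTo and correlationId fields only mirror the roles of the JMSReplyTo and JMSCorrelationID headers, and a Consumer stands in for the Wholesaler's temporary topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory message. This is not the JMS Message interface.
class Msg {
    final String body;
    final Consumer<Msg> replyTo;   // where a reply should be sent; may be null
    final String correlationId;    // identifies the replying client; may be null

    Msg(String body, Consumer<Msg> replyTo, String correlationId) {
        this.body = body;
        this.replyTo = replyTo;
        this.correlationId = correlationId;
    }
}

class RequestReplySketch {
    // Plays the Retailer: builds an order and sends it to the destination
    // carried in the incoming message, tagged with the Retailer's identity.
    static void retailerOnMessage(Msg deal) {
        Msg order = new Msg("1000 " + deal.body, null, "DurableRetailer");
        deal.replyTo.accept(order);
    }

    public static void main(String[] args) {
        List<String> wholesalerLog = new ArrayList<>();

        // Plays the Wholesaler's temporary topic and its onMessage( ) method.
        Consumer<Msg> buyOrders = order -> wholesalerLog.add(
            "Order received - " + order.body + " from " + order.correlationId);

        // The Wholesaler attaches its reply destination to the outgoing deal.
        retailerOnMessage(new Msg("Bowling Shoes", buyOrders, null));

        System.out.println(wholesalerLog.get(0));
    }
}
```

The Retailer never looks up the reply destination by name; it uses whatever destination arrives with the message, which is what allows the Wholesaler to use a private temporary topic.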



[1] WHOLESALER and RETAILER are usernames you have set up when configuring your JMS server. passwd1 and passwd2 are the passwords you've assigned to those usernames. If you are using an evaluation version of a JMS provider, it may not be necessary to set up usernames and passwords; check your vendor's documentation for more information.
