The QWholesaler and QRetailer

Let's rethink our wholesaler/retailer scenario in terms of the distinction between the two messaging models. The pub/sub model is well suited for sending price quotes, since that is naturally a one-to-many broadcast. However, when the retailer responds with a "buy" order, it is more appropriate to use a point-to-point queue. In the real world, retailers naturally deal with many wholesalers, and you would send a purchase order only to the wholesaler that offered the quote.
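To make the distinction concrete before we bring in the JMS classes, here is a minimal in-memory sketch of the two models. SketchTopic, SketchQueue, and MessagingModelsSketch are names invented for this illustration; they are stand-ins and not part of the JMS API. The topic hands every subscriber its own copy of a quote, while each order placed on the queue is consumed exactly once:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// In-memory stand-ins for the two models; none of this is the JMS API.
class SketchTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // One-to-many: every subscriber receives its own copy of the message.
    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

class SketchQueue {
    private final Queue<String> pending = new ArrayDeque<>();

    void send(String message) {
        pending.add(message);
    }

    // One-to-one: a message is handed to a single receiver and then removed.
    // Returns null when nothing is waiting.
    String receive() {
        return pending.poll();
    }
}

class MessagingModelsSketch {
    // Broadcasts one quote to two retailers and returns the orders that
    // arrive on the wholesaler's queue, in arrival order.
    static List<String> run() {
        SketchTopic hotDeals = new SketchTopic();
        SketchQueue buyOrders = new SketchQueue();

        // Each retailer answers a quote by sending an order to the queue.
        hotDeals.subscribe(quote -> buyOrders.send("retailerA buys: " + quote));
        hotDeals.subscribe(quote -> buyOrders.send("retailerB buys: " + quote));

        hotDeals.publish("Bowling Shoes");

        List<String> received = new ArrayList<>();
        String order;
        while ((order = buyOrders.receive()) != null) {
            received.add(order);
        }
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

One published quote produces two orders, one per retailer, and once the wholesaler has received an order it is no longer on the queue.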

From the user's perspective, the QWholesaler and QRetailer examples that we'll develop now are functionally equivalent to the Wholesaler and Retailer examples introduced in Chapter 4. The difference lies in the use of the point-to-point queue for responses to price quotes. If you wish to see these classes in action, start your JMS provider and execute the following commands, each in a separate command window:

java chap5.B2B.QWholesaler localhost username password
java chap5.B2B.QRetailer localhost username password

The QRetailer Class

Here is the listing for the QRetailer class in its entirety. Later, we will examine this class in detail:

import java.util.StringTokenizer;
import java.util.Properties;
import javax.naming.InitialContext;
import javax.jms.TopicConnectionFactory;
import javax.jms.QueueConnectionFactory;
import javax.jms.Topic;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

public class QRetailer implements javax.jms.MessageListener {

    private javax.jms.QueueConnection qConnect = null;    
    private javax.jms.QueueSession qSession = null;
    private javax.jms.QueueSender qSender = null;

    private javax.jms.TopicConnection tConnect = null;
    private javax.jms.TopicSession tSession = null;

    private javax.jms.Topic hotDealsTopic = null;
    private javax.jms.TopicSubscriber tsubscriber = null;
    private static boolean useJNDI = false;
    private static String uname = null;

    public QRetailer( String broker, String username, String password){
        try  {
            TopicConnectionFactory tFactory = null;
            QueueConnectionFactory qFactory = null;
            InitialContext jndi = null;
            uname = username;
            
            Properties env = new Properties( );
            // ... specify the JNDI properties specific to the vendor
            jndi = new InitialContext(env);
            tFactory = 
                (TopicConnectionFactory)jndi.lookup(broker);
            qFactory = 
                (QueueConnectionFactory)jndi.lookup(broker);
            tConnect = 
                tFactory.createTopicConnection (username, password);
            qConnect = 
                qFactory.createQueueConnection (username, password);
            tConnect.setClientID(username);
            qConnect.setClientID(username);

            tSession = 
                tConnect.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            qSession = 
                qConnect.createQueueSession(false,
                    javax.jms.Session.AUTO_ACKNOWLEDGE);
    
            hotDealsTopic = (Topic)jndi.lookup("Hot Deals");

            tsubscriber = 
                tSession.createDurableSubscriber(hotDealsTopic, 
                                             "Hot Deals Subscription");
            tsubscriber.setMessageListener(this);
            tConnect.start( );
            
        } catch  (javax.jms.JMSException jmse){
            jmse.printStackTrace( );
            System.exit(1);
        } catch (javax.naming.NamingException jne){
            jne.printStackTrace( );
            System.exit(1);
        }
    }
    public void onMessage(javax.jms.Message aMessage){
        try  {
            autoBuy(aMessage);
        } catch  (java.lang.RuntimeException rte){
            rte.printStackTrace( );
        }            
    }
    private void autoBuy (javax.jms.Message message){
        try  {
            StreamMessage strmMsg = (StreamMessage)message;
            String dealDesc = strmMsg.readString( );
            String itemDesc = strmMsg.readString( );
            float oldPrice = strmMsg.readFloat( );
            float newPrice = strmMsg.readFloat( );
            System.out.println("Received Hot Buy: "+dealDesc);
            
            // Buy if the item is free, or if the old price is more than
            // 10 percent above the new price
            if (newPrice == 0 || oldPrice / newPrice > 1.1) {
                int count = (int)(java.lang.Math.random( )*(double)1000);
                System.out.println("\nBuying " + count +" "+ itemDesc);

                TextMessage textMsg = tSession.createTextMessage( );
                textMsg.setText(count + " " + itemDesc );
                textMsg.setIntProperty("QTY", count);

                textMsg.setJMSCorrelationID(uname);

                Queue buyQueue = (Queue)message.getJMSReplyTo( );
                
                qSender = qSession.createSender(buyQueue);
                qSender.send( textMsg, 
                              javax.jms.DeliveryMode.PERSISTENT,
                              javax.jms.Message.DEFAULT_PRIORITY,
                              1800000);
            } else {
                System.out.println ("\nBad Deal.  Not buying");
            }
        } catch  (javax.jms.JMSException jmse){
            jmse.printStackTrace( );
        }
    }
    private void exit(String s){
        try {
            if ( s != null && 
                s.equalsIgnoreCase("unsubscribe"))
            {
                tsubscriber.close( );
                tSession.unsubscribe("Hot Deals Subscription");
            }
            tConnect.close( );
            qConnect.close( );
        } catch  (javax.jms.JMSException jmse){
            jmse.printStackTrace( );
        }
        System.exit(0);
    }
    public static void main(String argv[]) {
        String broker, username, password;
        if (argv.length == 3){
            broker = argv[0];
            username = argv[1];
            password = argv[2];
        } else {
            System.out.println("Invalid arguments. Should be: ");
            System.out.println
            ("java QRetailer broker username password");
            return;
        }
        
        QRetailer retailer  = new QRetailer(broker, username, password);
        
        try  {
            System.out.println("\nRetailer application started.\n");
            // Read standard input and exit on end of input or "unsubscribe"
            java.io.BufferedReader stdin =
                new java.io.BufferedReader
                ( new java.io.InputStreamReader( System.in ) );
            while ( true ){
                String s = stdin.readLine( );
                if ( s == null )retailer.exit(null);
                else if ( s.equalsIgnoreCase("unsubscribe") )
                    retailer.exit ( s );
            }
        } catch  ( java.io.IOException ioe ){
            ioe.printStackTrace( );
        }
    }
}
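Before walking through the listing, it is worth checking the buy rule in autoBuy( ) by hand. The test oldPrice / newPrice > 1.1 is easy to misread as "the price dropped by more than 10 percent." Rearranged, it says newPrice < oldPrice / 1.1, which corresponds to a reduction of a little over 9 percent of the old price. The sketch below repeats the same test so the boundary can be tried out; BuyRuleSketch and the reduction( ) helper are hypothetical names added for this illustration only:

```java
class BuyRuleSketch {
    // Same test as autoBuy( ): buy if the item is free, or if the ratio of
    // old price to new price exceeds 1.1.
    static boolean shouldBuy(float oldPrice, float newPrice) {
        return newPrice == 0 || oldPrice / newPrice > 1.1;
    }

    // Hypothetical helper: the fraction of the old price that was taken off.
    static double reduction(float oldPrice, float newPrice) {
        return (oldPrice - newPrice) / (double) oldPrice;
    }

    public static void main(String[] args) {
        float[][] quotes = { {100f, 55f}, {100f, 90.5f}, {100f, 95f} };
        for (float[] quote : quotes) {
            System.out.println(quote[0] + " -> " + quote[1]
                + "  reduction=" + reduction(quote[0], quote[1])
                + "  buy=" + shouldBuy(quote[0], quote[1]));
        }
    }
}
```

A quote that drops from 100.00 to 90.50, a 9.5 percent reduction, still triggers a purchase. If a strict 10 percent reduction were wanted, one option would be to test newPrice < 0.9 * oldPrice instead.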

Now let's look at the code in detail. A session can be either a QueueSession (point-to-point) or a TopicSession (publish/subscribe); it cannot be both at the same time. Similarly, a connection can be either a QueueConnection or a TopicConnection. Therefore, we create a connection and a session for each model in the QRetailer's constructor:

public QRetailer( String broker, String username, String password){
    ...
       tFactory = 
            (TopicConnectionFactory)jndi.lookup(broker);
       qFactory = 
            (QueueConnectionFactory)jndi.lookup(broker);
       tConnect = 
            tFactory.createTopicConnection (username, password);
       qConnect = 
            qFactory.createQueueConnection (username, password);

       tConnect.setClientID(username);
       qConnect.setClientID(username);

       tSession = 
            tConnect.createTopicSession(false,
                Session.AUTO_ACKNOWLEDGE);
       qSession = 
           qConnect.createQueueSession(false,
               javax.jms.Session.AUTO_ACKNOWLEDGE);
    
    ...

The autoBuy( ) method is responsible for sending the "Buy Order" messages. This method is invoked by the onMessage( ) handler for the "Hot Deals" topic:

private void autoBuy (javax.jms.Message message){
       ...
       textMsg.setJMSCorrelationID(uname);

       Queue buyQueue = (Queue)message.getJMSReplyTo( );
                
       qSender = qSession.createSender(buyQueue);
       qSender.send( textMsg, 
                     javax.jms.DeliveryMode.PERSISTENT,
                     javax.jms.Message.DEFAULT_PRIORITY,
                     1800000);
       ...
}

The Message object itself is independent of the domain being used to transport it. All of the same headers and properties apply to each. Therefore, we can set a correlation ID, extract an object to reply to (this time, it's a Queue), and use the Queue to create a sender. The QueueSender.send( ) method is identical in form to the TopicPublisher.publish( ) method. Here is the interface definition for the QueueSender object:

public interface QueueSender
       extends MessageProducer
{
       Queue getQueue( ) throws JMSException;
       void send(Message message) throws JMSException, 
           MessageFormatException, InvalidDestinationException;
	
       void send(Message message, int deliveryMode, 
       int priority, long timeToLive) throws JMSException, 
           MessageFormatException, InvalidDestinationException;

       void send(Queue queue, Message message) throws JMSException, 
           MessageFormatException, InvalidDestinationException;

       void send(Queue queue, Message message, int deliveryMode, 
       int priority, long timeToLive) throws JMSException, 
           MessageFormatException, InvalidDestinationException;
}

The QueueSender object is created as part of the onMessage( ) handler, using the call to the createSender( ) method. As an alternative, we could have created the QueueSender once in the QRetailer's constructor using null as a parameter, then specified buyQueue as a parameter to the send( ) operation each time the onMessage( ) method is invoked. For most applications, this would be more efficient; the example recreates the QueueSender for clarity.
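The shape of that alternative can be sketched without a JMS provider. SketchSender below is a stand-in written for this illustration, not javax.jms.QueueSender; it mimics only the two send( ) forms discussed above, with plain strings in place of Queue and Message objects. Its guard clauses are modeled on the UnsupportedOperationException that the JMS API documentation describes for a producer used with the wrong form of send( ); treat that detail as an assumption to verify against your provider. The queue name "SampleQ2" is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a sender; NOT the javax.jms.QueueSender interface.
class SketchSender {
    private final String defaultQueue;            // null means "unidentified"
    final List<String> log = new ArrayList<>();   // records what was sent where

    SketchSender(String defaultQueue) {
        this.defaultQueue = defaultQueue;
    }

    // Form used by the listing: the destination was fixed at creation time.
    void send(String message) {
        if (defaultQueue == null) {
            throw new UnsupportedOperationException(
                "no destination was given at creation time");
        }
        log.add(defaultQueue + " <- " + message);
    }

    // Alternative form: the destination is supplied with each message.
    void send(String queue, String message) {
        if (defaultQueue != null) {
            throw new UnsupportedOperationException(
                "destination was fixed at creation time");
        }
        log.add(queue + " <- " + message);
    }
}

class SenderReuseSketch {
    // Each row is {replyToQueue, orderText}. A single sender is created once
    // and reused for every row, instead of one sender per message.
    static List<String> sendAll(String[][] orders) {
        SketchSender sender = new SketchSender(null);  // created once
        for (String[] order : orders) {
            sender.send(order[0], order[1]);           // queue chosen per message
        }
        return sender.log;
    }

    public static void main(String[] args) {
        String[][] orders = {
            {"SampleQ1", "10 Bowling Shoes"},
            {"SampleQ2", "5 Surfboards"}               // hypothetical second queue
        };
        sendAll(orders).forEach(System.out::println);
    }
}
```

The point of the design is that the cost of creating the sender is paid once, while each reply can still go to whichever queue the incoming quote named in its JMSReplyTo header.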

The QWholesaler Class

Here is the complete listing for the QWholesaler class:

import java.util.StringTokenizer;
import java.util.Properties;
import javax.naming.InitialContext;
import javax.jms.TopicConnectionFactory;
import javax.jms.QueueConnectionFactory;
import javax.jms.Topic;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.Session;
import javax.jms.TextMessage;

public class QWholesaler implements javax.jms.MessageListener{

    private javax.jms.TopicConnection tConnect = null;
    private javax.jms.TopicSession tSession = null;
    private javax.jms.TopicPublisher tPublisher = null;
   
    private javax.jms.QueueConnection qConnect = null;    
    private javax.jms.QueueSession qSession = null;
    private javax.jms.Queue receiveQueue = null;

    private javax.jms.Topic hotDealsTopic = null;
    private javax.jms.TemporaryTopic buyOrdersTopic = null;

   public QWholesaler(String broker, String username, String password){
      try  {
        TopicConnectionFactory tFactory = null;
        QueueConnectionFactory qFactory = null;
        InitialContext jndi = null;
        
        Properties env = new Properties( );
        // ... specify the JNDI properties specific to the vendor
        jndi = new InitialContext(env);
                          
        tFactory = 
            (TopicConnectionFactory)jndi.lookup(broker);
        qFactory = 
            (QueueConnectionFactory)jndi.lookup(broker);
        tConnect = tFactory.createTopicConnection (username, password);
        qConnect = qFactory.createQueueConnection (username, password);
        tSession = 
            tConnect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        qSession = 
            qConnect.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
             
        hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
        receiveQueue = (Queue)jndi.lookup("SampleQ1");        

        tPublisher = tSession.createPublisher(hotDealsTopic);

        QueueReceiver qReceiver = qSession.createReceiver(receiveQueue);
        qReceiver.setMessageListener(this);

        // Now that setup is complete, start the Connection
        qConnect.start( );
        tConnect.start( );
      } catch  (javax.jms.JMSException jmse){
         jmse.printStackTrace( ); System.exit(1);
      } catch (javax.naming.NamingException jne){
         jne.printStackTrace( ); System.exit(1);
      }
   }
   private void publishPriceQuotes(String dealDesc, String username, 
                                   String itemDesc,  float oldPrice, 
                                   float newPrice){
      try  {
        javax.jms.StreamMessage message = 
              tSession.createStreamMessage( );
        message.writeString(dealDesc);
        message.writeString(itemDesc);
        message.writeFloat(oldPrice);
        message.writeFloat(newPrice);
                   
        message.setStringProperty("Username", username);
        message.setStringProperty("itemDesc", itemDesc);
                   
        message.setJMSReplyTo(receiveQueue);
                   
        tPublisher.publish(
            message,
            javax.jms.DeliveryMode.PERSISTENT,
            javax.jms.Message.DEFAULT_PRIORITY,
            1800000);
      } catch  ( javax.jms.JMSException jmse ){
         jmse.printStackTrace( );
      }      
   }
   public void onMessage( javax.jms.Message message){
      try  {
         TextMessage textMessage = (TextMessage) message;
         String text = textMessage.getText( );
         System.out.println("Order received - "+text+
                            " from " + message.getJMSCorrelationID( ));
      } catch  (java.lang.Exception rte){
         rte.printStackTrace( );
      }         
   }
   public void exit( ){
      try  {
        tConnect.close( );
        qConnect.close( );
      } catch  (javax.jms.JMSException jmse){
        jmse.printStackTrace( );
      }
      System.exit(0);
   }
   public static void main(String argv[]) {
      String broker, username, password;
      if (argv.length == 3){
         broker = argv[0];
         username = argv[1];
         password = argv[2];
      } else {
         System.out.println("Invalid arguments. Should be: ");
         System.out.println
               ("java QWholesaler broker username password");
         return;
      }
      
      QWholesaler wholesaler = new QWholesaler(broker, username, password);
      
      try  {
         // Read all standard input and send it as a message
         java.io.BufferedReader stdin = new java.io.BufferedReader
            (new java.io.InputStreamReader( System.in ) );
         System.out.println ("Enter: Item, Old Price, New Price");
         System.out.println("\ne.g. Bowling Shoes, 100.00, 55.00");

         while ( true ){
            String dealDesc = stdin.readLine( );
            if (dealDesc != null && dealDesc.length( ) > 0){
               // Parse the deal description
               StringTokenizer tokenizer = 
                   new StringTokenizer(dealDesc,",") ;
               String itemDesc = tokenizer.nextToken( );
               String temp = tokenizer.nextToken( );
               float oldPrice = 
                   Float.valueOf(temp.trim()).floatValue( );
               temp = tokenizer.nextToken( );
               float newPrice = 
                   Float.valueOf(temp.trim()).floatValue( );

               wholesaler.publishPriceQuotes(dealDesc,username,
                                          itemDesc, oldPrice,newPrice);
            } else {
                wholesaler.exit( );
            }
         }
      } catch ( java.io.IOException ioe ){
         ioe.printStackTrace( );
      }
   }
}
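The input loop in main( ) assumes that every line has exactly three comma-separated fields with numeric prices. A line such as "Bowling Shoes" makes nextToken( ) throw a NoSuchElementException, and a non-numeric price makes Float.valueOf( ) throw a NumberFormatException; neither is caught by the listing, so either one propagates out of main( ). One possible way to harden the parsing is sketched below. DealLineSketch and its nested Deal class are hypothetical names, and unlike the listing the sketch also trims the item description:

```java
import java.util.Optional;
import java.util.StringTokenizer;

class DealLineSketch {
    // Hypothetical value holder for one parsed line of input.
    static final class Deal {
        final String itemDesc;
        final float oldPrice;
        final float newPrice;

        Deal(String itemDesc, float oldPrice, float newPrice) {
            this.itemDesc = itemDesc;
            this.oldPrice = oldPrice;
            this.newPrice = newPrice;
        }
    }

    // Parses "Item, Old Price, New Price"; returns empty instead of throwing
    // when the line has the wrong number of fields or a non-numeric price.
    static Optional<Deal> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        StringTokenizer tokenizer = new StringTokenizer(line, ",");
        if (tokenizer.countTokens() != 3) {
            return Optional.empty();
        }
        try {
            String itemDesc = tokenizer.nextToken().trim();
            float oldPrice = Float.parseFloat(tokenizer.nextToken().trim());
            float newPrice = Float.parseFloat(tokenizer.nextToken().trim());
            return Optional.of(new Deal(itemDesc, oldPrice, newPrice));
        } catch (NumberFormatException nfe) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String[] lines = { "Bowling Shoes, 100.00, 55.00", "Bowling Shoes", "Shoes, cheap, 55" };
        for (String line : lines) {
            System.out.println("\"" + line + "\" parsed: " + parse(line).isPresent());
        }
    }
}
```

With a helper like this, the loop could print a usage hint and keep reading when parse( ) returns an empty result, rather than ending the program on a typing mistake.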

The job of QWholesaler is to establish a publisher for broadcasting the price quotes, and to establish a QueueReceiver for consuming the "Buy Order" messages. These objects are created in the constructor:

public QWholesaler(String broker, String username, String password){
	...
        tSession = 
            tConnect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        qSession = 
            qConnect.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
             
        hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
        receiveQueue = (Queue)jndi.lookup("SampleQ1"); // Buy Order

        tPublisher = tSession.createPublisher(hotDealsTopic);

        QueueReceiver qReceiver = qSession.createReceiver(receiveQueue);
        qReceiver.setMessageListener(this);
        qConnect.start( );
    ...           
   }

Here we are creating a QueueSession using the createQueueSession( ) method on the QueueConnection object. We are creating a receiver using the createReceiver( ) method on the QueueSession object. These methods are identical to their counterparts in the pub/sub domain.

There is very little left to explain. The publishPriceQuotes( ) method works exactly as it did in Chapter 4, except that it now places a Queue in the JMSReplyTo header:

private void publishPriceQuotes(String dealDesc, String username, 
                                   String itemDesc,  float oldPrice, 
                                   float newPrice){
    ...
                   
    message.setJMSReplyTo(receiveQueue);
                   
    tPublisher.publish(
        message,
        javax.jms.DeliveryMode.PERSISTENT,
        javax.jms.Message.DEFAULT_PRIORITY,
        1800000);
    ...
}

The onMessage( ) method also works exactly as it did before. The programming model is the same whether we use a Queue or a Topic. Likewise, we could have used the QueueReceiver.receive( ) method to do a synchronous receive. This method is the same as the TopicSubscriber.receive( ) method discussed in Chapter 4.
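The difference between the two styles is who drives the loop: with a listener the provider calls us, while with a synchronous receive we ask for the next message and wait. The sketch below illustrates the pull style by analogy only, using java.util.concurrent.BlockingQueue in place of a JMS queue. Its timed poll( ) returns null when nothing arrives in time, which resembles how a timed receive reports a timeout. SyncReceiveSketch and drain( ) are names invented for this illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class SyncReceiveSketch {
    // Pulls orders one at a time until none arrives within timeoutMillis,
    // and returns how many were handled. The caller drives the loop; no
    // listener is registered.
    static int drain(BlockingQueue<String> buyOrders, long timeoutMillis) {
        int handled = 0;
        try {
            String order;
            while ((order = buyOrders.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                System.out.println("Order received - " + order);
                handled++;
            }
        } catch (InterruptedException ie) {
            // Preserve the interrupt for the caller and stop pulling.
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> buyOrders = new LinkedBlockingQueue<>();
        buyOrders.add("10 Bowling Shoes");
        buyOrders.add("5 Surfboards");
        System.out.println("handled " + drain(buyOrders, 50) + " orders");
    }
}
```

The pull style suits a client that wants to decide when it is ready for the next order; the listener style used by QWholesaler suits a client that should react as soon as an order arrives.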

The similarity between this code and the code in Chapter 4 is the beauty of JMS. Even though there are two separate messaging domains, the interfaces follow the same idiom, which makes them easier to remember and makes it easy to change from one domain to the other.
