Let's rethink our wholesaler/retailer scenario in terms of the distinction between the two message models. The pub/sub model is well suited for sending price quotes, since that is naturally a one-to-many broadcast. However, when the retailer responds with a "buy" order, it is more appropriate to use a point-to-point queue. In the real world, retailers naturally deal with many wholesalers, and you would only send a purchase order to the wholesaler that offered the quote.
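The difference between the two models can be sketched in plain Java before any JMS code appears. This is a minimal illustration only: SimpleTopic and SimpleQueue are hypothetical in-memory stand-ins invented for this sketch, not JMS classes, and they ignore durability, acknowledgment, and threading.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-ins for the two delivery models; not part of JMS.
class DeliveryModels {

    /** Pub/sub stand-in: publish() copies the message to every subscriber. */
    static class SimpleTopic {
        private final List<List<String>> inboxes = new ArrayList<>();

        /** Registers a subscriber and returns its private inbox. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    /** Point-to-point stand-in: each message goes to exactly one receiver. */
    static class SimpleQueue {
        private final Deque<String> pending = new ArrayDeque<>();

        void send(String message) {
            pending.addLast(message);
        }

        /** Returns the oldest pending message, or null if none is waiting. */
        String receive() {
            return pending.pollFirst();
        }
    }

    public static void main(String[] args) {
        // One price quote is seen by every subscribed retailer.
        SimpleTopic hotDeals = new SimpleTopic();
        List<String> retailerA = hotDeals.subscribe();
        List<String> retailerB = hotDeals.subscribe();
        hotDeals.publish("Bowling Shoes, 100.00, 55.00");
        System.out.println("Retailer A saw: " + retailerA);
        System.out.println("Retailer B saw: " + retailerB);

        // One buy order is consumed once; a second receive finds nothing.
        SimpleQueue buyOrders = new SimpleQueue();
        buyOrders.send("42 Bowling Shoes");
        System.out.println("First receive:  " + buyOrders.receive());
        System.out.println("Second receive: " + buyOrders.receive());
    }
}
```

The quote reaches both retailers, while the buy order is removed from the queue by the first receive, which is why a queue suits the reply path.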
From the user's perspective, the QWholesaler and QRetailer examples that we'll develop now are functionally equivalent to the Wholesaler and Retailer examples introduced in Chapter 4. The difference lies in the use of the point-to-point queue for responses to price quotes. If you wish to see these classes in action, start your JMS provider and execute the following commands, each in a separate command window:
    java chap5.B2B.QWholesaler localhost username password
    java chap5.B2B.QRetailer localhost username password
Here is the listing for the QRetailer class in its entirety. Later, we will examine this class in detail:
    import java.util.StringTokenizer;
    import java.util.Properties;
    import javax.naming.InitialContext;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.Topic;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.StreamMessage;
    import javax.jms.TextMessage;

    public class QRetailer implements javax.jms.MessageListener {

        private javax.jms.QueueConnection qConnect = null;
        private javax.jms.QueueSession qSession = null;
        private javax.jms.QueueSender qSender = null;
        private javax.jms.TopicConnection tConnect = null;
        private javax.jms.TopicSession tSession = null;
        private javax.jms.Topic hotDealsTopic = null;
        private javax.jms.TopicSubscriber tsubscriber = null;
        private static boolean useJNDI = false;
        private static String uname = null;

        public QRetailer( String broker, String username, String password){
            try {
                TopicConnectionFactory tFactory = null;
                QueueConnectionFactory qFactory = null;
                InitialContext jndi = null;
                uname = username;

                Properties env = new Properties( );
                // ... specify the JNDI properties specific to the vendor
                jndi = new InitialContext(env);

                tFactory = (TopicConnectionFactory)jndi.lookup(broker);
                qFactory = (QueueConnectionFactory)jndi.lookup(broker);
                tConnect = tFactory.createTopicConnection (username, password);
                qConnect = qFactory.createQueueConnection (username, password);
                tConnect.setClientID(username);
                qConnect.setClientID(username);

                tSession = tConnect.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);
                qSession = qConnect.createQueueSession(false,
                    javax.jms.Session.AUTO_ACKNOWLEDGE);

                hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
                tsubscriber = tSession.createDurableSubscriber(hotDealsTopic,
                    "Hot Deals Subscription");
                tsubscriber.setMessageListener(this);
                tConnect.start( );
            } catch (javax.jms.JMSException jmse){
                jmse.printStackTrace( );
                System.exit(1);
            } catch (javax.naming.NamingException jne){
                jne.printStackTrace( );
                System.exit(1);
            }
        }

        public void onMessage(javax.jms.Message aMessage){
            try {
                autoBuy(aMessage);
            } catch (java.lang.RuntimeException rte){
                rte.printStackTrace( );
            }
        }

        private void autoBuy (javax.jms.Message message){
            try {
                StreamMessage strmMsg = (StreamMessage)message;
                String dealDesc = strmMsg.readString( );
                String itemDesc = strmMsg.readString( );
                float oldPrice = strmMsg.readFloat( );
                float newPrice = strmMsg.readFloat( );
                System.out.println("Received Hot Buy: "+dealDesc);

                // If price reduction is greater than 10 percent, buy
                if (newPrice == 0 || oldPrice / newPrice > 1.1) {
                    int count = (int)(java.lang.Math.random( )*(double)1000);
                    System.out.println(" Buying " + count +" "+ itemDesc);

                    TextMessage textMsg = tSession.createTextMessage( );
                    textMsg.setText(count + " " + itemDesc );
                    textMsg.setIntProperty("QTY", count);
                    textMsg.setJMSCorrelationID(uname);

                    Queue buyQueue = (Queue)message.getJMSReplyTo( );
                    qSender = qSession.createSender(buyQueue);
                    qSender.send( textMsg,
                                  javax.jms.DeliveryMode.PERSISTENT,
                                  javax.jms.Message.DEFAULT_PRIORITY,
                                  1800000);
                } else {
                    System.out.println (" Bad Deal. Not buying");
                }
            } catch (javax.jms.JMSException jmse){
                jmse.printStackTrace( );
            }
        }

        private void exit(String s){
            try {
                if ( s != null && s.equalsIgnoreCase("unsubscribe")) {
                    tsubscriber.close( );
                    tSession.unsubscribe("Hot Deals Subscription");
                }
                tConnect.close( );
                qConnect.close( );
            } catch (javax.jms.JMSException jmse){
                jmse.printStackTrace( );
            }
            System.exit(0);
        }

        public static void main(String argv[]) {
            String broker, username, password;
            if (argv.length == 3){
                broker = argv[0];
                username = argv[1];
                password = argv[2];
            } else {
                System.out.println("Invalid arguments. Should be: ");
                System.out.println
                    ("java QRetailer broker username password");
                return;
            }

            QRetailer retailer = new QRetailer(broker, username, password);

            try {
                System.out.println(" Retailer application started. ");
                // Read all standard input and send it as a message
                java.io.BufferedReader stdin = new java.io.BufferedReader
                    ( new java.io.InputStreamReader( System.in ) );
                while ( true ){
                    String s = stdin.readLine( );
                    if ( s == null )
                        retailer.exit(null);
                    else if ( s.equalsIgnoreCase("unsubscribe") )
                        retailer.exit ( s );
                }
            } catch ( java.io.IOException ioe ){
                ioe.printStackTrace( );
            }
        }
    }
Now let's look at the code in detail. A session can be either a QueueSession (point-to-point) or a TopicSession (publish/subscribe). It cannot be both at the same time. Similarly, a connection can be either a QueueConnection or a TopicConnection. Therefore, we create a connection and a session for each model in the QRetailer's constructor:
    public QRetailer( String broker, String username, String password){
        ...
        tFactory = (TopicConnectionFactory)jndi.lookup(broker);
        qFactory = (QueueConnectionFactory)jndi.lookup(broker);
        tConnect = tFactory.createTopicConnection (username, password);
        qConnect = qFactory.createQueueConnection (username, password);
        tConnect.setClientID(username);
        qConnect.setClientID(username);
        tSession = tConnect.createTopicSession(false,
            Session.AUTO_ACKNOWLEDGE);
        qSession = qConnect.createQueueSession(false,
            javax.jms.Session.AUTO_ACKNOWLEDGE);
        ...
    }
The autoBuy( ) method is responsible for sending the "Buy Order" messages. This method is invoked by the onMessage( ) handler for the "Hot Deals" topic:
    private void autoBuy (javax.jms.Message message){
        ...
        textMsg.setJMSCorrelationID(uname);
        Queue buyQueue = (Queue)message.getJMSReplyTo( );
        qSender = qSession.createSender(buyQueue);
        qSender.send( textMsg,
                      javax.jms.DeliveryMode.PERSISTENT,
                      javax.jms.Message.DEFAULT_PRIORITY,
                      1800000);
        ...
    }
The Message object itself is independent of the domain being used to transport it. All of the same headers and properties apply to each. Therefore, we can set a correlation ID, extract an object to reply to (this time, it's a Queue), and use the Queue to create a sender. The QueueSender.send( ) method is identical in form to the TopicPublisher.publish( ) method. Here is the interface definition for the QueueSender object:
    public interface QueueSender extends MessageProducer {
        Queue getQueue( ) throws JMSException;
        void send(Message message)
            throws JMSException, MessageFormatException,
                   InvalidDestinationException;
        void send(Message message, int deliveryMode,
                  int priority, long timeToLive)
            throws JMSException, MessageFormatException,
                   InvalidDestinationException;
        void send(Queue queue, Message message)
            throws JMSException, MessageFormatException,
                   InvalidDestinationException;
        void send(Queue queue, Message message, int deliveryMode,
                  int priority, long timeToLive)
            throws JMSException, MessageFormatException,
                   InvalidDestinationException;
    }
The QueueSender object is created as part of the onMessage( ) handler, using the call to the createSender( ) method. As an alternative, we could have created the QueueSender once in the QRetailer's constructor using null as a parameter, then specified buyQueue as a parameter to the send( ) operation each time the onMessage( ) method is invoked. For most applications, this would be more efficient; the example recreates the QueueSender for clarity.
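One detail of autoBuy( ) not covered above is its buy decision. The listing's comment describes it as a price reduction of more than 10 percent, but the test it performs is on the ratio of old price to new price, which is slightly more permissive: a ratio above 1.1 corresponds to a drop of more than about 9.1 percent of the old price. The following sketch isolates that test so it can be tried on a few quotes. The class name BuyRule and the helper percentOff are hypothetical names introduced here; only the boolean expression is taken from the listing.

```java
// Hypothetical helper class; only the expression in shouldBuy() comes
// from the QRetailer listing.
class BuyRule {

    // Same test as autoBuy(): buy when the item is free, or when the old
    // price is more than 1.1 times the new price.
    static boolean shouldBuy(float oldPrice, float newPrice) {
        return newPrice == 0 || oldPrice / newPrice > 1.1;
    }

    // Percentage by which the price dropped, relative to the old price.
    static double percentOff(float oldPrice, float newPrice) {
        return 100.0 * (oldPrice - newPrice) / oldPrice;
    }

    public static void main(String[] args) {
        // Each row is {oldPrice, newPrice}.
        float[][] quotes = { {100f, 55f}, {100f, 90f}, {100f, 95f}, {100f, 0f} };
        for (float[] q : quotes) {
            System.out.printf("old=%.2f new=%.2f off=%.1f%% buy=%b%n",
                    q[0], q[1], percentOff(q[0], q[1]), shouldBuy(q[0], q[1]));
        }
    }
}
```

Checking newPrice == 0 first keeps the rule from depending on the result of a division by zero, because the || operator short-circuits.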
Here is the complete listing for the QWholesaler class:
    import java.util.StringTokenizer;
    import java.util.Properties;
    import javax.naming.InitialContext;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.Topic;
    import javax.jms.Queue;
    import javax.jms.QueueReceiver;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    public class QWholesaler implements javax.jms.MessageListener{

        private javax.jms.TopicConnection tConnect = null;
        private javax.jms.TopicSession tSession = null;
        private javax.jms.TopicPublisher tPublisher = null;
        private javax.jms.QueueConnection qConnect = null;
        private javax.jms.QueueSession qSession = null;
        private javax.jms.Queue receiveQueue = null;
        private javax.jms.Topic hotDealsTopic = null;
        private javax.jms.TemporaryTopic buyOrdersTopic = null;

        public QWholesaler(String broker, String username, String password){
            try {
                TopicConnectionFactory tFactory = null;
                QueueConnectionFactory qFactory = null;
                InitialContext jndi = null;

                Properties env = new Properties( );
                // ... specify the JNDI properties specific to the vendor
                jndi = new InitialContext(env);

                tFactory = (TopicConnectionFactory)jndi.lookup(broker);
                qFactory = (QueueConnectionFactory)jndi.lookup(broker);
                tConnect = tFactory.createTopicConnection (username, password);
                qConnect = qFactory.createQueueConnection (username, password);
                tSession = tConnect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
                qSession = qConnect.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);

                hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
                receiveQueue = (Queue)jndi.lookup("SampleQ1");

                tPublisher = tSession.createPublisher(hotDealsTopic);
                QueueReceiver qReceiver = qSession.createReceiver(receiveQueue);
                qReceiver.setMessageListener(this);

                // Now that setup is complete, start the Connection
                qConnect.start( );
                tConnect.start( );
            } catch (javax.jms.JMSException jmse){
                jmse.printStackTrace( );
                System.exit(1);
            } catch (javax.naming.NamingException jne){
                jne.printStackTrace( );
                System.exit(1);
            }
        }

        private void publishPriceQuotes(String dealDesc, String username,
                                        String itemDesc, float oldPrice,
                                        float newPrice){
            try {
                javax.jms.StreamMessage message = tSession.createStreamMessage( );
                message.writeString(dealDesc);
                message.writeString(itemDesc);
                message.writeFloat(oldPrice);
                message.writeFloat(newPrice);

                message.setStringProperty("Username", username);
                message.setStringProperty("itemDesc", itemDesc);

                message.setJMSReplyTo(receiveQueue);

                tPublisher.publish(
                    message,
                    javax.jms.DeliveryMode.PERSISTENT,
                    javax.jms.Message.DEFAULT_PRIORITY,
                    1800000);
            } catch ( javax.jms.JMSException jmse ){
                jmse.printStackTrace( );
            }
        }

        public void onMessage( javax.jms.Message message){
            try {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText( );
                System.out.println("Order received - "+text+
                    " from " + message.getJMSCorrelationID( ));
            } catch (java.lang.Exception rte){
                rte.printStackTrace( );
            }
        }

        public void exit( ){
            try {
                tConnect.close( );
                qConnect.close( );
            } catch (javax.jms.JMSException jmse){
                jmse.printStackTrace( );
            }
            System.exit(0);
        }

        public static void main(String argv[]) {
            String broker, username, password;
            if (argv.length == 3){
                broker = argv[0];
                username = argv[1];
                password = argv[2];
            } else {
                System.out.println("Invalid arguments. Should be: ");
                System.out.println
                    ("java QWholesaler broker username password");
                return;
            }

            QWholesaler wholesaler = new QWholesaler(broker, username, password);

            try {
                // Read all standard input and send it as a message
                java.io.BufferedReader stdin = new java.io.BufferedReader
                    (new java.io.InputStreamReader( System.in ) );
                System.out.println ("Enter: Item, Old Price, New Price");
                System.out.println(" e.g. Bowling Shoes, 100.00, 55.00");

                while ( true ){
                    String dealDesc = stdin.readLine( );
                    if (dealDesc != null && dealDesc.length( ) > 0){
                        // Parse the deal description
                        StringTokenizer tokenizer =
                            new StringTokenizer(dealDesc,",") ;
                        String itemDesc = tokenizer.nextToken( );
                        String temp = tokenizer.nextToken( );
                        float oldPrice =
                            Float.valueOf(temp.trim()).floatValue( );
                        temp = tokenizer.nextToken( );
                        float newPrice =
                            Float.valueOf(temp.trim()).floatValue( );

                        wholesaler.publishPriceQuotes(dealDesc,username,
                            itemDesc, oldPrice,newPrice);
                    } else {
                        wholesaler.exit( );
                    }
                }
            } catch ( java.io.IOException ioe ){
                ioe.printStackTrace( );
            }
        }
    }
The job of QWholesaler is to establish a publisher for broadcasting the price quotes, and to establish a QueueReceiver for consuming the "Buy Order" messages. These objects are created in the constructor:
    public QWholesaler(String broker, String username, String password){
        ...
        tSession = tConnect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        qSession = qConnect.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
        hotDealsTopic = (Topic)jndi.lookup("Hot Deals");
        receiveQueue = (Queue)jndi.lookup("SampleQ1"); // Buy Order
        tPublisher = tSession.createPublisher(hotDealsTopic);
        QueueReceiver qReceiver = qSession.createReceiver(receiveQueue);
        qReceiver.setMessageListener(this);
        qConnect.start( );
        ...
    }
Here we are creating a QueueSession using the createQueueSession( ) method on the QueueConnection object. We are creating a receiver using the createReceiver( ) method on the QueueSession object. These methods are identical to their counterparts in the pub/sub domain.
There is very little left to explain. The publishPriceQuotes( ) method works exactly as it did in Chapter 4, with the exception that it now places a Queue in the JMSReplyTo header:
    private void publishPriceQuotes(String dealDesc, String username,
                                    String itemDesc, float oldPrice,
                                    float newPrice){
        ...
        message.setJMSReplyTo(receiveQueue);
        tPublisher.publish(
            message,
            javax.jms.DeliveryMode.PERSISTENT,
            javax.jms.Message.DEFAULT_PRIORITY,
            1800000);
        ...
    }
The onMessage( ) method also works exactly as it did before. The programming model is the same whether we use a Queue or a Topic. Likewise, we could have used the alternate QueueReceiver.receive( ) method to do a synchronous receive. This method is the same as the TopicSubscriber.receive( ) method discussed in Chapter 4.
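One part of QWholesaler that is independent of JMS is the input loop in main( ), which splits each line into an item description and two prices. In the listing, a line with too few fields or a non-numeric price raises an unchecked exception (NoSuchElementException or NumberFormatException) that the surrounding catch for IOException does not handle. The sketch below shows the same StringTokenizer parsing with an explicit field-count check. DealParser and Deal are hypothetical names introduced for illustration, and trimming the item description is an addition the listing does not make.

```java
import java.util.StringTokenizer;

// Hypothetical helper; the listing does this parsing inline in main().
class DealParser {

    /** Holds the three fields of one input line. */
    static class Deal {
        final String itemDesc;
        final float oldPrice;
        final float newPrice;

        Deal(String itemDesc, float oldPrice, float newPrice) {
            this.itemDesc = itemDesc;
            this.oldPrice = oldPrice;
            this.newPrice = newPrice;
        }
    }

    /**
     * Parses "Item, Old Price, New Price". Throws IllegalArgumentException
     * if the line does not have exactly three comma-separated fields or a
     * price is not a number (NumberFormatException is a subclass).
     */
    static Deal parse(String dealDesc) {
        StringTokenizer tokenizer = new StringTokenizer(dealDesc, ",");
        if (tokenizer.countTokens() != 3) {
            throw new IllegalArgumentException(
                    "Expected: Item, Old Price, New Price");
        }
        String itemDesc = tokenizer.nextToken().trim();
        float oldPrice = Float.parseFloat(tokenizer.nextToken().trim());
        float newPrice = Float.parseFloat(tokenizer.nextToken().trim());
        return new Deal(itemDesc, oldPrice, newPrice);
    }

    public static void main(String[] args) {
        Deal deal = parse("Bowling Shoes, 100.00, 55.00");
        System.out.println(deal.itemDesc + " | " + deal.oldPrice
                + " | " + deal.newPrice);
    }
}
```

A caller could catch IllegalArgumentException inside the loop and prompt again, so that one mistyped line does not end the wholesaler session.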
The similarity between this code and the code in Chapter 4 is the beauty of JMS. Even though there are two separate messaging domains, the interfaces follow the same idiom, which makes them easy to remember and makes it easy to switch from one domain to the other.