This chapter focuses on the publish-and-subscribe (pub/sub) messaging model that was introduced in Chapter 2. The pub/sub messaging model allows a message producer (also called a publisher) to broadcast a message to one or more consumers (called subscribers). There are three important aspects of the pub/sub model:
Messages are pushed to consumers, which means that consumers are delivered messages without having to request them. Messages are exchanged through a virtual channel called a topic. A topic is a destination where producers can publish, and subscribers can consume, messages. Messages delivered to a topic are automatically pushed to all qualified consumers.
As in enterprise messaging in general, there is no coupling of the producers to the consumers. Subscribers and publishers can be added dynamically at runtime, which allows the system to grow or shrink in complexity over time.
Every client that subscribes to a topic receives its own copy of messages published to that topic. A single message produced by one publisher may be copied and distributed to hundreds, or even thousands of subscribers.
In Chapter 2 you learned the basics of the pub/sub model by developing a simple chat client. In this chapter we will build on those lessons and examine more advanced features of this model, including guaranteed messaging, topic-based addressing, durable subscriptions, request-reply, and temporary topics.
In this chapter we abandon the simple chat example for a more complex and real-world Business-to-Business (B2B) scenario. In our new example, a wholesaler wants to distribute price information to retailers, and the retailers want to respond by generating orders. We'll implement this scenario using the publish-and-subscribe model: the wholesaler will publish messages containing new prices and hot deals, and the retailers will respond by creating their own messages to order stock.
This scenario is typical of many Business-to-Business operations. We call the clients retailers and wholesalers, but these names are really only for convenience. There's little difference between our wholesaler/retailer scenario and a stock broker broadcasting stock prices to investors, or a manufacturer broadcasting bid requests to multiple suppliers. The fact that we use a retailer and a wholesaler to illustrate our example is much less important than the way we apply JMS.
Our simple trading system is implemented by two classes, both of
which are JMS clients:
Wholesaler
and
Retailer
.
In the interest of keeping the code simple, we won't implement
a fancy user interface; our application has a rudimentary
command-line user interface.
Before looking at the code, let's look at how the application
works. As with the Chat
application, the
Wholesaler
class includes a main(
)
method so it can be run as a standalone
Java application. It's executed from the command line as
follows:
java chap4.B2B.Wholesaler localhost username password
username and password are
the authentication information for the client. The
Retailer
class can be executed in the same manner:
java chap4.B2B.Retailer localhost username password
Start your JMS server, then run one instance of a
Wholesaler
client and a
Retailer
client in separate command windows. In
the Wholesaler
client you are prompted to enter an
item description, an old price, and a new price. Enter the following
as shown:
Bowling Shoes, 100.00, 55.00
Upon hitting the Enter key, you should see the
Retailer
application display information on the
screen indicating that it has received a price change notice. You
should then see the Wholesaler
indicating that it
has received a "buy" order from the
Retailer
. Here's the complete interaction
with the Wholesaler
and the
Retailer
:[1]
java chap4.B2B.Wholesaler localhost WHOLESALER passwd1
Enter: Item, Old Price, New Price e.g., Bowling Shoes, 100.00, 55.00Bowling Shoes, 100.00, 55.00
Order received - 1000 Bowling Shoes from DurableRetailer -----------------------java chap4.B2B.Retailer localhost RETAILER passwd2
Retailer application started. Received Hot Buy: Bowling Shoes, 100.00, 55.00 Buying 1000 Bowling Shoes
Here's what happened. The Wholesaler
publishes a price quotation on a topic, "Hot Deals,"
which is intended for one or more Retailer
s. The
Retailer
s subscribe to the "Hot Deals"
topic in order to receive price quotes. The
Retailer
application has no interaction with a
live user. Instead, it has an autoBuy(
)
method that examines the old price and the new price. If the new
price represents a reduction of greater than ten percent, the
Retailer
sends a message back to the
Wholesaler
on the "Buy Order" topic,
telling it to purchase 1,000 items. In JMS terms, the
Wholesaler
is a
producer
of the "Hot Deals" topic and a
consumer
of the "Buy Order" topic. Conversely, the
Retailer
is a consumer of the "Hot
Deals" topic and a producer of the "Buy Order"
topic, as illustrated in Figure 4.1.
The
rest of this chapter examines the source code for the
Wholesaler
and Retailer
classes, and covers several advanced subjects related to the pub/sub
messaging model.
After the listing, we will take a brief tour of the methods in this
class, and discuss their responsibilities. We will go into detail
about the implementation later in this chapter. Now, here is the
complete definition of the Wholesaler
class, which
is responsible for publishing items to the "Hot Deals"
topic and receiving "Buy Orders" on those deals from
retailers:
public class Wholesaler implements javax.jms.MessageListener{ private javax.jms.TopicConnection connect = null; private javax.jms.TopicSession pubSession = null; private javax.jms.TopicSession subSession = null; private javax.jms.TopicPublisher publisher = null; private javax.jms.TopicSubscriber subscriber = null; private javax.jms.Topic hotDealsTopic = null; private javax.jms.TemporaryTopic buyOrdersTopic = null; public Wholesaler(String broker, String username, String password){ try { Properties env = new Properties( ); // ... specify the JNDI properties specific to the vendor InitialContext jndi = new InitialContext(env); TopicConnectionFactory factory = (TopicConnectionFactory)jndi.lookup(broker); connect = factory.createTopicConnection (username, password); pubSession = connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); subSession = connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); hotDealsTopic = (Topic)jndi.lookup("Hot Deals"); publisher = pubSession.createPublisher(hotDealsTopic); buyOrdersTopic = subSession.createTemporaryTopic( ); subscriber = subSession.createSubscriber(buyOrdersTopic); subscriber.setMessageListener(this); connect.start( ); } catch (javax.jms.JMSException jmse){ jmse.printStackTrace( ); System.exit(1); } catch (javax.naming.NamingException jne){ jne.printStackTrace( ); System.exit(1); } } private void publishPriceQuotes(String dealDesc, String username, String itemDesc, float oldPrice, float newPrice){ try { javax.jms.StreamMessage message = pubSession.createStreamMessage( ); message.writeString(dealDesc); message.writeString(itemDesc); message.writeFloat(oldPrice); message.writeFloat(newPrice); message.setStringProperty("Username", username); message.setStringProperty("Itemdesc", itemDesc); message.setJMSReplyTo(buyOrdersTopic); publisher.publish( message, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, 1800000); } catch ( javax.jms.JMSException jmse ){ jmse.printStackTrace( ); } } public void onMessage( javax.jms.Message message){ try { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText( ); System.out.println(" Order received - "+text+ " from " + message.getJMSCorrelationID( )); } catch (java.lang.Exception rte){ rte.printStackTrace( ); } } public void exit( ){ try { connect.close( ); } catch (javax.jms.JMSException jmse){ jmse.printStackTrace( ); } System.exit(0); } public static void main(String argv[]) { String broker, username, password; if (argv.length == 3){ broker = argv[0]; username = argv[1]; password = argv[2]; } else { System.out.println("Invalid arguments. Should be: "); System.out.println("java Wholesaler broker username password"); return; } Wholesaler wholesaler = new Wholesaler(broker,username,password); try { // Read all standard input and send it as a message. java.io.BufferedReader stdin = new java.io.BufferedReader (new java.io.InputStreamReader( System.in ) ); System.out.println ("Enter: Item, Old Price, New Price"); System.out.println(" e.g., Bowling Shoes, 100.00, 55.00"); while ( true ){ String dealDesc = stdin.readLine( ); if (dealDesc != null && dealDesc.length( ) > 0){ // Parse the deal description StringTokenizer tokenizer = new StringTokenizer(dealDesc,",") ; String itemDesc = tokenizer.nextToken( ); String temp = tokenizer.nextToken( ); float oldPrice = Float.valueOf(temp.trim()).floatValue( ); temp = tokenizer.nextToken( ); float newPrice = Float.valueOf(temp.trim()).floatValue( ); wholesaler.publishPriceQuotes(dealDesc,username, itemDesc,oldPrice,newPrice); } else { wholesaler.exit( ); } } } catch ( java.io.IOException ioe ){ ioe.printStackTrace( ); } } }
The main(
)
method creates an instance of the
Wholesaler
class, passing it the information it
needs to set up its publishers and subscribers.
In the Wholesaler
class's
constructor, JNDI is used to obtain the
"Hot Deals" topic identifier, which is then used to
create a publisher. Most of this should look familiar to you;
it's similar in many ways to the Chat
application, except for the creation of a temporary topic, which is
discussed in more detail later in this section.
Once the Wholesaler
is instantiated, the
main( )
method continues to monitor the command
line for new "Hot Deals." When a "Hot Deal"
is entered at the command prompt, the main( )
method parses the information and passes it to the
Wholesaler
instance via the
publishPriceQuotes(
)
method.
The publishPriceQuotes( )
method is responsible
for publishing messages containing information about price quotes to
the "Hot Deals" topic.
The onMessage( )
method receives messages from clients
responding to deals published on the "Hot Deals" topic.
The contents of these messages are simply printed to the command
line.
Here is the complete definition of the
Retailer
class,
which subscribes to the "Hot Deals" topic and responds
with "Buy Orders" on attractive deals:
public class Retailer implements javax.jms.MessageListener{ private javax.jms.TopicConnection connect = null; private javax.jms.TopicSession session = null; private javax.jms.TopicPublisher publisher = null; private javax.jms.Topic hotDealsTopic = null; public Retailer( String broker, String username, String password){ try { Properties env = new Properties( ); // ... specify the JNDI properties specific to the vendor InitialContext jndi = new InitialContext(env); TopicConnectionFactory factory = (TopicConnectionFactory)jndi.lookup(broker); connect = factory.createTopicConnection(username, password); connect.setClientID("DurableRetailer"); session = connect.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); hotDealsTopic = (Topic)jndi.lookup("Hot Deals"); javax.jms.TopicSubscriber subscriber = session.createDurableSubscriber(hotDealsTopic, "Hot Deals Subscription"); subscriber.setMessageListener(this); connect.start( ); } catch (javax.jms.JMSException jmse){ jmse.printStackTrace( ); System.exit(1); } catch (javax.naming.NamingException jne){ jne.printStackTrace( ); System.exit(1); } } public void onMessage(javax.jms.Message aMessage){ try { autoBuy(aMessage); } catch (java.lang.RuntimeException rte){ rte.printStackTrace( ); } } private void autoBuy (javax.jms.Message message){ int count = 1000; try { StreamMessage strmMsg = (StreamMessage)message; String dealDesc = strmMsg.readString( ); String itemDesc = strmMsg.readString( ); float oldPrice = strmMsg.readFloat( ); float newPrice = strmMsg.readFloat( ); System.out.println("Received Hot Buy :"+dealDesc); // If price reduction is greater than 10 percent, buy if (newPrice == 0 || oldPrice / newPrice > 1.1){ System.out.println(" Buying " + count +" "+ itemDesc); TextMessage textMsg = session.createTextMessage( ); textMsg.setText(count + " " + itemDesc ); javax.jms.Topic buytopic = (javax.jms.Topic)message.getJMSReplyTo( ); publisher = session.createPublisher(buytopic); textMsg.setJMSCorrelationID("DurableRetailer"); publisher.publish( textMsg, javax.jms.DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, 1800000); } else { System.out.println (" Bad Deal- Not buying."); } } catch (javax.jms.JMSException jmse){ jmse.printStackTrace( ); } } private void exit(String s){ try { if ( s != null && s.equalsIgnoreCase("unsubscribe")) { subscriber.close( ); session.unsubscribe("Hot Deals Subscription"); } connect.close( ); } catch (javax.jms.JMSException jmse){ jmse.printStackTrace( ); } System.exit(0); } public static void main(String argv[]) { String broker, username, password; if (argv.length == 3){ broker = argv[0]; username = argv[1]; password = argv[2]; } else { System.out.println("Invalid arguments. Should be: "); System.out.println ("java Retailer broker username password"); return; } Retailer retailer = new Retailer(broker, username, password); try { System.out.println(" Retailer application started. "); // Read all standard input and send it as a message. java.io.BufferedReader stdin = new java.io.BufferedReader ( new java.io.InputStreamReader( System.in ) ); while ( true ){ String s = stdin.readLine( ); if ( s == null )retailer.exit(null); else if ( s.equalsIgnoreCase("unsubscribe") ) retailer.exit ( s ); } } catch ( java.io.IOException ioe ){ ioe.printStackTrace( ); } } }
The
main( )
method of
Retailer
is much like the main(
)
method of
Wholesaler
. It creates an instance of the
Retailer
class and passes it the information it
needs to set up its publishers and subscribers.
The constructor of the
Retailer
class is also similar to that of the
Wholesaler
class, except that it creates a
durable subscription using the "Hot
Deals" topic. Durable subscriptions will be discussed in more
detail later in this section.
Once the Retailer
is instantiated, the
main( )
method uses the readLine(
)
method as a way of blocking program execution in order to monitor for
message input.
The publishPriceQuotes(
)
method is responsible for publishing messages
containing information about price quotes to the "Hot
Deals" topic.
The onMessage( )
method receives messages from the
Wholesaler
client, then delegates its work to the
autoBuy( )
method. The autoBuy( )
method examines the message, determines whether the price change is
significant, and arbitrarily orders 1000 items. It orders the items
by publishing a persistent message back to the
Wholesaler
client's temporary topic, using
the
JMSCorrelationID
as a way of identifying itself. We will examine persistent publishing
and temporary topics in the next section.
[1]
WHOLESALER
and RETAILER
are usernames you have set up when configuring your JMS server.
passwd1
and passwd2
are the
passwords you've assigned to those usernames. If you are using
an evaluation version of a JMS provider, it may not be necessary to
set up usernames and passwords; check your vendor's
documentation for more information.