As discussed in the previous sections, interfaces extending core JMS interfaces for Topic help build publish-subscribe components.
Please remember that, to be able to execute the following example programs, you need the message queue environment.
The following is a sample program to publish messages to the Publish-Subscribe topic:
package pubsub;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
/**
* Program to publish messages to the topic
* @author Raja
*
*/
public class PubSubTopicPublisher {
public static void main(String[] args) {
PubSubTopicPublisher publisher = new PubSubTopicPublisher();
publisher.publishMultipleMessages();
}
public void publishMultipleMessages() {
BufferedReader inlineReader = new BufferedReader(
new InputStreamReader(System.in));
try {
//Prompt for the JNDI topic connection factory name
System.out.println("Enter the Publish Subscribe
Topic Connection Factory name:");
String connectionFactoryName = inlineReader.readLine();
// Prompt for topic name for the Pub Sub
System.out.println("Enter the Publish Subscribe Topic name:");
String pubsubTopicName = inlineReader.readLine();
// Look up for the administered objects of Pub Sub
InitialContext context = new InitialContext();
TopicConnectionFactory topicConnFactory =
(TopicConnectionFactory)
context.lookup(connectionFactoryName);
Topic pubsubTopic = (Topic) context.lookup(pubsubTopicName);
context.close();
// Create the JMS objects from administered objects
TopicConnection topicConnection =
topicConnFactory.createTopicConnection();
TopicSession topicSession =
topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicPublisher topicPublisher =
topicSession.createPublisher(pubsubTopic);
// Publish multiple text messages entered one after the other
String messageContent = null;
while (true) {
System.out.println("Enter the new message to send or 'abandon'
to exit the program:");
messageContent = inlineReader.readLine();
if ("abandon".equals(messageContent))
break;
TextMessage textMessage =
topicSession.createTextMessage(messageContent);
topicPublisher.publish(textMessage);
}
// Clean Up
System.out.println("Messages Successfully posted to
the queue...");
inlineReader.close();
topicConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
While the preceding program helps publish messages to the Publish-Subscribe Topic, the following program is used to subscribe to the Publish-Subscribe Topic, which keeps receiving messages related to the Topic until the quit command is given:
package pubsub;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
/**
* Program to subscribe for messages from the topic
*
* @author Raja
*
*/
public class PubSubTopicSubscriber implements MessageListener {
private boolean quitMessageSubscription = false;
public static void main(String[] args) {
PubSubTopicSubscriber pubSubTopicSubscriber =
new PubSubTopicSubscriber();
pubSubTopicSubscriber.subscribeTopic();
}
public void subscribeTopic() {
BufferedReader inlineReader =
new BufferedReader(new InputStreamReader(System.in));
try {
// Prompt for the JNDI topic connection factory name
System.out.println("Enter the Publish Subscribe
Topic Connection Factory name:");
String connectionFactoryName = inlineReader.readLine();
// Prompt for topic name for the Pub Sub
System.out.println("Enter the Publish Subscribe Topic name:");
String pubsubTopicName = inlineReader.readLine();
inlineReader.close();
// Look up for the administered objects of Pub Sub
InitialContext context = new InitialContext();
TopicConnectionFactory topicConnFactory =
(TopicConnectionFactory)
context.lookup(connectionFactoryName);
Topic pubsubTopic = (Topic) context.lookup(pubsubTopicName);
context.close();
// Create the JMS objects from administered objects
TopicConnection topicConnection =
topicConnFactory.createTopicConnection();
TopicSession topicSession =
topicConnection.createTopicSession(
false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber topicSubscriber =
topicSession.createSubscriber(pubsubTopic);
topicSubscriber.setMessageListener(this);
topicConnection.start();
// Keep listening to the pub sub until
// the Quit subscription command
while (!quitMessageSubscription) {
Thread.sleep(1000);
}
// Clean Up
System.out.println("Messages successfully listened so far,
Quitting Subscription!");
topicConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public void onMessage(Message message) {
try {
String messageContent = ((TextMessage) message).getText();
System.out.println(messageContent);
if ("quit".equals(messageContent))
quitMessageSubscription = true;
} catch (JMSException e) {
e.printStackTrace();
quitMessageSubscription = true;
}
}
}