Publish-Subscribe (topic) programming

As discussed in the previous sections, interfaces extending core JMS interfaces for Topic help build publish-subscribe components.

Please remember that, to be able to execute the following example programs, you need the message queue environment.

The following is a sample program to publish messages to the Publish-Subscribe topic:

package pubsub;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;

/**
* Program to publish messages to the topic
* @author Raja
*
*/
public class PubSubTopicPublisher {

public static void main(String[] args) {
PubSubTopicPublisher publisher = new PubSubTopicPublisher();
publisher.publishMultipleMessages();
}

public void publishMultipleMessages() {
BufferedReader inlineReader = new BufferedReader(
new InputStreamReader(System.in));
try {

//Prompt for the JNDI topic connection factory name
System.out.println("Enter the Publish Subscribe
Topic Connection Factory name:");
String connectionFactoryName = inlineReader.readLine();

// Prompt for topic name for the Pub Sub
System.out.println("Enter the Publish Subscribe Topic name:");
String pubsubTopicName = inlineReader.readLine();

// Look up for the administered objects of Pub Sub
InitialContext context = new InitialContext();
TopicConnectionFactory topicConnFactory =
(TopicConnectionFactory)
context.lookup(connectionFactoryName);
Topic pubsubTopic = (Topic) context.lookup(pubsubTopicName);
context.close();

// Create the JMS objects from administered objects
TopicConnection topicConnection =
topicConnFactory.createTopicConnection();
TopicSession topicSession =
topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicPublisher topicPublisher =
topicSession.createPublisher(pubsubTopic);

// Publish multiple text messages entered one after the other
String messageContent = null;
while (true) {
System.out.println("Enter the new message to send or 'abandon'
to exit the program:");
messageContent = inlineReader.readLine();
if ("abandon".equals(messageContent))
break;
TextMessage textMessage =
topicSession.createTextMessage(messageContent);
topicPublisher.publish(textMessage);
}

// Clean Up
System.out.println("Messages Successfully posted to
the queue...");
inlineReader.close();
topicConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

While the preceding program helps publish messages to the Publish-Subscribe Topic, the following program is used to subscribe to the Publish-Subscribe Topic, which keeps receiving messages related to the Topic until the quit command is given:

package pubsub;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;

/**
* Program to subscribe for messages from the topic
*
* @author Raja
*
*/
public class PubSubTopicSubscriber implements MessageListener {
private boolean quitMessageSubscription = false;

public static void main(String[] args) {
PubSubTopicSubscriber pubSubTopicSubscriber =
new PubSubTopicSubscriber();
pubSubTopicSubscriber.subscribeTopic();
}

public void subscribeTopic() {
BufferedReader inlineReader =
new BufferedReader(new InputStreamReader(System.in));
try {

// Prompt for the JNDI topic connection factory name
System.out.println("Enter the Publish Subscribe
Topic Connection Factory name:");
String connectionFactoryName = inlineReader.readLine();

// Prompt for topic name for the Pub Sub
System.out.println("Enter the Publish Subscribe Topic name:");
String pubsubTopicName = inlineReader.readLine();
inlineReader.close();

// Look up for the administered objects of Pub Sub
InitialContext context = new InitialContext();
TopicConnectionFactory topicConnFactory =
(TopicConnectionFactory)
context.lookup(connectionFactoryName);
Topic pubsubTopic = (Topic) context.lookup(pubsubTopicName);
context.close();

// Create the JMS objects from administered objects
TopicConnection topicConnection =
topicConnFactory.createTopicConnection();
TopicSession topicSession =
topicConnection.createTopicSession(
false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber topicSubscriber =
topicSession.createSubscriber(pubsubTopic);
topicSubscriber.setMessageListener(this);
topicConnection.start();

// Keep listening to the pub sub until
// the Quit subscription command
while (!quitMessageSubscription) {
Thread.sleep(1000);
}

// Clean Up
System.out.println("Messages successfully listened so far,
Quitting Subscription!");
topicConnection.close();
} catch (Exception e) {
e.printStackTrace();
}
}

public void onMessage(Message message) {
try {
String messageContent = ((TextMessage) message).getText();
System.out.println(messageContent);
if ("quit".equals(messageContent))
quitMessageSubscription = true;
} catch (JMSException e) {
e.printStackTrace();
quitMessageSubscription = true;
}
}
}
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset