Enabling transactions in the point-to-point messaging model

Consider the point-to-point messaging model that we created earlier with the Message type. We can enable transactions in the Producer class with JMSContext.SESSION_TRANSACTED as follows:

val jmsContext = connectionFactory
.createContext(JMSContext.SESSION_TRANSACTED)

Our message Producer class looks as follows:

class Producer {
@Inject
private lateinit var initialContext: InitialContext

fun sendMessage(message: String): String {
try {
val queue = initialContext
.lookup("jms/PointToPointQueue") as Queue
val connectionFactory = initialContext
.lookup("jms/__defaultConnectionFactory")
as ConnectionFactory
val jmsContext = connectionFactory
.createContext(JMSContext.SESSION_TRANSACTED)
val textMessage = jmsContext.createTextMessage(message)
jmsContext.createProducer()
.send(queue, textMessage)
return "Message sent"
} catch (e: NamingException) {
println("unable to load a resource " + e.message)
return "Unable to deliver a message"
}
}
}

Note that we have created the context in transaction mode but we are not committing the message. This means the message has not yet reached the messaging provider, so our ConsumerTest blocks while waiting for the message.

This is shown in the following screenshot:

Let's invoke the commit() function in the Producer class, as follows:

 jmsContext.commit()

The ConsumerTest class, which was waiting for the message, has now received the message:

We'll also take a look at the Consumer class:

class Consumer {
@Inject
private lateinit var initialContext: InitialContext
fun receiveMessage(): Message {

val queue = initialContext
.lookup("jms/PointToPointQueue") as Queue
val connectionFactory = initialContext
.lookup("jms/__defaultConnectionFactory") as
ConnectionFactory
val jmsContext = connectionFactory
.createContext(JMSContext.SESSION_TRANSACTED)
val textMessage = jmsContext.createConsumer(queue)
.receive()
return textMessage
}

Note that we have again created the context with the transaction enabled, and we are not committing the message. This means that we are not acknowledging the messaging provider after processing the message. As a result, the message will continue to exist in the provider. If we run ConsumerTest again, we continue to get the following result until we acknowledge the message:

Let's invoke commit() after receiving the message:

class Consumer {
@Inject
private lateinit var initialContext: InitialContext

fun receiveMessage(): Message {

val queue = initialContext
.lookup("jms/PointToPointQueue") as Queue
val connectionFactory = initialContext
.lookup("jms/__defaultConnectionFactory")
as ConnectionFactory
val jmsContext = connectionFactory
.createContext(JMSContext.SESSION_TRANSACTED)
val textMessage = jmsContext.createConsumer(queue)
.receive()
jmsContext.commit()
return textMessage
}
}

Run the ConsumerTest class again and notice that this time, the commit() function sends an acknowledgement to the provider that the message has been received. Any further calls on ConsumerTest will block and wait for the message to be produced:

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset