Message acknowledgement

So far, we have learned how messages can be delivered from one point to another, using queues and topics to send them. A message is removed from the messaging provider once it has been acknowledged: after the consumer has received and processed a message, an acknowledgement goes back to the messaging provider, so that the provider takes the message off the queue or the topic. There are different ways to send this acknowledgement:

  • Auto acknowledge: In auto acknowledgement mode, the message is acknowledged automatically when it is received by the client. The message is then removed from the queue or the topic without any explicit action from the consumer.
  • Duplicates OK: In duplicates OK mode, messages are acknowledged lazily, so the consumer can receive duplicate messages and has to be able to tolerate them.
  • Client acknowledgement: In client acknowledgement mode, the client (the consumer) has to acknowledge the message explicitly once it has received it. Until then, the message won't be removed from the queue or topic, and the provider can deliver it again.
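
To make the difference between auto and client acknowledgement concrete, here is a minimal in-memory sketch. It is a toy model and not the JMS API: the names AckMode, Delivery, and ToyQueue are hypothetical, and real providers redeliver unacknowledged messages under more specific conditions than this model suggests.

```kotlin
// Toy in-memory model of acknowledgement modes; NOT the JMS API.
// All names here (AckMode, Delivery, ToyQueue) are hypothetical.
enum class AckMode { AUTO, CLIENT }

class Delivery(val body: String, private val onAck: () -> Unit) {
    // Tells the queue that the message has been processed.
    fun acknowledge() = onAck()
}

class ToyQueue(private val mode: AckMode) {
    private val messages = ArrayDeque<String>()

    fun send(body: String) {
        messages.addLast(body)
    }

    // Returns the message at the head of the queue, or null if the queue is empty.
    fun receive(): Delivery? {
        val body = messages.firstOrNull() ?: return null
        return when (mode) {
            // AUTO: the message is removed as soon as it is handed over.
            AckMode.AUTO -> {
                messages.removeFirst()
                Delivery(body) {}
            }
            // CLIENT: the message stays until the consumer acknowledges it.
            AckMode.CLIENT -> Delivery(body) { messages.remove(body) }
        }
    }

    fun size(): Int = messages.size
}

fun main() {
    val auto = ToyQueue(AckMode.AUTO)
    auto.send("TEST MESSAGE")
    auto.receive()
    println(auto.size())   // prints 0

    val client = ToyQueue(AckMode.CLIENT)
    client.send("TEST MESSAGE")
    val delivery = client.receive()
    println(client.size()) // prints 1: not acknowledged yet
    delivery?.acknowledge()
    println(client.size()) // prints 0
}
```

In the AUTO queue the message disappears on receipt, whereas in the CLIENT queue it stays until acknowledge() is called.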

Let's see this in action. We previously created the point-to-point messaging model. Consider this module again. Here, we have the Producer class, which produces a message for the queue:

class Producer {
    fun sendMessage(message: String): String {
        //…
    }
}

The Consumer class consumes the message from the queue:

class Consumer {
    fun receiveMessage(): Message {
        //….
    }
}

We had test cases to invoke each of these classes to produce and consume the messages:

// Produces the messages
class ProducerTest {
    @Test
    fun sendMessageTest() {
        //…
        val message = producer.sendMessage("TEST MESSAGE")
        Assert.assertEquals("Message sent", message)
    }
}

// Consumes the messages
class ConsumerTest {
    @Test
    fun receiveMessageTest() {
        //..
        val message = consumer.receiveMessage()
        Assert.assertNotNull(message)
        Assert.assertTrue(message is TextMessage)
    }
}

By default, when we create a JMSContext using ConnectionFactory, sessionMode is set to AUTO_ACKNOWLEDGE, which is the same as passing it explicitly:

connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE)

This means that once a message is produced by a producer and consumed by a consumer, it is automatically acknowledged to the messaging provider and the message will be removed from the queue.

If we re-run our ConsumerTest alone, it will block until there is a message in the queue.

When the producer produces the message, the consumer consumes it. As soon as the consumer receives the message, it is automatically acknowledged and removed from the queue.

Let's now change the sessionMode of the JMSContext to DUPS_OK_ACKNOWLEDGE and see what happens. We will run ProducerTest and ConsumerTest once again. Notice that the message will be consumed by the receiver and the tests will pass.
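
In code, this change amounts to passing a different constant to createContext. A minimal sketch, assuming the javax.jms (JMS 2.0) API is on the classpath; the helper name createDupsOkContext is hypothetical and only there for illustration:

```kotlin
import javax.jms.ConnectionFactory
import javax.jms.JMSContext

// Hypothetical helper: the only difference from the earlier setup is the session mode.
fun createDupsOkContext(connectionFactory: ConnectionFactory): JMSContext =
    connectionFactory.createContext(JMSContext.DUPS_OK_ACKNOWLEDGE)
```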

Let's now re-run the ConsumerTest alone.

This time, the ConsumerTest didn't block. It got the message and the test passed. If we run it again, the consumer would still receive the message, as the sessionMode is set to DUPS_OK_ACKNOWLEDGE. Until the message is acknowledged, the provider keeps it in the queue.

If we acknowledge the message in the code once it has been processed, it will be removed from the queue after the consumer consumes it. If we then execute ConsumerTest again, it will wait for the next message to be produced in the queue.

In client acknowledgement, the receiving party has to send the acknowledgement after it processes the message. After acknowledgement, the message will be removed from the queue or the topic. Acknowledging the message tells the provider that the consumer has processed it and that it should not be sent again. If we don't acknowledge the message, it will stay in the provider until we do:

class Consumer {
    @Inject
    private lateinit var initialContext: InitialContext

    fun receiveMessage(): Message {
        val queue = initialContext.lookup("jms/PointToPointQueue") as Queue
        val connectionFactory =
            initialContext.lookup("jms/__defaultConnectionFactory") as ConnectionFactory
        // CLIENT_ACKNOWLEDGE: the message stays in the queue until it is acknowledged
        val jmsContext = connectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE)
        // receive() blocks until a message is available
        val textMessage = jmsContext.createConsumer(queue).receive()
        return textMessage
    }
}

In this case, acknowledging the message after processing it is the responsibility of the receiver. We set sessionMode to CLIENT_ACKNOWLEDGE only in the Consumer class.
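
The acknowledgement call itself could look something like the following sketch. It assumes the javax.jms (JMS 2.0) API and a message whose body is text; the function name receiveAndAcknowledge and its parameters are hypothetical, and the connection factory and queue are passed in rather than looked up.

```kotlin
import javax.jms.ConnectionFactory
import javax.jms.JMSContext
import javax.jms.Queue

// Hypothetical helper: receives one message, reads its body, then acknowledges it.
fun receiveAndAcknowledge(connectionFactory: ConnectionFactory, queue: Queue): String? =
    // use {} closes the JMSContext when the block finishes.
    connectionFactory.createContext(JMSContext.CLIENT_ACKNOWLEDGE).use { jmsContext ->
        // receive() blocks until a message arrives.
        val message = jmsContext.createConsumer(queue).receive()
        // "Processing" here is just reading the text body.
        val body = message?.getBody(String::class.java)
        // Acknowledge only after processing, so the provider can remove the message.
        message?.acknowledge()
        body
    }
```

Note that Message.acknowledge() acknowledges all messages consumed so far in the same session, not just the one it is called on.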