Tweaking Message Delivery

What happens to messages that end up stuck in queues? Do they just disappear? What is the best way to prevent messages from being dropped silently, without warning? This chapter answers these questions thoroughly, looking in detail at message time to live (TTL) and at dead-letter exchanges and queues. It also covers how the broker should react when a message published with the mandatory flag cannot be routed to a queue. Additionally, the chapter explains policies and the default exchange.

Expect to learn important information about the following topics:

  • Handling dead letters
  • Making delivery mandatory 

Technical requirements

The code files of this chapter can be found on GitHub at https://github.com/PacktPublishing/RabbitMQ-Essentials-Second-Edition/tree/master/Chapter04.

Handling dead letters

Things are going very well at Complete Car (CC). The driver-information message feature is gaining traction as more and more drivers join the company. After a few months of activity, one thing becomes clear: not all taxi drivers log in to the application every day, which leads to messages piling up in taxi inbox queues.

Though the amount of data is not detrimental to the system, the idea of having messages lying around in queues, potentially forever, is not satisfactory. Imagine a taxi driver logging in after a couple of weeks of vacation and being flooded with obsolete messages—this is the negative type of user experience that CC is keen to avoid.

CC decides to address this by specifying a new rule: after one week, any message that is not delivered will be dealt with in one of two ways:

  • It will be emailed to the user if it's an important information message.
  • It will be discarded if it's an information message concerning the current traffic situation or other less important information:

Fig 4.1: Email important information messages, discard other messages

The developers look at what is offered in terms of message expiration in RabbitMQ and list the following possible options that can be implemented:

  • Standard AMQP message expiration property for published messages
  • Custom RabbitMQ extension that allows users to define a message TTL per queue
  • Custom RabbitMQ extension that allows users to define a TTL for the queue itself

The first option is interesting because it is a standard AMQP option; however, after reading more about how it is supported in RabbitMQ, it turns out that these messages are only discarded when the message reaches the head, or beginning, of a queue. Even if they have expired, the messages would still sit in the queue, which would defeat the purpose of what they're trying to achieve. CC rules out the last option as well, as they do not want the queue to be deleted. This leaves us with the second option of configuring each taxi inbox queue with a TTL that is enforced by RabbitMQ whether the queue is being consumed or not.

This is all fine and dandy, but what actually happens to messages when they expire? CC wants to consume these important messages in order to email them. So how do we achieve this? This is where RabbitMQ's dead letter exchange (DLX) comes in handy.

A dead letter is a message that can't be delivered, either because the intended target cannot be accessed or because it has expired. In the case of CC, messages that reach their TTL will become dead letters.

RabbitMQ offers the option to automatically route these dead letters to a specific exchange, a so-called DLX. Since CC wants to receive messages sent to this exchange, they must bind a queue to it, consume it, and take care of received messages. This queue acts as a dead letter queue (DLQ), the ultimate destination for dead messages.

The following diagram illustrates the overall DLX architecture that CC intends to roll out:

Fig 4.2: Dead-letter-handling architecture

A message that reaches its TTL is rerouted via the dead letter exchange to the taxi-dlx queue and finally handled by the consumer.

Note that when messages expire, they are published to the DLX using the original routing key they had when they were delivered to their taxi inbox queue. This behavior can be modified, as RabbitMQ allows the definition of a specific routing key to use when messages are published to the DLX. The default suits CC, as the original routing key is an interesting bit of information they will use to find out the ID of the taxi. Therefore, the DLX is constructed as a fanout exchange so that all messages are routed to the DLQ, whatever their original routing key may have been.
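As a rough illustration of why the original routing key is worth keeping, the dead letter consumer could recover the taxi ID from it. The sketch below assumes routing keys of the form taxi-inbox.<id>; taxi_id_from_routing_key is a hypothetical helper and not part of any library:

```ruby
# Hypothetical helper: extract the taxi ID from a dead letter's routing key.
# Assumes keys look like "taxi-inbox.<digits>"; adjust the pattern otherwise.
def taxi_id_from_routing_key(routing_key)
  match = /\Ataxi-inbox\.(\d+)\z/.match(routing_key.to_s)
  match && Integer(match[1], 10)
end

puts taxi_id_from_routing_key('taxi-inbox.42').inspect # a key with a taxi ID
puts taxi_id_from_routing_key('unrelated.key').inspect # a key without one
```

Returning nil for an unrecognized key lets the consumer decide what to do with dead letters it cannot attribute to a taxi.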

The battle plan is ready. It's now time to roll it out!

Refactoring queues

The first step in rolling out this architecture consists of configuring the taxi queues with the desired TTL of one week and a dead letter exchange equal to taxi-dlx.

By using the RabbitMQ extensions to AMQP, this can be achieved by defining the x-message-ttl and x-dead-letter-exchange arguments, respectively, when declaring the queue. Messages that expire in such a queue are rerouted to the given exchange, using the routing key set in x-dead-letter-routing-key if that optional argument is also defined.
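The TTL is expressed in milliseconds, so one week has to be converted before it can be used as an argument. A minimal sketch of that arithmetic follows; ttl_dlx_arguments is a hypothetical helper introduced only for illustration:

```ruby
# One week expressed in milliseconds, the unit used by x-message-ttl.
ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000

# Hypothetical helper that builds the optional queue arguments discussed above.
def ttl_dlx_arguments(ttl_ms, dlx_name)
  {
    'x-message-ttl' => ttl_ms,
    'x-dead-letter-exchange' => dlx_name
  }
end

puts ONE_WEEK_MS # 604800000
p ttl_dlx_arguments(ONE_WEEK_MS, 'taxi-dlx')
```

Keeping the conversion in a named constant avoids a magic number such as 604800000 being mistyped in several places.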

It’s tempting to jump right to the code editor and modify the Ruby code written in Chapter 3, Sending Messages to Multiple Taxi Drivers, by using the following arguments:

# Declare the inbox queue for taxi 1
queue1 = channel.queue('taxi-inbox.1',
  durable: true,
  arguments: {
    'x-message-ttl' => 604800000,
    'x-dead-letter-exchange' => 'taxi-dlx',
    'x-dead-letter-routing-key' => 'taxi-inbox.1'
  }
)

However, this would be wrong on several levels. The main issue is that the declaration would be changing from a queue with no arguments to one with three arguments. Remember from Chapter 2, Creating a Taxi Application, that a queue (or exchange) declaration is idempotent only if all the parameters that are used are the same. Any discrepancy in the declaration yields an exception and will be punished with an immediate channel termination.

Make it a habit to double-check that the same attributes/parameters are always used when declaring existing queues and exchanges. Any difference will cause errors and terminate channels.
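The rule can be pictured with a toy in-memory model. This is not RabbitMQ or Bunny code, and ToyBroker is a name invented for this sketch; with a real broker, the mismatch surfaces as a channel-level error (Bunny reports it as Bunny::PreconditionFailed):

```ruby
# Toy in-memory model of the broker's declaration check (not RabbitMQ code).
class ToyBroker
  class PreconditionFailed < StandardError; end

  def initialize
    @queues = {}
  end

  # Redeclaring with identical settings is a no-op; any difference raises.
  def declare_queue(name, durable: false, arguments: {})
    settings = { durable: durable, arguments: arguments }
    existing = @queues[name]
    if existing && existing != settings
      raise PreconditionFailed, "inequivalent declaration for queue '#{name}'"
    end
    @queues[name] ||= settings
  end
end

broker = ToyBroker.new
broker.declare_queue('taxi-inbox.1', durable: true)
broker.declare_queue('taxi-inbox.1', durable: true) # same settings: accepted

begin
  # Same name, but now with an extra argument: rejected.
  broker.declare_queue('taxi-inbox.1', durable: true,
                       arguments: { 'x-message-ttl' => 604_800_000 })
rescue ToyBroker::PreconditionFailed => e
  puts "Rejected: #{e.message}"
end
```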

The other problem is that this change would only apply when taxi drivers log in, because that is when the taxi queue is declared. It would not fulfill the requirement to apply the expiration rule to all existing queues independent of user actions. Finally, another thing to consider is that if these properties were configured at the queue declaration level, any change to one of them would require us to delete and recreate all of the queues. Clearly, the TTL and DLX configurations are cross-cutting concerns and should be configured in a more global fashion. Is that even possible?

The answer is yes! RabbitMQ has a simple and elegant solution to this problem in the concept of policies. RabbitMQ supports policies that define specific behaviors, and these policies can be applied to queues or exchanges. Policies are applied not only when a queue or exchange is declared, but also to an existing queue or exchange. 

Both the queue message TTL and dead letter exchange are configurable via policies, but only a single policy can apply to a queue or exchange. Therefore, CC will craft a policy that combines both the TTL and DLX settings and apply it to all taxi inbox queues. This cannot be achieved via the AMQP protocol or by using the RabbitMQ client. Instead, the powerful command-line tools provided by RabbitMQ are the best way to achieve the policies needed.

This strategy to refactor the existing queues is achieved with the following single command-line operation:

$ sudo rabbitmqctl set_policy -p cc-dev-vhost Q_TTL_DLX "taxi-inbox\.\d+" '{"message-ttl":604800000, "dead-letter-exchange":"taxi-dlx"}' --apply-to queues

Let's take some time to dissect the preceding command:

  • sudo rabbitmqctl set_policy: This part of the command uses the set_policy control command.
  • -p cc-dev-vhost: This part of the command applies the policy to the development virtual host.
  • Q_TTL_DLX: This part of the command names the policy to make it obvious that it pertains to queue TTL and dead letter exchange.
  • "taxi-inbox\.\d+": This part of the command uses a regular expression to apply the policy to the taxi inbox queues only, by selecting them by name.
  • '{"message-ttl":604800000, "dead-letter-exchange":"taxi-dlx"}': This part of the command is the policy definition, composed of a TTL of seven days in milliseconds and the name of the DLX.
  • --apply-to queues: This part of the command ensures that this policy is only applied to queues, which is somewhat redundant with the regex, but acts as a safety net because it selects RabbitMQ entities by type instead of name.
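Because the name pattern of a policy is a regular expression, it can be worth checking which queue names it selects before applying it. The sketch below does this in Ruby, assuming the taxi inbox queues are named taxi-inbox.<number>; the sample queue names are made up, and Ruby's regex engine is only a stand-in for the one used by the broker:

```ruby
require 'json'

# Pattern assumed for the policy: "taxi-inbox." followed by one or more digits.
POLICY_PATTERN = /taxi-inbox\.\d+/

# Policy definition: one-week TTL in milliseconds plus the DLX name.
POLICY_DEFINITION = {
  'message-ttl' => 7 * 24 * 60 * 60 * 1000,
  'dead-letter-exchange' => 'taxi-dlx'
}.freeze

# Sample queue names, made up for illustration.
queue_names = ['taxi-inbox.1', 'taxi-inbox.27', 'taxi-dlq', 'work.now']

queue_names.each do |name|
  status = name.match?(POLICY_PATTERN) ? 'policy applies' : 'not matched'
  puts "#{name}: #{status}"
end

# The JSON document passed as the policy definition on the command line.
puts JSON.generate(POLICY_DEFINITION)
```

Generating the JSON from a hash rather than typing it by hand also guards against quoting mistakes in the shell command.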

Ready to run this command? Not so fast: the "taxi-dlx" exchange must first be created and the "taxi-dlq" queue bound to it. Applying this policy right now means that there will be seven days available to roll out the missing exchange and queue. Sure, this is plenty of time, but smart developers don't like to work against the clock if they can avoid it.

Instead of running the command right now, take the time to create the infrastructure in charge of dealing with the dead letters and roll it out to the application before applying the "Q_TTL_DLX" policy.

With the policy prepared but not yet applied, it's time to add some code for the missing exchange and queue.

Undertaking messages

The necessary infrastructure must be created to deal with expired messages. The dead letter queue needs to be declared, as well as the new dead letter fanout exchange. These need to be bound to each other.

The following needs to be done:

  • Declare the taxi-dlq queue.
  • Declare the taxi-dlx fanout exchange.
  • Bind the taxi-dlq to the taxi-dlx fanout.
  • Create a subscriber of the taxi-dlq queue that consumes and emails the dead letters.

To implement this behavior, add the following code, which declares the exchanges and queues and binds them together:

  1. Start by declaring two queues with x-message-ttl set to 604800000:
queue1 = channel.queue('taxi-inbox.1', durable: true,
  arguments: {'x-message-ttl' => 604800000, 'x-dead-letter-exchange' => 'taxi-dlx'})

queue2 = channel.queue('taxi-inbox.2', durable: true,
  arguments: {'x-message-ttl' => 604800000, 'x-dead-letter-exchange' => 'taxi-dlx'})
  2. Declare a fanout exchange, taxi-fanout:
exchange = channel.fanout('taxi-fanout')
  3. Bind both queues to the exchange:
queue1.bind(exchange, routing_key: "")
queue2.bind(exchange, routing_key: "")
  4. Declare a dead letter queue, taxi-dlq:
taxi_dlq = channel.queue('taxi-dlq', durable: true)
  5. Declare a dead letter fanout exchange, taxi-dlx:
dlx_exchange = channel.fanout('taxi-dlx')
  6. Now taxi-dlq needs to be bound to taxi-dlx:
taxi_dlq.bind(dlx_exchange, routing_key: "")
  7. Finally, publish a message:
exchange.publish("Hello! This is an information message!", routing_key: "")

As you can see, this is just a standard fanout exchange declaration along with the related queue declaration and binding. The same logic was used when implementing the public address system in Chapter 3, Sending Messages to Multiple Taxi Drivers.
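The last item on the list, the subscriber that consumes and emails the dead letters, is not shown above. The following is a minimal sketch of the decision such a subscriber could make for each message. RabbitMQ adds an x-death header describing why a message was dead-lettered; the importance header, the handle_dead_letter method, and the PrintMailer class are assumptions made up for this example:

```ruby
# Hypothetical handler for messages consumed from taxi-dlq.
# `headers` mirrors the message headers; the "x-death" array is added by
# RabbitMQ, while the "importance" header is an assumption of this sketch.
def handle_dead_letter(headers, body, mailer)
  death = Array(headers['x-death']).first || {}
  return :ignored unless death['reason'] == 'expired'

  if headers['importance'] == 'high'
    mailer.deliver(queue: death['queue'], text: body)
    :emailed
  else
    :discarded
  end
end

# Minimal stand-in for a real mailer, used only to make the sketch runnable.
class PrintMailer
  def deliver(queue:, text:)
    puts "Emailing owner of #{queue}: #{text}"
  end
end

headers = {
  'importance' => 'high',
  'x-death' => [{ 'queue' => 'taxi-inbox.1', 'reason' => 'expired', 'count' => 1 }]
}
puts handle_dead_letter(headers, 'Your license renewal is due', PrintMailer.new)
```

In a real subscriber, this method would be called from the block passed to the subscribe call on taxi-dlq, with the headers taken from the message properties.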

To simplify things even more, make sure that you log enough contextual data when an exception occurs. Always consider what information will be needed to perform forensics for a particular exception, if necessary.

After rolling out this code to the application servers, check that the dead letter exchange and queue have been correctly created. Now it is time to set the "Q_TTL_DLX" policy, as shown in the following command:

$ sudo rabbitmqctl set_policy -p cc-dev-vhost Q_TTL_DLX "taxi-inbox\.\d+" '{"message-ttl":604800000, "dead-letter-exchange":"taxi-dlx"}' --apply-to queues
Setting policy "Q_TTL_DLX" for pattern "taxi-inbox\.\d+" to "{"message-ttl":604800000, "dead-letter-exchange":"taxi-dlx"}" with priority "0" ...
...done.

After running this command, use the management console to see what's been changed on the taxi inbox queue definitions.

The following screenshot shows a few of these queues:

Fig 4.3: The Q_TTL_DLX policy is applied to all taxi queues

The following screenshot demonstrates that the Q_TTL_DLX policy has been applied to the taxi queue, while other queues, such as taxi-dlq, haven't been affected:

Fig 4.4: The Q_TTL_DLX policy is applied to the taxi-inbox.1 queue

In the management interface, click on the Admin tab and then the Policies tab (on the right). Note how the custom policy is visible in the following screenshot:

Fig 4.5: The Q_TTL_DLX policy is in the admin view

At this point, any message that stays for more than 7 days in a taxi queue will be unmercifully moved to the taxi-dlq queue, consumed, potentially emailed, and buried for real! But what should be done with the existing messages that were created before the policy was rolled out?

There is, unfortunately, no out-of-the-box solution to this problem, so a somewhat drastic measure must be taken: purging all the queues that are not empty and have no active subscribers. This is rough, but it is the only way to get out of the current situation. Moreover, it's a solution that is easily implemented with a simple script.

Thus far, the rabbitmqctl script has been used to manage the RabbitMQ broker. The next steps require the installation of a new script that comes bundled with the management console installed in Chapter 1, A Rabbit Springs to Life. This script, called rabbitmqadmin, can be downloaded by simply browsing to a particular URL in the management interface, namely http://localhost:15672/cli/. After following the displayed download instructions, install the script in a location that makes it available to all users (typically /usr/local/bin on a Linux machine).

More information on the rabbitmqadmin script can be found at http://www.rabbitmq.com/management-cli.html.

The following code shows how to create a script that will purge all consumerless queues that are not empty:

#!/bin/bash

# Select taxi inbox queues that hold ready messages but have no consumers.
# grep -E is needed for the + quantifier to work as intended.
queues_to_purge=$(rabbitmqctl list_queues -p cc-dev-vhost name messages_ready consumers | grep -E "taxi-inbox\.[[:digit:]]+[[:space:]]+[1-9][[:digit:]]*[[:space:]]+0" | awk '{ print $1 }')

for queue in $queues_to_purge ; do
    echo -n "Purging $queue ... "
    rabbitmqadmin -V cc-dev-vhost -u cc-admin -p taxi123 purge queue name="$queue"
done

Note that both rabbitmqctl and rabbitmqadmin were used to achieve the goal, the former with the ability to list specific attributes of queues in a way that's easy to parse and the latter with the ability to purge queues. After executing this script as a superuser, the state of the RabbitMQ broker is fit for purpose and the TTL and DLX policies will keep it that way in the long run!
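The selection logic of the script can also be sketched in Ruby, which makes the three conditions (taxi inbox name, at least one ready message, no consumers) easier to read than in a single regular expression. This is only an illustration: queues_to_purge is a hypothetical helper, the input is assumed to be the tab-separated output of rabbitmqctl list_queues, and the sample listing is made up:

```ruby
# Select taxi inbox queues that hold messages but have no consumers.
# Each data line is assumed to be "name<TAB>messages_ready<TAB>consumers".
def queues_to_purge(listing)
  listing.each_line.filter_map do |line|
    name, ready, consumers = line.chomp.split("\t")
    next unless name.to_s.match?(/\Ataxi-inbox\.\d+\z/)
    next unless [ready, consumers].all? { |value| value.to_s.match?(/\A\d+\z/) }

    name if ready.to_i.positive? && consumers.to_i.zero?
  end
end

# Sample listing, made up for illustration; header lines are skipped.
sample = <<~LISTING
  Listing queues for vhost cc-dev-vhost ...
  taxi-inbox.1\t12\t0
  taxi-inbox.2\t0\t0
  taxi-inbox.3\t5\t1
  taxi-dlq\t3\t0
LISTING

puts queues_to_purge(sample)
```

Only taxi-inbox.1 qualifies in this sample: taxi-inbox.2 is empty, taxi-inbox.3 has a consumer, and taxi-dlq is not a taxi inbox.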

CC now wants to send out a survey to every customer a few minutes after they complete a taxi ride. Let's see how it is possible to use dead letter exchanges and TTL to delay message delivery within RabbitMQ.

Delayed messages with RabbitMQ

While finishing off the work with this feature, the back office realizes that they can publish messages with a fixed delay so that consumers don’t see them immediately. This is a perfect feature for their survey, which should be sent out to customers 5 minutes after a finished ride. The AMQP protocol doesn't have a native delayed queue feature, but one can easily be emulated by combining the message TTL function and the dead-lettering function. 

The Delayed Message Plugin is available for RabbitMQ 3.5.3 and later versions of RabbitMQ. The Delayed Message Plugin adds a new exchange type to RabbitMQ. It is possible to delay messages routed via that exchange by adding a delay header to a message. You can read more about the plugin at https://github.com/rabbitmq/rabbitmq-delayed-message-exchange.

CC decides to publish survey request messages to a delayed queue once the driver has marked a ride as completed. All survey request messages are set to expire after a TTL of 5 minutes. When a message expires, it is dead-lettered with its routing key changed to the name of the destination queue. This means that the survey request message will end up in the queue from which the survey request should be sent.

The following is an example of the code that CC would use. Messages are first delivered to the DELAYED_QUEUE called work.later. After 300,000 ms, messages are dead-lettered and routed to the DESTINATION_QUEUE called work.now.

  1. We start by assigning the variables (the methods that follow assume a connection helper that returns an open Bunny connection):
DELAYED_QUEUE = 'work.later'
DESTINATION_QUEUE = 'work.now'
  2. After that, we define the publish method. There are a lot of things happening here:
  • First, the delayed queue, DELAYED_QUEUE, is declared and x-dead-letter-exchange is set to the default exchange.
  • A routing key for dead-lettering messages is set via the x-dead-letter-routing-key argument to DESTINATION_QUEUE.
  • The number of milliseconds to delay a message is specified in the message TTL x-message-ttl argument.
  • Finally, a message is published to the default exchange, where DELAYED_QUEUE is used as a routing key:
def publish
  channel = connection.create_channel

  channel.queue(DELAYED_QUEUE, arguments: {
    'x-dead-letter-exchange' => '',
    'x-dead-letter-routing-key' => DESTINATION_QUEUE,
    'x-message-ttl' => 300000
  })

  channel.default_exchange.publish 'message content', routing_key: DELAYED_QUEUE
  puts "#{Time.now}: Published the message"
  channel.close
end
  3. Then we define the subscribe method and handle the message:
def subscribe
  channel = connection.create_channel
  q = channel.queue DESTINATION_QUEUE, durable: true
  q.subscribe do |delivery, headers, body|
    puts "#{Time.now}: Got the message"
  end
end
  4. Lastly, we call both methods. The process has to stay alive for longer than the TTL for the subscriber to see the message:
subscribe()
publish()
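To see the mechanism without waiting five minutes, the combination of TTL and dead-lettering can be pictured with a toy in-memory model. This is not RabbitMQ code; ToyDelayQueue and its methods are invented for this sketch, and time is passed in explicitly as milliseconds:

```ruby
# Toy in-memory model of "delay via TTL + dead-lettering" (not RabbitMQ code).
class ToyDelayQueue
  attr_reader :destination

  def initialize(ttl_ms)
    @ttl_ms = ttl_ms
    @waiting = []      # messages sitting in the delayed queue, oldest first
    @destination = []  # messages dead-lettered to the destination queue
  end

  # Publishing records when the message is due to expire.
  def publish(body, now_ms)
    @waiting << { body: body, expires_at: now_ms + @ttl_ms }
  end

  # Move every message whose TTL has elapsed to the destination queue.
  def tick(now_ms)
    while @waiting.first && @waiting.first[:expires_at] <= now_ms
      @destination << @waiting.shift[:body]
    end
  end
end

queue = ToyDelayQueue.new(300_000) # 5 minutes
queue.publish('survey for ride 1', 0)
queue.tick(299_999)
puts queue.destination.inspect # nothing has been moved yet
queue.tick(300_000)
puts queue.destination.inspect # the message has reached the destination
```

Because every message in the delayed queue shares the same TTL, the oldest message always expires first, which is why checking only the head of the queue is enough in this model.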

That's it! The survey request feature is implemented. But, of course, a new feature is requested immediately. The back office wants to be able to send messages to individual drivers and to make sure that every driver, even one without a RabbitMQ taxi inbox, receives the message. Let's look at the mandatory delivery of messages in RabbitMQ.

Making delivery mandatory

So far, the back office team at CC has been relying only on emails to interact with individual drivers. CC recently added the RabbitMQ-powered system discussed in Chapter 3, Sending Messages to Multiple Taxi Drivers, allowing the back office to send information messages to all drivers. They now want to explore the possibility of sending messages to individual drivers from the back office service. Furthermore, if possible, CC would like drivers who do not have an inbox queue set up on RabbitMQ to get the message emailed to them immediately.

In terms of messaging architecture, this is a known territory—the exact same model was put in place in Chapter 2, Creating a Taxi Application, for client-to-taxi messages, as illustrated in the following diagram: 

Fig 4.6: The back office team will use the taxi direct exchange for direct messages to drivers

A direct exchange is used. The only difference is that, unlike the main application, the back office will not create and bind a taxi queue prior to sending a message. Instead, the back office will have to somehow detect that no such queue exists already and revert to email delivery for the message.

What's unclear is how to achieve the second part of these requirements: how can the back office check for the existence of a queue? The AMQP specification doesn't specify a direct way to do this. The RabbitMQ management plugin exposes a REST API that could be used to check the existence of a queue. This is a tempting approach, but it relies on something outside of what AMQP itself offers, and staying within AMQP is preferred. Moreover, this could expose the process to a check-then-act type of race condition.

Indeed, the queue could be created by another process after it is verified that it doesn't exist. Digging deep into the AMQP specification uncovers a feature that will handle this more elegantly, namely mandatory delivery. The mandatory field is part of the AMQP specification that tells RabbitMQ how to react if a message cannot be routed to a queue.

Consider the management REST API of RabbitMQ for cases when the AMQP specification doesn't have any way to support the functionality required. You can access the REST API documentation on the RabbitMQ broker at http://localhost:15672/api/.

When a message is published on an exchange with the mandatory flag set to true, it will be returned by RabbitMQ if the message cannot be delivered to a queue. A message cannot be delivered to a queue either because no queue is bound to the exchange or because none of the queues are bound with a binding key that matches the message's routing key under the routing rules of the exchange. In the current case, this would mean that no taxi inbox queue is bound with a key that matches the taxi ID.

The trick with returned messages is that RabbitMQ doesn't return them synchronously as a response to the publish operation: it returns them in an asynchronous fashion. This means that, for the developer, a specific message handler will have to be registered with RabbitMQ in order to receive the returned messages.

This leads to the overall architecture illustrated in the following diagram:

Fig 4.7: A dedicated handler takes care of returned messages

Messages published to a queue that does not exist are returned to the return handler. This handler is now in charge of making sure that the information message reaches the driver in some other way—for example, through email.
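The routing decision behind the mandatory flag can be pictured with a toy in-memory model of a direct exchange. This is not RabbitMQ or Bunny code: ToyDirectExchange is invented for this sketch, and unlike a real broker it calls the return handler synchronously:

```ruby
# Toy in-memory model of a direct exchange with mandatory publishing
# (illustrative only; a real broker returns messages asynchronously).
class ToyDirectExchange
  def initialize
    @bindings = Hash.new { |hash, key| hash[key] = [] }
    @return_handler = nil
  end

  def bind(queue, routing_key)
    @bindings[routing_key] << queue
  end

  def on_return(&handler)
    @return_handler = handler
  end

  # Route to every queue bound with this key; hand unroutable mandatory
  # messages to the return handler instead of dropping them silently.
  def publish(body, routing_key:, mandatory: false)
    queues = @bindings.fetch(routing_key, [])
    if queues.empty?
      @return_handler&.call(routing_key, body) if mandatory
      return false
    end
    queues.each { |queue| queue << body }
    true
  end
end

inbox = []
exchange = ToyDirectExchange.new
exchange.bind(inbox, 'taxi-inbox.100')
exchange.on_return do |key, body|
  puts "Returned (#{key}): fall back to email for '#{body}'"
end

exchange.publish('Hello taxi 100', routing_key: 'taxi-inbox.100', mandatory: true)
exchange.publish('Hello taxi 999', routing_key: 'taxi-inbox.999', mandatory: true)
puts inbox.inspect
```

Without the mandatory flag, the second message in this model would simply be dropped, which is exactly the silent loss that CC wants to avoid.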

The default exchange is described next, before the new back office sender is implemented.

Default exchanges in RabbitMQ

Each time a queue is created, it gets automatically bound to the default exchange with its queue name as the routing key. By publishing a message to the default exchange using the queue name as the routing key, the message will end up in the designated queue. This is also something that is going to be added into the following code example, in the Implementing the back office sender section. 

What is this mysterious default exchange? It is a direct and durable exchange named "" (an empty string) that is automatically created by RabbitMQ for each virtual host.

To make the default exchange visible in the management console, its empty string name is rendered as the AMQP default, as shown in the following screenshot:

Fig 4.8: The default exchange is one among several built-in exchanges

As you can see, there are a host of other predeclared exchanges that are automatically created for every virtual host. They are easy to spot because their names start with amq. These exchanges are meant for testing and prototyping purposes only, so there is no need to use them in production.

Sending messages to the default exchange is a convenient way to reach a particular queue; however, do not overuse this pattern. It creates tight coupling between producers and consumers because the producer becomes aware of particular queue names.

With this explained, it's now time to add the code for the feature requested by the back office: falling back to email when a driver doesn't have an existing inbox queue.

Implementing the back office sender

CC's back office is now adding support for messages that are returned because they were sent to drivers without a taxi inbox queue. The Ruby client library, among other libraries, supports this feature very elegantly. The following is the required code to support the mandatory delivery of messages to taxi inboxes and to handle potentially returned messages.

Start out by requiring the bunny client library and then set up a connection and a channel to RabbitMQ, as described in Chapter 2, Creating a Taxi Application:

require "bunny"
connection = Bunny.new ENV["RABBITMQ_URI"]

connection.start
channel = connection.create_channel

Then, get a reference to the default exchange, which already exists and does not need to be declared:

exchange = channel.default_exchange

A return handler is created, which handles the returned message:

exchange.on_return do |return_info, properties, content|
  puts "A returned message!"
end

Next, declare a durable inbox queue—in this example, named taxi-inbox.100:

queue = channel.queue("taxi-inbox.100", durable: true)

Subscribe to the queue and print a simple notification for each consumed message. In a real application, the return handler registered earlier is where the email would be sent; this example is kept short on purpose and doesn't include the method for actually sending the email:

queue.subscribe do |delivery_info, properties, content|
  puts "A message is consumed."
end

Messages are published with routing_key to target a particular taxi with the mandatory flag set to true. Since this queue is created and exists, this message should not be returned:

exchange.publish("A message published to a queue that does exist, it should NOT be returned", :mandatory => true, :routing_key => queue.name)

Another mandatory message is published, but this time to a random queue. This message is going to be returned and handled by the return handler:

exchange.publish("A message published to a queue that does not exist, it should be returned", :mandatory => true, :routing_key => "random-key")

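Finally, close the connection. Because deliveries and returns arrive asynchronously, a short pause before closing gives both handlers time to run:

sleep 1
connection.close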

The preceding code example includes one message published to a queue that exists, while the other message is published with a random routing key that matches no existing queue. More code examples can be found at http://rubybunny.info/articles/exchanges.html.

That is all! The feature is ready to go live. Messages are returned asynchronously, so they are dealt with in the return handler rather than at the point of publishing.

Summary

This chapter covered message TTL, both the per-message expiration property and the per-queue TTL, along with other important topics about tweaking message delivery. It also described the use of dead-letter exchanges and queues, and of policies. The chapter then took a look at how to use the default exchange and how to send mandatory messages.

CC is growing into a proper company and its platform is keeping right up with new features to meet the demands of drivers, customers, and back-office staff.

So far, only asynchronous interactions with RabbitMQ have been discussed, which makes sense because it's the core premise of messaging. That said, it's possible to perform synchronous operations too, as the next chapter demonstrates. The following chapter will include information on the direct interaction between the taxi and the client. What will the next feature rollout entail? The only way to find out is to keep reading!
