Taking RabbitMQ to Production

At this point, Complete Car (CC) is running a single instance of RabbitMQ in production. Now CC also needs to ensure that the service is highly available. Creating clusters of nodes ensures that information remains reachable even if a system goes down. This chapter covers how to set up RabbitMQ clusters, including broker clustering, classic mirrored queues, and quorum queues. CC is also looking for an elegant solution for log aggregation, in which all logs are published to a centralized RabbitMQ node through the federation plugin, so this chapter covers that topic as well.

To achieve CC's goal of nearly constant uptime, the topics in this chapter will include the following:

  • Adding nodes to the cluster
  • Discovering the types of RabbitMQ queues
  • Using federated brokers and log aggregation

Technical requirements

The code files of this chapter can be found on GitHub at https://github.com/PacktPublishing/RabbitMQ-Essentials-Second-Edition/tree/master/Chapter06.

Adding nodes to the cluster

Things have been running smoothly for CC, but developers want to ensure that the system can survive a crash. A crash is always possible, even when using RabbitMQ. Power outages happen, sudden packet losses may corrupt updates, and administrators can improperly configure the system by accident. There is still a chance that, due to a glitch or error, an entire instance could be lost. Steps must be taken to address any issues that could lead to data loss, negative customer experience, or even the dreaded 2 a.m. phone call to the team.

The good news is that RabbitMQ provides the features needed to deal with potential crashes and other catastrophes right out of the box. RabbitMQ can be configured to run in an active-active deployment environment, meaning that two or more nodes actively run the same kind of service simultaneously. Several brokers can be engaged in a cluster to act as a single highly available Advanced Message Queuing Protocol (AMQP) service.

There is no need to resort to manual failover when using active-active deployment. No manual intervention is needed if a broker goes down, sparing the team that 2 a.m. phone call. Depending on the number of active nodes, a cluster can sustain several failures.

To avoid complications resulting from an unreachable broker, CC decides to start by rolling out a second RabbitMQ instance (named rmq-prod-2), clustering it with the one already used in production.

A RabbitMQ cluster is a logical grouping of one or several nodes, each sharing users, virtual hosts, queues, exchanges, and so on. The system architecture changes only inside the cluster, as seen in the following diagram:

Fig 6.1: A high-availability cluster of many RabbitMQ brokers

The team is informed when the second instance of RabbitMQ is ready to be clustered with the existing one. To make this happen, RabbitMQ relies on the Erlang clustering feature, which allows local or remote communication between several Erlang nodes. Erlang clustering uses a security cookie as the mechanism for cross-node authentication. To avoid errors, the developers have made sure that the content of /var/lib/rabbitmq/.erlang.cookie is identical on each instance.

Note that a cluster will not work if a firewall blocks the RabbitMQ instances from communicating with each other. If that happens, open the ports used for inter-node communication (4369 for the Erlang port mapper daemon and 25672 for inter-node traffic, by default), as well as the client-facing AMQP port (5672 by default). Get more information at http://www.rabbitmq.com/clustering.html#firewall.

There is no need to configure any users or virtual hosts on the second node as done in Chapter 1, A Rabbit Springs to Life. Just join the cluster and the configuration will automatically synchronize with the existing RabbitMQ instance, including users, virtual hosts, exchanges, queues, and policies.

Keep in mind that a node completely resets when it joins a cluster. RabbitMQ deletes all configuration and data before synchronizing with the other nodes.

To join a node to a cluster, first stop RabbitMQ, then join the cluster, and finally restart the RabbitMQ application:

$ sudo rabbitmqctl stop_app
# => Stopping node rabbit@rmq-prod-2 ...
# => ...done.
$ sudo rabbitmqctl join_cluster rabbit@rmq-prod-1
# => Clustering node rabbit@rmq-prod-2 with rabbit@rmq-prod-1 ...
# => ...done.
$ sudo rabbitmqctl start_app
# => Starting node rabbit@rmq-prod-2 ...
# => ...done.

Make sure the same major version of Erlang is used by all the RabbitMQ nodes or the join_cluster command might fail. It is possible to run a cluster with mixed Erlang versions, but there can be incompatibilities that will affect cluster stability.

RabbitMQ also requires the use of the same major/minor version across nodes up to and including 3.7.x. It is possible to run different patch versions (for example, 3.7.X and 3.7.Y) most of the time, except when indicated otherwise in the release notes.
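
One way to verify what each node runs is with the diagnostics tool that ships with RabbitMQ; the following is a sketch, with illustrative output:

$ sudo rabbitmq-diagnostics server_version
# => 3.8.2
$ sudo rabbitmq-diagnostics erlang_version
# => 22.2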

Feature flags are a mechanism introduced in RabbitMQ 3.8. These flags define a RabbitMQ node's ability to become part of a cluster: they control which features are considered enabled or available on all cluster nodes, so a node can only join a cluster whose members support the same set of flags. Read more at https://www.rabbitmq.com/feature-flags.html.
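
Feature flags can be inspected and enabled with rabbitmqctl. As a sketch, run on any node (the listed flags and output are illustrative):

$ sudo rabbitmqctl list_feature_flags
# => Listing feature flags ...
# => name                       state
# => implicit_default_bindings  enabled
# => quorum_queue               enabled
$ sudo rabbitmqctl enable_feature_flag all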

After running the preceding commands, check to see whether the cluster is active by running the cluster_status command on any node:

$ sudo rabbitmqctl cluster_status 
# => Cluster status of node rabbit@rmq-prod-1 ...
# => [{nodes,[{disc,[rabbit@rmq-prod-2,rabbit@rmq-prod-1]}]},
# =>  {running_nodes,[rabbit@rmq-prod-2,rabbit@rmq-prod-1]},
# =>  {partitions,[]}]
# => ...done.

Notice how two lists of nodes are given in the status message: nodes is the list of configured cluster members, while running_nodes lists those that are currently active. Configured nodes are persistent, meaning they will survive broker restarts, since each broker automatically re-engages with the cluster.

Confirm that the new node has synchronized with the cluster by connecting to the management console on the second node (rmq-prod-2). Use the cc-admin user to log in and go to the Queues view.

The configuration should be synchronized as shown in the following screenshot:

Fig 6.2: All configurations are synchronized after joining the cluster

To add more nodes, let each new node join another node in the cluster. The Overview tab in the management console of the first node shows all the nodes that are in the cluster, which are automatically discovered, as shown in the following screenshot:

Fig 6.3: The management console overview shows all cluster members

As shown, all members of the cluster are listed along with basic statistics and ports. The different values shown in the Info column are as follows:

  • basic: Describes the rates_mode, which tells how the queues report statistics. This can be one of basic (the default), detailed, or none.
  • disc: Means that the node persists data to the filesystem, which is the default behavior. It is also possible to start a node in RAM mode, where all message data is stored in memory, which can speed up systems provided that they have enough memory.
  • 7: Shows the number of plugins that are enabled.
  • allocated: Describes the memory calculation strategy.

Nodes can be removed (http://www.rabbitmq.com/clustering.html#breakup) from the cluster through rabbitmqctl, the command-line tool for managing a RabbitMQ server node.
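
For instance, removing rmq-prod-2 could look as follows; note that reset wipes the node's data, and that forget_cluster_node is run from a remaining node against a node that has already been stopped:

# On the node leaving the cluster
$ sudo rabbitmqctl stop_app
$ sudo rabbitmqctl reset
$ sudo rabbitmqctl start_app

# Or, from any remaining node, remove a stopped node
$ sudo rabbitmqctl forget_cluster_node rabbit@rmq-prod-2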

All CC applications currently connect to one single RabbitMQ node. This needs to change for the applications to benefit from the cluster. Read on to see how that is done.

Connecting to the cluster

The applications should try to connect to one node first, failing over to another if the original attempt fails. This is the only required change; the applications will interact with the broker exactly as they did before.

First, modify the main application connection Ruby code as follows:

begin
  connection = Bunny.new(hosts: ['rmq-prod-1', 'rmq-prod-2'])
  connection.start
rescue Bunny::TCPConnectionFailed => e
  puts "Connection to server failed: #{e.message}"
end

Basically, a list of broker addresses is passed. With this in place, the RabbitMQ Ruby client connects to the first responsive node in the list, trying each of the provided addresses until it can establish a connection or eventually fails. In the case of a later failure, the reconnect mechanism that's already in place kicks in, and the addresses are once again attempted.

It is possible to manually synchronize a mirrored queue using the rabbitmqctl sync_queue <queue_name> command. Cancel the synchronization with rabbitmqctl cancel_sync_queue <queue_name>.

At this point, there is only one more step to perform to ensure the high availability of the queue data: enabling a way to spread the data to the other node(s). The options available are classic mirrored queues and quorum queues. But first, some partition handling strategies.

Partition handling strategies

Adding even more nodes to the cluster is of course possible. However, this brings a new challenge in the form of network connectivity. Split-brain scenarios and early message confirmation are common issues when running more than one node. A split-brain occurs in a distributed system when one portion of the network becomes unreachable from another, creating network partitions (a netsplit). To avoid this situation, set a partition handling strategy. In RabbitMQ, this is done through the cluster_partition_handling parameter in the configuration file: https://www.rabbitmq.com/partitions.html#automatic-handling.

The pause-minority strategy pauses the nodes in the minority partition; this is a common way to resolve split-brains in distributed systems. The pause-if-all-down strategy pauses a node only when it can reach none of the nodes in a configured list. This can be inadvisable, as it may create large discrepancies between the data in each partition.

With pause-if-all-down, the list of nodes that must stay reachable has to be specified, along with one of two options that determine what happens once connectivity returns: ignore the other partition, or autoheal the cluster. With pause-minority, the paused nodes simply rejoin when the partitions reconnect.
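
As a minimal sketch, assuming the new-style rabbitmq.conf format available since RabbitMQ 3.7, the strategies could be configured as follows:

# Pause nodes that end up in a minority partition
cluster_partition_handling = pause_minority

# Or: pause a node only when it can reach none of the listed nodes,
# and automatically heal once the partitions reconnect
# cluster_partition_handling = pause_if_all_down
# cluster_partition_handling.pause_if_all_down.recover = autoheal
# cluster_partition_handling.pause_if_all_down.nodes.1 = rabbit@rmq-prod-1
# cluster_partition_handling.pause_if_all_down.nodes.2 = rabbit@rmq-prod-2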

RabbitMQ keeps the cluster's metadata synchronized, so clients can reach their exchanges and queues through any node; the messages themselves, however, are not replicated between nodes by default. The next section covers how that can be done.

Discovering the types of RabbitMQ queues

Queues in RabbitMQ can be durable or transient. Classic mirrored queues are recommended for transient message handling, while quorum queues are a good alternative for durable queues. 

Durable queue metadata is stored on disk, while transient queue metadata is stored in memory when possible. Another queue type, the lazy queue, writes its contents to disk as early as possible, for both durable and transient messages.

Due to technical limitations in classic mirrored queues, it is difficult to make guarantees on how failures are handled. The RabbitMQ documentation (https://www.rabbitmq.com/ha.html) recommends that users get familiar with quorum queues and consider them instead of classic mirrored queues where possible.

Mirroring queues

In the case of CC, the data in the queues needs to be highly available. Mirrored queues provide this type of security. Queue mirroring uses a master-mirror design pattern: all message queuing and dequeuing actions happen on the master, and the mirrors receive the updates from the master. If a master becomes unavailable, RabbitMQ promotes a mirror to master; usually, the oldest mirror becomes the new master, as long as it is synchronized.

It is also possible to set up a master-master system by sending data to a different cluster in addition to the original. This provides a useful backup for hardware updates and extreme cases of failure. It can also help speed up interaction in different geographic regions.

Since only one policy at a time can apply to a queue or exchange, telling the cluster how to mirror queues must, in this case, be combined with the Q_TTL_DLX policy. The first step is to clear the policy created in Chapter 4, Tweaking Message Delivery, and then apply a new policy that combines the Q_TTL_DLX rules with queue mirroring.

Run the following commands to change the Q_TTL_DLX policy and tell RabbitMQ how to mirror queues. Start by clearing the policy:

$ sudo rabbitmqctl clear_policy -p cc-prod-vhost Q_TTL_DLX
# => Clearing policy "Q_TTL_DLX"
# => ......done.
"Specify the new HA_Q_TTL_DLX policy:"
$ sudo rabbitmqctl set_policy -p cc-prod-vhost HA_Q_TTL_DLX "taxi.d+" '{"message-ttl":604800000, "dead-letter-exchange":"taxi-dlx", "ha-mode":"all", "ha-sync-mode":"automatic"}' --apply-to queues
# => Setting policy "HA_Q_TTL_DLX" for pattern "taxi.d+" to "{"message-ttl":604800000, "dead-letter-exchange":"taxi-dlx", "ha-mode":"all", "ha-sync-mode":"automatic"}" with priority "0"
# => ......done.

Alternatively, add the policy from the management console, as shown in the following screenshot:

Fig 6.4: Policy added via the RabbitMQ management console

High availability mode has been added to the existing TTL and DLX policy rules. The all value for ha-mode tells RabbitMQ to mirror queues across all nodes in the cluster, which is exactly what CC wants in their two-node cluster. The other options are exactly and nodes, which let developers specify a number of nodes (with exactly) or a list of node names (with nodes) through the ha-params parameter.

The ha-sync-mode parameter specifies the synchronization mode for the mirrored queue and can be set to manual or automatic. In manual mode, a new mirror will not receive existing messages, but will eventually become consistent with the master as consumers retrieve the old messages; this reduces overhead at the cost of redundancy for messages that were already queued. In automatic mode, a queue synchronizes as soon as a new mirror joins, at a small cost to system performance, since the queue remains unresponsive while synchronization is in progress.

CC decides to use automatic synchronization so that any existing messages become visible across all nodes nearly instantaneously. CC is fine with the initial unresponsiveness this creates, since performance is not critical for user messages.

Navigate to the Queues tab in the management console after running the preceding command. Observe that the HA_Q_TTL_DLX policy has been applied to the intended queues:

Fig 6.5: Mirrored queues with the high availability policies applied

Notice how the mirrored queues have a +1 next to them. This denotes the fact that the queues are mirrored to another node in the cluster. The master (rabbit@rmq-prod-1) and the mirror nodes (rabbit@rmq-prod-2) are clearly defined in the Details section of each queue in the management console as well, as seen in the following screenshot:

Fig 6.6: Master and mirror nodes are detailed

At this point, the RabbitMQ brokers are clustered and taxi order request queues are mirrored. Client applications can benefit from this highly available deployment and connect to different nodes.

Setting the Master Queue Location: Every queue has a primary replica known as the queue master. This queue is the first to receive messages before synchronization. It is possible to influence how this is set using the x-queue-master-locator parameter in the Queues tab of the management console or when creating a queue programmatically.
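
It is also possible to steer master placement with the equivalent policy key. As a sketch (the policy name is illustrative; note that only one policy applies to a queue at a time, so in practice this key would be merged into an existing policy such as HA_Q_TTL_DLX):

$ sudo rabbitmqctl set_policy -p cc-prod-vhost queue-locator "taxi.d+" '{"queue-master-locator":"min-masters"}' --apply-to queues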

Quorum queues are a new type of queue, often recommended over classic mirrored queues.

Quorum queues

As an alternative to durable mirrored queues, quorum queues ensure that the cluster is up to date by agreeing on the contents of a queue. In doing so, quorum queues avoid losing data, which could occur with mirrored queues when messages are confirmed too early. Quorum queues are available as of RabbitMQ 3.8.0. As detailed in the RabbitMQ documentation (https://www.rabbitmq.com/quorum-queues.html), some transient features are not available when using quorum queues.

A quorum queue has a leader that roughly serves the same purpose as it did for the classic mirrored queue master. All communication is routed to the queue leader, which means the queue leader locality has an effect on the latency and bandwidth requirement of the messages; however, the effect should be lower than it was in classic mirrored queues.

In quorum queues, the leader and replication are consensus-driven, which means they agree on the state of the queue and its contents. While mirrored queues may confirm messages too early and lose data, quorum queues will only confirm when the majority of its nodes are available, which thereby avoids data loss. 

Declare a quorum queue using the following command:

rabbitmqadmin declare queue name=<name> durable=true arguments='{"x-queue-type": "quorum"}'

These queues must be durable and are instantiated by setting the x-queue-type argument to quorum. If the majority of nodes agree on the contents of a queue, the data is valid; otherwise, the system attempts to bring all replicas up to date.

Quorum queues have support for the handling of poison messages, which are messages that are never consumed completely or positively acknowledged.

The number of unsuccessful delivery attempts can be tracked and displayed in the x-delivery-count header. A poison message can be dead-lettered when it has been returned more times than configured.
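
For instance, a policy could cap redeliveries so that a message is dead-lettered after five failed delivery attempts. This is a sketch with an illustrative policy name:

$ sudo rabbitmqctl set_policy -p cc-prod-vhost poison-handling "taxi.d+" '{"delivery-limit":5}' --apply-to queues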

Lazy queues are another queue type worth exploring, so read on.

Lazy queues

Queues can become long for various reasons, including consumer maintenance or the arrival of large batches of messages. While RabbitMQ can support millions of messages, most experts recommend keeping queues as short as possible. Messages are stored in memory by default; RabbitMQ then flushes messages to disk (a page out) to free up RAM when the queue becomes too long for the underlying instance to handle. Storing messages in RAM enables faster delivery to consumers than storing them on disk.

The page out operation usually takes time and blocks the queue from processing messages while it runs, which deteriorates queue speed. For this reason, queues that contain a lot of messages can have a negative impact on the broker's performance. Additionally, rebuilding the index after a cluster restart and syncing messages between nodes both take considerable time.

Beginning with RabbitMQ 3.6, a queue mode called lazy was added to store messages to disk automatically and minimize RAM usage. Lazy queues can be enabled by setting the mode via the queue.declare arguments or by applying a policy to the matching queues.
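
A policy is often the most convenient route. As a sketch (the policy name and pattern are illustrative), the following would make every queue whose name starts with app-logs lazy:

$ sudo rabbitmqctl set_policy -p cc-prod-vhost lazy-logs "^app-logs" '{"queue-mode":"lazy"}' --apply-to queues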

Persistent messages can be written to the disk as they enter the broker and be kept in RAM at the same time.

Different queue types have been shown, and it's time to look into how CC should handle log aggregation from all clusters.

Using federated brokers and log aggregation

The way a cluster of two RabbitMQ brokers was created is similar to what is typically done when making a relational database highly available: the database remains a centralized resource offering high guarantees of availability. Still, RabbitMQ is not a one-trick rabbit when it comes to high availability.

To connect brokers into a larger messaging topology, the following two plugins are available:

  • Shovel: Connects queues and exchanges between different brokers
  • Federation: Forms cross-broker connections for queues to queues, or exchanges to exchanges

Both plugins ensure the reliable delivery of messages across brokers by routing them as instructed or offering a safe place for them to remain until they can be dealt with. Neither requires the brokers to be clustered, which simplifies setup and management. Moreover, both plugins work fine over WAN connections, which isn't the case in a clustering scenario.

In a federation, only the downstream (destination) node is configured manually; the upstream nodes require no configuration. With the shovel, on the other hand, each source node must be configured manually to send to a destination node, which itself doesn't require any configuration.

The CC team is requesting a good way to process logs, and they quickly realize that the federation plugin suits the process well.

Handling log processing 

CC's system keeps growing, and so do the teams of taxi drivers and developers. The team in charge of analytics has been looking for an elegant solution to aggregate logs from different applications in order to roll out new statistics, both for internal and end-user consumption. Fortunately, thanks to its high performance, RabbitMQ can be used for application log processing.

In this topology, all applications will write to a local RabbitMQ node, which will act as a store-and-forward broker, pushing all logs to a centralized RabbitMQ node as shown in the following diagram:

Fig 6.7: A topology that federates log messages to a central broker

If this central node is down, the log entries will remain locally accumulated until it comes back up. Messages flow through an exchange in one location (called the upstream) to be replicated to exchanges in other locations (the downstream), as seen in the following diagram:

Fig 6.8: Exchange federation message flow

Obviously, the assumption here is that the local RabbitMQ nodes are extremely stable. The experience with running RabbitMQ in the past few months will help with this approach. Moreover, logs are considered important but not critical data for CC, so a best-effort approach is acceptable. Knowing this, the team chooses to use the federation plugin, as it's the one that supports federation to queue connectivity (with the shovel plugin, messages would have to be accumulated in a local queue on each node).

Remember, the queues mirrored in the previous section were those matching the taxi.d+ pattern. The log queues discussed now are left out of the equation. That's how the CC team wants it, as they don't want to mirror such highly trafficked queues. What can be done so that CC enjoys similar guarantees for log aggregation? Enter the notion of messaging topologies.

More information on the shovel plugin can be found at http://www.rabbitmq.com/shovel.html.

The federation plugin needs to be installed on all RabbitMQ nodes that will engage in the topology by running the following commands on each node:

$ sudo rabbitmq-plugins enable rabbitmq_federation
Applying plugin configuration to rabbit@app-prod-1...

$ sudo rabbitmq-plugins enable rabbitmq_federation_management
Applying plugin configuration to rabbit@app-prod-1...

Moreover, unlike with clustering, each node needs to be manually set up with the desired users and virtual hosts, as discussed in Chapter 1, A Rabbit Springs to Life. Next, the app-logs exchange federation itself must be configured. This involves multiple steps (detailed shortly) that are all run on the central broker toward which all logs will converge: the downstream.
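
For instance, the virtual host and user that appear in the following examples could be created like this (a sketch; the password is a placeholder and the permissions are deliberately broad):

$ sudo rabbitmqctl add_vhost logs-prod
$ sudo rabbitmqctl add_user cc-prod <password>
$ sudo rabbitmqctl set_permissions -p logs-prod cc-prod ".*" ".*" ".*"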

First, the upstreams are configured, which are the RabbitMQ nodes that will send data to the central broker. Three upstreams are needed, since three servers will send logs: app-prod-1, app-prod-2, and app-prod-3. In the interest of brevity, only two are shown in the following examples.

An upstream can be added via rabbitmqctl:

# Adds a federation upstream named "app-prod-logs"
rabbitmqctl -p logs-prod set_parameter federation-upstream app-prod-logs '{"uri":"amqp://cc-prod:******@app-prod-1:5672/cc-prod-vhost"}'
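
The second upstream is added the same way, pointing at the next application server (assuming the same credentials and virtual host):

# Adds a federation upstream named "app-prod-2-logs"
rabbitmqctl -p logs-prod set_parameter federation-upstream app-prod-2-logs '{"uri":"amqp://cc-prod:******@app-prod-2:5672/cc-prod-vhost"}'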

Alternatively, the policy can be added via the management console:

Fig 6.9: Add a federation upstream named app-prod-logs to the downstream broker

Once an upstream has been specified on the downstream, a policy that controls the federation can be added to the downstream server as well. The federation policy is added just like any other policy (https://www.rabbitmq.com/parameters.html#policies) by using the terminal:

rabbitmqctl set_policy -p logs-prod log-exchange-federation "^app-logs*" '{"federation-upstream-set":"all"}' --apply-to exchanges

The policy can also be added through the management console:

Fig 6.10: Federation policy added to the downstream server

The CC team does this by applying a policy that matches the exchange names. The pattern argument is a regular expression used to match exchange (or queue) names. In CC's case, the federation policy is applied to all exchanges with names beginning with app-logs.

A policy can apply to an upstream set or to a single exchange or queue upstream. In this example, federation-upstream-set is applied to all upstreams.

If it is certain that there will never be more than one logical group of upstreams, the creation of an upstream set is skipped in favor of using the implicit set named all, which automatically contains all the upstreams in a virtual host.
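
Should an explicit set ever be needed, it could be declared as follows (a sketch; the set name is illustrative and the upstream names match the ones defined earlier):

rabbitmqctl -p logs-prod set_parameter federation-upstream-set app-logs-set '[{"upstream":"app-prod-logs"},{"upstream":"app-prod-2-logs"}]'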

In this case, it is also important to make sure that the user the federation plugin uses on the central broker to interact with the federated exchange is configured.

Browse to the Federation Upstreams tab in the Admin section of the management console, which will show that the upstream has been correctly configured, as shown in the following screenshot:

Fig 6.11: Upstream nodes are configured in a federation

Switching to Federation Status shows an empty screen, as the federation is inactive. Why is that, when the topology was just created? The reason is that no exchange or queue is actively engaged in the topology yet; because of its dynamic nature, the federation stays inactive until then. The next step is to create the app-logs exchange on both the upstream and downstream servers and bind it to queues. Returning to the Federation Status tab then shows that the federation is running links for the app-logs exchange from the two upstream nodes of the configured set. See the following screenshot:

Fig 6.12: Running upstream links for a federated exchange

It's possible to get the status of the federation from the command line by running sudo rabbitmqctl eval 'rabbit_federation_status:status().' on the downstream node.

The Connections and Channels tabs of the management console now show that the downstream node is connected to the upstream node over the AMQP protocol, as seen in the following screenshot:

Fig 6.13: Federation link in the Connections tab

Except for the setup of the topology itself, there's nothing magical about the federation. It's been built on top of AMQP, and thus benefits from the same advantages offered by the protocol. Hence, if the RabbitMQ instances are firewalled, no special port other than the one used by AMQP (5672 by default) needs to be opened.

Summary

The CC example has provided information on how to create a basic message queue architecture, add valuable features to meet user demand, and keep a system running flawlessly. This chapter covered how RabbitMQ delivers powerful features through clustering and federation and how these features increase the availability and overall resilience of the messaging infrastructure. Quorum, classic mirrored, and lazy queues were also explored.

Along the way, information and guidance on best practices for a reliable, resilient system were offered. The next chapter highlights these recommendations and provides key takeaways from CC's journey through RabbitMQ. It also explores monitoring of RabbitMQ.
