In this section, we'll cover the various knobs, settings, and API surface area that Pika exposes to you, the programmer. Pika is a Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library. Pika doesn't require threads, but it doesn't forbid them either. The same goes for greenlets, callbacks, continuations, and generators. Pika is available for download via PyPI and can be installed using easy_install or pip:
pip install pika
You can also use easy_install:
easy_install pika
There are two ways to set up a connection with Pika. One is to explicitly specify the options you want, and expect RabbitMQ to respect, as part of a ConnectionParameters object; the other is to specify a URL that spells out all of the parameters you'd like.
Specifying connection options through a single URL is more useful these days, as most PaaS platforms, such as Heroku and their add-on partners (such as a RabbitMQ add-on), expect you to do it this way. It promotes dynamic behavior in your application: you reconfigure it simply by setting an environment variable.
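To make the environment-variable approach concrete, here is a minimal sketch. The variable name RABBITMQ_URL and the local fallback URL are assumptions for illustration (each platform or add-on documents the name it actually sets); the resulting string is what you would hand to pika.URLParameters.

```python
import os

# Hypothetical variable name and local fallback; your platform or add-on
# documents the real variable it sets. "%2F" is the default vhost "/".
DEFAULT_AMQP_URL = "amqp://guest:guest@localhost:5672/%2F"


def amqp_url_from_env(var_name="RABBITMQ_URL", default=DEFAULT_AMQP_URL):
    """Return the broker URL configured in the environment, or a local default."""
    return os.environ.get(var_name, default)


# With Pika installed, the URL goes straight into URLParameters:
#   import pika
#   parameters = pika.URLParameters(amqp_url_from_env())
```

Because the URL is read at startup, moving the application to another broker only means changing the variable, not the code.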
Let's start by showing a few examples of the URL parameters configuration option:
amqps://www-data:rabbit_pwd@rabbit1/web_messages
This URL represents a simple authenticated connection to a host named rabbit1 and a virtual host called web_messages.
amqps://www-data:rabbit_pwd@rabbit1/web_messages?heartbeat_interval=30
This URL represents a simple authenticated connection to a host named rabbit1, a virtual host called web_messages, and an explicit heartbeat_interval of 30 seconds.
amqps://www-data:rabbit_pwd@rabbit1/web_messages?heartbeat_interval=30&ssl_options=%7B%27keyfile%27%3A+%27%2Fetc%2Fssl%2Fmykey.pem%27%2C+%27certfile%27%3A+%27%2Fetc%2Fssl%2Fmycert.pem%27%7D
This URL represents the same authenticated connection to rabbit1 and the web_messages virtual host, with a heartbeat_interval of 30 seconds, plus explicit SSL key and certificate files (/etc/ssl/mykey.pem and /etc/ssl/mycert.pem, URL-encoded inside ssl_options) for the secure connection setup.
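To see how these URLs break down, the following sketch takes one apart with Python's standard urllib.parse. It illustrates the URL's anatomy and is not Pika's own parser; the helper name describe_amqp_url is hypothetical, and decoding ssl_options with ast.literal_eval is our assumption, based on the value being a URL-encoded Python dict literal.

```python
import ast
from urllib.parse import parse_qs, unquote, urlsplit


def describe_amqp_url(url):
    """Split an AMQP URL into the pieces discussed above (a sketch, not Pika's parser)."""
    parts = urlsplit(url)
    # parse_qs decodes %XX escapes and '+' signs; keep the first value per key.
    query = {key: values[0] for key, values in parse_qs(parts.query).items()}
    if "ssl_options" in query:
        # Assumption: the value is a Python dict literal, as in the example URL.
        query["ssl_options"] = ast.literal_eval(query["ssl_options"])
    return {
        "scheme": parts.scheme,  # amqp, or amqps for TLS
        "username": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
        "host": parts.hostname,
        "port": parts.port,  # None when the URL relies on the default port
        "vhost": unquote(parts.path[1:]),  # strip the leading "/"
        "query": query,
    }


example = describe_amqp_url(
    "amqps://www-data:rabbit_pwd@rabbit1/web_messages"
    "?heartbeat_interval=30"
    "&ssl_options=%7B%27keyfile%27%3A+%27%2Fetc%2Fssl%2Fmykey.pem%27%2C"
    "+%27certfile%27%3A+%27%2Fetc%2Fssl%2Fmycert.pem%27%7D"
)
print(example["host"], example["vhost"], example["query"]["ssl_options"])
```

Note that query values come back as strings (heartbeat_interval is "30"); converting them to numbers is left to whatever consumes them.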
To use the URL parameters method, we simply hand the URL to Pika's URLParameters:
pika.URLParameters('amqps://www:pwd@rabbit1/web_messages')
Here are some other options you can set up at the query-param level:
backpressure_detection: This is disabled by default; pass a value of t to enable it. It specifies how to handle clients that publish too fast (previously, flow control in RabbitMQ).
channel_max: This is the maximum number of channels allowed for this connection.
connection_attempts: This is the number of connection attempts; the default is 1.
frame_max: This is the maximum frame size and is useful for performance tuning.
heartbeat_interval: This is the client/server heartbeat interval, in seconds. In the past, a small value of 5 seconds was considered good, but today a higher value of 30 is encouraged.
locale: This is the client locale and is useful if you use a different locale.
retry_delay: This is the number of seconds to wait between connection retries. It usually goes with connection_attempts.
socket_timeout: This is the socket timeout, in seconds; the default is 0.25.
ssl_options: This is a URL-encoded dict with the following keys: ca_certs, cert_reqs, certfile, keyfile, ssl_version.
The next way to connect to a RabbitMQ broker is via an explicit ConnectionParameters object, which is much like what we did so far in our scraper project.
In its simplest form, we initialize a ConnectionParameters object with just a host:
pika.ConnectionParameters(host='localhost')
To customize it, we can pass the same parameters that we just described for the URL parameters method as keyword arguments:
pika.ConnectionParameters(host='localhost', heartbeat_interval=30, retry_delay=2)
In either case, once you have a parameters object in hand, you pass it to the connection adapter you've chosen and get a connection back:
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
In the previous example, BlockingConnection represents a connection adapter. A connection adapter is the abstraction Pika uses to hide the actual strategy it uses for connections: whether it is a blocking connection, an asynchronous I/O event loop, or a plug-in for the framework you're using, such as Tornado or Twisted.
To cover most cases, you only need to focus on the standard BlockingConnection and SelectConnection adapters.
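Putting a parameters object and an adapter together, a connection is typically opened, used through a channel, and closed. The sketch below wraps that lifecycle in a context manager; open_channel is a hypothetical helper, and the adapter class (such as pika.BlockingConnection) is passed in as an argument so the helper itself doesn't import Pika and can be exercised with a test double.

```python
from contextlib import contextmanager


@contextmanager
def open_channel(parameters, connect):
    """Open a connection via `connect(parameters)`, yield one channel, always close.

    `connect` is the connection adapter class, for example pika.BlockingConnection.
    """
    connection = connect(parameters)
    try:
        yield connection.channel()
    finally:
        if connection.is_open:  # the broker may already have closed it
            connection.close()


# Typical use with Pika (needs a running broker, so it is not executed here):
#   import pika
#   params = pika.ConnectionParameters(host="localhost")
#   with open_channel(params, pika.BlockingConnection) as channel:
#       channel.queue_declare(queue="hello")
```

The try/finally guarantees the connection is closed even if the work inside the with block raises.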
Let's survey the API of BlockingConnection (some of these members may be directly relevant to other types of connections, such as SelectConnection, since both are connections). Following are the members of BlockingConnection:
add_backpressure_callback: This adds a callback to be called when the client experiences a backpressure event from the broker.
basic_nack: This tells you whether the broker supports basic.nack.
channel: This creates a new channel.
close: This disconnects; you can optionally pass a reply_code and a reply_text.
is_closed/is_closing/is_open: These tell you whether the connection is closed, closing, or open.
Let's survey the API of BlockingChannel. The channel is the concept that you'll work against most in your programming with RabbitMQ:
add_on_close_callback: This adds a callback to be called when the channel gets closed.
add_on_flow_callback: This adds a callback to be called when the client receives a flow control event.
add_on_return_callback: This adds a callback to be called when the broker returns a message the client published because it could not be routed (see mandatory below). This is a useful callback to set up: many developers don't handle publishing failures with RabbitMQ at all, which can leave a really hard-to-find bug lurking in production.
basic_ack / basic_nack: This one is important. When you use acknowledgements in your message processing semantics, you'll have to use these. Always specify delivery_tag, which you get in the consuming callback handler.
basic_consume: This is another big one. When developing consumers and workers, you'll most likely end up just using basic_consume. The API is tight enough to suffice in most use cases. With basic_consume, you specify the following:
consumer_callback: This is your handler function.
queue: This is your queue name.
no_ack: This tells the broker whether to skip acknowledgements (Boolean); when true, the broker considers a message acknowledged as soon as it delivers it.
exclusive: This means no other consumer is allowed on this queue (Boolean).
consumer_tag: This is your own consumer tag; you mostly won't need it.
arguments: These are custom arguments for the consume; you mostly won't need them.
basic_get: This is just like basic_consume, but it fetches a single message right then and there; it also takes the queue and no_ack parameters.
basic_publish: This is the main entry point for pushing messages onto the RabbitMQ broker. Here, we have several important parameters to describe:
exchange: This is the exchange to publish to.
routing_key: This is the routing key to publish with.
body: This is the message body.
mandatory: If this is false, the server silently drops a message that cannot be routed to a queue; otherwise, it returns the message to the client.
immediate: This is the consumer-side counterpart: if true, the server returns a message that cannot be delivered to a consumer immediately; if false, the server queues the message with no guarantee that it will ever be consumed. RabbitMQ 3.0 and later no longer support this flag.
basic_qos: Through this API, the client can control the flow of messages to tune the overall performance of message processing. It tells the broker how many messages (or bytes) to send ahead of acknowledgements, and whether to apply the setting to all channels:
prefetch_size: This is the window size in terms of message size (bytes). It is ignored when no_ack is set.
prefetch_count: This is the window size in terms of whole messages. It is ignored when no_ack is set.
all_channels: This applies the rules to all channels.
basic_recover: This asks the broker to redeliver all unacknowledged messages.
basic_reject: This rejects a message. You must supply a delivery_tag; you can also tell the broker to requeue the message with the requeue Boolean flag.
In the next few bits, we'll look at the Pika API designated for creating or declaring queues and exchanges. You'll usually make these calls in the prologue of your consumers or producers, and they will mostly feel like setup code.
Let's take a look at these now:
exchange_declare: This creates an exchange if it doesn't already exist. Note that if one exists and you declare it with different parameters, you'll get an error. Here are the important parameters that you can specify:
exchange: This is the exchange name.
exchange_type: This is the type to use (direct, fanout, topic, or headers); consult the more detailed RabbitMQ docs for the types and their semantics.
passive: This checks whether an exchange exists, without creating one.
durable: This makes the exchange survive a broker reboot (a persistent exchange).
auto_delete: This removes the exchange when it is no longer in use (no queues bound).
internal: This exchange can only be published into by other exchanges.
queue_declare: This creates a queue if it doesn't already exist, with the specified sharing, durability, and other properties, as follows:
queue: This is the queue name.
passive: This checks whether the queue exists, without creating it.
durable: This makes the queue survive reboots (persistent).
exclusive: This restricts the queue to the declaring connection; the queue is deleted when that connection closes.
auto_delete: This deletes the queue once its last consumer unsubscribes.
queue_bind: This binds a queue to a specified exchange. You should provide the following parameters:
queue: This is the queue name.
exchange: This is the exchange name.
routing_key: This is the routing key to bind with.
no_wait: This tells Pika not to wait for the broker's Bind-Ok reply.
This wraps up most of the API that you will use on a day-to-day basis. There's really not much to it, since Pika has been carefully crafted to simplify AMQP, as have similar libraries (or maybe Pika is the clone?), such as Ruby's immensely popular bunny library, which I'm deeply fond of.
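As a sketch of how the declaration calls and basic_publish fit together on the producer side, the code below declares a durable direct exchange and queue, binds them, and publishes with mandatory=True, recording anything the broker returns through add_on_return_callback. The exchange, queue, and routing-key names and all three helper functions are hypothetical; the channel is assumed to be a BlockingChannel obtained from connection.channel().

```python
# Hypothetical names for the scraper example; adjust to your own topology.
EXCHANGE = "scraper"
QUEUE = "scrape_jobs"
ROUTING_KEY = "scrape"


def declare_topology(channel):
    """Setup code: declarations are idempotent as long as the arguments match."""
    channel.exchange_declare(exchange=EXCHANGE, exchange_type="direct", durable=True)
    channel.queue_declare(queue=QUEUE, durable=True)
    channel.queue_bind(queue=QUEUE, exchange=EXCHANGE, routing_key=ROUTING_KEY)


def make_return_handler(returned):
    """Build an add_on_return_callback handler that records unroutable messages."""
    def on_return(channel, method, properties, body):
        # method is the Basic.Return frame (reply_code, reply_text, exchange, routing_key)
        returned.append((method.routing_key, body))
    return on_return


def publish_job(channel, body, routing_key=ROUTING_KEY):
    # mandatory=True asks the broker to hand back messages it cannot route
    channel.basic_publish(exchange=EXCHANGE, routing_key=routing_key,
                          body=body, mandatory=True)


# Typical use on a live channel (not executed here):
#   returned = []
#   channel.add_on_return_callback(make_return_handler(returned))
#   declare_topology(channel)
#   publish_job(channel, b"http://example.com")
```

Registering the return handler once per channel, before publishing, is what turns a silently dropped message into something your code can see and act on.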
There are other, edge-case API endpoints on Pika's Channel and Connection abstractions, such as deleting queues and exchanges, unbinding, cleanup, and more. Should you wish to explore further, refer to the official Pika API documentation.
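On the consumer side, a worker usually combines basic_qos, basic_consume, and explicit acknowledgements from the channel API above. In this sketch, process stands in for your own hypothetical job logic, handle_message acks successful messages and rejects bad ones without requeueing them, and start_worker uses the Pika 0.x basic_consume signature described above (callback first, no_ack); in Pika 1.x the call became basic_consume(queue, on_message_callback, auto_ack=...), as the code comment notes. The queue name and prefetch value are assumptions.

```python
def process(body):
    """Hypothetical job handler: decode the payload and refuse empty jobs."""
    text = body.decode("utf-8")  # UnicodeDecodeError is a ValueError subclass
    if not text.strip():
        raise ValueError("empty job")
    return text


def handle_message(channel, method, properties, body):
    """Consumer callback: ack on success, reject (without requeue) on bad input."""
    try:
        process(body)
    except ValueError:
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
    else:
        channel.basic_ack(delivery_tag=method.delivery_tag)


def start_worker(channel, queue="scrape_jobs", prefetch=10):
    # Ask the broker to keep at most `prefetch` unacknowledged messages outstanding.
    channel.basic_qos(prefetch_count=prefetch)
    # Pika 0.x signature, as listed above (callback first, no_ack).
    # Pika 1.x: channel.basic_consume(queue=queue, on_message_callback=handle_message, auto_ack=False)
    channel.basic_consume(handle_message, queue=queue, no_ack=False)
    channel.start_consuming()  # blocks and dispatches deliveries to handle_message
```

Rejecting with requeue=False keeps a malformed message from bouncing back onto the queue forever; if the queue has a dead-letter exchange configured, the message ends up there for later inspection.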
When adopting RabbitMQ across your architecture, especially in an enterprise environment, security and authentication come up immediately.
Some organizations would never even adopt a technology unless it has widely recognized tiers of security built in. This is why RabbitMQ grew to support connection authentication and SSL across the board.
As we saw with the URL parameters earlier, we can put the user name and password right in the URL that we pass to Pika, much as HTTP Basic authentication does. However, we can also take a more programmatic approach using the PlainCredentials object:
import pika
credentials = pika.PlainCredentials('www', 'pwd')
parameters = pika.ConnectionParameters('rabbit1', 5672, '/', credentials)  # host, port, virtual host, credentials
From here on, we can pass parameters to our connection.
For those requiring a more secure authentication model, RabbitMQ and Pika allow an SSL-based connection.
Let's look more closely at how we get a secure connection through the URL parameters method. To do this, we use the special amqps scheme (the part of the URL that we usually think of as the protocol):
amqps_URI = "amqps://" amqp_authority [ "/" vhost ]
This means we can draw up URLs like this:
amqps://user:pass@host:10000/vhost
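Going the other way, here is a minimal sketch that assembles such a URL from its parts, including an optional port and query parameters. build_amqp_url is a hypothetical helper; it percent-encodes the credentials and the vhost, which matters for the default vhost "/" (it has to appear in the URL as %2F).

```python
from urllib.parse import quote, urlencode


def build_amqp_url(host, user, password, vhost, port=None, scheme="amqps", query=None):
    """Assemble scheme://user:password@host[:port]/vhost[?query] (a sketch)."""
    # Percent-encode credentials and vhost so characters like "/" or "@" survive;
    # the default vhost "/" becomes "%2F".
    authority = f"{quote(user, safe='')}:{quote(password, safe='')}@{host}"
    if port is not None:
        authority += f":{port}"
    url = f"{scheme}://{authority}/{quote(vhost, safe='')}"
    if query:
        # urlencode applies str() to non-string values (numbers, dicts) and quotes them.
        url += "?" + urlencode(query)
    return url


print(build_amqp_url("host", "user", "pass", "vhost", port=10000))
# → amqps://user:pass@host:10000/vhost
```

Passing query={"ssl_options": {...}} produces the same URL-encoded dict literal used in the ssl_options example URL earlier in this section.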
To get certificate-based authentication programmatically, we use the ExternalCredentials object. We'll also take advantage of this example to show how to specify custom SSL certificates along the way:
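import pika
from pika.credentials import ExternalCredentials
host = 'rabbit1'  # example broker host, as in the earlier URLs
ssl_options = {"ca_certs": "caroot.pem", "certfile": "client.pem", "keyfile": "key.pem"}
# Pika 0.x-style arguments: ssl=True plus a plain dict (Pika 1.x replaced these with pika.SSLOptions)
parameters = pika.ConnectionParameters(host, 5671, credentials=ExternalCredentials(), ssl=True, ssl_options=ssl_options)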
Here, we specify ssl_options with the CA root certificate (caroot.pem), a client certificate (client.pem), and our private key (key.pem). We switch credentials to EXTERNAL with ExternalCredentials, since we have the help of PKI (private keys and certificates), which is arguably much stronger than a simple user/password combination.
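The ssl=True plus dict style shown above belongs to older Pika releases; Pika 1.x instead takes a standard ssl.SSLContext wrapped in pika.SSLOptions. Here is a sketch that builds such a context from the same three PEM files with Python's ssl module. make_client_tls_context is a hypothetical helper, the file names come from the example above, and EXTERNAL authentication additionally assumes the broker has the rabbitmq_auth_mechanism_ssl plugin enabled.

```python
import ssl


def make_client_tls_context(ca_certs=None, certfile=None, keyfile=None):
    """Build a client-side TLS context from the same PEM files as ssl_options above.

    ca_certs verifies the broker; certfile/keyfile present our client certificate,
    which is what EXTERNAL (certificate-based) authentication relies on.
    """
    # Verifies the server certificate and hostname by default.
    context = ssl.create_default_context(cafile=ca_certs)
    if certfile:
        context.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return context


# With Pika 1.x (needs the PEM files and a TLS-enabled broker, so not executed here):
#   import pika
#   from pika.credentials import ExternalCredentials
#   context = make_client_tls_context("caroot.pem", "client.pem", "key.pem")
#   parameters = pika.ConnectionParameters(
#       "rabbit1", 5671,
#       credentials=ExternalCredentials(),
#       ssl_options=pika.SSLOptions(context, "rabbit1"))
```

Starting from create_default_context keeps certificate and hostname verification switched on, which is easy to lose when building TLS settings by hand.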
Note that if we go back to the URL method, we can use the same PEM keys and certificates to specify our SSL options. To do this, we need to URL-encode our ssl_options dict right into the URL, using urllib:
from urllib.parse import urlencode  # Python 2: urllib.urlencode
ssl_query = urlencode({'ssl_options': {'ca_certs': 'caroot.pem', 'certfile': 'client.pem', 'keyfile': 'key.pem'}})  # append after '?' in the URL
This concludes our exploration of the Pika API. There are still other gems to be found within it, but rest assured, not many. You're free to explore the nitty-gritty details in the official API documentation on Read the Docs, available at https://pika.readthedocs.org.
We'll continue by jumping much higher in the abstraction model, up to the big-shot tools that let us do background or queue-based processing without getting our hands dirty with the AMQP protocol, or anything close to it.
Having just explored the Pika API gives you an immense advantage over others in this space, since you know how things work; and sometimes, you don't need a very big hammer for a very small nail.
And now, let's explore big hammers.