The Pika API

In this section, we'll cover the various knobs, settings, and API surface area that Pika exposes to you. Pika is a Python implementation of the AMQP 0-9-1 protocol that tries to stay fairly independent of the underlying network support library. Pika doesn't require threads, but it takes care not to forbid them either. The same goes for greenlets, callbacks, continuations, and generators. Pika is available for download via PyPI and can be installed using easy_install or pip:

pip install pika

You can also use this:

easy_install pika

Connecting

There are two ways to set up a connection with Pika. One is to explicitly specify the options you want RabbitMQ to respect as part of a ConnectionParameters object, and the other is to specify a URL that lays out all of the parameters that you'd like.

Specifying connection options through a single URL is a lot more useful these days, as most PaaS platforms, such as Heroku and their add-on partners (such as a RabbitMQ add-on), expect you to do it this way. It promotes dynamic behavior in your application, because the connection can be changed by setting a simple environment variable.
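As a minimal sketch of that environment-variable pattern, the helper below reads the connection URL from the environment and falls back to a local default. The variable name AMQP_URL, the helper name, and the fallback URL are assumptions made for illustration; your platform or add-on documents the name it actually sets.

```python
import os

# Hypothetical variable name; each PaaS add-on defines its own.
DEFAULT_ENV_VAR = "AMQP_URL"
# Assumed fallback for local development; %2F is the URL-encoded vhost "/".
LOCAL_FALLBACK = "amqp://guest:guest@localhost:5672/%2F"


def amqp_url_from_env(environ=None, var_name=DEFAULT_ENV_VAR):
    """Return the broker URL from the environment, or a local fallback."""
    if environ is None:
        environ = os.environ
    # Treat a missing or blank variable the same way.
    url = environ.get(var_name, "").strip()
    return url or LOCAL_FALLBACK


print(amqp_url_from_env({"AMQP_URL": "amqps://www:pwd@rabbit1/web_messages"}))
```

The returned string is what you would hand to pika.URLParameters, so switching brokers between environments needs no code change.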

Let's start by showing a few examples of the URL parameters configuration option:

amqps://www-data:rabbit_pwd@rabbit1/web_messages

This URL represents a simple authenticated connection, a host named rabbit1 and a virtual host called web_messages:

amqps://www-data:rabbit_pwd@rabbit1/web_messages?heartbeat_interval=30

This URL represents a simple authenticated connection, a host named rabbit1, a virtual host called web_messages, and an explicit heartbeat_interval of 30 seconds:

amqp://www-data:rabbit_pwd@rabbit1/web_messages?heartbeat_interval=30&ssl_options=%7B%27keyfile%27%3A+%27%2Fetc%2Fssl%2Fmykey.pem%27%2C+%27certfile%27%3A+%27%2Fetc%2Fssl%2Fmycert.pem%27%7D

This URL represents a simple authenticated connection, a host named rabbit1, a virtual host called web_messages, an explicit heartbeat_interval of 30 seconds, and explicit SSL certificates (passed as URL-encoded ssl_options) for the secure connection setup.
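To make the anatomy of these URLs concrete, here is a small sketch that splits one of them into the parts described above using only the Python standard library. The function name describe_amqp_url is hypothetical, and this is not how Pika itself parses the URL; it only shows which piece of the string carries which setting.

```python
from urllib.parse import parse_qs, unquote, urlsplit


def describe_amqp_url(url):
    """Split an AMQP URL into the parts discussed above."""
    parts = urlsplit(url)
    # parse_qs returns a list per key; keep the last value given for each.
    options = {key: values[-1] for key, values in parse_qs(parts.query).items()}
    return {
        "scheme": parts.scheme,
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        # The path carries the virtual host, minus its leading slash.
        "virtual_host": unquote(parts.path[1:]),
        "options": options,
    }


info = describe_amqp_url(
    "amqps://www-data:rabbit_pwd@rabbit1/web_messages?heartbeat_interval=30"
)
print(info["host"], info["virtual_host"], info["options"])
```

Note that query values come back as strings, so a client has to convert options such as heartbeat_interval to numbers itself.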

To use the URL parameters method, we simply use Pika:

pika.URLParameters('amqps://www:pwd@rabbit1/web_messages')

Here are some other options you can set up at the query-param level:

  • backpressure_detection: This is disabled by default. Pass a value that enables detection of clients that publish too fast (previously, flow control in RabbitMQ).
  • channel_max: This is the maximum number of channels allowed for this connection.
  • connection_attempts: This is the number of connection attempts to make. The default is 1.
  • frame_max: This is the maximum frame size and is useful for performance tuning.
  • heartbeat_interval: This is the client/server heartbeat interval. In the past, a small value of 5 seconds used to be good, but today, it is encouraged to go with a higher value of 30.
  • locale: This is the client locale and is useful if you use a different locale.
  • retry_delay: This is the time in seconds to wait between connection retries. It usually goes with connection_attempts.
  • socket_timeout: This is the socket timeout in seconds. The default is 0.25.
  • ssl_options: This is a URL-encoded dict with the following keys: ca_certs, cert_reqs, certfile, keyfile, ssl_version.
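The options above are ordinary query parameters, so they can be appended to a base URL programmatically. The following is a sketch under that assumption; the helper name with_options is made up for this example, and the option names are the ones listed above.

```python
from urllib.parse import urlencode


def with_options(base_url, **options):
    """Append connection options to a base AMQP URL as query parameters."""
    if not options:
        return base_url
    # sorted() keeps the output deterministic regardless of keyword order.
    query = urlencode(sorted(options.items()))
    separator = "&" if "?" in base_url else "?"
    return base_url + separator + query


url = with_options(
    "amqp://www-data:rabbit_pwd@rabbit1/web_messages",
    connection_attempts=3,
    retry_delay=2,
    heartbeat_interval=30,
)
print(url)
```

Pairing connection_attempts with retry_delay, as here, gives a client a few spaced-out retries before it gives up on the broker.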

The next way to connect to a RabbitMQ broker is via an explicit ConnectionParameters object, which is much like what we did so far in our scraper project.

We initialize a ConnectionParameters object simply with Pika:

pika.ConnectionParameters(host='localhost')

However, when we customize it, we can use the same parameters that we just described for the URL parameters method:

pika.ConnectionParameters(host='localhost', heartbeat_interval=30, retry_delay=2)

In either case, once you have a parameters object in your hands, you can pass it down to the connection strategy that you've chosen, and then grab a connection:

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

Using connection adapters

In the previous examples, BlockingConnection represents a connection adapter. A connection adapter is the abstraction that Pika uses to hide away the actual strategy it uses for connections, whether that is a blocking connection, an async I/O event loop, or a plug-in to the backend that you're using, such as Tornado or Twisted.

To cover most cases, you should only focus on the standard BlockingConnection and SelectConnection adapters.

BlockingConnection

Let's survey the API of BlockingConnection (some of these may be directly relevant to other types of connections, such as SelectConnection, since they are both connections). The following are the methods and properties of BlockingConnection:

  • add_backpressure_callback: This adds a callback to be called when the client experiences a backpressure event from the broker
  • basic_nack: Use this to find out whether the broker supports nack
  • channel: This creates a new channel
  • close: This disconnects from the broker; it accepts reply_code and reply_text (both are optional)
  • is_closed/is_closing/is_open: Use these to find out whether the connection is closed, closing, or open
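A common way to combine channel, is_open, and close is to wrap them in a context manager so that the connection is always released. The sketch below assumes only those three members from the list above; the helper name open_channel is hypothetical, and it takes an already-created connection so that it does not depend on any particular adapter.

```python
from contextlib import contextmanager


@contextmanager
def open_channel(connection):
    """Yield a new channel and always close the connection afterwards."""
    channel = connection.channel()
    try:
        yield channel
    finally:
        # Only close a connection that is still open, so cleanup does not
        # fail on a connection the broker has already dropped.
        if connection.is_open:
            connection.close()


# Usage, assuming `parameters` was built as shown earlier:
#     with open_channel(pika.BlockingConnection(parameters)) as channel:
#         channel.basic_publish(exchange='', routing_key='jobs', body='hi')
```

Keeping the cleanup in one place means a worker that raises midway still disconnects cleanly instead of leaving a half-open connection on the broker.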

BlockingChannel

Let's survey the API of BlockingChannel. This channel is the concept that you'll work against mostly in your programming with RabbitMQ:

  • add_on_close_callback: This adds a callback to be called when the channel gets closed.
  • add_on_flow_callback: This adds a callback to be called when the client receives a flow control event.
  • add_on_return_callback: This adds a callback to be called when the publishing client gets a rejected publish from the server. This is a useful callback to set up, since many applications never handle publishing failures with RabbitMQ, which can leave a silent, really hard-to-find bug in production.
  • basic_ack / basic_nack: This one is important. When your message processing uses acknowledgements, you'll have to call these. When calling them, you should always specify delivery_tag, which you'll get in the consuming callback handler.
  • basic_consume: This is another big one. When developing consumers and workers, it is most likely that you'll end up just using basic_consume. The API is tight enough to suffice in most use cases. With basic_consume, you can specify the following parameters:
    • consumer_callback: This is your handler function
    • queue: This is your queue name
    • no_ack: This tells the broker not to expect acknowledgements (Boolean)
    • exclusive: This means not allowing any other consumer on this queue (Boolean)
    • consumer_tag: This is your own consumer tag; you mostly won't use this
    • arguments: These are custom arguments for consume; you mostly won't use this
  • basic_get: This is just like basic_consume, but it gets a single message right then and there. It takes the queue and no_ack parameters.
  • basic_publish: This is the main entry point to push messages onto the RabbitMQ broker. Here, we have several important parameters to describe; they're as follows:
    • exchange: This is the exchange to publish to.
    • routing_key: This is the routing key that is matched against queue bindings.
    • body: This is the message body.
    • mandatory: If this is false, the server silently drops a message that cannot be routed to a queue; otherwise, it will signal the client.
    • immediate: This is the same as the preceding parameter, but it concerns delivery to a consumer: if this is false, the server will queue the message with no guarantee that it is ever consumed.
  • basic_qos: Through this API, the client can control the flow of messages in order to tune the overall performance of message processing. It can tell the broker to send fewer or more messages per batch, cap the batch by size, and say whether the settings apply to all channels:
    • prefetch_size: This is the window size in terms of message size. It is ignored when no_ack is specified.
    • prefetch_count: This is the window size in terms of whole messages. It is ignored when no_ack is specified.
    • all_channels: This applies the rules to all channels.
  • basic_recover: This asks the broker to redeliver all unacknowledged messages.
  • basic_reject: This is used to reject a message against the broker. You must supply a delivery_tag, and you can also tell the broker to requeue the message with the requeue Boolean flag.
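The calls above usually appear together in a worker. The sketch below shows one plausible arrangement: a callback that acknowledges or rejects, a setup function that applies basic_qos before consuming, and a publisher that sets mandatory. The function names and the JSON message format are assumptions for this example, and the basic_consume keywords follow the parameter names listed above; other Pika releases may name or order them differently, so check the version you have installed.

```python
import json


def handle_delivery(channel, method, properties, body):
    """Consumer callback: acknowledge good messages, reject malformed ones."""
    try:
        payload = json.loads(body)
    except ValueError:
        # Malformed JSON will never parse, so reject without requeueing.
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return None
    print("processing", payload)
    # The delivery tag identifies which message is being acknowledged.
    channel.basic_ack(delivery_tag=method.delivery_tag)
    return payload


def start_worker(channel, queue_name):
    """Limit unacknowledged deliveries to one, then register the callback."""
    channel.basic_qos(prefetch_count=1)
    # Keyword names as described in the list above (an assumption about
    # the installed Pika version).
    channel.basic_consume(handle_delivery, queue=queue_name, no_ack=False)


def publish_json(channel, exchange, routing_key, payload):
    """Publish a JSON message and ask the broker to signal unroutable ones."""
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        body=json.dumps(payload),
        mandatory=True,
    )
```

Setting prefetch_count to 1 keeps a slow worker from hoarding messages, and mandatory only pays off if you also register a return callback to receive the broker's signal.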

Declaring queues and exchanges

In the next few bits, we'll look at the Pika API designated for creating or declaring queues and exchanges. You'll usually make these kinds of calls at the start of your consumers or producers, and they will mostly feel like setup code.

Let's take a look at these now:

  • exchange_declare: This creates an exchange if it doesn't already exist. Note that if one exists and you declare it again with different parameters, an error is raised. Here are the important parameters that you can specify:
    • exchange: This is the exchange name
    • exchange_type: This is the type to use (direct, and more); consult the more-detailed RabbitMQ docs for the types and their semantics
    • passive: This is used to see if an exchange exists, but it doesn't create one
    • durable: This makes the exchange persistent, so it survives a broker reboot
    • auto_delete: This removes the exchange when you're done using it (no queues bound)
    • internal: This can only be published into by other exchanges
  • queue_declare: This creates a queue if it doesn't already exist, with the specified sharing, durability, and other properties, shown as follows:
    • queue: This is the queue name
    • passive: This only checks whether the queue exists, without creating it
    • durable: This makes the queue persistent, so it survives reboots
    • exclusive: This restricts the queue to the connection that declared it, rather than sharing it between consumers
    • auto_delete: This deletes the queue after its consumers disconnect
  • queue_bind: This binds a queue to a specified exchange. You should provide the following parameters:
    • queue: This is the queue name
    • exchange: This is the exchange name
    • routing_key: This is the routing key
    • no_wait: Do not wait for a bind ok
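Put together, the three declarations typically run once at startup. Here is a minimal sketch of such setup code, using only the parameters listed above; the function name declare_topology and the choice of a durable direct exchange are assumptions for illustration.

```python
def declare_topology(channel, exchange, queue, routing_key):
    """Declare a durable direct exchange and queue, then bind them."""
    # Declaring is idempotent as long as the parameters match what exists.
    channel.exchange_declare(
        exchange=exchange, exchange_type="direct", durable=True
    )
    channel.queue_declare(queue=queue, durable=True)
    # Messages published to `exchange` with `routing_key` land in `queue`.
    channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)


# Usage, assuming `channel` came from an open connection:
#     declare_topology(channel, "web", "scrape_jobs", "scrape")
```

Running the same function in both producers and consumers is a simple way to make sure neither side depends on the other having started first.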

This covers most of the API that you will use on a day-to-day basis. There's really not much to it, since Pika has been carefully crafted to simplify AMQP, as have its counterparts in other languages (or maybe Pika is the clone?), such as Ruby's immensely popular bunny library, which I'm deeply fond of.

There are other edge-case API endpoints for the Pika Channel and Connection abstractions, such as deletion of queues, exchanges, unbinding, and cleanups, and more. Should you wish to explore further, refer to the official Pika API documentation.

Authentication

When adopting RabbitMQ universally over your architecture, and especially in an enterprise environment, security and authentication come up immediately.

A few organizations would never even adopt a technology unless it has some widely recognized tiers of security built into it. This is why RabbitMQ grew to support connection authentication and SSL across the board.

Plain credentials

As we've seen with the URL parameters before, we can put a username and password right there in the URL that we pass to Pika, in the style of HTTP Basic authentication. However, we can also use a more programmatic approach using the PlainCredentials object:

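import pika

# Username and password for the broker account
credentials = pika.PlainCredentials('www', 'pwd')
# Positional arguments: host, port, virtual host, credentials
parameters = pika.ConnectionParameters('rabbit1',
                                       5672,
                                       '/',
                                       credentials)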

From here, we can pass parameters to our connection.
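One practical difference between the two approaches is worth a sketch: inside a URL, characters such as @ and / in a username, password, or virtual host must be percent-encoded, whereas a credentials object takes them verbatim. The helper name credentials_url and the sample password below are made up for illustration.

```python
from urllib.parse import quote, unquote, urlsplit


def credentials_url(user, password, host, vhost):
    """Build an AMQP URL, percent-encoding every user-supplied part."""
    return "amqp://{}:{}@{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        quote(vhost, safe=""),
    )


# A password containing URL-reserved characters, and the default vhost "/".
url = credentials_url("www", "p@ss/word", "rabbit1", "/")
print(url)
# Decoding the password part recovers the original string.
print(unquote(urlsplit(url).password))
```

If your passwords are generated and may contain arbitrary characters, the programmatic approach sidesteps this encoding step entirely.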

SSL and external credentials

For those requiring a more secure authentication model, RabbitMQ and Pika allow an SSL-based connection.

Since we haven't seen it before, let's describe how we can get a secure connection through the URL Parameters method. To do this, we need to use a special amqps scheme (the part that specifies what we know as the protocol in the URL):

amqps_URI      = "amqps://" amqp_authority [ "/" vhost ]

This means we can draw up URLs like this:

amqps://user:pass@host:10000/vhost
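To illustrate what the scheme changes, the sketch below derives the endpoint a client would connect to from either kind of URL. It assumes the conventional ports used elsewhere in this section (5672 for amqp, 5671 for amqps) when the URL does not name one; the function name endpoint is hypothetical, and Pika performs its own parsing internally.

```python
from urllib.parse import urlsplit

# Conventional AMQP ports: 5672 for plain connections, 5671 for TLS.
DEFAULT_PORTS = {"amqp": 5672, "amqps": 5671}


def endpoint(url):
    """Return (host, port, uses_tls) for an amqp:// or amqps:// URL."""
    parts = urlsplit(url)
    if parts.scheme not in DEFAULT_PORTS:
        raise ValueError("expected an amqp:// or amqps:// URL, got %r" % url)
    # An explicit port in the URL wins over the scheme's default.
    port = parts.port or DEFAULT_PORTS[parts.scheme]
    return parts.hostname, port, parts.scheme == "amqps"


print(endpoint("amqps://user:pass@host:10000/vhost"))
print(endpoint("amqps://user:pass@host/vhost"))
```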

Certificate authentication

To get at this kind of functionality programmatically, we need to use the ExternalCredentials object. We'll also take advantage of this example to show how to specify custom SSL certificates along the way:

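import pika
from pika.credentials import ExternalCredentials

host = 'rabbit1'  # broker hostname (example value)

ssl_options = {"ca_certs": "caroot.pem",
               "certfile": "client.pem",
               "keyfile": "key.pem"}

parameters = pika.ConnectionParameters(
    host,
    5671,
    credentials=ExternalCredentials(),
    ssl=True,
    ssl_options=ssl_options)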

Here, we are specifying ssl_options including CA-Root, a Client pem, and our actual key. We are switching credentials to External with ExternalCredentials since we have the help of PKI (private keys and certificates), which is arguably much stronger than a simple user/password combination.

Note that if we double back to the URL method, we can again opt to use the pem keys and certificates to specify our SSL options in the same way. To do this, we'll need to encode our ssl_options dict as a query string using urllib, and append it to our URL after a ? character:

import urllib.parse  # on Python 2, use urllib.urlencode instead
query = urllib.parse.urlencode({'ssl_options': {'ca_certs': 'caroot.pem', 'certfile': 'client.pem', 'keyfile': 'key.pem'}})
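For a fuller picture, the following Python 3 sketch encodes the dict, attaches it to a complete amqps URL, and then decodes it again to confirm that nothing was lost. The host, credentials, and file names are the placeholder values used in this section; the decoding half only demonstrates that the encoding is reversible and is not a description of Pika's internals.

```python
import ast
from urllib.parse import parse_qs, urlencode, urlsplit

ssl_options = {
    "ca_certs": "caroot.pem",
    "certfile": "client.pem",
    "keyfile": "key.pem",
}

# urlencode() calls str() on the dict, then percent-encodes the result.
query = urlencode({"ssl_options": ssl_options})
url = "amqps://user:pass@host:10000/vhost?" + query
print(url)

# Reverse the process: pull the option out of the query string and turn
# the dict literal back into a dict without evaluating arbitrary code.
encoded = parse_qs(urlsplit(url).query)["ssl_options"][0]
decoded = ast.literal_eval(encoded)
print(decoded == ssl_options)
```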

Background processing

This concludes our exploration of the Pika API. Still, there are other Pika gems to be found within its API, but rest assured, not many. You're free to explore the nitty-gritty details at the official ReadTheDocs API, available at https://pika.readthedocs.org.

We'll continue by jumping a lot higher in the abstraction model and move up to the big shot tools that'll allow us to do background or queue-based processing without really getting our hands dirty with the AMQP protocol or even close.

Having just explored the Pika API gives you an immense advantage over anyone else in this space, since you know how things work, and well, sometimes, you don't need a very big hammer for a very small nail.

And now, let's explore big hammers.
