Distributing Python with Celery and Redis

The following is a simple application that uses Redis as a broker: consumer.py sends messages that are executed as tasks by producer.py. Both the consumer and the producer require this configuration, which uses database number 0 of your local Redis installation:

app = Celery('celery_tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

To establish a connection with Celery, we need to set the broker and backend parameters. The broker parameter specifies the address of the server where tasks are stored, and the backend parameter is the address where Celery puts the results so that we can use them in our application. In this case, both addresses are the same, pointing at localhost on the same port.
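To see what each piece of those URLs means, the following sketch breaks a Redis connection URL into its components. The parse_redis_url helper is hypothetical, written only for illustration; it is not part of Celery:

```python
from urllib.parse import urlparse

# Hypothetical helper, not part of Celery: split a Redis URL
# into the pieces the broker/backend settings describe.
def parse_redis_url(url):
    parts = urlparse(url)
    return {
        'scheme': parts.scheme,             # transport: 'redis'
        'host': parts.hostname,             # server address
        'port': parts.port,                 # Redis listens on 6379 by default
        'db': int(parts.path.lstrip('/')),  # Redis database number
    }

print(parse_redis_url('redis://localhost:6379/0'))
```

Changing the final path segment (for example, /1 instead of /0) would point Celery at a different Redis database on the same server.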

To start building things with Celery, we first need to start a worker for our Celery application with the following command:

$ celery -A producer worker --loglevel=debug --concurrency=4 --pool=solo

The different options that can be used when starting a worker are detailed in the Celery documentation, which can be found at http://docs.celeryproject.org/en/latest/reference/celery.bin.worker.html#module-celery.bin.worker.

In the following screenshot, we can see the execution of the previous command:

After that, Celery needs to know what kind of tasks it can execute. For this, we have to register the tasks with the Celery application, which we do with the @app.task decorator. This is the content of producer.py, which exposes a task called task_execution that sleeps for five seconds before printing the message it received.

You can find the following code in the producer.py file:

#!/usr/bin/python3

from celery import Celery
from time import sleep

app = Celery('celery_tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def task_execution(message):
    sleep(5)
    print('Message received: %s' % message)

The following is the consumer.py code. All it does is receive a message from the console and send it to the producer:

#!/usr/bin/python3
from producer import task_execution

while True:
    message = input('Enter Message> ')
    task_execution.delay(message)

The consumer runs with the following command:

$ python consumer.py
Enter Message> This is my message

When writing a message to the consumer, you can see that the producer receives it and, after 5 seconds, prints it. The interesting thing is that the consumer does not need to wait 5 seconds—it is instantly available to process another message. If the producer receives many messages, then they are added to the message queue.
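This non-blocking behavior can be sketched without a broker at all. The following pure-Python illustration stands in queue.Queue and a thread for Redis and the Celery worker, purely as an analogy, to show why enqueuing never waits for the slow handler:

```python
import queue
import threading
import time

task_queue = queue.Queue()  # stands in for the Redis broker

def worker():
    # Stands in for the Celery worker: pull tasks and run them.
    while True:
        message = task_queue.get()
        time.sleep(0.1)  # the slow part, like sleep(5) in task_execution
        print('Message received: %s' % message)
        task_queue.task_done()

threading.Thread(target=worker, daemon=True).start()

start = time.monotonic()
for i in range(5):
    task_queue.put('message %d' % i)  # like task_execution.delay(...)
elapsed = time.monotonic() - start

# Enqueuing is effectively instant; the messages are processed later.
print('enqueued 5 messages in %.3f seconds' % elapsed)
task_queue.join()  # wait until the worker has drained the queue
```

As with Celery, the sender returns immediately after each put(), while the pending messages sit in the queue until the worker gets to them.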

Also, keep in mind that the log records now go to the standard output of the Celery worker process, so check them in the appropriate Terminal.

In the following screenshot, you can see the output when you send a message from the consumer Terminal in debug mode:

In the following screenshot, you can see the output when you send a message from the consumer Terminal with info mode, --loglevel=info:

There is also the option to put the consumer and the producer in the same script.

You can find the following code in the demo_celery.py file:

#!/usr/bin/python3

# Celery full example: publisher/subscriber
from celery import Celery

# Redis
app = Celery('demo_celery', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def task_execution(message, count):
    array = []
    print('Message received: %s' % message)
    for index in range(0, int(count)):
        array.append(message)
    return array

In the previous code block, we defined our Celery application using Redis as the message broker. The task_execution() method is annotated with @app.task. It appends the message to an array count times and returns that array.

In the next code block, we define an infinite loop that requests a message from the user. For each message, it generates a task by calling the task_execution() method:

def main():
    while True:
        message = input('Enter Message> ')
        count = input('Enter times appears the message> ')
        promise = task_execution.delay(message, count)

if __name__ == '__main__':
    main()
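A Celery task called without .delay() behaves like the plain Python function it wraps, which is handy for checking the logic without a broker. The following broker-free stand-in (a hypothetical helper named repeat_message, not part of demo_celery.py) reproduces the same repetition logic:

```python
# Broker-free stand-in for task_execution(): the same repetition
# logic as in demo_celery.py, without Celery or Redis.
def repeat_message(message, count):
    array = []
    # count arrives as a string from input(), so convert it first
    for index in range(0, int(count)):
        array.append(message)
    return array

print(repeat_message('hello', '3'))  # ['hello', 'hello', 'hello']
```

This mirrors what the worker returns to the result backend: an array containing the message repeated count times.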

In this example, we are using the gevent event manager. You can install it with the pip install gevent command. With the -P gevent command-line parameter, we can execute Celery with this event manager:

$ celery -A demo_celery worker --loglevel=debug --concurrency=4 -P gevent

This is the output when you enter the message and the number of times you want it to appear.

In the following screenshot, we can see how its execution returns an array with the message repeated as many times as you have entered:

In this section, you have learned about the Celery and Redis projects for building applications. They allow you to send messages between a consumer and a producer with the help of a broker, which stores pending tasks until a worker can process them.
