The custom ZeroMQ cluster

In this section, we will create a task pipeline, which is a messaging pattern that we mentioned briefly when discussing the Disco project's answer to MapReduce. Task pipelines can be viewed as a generalization of MapReduce in that they support data flows through a sequence of steps, where each step provides the results from its processing to the next step in the flow. We will accomplish this by using ZeroMQ to create a messaging topology that is suitable for the execution of embarrassingly parallel code in a number of worker processes.

Note

The term embarrassingly parallel was adopted in online discussions of parallelization after its use in an article named Matrix Computation on Distributed Memory Multiprocessors, written by Cleve Moler. Parallelizing inherently serial code is notoriously difficult, so the phrase came to describe problems that are obviously or easily parallelizable, with subtasks that need little or no coordination with one another.

ZeroMQ is an asynchronous messaging framework that evolved from the experience and lessons learned from the work on the Advanced Message Queuing Protocol (AMQP). Its primary purpose is to allow programmers to quickly and easily interconnect the various software components of a system, using any networking-capable programming language.

We will adapt some Python examples from the ZeroMQ Guide (from the Divide and Conquer and Handling Errors and ETERM sections) to create a task pipeline cluster. We will then use it to estimate the value of π.

Estimating the value of π

Blaise Barney of Lawrence Livermore National Laboratory created a series of pages and tutorials on parallel computing that includes, as an example, a method for estimating the value of π using a parallel approach. The method is as follows:

  1. Inscribe a circle in a square.
  2. Randomly generate points in the square.
  3. Determine the number of points in the square that fall within the inscribed circle.
  4. Let π be estimated from the ratio of the two point counts, since this ratio approximates the ratio of the areas.

This can be expressed clearly and unambiguously by using the equations for the areas of the square and the circle. For a circle of radius $r$ inscribed in a square with sides of length $2r$, we start with the following equations:

$$A_{\text{circle}} = \pi r^2 \qquad A_{\text{square}} = (2r)^2 = 4r^2$$

We can find the value for $r^2$ in both of the preceding cases, as follows:

$$r^2 = \frac{A_{\text{circle}}}{\pi} \qquad r^2 = \frac{A_{\text{square}}}{4}$$

Knowing that each value of $r^2$ is equal to the other, we may now solve for π, as follows:

$$\frac{A_{\text{circle}}}{\pi} = \frac{A_{\text{square}}}{4} \quad\Longrightarrow\quad \pi = 4\,\frac{A_{\text{circle}}}{A_{\text{square}}}$$

Since uniformly random points fall into each region in proportion to its area, the fraction of points that land inside the circle gives us a way to estimate the value of π.
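Written out explicitly (this simply restates the preceding result in the form the code below uses, where $N$ is the total number of random points and $N_{\text{in}}$ is the number that land inside the circle):

$$\pi = 4\,\frac{A_{\text{circle}}}{A_{\text{square}}} \approx 4\,\frac{N_{\text{in}}}{N}$$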

A graphic representation of this may be helpful. The following code assumes that you have opened and run the notebook for this chapter and executed the first few cells, which import all the necessary libraries:

In [4]: xs = np.random.uniform(-1, 1, 10000)
        ys = np.random.uniform(-1, 1, 10000)
        points_inside = []
        points_outside = []
        for point in zip(xs, ys):
            (x, y) = point
            if (x ** 2 + y ** 2) <= 1:
                points_inside.append(point)
            else:
                points_outside.append(point)

The preceding code generates random numbers for the x and y values between -1 and 1. Using the equation of a unit circle, it then calculates whether the points constructed from these x and y values fall within the area of the circle or not.

Let's plot these two sets of points by using the following code:

In [5]: (figure, axes) = plt.subplots(figsize=(10,10))
        axes.scatter(*zip(*points_inside), color=colors[1], 
                     alpha=0.5)
        axes.scatter(*zip(*points_outside), color=colors[0],
                     alpha=0.5)
        circle = plt.Circle((0, 0), 1, fill=False)
        axes.set_xlim((-1.1, 1.1))
        axes.set_ylim((-1.1, 1.1))
        axes.add_artist(circle)
        axes.set_title("Visualization of estimating $pi$", 
                       fontsize=28)
        nbutil.hide_axes(axes)
        plt.show()

The following plot (Visualization of estimating π) is the result of the preceding code:

(Figure: Visualization of estimating π, showing the randomly generated points that fall inside the inscribed circle and those that fall outside it)

The C and Fortran π estimation code provided in the example given in the Lawrence Livermore National Laboratory (LLNL) parallel programming tutorial can be easily converted to Python, as follows:

In [6]: def estimate_pi(points):
            in_circle = 0
            for _ in range(int(points)):
                (x, y) = (random.random(), random.random())
                if (x ** 2 + y ** 2) <= 1:
                    in_circle += 1
            return 4 * in_circle / points

When we run the preceding code with 100 million points and time it on a moderate workstation, it takes over a minute, as shown in the following output:

In [7]: %%time
        print(estimate_pi(1e8))
3.14157116
CPU times: user 1min 10s, sys: 151 ms, total: 1min 10s
Wall time: 1min 11s

The preceding code offers a baseline against which we will soon be able to compare our parallel results.

Creating the ZeroMQ components

In order to use the ZeroMQ pipeline pattern (dispatching tasks to workers and having the workers forward the results to a collector), we'll need to create each of the following components:

  • A distributor
  • Multiple instances of a worker
  • A collector (for the results)

As stated before, the code for these components was adapted from the ZeroMQ Guide examples, originally contributed by Lev Givon and Jeremy Avnet. We've made several changes, one of which is to utilize the Python multiprocessing module, which lets us run the example in a single terminal window rather than three. Naturally, this also lets us easily run the example in an IPython Notebook.

We've decided to implement the cluster in several small modules in order to keep things as clear and organized as possible. This should make it fairly obvious which code is responsible for which functionality. All the modules are available in the lib directory of the repository for this chapter's notebook. We'll start where everything begins—the demo module:

import multiprocessing, random, time
import zmq
import collector, democfg, distributor, worker

def main(tasks):
    # Initialize random number generator
    random.seed()
    print("Starting task workers ...")
    for worker_id in range(democfg.worker_pool_size):
        worker_proc = multiprocessing.Process(
            target=worker.work, args=[worker_id])
        worker_proc.start()
    print("Starting task collector ...")
    collector_proc = multiprocessing.Process(
           target=collector.collect)
    collector_proc.start()
    time.sleep(democfg.pause_time)
    print("Starting task distributor ...")
    distributor.distribute(tasks)

if __name__ == "__main__":
    tasks = [1e8 / democfg.task_count] * democfg.task_count
    main(tasks)

The main function performs the following tasks:

  • It creates a worker pool with a worker count, as configured in the democfg module
  • It starts the collector process, which will be responsible for the processing of results from the workers
  • It calls the distributor's distribute function (in the current process), passing it the tasks that are defined in the __main__ block before main is called

Though you can't tell right now, the distributor.distribute function is the one that starts the process of passing messages across the cluster. Now, let's take a look at this code in the distributor module:

import time
import zmq
import democfg

def distribute(tasks):
    context = zmq.Context()
    # Socket to send messages on
    sender = context.socket(zmq.PUSH)
    sender.bind(democfg.routing_table["receiver"])
    # Socket with direct access to the sink, used to
    # synchronize the start of the batch
    syncher = context.socket(zmq.PUSH)
    syncher.connect(democfg.routing_table["sender"])
    # Give 0MQ time to start up
    time.sleep(democfg.pause_time)
    syncher.send(democfg.start_flag)
    for task in tasks:
        sender.send_pyobj(task)
    # Give 0MQ time to deliver message
    time.sleep(democfg.pause_time)

The distribute function takes as an argument a list of tasks that it will be passing to the workers. Before it gives out any jobs though, it sets up the ZeroMQ sockets to communicate with the other components:

  • The sender ZeroMQ socket to pass jobs
  • The syncher ZeroMQ socket to kick off the batch

If you're not familiar with ZeroMQ, don't be alarmed that the worker.work function isn't called directly. Instead, what happens is this: a message gets passed to the ZeroMQ queue on which the worker is listening. As we'll see shortly, the work function pulls the task data off the queue to which the distribute function sent it.
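If you're completely new to this pattern, the following stripped-down sketch (our own, separate from the demo modules; the port number and payload are arbitrary) shows the same PUSH/PULL hand-off in a single script, where whatever the PUSH socket sends is received by the connected PULL socket:

import threading
import zmq

context = zmq.Context()

def pull_one():
    # The "worker" side: connect a PULL socket and block until a message arrives
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://127.0.0.1:5555")
    print("Pulled task data:", receiver.recv_pyobj())
    receiver.close()

# The "distributor" side: bind a PUSH socket and send a Python object
sender = context.socket(zmq.PUSH)
sender.bind("tcp://127.0.0.1:5555")

puller = threading.Thread(target=pull_one)
puller.start()
sender.send_pyobj(12500000.0)  # for example, a point count for one task
puller.join()
sender.close()
context.term()

Note that the send does not call any function on the receiving side; it simply places a message on the queue, which is exactly what distributor.distribute does for the workers.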

Next up, the worker:

import random
import zmq
import democfg, tasker

def is_done(socks, controller):
    if (socks.get(controller) == zmq.POLLIN and
        controller.recv() == democfg.done_msg):
        return True
    return False

def work(worker_id):
    context = zmq.Context()
    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.connect(democfg.routing_table["receiver"])
    # Socket to send messages to
    sender = context.socket(zmq.PUSH)
    sender.connect(democfg.routing_table["sender"])
    # Socket for control input
    controller = context.socket(zmq.SUB)
    controller.connect(democfg.routing_table["controller"])
    controller.setsockopt(zmq.SUBSCRIBE, b"")
    # Process messages from receiver and controller
    poller = zmq.Poller()
    poller.register(receiver, zmq.POLLIN)
    poller.register(controller, zmq.POLLIN)
    # Process messages from both sockets
    run_loop = True
    while run_loop:
        socks = dict(poller.poll())
        if socks.get(receiver) == zmq.POLLIN:
            task_data = receiver.recv_pyobj()
            # Process task data
            result = tasker.task(task_data)
            sender.send_pyobj(result)
        if is_done(socks, controller):
            run_loop = False

The worker.work function creates a worker that listens on a ZeroMQ PULL connection, where it will take the task data that the distributor PUSH-ed. The worker then calls another function that will do the actual computation for the task. Once this is done, the worker will send the result to the collector.

The code for the worker is a little more complex than the distributor, primarily because the worker polls the following two separate queues:

  • A queue from which it will pull the task data
  • A queue on which it is listening for the control messages—in particular, a stop message

The result that gets sent to the collector is created in a separate tasker module, as follows:

import random
import zmq
import democfg

def task(task_data):
    in_circle = 0
    for _ in range(int(task_data)):
        (x, y) = (random.random(), random.random())
        if (x ** 2 + y ** 2) <= 1:
            in_circle += 1
    return 4 * in_circle / task_data

This code might look familiar; it's essentially the estimate_pi function from earlier, the same logic we used when plotting points to demonstrate the method for estimating the value of π. In the tasker module, we finally see what the task data originally passed to the main function is used for: it is the number of random points that will be generated to estimate the value of π.

The worker's work function executes the task function and, when it gets the result back, sends it as a message to the queue on which the collector is listening. Here's the collector module:

import time
import zmq
import democfg, processor

def collect():
    context = zmq.Context()
    # Socket to receive messages on (collect results from worker)
    receiver = context.socket(zmq.PULL)
    receiver.bind(democfg.routing_table["sender"])
    # Socket for worker control
    controller = context.socket(zmq.PUB)
    controller.bind(democfg.routing_table["controller"])
    # Wait for start signal
    assert receiver.recv() == democfg.start_flag
    processor.process(receiver)
    # Let workers know that all results have been processed
    controller.send(democfg.done_msg)
    # Finished, but give 0MQ time to deliver
    time.sleep(5 * democfg.pause_time)

This code is almost as simple as the distributor's. The collector.collect function does a few things, including the following:

  • It PULL-s the data that was PUSH-ed by the workers
  • It calls processor.process on the data that it PULL-ed
  • It sends a control message telling the workers to stop as soon as all the results have been processed

Tip

Using time.sleep repeatedly to ensure that the different parts of the system have completed their tasks is fragile and slow. As in the official ZeroMQ examples on which this code is based, it is used here only for pedagogical clarity. A more robust solution would be to use Pub-Sub synchronization. For more information, see the Node Coordination and Getting the Message Out sections of the ZeroMQ Guide.
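As a rough illustration of that idea (a minimal sketch of the pattern described in the guide, not code from the demo modules; the ports and subscriber count here are arbitrary), the publisher can wait for every subscriber to check in over a REQ/REP pair instead of sleeping for a fixed interval:

import zmq

def synchronized_publisher(expected_subscribers=8):
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://127.0.0.1:5561")
    # A REP socket used purely for the start-up handshake
    syncservice = context.socket(zmq.REP)
    syncservice.bind("tcp://127.0.0.1:5562")
    # Wait until every subscriber has announced itself before publishing
    for _ in range(expected_subscribers):
        syncservice.recv()
        syncservice.send(b"")
    publisher.send(b"START")

def synchronized_subscriber():
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://127.0.0.1:5561")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    # Tell the publisher that this subscriber is connected and ready
    sync = context.socket(zmq.REQ)
    sync.connect("tcp://127.0.0.1:5562")
    sync.send(b"ready")
    sync.recv()
    print("Received:", subscriber.recv())

Run the two functions in separate processes (one publisher and expected_subscribers subscribers) and the START message will only be sent once everyone is listening.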

In the same way that we separated the ZeroMQ code and task code in the worker, we will separate the collector code and the code that processes the results. Here is the processor module:

import time
import zmq
import democfg

def process(receiver):
    return get_results(receiver)

def get_results(receiver):
    data = []
    # Process results from the workers
    for _ in range(democfg.task_count):
        result = receiver.recv_pyobj()
        print("Processing result: {}".format(result))
        data.append(result)
    # Calculate final result
    print("Final result: {}".format(sum(data)/len(data)))
    return data

The get_results function collects the individual estimates from the workers and prints the mean value for π based on all of them. Splitting this into distinct functions may seem pointless right now, but it will prove convenient when we update the code to plot our data.

We will take a moment here to review what we've seen:

  • The source code that initializes the cluster components (the demo module)
  • The source code for each of the components of the ZeroMQ pipeline cluster—the distributor, worker, and collector
  • The source code that does the actual calculation (in the tasker module)
  • The source code that renders the final result (in the processor module)

We could have easily left the tasker code in the worker module and the processor code in the collector module. So why did we split these out? The separation highlights how different the underlying concepts are. The worker.work function doesn't care what it does; the work that it represents is abstract in nature: tell me what to do, and I'll do it. The same goes for collector.collect: tell me what to collect, and I will collect it. The distributor, worker, and collector are not concerned with executing Python code and returning results. Instead, they are concerned with sending and receiving messages over the various special-purpose queues that we set up. These three components form the message-passing architecture that is the core of the pipeline cluster.

On the other hand, the tasker.task and processor.process functions are the ones that care about executing Python code and gathering results. What they do is completely unrelated to what the pipeline architecture code does, so much so that we can rip them out and insert new task and process functions that solve completely different problems, all without touching the pipeline modules. In fact, we will do a bit of this at the end of the next section.
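For instance, here is a hypothetical drop-in replacement for the tasker module (the task name and signature are kept, but the body is ours and does not appear in the book's code) that reuses the same Monte Carlo machinery to estimate the integral of sin(x) over [0, π] instead of estimating π:

import math
import random

def task(task_data):
    # task_data is still interpreted as the number of random samples to draw
    n = int(task_data)
    total = 0.0
    for _ in range(n):
        total += math.sin(random.uniform(0.0, math.pi))
    # Monte Carlo estimate of the integral of sin(x) over [0, pi]
    # (mean sample value times the interval length; the exact answer is 2)
    return math.pi * total / n

Because each worker still returns a single number, the existing processor code would average the partial estimates exactly as before.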

We discussed the message-passing pipeline architecture and the conceptually orthogonal computational work done by the other modules, but we need to make sure that we don't lose sight of another key element—the parallelization. Where does it live?

When we start the program by calling main, we don't give any indication of what gets parallelized, nor do any of the modules explicitly state where the parallelization occurs. In fact, it is spread across a few places, as follows:

  • Before we call main, we divide the original value—the total number of points that we want to process—by the number of workers that we will be creating
  • These split point counts are then sent to the distributor component, which iterates over these and sends them (via a queue) to the workers
  • Finally, the results are then assembled at the end in the collector component

It is these three elements, working together, that carry out the hard work of finding the solution to a parallelizable problem. However, the last bit of hidden implicitness is the most difficult of all: identifying the problems that can be parallelized and then figuring out how to do so. We brushed over this in the introduction to this section, but it is without doubt where all the hard work lies.

Before we try implementing the cluster, there's one more module that we should look at so that there's no mystery and the source is shared—the democfg module that is referenced by the others:

worker_pool_size = 8
task_count = 8
delay_range = (1, 100) # milliseconds
pause_time = 1 # seconds
start_flag = b"0"
done_msg = b"DONE"
routing_table = {
    "receiver": "tcp://127.0.0.1:7557",
    "sender": "tcp://127.0.0.1:7558",
    "controller": "tcp://127.0.0.1:7559"}

With this bit of code, we have reviewed all the components of our system and are ready to run it!
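Before we do so, one small optional tweak worth noting (this is our suggestion, not part of the book's configuration): the pool size could be derived from the machine itself rather than hard-coded to 8, as in the following sketch:

import multiprocessing

# Hypothetical variant of the first two democfg settings:
# size the worker pool to the number of CPU cores, one task per worker
worker_pool_size = multiprocessing.cpu_count()
task_count = worker_pool_size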

Working with the results

If you review the code in the demo module, you may recall that when the module is called from the command line, it performs the following tasks:

  • It takes a total point count (1e8)
  • It partitions this total into democfg.task_count equal segments, one per task
  • It calls the main function, passing it this list of tasks

Let's try calling the demo module now:

In [15]: %%time
         %%bash
         python ../lib/demo.py
Starting task workers ...
Starting task collector ...
Starting task distributor ...
Processing result: 3.14210336
Processing result: 3.1415472
Processing result: 3.14151648
Processing result: 3.14219872
Processing result: 3.14187168
Processing result: 3.14149984
Processing result: 3.14178592
Processing result: 3.14194304
Final result: 3.14180828
CPU times: user 3.05 ms, sys: 8.41 ms, total: 11.5 ms
Wall time: 25.4 s

As you might have expected, the parallel version runs faster than the single-process version, in fact more than twice as fast. However, we don't get anywhere close to an eight-fold speedup, so there is clearly overhead above and beyond the simple computation that we parallelized; our extensive use of time.sleep is a likely culprit. Generally, though, to understand exactly what impacts the performance of any given code, we need to profile the code and analyze the results. This is left as an exercise for curious readers, as a great deal of literature on parallel computing and its performance characteristics has been produced over the past half century of computing research.
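If you'd like to take a first step toward such profiling, IPython's built-in %prun magic runs a statement under the Python profiler; the following cell (with an arbitrary, smaller point count so that it finishes quickly, and -l 5 limiting the report to the five most expensive entries) is one way to start:

# Profile the single-process estimator on a smaller workload
%prun -l 5 estimate_pi(1e6)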

We will set this aside, though, and connect our example to matplotlib. The next thing that we'd like to be able to do is plot our results. However, our data is produced in separate processes. We could use any number of approaches to solve this problem (for example, anything from a database to an in-notebook ZeroMQ socket that receives the data), but let's do something simple instead. Make the following changes to ./lib/processor.py, adding the NumPy import and replacing the process function (the rest of the module, including get_results, stays the same):

import numpy as np

def process(receiver):
    results = get_results(receiver)
    np.save("../data/results.npy", np.array(results))
    return results

We haven't touched the logic in the get_results function, and because we're still exposing the same process function, no code in the other modules needs to be updated. We do need to reload some modules, though, as follows:

In [16]: import imp
         imp.reload(collector)
         imp.reload(processor)
         imp.reload(demo)
Out[16]: <module 'demo' from '../lib/demo.py'> 

Let's run the demo.main function in the following way, passing it some tasks, the results of which will be saved to our NumPy file:

In [17]: tasks = [1e8 / democfg.task_count] * democfg.task_count
         demo.main(tasks)
Starting task workers ...
Starting task collector ...
Starting task distributor ...

If you don't see the expected output, check the system process table; if eight Python processes are no longer running at high CPU usage, the job has finished and you may simply need to flush stdout, as follows:

In [18]: sys.stdout.flush()
Processing result: 3.14124992
Processing result: 3.14037024
Processing result: 3.1411328
Processing result: 3.14301344
Processing result: 3.1412272
Processing result: 3.14260256
Processing result: 3.14093152
Processing result: 3.1416672
Final result: 3.14152436

With the data saved, we can load it and plot it, as follows:

In [19]: results = np.load("../data/results.npy")
         (figure, axes) = plt.subplots(figsize=(16, 8))
         axes.scatter([x + 1 for x in range(len(results))],
                      results,
                      color=colors[0])
         axes.set_title("Estimated values of $pi$", fontsize=28)
         axes.set_ylabel("~$pi$", fontsize=24)
         axes.set_xlabel("Worker Number", fontsize=20)
         plt.show()

The following plot is the result of the preceding code:

(Figure: Estimated values of π, plotting each worker's estimate against its worker number)

This brings us to the end of our ZeroMQ cluster exploration, but it also serves as an excellent introduction to what comes next: IPython itself uses ZeroMQ for its clustering features, and that is what we shall discuss next.
