In this section, we will create a task pipeline, which is a messaging pattern that we mentioned briefly when discussing the Disco project's answer to MapReduce. Task pipelines can be viewed as a generalization of MapReduce in that they support data flows through a sequence of steps, where each step provides the results from its processing to the next step in the flow. We will accomplish this by using ZeroMQ to create a messaging topology that is suitable for the execution of embarrassingly parallel code in a number of worker processes.
The term embarrassingly parallel came into common use in discussions of parallelization after Cleve Moler used it in his article Matrix Computation on Distributed Memory Multiprocessors. Parallelizing code with a serial bias is notoriously difficult, so the phrase came to describe problems that are obviously or easily parallelizable.
ZeroMQ is an asynchronous messaging framework that evolved from the experience and lessons learned while working on the Advanced Message Queuing Protocol (AMQP). Its primary purpose is to allow programmers to quickly and easily interconnect the components of a system written in any programming language with networking support.
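To give a feel for how little code ZeroMQ needs to connect two components, here is a minimal, self-contained sketch of the PUSH/PULL socket pair that the rest of this section builds on. The port number and message are arbitrary choices for illustration; they are not part of the demo modules discussed below:

import time
import zmq

context = zmq.Context()

# The "distributor" end: PUSH messages to whoever connects.
sender = context.socket(zmq.PUSH)
sender.bind("tcp://127.0.0.1:5555")

# The "worker" end: PULL messages from the distributor.
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://127.0.0.1:5555")

# Give ZeroMQ a moment to finish connecting (the demo code below
# does the same thing with democfg.pause_time).
time.sleep(0.1)

sender.send(b"hello, pipeline")
print(receiver.recv())  # b'hello, pipeline'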
We will adapt some Python examples from the ZeroMQ Guide (from the Divide and Conquer and Handling Errors and ETERM sections) to create a task pipeline cluster. We will then use it to estimate the value of π.
Blaise Barney from Lawrence Livermore National Laboratory created a series of pages and tutorials on the topic of parallel computing, which offers as an example a means of estimating the value of π by using a parallel approach. The method used is as follows:
- Inscribe a circle of radius r in a square with sides of length 2r
- Randomly generate points within the square
- Count the number of points that also fall inside the circle
- Use the ratio of the points inside the circle to the total number of points to approximate the ratio of the two areas, from which π can be recovered
This can be clearly and unambiguously expressed by using the equations for the area of the square and the circle. We will start with the following equations:

$A_{circle} = \pi r^2$

$A_{square} = (2r)^2 = 4r^2$
We can find the value of $r^2$ in both the preceding cases, as follows:

$r^2 = \frac{A_{circle}}{\pi}$

$r^2 = \frac{A_{square}}{4}$
Knowing that each value of $r^2$ is equal to the other, we may now solve for π, as follows:

$\frac{A_{circle}}{\pi} = \frac{A_{square}}{4}$

$\pi = \frac{4 A_{circle}}{A_{square}}$
If we place points at random positions distributed uniformly over the square, the fraction of points that lands inside the circle approximates the ratio of the circle's area to the square's area, and we can use this to estimate the value of π.
A graphic representation of this may be helpful. The following code assumes that you have opened and run the notebook for this chapter and executed the first few cells, which import all the necessary libraries:
In [4]: xs = np.random.uniform(-1, 1, 10000)
        ys = np.random.uniform(-1, 1, 10000)
        points_inside = []
        points_outside = []
        for point in zip(xs, ys):
            (x, y) = point
            if (x ** 2 + y ** 2) <= 1:
                points_inside.append(point)
            else:
                points_outside.append(point)
The preceding code generates random numbers for the x and y values between -1 and 1. Using the equation of a unit circle, it then calculates whether the points constructed from these x and y values fall within the area of the circle or not.
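Even before plotting anything, these two lists already give a rough estimate. Since the points are spread uniformly over the square, the fraction that landed inside the circle, multiplied by four, approximates π (the exact value will vary from run to run):

# Rough estimate from the 10,000 points generated above
print(4 * len(points_inside) / (len(points_inside) + len(points_outside)))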
Let's plot these two sets of points by using the following code:
In [5]: (figure, axes) = plt.subplots(figsize=(10,10))
        axes.scatter(*zip(*points_inside), color=colors[1], alpha=0.5)
        axes.scatter(*zip(*points_outside), color=colors[0], alpha=0.5)
        circle = plt.Circle((0, 0), 1, fill=False)
        axes.set_xlim((-1.1, 1.1))
        axes.set_ylim((-1.1, 1.1))
        axes.add_artist(circle)
        axes.set_title("Visualization of estimating $\pi$", fontsize=28)
        nbutil.hide_axes(axes)
        plt.show()
The following plot (Visualization of estimating π) is the result of the preceding code:
The C and Fortran π estimation code provided in the example given in the Lawrence Livermore National Laboratory (LLNL) parallel programming tutorial can be easily converted to Python, as follows:
In [6]: def estimate_pi(points):
            in_circle = 0
            for _ in range(int(points)):
                (x, y) = (random.random(), random.random())
                if (x ** 2 + y ** 2) <= 1:
                    in_circle += 1
            return 4 * in_circle / points
When we run the preceding code with 100 million points and track the execution time on a moderate workstation, it takes over a minute, as shown in the following output:
In [7]: %%time
        print(estimate_pi(1e8))
3.14157116
CPU times: user 1min 10s, sys: 151 ms, total: 1min 10s
Wall time: 1min 11s
The preceding code offers a baseline against which we will soon be able to compare our parallel results.
In order to use the ZeroMQ pipeline pattern (dispatching tasks to workers and having the workers forward the results to a collector), we'll need to create each of the following components:
- A distributor that pushes task data out to the workers
- A pool of workers that perform the actual computation
- A collector that gathers the results from the workers
As stated before, the code for the components was adapted from the ZeroMQ Guide, and was originally contributed by Lev Givon and Jeremy Avnet. We've made several changes, one of which is to utilize the Python multiprocessing module, which lets us run the example in a single terminal window rather than three. Naturally, this also lets us easily run the example in an IPython Notebook.
We've decided to implement the cluster in several small modules in order to keep things as clear and organized as possible. This should make it fairly obvious which code is responsible for which functionality. All the modules are available in the lib directory of the repository for this chapter's notebook. We'll start where everything begins—the demo module:
import multiprocessing, random, time

import zmq

import collector, democfg, distributor, worker


def main(tasks):
    # Initialize random number generator
    random.seed()
    print("Starting task workers ...")
    for worker_id in range(democfg.worker_pool_size):
        worker_proc = multiprocessing.Process(
            target=worker.work, args=[worker_id])
        worker_proc.start()
    print("Starting task collector ...")
    collector_proc = multiprocessing.Process(
        target=collector.collect)
    collector_proc.start()
    time.sleep(democfg.pause_time)
    print("Starting task distributor ...")
    distributor.distribute(tasks)


if __name__ == "__main__":
    tasks = [1e8 / democfg.task_count] * democfg.task_count
    main(tasks)
The main function performs the following tasks:
- It seeds the random number generator
- It starts the pool of worker processes, with the pool size taken from the democfg module
- It starts the collector process
- After a short pause, it starts the distributor, passing it the task data

When the module is run from the command line, the task data is created by using the configuration values in the democfg module, and then the main function is called.

Though you can't tell right now, the distributor.distribute function is the one that starts the process of passing messages across the cluster. Now, let's take a look at this code in the distributor module:
import time

import zmq

import democfg


def distribute(tasks):
    context = zmq.Context()
    # Socket to send messages on
    sender = context.socket(zmq.PUSH)
    sender.bind(democfg.routing_table["receiver"])
    # Socket with direct access to the sink used to synchronize
    # start of batch
    syncher = context.socket(zmq.PUSH)
    syncher.connect(democfg.routing_table["sender"])
    # Give 0MQ time to start up
    time.sleep(democfg.pause_time)
    syncher.send(democfg.start_flag)
    for task in tasks:
        sender.send_pyobj(task)
    # Give 0MQ time to deliver message
    time.sleep(democfg.pause_time)
The distribute function takes as an argument a list of tasks that it will pass to the workers. Before it gives out any jobs, though, it sets up the ZeroMQ sockets that it uses to communicate with the other components:
- The sender ZeroMQ socket, which is used to pass jobs to the workers
- The syncher ZeroMQ socket, which is used to signal the collector to kick off the batch

If you're not familiar with ZeroMQ, don't be alarmed that the worker.work function isn't called. Instead, what happens is this—a message gets passed to the ZeroMQ queue on which the worker is listening. As we'll shortly see, the work function will pull task data off the queue to which the distribute function sent it.
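The send_pyobj and recv_pyobj calls used throughout these modules are thin pyzmq conveniences that pickle a Python object on one side of a socket and unpickle it on the other. A minimal sketch (using an arbitrary in-process address rather than one from democfg) shows why the distributor can push a plain float and the worker can pull it back out unchanged:

import zmq

context = zmq.Context()

# Any ZeroMQ transport works; "inproc" keeps this sketch self-contained.
sender = context.socket(zmq.PUSH)
sender.bind("inproc://task-demo")
receiver = context.socket(zmq.PULL)
receiver.connect("inproc://task-demo")

task_data = 1e8 / 8           # the kind of value distribute() sends
sender.send_pyobj(task_data)  # pickles the object into a message
print(receiver.recv_pyobj())  # 12500000.0, unpickled on the other side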
Next up, the worker:
import random

import zmq

import democfg, tasker


def is_done(socks, controller):
    if (socks.get(controller) == zmq.POLLIN and
            controller.recv() == democfg.done_msg):
        return True
    return False


def work(worker_id):
    context = zmq.Context()
    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.connect(democfg.routing_table["receiver"])
    # Socket to send messages to
    sender = context.socket(zmq.PUSH)
    sender.connect(democfg.routing_table["sender"])
    # Socket for control input
    controller = context.socket(zmq.SUB)
    controller.connect(democfg.routing_table["controller"])
    controller.setsockopt(zmq.SUBSCRIBE, b"")
    # Process messages from receiver and controller
    poller = zmq.Poller()
    poller.register(receiver, zmq.POLLIN)
    poller.register(controller, zmq.POLLIN)
    # Process messages from both sockets
    run_loop = True
    while run_loop:
        socks = dict(poller.poll())
        if socks.get(receiver) == zmq.POLLIN:
            task_data = receiver.recv_pyobj()
            # Process task data
            result = tasker.task(task_data)
            sender.send_pyobj(result)
        if is_done(socks, controller):
            run_loop = False
The worker.work function creates a worker that listens on a ZeroMQ PULL connection, where it will take the task data that the distributor PUSHed. The worker then calls another function that will do the actual computation for the task. Once this is done, the worker will send the result to the collector.
The code for the worker is a little more complex than that of the distributor, primarily because the worker polls the following two separate queues:
- The task queue, from which it receives the task data sent by the distributor
- The control queue, on which it listens for control messages—in particular, a stop message

The result that gets sent to the collector is created in a separate tasker module, as follows:
import random

import zmq

import democfg


def task(task_data):
    in_circle = 0
    for _ in range(int(task_data)):
        (x, y) = (random.random(), random.random())
        if (x ** 2 + y ** 2) <= 1:
            in_circle += 1
    return 4 * in_circle / task_data
This code might look familiar. It's very close to the code that we used earlier to demonstrate the method for estimating the value of π. In the tasker module, we also see what the task data, which was originally passed to the main function, is actually used for—it is the number of points that will be generated to estimate the value of π.
The worker function executes the task function, and when the function gets the result, it sends a message to the queue upon which the collector is listening. Here's the collector module:
import time

import zmq

import democfg, processor


def collect():
    context = zmq.Context()
    # Socket to receive messages on (collect results from workers)
    receiver = context.socket(zmq.PULL)
    receiver.bind(democfg.routing_table["sender"])
    # Socket for worker control
    controller = context.socket(zmq.PUB)
    controller.bind(democfg.routing_table["controller"])
    # Wait for start signal
    assert receiver.recv() == democfg.start_flag
    processor.process(receiver)
    # Let workers know that all results have been processed
    controller.send(democfg.done_msg)
    # Finished, but give 0MQ time to deliver
    time.sleep(5 * democfg.pause_time)
This code is almost as simple as that of the distributor. The collector.collect function does a few things, which include the following tasks:
- It waits for the start signal that the distributor sends over the syncher socket
- It PULLs the data that was PUSHed by the workers
- It calls processor.process on the data that it PULLed
- It publishes the done message so that the workers know that all the results have been processed

Using time.sleep repeatedly to ensure that the different parts of the system have completed their tasks is fragile and slow. As with the official ZeroMQ examples upon which this code is based, it is used here only for pedagogical clarity. A more robust solution would be to use Pub-Sub synchronization. For more information, see the ZeroMQ Guide sections Node Coordination and Getting the Message Out.
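For reference, the synchronization pattern that the ZeroMQ Guide recommends replaces the sleeps with an explicit handshake over an extra REQ/REP socket pair. The following is a minimal sketch of the publisher (collector) side of that handshake; the ports are the illustrative ones used in the Guide's example, not values from democfg:

import zmq

context = zmq.Context()

# Control channel, as in collector.collect()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://127.0.0.1:5561")

# Extra REP socket used only for the start-up handshake
syncservice = context.socket(zmq.REP)
syncservice.bind("tcp://127.0.0.1:5562")

subscribers_expected = 8  # e.g. democfg.worker_pool_size
for _ in range(subscribers_expected):
    syncservice.recv()     # wait for a worker to say it is ready ...
    syncservice.send(b"")  # ... and acknowledge it

# All workers are now subscribed; no sleeps are needed before publishing
publisher.send(b"DONE")

Each worker would then connect a REQ socket to the same endpoint, send an empty "ready" message, and wait for the reply before entering its poll loop.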
In the same way that we separated the ZeroMQ code and the task code in the worker, we will separate the collector code and the code that processes the results. Here is the processor module:
import time

import zmq

import democfg


def process(receiver):
    return get_results(receiver)


def get_results(receiver):
    data = []
    # Process results from the workers
    for _ in range(democfg.task_count):
        result = receiver.recv_pyobj()
        print("Processing result: {}".format(result))
        data.append(result)
    # Calculate final result
    print("Final result: {}".format(sum(data)/len(data)))
    return data
The get_results function's primary purpose is to print out the mean value of π based on the values obtained from all the workers. Splitting this into distinct functions may seem unnecessary right now, but it will prove convenient when we update the code to plot our data.
We will take a moment here to review what we've seen:
- The code that starts everything and creates the task data (the demo module)
- The code that performs the actual computation for each task (the tasker module)
- The code that processes the collected results (the processor module)

We could have easily left the tasker code in the worker module and the processor code in the collector module. So why did we split these out? This separation highlights the very different concerns of each. The worker.work function doesn't care what it does; the work that it represents is abstract in nature—tell me what to do, and I'll do it. The same goes for collector.collect—tell me what to collect, and I will collect it. The distributor, worker, and collector are not concerned with the execution of Python code or the returning of results. Instead, they are concerned with sending and receiving messages on the various special-purpose queues that we set up. These three components form the message-passing architecture that is the core of the pipeline cluster.
On the other hand, the tasker.task and processor.process functions are the ones that care about executing the Python code and gathering results. What they do is completely unrelated to what the pipeline architecture code does, so much so that we can rip them out and insert new task and process functions that solve completely different problems. All of this can be accomplished without touching the pipeline modules. In fact, we will do a bit of this at the end of the next section.
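To make this concrete, here is a hypothetical pair of replacement functions—estimating the integral of x² over [0, 1] by Monte Carlo sampling instead of estimating π—that could be dropped into the tasker and processor modules without touching the distributor, worker, or collector. The function bodies are illustrative only and are not part of the chapter's repository:

import random

import democfg


def task(task_data):
    # Hypothetical replacement for tasker.task: a Monte Carlo estimate
    # of the integral of x**2 over [0, 1], using task_data sample points.
    total = 0.0
    for _ in range(int(task_data)):
        total += random.random() ** 2
    return total / task_data


def process(receiver):
    # Hypothetical replacement for processor.process: average the
    # partial estimates PULLed from the workers, just as before.
    results = [receiver.recv_pyobj() for _ in range(democfg.task_count)]
    print("Integral estimate: {}".format(sum(results) / len(results)))
    return results

The true value of this integral is 1/3, so the averaged result should land close to 0.333.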
We discussed the message-passing pipeline architecture and the conceptually orthogonal computational work done by the other modules, but we need to make sure that we don't lose sight of another key element—the parallelization. Where does it live?
When we start the program by calling main, we don't give any indication of what gets parallelized. Also, we don't explicitly state where the parallelization occurs in any of the modules. In fact, it is spread across a few features, some of which are as follows:
- In main, we divide the original value—the total number of points that we want to process—by the number of workers that we will be creating
- Each worker runs in its own process, so the tasks are executed simultaneously
- The partial results are gathered and averaged in the collector component

It is important to note that it is these three conceptual elements working together that carry out the hard work of finding the solution to a parallelizable problem. However, the last bit of hidden implicitness is the most difficult of all—identifying the problems that may be parallelized and then figuring out how to do so. We brushed over this in the introduction to this section, but it is without doubt where all the hard work lies.
Before we try implementing the cluster, there's one more module that we should look at so that there's no mystery and all the source is shared—the democfg module that is referenced by the others:
worker_pool_size = 8
task_count = 8
delay_range = (1, 100)  # milliseconds
pause_time = 1  # seconds
start_flag = b"0"
done_msg = b"DONE"
routing_table = {
    "receiver": "tcp://127.0.0.1:7557",
    "sender": "tcp://127.0.0.1:7558",
    "controller": "tcp://127.0.0.1:7559"}
With this bit of code, we have reviewed all the components of our system and are ready to run it!
If you review the code in the demo module, you may recall that when the module is called from the command line, it performs the following tasks:
- It creates the task data by partitioning the total number of points across the configured number of tasks
- It calls the main function and passes this partitioned data

Let's try calling the demo module now:
In [15]: %%time
         %%bash
         python ../lib/demo.py
Starting task workers ...
Starting task collector ...
Starting task distributor ...
Processing result: 3.14210336
Processing result: 3.1415472
Processing result: 3.14151648
Processing result: 3.14219872
Processing result: 3.14187168
Processing result: 3.14149984
Processing result: 3.14178592
Processing result: 3.14194304
Final result: 3.14180828
CPU times: user 3.05 ms, sys: 8.41 ms, total: 11.5 ms
Wall time: 25.4 s
As you might have expected, the parallel version runs faster—more than twice as fast as the single-process version. However, we don't even get close to eight times as fast. From this, you can infer that there is overhead above and beyond the simple computation that we parallelized. It is likely that much of this is due to our extensive use of time.sleep. Generally, though, to understand exactly what impacts the performance of any given piece of code, we need to profile it and analyze the results. This is left as an exercise for curious readers, along with the great deal of literature on parallel computing and its performance characteristics that has been produced over the past half century of computing research.
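As a small head start on that exercise, the two wall times we already have give a rough measure of the overhead, and the standard library's cProfile module is one simple way to start breaking the serial baseline down. The numbers below are the timings reported above; the profiling call uses a smaller point count so that it finishes quickly:

import cProfile

# Speedup and efficiency from the wall times measured above
# (roughly 71 s serial versus 25.4 s with 8 workers).
serial_time, parallel_time, workers = 71.0, 25.4, 8
print("speedup:    {:.1f}x".format(serial_time / parallel_time))
print("efficiency: {:.0%}".format(serial_time / (parallel_time * workers)))

# Profile the serial estimator on a smaller run to see where the time goes.
cProfile.run("estimate_pi(1e6)", sort="cumulative")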
We will set this aside, though, and connect our example to matplotlib. The next thing that we'd like to be able to do is plot our results. However, our data is handled in separate processes. We could use any number of approaches to solve this problem (for example, anything from databases to creating an on-notebook ZeroMQ socket to receive data), but let's do something simple instead. Make the following changes to ./lib/processor.py:
import numpy as np


def process(receiver):
    results = get_results(receiver)
    np.save("../data/results.npy", np.array(results))
    return results
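If you would rather avoid the intermediate file, the on-notebook ZeroMQ socket mentioned above is also only a few lines. The processor could push its results to one more endpoint, and the notebook could pull them directly. The following is a sketch of what the notebook side might look like; the endpoint is hypothetical, and it would also need to be added to democfg.routing_table and PUSHed to from the processor module:

import zmq

context = zmq.Context()
results_receiver = context.socket(zmq.PULL)
# Hypothetical extra endpoint; not part of the chapter's democfg module.
results_receiver.bind("tcp://127.0.0.1:7560")

# Pull one result per task directly into the notebook
results = [results_receiver.recv_pyobj()
           for _ in range(democfg.task_count)]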
We haven't muddied the logic coded in the get_results function, and because we're still using the process function, no code in the other modules needs to be updated. We do need to reload some modules, though, as follows:
In [16]: import imp
         imp.reload(collector)
         imp.reload(processor)
         imp.reload(demo)
Out[16]: <module 'demo' from '../lib/demo.py'>
Let's run the demo.main function in the following way, passing it some tasks, the results of which will be saved to our NumPy file:
In [17]: tasks = [1e8 / democfg.task_count] * democfg.task_count
         demo.main(tasks)
Starting task workers ...
Starting task collector ...
Starting task distributor ...
If you didn't see the expected output, and a check of the system process table shows that there are no longer eight Python processes running at high CPU utilization, then the job has finished and you might simply need to flush stdout, as follows:
In [18]: sys.stdout.flush()
Processing result: 3.14124992
Processing result: 3.14037024
Processing result: 3.1411328
Processing result: 3.14301344
Processing result: 3.1412272
Processing result: 3.14260256
Processing result: 3.14093152
Processing result: 3.1416672
Final result: 3.14152436
With the data saved, we can load it and plot it, as follows:
In [19]: results = np.load("../data/results.npy")
         (figure, axes) = plt.subplots(figsize=(16, 8))
         axes.scatter([x + 1 for x in range(len(results))], results,
                      color=colors[0])
         axes.set_title("Estimated values of $\pi$", fontsize=28)
         axes.set_ylabel("~$\pi$", fontsize=24)
         axes.set_xlabel("Worker Number", fontsize=20)
         plt.show()
The following plot is the result of the preceding code:
This brings us to the end of our ZeroMQ cluster exploration, but it also serves as an excellent introduction to what comes next: IPython uses ZeroMQ for its own clustering features, and that is what we shall discuss next.