This recipe presents a pattern for using execnet to process a list in parallel. It's a functional pattern for mapping each element in the list to a new value, using execnet to do the mapping in parallel.
First, we need to decide exactly what we want to do. In this example, we'll just double integers, but we could do any pure computation, meaning one whose result depends only on its input. Following is the remote_double.py module, which will be executed by execnet. It receives 2-tuples of (i, arg), assumes arg is a number, and sends back (i, arg*2) for each one. The need for i is explained in the How it works... section.
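```python
# remote_double.py
# execnet runs this module with __name__ set to '__channelexec__'
# and provides the global `channel` for talking to the caller.
if __name__ == '__channelexec__':
    for (i, arg) in channel:
        channel.send((i, arg * 2))
```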
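The worker's protocol can be checked without starting any gateways. The following is a minimal sketch, assuming a hypothetical `FakeChannel` class that stands in for an execnet channel: it is iterable and has a `send()` method, which are the only two channel features remote_double.py relies on. The helper `run_remote_double` is also hypothetical. It repeats the module's loop body but takes the channel as a parameter.

```python
class FakeChannel:
    """Hypothetical stand-in for an execnet channel (not part of execnet)."""

    def __init__(self, messages):
        self._messages = list(messages)  # (i, arg) tuples "from the caller"
        self.sent = []                   # everything the worker sends back

    def __iter__(self):
        return iter(self._messages)

    def send(self, item):
        self.sent.append(item)


def run_remote_double(channel):
    # Same loop as remote_double.py, with the channel passed in explicitly.
    for (i, arg) in channel:
        channel.send((i, arg * 2))


channel = FakeChannel([(0, 5), (1, -3), (2, 10)])
run_remote_double(channel)
print(channel.sent)  # [(0, 10), (1, -6), (2, 20)]
```

Each reply echoes its index unchanged. This is what later allows the caller to put the results back in order.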
To use this module to double every element in a list, we import the plists module (explained in the How it works... section) and call plists.map() with the remote_double module and a list of integers to double.
```python
>>> import plists, remote_double
>>> plists.map(remote_double, range(10))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
Communication between the caller and the channels is very simple, as shown in the following diagram:
The map() function is defined in plists.py. It takes a pure module, a list of arguments, and an optional list of (spec, count) 2-tuples. The default specs are [('popen', 2)], which means we'll open two local gateways (each a separate Python subprocess) with one channel per gateway. Once these channels are opened, we put them into an itertools.cycle, which creates an infinite iterator that starts over from the beginning once it reaches the end.
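To see how itertools.cycle spreads the arguments across channels, here is a small sketch. The channel names are hypothetical placeholders for the opened execnet channels. Each argument goes to the next channel in turn, so the per-channel counts never differ by more than one.

```python
import itertools
from collections import Counter

channels = ["ch0", "ch1", "ch2"]  # hypothetical placeholders for real channels
args = list(range(10))

cyc = itertools.cycle(channels)  # ch0, ch1, ch2, ch0, ch1, ...
assignment = [(next(cyc), i, arg) for i, arg in enumerate(args)]

per_channel = dict(Counter(ch for ch, _, _ in assignment))
print(per_channel)  # {'ch0': 4, 'ch1': 3, 'ch2': 3}
```

The distribution is even in the *number* of arguments, not in the amount of work. If some arguments are much more expensive than others, some channels will still finish later than others.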
Now we can send each argument in args to a channel for processing. Because the channels are cycled, the arguments are distributed round-robin, and no channel receives more than one argument more than any other. This is where i comes in: we don't know in what order we'll get the results back, so i, the index of each arg in the list, is passed to the channel and back so that we can combine the results in the original order. We then wait for the results with a MultiChannel receive queue and insert them into a prefilled list that's the same length as the original args. Once we have all the expected results, we can exit the gateways and return the results:
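```python
import itertools

import execnet


def map(mod, args, specs=[('popen', 2)]):
    """Apply the pure module `mod` to every element of `args` in parallel."""
    args = list(args)  # accept any iterable; we need len() below
    gateways = []
    channels = []
    try:
        # Open `count` gateways per spec, each with one channel running `mod`.
        for spec, count in specs:
            for _ in range(count):
                gw = execnet.makegateway(spec)
                gateways.append(gw)
                channels.append(gw.remote_exec(mod))
        # Round-robin dispatch, tagging each argument with its index.
        cyc = itertools.cycle(channels)
        for i, arg in enumerate(args):
            channel = next(cyc)
            channel.send((i, arg))
        # One queue receives (channel, item) pairs from every channel.
        mch = execnet.MultiChannel(channels)
        queue = mch.make_receive_queue()
        results = [None] * len(args)  # one slot per argument, all None
        for _ in range(len(args)):
            channel, (i, result) = queue.get()
            results[i] = result  # the echoed index restores the order
        return results
    finally:
        # Shut the gateways down even if something above failed.
        for gw in gateways:
            gw.exit()
```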
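The same pattern can be tried without execnet by swapping in threads and standard-library queues. Each worker's inbox plays the role of a channel, and one shared outbox plays the role of the MultiChannel receive queue. This is a local illustration only. Threads are not separate processes, so it shows the dispatch and reordering logic but gives no speedup for CPU-bound Python code. `local_map`, `worker`, and the `None` shutdown message are hypothetical names for this sketch.

```python
import itertools
import queue
import threading


def worker(inbox, outbox, func):
    # Stands in for a remote module: read (i, arg), send back (i, result).
    # None is a hypothetical shutdown message, standing in for gw.exit().
    # No error handling: if func raises, the caller below blocks forever.
    while True:
        msg = inbox.get()
        if msg is None:
            break
        i, arg = msg
        outbox.put((i, func(arg)))


def local_map(func, args, workers=2):
    if workers < 1:
        raise ValueError("need at least one worker")
    args = list(args)
    outbox = queue.Queue()                                # shared receive queue
    inboxes = [queue.Queue() for _ in range(workers)]    # one per "channel"
    threads = [threading.Thread(target=worker, args=(inbox, outbox, func))
               for inbox in inboxes]
    for t in threads:
        t.start()
    try:
        cyc = itertools.cycle(inboxes)                   # round-robin dispatch
        for i, arg in enumerate(args):
            next(cyc).put((i, arg))
        results = [None] * len(args)
        for _ in range(len(args)):
            i, result = outbox.get()                     # arrives in any order
            results[i] = result
        return results
    finally:
        for inbox in inboxes:
            inbox.put(None)
        for t in threads:
            t.join()


print(local_map(lambda x: x * 2, range(10)))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```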
You can increase the parallelization by modifying the specs, as follows:
```python
>>> plists.map(remote_double, range(10), [('popen', 4)])
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```
However, more parallelization does not necessarily mean faster processing. It depends on the available resources. Every gateway is a separate process, and every argument costs a message round trip, so the more gateways and channels you open, the more overhead you pay. For CPU-bound work, one gateway and channel per CPU core is usually the best starting point for maximum resource utilization.
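One way to follow the one-gateway-per-core guideline is to build the specs list from the machine's core count. The following is a small sketch with a hypothetical helper name, `specs_per_core`. Note that os.cpu_count() reports logical CPUs, which can include hyperthreads, and may return None.

```python
import os


def specs_per_core(spec='popen'):
    # Hypothetical helper: one gateway of the given spec per logical CPU.
    cores = os.cpu_count() or 1  # cpu_count() can return None; fall back to 1
    return [(spec, cores)]


print(specs_per_core())  # a single ('popen', N) entry, N = logical CPU count
```

It would be passed as the third argument, as in `plists.map(remote_double, range(10), specs_per_core())`.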
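You can use plists.map() with any pure module, as long as it receives 2-tuples of (i, arg) and sends back 2-tuples with the same i as the first element. The arguments and results must also be types that execnet channels can send, such as numbers, strings, lists, tuples, and dicts. This pattern is most useful when you have a large batch of numbers to crunch and want to process them as quickly as possible.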
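As an illustration of another module that follows the same protocol, here is a sketch of a hypothetical remote_square.py. Putting the loop in a `serve()` function is a choice made for this sketch, so that the logic can be exercised locally with any iterable object that has a `send()` method. When execnet runs the module, the `__channelexec__` guard calls it with the injected `channel`.

```python
# remote_square.py: a hypothetical pure module using the same
# (i, arg) -> (i, result) protocol as remote_double.py.

def serve(channel):
    # Echo the index unchanged so the caller can restore the original order.
    for (i, arg) in channel:
        channel.send((i, arg * arg))


if __name__ == '__channelexec__':
    # execnet provides the global `channel` when it runs this module remotely.
    serve(channel)  # noqa: F821
```

Assuming plists.py as defined above, `plists.map(remote_square, range(5))` would return `[0, 1, 4, 9, 16]`.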