Parallel list processing with execnet

This recipe presents a pattern for using execnet to process a list in parallel. It is a functional map: each element in the list is mapped to a new value, and execnet does the mapping in parallel.

How to do it...

First, we need to decide exactly what we want to do. In this example, we'll just double integers, but we could do any pure computation. The following is the remote_double.py module, which will be executed by execnet. It receives a 2-tuple of (i, arg), assumes arg is a number, and sends back (i, arg*2). The need for i will be explained in the next section.

if __name__ == '__channelexec__':
  for (i, arg) in channel:
    channel.send((i, arg * 2))

To use this module to double every element in a list, we import the plists module (explained in the How it works... section) and call plists.map() with the remote_double module and a list of integers to double.

>>> import plists, remote_double
>>> plists.map(remote_double, range(10))
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Communication between channels is very simple, as shown in the following diagram:

How it works...

The map() function is defined in plists.py. It takes a pure module, a list of arguments, and an optional list of (spec, count) 2-tuples. The default specs are [('popen', 2)], which means we'll open two local gateways, each with one channel. Once these channels are open, we pass them to itertools.cycle(), which creates an infinite iterator that returns to the first channel after it reaches the last.
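To see how the cycling spreads work across channels, here is a minimal sketch that runs without execnet. The channel names are plain strings standing in for real channel objects, and the assignments dictionary is only an illustration, not part of plists.py.

```python
import itertools

# Stand-ins for execnet channels (hypothetical names); plain strings keep
# the sketch runnable without execnet installed.
channels = ['channel-0', 'channel-1']
cyc = itertools.cycle(channels)

# Record which (index, argument) pairs each channel would receive.
assignments = {name: [] for name in channels}
for i, arg in enumerate(range(5)):
    assignments[next(cyc)].append((i, arg))

print(assignments)
```

With five arguments and two channels, the first channel gets three pairs and the second gets two, which is the "almost even" split the cycle produces.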

Now we can send each argument in args to a channel for processing, and since the channels are cycled, each channel gets an almost even share of the arguments. This is where i comes in: we don't know in what order the results will come back, so i, the index of each arg in the list, is passed to the channel and back so that we can reassemble the results in the original order. We then wait for the results with a MultiChannel receive queue and insert them into a prefilled list that's the same length as the original args. Once we have all the expected results, we can exit the gateways and return the results:

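import itertools, execnet

def map(mod, args, specs=[('popen', 2)]):
  # Materialize args so that len() works even if an iterator is passed in.
  args = list(args)
  gateways = []
  channels = []

  # Open count gateways for each spec, and start mod on a channel in each.
  for spec, count in specs:
    for _ in range(count):
      gw = execnet.makegateway(spec)
      gateways.append(gw)
      channels.append(gw.remote_exec(mod))

  # Distribute the arguments round-robin, tagging each with its index.
  cyc = itertools.cycle(channels)

  for i, arg in enumerate(args):
    channel = next(cyc)
    channel.send((i, arg))

  # Collect results from all channels through a single queue.
  mch = execnet.MultiChannel(channels)
  queue = mch.make_receive_queue()
  n = len(args)
  results = [None] * n # creates a list of length n, where every element is None

  # Results arrive in arbitrary order; the index puts each one in its place.
  for _ in range(n):
    channel, (i, result) = queue.get()
    results[i] = result

  for gw in gateways:
    gw.exit()

  return results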
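
The reordering step can be tried on its own, without any gateways. The following sketch uses a hard-coded list of (i, result) pairs in a made-up arrival order to stand in for what the receive queue might deliver; the data is hypothetical.

```python
# Hypothetical (i, result) pairs, listed in an arbitrary arrival order.
arrivals = [(2, 4), (0, 0), (3, 6), (1, 2)]

# Prefill the list, then drop each result into the slot named by its index.
results = [None] * len(arrivals)
for i, result in arrivals:
    results[i] = result

print(results)
```

Because every result carries its own index, the arrival order has no effect on the final list.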

There's more...

You can increase the parallelization by modifying the specs, as follows:

>>> plists.map(remote_double, range(10), [('popen', 4)])
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

However, more parallelization does not necessarily mean faster processing. It depends on the available resources, and the more gateways and channels you have open, the more overhead is required. Ideally, there should be one gateway and channel per CPU core to get maximum resource utilization.
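
One way to follow the one-gateway-per-core guideline is to build the specs from the number of cores that Python reports. This is a sketch only: specs_for_local_cores() is a hypothetical helper, not part of plists.py or execnet.

```python
import os

def specs_for_local_cores():
    # Hypothetical helper: one local 'popen' gateway per reported CPU core.
    # os.cpu_count() can return None when the count cannot be determined,
    # so fall back to a single gateway in that case.
    cores = os.cpu_count() or 1
    return [('popen', cores)]

specs = specs_for_local_cores()
print(specs)
```

The resulting list can be passed as the third argument, as in plists.map(remote_double, range(10), specs).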

You can use plists.map() with any pure module as long as it receives and sends back 2-tuples where i is the first element. This pattern is most useful when you have a bunch of numbers to crunch and want to process them as quickly as possible.
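
As an illustration of another module that follows the same protocol, here is a sketch of a hypothetical remote_square.py. It assumes execnet runs the whole module source remotely, as it does for remote_double.py, so the helper function defined at the top is available inside the channel loop.

```python
# remote_square.py (hypothetical module, modeled on remote_double.py)

def square(arg):
    # The pure computation applied to each element.
    return arg * arg

if __name__ == '__channelexec__':
    # `channel` is supplied by execnet when the module is executed remotely.
    for (i, arg) in channel:
        channel.send((i, square(arg)))
```

Keeping the computation in its own function also lets you check it locally before sending it through execnet.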

See also

The previous two recipes cover execnet features in greater detail.
