Distributed tagging with execnet

Execnet is a distributed execution library for Python. It allows you to create gateways and channels for remote code execution. A gateway is a connection from the calling process to a remote environment, which can be a local subprocess or a Python interpreter on a remote node reached over SSH. A channel is created from a gateway and handles communication between the channel creator and the remote code. In this way, execnet works much like a message-passing system such as MPI (Message Passing Interface): the gateway creates the connection, and the channel is used to send messages back and forth.

Since many NLTK processes take 100% CPU during computation, execnet is an ideal way to distribute that computation for maximum resource usage. You can create one gateway per CPU core, and it doesn't matter whether the cores are in your local computer or spread across remote machines. In many situations, you only need to have the trained objects and data on a single machine and can send the objects and data to the remote nodes as needed.

Getting ready

You'll need to install execnet for this to work. It should be as simple as sudo pip install execnet or sudo easy_install execnet. The current version of execnet, as of this writing, is 1.2. The execnet home page, which has API documentation and examples, is at http://codespeak.net/execnet/.

How to do it...

We start by importing the required modules, as well as an additional module, remote_tag.py, which is explained in the How it works... section. We also need to import pickle so we can serialize the tagger for transmission. Execnet does not natively know how to deal with complex objects such as a part-of-speech tagger, so we must dump the tagger to a byte string using pickle.dumps(). We'll use the default tagger that's used by the nltk.tag.pos_tag() function, but you could use any pre-trained part-of-speech tagger as long as it implements the TaggerI interface.
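A minimal sketch of that serialization round trip follows. It uses a hypothetical ToyTagger class instead of an NLTK tagger so it runs without NLTK or execnet; the only thing borrowed from TaggerI is a tag() method that returns (token, tag) pairs.

```python
import pickle


class ToyTagger:
    """Hypothetical stand-in for an NLTK tagger: tags every token the same way."""

    def __init__(self, default_tag):
        self.default_tag = default_tag

    def tag(self, tokens):
        # Mirror the TaggerI.tag() contract: return a list of (token, tag) pairs.
        return [(token, self.default_tag) for token in tokens]


# Serialize the tagger into a byte string that a channel can carry.
pickled_tagger = pickle.dumps(ToyTagger("NN"))
print(type(pickled_tagger).__name__)  # bytes

# The receiving side rebuilds an equivalent tagger from those bytes.
tagger = pickle.loads(pickled_tagger)
print(tagger.tag(["execnet", "rocks"]))  # [('execnet', 'NN'), ('rocks', 'NN')]
```

pickle records a reference to the class rather than its code, so the receiving interpreter must be able to import the same class. For the real tagger, that means NLTK must be installed wherever the channel code runs.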

Once we have a serialized tagger, we start execnet by making a gateway with execnet.makegateway(). The default gateway creates a Python subprocess, and we create a channel by calling the gateway's remote_exec() method with the remote_tag module as its argument. With an open channel, we send over the serialized tagger, followed by the first tokenized sentence of the treebank corpus.

Note

You don't have to do any special serialization of simple types such as lists and tuples, since execnet already knows how to handle serializing the built-in types.

Now, if we call channel.receive(), we get back a tagged sentence that is equivalent to the first tagged sentence in the treebank corpus, so we know the tagging worked. We end by exiting the gateway, which closes the channel and kills the subprocess.

>>> import execnet, remote_tag, nltk.tag, nltk.data
>>> from nltk.corpus import treebank
>>> import pickle
>>> pickled_tagger = pickle.dumps(nltk.data.load(nltk.tag._POS_TAGGER))
>>> gw = execnet.makegateway()
>>> channel = gw.remote_exec(remote_tag)

>>> channel.send(pickled_tagger)
>>> channel.send(treebank.sents()[0])

>>> tagged_sentence = channel.receive()
>>> tagged_sentence == treebank.tagged_sents()[0]
True
>>> gw.exit()

Visually, the communication process looks like this:

[Figure: the execnet communication process]

How it works...

The gateway's remote_exec() method takes a single argument that can be one of the following three types:

  • A string of code to execute remotely
  • A pure function object, whose source will be serialized and executed remotely
  • A pure module object, whose source will be executed remotely

We use option three with the remote_tag.py module, which is defined as follows:

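  import pickle
  
  # execnet sets __name__ to '__channelexec__' when it runs this module
  if __name__ == '__channelexec__':
    # the first message is the pickled tagger
    tagger = pickle.loads(channel.receive())
    
    # each later message is a tokenized sentence; iteration ends when the channel closes
    for sentence in channel:
      channel.send(tagger.tag(sentence))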

A pure module is a module that is self-contained: it can only access Python modules that are available where it executes, and does not have access to any variables or states that exist wherever the gateway is initially created. Similarly, a pure function is a self-contained function, with no external dependencies. To detect that the module is being executed by execnet, you can look at the __name__ variable. If it's equal to '__channelexec__', then it is being used to create a remote channel. This is similar to doing if __name__ == '__main__' to check if a module is being executed on the command line.

The first thing we do is call channel.receive() to get the serialized tagger, which we load using pickle.loads(). You may notice that channel is not imported anywhere; that's because execnet includes it in the global namespace of the module. Any module that execnet executes remotely has access to the channel variable in order to communicate with the channel creator.
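To see why the __name__ check and the predefined channel matter, here is a rough single-process simulation with no execnet involved. The module source is executed with exec() in a fresh namespace where __name__ is '__channelexec__' and channel is already defined, which is how the text above describes execnet running a module. FakeChannel and ToyTagger are hypothetical stand-ins, and running out of queued messages plays the role of the channel being closed.

```python
import pickle

# Source of remote_tag.py, as it would be shipped to the remote side.
REMOTE_TAG_SOURCE = '''
import pickle

if __name__ == '__channelexec__':
    tagger = pickle.loads(channel.receive())
    for sentence in channel:
        channel.send(tagger.tag(sentence))
'''


class FakeChannel:
    """Hypothetical channel stand-in: a fixed inbox and a list of sent items."""

    def __init__(self, messages):
        self._inbox = list(messages)
        self.sent = []

    def receive(self):
        return self._inbox.pop(0)

    def __iter__(self):
        # Yield remaining messages; an empty inbox acts like a closed channel.
        while self._inbox:
            yield self._inbox.pop(0)

    def send(self, item):
        self.sent.append(item)


class ToyTagger:
    """Hypothetical tagger that tags every token as a noun."""

    def tag(self, tokens):
        return [(token, "NN") for token in tokens]


channel = FakeChannel([pickle.dumps(ToyTagger()), ["a", "dog"], ["cats"]])
# Run the module source with `channel` predefined and __name__ set to
# '__channelexec__', so the guarded block executes.
namespace = {"__name__": "__channelexec__", "channel": channel}
exec(REMOTE_TAG_SOURCE, namespace)
print(channel.sent)  # [[('a', 'NN'), ('dog', 'NN')], [('cats', 'NN')]]
```

If the same source runs with __name__ set to '__main__', the guarded block is skipped and nothing is sent, which is what keeps the module inert when it is imported locally.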

Once we have the tagger, we iteratively tag() each tokenized sentence that we receive from the channel. This allows us to tag as many sentences as the sender wants to send, as iteration will not stop until the channel is closed. What we've essentially created is a compute node for part-of-speech tagging that dedicates 100% of its resources to tagging whatever sentences it receives. As long as the channel remains open, the node is available for processing.

There's more...

This is a simple example that opens a single gateway and channel. But execnet can do a lot more, such as opening multiple channels to increase parallel processing, as well as opening gateways to remote hosts over SSH to do distributed processing.

Creating multiple channels

We can create multiple channels, one per gateway, to make the processing more parallel. Each gateway creates a new subprocess (or remote interpreter if using an SSH gateway), and we use one channel per gateway for communication. Once we've created two channels, we can combine them using the MultiChannel class, which allows us to iterate over the channels and make a receive queue to receive messages from each channel.

After creating each channel and sending the tagger, we cycle through the channels to send an equal number of sentences to each channel for tagging. Then, we collect all the responses from the queue. A call to queue.get() will return a 2-tuple of (channel, message) in case you need to know which channel the message came from.

Note

If you don't want to wait forever, you can also pass a timeout keyword argument with the maximum number of seconds you want to wait, as in queue.get(timeout=4). This can be a good way to handle network errors, such as a remote node that stops responding.
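A minimal sketch of handling that timeout, assuming the receive queue behaves like the standard library's queue.Queue, whose get() raises queue.Empty when the timeout expires (check the execnet documentation for your version). The empty replies queue here is a hypothetical stand-in for the one returned by make_receive_queue().

```python
import queue

replies = queue.Queue()  # stand-in for the queue returned by make_receive_queue()

try:
    # Nothing was ever put on this queue, so get() gives up after 0.1 seconds.
    channel, message = replies.get(timeout=0.1)
except queue.Empty:
    channel, message = None, None
    print("no reply within the timeout; treat the node as unavailable")
```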

Once all the tagged sentences have been collected, we can exit the gateways. Here's the code:

>>> import itertools
>>> gw1 = execnet.makegateway()
>>> gw2 = execnet.makegateway()
>>> ch1 = gw1.remote_exec(remote_tag)
>>> ch1.send(pickled_tagger)
>>> ch2 = gw2.remote_exec(remote_tag)
>>> ch2.send(pickled_tagger)
>>> mch = execnet.MultiChannel([ch1, ch2])
>>> queue = mch.make_receive_queue()
>>> channels = itertools.cycle(mch)
>>> for sentence in treebank.sents()[:4]:
...     channel = next(channels)
...     channel.send(sentence)
>>> tagged_sentences = []
>>> for i in range(4):
...     channel, tagged_sentence = queue.get()
...     tagged_sentences.append(tagged_sentence)
>>> len(tagged_sentences)
4
>>> gw1.exit()
>>> gw2.exit()
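The same dispatch-and-collect pattern can be tried without execnet. In this sketch, threads and queue.Queue from the standard library stand in for gateways and channels, and ToyTagger, worker, and the sample sentences are hypothetical. Each worker puts (name, result) pairs on a shared queue, mimicking the (channel, message) tuples returned by the receive queue, and putting None on an inbox plays the role of closing a channel.

```python
import itertools
import queue
import threading


class ToyTagger:
    """Hypothetical tagger: capitalized tokens get NNP, everything else NN."""

    def tag(self, tokens):
        return [(t, "NNP" if t[:1].isupper() else "NN") for t in tokens]


def worker(name, inbox, results, tagger):
    # Stand-in for one remote_tag channel: tag sentences until None arrives.
    for sentence in iter(inbox.get, None):
        # Report which "channel" answered, like the (channel, message) tuples.
        results.put((name, tagger.tag(sentence)))


sentences = [["Pierre", "Vinken"], ["Mr.", "Vinken"], ["Rudolph", "Agnew"], ["it", "rained"]]
results = queue.Queue()
inboxes = {name: queue.Queue() for name in ("ch1", "ch2")}
threads = [
    threading.Thread(target=worker, args=(name, inbox, results, ToyTagger()))
    for name, inbox in inboxes.items()
]
for t in threads:
    t.start()

# Round-robin dispatch: cycle() hands out ch1, ch2, ch1, ch2, ...
names = itertools.cycle(inboxes)
for sentence in sentences:
    inboxes[next(names)].put(sentence)

# Collect exactly one reply per sentence; arrival order is not guaranteed.
tagged, answered_by = [], []
for _ in sentences:
    name, tagged_sentence = results.get()
    answered_by.append(name)
    tagged.append(tagged_sentence)

for inbox in inboxes.values():
    inbox.put(None)  # tell each worker to stop, like closing its channel
for t in threads:
    t.join()

print(len(tagged))  # 4
```

Threads only keep the sketch self-contained. Because of Python's global interpreter lock they won't speed up CPU-bound tagging, which is why execnet runs each channel in a separate interpreter.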

In the example code, we're only sending four sentences, but in real life, you'd want to send thousands. A single computer can tag four sentences very quickly, but when thousands or even hundreds of thousands of sentences need to be tagged, sending sentences to multiple computers can be much faster than waiting for a single computer to do it all.
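With thousands of sentences in flight, replies come back in whatever order the workers finish, so the collected list is not guaranteed to match the input order. One way to restore it, sketched here with the same standard-library stand-ins (threads and queues instead of execnet), is to send (index, sentence) pairs and have each worker echo the index back. A remote_tag.py variant would have to unpack these pairs; tag_sentence() and worker() are hypothetical.

```python
import queue
import threading


def tag_sentence(sentence):
    # Hypothetical stand-in for tagger.tag(): every token is tagged NN.
    return [(token, "NN") for token in sentence]


def worker(inbox, results):
    # Receive (index, sentence) pairs and send back (index, tagged_sentence).
    for index, sentence in iter(inbox.get, None):
        results.put((index, tag_sentence(sentence)))


sentences = [["sentence", str(i)] for i in range(1000)]
results = queue.Queue()
inboxes = [queue.Queue() for _ in range(4)]
threads = [threading.Thread(target=worker, args=(inbox, results)) for inbox in inboxes]
for t in threads:
    t.start()

for index, sentence in enumerate(sentences):
    inboxes[index % len(inboxes)].put((index, sentence))  # round-robin dispatch
for inbox in inboxes:
    inbox.put(None)  # signal that no more sentences are coming

tagged = [None] * len(sentences)
for _ in sentences:
    index, tagged_sentence = results.get()
    tagged[index] = tagged_sentence  # slot each reply back into input order
for t in threads:
    t.join()
```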

Local versus remote gateways

The default gateway spec is popen, which creates a Python subprocess on the local machine. This means execnet.makegateway() is equivalent to execnet.makegateway('popen'). If you have password-less SSH access to a remote machine, then you can create a remote gateway using execnet.makegateway('ssh=remotehost'), where remotehost should be the hostname of the machine. An SSH gateway spawns a new Python interpreter for executing the code remotely. As long as the code you're using for remote execution is pure, you only need a Python interpreter on the remote machine, plus any modules that code needs at runtime. For remote_tag.py, that includes NLTK, because pickle.loads() has to import the tagger's classes to rebuild it.

Channels work exactly the same no matter what kind of gateway is used; the only difference will be communication time. This means you can mix and match local subprocesses with remote interpreters to distribute your computations across many machines in a network. There are many more details on gateways in the API documentation at http://codespeak.net/execnet/basics.html.
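As a sketch of mixing gateway types, the hypothetical helper below builds a list of spec strings: one 'popen' spec per local core and one 'ssh=host' spec per remote host. The host names are placeholders, and each remote host is assumed to allow password-less SSH. Each spec would then be passed to execnet.makegateway() and given its own channel via remote_exec(remote_tag).

```python
import os


def build_specs(local_workers=None, remote_hosts=()):
    """Hypothetical helper: one gateway spec string per worker."""
    if local_workers is None:
        # One local subprocess per CPU core; cpu_count() may return None.
        local_workers = os.cpu_count() or 1
    specs = ["popen"] * local_workers
    # Assumes password-less SSH access to every listed host.
    specs.extend(f"ssh={host}" for host in remote_hosts)
    return specs


specs = build_specs(2, ["node1", "node2"])
print(specs)  # ['popen', 'popen', 'ssh=node1', 'ssh=node2']
```

To use several cores on one remote machine, list its host name once per core; each ssh spec creates its own gateway and remote interpreter.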

See also

Part-of-speech tagging and taggers are covered in detail in Chapter 4, Part-of-speech Tagging. In the next recipe, we'll use execnet to do distributed chunk extraction.
