Distributed word scoring with Redis and execnet

We can use Redis and execnet together to do distributed word scoring. In the Calculating high information words recipe in Chapter 7, Text Classification, we calculated the information gain of each word in the movie_reviews corpus using a FreqDist and ConditionalFreqDist. Now that we have Redis, we can do the same thing using a RedisHashFreqDist and a RedisConditionalHashFreqDist, and then store the scores in a RedisOrderedDict. We can use execnet to distribute the counting in order to get better performance out of Redis.

Getting ready

Redis, redis-py, and execnet must be installed, and an instance of redis-server must be running on localhost.

How to do it...

We start by getting a list of (label, words) tuples for each label in the movie_reviews corpus (which only has pos and neg labels). Then, we get the word_scores using score_words() from the dist_featx module. The returned word_scores is an instance of RedisOrderedDict, and we can see that the total number of scored words is 39,767. Using the keys() method, we can then get the top 1,000 words and inspect the top five, just to see what they are. Once we've gotten all we want from word_scores, we can delete the keys in Redis, as we no longer need the data.

>>> from dist_featx import score_words
>>> from nltk.corpus import movie_reviews
>>> labels = movie_reviews.categories()
>>> labelled_words = [(l, movie_reviews.words(categories=[l])) for l in labels]
>>> word_scores = score_words(labelled_words)
>>> len(word_scores)
39767
>>> topn_words = word_scores.keys(end=1000)
>>> topn_words[0:5]
[b'bad', b',', b'and', b'?', b'movie']
>>> from redis import Redis
>>> r = Redis()
>>> [r.delete(key) for key in ['word_fd', 'label_word_fd:neg', 'label_word_fd:pos', 'word_scores']]
[1, 1, 1, 1]

The score_words() function from dist_featx can take a while to complete, so expect to wait a couple of minutes. The overhead of using execnet and Redis means it will take significantly longer than a nondistributed, in-memory version of the function.
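Under the hood, this is essentially the same computation as the Chapter 7 recipe. For comparison, here is a minimal in-memory sketch of the same scoring (score_words_inmemory is an illustrative name, not part of dist_featx), using FreqDist and ConditionalFreqDist directly:

from nltk.metrics import BigramAssocMeasures
from nltk.probability import FreqDist, ConditionalFreqDist

def score_words_inmemory(labelled_words, score_fn=BigramAssocMeasures.chi_sq):
  word_fd = FreqDist()
  label_word_fd = ConditionalFreqDist()
  
  # count each word both globally and per label
  for label, words in labelled_words:
    for word in words:
      word_fd[word] += 1
      label_word_fd[label][word] += 1
  
  n_xx = label_word_fd.N()
  word_scores = {}
  
  for label in label_word_fd.conditions():
    n_xi = label_word_fd[label].N()
    
    for word, n_ii in label_word_fd[label].items():
      n_ix = word_fd[word]
      word_scores[word] = score_fn(n_ii, (n_ix, n_xi), n_xx)
  
  return word_scores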

How it works...

The dist_featx.py module contains the score_words() function, which does the following:

  • Opens gateways and channels, sending initialization data to each channel
  • Sends each (label, words) tuple over a channel for counting
  • Sends a done message to each channel, waits for a done reply back, then closes the channels and gateways
  • Calculates the score of each word based on the counts and stores in a RedisOrderedDict

In our case of counting words in the movie_reviews corpus, calling score_words() opens two gateways and channels, one for counting the pos words and the other for counting the neg words. The communication looks like the following diagram:

[Diagram: score_words() opens a gateway and channel per label; each channel runs remote_word_count, receives one (label, words) tuple, and updates the shared frequency distributions in Redis]

Once the counting is finished, we can score all the words and store the results. The code itself is as follows:

import itertools, execnet, remote_word_count
from nltk.metrics import BigramAssocMeasures
from redis import Redis
from redisprob import RedisHashFreqDist, RedisConditionalHashFreqDist
from rediscollections import RedisOrderedDict

def score_words(labelled_words, score_fn=BigramAssocMeasures.chi_sq, host='localhost', specs=[('popen', 2)]):
  gateways = []
  channels = []
  
  for spec, count in specs:
    for i in range(count):
      gw = execnet.makegateway(spec)
      gateways.append(gw)
      channel = gw.remote_exec(remote_word_count)
      channel.send((host, 'word_fd', 'label_word_fd'))
      channels.append(channel)
  
  cyc = itertools.cycle(channels)
  
  for label, words in labelled_words:
    channel = next(cyc)
    channel.send((label, list(words)))
  
  for channel in channels:
    channel.send('done')
    assert 'done' == channel.receive()
    channel.waitclose(5)
  
  for gateway in gateways:
    gateway.exit()
  
  r = Redis(host)
  fd = RedisHashFreqDist(r, 'word_fd')
  cfd = RedisConditionalHashFreqDist(r, 'label_word_fd')
  word_scores = RedisOrderedDict(r, 'word_scores')
  n_xx = cfd.N()
  
  for label in cfd.conditions():
    n_xi = cfd[label].N()
    
    for word, n_ii in cfd[label].items():
      word = word.decode()  # Redis returns hash keys as bytes
      n_ix = fd[word]
      
      if n_ii and n_ix and n_xi and n_xx:
        score = score_fn(n_ii, (n_ix, n_xi), n_xx)
        word_scores[word] = score
  
  return word_scores
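
The arguments passed to score_fn follow the contingency convention of NLTK's BigramAssocMeasures: n_ii is the count of the word within the label, n_ix is the total count of the word across all labels, n_xi is the total number of words in the label, and n_xx is the total number of words overall. To make this concrete, here is the same call with made-up counts:

from nltk.metrics import BigramAssocMeasures

# Hypothetical counts: the word occurs 300 times within the label (n_ii)
# and 400 times overall (n_ix); the label contains 700000 words (n_xi)
# out of 1500000 words in total (n_xx).
score = BigramAssocMeasures.chi_sq(300, (400, 700000), 1500000)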

Note

Note that this scoring method will only be accurate for comparing two labels. If there are more than two labels, a different scoring method should be used, and its requirements will dictate how you store word scores.

The remote_word_count.py module looks like the following code:

from redis import Redis
from redisprob import RedisHashFreqDist, RedisConditionalHashFreqDist

if __name__ == '__channelexec__':
  host, fd_name, cfd_name = channel.receive()
  r = Redis(host)
  fd = RedisHashFreqDist(r, fd_name)
  cfd = RedisConditionalHashFreqDist(r, cfd_name)
  
  for data in channel:
    if data == 'done':
      channel.send('done')
      break
    
    label, words = data
    
    for word in words:
      fd[word] += 1
      cfd[label][word] += 1

You'll notice that this is not a pure module, as it requires being able to import both redis and redisprob. The reason is that instances of RedisHashFreqDist and RedisConditionalHashFreqDist cannot be pickled and sent over the channel. Instead, we send the hostname and key names over the channel so we can create the instances in the remote module. Once we have the instances, there are two kinds of data we can receive over the channel (a minimal usage sketch follows the list):

  • A done message, which signals that there is no more data coming in over the channel. We reply back with another done message, then exit the loop to close the channel.
  • A 2-tuple of (label, words), which we then iterate over to increment counts in both the RedisHashFreqDist and RedisConditionalHashFreqDist.
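
To make the protocol concrete, here is a minimal sketch of driving remote_word_count by hand over a single local gateway. It writes to the same Redis keys used earlier, so delete them afterwards if you are only experimenting:

import execnet
import remote_word_count

gw = execnet.makegateway()  # defaults to a local popen gateway
channel = gw.remote_exec(remote_word_count)

# initialization data: the Redis host and the frequency distribution key names
channel.send(('localhost', 'word_fd', 'label_word_fd'))

# a single (label, words) batch, followed by the done handshake
channel.send(('pos', ['great', 'fun', 'great']))
channel.send('done')
assert channel.receive() == 'done'

gw.exit()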

There's more...

In this particular case, it would be faster to compute the scores without using Redis or execnet. However, by using Redis, we can store the scores persistently for later examination and use. Being able to inspect all the word counts and scores manually is a great way to learn about your data. We can also tweak feature extraction without having to recompute the scores. For example, you could use featx.bag_of_words_in_set() (found in Chapter 7, Text Classification) with the top N words from the RedisOrderedDict, where N could be 1,000, 2,000, or whatever number you want, as sketched below. The benefits of execnet become more apparent as the data grows: horizontal scalability, whether through execnet or some other method of distributing computation across many nodes, becomes increasingly valuable as the amount of data you need to process increases.
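
As a rough sketch (assuming the featx module from Chapter 7 is importable, and remembering that RedisOrderedDict returns its keys as bytes; top_word_feats is an illustrative name):

from featx import bag_of_words_in_set

# take the top 2,000 scoring words, decoding the bytes keys to strings
top_words = set(word.decode() for word in word_scores.keys(end=2000))

def top_word_feats(words):
  # only keep words that are in the top scoring set
  return bag_of_words_in_set(words, top_words)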

See also

The Calculating high information words recipe in Chapter 7, Text Classification, introduces information gain scoring of words for feature extraction and classification. The first three recipes of this chapter show how to use execnet, while the next three recipes describe RedisHashFreqDist, RedisConditionalHashFreqDist, and RedisOrderedDict, respectively.
