We have talked enough about NLTK as a library and about some of the most-used functions it gives us. Many of the NLP problems that NLTK solves are highly parallelizable, which is why we will try to use NLTK on Hadoop.
The best way of running NLTK on Hadoop is to get it installed on all the nodes of the cluster, which is not that difficult to achieve. There are other ways of doing this, such as shipping the resource files as a streaming argument, but we will prefer the first option.
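Once NLTK is installed, it is worth running a quick sanity check on each node before submitting any job. The following is a minimal sketch, assuming the standard punkt tokenizer data; it fails with a LookupError if the library or its data is missing:

```python
# sanity check: confirm NLTK and the punkt tokenizer data exist on this node
import nltk

# raises LookupError if the punkt data has not been installed
nltk.data.find('tokenizers/punkt/english.pickle')
print nltk.word_tokenize("NLTK is ready on this node")
```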
There are a variety of ways in which we can make NLTK run on Hadoop. Let's walk through one example of using NLTK to do tokenization in parallel with a Hive UDF.
For this use case, we have to follow these steps. Our sample input is a small, tab-separated file of customer messages:
ID | Content
---|---
UA0001 | "I tried calling you. The service was not up to the mark"
UA0002 | "Can you please update my phone no"
UA0003 | "Really bad experience"
UA0004 | "I am looking for an iPhone"
First, we create a Hive table that will hold this data:

Hive script:

```sql
CREATE TABLE $InputTableName (
    ID String,
    Content String
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
```
Next, we create an output table, $OutTableName, with the same schema and an added column for the tokens:

Hive script:

```sql
CREATE TABLE $OutTableName (
    ID String,
    Content String,
    Tokens String
);
```
Now we write a UDF that tokenizes the content of each row using NLTK's tokenize method. This is very similar to what we did in Chapter 3, Part of Speech Tagging, and the function is analogous to all the examples in that chapter. If you want POS tags, lemmatization, and so on, you just need to modify this UDF. Let's see how the UDF will look for our tokenizer:

```python
#!/usr/bin/env python
# nltk_scoring.py: Hive UDF that tokenizes the content column
import sys
import nltk

nltk.download('punkt')

for line in sys.stdin:
    line = line.strip()
    print >> sys.stderr, line              # log the raw row for debugging
    id, content = line.split('\t')         # Hive streams columns tab-separated
    tokens = nltk.word_tokenize(content)
    print '\t'.join([id, content, str(tokens)])
```
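As an illustration, here is a sketch of how the same UDF could be modified to emit POS tags instead of plain tokens. The extra `averaged_perceptron_tagger` download and the `nltk_pos_scoring.py` name are assumptions, not part of the original example:

```python
# hypothetical nltk_pos_scoring.py: same streaming contract, but emits POS tags
import sys
import nltk

nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')   # model used by nltk.pos_tag

for line in sys.stdin:
    line = line.strip()
    id, content = line.split('\t')            # Hive streams columns tab-separated
    tokens = nltk.word_tokenize(content)
    tags = nltk.pos_tag(tokens)               # [(token, tag), ...]
    print '\t'.join([id, content, str(tags)])
```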
Once we have our nltk_scoring.py file ready, we use the Hive TRANSFORM function to apply the UDF to the given content, tokenize it, and dump the tokens into the new column:

Hive script:

```sql
add FILE nltk_scoring.py;
add FILE english.pickle;  -- add the files to the distributed cache

INSERT OVERWRITE TABLE $OutTableName
SELECT TRANSFORM (id, content)
USING 'python nltk_scoring.py'
AS (id string, content string, tokens string)
FROM $InputTableName;
```
If you see an error like the following, it means NLTK or the NLTK data was not installed correctly on the nodes:

```
raise LookupError(resource_not_found)
LookupError:
**********************************************************************
  Resource u'tokenizers/punkt/english.pickle' not found.  Please
  use the NLTK Downloader to obtain the resource:  >>> nltk.download()
  Searched in:
    - '/home/nltk_data'
    - '/usr/share/nltk_data'
    - '/usr/local/share/nltk_data'
    - '/usr/lib/nltk_data'
    - '/usr/local/lib/nltk_data'
```
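One way to fix this is to download the required resource into one of the directories NLTK searches, on every node. A minimal sketch follows; the `/usr/share/nltk_data` target is an assumption, and any path from the list above will do:

```python
# run once per node: place the punkt data where NLTK looks by default
import nltk

# download_dir is an assumption; use any directory from the searched list
nltk.download('punkt', download_dir='/usr/share/nltk_data')
```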
If the Hive job completes successfully, we will have an output table, OutTableName, that will look something like this:
ID | Content | Tokens
---|---|---
UA0001 | "I tried calling you. The service was not up to the mark" | ["I", "tried", "calling", "you", "The", "service", "was", "not", "up", "to", "the", "mark"]
UA0002 | "Can you please update my phone no" | ["Can", "you", "please", "update", "my", "phone", "no"]
UA0003 | "Really bad experience" | ["Really", "bad", "experience"]
UA0004 | "I am looking for an iPhone" | ["I", "am", "looking", "for", "an", "iPhone"]
Let's try the second option: Python streaming. Hadoop streaming lets us write our own mapper and reducer functions, and our mapper.py looks quite similar to the Hive UDF. Here we use the same example with map-reduce Python streaming, which gives us the option of reading from a Hive table or directly from an HDFS file. We will simply go over the content of the file and tokenize it. We will not perform any real reduce operation here, but for learning purposes I have included a dummy reducer that just dumps its input; you could also drop the reducer from the execution command completely.
Here is the code for Mapper.py:

```python
#!/usr/bin/env python
# Mapper.py: tokenizes the content column of each input record
import sys
import nltk

for line in sys.stdin:
    line = line.strip()
    id, content = line.split('\t')         # input columns are tab-separated
    tokens = nltk.word_tokenize(content)
    print '\t'.join([id, content, str(tokens)])
```
Here is the code for Reducer.py:

```python
#!/usr/bin/env python
# Reducer.py: identity reducer that simply passes each record through
import sys

for line in sys.stdin:
    line = line.strip()
    id, content, tokens = line.split('\t')
    print '\t'.join([id, content, tokens])
```
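Before submitting the job to the cluster, it can help to smoke-test the mapper locally. The following is a minimal sketch, assuming mapper.py sits in the current directory and using a made-up sample record; it pipes one row through the script the same way Hadoop streaming feeds stdin:

```python
# pipe a sample tab-separated record through mapper.py, as Hadoop streaming does
import subprocess

sample = 'UA0001\tI tried calling you. The service was not up to the mark\n'
proc = subprocess.Popen(['python', 'mapper.py'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE)
out, _ = proc.communicate(sample)
print out
```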
The following is the Hadoop command to execute the Python streaming job:

```bash
hadoop jar <path>/hadoop-streaming.jar \
    -D mapred.reduce.tasks=1 \
    -file <path>/mapper.py -mapper <path>/mapper.py \
    -file <path>/reducer.py -reducer <path>/reducer.py \
    -input /hdfspath/infile \
    -output outfile
```

To skip the reducer entirely, set -D mapred.reduce.tasks=0 and drop the -file and -reducer options for reducer.py, which turns this into a map-only job.