Hadoop is not limited to running MapReduce jobs written in Java or other JVM languages. It also provides a generic streaming interface: any application that can read from stdin and write to stdout can be used as a mapper or reducer in a MapReduce job. Because streaming jobs do not have access to the Hadoop Java classes, they need a different way to reach the framework's features. One convenient and extremely useful feature provided by Hadoop is Counters. This recipe uses a simple Python program to show how to increment a counter from a streaming application. The Python code does not have direct access to the Java Reporter class that the Hadoop framework uses for working with Counters. Instead, it writes specially formatted lines to stderr, which the framework interprets as requests to increment the specified counter.
You will need to download the weblog_entries_bad_records.txt
dataset from the Packt website, http://www.packtpub.com/support. This example will use the streaming_counters.py
Python program provided in the code section of this chapter.
Complete the following steps to execute a Hadoop streaming job using the streaming_counters.py
program:
hadoop jar $HADOOP_HOME/contrib/hadoop-*streaming*.jar -file streaming_counters.py -mapper streaming_counters.py -reducer NONE -input /data/weblogs/weblog_entries_bad_records.txt -output /data/weblogs/weblog_entries_filtered.txt
Open the JobTracker web UI at localhost:50030 and scroll down to the Completed Jobs section. Locate the streaming_counters job; the most recent jobs are at the bottom of the table. Once the job has been located, click on its Jobid link to view the job details, including its counters.
The Hadoop framework constantly monitors stderr for entries that fit the following format:
reporter:counter:group,counter,value
If it finds a line that matches this format, the Hadoop framework checks whether that group and counter exist. If they do, the counter's current value is incremented by the given value. If they do not, the group and counter are created and set to the given value.
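Emitting that line correctly by hand is easy to get wrong (a stray space becomes part of the counter name, and the line must end with a newline so the framework sees a complete record). A small helper, hypothetical and not part of the recipe's script, keeps the format in one place:

```python
import sys

def increment_counter(group, counter, amount=1):
    """Emit a counter increment in the format Hadoop streaming expects.

    The line goes to stderr and is newline-terminated so the framework
    can parse it as a complete reporter record.
    """
    sys.stderr.write("reporter:counter:%s,%s,%d\n" % (group, counter, amount))

# Example: increment the INVALID_IP counter in the BadRecords group by 1
increment_counter("BadRecords", "INVALID_IP")
```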
The Python code performs two validation checks on the weblog data. The first checks for an invalid number of columns:
if len(cols) < 5:
    sys.stderr.write("reporter:counter:BadRecords,INVALID_NUMBER_OF_COLS,1\n")
    continue
If a line has fewer than five columns, the program writes to stderr in the format Hadoop expects for manipulating the counter. Similarly, the second validation verifies the IP address of each record and increments a counter each time an invalid IP address is found.
m = re.match(('^([01]?\d\d?|2[0-4]\d|25[0-5])\.'
              '([01]?\d\d?|2[0-4]\d|25[0-5])\.'
              '([01]?\d\d?|2[0-4]\d|25[0-5])\.'
              '([01]?\d\d?|2[0-4]\d|25[0-5])$'), ip)
if not m:
    sys.stderr.write("reporter:counter:BadRecords,INVALID_IP,1\n")
    continue
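As a quick sanity check of that dotted-quad pattern, note that the separator dots must be escaped so they match literal dots, and that each octet alternation restricts values to 0-255. The pattern can be exercised on its own:

```python
import re

# The recipe's IPv4 pattern, with the separator dots escaped
IP_PATTERN = re.compile(r'^([01]?\d\d?|2[0-4]\d|25[0-5])\.'
                        r'([01]?\d\d?|2[0-4]\d|25[0-5])\.'
                        r'([01]?\d\d?|2[0-4]\d|25[0-5])\.'
                        r'([01]?\d\d?|2[0-4]\d|25[0-5])$')

print(bool(IP_PATTERN.match("192.168.0.1")))  # True: all octets in range
print(bool(IP_PATTERN.match("256.1.1.1")))    # False: first octet out of range
```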
Streaming jobs can also set the task's status message using the same basic mechanism. Writing to stderr in the following format updates the task's status, setting it to message:
reporter:status:message
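In the same spirit as the counter example, a status update can be wrapped in a small helper. This is a minimal sketch; the helper name and the message text are illustrative, not part of the recipe's script:

```python
import sys

def set_status(message):
    """Emit a task status update in the form Hadoop streaming parses.

    Like counter updates, the line goes to stderr and must be
    newline-terminated.
    """
    sys.stderr.write("reporter:status:%s\n" % message)

# Example: report progress while filtering records
set_status("filtering weblog records")
```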