This recipe shows how to use Hadoop Streaming with Python to perform a basic time series analysis over the cleansed ACLED Nigeria dataset. The program is designed to output a list of dates in sorted order for each location where the government in Nigeria regained territory.
For this recipe, we will use structured Nigerian conflict data provided by the Armed Conflict Location and Event Data (ACLED) collections team.
You will need to download/compile/install the following:
- `Nigeria_ACLED_cleaned.tsv` from http://www.packtpub.com/support; place the file into HDFS.

The following are the steps to use Python with Hadoop Streaming:
1. Create a shell script named `run_location_regains.sh` that runs the Streaming job. It is important to change the streaming JAR path to match the absolute path of your `hadoop-streaming.jar` file. The path of the `hadoop-streaming.jar` file is different depending on the Hadoop distribution:

```bash
#!/bin/bash
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.2-cdh3u1.jar \
    -input /input/acled_cleaned/Nigeria_ACLED_cleaned.tsv \
    -output /output/acled_analytic_out \
    -mapper location_regains_mapper.py \
    -reducer location_regains_by_time.py \
    -file location_regains_by_time.py \
    -file location_regains_mapper.py \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf map.output.key.field.separator=\t \
    -jobconf num.key.fields.for.partition=1 \
    -jobconf mapred.reduce.tasks=1
```
2. Create a Python file named `location_regains_mapper.py` and add the following:

```python
#!/usr/bin/python
import sys

for line in sys.stdin:
    (loc, event_date, event_type, actor,
     lat, lon, src, fatalities) = line.strip().split('\t')
    (day, month, year) = event_date.split('/')
    # Zero-pad single-digit days and months so dates sort correctly
    if len(day) == 1:
        day = '0' + day
    if len(month) == 1:
        month = '0' + month
    # Expand two-digit years to four digits
    if len(year) == 2:
        if int(year) > 30 and int(year) < 99:
            year = '19' + year
        else:
            year = '20' + year
    event_date = year + '-' + month + '-' + day
    print '\t'.join([loc, event_date, event_type])
```
3. Create a Python file named `location_regains_by_time.py` and add the following:

```python
#!/usr/bin/python
import sys

current_loc = "START_OF_APP"
govt_regains = []
for line in sys.stdin:
    (loc, event_date, event_type) = line.strip().split('\t')
    if loc != current_loc:
        # A new location means the previous one is fully processed
        if current_loc != "START_OF_APP":
            print current_loc + '\t' + '\t'.join(govt_regains)
        current_loc = loc
        govt_regains = []
    if event_type.find('regains') != -1:
        govt_regains.append(event_date)

# Flush the final location after the input is exhausted
if current_loc != "START_OF_APP":
    print current_loc + '\t' + '\t'.join(govt_regains)
```
4. Run the script:

```bash
./run_location_regains.sh
```

You should see the job start from the command line and finish successfully:

```
INFO streaming.StreamJob: Output: /output/acled_analytic_out
```
The shell script sets up the Hadoop Streaming JAR path and passes the necessary arguments to the program. Each argument is explained in detail as follows:

| Argument | Description |
|---|---|
| `-input /input/acled_cleaned/Nigeria_ACLED_cleaned.tsv` | The HDFS path to the input data for MapReduce. |
| `-output /output/acled_analytic_out` | The HDFS path for MapReduce to write the job output. |
| `-mapper location_regains_mapper.py` | Script to be run as the map function; records are passed via stdin. |
| `-reducer location_regains_by_time.py` | Script to be run as the reduce function. |
| `-file location_regains_by_time.py` | Adds a file to the distributed cache. This is required for external scripts. |
| `-file location_regains_mapper.py` | Adds a file to the distributed cache. |
| `-jobconf stream.num.map.output.key.fields=2` | Tells the streaming tool which fields should be treated as the map output key. Our mapper outputs three fields per record; this parameter tells the program to treat the first two as the key. This leverages the secondary sort feature in MapReduce to sort our rows based on the composite of these two fields. |
| `-jobconf map.output.key.field.separator=\t` | Parameter for setting the delimiter token on the key. |
| `-jobconf num.key.fields.for.partition=1` | Guarantees that all of the map output records with the same value in the first field of the key are sent to the same reducer. |
| `-jobconf mapred.reduce.tasks=1` | Number of JVM tasks to reduce over the output keys. |
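To make the interaction of these key settings concrete, here is a minimal sketch (plain Python 3, not part of the recipe; the function names are invented for illustration) of how Streaming treats a mapper output line when `stream.num.map.output.key.fields=2` and `num.key.fields.for.partition=1`: the first two tab-separated fields form the sort key, while only the first field decides the partition.

```python
def split_key_value(line, num_key_fields=2):
    """Split a mapper output line into its sort key and value."""
    fields = line.split('\t')
    key = tuple(fields[:num_key_fields])        # fields used for sorting
    value = '\t'.join(fields[num_key_fields:])  # the remainder is the value
    return key, value

def partition(key, num_partition_fields=1, num_reducers=4):
    # Only the first field of the key picks the reducer, so every record
    # for the same location lands on the same reduce task.
    return hash(key[:num_partition_fields]) % num_reducers

key, value = split_key_value('cityA\t2009-07-03\triots')
print(key)    # ('cityA', '2009-07-03')
print(value)  # riots
```

Because `partition` ignores everything after the first key field, `('cityA', '2009-07-03')` and `('cityA', '2010-08-09')` always map to the same reducer.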
The Python script used in the map phase gets a line corresponding to each record. We call `strip()` to remove any leading/trailing whitespace and then split the line on tabs. The result is a set of variables descriptively named for the row fields they hold.
The `event_date` field in the raw input requires some processing. In order for the framework to sort records in ascending order of dates, we want to take the current form, which is dd/mm/yy, and convert it to yyyy-mm-dd. Since some of the events occurred before the year 2000, we need to expand the year variable out to four digits. Single-digit days and months are zero-padded so that they sort correctly.
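The conversion can be checked in isolation. The following standalone sketch (a hypothetical helper, written in Python 3; it is not part of the mapper) applies the same zero-padding and two-digit-year expansion described above:

```python
def normalize_date(event_date):
    """Convert dd/mm/yy (or dd/mm/yyyy) to the sortable form yyyy-mm-dd."""
    day, month, year = event_date.split('/')
    day = day.zfill(2)      # zero-pad single-digit days
    month = month.zfill(2)  # zero-pad single-digit months
    if len(year) == 2:
        # Two-digit years in the 31-98 range become 19xx, the rest 20xx,
        # mirroring the cutoff used in the mapper
        year = ('19' if 30 < int(year) < 99 else '20') + year
    return '%s-%s-%s' % (year, month, day)

print(normalize_date('7/4/97'))     # 1997-04-07
print(normalize_date('13/4/2010'))  # 2010-04-13
```

Because the result places year before month before day, a plain lexicographic sort of the strings is also a chronological sort, which is exactly what the framework's shuffle phase performs.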
This analysis only requires `location`, `event_date`, and `event_type` to be output to the reduce stage. In the shell script, we specified the first two fields as the output key. Specifying `location` as the first field groups all records with the same location on a common reducer. Specifying `event_date` as the second field allows the MapReduce framework to sort the records by the composite of `location` and `event_date`. The value in each key-value pair is simply the `event_type` field.
Sample map output:

```
(cityA, 2010-08-09, explosion)
(cityB, 2008-10-10, fire)
(cityA, 2009-07-03, riots)
```
The reducer receives the records sorted on the composite value of `location` and `event_date`:

```
(cityA, 2009-07-03, riots)
(cityA, 2010-08-09, explosion)
(cityB, 2008-10-10, fire)
```
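This composite ordering is easy to replicate locally. The short sketch below (illustration only, not part of the job) sorts the sample records on (location, event_date), reproducing the order the reducer sees:

```python
records = [
    ('cityA', '2010-08-09', 'explosion'),
    ('cityB', '2008-10-10', 'fire'),
    ('cityA', '2009-07-03', 'riots'),
]
# Sorting on the first two fields mimics the framework's secondary sort
records.sort(key=lambda r: (r[0], r[1]))
for rec in records:
    print(rec)
# ('cityA', '2009-07-03', 'riots')
# ('cityA', '2010-08-09', 'explosion')
# ('cityB', '2008-10-10', 'fire')
```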
Our configuration specifies only one reducer, so in this recipe all of the rows will partition to the same reduce Java Virtual Machine (JVM). If multiple reduce tasks are specified, `cityA` and `cityB` could be processed independently on separate reduce JVMs.
Understanding how the MapReduce framework sorts and handles the output of the `location_regains_mapper.py` file is important to determine how the reduce script works.
We use `location_regains_by_time.py` to iterate over the sorted collection of events per location and aggregate the events that match a particular type.
As the records were partitioned by location, we can assume that each partition will go to its own reducer. Furthermore, because we specified `event_date` as an additional sort column, we can assume that the events corresponding to a given location are sorted by date in ascending order. Now we are in a position to understand how the script works.
The script must keep track of when the `loc` input changes from the previous location. Such a change signifies that we are done processing the previous location, since all of its records arrive in sorted order. We initialize the `current_loc` flag to `START_OF_APP`. We also declare an empty array, `govt_regains`, to hold the dates of the events we are interested in.
The program starts by processing each line into variables. If there is a change in `loc` and it is not the beginning of the application, we know to output the current `govt_regains` collection to standard output. The change means that we are done processing the previous location and can safely write its collection of event dates out of the reducer.
If the incoming `loc` value is the same as `current_loc`, we know that the incoming event still corresponds to the location we are currently processing. We check whether the event is of the type `regains`, which indicates that the government regained territory in that region. If it matches that type, we add its date to the current `govt_regains` collection. Since the incoming records are sorted by `event_date`, we are guaranteed that the records are inserted into `govt_regains` in ascending order of dates.
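The grouping logic can be exercised without a cluster. Below is a minimal Python 3 sketch (the sample records and the `reduce_regains` helper are invented for illustration) that feeds sorted lines through the same change-in-location logic the reducer uses:

```python
# Sorted (location, date, type) lines, as the reducer would receive them
sorted_lines = [
    'cityA\t2009-07-03\tGovernment regains territory',
    'cityA\t2010-08-09\texplosion',
    'cityB\t2008-10-10\tGovernment regains territory',
]

def reduce_regains(lines):
    results = []
    current_loc = None
    govt_regains = []
    for line in lines:
        loc, event_date, event_type = line.strip().split('\t')
        if loc != current_loc:
            # A new location: emit the finished one, then reset state
            if current_loc is not None:
                results.append((current_loc, govt_regains))
            current_loc = loc
            govt_regains = []
        if 'regains' in event_type:
            govt_regains.append(event_date)
    if current_loc is not None:  # flush the final location
        results.append((current_loc, govt_regains))
    return results

print(reduce_regains(sorted_lines))
# [('cityA', ['2009-07-03']), ('cityB', ['2008-10-10'])]
```

Note that the final location must be flushed after the loop ends, since no subsequent change in `loc` will trigger its output.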
The net result is a single part file that is output from the reducer with a list of locations in lexicographically sorted order. To the right-hand side of each location is a tab-separated sorted list of dates matching the occurrences of when the government regained territory in that location.
Hadoop Streaming is a very popular component. The following are a few additional points worth knowing:
You are not limited to just Python when working with Hadoop Streaming. Java classes, shell scripts, Ruby scripts, and many other languages are frequently used to transition existing code and functionality into full-fledged MapReduce programs. Any language that can read from stdin and write to stdout will work with Hadoop Streaming.