The Hive query language provides facilities to control the MapReduce dataflow and to inject your own custom map and reduce scripts at each stage. Used properly, this is a very powerful technique for writing concise MapReduce programs with minimal syntax.
This recipe will show a complete example of how to write custom MapReduce control flow using different operators in Hive. The analytic will specifically look for the longest gap in events for each location to get an idea of how frequently violence occurs in that location.
Make sure you have access to a pseudo-distributed or fully-distributed Hadoop cluster with Apache Hive 0.7.1 installed on your client machine and on the environment path for the active user account.
Your cluster will also need Python 2.7 or greater installed on each node and available on the environment path for the Hadoop user. The script shown in this recipe assumes a Python interpreter at /usr/bin/python. If this does not match your installation, change the script's shebang line accordingly.
This recipe depends on having the Nigeria_ACLED_cleaned.tsv dataset loaded into a Hive table named acled_nigeria_cleaned with the following fields mapped to the respective datatypes.
Issue the following command to the Hive client:

```sql
describe acled_nigeria_cleaned;
```

You should see the following response:

```
OK
loc         string
event_date  string
event_type  string
actor       string
latitude    double
longitude   double
source      string
fatalities  int
```
Perform the following steps to mark the longest period of non-violence using Hive:
```sql
SET mapred.child.java.opts=-Xmx512M;

DROP TABLE IF EXISTS longest_event_delta_per_loc;
CREATE TABLE longest_event_delta_per_loc (
    loc STRING,
    start_date STRING,
    end_date STRING,
    days INT
);

ADD FILE calc_longest_nonviolent_period.py;

FROM (
    SELECT loc, event_date, event_type
    FROM acled_nigeria_cleaned
    DISTRIBUTE BY loc
    SORT BY loc, event_date
) mapout
INSERT OVERWRITE TABLE longest_event_delta_per_loc
REDUCE mapout.loc, mapout.event_date, mapout.event_type
USING 'python calc_longest_nonviolent_period.py'
AS loc, start_date, end_date, days;
```

Save this script as longest_nonviolent_periods_per_location.sql. Next, create the file calc_longest_nonviolent_period.py and save it in the same working folder as longest_nonviolent_periods_per_location.sql:
```python
#!/usr/bin/python
import sys
from datetime import datetime, timedelta

current_loc = "START_OF_APP"
(prev_date, start_date, end_date,
 start_time_obj, end_time_obj, current_diff) = ('', '', '', None, None,
                                                timedelta.min)

for line in sys.stdin:
    (loc, event_date, event_type) = line.strip('\n').split('\t')
    if loc != current_loc and current_loc != "START_OF_APP":
        # location changed: emit the longest gap for the finished location
        if end_date != '':
            print('\t'.join([current_loc, start_date, end_date,
                             str(current_diff.days)]))
        (prev_date, start_date, end_date,
         start_time_obj, end_time_obj, current_diff) = ('', '', '', None,
                                                        None, timedelta.min)
    end_time_obj = datetime.strptime(event_date, '%Y-%m-%d')
    current_loc = loc
    if start_time_obj is not None:  # implies >= 2 events for this location
        diff = end_time_obj - start_time_obj
        if diff > current_diff:
            current_diff = diff  # set the current max time delta
            start_date = prev_date
            end_date = event_date
    prev_date = event_date
    start_time_obj = end_time_obj

# emit the result for the final location once stdin is exhausted
if current_loc != "START_OF_APP" and end_date != '':
    print('\t'.join([current_loc, start_date, end_date,
                     str(current_diff.days)]))
```
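Because Hive simply streams tab-delimited, sorted rows to the script over stdin, the reduce logic can be smoke-tested without a cluster. The sketch below reimplements the per-location gap calculation as a plain function over a few made-up sample rows (the location names and dates are hypothetical, chosen only for illustration):

```python
from datetime import datetime

def longest_gaps(rows):
    """Longest gap in days between consecutive events for each location.

    `rows` must be sorted by (loc, event_date), mirroring the effect of
    DISTRIBUTE BY loc SORT BY loc, event_date. Locations with a single
    event produce no output, matching the reduce script's behavior.
    """
    best = {}   # loc -> (start_date, end_date, days)
    prev = {}   # loc -> (prev_date_string, prev_datetime)
    for loc, event_date in rows:
        cur = datetime.strptime(event_date, '%Y-%m-%d')
        if loc in prev:
            prev_str, prev_dt = prev[loc]
            days = (cur - prev_dt).days
            if loc not in best or days > best[loc][2]:
                best[loc] = (prev_str, event_date, days)
        prev[loc] = (event_date, cur)
    return best

# Hypothetical input, already sorted by location and then date
sample = [
    ('Kano',  '2011-02-10'),   # only one event: no output row
    ('Lagos', '2011-01-01'),
    ('Lagos', '2011-01-05'),   # 4-day gap
    ('Lagos', '2011-03-01'),   # 55-day gap: the longest for Lagos
]
print(longest_gaps(sample))
# → {'Lagos': ('2011-01-05', '2011-03-01', 55)}
```

This mirrors the streaming script's logic but returns a dictionary instead of printing tab-delimited rows, which makes it easy to assert on in a unit test.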
Run the script, and then view the results from the Hive shell:

```
hive -f longest_nonviolent_periods_per_location.sql
hive -e "select * from longest_event_delta_per_loc;"
```
Let's start with the Hive script we created. The first line is simply to force a certain JVM heap size in our execution. You can set this to whatever is appropriate for your cluster. For the ACLED Nigeria dataset, 512 MB is more than enough.
Then we create our table definition for the output, dropping any existing table with the matching name longest_event_delta_per_loc. The table requires four fields per record: loc for the location, start_date to hold the event_date value of the lower-bound event, end_date to hold the event_date value of the upper-bound event, and days to show the total number of days elapsed between the two events.
We then add the file calc_longest_nonviolent_period.py to the distributed cache for use across the different reducer JVMs. This will be used as our reduce script, but first we must organize the map output. The inner SELECT statement grabs loc, event_date, and event_type from the acled_nigeria_cleaned table in Hive. The DISTRIBUTE BY loc clause tells Hive to guarantee that all rows with matching values for loc go to the same reducer. SORT BY loc, event_date tells Hive to sort the data as it arrives at each reducer by the combination of loc and event_date. Now the same reducer can process every row corresponding to each location locally, and in the sorted order of event_date.
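A rough mental model: DISTRIBUTE BY hashes the column value to choose a reducer, and SORT BY then orders each reducer's input independently. The following sketch simulates that routing in Python (the two-reducer count and the row values are arbitrary assumptions for illustration):

```python
def route(rows, num_reducers=2):
    """Simulate DISTRIBUTE BY loc followed by SORT BY loc, event_date."""
    partitions = [[] for _ in range(num_reducers)]
    for loc, event_date in rows:
        # every row with the same loc hashes to the same partition
        partitions[hash(loc) % num_reducers].append((loc, event_date))
    for p in partitions:
        p.sort()  # each reducer sorts only its own input
    return partitions

rows = [('Lagos', '2011-01-02'), ('Kano', '2011-01-01'),
        ('Lagos', '2011-01-01'), ('Jos', '2011-02-01')]
for i, p in enumerate(route(rows)):
    print('reducer %d: %s' % (i, p))
```

Note that which partition a given location lands in is arbitrary; the only guarantees are co-location of equal keys and per-partition sort order, which is exactly what the reduce script relies on.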
We alias the output of this SELECT statement to mapout and use the shorthand REDUCE operator to process each row from mapout. The USING clause lets us supply a custom Python script that reads each record as it comes over stdin. The AS operator lets us map the delimited fields the script writes to stdout into the fields of the receiving table.
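The contract between Hive and the script is plain text: one record per line on stdin, columns separated by tabs, and whatever the script prints to stdout is split on tabs to fill the AS column list. A minimal identity transform, as a sketch of that contract:

```python
import sys

def transform(line):
    # Hive delivers one record per line, columns joined by tabs; the
    # printed result is split on tabs to fill the AS column list.
    fields = line.rstrip('\n').split('\t')
    return '\t'.join(fields)

if __name__ == '__main__':
    for line in sys.stdin:
        print(transform(line))
```

Any script in any language that honors this line-oriented, tab-delimited contract can be plugged into USING.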
The Python script calc_longest_nonviolent_period.py is used by the reduce stage to compute the longest time gap between events for each location. Since we have guaranteed that all records with a common loc value arrive at the same reducer, and that those records are in date-sorted order for each location, we are now in a position to understand how the Python script works.
In the Python script calc_longest_nonviolent_period.py, we start with #!/usr/bin/python as a hint to the shell on how to execute the script. We need to import sys to use the stdin and stdout operations. We also need the datetime and timedelta class definitions from the datetime package.
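A quick illustration of why those imports matter, using two made-up dates in the same %Y-%m-%d format the dataset uses:

```python
from datetime import datetime, timedelta

# parsing an ACLED-style date string yields a datetime object
a = datetime.strptime('2011-01-05', '%Y-%m-%d')
b = datetime.strptime('2011-03-01', '%Y-%m-%d')

gap = b - a  # subtracting two datetimes yields a timedelta

# timedelta.min is the smallest representable delta, so any genuine gap
# compares greater than it; this is why the script initializes
# current_diff to timedelta.min
assert gap > timedelta.min
print(gap.days)
# → 55
```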
The script operates very procedurally and can be a bit difficult to follow. First we declare current_loc and initialize its value to START_OF_APP as a flag to the printout conditional. We then set up several placeholder variables, maintained on a per-location basis by the for loop:
- prev_date: This holds the last observed event_date for the current loc value. It is blank at the start of the app or when a new location begins.
- start_date: This holds the lower bound of the longest currently observed time delta between events for that value of loc.
- end_date: This holds the upper bound of the longest currently observed time elapsed between events for the value of current_loc.
- start_time_obj: This holds the most recently iterated datetime object, or None at the start of the app or when a new location begins.
- end_time_obj: This holds the datetime object for the current event_date, or None at the start of the app or when a new location begins.
- current_diff: This holds the time delta for the longest currently observed time elapsed between events for current_loc, or the lowest possible time delta (timedelta.min) at the start of the app or when a new location begins.

The for loop reads rows over stdin that have already been sorted by the combination of loc and event_date. We parse each row into variables representing the column values by stripping the trailing newline and splitting the line on tab characters.
The first conditional is skipped while current_loc is equal to START_OF_APP: we have only begun processing the first row across all locations on that reducer, and have nothing to output yet. Should we see a value for loc that differs from current_loc, and we are not at the start of the application, that is a signal that we are done processing the rows for current_loc and can safely output the longest time delta for events in that location. Should end_date still be set to an empty string, that indicates we only saw one event for that location, and in this scenario we do not output anything for it. Finally, we reset the six placeholder variables explained previously, so that we may accurately process the records for the next location.
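This detect-a-key-change, flush, and reset sequence is the standard grouping pattern for streaming reducers. Stripped of the date arithmetic, the control flow looks like this (the two-column rows are hypothetical):

```python
def group_sorted(rows):
    """Yield (key, values) pairs from rows already sorted by key."""
    current_key, values = None, []
    for key, value in rows:
        if key != current_key and current_key is not None:
            yield current_key, values  # key changed: flush finished group
            values = []                # reset per-group state
        current_key = key
        values.append(value)
    if current_key is not None:
        yield current_key, values      # flush the final group

print(list(group_sorted([('a', 1), ('a', 2), ('b', 3)])))
# → [('a', [1, 2]), ('b', [3])]
```

The final flush after the loop matters: without it, the last group read from stdin would never be emitted.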
Following the conditional, we immediately set current_loc equal to loc, to avoid unnecessary entry of the aforementioned conditional on the next iteration when we have not transitioned locations. We set end_time_obj to the parsed event_date of the current row. If start_time_obj is None, we are on the first row for that location and cannot yet perform a time delta comparison. Whether or not start_time_obj is None, at the end of the loop we set prev_date equal to event_date and start_time_obj equal to the end_time_obj of the current iteration. By doing so, on the next iteration start_time_obj will hold the event_date of the previous record, while end_time_obj will hold the event_date of the current record.
Once start_time_obj is no longer None after the first iteration for a given location, we can begin doing diff comparisons on the two datetime objects. Subtracting start_time_obj from end_time_obj yields a timedelta object which, if larger than the current_diff value, gets set as the new value of current_diff. In doing so, we capture the longest elapsed period between events for that location. We also set the values of start_date and end_date for easy output later, once we are done processing this location. As mentioned earlier, whether or not we reset current_diff, we then set prev_date to event_date and start_time_obj equal to the current end_time_obj.
The next time the loop encounters the condition where loc is not equal to current_loc, we output the currently held longest time difference between events before moving on to the next location. Each print to stdout writes a row into the receiving Hive table, holding the location in current_loc, the lower-bound event_date string held by start_date, the upper-bound event_date string held by end_date, and the total number of days elapsed between those two dates, held by current_diff.days.
Here are a few additional notes on some of the operations touched upon in this recipe.
These four operator variants always cause confusion for Hive beginners. Here's a quick comparison so you'll know which one is appropriate for your use case:

- DISTRIBUTE BY: Rows with matching column values partition to the same reducer. When used alone, it does not guarantee sorted input to the reducer.
- SORT BY: This dictates which columns to sort by when ordering reducer input records.
- CLUSTER BY: This is a shorthand operator to perform both SORT BY and DISTRIBUTE BY operations on a group of columns.
- ORDER BY: This is similar to the traditional SQL operator. Sorted order is maintained across all of the output from every reducer. Use this with caution, as it can force all of the output records to a single reducer to perform the sort. Usage with LIMIT is strongly recommended.
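To make the distinction concrete, here is how each variant might look against the acled_nigeria_cleaned table. These are illustrative fragments only; the column choices are arbitrary, and the first two queries would rarely be run on their own:

```sql
-- same-loc rows go to the same reducer; per-reducer order not guaranteed
SELECT loc, event_date FROM acled_nigeria_cleaned DISTRIBUTE BY loc;

-- each reducer sorts its own input; no co-location guarantee by itself
SELECT loc, event_date FROM acled_nigeria_cleaned SORT BY loc, event_date;

-- equivalent to DISTRIBUTE BY loc SORT BY loc
SELECT loc, event_date FROM acled_nigeria_cleaned CLUSTER BY loc;

-- total order across all output; pair with LIMIT to avoid overwhelming
-- the single reducer that performs the final sort
SELECT loc, event_date FROM acled_nigeria_cleaned
ORDER BY event_date LIMIT 10;
```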