This recipe uses certain operators in Hive to input/output data through a custom Python script. The script performs a few simple pruning operations over each row, and outputs a slightly modified version of the row into a Hive table.
You will need to download/compile/install the following:
Nigeria_ACLED.csv and Nigeria_ACLED_cleaned.tsv from http://www.packtpub.com/support

This recipe requires the Nigeria_ACLED.csv file to be loaded into a Hive table named acled_nigeria with the following fields mapped to the respective data types.
Issue the following command to the Hive client:
describe acled_nigeria;
You should see the following response:
OK
loc           string
event_date    string
year          string
event_type    string
actor         string
latitude      double
longitude     double
source        string
fatalities    string
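If you have not yet created and loaded this table, the following statements are one way to do it. This is a sketch based on the describe output above; the comma delimiter and the local file path are assumptions driven by the CSV input, so adjust them to your environment:

CREATE TABLE acled_nigeria (
    loc STRING,
    event_date STRING,
    year STRING,
    event_type STRING,
    actor STRING,
    latitude DOUBLE,
    longitude DOUBLE,
    source STRING,
    fatalities STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH '/path/to/Nigeria_ACLED.csv'
OVERWRITE INTO TABLE acled_nigeria;

Note that the raw load keeps the CSV header row in the table; the Python script in this recipe discards it during transformation.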
Follow these steps to use Python and Hive to transform data:
1. Create a file named clean_and_transform_acled.hql in your local working directory and add the inline creation and transformation syntax:

SET mapred.child.java.opts=-Xmx512M;

DROP TABLE IF EXISTS acled_nigeria_cleaned;
CREATE TABLE acled_nigeria_cleaned (
    loc STRING,
    event_date STRING,
    event_type STRING,
    actor STRING,
    latitude DOUBLE,
    longitude DOUBLE,
    source STRING,
    fatalities INT
)
ROW FORMAT DELIMITED;

ADD FILE ./clean_acled_nigeria.py;

INSERT OVERWRITE TABLE acled_nigeria_cleaned
SELECT TRANSFORM(
        if(loc != "", loc, 'Unknown'),
        event_date,
        year,
        event_type,
        actor,
        latitude,
        longitude,
        source,
        if(fatalities != "", fatalities, 'ZERO_FLAG'))
    USING 'python clean_acled_nigeria.py'
    AS (loc, event_date, event_type, actor,
        latitude, longitude, source, fatalities)
FROM acled_nigeria;
2. Create a file named clean_acled_nigeria.py in the same working directory as clean_and_transform_acled.hql and add the following Python code to read from stdin:

#!/usr/bin/env python
import sys

for line in sys.stdin:
    # each row arrives as a single tab-delimited line
    (loc, event_date, year, event_type, actor,
     lat, lon, src, fatalities) = line.strip().split('\t')
    if loc != 'LOCATION':  # remove header row
        if fatalities == 'ZERO_FLAG':
            fatalities = '0'
        print '\t'.join([loc, event_date, event_type, actor,
                         lat, lon, src, fatalities])  # strip out year
3. Run the script from the operating system shell by supplying the -f option to the Hive client:

$ hive -f clean_and_transform_acled.hql
4. Verify that the transformation populated the table by issuing the following query using the -e option to the Hive client:

$ hive -e "select count(1) from acled_nigeria_cleaned"
Hive should count 2931 rows.
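To spot-check the transformed records by eye, the same -e option works for any ad hoc query; for example:

$ hive -e "select * from acled_nigeria_cleaned limit 5"

Each returned row should have eight fields, with year absent and no empty loc or fatalities values.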
Let's start with the Hive script that we created. The first line simply forces a certain JVM heap size for our execution. You can set this to any size appropriate for your cluster; for the ACLED Nigeria dataset, 512 MB is more than enough.
Immediately following this, we drop any existing table named acled_nigeria_cleaned and create a table by the same name. Since ROW FORMAT DELIMITED assumes Hive's default delimiters, fields terminated by Ctrl-A ('\001') and rows terminated by '\n', we can omit explicit FIELDS TERMINATED BY and LINES TERMINATED BY clauses.
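Spelled out explicitly, the equivalent table declaration would look like the following, which behaves identically to the shorter form in the script:

CREATE TABLE acled_nigeria_cleaned (
    loc STRING,
    event_date STRING,
    event_type STRING,
    actor STRING,
    latitude DOUBLE,
    longitude DOUBLE,
    source STRING,
    fatalities INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
LINES TERMINATED BY '\n';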
Once we have our receiving table defined, we need to define the SELECT statement that will transform and output the data. The common convention is to add any scripts required by the SELECT statement before the statement itself. The command ADD FILE ./clean_acled_nigeria.py tells Hive to load the script from the local filesystem into the distributed cache for use by the MapReduce tasks.
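If you want to confirm which resources have been registered for shipping to the distributed cache, the Hive client can list them from within the same session; a quick sanity check:

hive> ADD FILE ./clean_acled_nigeria.py;
hive> LIST FILES;

The LIST FILES command prints the path of every file resource added to the current session.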
The SELECT statement uses the Hive TRANSFORM operator to separate each column by tabs and to cast all columns as STRING, with nulls converted to the literal string '\N'. The columns loc and fatalities are conditionally checked for empty strings and, if found to be empty, are set to a default value.
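The if(condition, then, else) conditional used inside TRANSFORM is ordinary HiveQL, so you can verify the defaulting behavior in isolation; for example, the following should print Unknown once (the FROM clause and LIMIT are only there because older Hive versions require a source table):

$ hive -e "select if('' != '', '', 'Unknown') from acled_nigeria limit 1"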
We specify the USING operator to provide a custom script to work with the TRANSFORM operator. Hive requires that any query calling the USING operator for row transformation first invoke TRANSFORM with the appropriate columns. As long as the file has been placed on the distributed cache and each node in the cluster has Python installed, the MapReduce JVM tasks will be able to execute the script and read the rows in parallel. The AS operator contains a list of named fields corresponding to the columns found in the receiving Hive table, acled_nigeria_cleaned.
The Python script is very straightforward. The #!/usr/bin/env python statement is a hint to tell the shell how to execute the script. Each row from the table is passed in as a tab-delimited line over standard input. The call to the strip() method removes any leading/trailing whitespace, and the line is then tokenized on tabs into named variables, one per field. The raw ACLED Nigeria data used to create the input Hive table contains a header row we wish to discard; the first condition checks for 'LOCATION' as the value of loc, which indicates that header row.
If the row passes this check, we look for the presence of 'ZERO_FLAG' as the value for fatalities, which we set in our Hive script. If the script detects this value for fatalities, we set the value of fatalities to the string '0'.
Finally, we output each field, excluding year, in the same order as it was input. Each row will be placed into the table acled_nigeria_cleaned.
There is a lot going on in this recipe. The following are a few additional explanations that will help you with Hive TRANSFORM/USING/AS operations and ETL in general.
First, consider making every column in your raw staging table a STRING. This is a bit counterintuitive and certainly not found anywhere in the Hive documentation, but if your initial Hive staging table for the incoming data maps each delimited field as a string, it will aid tremendously in data validation and debugging. You can use the Hive STRING type to successfully represent almost any input into a cleansing script or direct Hive QL statement. Trying to map datatypes perfectly over expected values is not flexible in the face of erroneous input. There may be malformed characters in fields where you expect numeric values, and other similar hang-ups that make it impossible to perform certain analytics. Using strings over the raw data fields allows a custom script to inspect the invalid data and decide how to respond. Moreover, when dealing with CSV or tab-separated data, a slightly misaligned INT or FLOAT type mapping in your Hive table declaration, where the data actually holds a STRING, could lead to NULL mappings for every row. String mappings for every field in the raw table will surface column misalignment failures such as these very quickly. This is just a matter of preference, and only applies to tables designed to hold raw or dirty input for immediate validation and transformation into other Hive tables.
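For example, because cast() returns NULL on malformed input, a staging table typed entirely as STRING lets you enumerate the bad numeric values before transforming; a quick sketch against the raw table in this recipe:

SELECT fatalities, count(1) AS occurrences
FROM acled_nigeria
WHERE fatalities != ''
  AND cast(fatalities AS INT) IS NULL
GROUP BY fatalities;

Any rows returned hold values, such as stray text or misaligned columns, that would have silently become NULL under an INT mapping.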
Second, this recipe only outputs strings from the Python script over standard output, and Hive will attempt to cast them to the appropriate types in the receiving table. The advantage to this is the time and coding space saved by not having to explicitly cast every field with the AS operator. The disadvantage is that the insert will not fail should a value be cast to an incompatible type. For instance, outputting HI THERE to a numeric field will insert NULL as the field value for that row, which can lead to undesirable behavior in subsequent SELECT statements over the table.
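If you prefer to document types at the script boundary, Hive also lets you declare column types directly in the AS clause rather than relying on the receiving table's schema. A sketch of how this recipe's statement might be rewritten (incompatible values still become NULL rather than causing a failure):

INSERT OVERWRITE TABLE acled_nigeria_cleaned
SELECT TRANSFORM(
        if(loc != "", loc, 'Unknown'),
        event_date, year, event_type, actor,
        latitude, longitude, source,
        if(fatalities != "", fatalities, 'ZERO_FLAG'))
    USING 'python clean_acled_nigeria.py'
    AS (loc STRING, event_date STRING, event_type STRING, actor STRING,
        latitude DOUBLE, longitude DOUBLE, source STRING, fatalities INT)
FROM acled_nigeria;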
Finally, test your transformation scripts locally before running them through Hive; this one is pretty self-explanatory. It is much easier to debug your script directly on the command line than it is across MapReduce task error logs. It likely will not prevent you from having to troubleshoot issues dealing with scale or data validity, but it will eliminate a large majority of the compile-time and control flow issues.
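Because the script just reads tab-delimited lines from standard input, you can exercise it entirely from the shell. In the following sketch the field values are placeholders, not real ACLED records; printf expands the \t and \n escapes into the tabs and newline the script expects:

$ printf 'Aba\t1997-01-01\t1997\tRiots\tProtesters\t5.1\t7.4\tDaily Trust\tZERO_FLAG\n' | python clean_acled_nigeria.py

The script should echo the row back with the year column removed and fatalities rewritten from ZERO_FLAG to 0.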