Data skew is a serious problem in distributed processing environments. It occurs when data is not evenly divided among the key tuples emitted from the map phase, which can lead to inconsistent processing times. In the MapReduce framework, data skew can cause some mappers or reducers to take significantly longer to complete a task than the other mappers or reducers in the job.
Apache Pig provides the skewed join to help alleviate the data skew issue in joins. This recipe will demonstrate how to join a skewed dataset with a small table.
Download the apache_nobots_tsv.txt and nobots_ip_country_tsv.txt datasets from http://www.packtpub.com/support and place them in the folder in which you are currently working. You will also need a recent version of Apache Pig (0.9 or later) installed on the cluster.
To skew the apache_nobots_tsv.txt file, create the following shell script to append the same row a few thousand times to a new file named skewed_apache_nobots_tsv.txt:

   #!/bin/bash
   cat apache_nobots_tsv.txt > skewed_apache_nobots_tsv.txt
   for i in {1..5000}
   do
      head -n1 apache_nobots_tsv.txt >> skewed_apache_nobots_tsv.txt
   done
The IP address 221.220.8.0 will appear a significantly higher number of times in the skewed_apache_nobots_tsv.txt file than any other IP.
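A quick way to confirm the skew is to count key frequencies directly. The sketch below uses a small hypothetical inline sample in place of the real skewed_apache_nobots_tsv.txt so it is self-contained; running the same pipeline against the real file shows 221.220.8.0 dominating the counts:

```shell
# Hypothetical 5-row stand-in for skewed_apache_nobots_tsv.txt (ip<TAB>rest-of-row)
printf '221.220.8.0\tA\n10.1.1.1\tB\n221.220.8.0\tC\n221.220.8.0\tD\n192.168.0.5\tE\n' > sample_nobots.tsv

# Count occurrences of each IP (first tab-separated column), most frequent first;
# the skewed IP tops the list
cut -f1 sample_nobots.tsv | sort | uniq -c | sort -rn
```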
Place the skewed_apache_nobots_tsv.txt and nobots_ip_country_tsv.txt files into HDFS:

   $ hadoop fs -put skewed_apache_nobots_tsv.txt /user/hadoop/
   $ hadoop fs -put nobots_ip_country_tsv.txt /user/hadoop/
Follow these steps to perform a skewed join in Apache Pig:

1. Create a script named skewed_join.pig containing two relations to load the two datasets:

   nobots_weblogs = LOAD '/user/hadoop/skewed_apache_nobots_tsv.txt' AS (ip:chararray, timestamp:long, page:chararray, http_status:int, payload_size:int, useragent:chararray);
   ip_country_tbl = LOAD '/user/hadoop/nobots_ip_country_tsv.txt' AS (ip:chararray, country:chararray);

2. Join the two relations on the ip field using the skewed keyword:

   weblog_country_jnd = JOIN nobots_weblogs BY ip, ip_country_tbl BY ip USING 'skewed';

3. Format the joined relation and store the result:

   cleaned = FOREACH weblog_country_jnd GENERATE ip_country_tbl::ip, country, timestamp, page, http_status, payload_size, useragent;
   STORE cleaned INTO '/user/hadoop/weblog_country_jnd_skewed';

4. Run the job:

   $ pig -f skewed_join.pig
In step 1, we defined two relations, nobots_weblogs and ip_country_tbl, to refer to the two datasets.
In step 2, we joined the two datasets on the ip field using Pig's skewed join. Pig will launch two MapReduce jobs to perform the skewed join. The first MapReduce job samples the skewed_apache_nobots_tsv.txt dataset (the skewed data). The second MapReduce job performs a reduce-side join; Pig determines how the data is distributed to the reducers based on the sampling from the first MapReduce job. If skew is present in the dataset, Pig will attempt to optimize the distribution of data to the reducers.
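Because nobots_ip_country_tsv.txt is a small table, a fragment-replicated join is a possible alternative to the skewed join: Pig ships the small relation to every map task and performs the join map-side, avoiding the reduce phase (and therefore the skew) entirely. This is a sketch using the same relations as the recipe; note that the small relation must be listed last and must fit in each mapper's memory:

```
weblog_country_jnd = JOIN nobots_weblogs BY ip, ip_country_tbl BY ip USING 'replicated';
```

The trade-off is memory for balance: the skewed join handles large-on-large joins with hot keys, while the replicated join is simpler and faster when one side is genuinely small.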