Like the replicated join described in the previous recipe, Apache Pig's merge join is another map-side join technique. The major difference between the two implementations is that the merge join does not hold any of the data in main memory. This recipe will demonstrate how to use Pig's merge join to join two datasets.
Download the apache_nobots_tsv.txt and nobots_ip_country_tsv.txt datasets from http://www.packtpub.com/support and place them into the folder you are working in. You will also need a recent version of Apache Pig (0.9 or later) installed on the cluster.
In order to use the merge join functionality in Pig, the two datasets need to be sorted on the join key. To sort the two datasets, run the following commands using Unix sort:
$ sort -k1 apache_nobots_tsv.txt > sorted_apache_nobots_tsv.txt
$ sort -k1 nobots_ip_country_tsv.txt > sorted_nobots_ip_country_tsv.txt
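As a quick sanity check (not part of the original recipe), `sort -c` can verify that a file really is ordered on the join key before you hand it to Pig. The sample file and its rows below are invented for illustration:

```shell
#!/bin/sh
# Build a tiny tab-separated sample in the same layout as the
# ip-to-country dataset (these rows are made up for illustration).
printf '10.0.0.2\tUS\n10.0.0.1\tFR\n' > ip_country_sample.txt

# Sort on the join key (field 1), just as the recipe does.
sort -k1 ip_country_sample.txt > sorted_ip_country_sample.txt

# sort -c exits 0 only if its input is already ordered, so it makes a
# cheap pre-flight check before launching the Pig job.
sort -c -k1 sorted_ip_country_sample.txt && echo "sorted on the join key"
```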
Place the two new sorted files into HDFS:
$ hadoop fs -put sorted_apache_nobots_tsv.txt /user/hadoop
$ hadoop fs -put sorted_nobots_ip_country_tsv.txt /user/hadoop
Carry out the following steps to perform a merge join in Apache Pig:
1. Create a new file named merge_join.pig, and define two Pig relations to load the two datasets:

nobots_weblogs = LOAD '/user/hadoop/sorted_apache_nobots_tsv.txt' AS (ip:chararray, timestamp:long, page:chararray, http_status:int, payload_size:int, useragent:chararray);
ip_country_tbl = LOAD '/user/hadoop/sorted_nobots_ip_country_tsv.txt' AS (ip:chararray, country:chararray);
2. Join the two datasets on the ip field, using the merge keyword:

weblog_country_jnd = JOIN nobots_weblogs BY ip, ip_country_tbl BY ip USING 'merge';
3. Format the joined relation, and store the results:

cleaned = FOREACH weblog_country_jnd GENERATE ip_country_tbl::ip, country, timestamp, page, http_status, payload_size, useragent;
STORE cleaned INTO '/user/jowens/weblog_country_jnd_merge';
4. Run the job:

$ pig -f merge_join.pig
In step 1, we defined two relations, nobots_weblogs and ip_country_tbl, to refer to the two datasets.
In step 2, we joined the two datasets on the ip field using Pig's merge join. Pig launches two MapReduce jobs to perform the merge join. First, Pig samples the ip_country_tbl relation to build an index that maps join keys to positions in the sorted input files. It is important to place the larger of the two relations as the left-hand side input to the JOIN statement, as we did with the nobots_weblogs relation. Once Pig has built the index, it launches a second map-only job, which reads the left-hand side relation and uses the index created in the first MapReduce job to seek to the matching records and join the two relations.
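The core idea, a single forward pass over two inputs that are pre-sorted on the join key, is the same one behind the Unix join utility. The following sketch, with made-up sample data, shows the merge in miniature; Pig adds the sampling and index step on top so the same merge can run in parallel across HDFS blocks:

```shell
#!/bin/sh
# Two tiny inputs, both pre-sorted on field 1 (the IP), mirroring the
# sorted files the recipe creates. The rows are invented for illustration.
printf '10.0.0.1\t/index.html\n10.0.0.2\t/about.html\n' > weblog_sample.txt
printf '10.0.0.1\tFR\n10.0.0.2\tUS\n' > country_sample.txt

# join walks both files once, in step, emitting a row whenever the keys
# match -- the same single-pass merge that Pig's merge join performs.
TAB="$(printf '\t')"
join -t "$TAB" weblog_sample.txt country_sample.txt
```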
It is important to note that Pig's merge join requires the input data to be sorted in ascending order across all input files. In addition, when a relation spans multiple files, the files must be named so that ascending filename order matches the ascending key order. For example, if the nobots_weblogs relation contains three distinct IPs across two input files, the IPs could be distributed in the following fashion:
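As a hypothetical illustration (the part filenames and IP values below are invented), two part files whose ascending filename order lines up with the ascending key order could look like this:

```shell
#!/bin/sh
# part-00000 holds the low end of the key range, part-00001 the high end,
# so reading the files in filename order yields a globally sorted stream.
printf '10.0.0.1\n10.0.0.1\n10.0.0.2\n' > part-00000
printf '10.0.0.2\n10.0.0.3\n' > part-00001

# Concatenating in filename order stays sorted; sort -c verifies it.
cat part-00000 part-00001 | sort -c && echo "globally sorted across files"
```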
This example shows one possible total ordering of IPs across a number of files ordered by name. Filenames must sort in ascending order because this is the order in which Pig reads each file to access the sorted data.