Apache Pig supports several advanced join implementations. The reduce-side join is the default implementation when you use Pig's JOIN operator. Pig also supports map-side joins when you specify the replicated or merge keyword.
This recipe demonstrates how to perform a map-side replicated join using Pig. We will join a weblog dataset with a dataset containing a list of distinct IPs and their associated countries.
Download the apache_nobots_tsv.txt and nobots_ip_country_tsv.txt datasets from http://www.packtpub.com/support and place them into HDFS. You will also need a recent version of Apache Pig (0.9 or later) installed on the cluster.
Carry out the following steps to perform a replicated join in Apache Pig:
1. Create a file named replicated_join.pig. In it, create two Pig relations to load the two datasets:
    nobots_weblogs = LOAD '/user/hadoop/apache_nobots_tsv.txt'
        AS (ip:chararray, timestamp:long, page:chararray, http_status:int, payload_size:int, useragent:chararray);
    ip_country_tbl = LOAD '/user/hadoop/nobots_ip_country_tsv.txt'
        AS (ip:chararray, country:chararray);
2. Join the two datasets using the replicated keyword:
    weblog_country_jnd = JOIN nobots_weblogs BY ip, ip_country_tbl BY ip USING 'replicated';
3. Format the joined relation and store the result:
    cleaned = FOREACH weblog_country_jnd GENERATE ip_country_tbl::ip, country, timestamp, page, http_status, payload_size, useragent;
    STORE cleaned INTO '/user/hadoop/weblog_country_jnd_replicated';
4. Run the job:
    $ pig -f replicated_join.pig
In step 1, we defined two relations, nobots_weblogs and ip_country_tbl, to refer to the two input datasets. Next, we joined the two datasets on the ip field using Pig's replicated join. Pig loads the right-most relation, ip_country_tbl, into memory and joins it with the nobots_weblogs relation.
It is important that the right-most relation be small enough to fit into a mapper's memory. Pig will not warn you if the dataset is too large; the job will simply fail with an out-of-memory exception.
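To make the mechanics concrete, the following Python sketch mimics what each mapper does in a replicated join: it builds an in-memory hash table from the small relation, then streams the large relation past it. This is a simplified model under stated assumptions, not Pig's actual implementation; the function name replicated_join and the sample rows are made up for illustration.

```python
from collections import defaultdict


def replicated_join(large_rows, small_rows, large_key, small_key):
    """Inner-join two lists of tuples the way a map-side hash join would.

    Illustrative only: the small relation is held entirely in memory,
    while the large relation is consumed one row at a time.
    """
    # Build phase: index the small relation by its join key.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[small_key]].append(row)

    # Probe phase: stream the large relation and emit every match.
    for row in large_rows:
        for match in lookup.get(row[large_key], []):
            yield row + match


# Hypothetical sample data: (ip, page) and (ip, country).
weblogs = [
    ("10.0.0.1", "/index.html"),
    ("10.0.0.2", "/about.html"),
    ("10.0.0.9", "/missing.html"),  # no country entry, dropped by the inner join
]
ip_country = [
    ("10.0.0.1", "US"),
    ("10.0.0.2", "DE"),
]

joined = list(replicated_join(weblogs, ip_country, large_key=0, small_key=0))
for record in joined:
    print(record)
```

The lookup table is the part that has to fit in memory, which is why the size of the small relation matters and the size of the streamed relation does not. Each joined record carries the ip value twice, once from each input, just as the joined Pig relation does.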
Finally, in step 3, we formatted the joined relation into a new relation named cleaned. One field in the FOREACH statement looks odd: ip_country_tbl::ip. We had to use the :: operator to specify which column to store in the cleaned relation, since the joined relation contains two fields named ip. We could just as easily have used nobots_weblogs::ip instead; because the inner join matches rows on ip, both fields hold the same value, so it makes no difference in this example.
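The naming rule can be pictured with a small Python model. It is only a sketch of the behavior described above, not Pig's resolver: the helper names joined_schema and resolve are hypothetical, and the model assumes every field in a join result is stored under an alias-qualified name, with the short name accepted only when it is unique.

```python
def joined_schema(relations):
    """Build the field list of a join result, qualifying each name with its alias."""
    return [f"{alias}::{field}" for alias, fields in relations for field in fields]


def resolve(schema, name):
    """Return the position of a field, given a qualified or unqualified name."""
    if name in schema:  # fully qualified, e.g. ip_country_tbl::ip
        return schema.index(name)
    # Unqualified: accept it only if exactly one field has that short name.
    hits = [i for i, full in enumerate(schema) if full.split("::")[-1] == name]
    if len(hits) != 1:
        raise ValueError(f"cannot resolve field {name!r}: {len(hits)} candidates")
    return hits[0]


# Field lists taken from the two LOAD statements in the recipe.
schema = joined_schema([
    ("nobots_weblogs", ["ip", "timestamp", "page", "http_status",
                        "payload_size", "useragent"]),
    ("ip_country_tbl", ["ip", "country"]),
])

print(resolve(schema, "country"))             # unique short name resolves
print(resolve(schema, "ip_country_tbl::ip"))  # qualified name resolves
try:
    resolve(schema, "ip")                     # two fields share this short name
except ValueError as err:
    print(err)
```

This is why country, timestamp, and the other unique fields appear without a prefix in the FOREACH statement, while ip needs one.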
A replicated join can involve more than two relations. For example, we can modify the previous recipe to perform a replicated inner join on three relations:
    weblog_country_jnd = JOIN nobots_weblogs BY ip, ip_country_tbl BY ip, another_relation BY ip USING 'replicated';
Again, the right-most datasets must fit into main memory. In this case, both ip_country_tbl and another_relation must fit into the memory of a mapper.
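The memory requirement for the multi-relation case follows from how such a join can be carried out. The Python sketch below is a simplified illustration, not Pig's implementation: it keeps one in-memory lookup table per small relation and streams only the first relation. The function name multi_replicated_join, the (ip, isp) layout chosen for another_relation, and all sample rows are assumptions made for the example.

```python
from collections import defaultdict
from itertools import product


def multi_replicated_join(large_rows, small_relations, key=0):
    """Inner-join one streamed relation against several in-memory relations."""
    # Build one lookup table per small relation; all of them live in memory at once.
    lookups = []
    for rows in small_relations:
        table = defaultdict(list)
        for row in rows:
            table[row[key]].append(row)
        lookups.append(table)

    for row in large_rows:
        # Gather the matching rows from every small relation.
        matches = [table.get(row[key], []) for table in lookups]
        # product() yields nothing if any list is empty, giving inner-join semantics.
        for combo in product(*matches):
            out = row
            for match in combo:
                out = out + match
            yield out


# Hypothetical sample data.
weblog_rows = [("10.0.0.1", "/index.html"), ("10.0.0.2", "/about.html")]
country_rows = [("10.0.0.1", "US"), ("10.0.0.2", "DE")]
another_rows = [("10.0.0.1", "ExampleNet")]  # assumed (ip, isp) layout

result = list(multi_replicated_join(weblog_rows, [country_rows, another_rows]))
for record in result:
    print(record)
```

Because every lookup table is resident at the same time, it is the combined size of the small relations that has to fit in a mapper's memory.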