Joining data in MapReduce is an expensive operation. Depending on the size of the datasets, you can choose to perform a map-side join or a reduce-side join. In a map-side join, two or more datasets are joined on a key in the map phase of a MapReduce job. In a reduce-side join, the mappers emit the join key, and the reduce phase is responsible for joining the two datasets. In this recipe we will demonstrate how to perform a map-side replicated join in MapReduce using the distributed cache. We will join a weblog dataset with a dataset containing a list of distinct IPs and their associated countries. Because the datasets are joined in the map phase, this will be a map-only job.
Download the apache_nobots_tsv.txt and nobots_ip_country_tsv.txt datasets from http://www.packtpub.com/support and place them into HDFS.
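For example, assuming the files were downloaded to the current working directory, they can be copied into HDFS with the standard filesystem shell (the /user/hadoop target matches the paths used in the code that follows):

$ hadoop fs -put apache_nobots_tsv.txt /user/hadoop/
$ hadoop fs -put nobots_ip_country_tsv.txt /user/hadoop/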
Carry out the following steps to join data in the map phase using MapReduce:
1. Create a MapReduce driver class named MapSideJoin that configures a map-only job and adds the nobots_ip_country_tsv.txt dataset into the distributed cache:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapSideJoin extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        Configuration conf = getConf();

        // Ship the IP-to-country lookup table to every task node
        DistributedCache.addCacheFile(
                new URI("/user/hadoop/nobots_ip_country_tsv.txt"), conf);

        Job weblogJob = new Job(conf);
        weblogJob.setJobName("MapSideJoin");
        // Map-only job: the join happens entirely in the mappers
        weblogJob.setNumReduceTasks(0);
        weblogJob.setJarByClass(getClass());
        weblogJob.setMapperClass(WeblogMapper.class);
        weblogJob.setMapOutputKeyClass(Text.class);
        weblogJob.setMapOutputValueClass(Text.class);
        weblogJob.setOutputKeyClass(Text.class);
        weblogJob.setOutputValueClass(Text.class);
        weblogJob.setInputFormatClass(TextInputFormat.class);
        weblogJob.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(weblogJob, inputPath);
        FileOutputFormat.setOutputPath(weblogJob, outputPath);

        if (weblogJob.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new MapSideJoin(), args);
        System.exit(returnCode);
    }
}
2. Create a mapper class named WeblogMapper that reads the nobots_ip_country_tsv.txt dataset from the distributed cache, and stores the IP/country table in a HashMap:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeblogMapper extends Mapper<Object, Text, Text, Text> {

    public static final String IP_COUNTRY_TABLE_FILENAME =
            "nobots_ip_country_tsv.txt";

    private Map<String, String> ipCountryMap = new HashMap<String, String>();
    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        Path[] files =
                DistributedCache.getLocalCacheFiles(context.getConfiguration());
        for (Path p : files) {
            if (p.getName().equals(IP_COUNTRY_TABLE_FILENAME)) {
                BufferedReader reader =
                        new BufferedReader(new FileReader(p.toString()));
                String line = reader.readLine();
                while (line != null) {
                    // The lookup table is tab-separated: IP, then country
                    String[] tokens = line.split("\t");
                    String ip = tokens[0];
                    String country = tokens[1];
                    ipCountryMap.put(ip, country);
                    line = reader.readLine();
                }
                reader.close();
            }
        }
        if (ipCountryMap.isEmpty()) {
            throw new IOException("Unable to load IP country table.");
        }
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String row = value.toString();
        // The weblog rows are also tab-separated, with the IP field first
        String[] tokens = row.split("\t");
        String ip = tokens[0];
        String country = ipCountryMap.get(ip);
        if (country == null) {
            // Inner-join semantics: skip rows whose IP has no country match;
            // without this guard, Text.set(null) would throw an NPE
            return;
        }
        outputKey.set(country);
        outputValue.set(row);
        context.write(outputKey, outputValue);
    }
}
3. Run the job:

$ hadoop jar AdvJoinChapter5-1.0.jar com.packt.ch5.advjoin.mr.MapSideJoin /user/hadoop/apache_nobots_tsv.txt /user/hadoop/data_jnd
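Because this is a map-only job, the output files are named part-m-*. A quick way to spot-check the joined output (the exact part-file name may vary with your configuration):

$ hadoop fs -cat /user/hadoop/data_jnd/part-m-00000 | head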
In step 1, we called the following static method:

DistributedCache.addCacheFile(new URI("/user/hadoop/nobots_ip_country_tsv.txt"), conf);

This method sets the mapred.cache.files property in the job configuration. The mapred.cache.files property tells the MapReduce framework to distribute the nobots_ip_country_tsv.txt file to every node in the cluster that will launch a mapper (and reducer, if your job is configured to run reducers).
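Because the driver implements Tool and runs through ToolRunner, Hadoop's generic options offer an alternative way to ship the file: the -files option places its arguments into the distributed cache for you. A minimal sketch, assuming nobots_ip_country_tsv.txt is present in the local working directory (if you shipped the file this way, you would drop the addCacheFile() call from the driver):

$ hadoop jar AdvJoinChapter5-1.0.jar com.packt.ch5.advjoin.mr.MapSideJoin -files nobots_ip_country_tsv.txt /user/hadoop/apache_nobots_tsv.txt /user/hadoop/data_jnd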
In step 2, we overrode the setup() method of the mapper. The setup() method is called by the MapReduce framework only once, prior to any calls to the map() method, which makes it an excellent place to perform any one-time initialization of the mapper class. To read from the distributed cache, we used the static method DistributedCache.getLocalCacheFiles(context.getConfiguration()) to get all of the files that have been placed into the distributed cache. Next, we iterated over every file in the distributed cache (in this case, only one) and loaded the nobots_ip_country_tsv.txt dataset into a HashMap.
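Note that the DistributedCache class is deprecated on Hadoop 2.x and later. A minimal sketch of the equivalent new-style calls, assuming Hadoop 2.x, where cached files are symlinked into each task's working directory under their original names:

// Driver: add the lookup file to the cache via the Job object
job.addCacheFile(new URI("/user/hadoop/nobots_ip_country_tsv.txt"));

// Mapper setup(): the cached file is symlinked into the task's working
// directory, so it can be opened directly by name
BufferedReader reader =
        new BufferedReader(new FileReader("nobots_ip_country_tsv.txt"));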
Finally, in the map() method, we used the HashMap loaded in the setup() method to join the nobots_ip_country_tsv.txt and apache_nobots_tsv.txt files, emitting the country associated with every IP in the apache_nobots_tsv.txt file.
The MapReduce framework also supports distributing archive files using the distributed cache. An archive file can be a ZIP file, GZIP file, or even a JAR file. Once the archives have been distributed to the task nodes, they will be decompressed automatically.
To add an archive to the distributed cache, simply use the addCacheArchive() static method of the DistributedCache class when configuring the MapReduce job:

DistributedCache.addCacheArchive(new URI("/user/hadoop/nobots_ip_country_tsv.zip"), conf);
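On the task side, the unpacked archives can be located with the getLocalCacheArchives() static method. A minimal sketch, assuming the same old-style API; the file name inside the ZIP is an assumption for illustration:

// In the mapper's setup(): find the local directory where the archive
// was unpacked
Path[] archives =
        DistributedCache.getLocalCacheArchives(context.getConfiguration());
for (Path archive : archives) {
    if (archive.getName().equals("nobots_ip_country_tsv.zip")) {
        // The archive's contents live under the localized directory;
        // the file name inside the ZIP is assumed here
        File table = new File(archive.toString(), "nobots_ip_country_tsv.txt");
        BufferedReader reader = new BufferedReader(new FileReader(table));
        // ... parse the lookup table as in the setup() method shown earlier
    }
}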