The distributed cache in MapReduce is almost always required for any complex job involving dependent libraries and code. One very common use is shipping cache files for use in each map/reduce task JVM. This recipe will use the MapReduce API and the distributed cache to mark any lines in the news archive dataset that contain one or more keywords from a list. We will use the distributed cache to make each mapper aware of the list's location in HDFS.
This recipe assumes you have a basic familiarity with the Hadoop 0.20 MapReduce API. You will need access to the news_archives.zip dataset supplied with this book. Inside the ZIP file, you will find rural.txt and science.txt. Place both in a single HDFS directory. Additionally, inside the ZIP file you will find news_keywords.txt. You will need to place this file in HDFS at the absolute path /cache_files/news_keywords.txt. Feel free to add additional words to this file, so long as each appears on its own line.
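The keyword file is plain newline-delimited text. As a standalone illustration (the class and method names here are hypothetical, not part of the recipe's code), the following sketch parses such contents into a set, which is how the mapper will consume the file later; unlike the recipe's setup() code, it also trims whitespace and skips blank lines:

```java
import java.util.HashSet;
import java.util.Set;

public class KeywordFileSketch {

    // Parses newline-delimited keyword-file contents into a set.
    // The recipe's mapper does the same thing line by line with a
    // BufferedReader; this sketch additionally trims each entry and
    // skips blank lines, which the recipe's code does not.
    public static Set<String> parseKeywords(String contents) {
        Set<String> keywords = new HashSet<String>();
        for (String word : contents.split("\n")) {
            String trimmed = word.trim();
            if (!trimmed.isEmpty()) {
                keywords.add(trimmed);
            }
        }
        return keywords;
    }

    public static void main(String[] args) {
        String contents = "drought\ncorn\nscience\n"; // example file contents
        System.out.println(parseKeywords(contents));
    }
}
```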
You will need access to a pseudo-distributed or fully-distributed cluster capable of running MapReduce jobs using the newer MapReduce API introduced in Hadoop 0.20.
You will also need to package this code inside a JAR file that is to be executed by the Hadoop JAR launcher from the shell. Only the core Hadoop libraries are required to compile and run this example.
Carry out the following steps to implement a word-matching MapReduce job:
Create the class LinesWithMatchingWordsJob.java in your JAR file at whatever source package is appropriate. The following Tool implementation handles job submission:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

public class LinesWithMatchingWordsJob implements Tool {

    private Configuration conf;

    public static final String NAME = "linemarker";

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public Configuration getConf() {
        return conf;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: linemarker <input> <output>");
            System.exit(1);
        }
        ToolRunner.run(new LinesWithMatchingWordsJob(new Configuration()), args);
    }

    public LinesWithMatchingWordsJob(Configuration conf) {
        this.conf = conf;
    }
The run() method is where we set the input/output formats, the Mapper class configuration, and the key-value class configuration. Note that run() should return 0 on success, so the result of waitForCompletion() maps true to 0:

    public int run(String[] args) throws Exception {
        DistributedCache.addCacheFile(
                new Path("/cache_files/news_keywords.txt").toUri(), conf);
        Job job = new Job(conf, "Line Marker");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(LineMarkerMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(LinesWithMatchingWordsJob.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
The map() function is implemented in the following code snippet by extending mapreduce.Mapper:

    public static class LineMarkerMapper
            extends Mapper<LongWritable, Text, LongWritable, Text> {

        private Pattern space_pattern = Pattern.compile("[ ]");
        private Set<String> keywords = new HashSet<String>();
In the setup() routine, we load the file from the distributed cache and copy it to the local disk, remembering to close the reader when we are done:

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            URI[] uris = DistributedCache.getCacheFiles(
                    context.getConfiguration());
            FileSystem fs = FileSystem.get(context.getConfiguration());
            if (uris == null || uris.length == 0) {
                throw new IOException(
                        "Error reading file from distributed cache. No URIs found.");
            }
            String localPath = "./keywords.txt";
            fs.copyToLocalFile(new Path(uris[0]), new Path(localPath));
            BufferedReader reader = new BufferedReader(new FileReader(localPath));
            String word = null;
            while ((word = reader.readLine()) != null) {
                keywords.add(word);
            }
            reader.close();
        }
Finally, the map() function tokenizes each line and emits any tokens that appear in the keyword list:

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = space_pattern.split(value.toString());
            for (String token : tokens) {
                if (keywords.contains(token)) {
                    context.write(key, new Text(token));
                }
            }
        }
    }
}
First, we set up our imports and create a public class LinesWithMatchingWordsJob. This class implements the Hadoop Tool interface for easy submission using the ToolRunner. Before the job is submitted, we first check for the existence of both the input and output parameters. Inside the run() method, we immediately call the DistributedCache static helper method addCacheFile() and pass it a hardcoded reference to the HDFS cache file at the absolute path /cache_files/news_keywords.txt. This file contains the keywords, separated by newline characters, that we are interested in locating within the news archives corpus. We pass the helper method a URI reference to this path and the Configuration instance.
Now we can begin configuring the rest of the job. Since we are working with text, we use the TextInputFormat and TextOutputFormat classes to read and write lines as strings. We also configure the job to use the public static inner class LineMarkerMapper as its Mapper. This is a map-only job, so we set the number of reducers to zero. We configure the output key type to be LongWritable, matching the key supplied by TextInputFormat (the line's byte offset within the file), and the output value type to be Text for the matched words. It's also very important to call setJarByClass() so that the TaskTrackers can properly unpack the JAR and find the Mapper class. The job uses the static helper methods on FileInputFormat and FileOutputFormat to set the input and output directories, respectively. Now we are completely set up and ready to submit the job.
The Mapper class has two important member variables: a precompiled regex pattern used to tokenize each line by spaces, and a wordlist Set used to store each distinct keyword we are searching for.
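The benefit of the precompiled pattern, and one of its quirks, can be seen in isolation. The following standalone sketch (class and method names are illustrative only) uses the same "[ ]" pattern as the mapper; note that consecutive spaces produce empty tokens, which is harmless here because an empty string will never match a real keyword:

```java
import java.util.regex.Pattern;

public class SpaceTokenizerSketch {

    // Compiled once and reused for every line, exactly like the mapper's
    // space_pattern member; calling String.split(" ") instead would
    // recompile the pattern on every invocation.
    private static final Pattern SPACE_PATTERN = Pattern.compile("[ ]");

    public static String[] tokenize(String line) {
        return SPACE_PATTERN.split(line);
    }

    public static void main(String[] args) {
        // The double space yields an empty token between "hits" and "rural".
        for (String token : tokenize("drought hits  rural counties")) {
            System.out.println("[" + token + "]");
        }
    }
}
```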
The setup() method in the Mapper pulls the complete list of cache file URIs currently registered in the distributed cache. We first check that the URI array is non-null and contains at least one element. If these checks pass, we copy the keywords file from HDFS into the task's temporary working directory, saving the contents in a local file named ./keywords.txt. Now we are free to use the standard Java I/O classes to read from the local disk. Each line contained in the file denotes a keyword, which we store in the keywords HashSet. Inside our map() function, we first tokenize the line by spaces and, for each token, check whether it appears in our keyword list. If a match is found, we emit the key for that line, which TextInputFormat supplies as the line's byte offset, and the matching token as the value.
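Stripped of the Hadoop plumbing, the per-line matching logic can be sketched in plain Java (the class, method, and sample data below are illustrative, not part of the recipe's code). Because matching is by exact token, a word with punctuation attached, such as "corn,", will not match the keyword "corn"; that behavior is true of the recipe's mapper as well:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

public class LineMatchSketch {

    private static final Pattern SPACE = Pattern.compile("[ ]");

    // Returns the keywords found in a line, in token order -- the same
    // values the mapper would emit, each paired with the line's key.
    public static List<String> matches(String line, Set<String> keywords) {
        List<String> found = new ArrayList<String>();
        for (String token : SPACE.split(line)) {
            if (keywords.contains(token)) {
                found.add(token);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        Set<String> keywords = new HashSet<String>(Arrays.asList("drought", "corn"));
        long byteOffset = 0L; // stand-in for the LongWritable key from TextInputFormat
        for (String line : new String[] {"drought ruins corn harvest", "corn, prices rise"}) {
            for (String match : matches(line, keywords)) {
                System.out.println(byteOffset + "\t" + match);
            }
            byteOffset += line.length() + 1; // advance past the line and its newline
        }
    }
}
```

The second sample line emits nothing: its first token is "corn," (with a trailing comma), which is not an exact member of the keyword set.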
The following are a few additional tips to know when starting out with the distributed cache in MapReduce.
Very frequently, your map and reduce tasks will depend on third-party libraries that take the form of JAR files. If you store these dependencies in HDFS, you can use the static helper method DistributedCache.addArchiveToClassPath() to initialize your job with the dependencies and have Hadoop automatically add the JAR files as classpath dependencies for every task JVM in that job.