We can use the Hadoop DistributedCache to distribute read-only file-based resources to the Map and Reduce tasks. These resources can be simple data files, archives, or JAR files that are needed for the computations performed by the mappers or the reducers.
The following steps show you how to add a file to the Hadoop DistributedCache and how to retrieve it from the Map and Reduce tasks:

1. Copy the resource file to the HDFS:

```
> bin/hadoop fs -copyFromLocal ip2loc.dat ip2loc.dat
```

2. Add the resource to the DistributedCache from your driver program:

```java
Job job = new Job(getConf(), "log-analysis");
……
DistributedCache.addCacheFile(new URI("ip2loc.dat#ip2location"), job.getConfiguration());
```

3. Retrieve the resource in the setup() method of your mapper or reducer and use the data in the map() or reduce() function:

```java
public class LogProcessorMap extends Mapper<Object, LogWritable, Text, IntWritable> {

    private IPLookup lookupTable;

    public void setup(Context context) throws IOException {
        File lookupDb = new File("ip2location");
        // Load the IP lookup table to memory
        lookupTable = IPLookup.LoadData(lookupDb);
    }

    public void map(…) {
        String country = lookupTable.getCountry(value.ipAddress);
        ……
    }
}
```
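The point of loading the lookup table in setup() is to pay the file-parsing cost once per task rather than once per record in map(). As a minimal, Hadoop-free sketch of that pattern (the IPLookup class from the recipe is not shown in this book section, so the class below and its tab-separated file format are hypothetical stand-ins):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the recipe's IPLookup class: loads a
// tab-separated "ipPrefix<TAB>country" file into memory once, so that
// per-record lookups become simple in-memory map probes.
public class SimpleIpLookup {

    private final Map<String, String> table = new HashMap<>();

    // Mirrors the shape of IPLookup.LoadData(File): parse the whole file up front.
    public static SimpleIpLookup loadData(File lookupDb) throws IOException {
        SimpleIpLookup lookup = new SimpleIpLookup();
        for (String line : Files.readAllLines(lookupDb.toPath())) {
            String[] parts = line.split("\t");
            if (parts.length == 2) {
                lookup.table.put(parts[0], parts[1]);
            }
        }
        return lookup;
    }

    // Illustrative lookup keyed on the first three octets of the address.
    public String getCountry(String ipAddress) {
        String prefix = ipAddress.substring(0, ipAddress.lastIndexOf('.'));
        return table.getOrDefault(prefix, "UNKNOWN");
    }
}
```

A mapper's setup() would call loadData() on the symlinked cache file once, and every subsequent map() invocation reuses the in-memory table.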
Hadoop copies the files added to the DistributedCache to all the worker nodes before the execution of any task of the job. The DistributedCache copies these files only once per job. Hadoop also supports creating symlinks to the DistributedCache files in the working directory of the computation by adding a fragment with the desired symlink name to the URI. In the following example, we use ip2location as the symlink to the ip2loc.dat file in the DistributedCache:

```java
DistributedCache.addCacheFile(new URI("/data/ip2loc.dat#ip2location"), job.getConfiguration());
```
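The symlink name is simply the fragment part of the cache URI, and plain java.net.URI already separates the path from the fragment. A quick, self-contained check of that split:

```java
import java.net.URI;
import java.net.URISyntaxException;

public class CacheUriFragmentDemo {
    public static void main(String[] args) throws URISyntaxException {
        // The cache URI from the example above: a path plus a "#symlink" fragment.
        URI cacheUri = new URI("/data/ip2loc.dat#ip2location");

        // The path identifies the file to distribute...
        System.out.println(cacheUri.getPath());      // /data/ip2loc.dat

        // ...while the fragment names the symlink created in the
        // task's working directory.
        System.out.println(cacheUri.getFragment());  // ip2location
    }
}
```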
We parse and load the data from the DistributedCache in the setup() method of the mapper or the reducer. Files with symlinks are accessible from the working directory using the provided symlink's name:

```java
private IPLookup lookup;

public void setup(Context context) throws IOException {
    File lookupDb = new File("ip2location");
    // Load the IP lookup table to memory
    lookup = IPLookup.LoadData(lookupDb);
}

public void map(…) {
    String location = lookup.getGeoLocation(value.ipAddress);
    ……
}
```
We can also access the data in the DistributedCache directly using the getLocalCacheFiles() method, without using the symlink:

```java
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
```
The following sections show you how to distribute compressed archives using the DistributedCache, how to add resources to the DistributedCache using the command line, and how to use the DistributedCache to add resources to the classpath of the mapper and the reducer.
We can use the DistributedCache to distribute archives as well. Hadoop extracts the archives on the worker nodes. You can also provide symlinks to the archives using the URI fragments. In the following example, we use the ip2locationdb symlink for the ip2locationdb.tar.gz archive.

Consider the following MapReduce driver program:

```java
Job job = new Job(getConf(), "log-analysis");
DistributedCache.addCacheArchive(new URI("/data/ip2locationdb.tar.gz#ip2locationdb"), job.getConfiguration());
```
The extracted directory of the archive is accessible from the working directory of the mapper or the reducer using the symlink provided above.

Consider the following mapper program:

```java
public void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    File lookupDbDir = new File("ip2locationdb");
    String[] children = lookupDbDir.list();
    …
}
```
You can also access the non-extracted DistributedCache archive files directly using the following method in the mapper or reducer implementation:

```java
Path[] cachePath;

public void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();
    cachePath = DistributedCache.getLocalCacheArchives(conf);
    …
}
```
Hadoop supports adding files or archives to the DistributedCache using the command line, provided that your MapReduce driver program implements the org.apache.hadoop.util.Tool interface or utilizes org.apache.hadoop.util.GenericOptionsParser. Files can be added to the DistributedCache using the -files command-line option, while archives can be added using the -archives command-line option. Files or archives can be in any filesystem accessible to Hadoop, including your local filesystem. These options support a comma-separated list of paths and the creation of symlinks using the URI fragments.

```
> bin/hadoop jar C4LogProcessor.jar LogProcessor -files ip2location.dat#ip2location indir outdir
> bin/hadoop jar C4LogProcessor.jar LogProcessor -archives ip2locationdb.tar.gz#ip2locationdb indir outdir
```
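The comma-separated, fragment-bearing syntax these options accept can be pictured with a small, self-contained parser. This is only an illustration of the option syntax, not Hadoop's actual GenericOptionsParser code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CacheOptionSyntaxDemo {

    // Split a -files / -archives style value such as
    // "a.dat#x,b.tar.gz#y,/data/c.dat" into path -> symlink-name pairs.
    // When no "#fragment" is given, the file keeps its own name.
    public static Map<String, String> parse(String optionValue) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : optionValue.split(",")) {
            int hash = entry.indexOf('#');
            if (hash >= 0) {
                result.put(entry.substring(0, hash), entry.substring(hash + 1));
            } else {
                // Default link name: the last path component of the file itself.
                int slash = entry.lastIndexOf('/');
                result.put(entry, entry.substring(slash + 1));
            }
        }
        return result;
    }
}
```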
You can use the DistributedCache to distribute JAR files and other dependent libraries to the mapper or reducer. You can use the following methods in your driver program to add the JAR files to the classpath of the JVM running the mapper or the reducer:

```java
public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs)
public static void addArchiveToClassPath(Path archive, Configuration conf, FileSystem fs)
```
Similar to the -files and -archives command-line options described in the Adding resources to the DistributedCache from the command line subsection, we can also add JAR files to the classpath of our MapReduce computations by using the -libjars command-line option. In order for the -libjars command-line option to work, the MapReduce driver program should implement the Tool interface or utilize GenericOptionsParser.

```
> bin/hadoop jar C4LogProcessor.jar LogProcessor -libjars ip2LocationResolver.jar indir outdir
```