Broadcasting and distributing shared resources to tasks in a MapReduce job – Hadoop DistributedCache

We can use the Hadoop DistributedCache to distribute read-only file-based resources to the Map and Reduce tasks. These resources can be simple data files, archives, or JAR files that are needed for the computations performed by the mappers or the reducers.

How to do it...

The following steps show you how to add a file to the Hadoop DistributedCache and how to retrieve it from the Map and Reduce tasks.

  1. Copy the resource to HDFS. You can also use files that are already in HDFS.
    > bin/hadoop fs -copyFromLocal ip2loc.dat ip2loc.dat
  2. Add the resource to the DistributedCache from your driver program.
    Job job = new Job(getConf(), "log-analysis");
    ...
    DistributedCache.addCacheFile(new URI("ip2loc.dat#ip2location"), job.getConfiguration());
  3. Retrieve the resource in the setup() method of your mapper or reducer, and use the data in the map() or reduce() function.
    public class LogProcessorMap extends Mapper<Object, LogWritable, Text, IntWritable> {
      private IPLookup lookupTable;

      public void setup(Context context) throws IOException {
        File lookupDb = new File("ip2location");
        // Load the IP lookup table to memory
        lookupTable = IPLookup.LoadData(lookupDb);
      }

      public void map(...) {
        String country = lookupTable.getCountry(value.ipAddress);
        ...
      }
    }

How it works...

Hadoop copies the files added to the DistributedCache to all the worker nodes before the execution of any task of the job. DistributedCache copies these files only once per job. Hadoop also supports creating symlinks to the DistributedCache files in the working directory of the computation by adding a fragment with the desired symlink name to the URI. In the following example, we use ip2location as the symlink to the ip2loc.dat file in the DistributedCache.

DistributedCache.addCacheFile(new URI("/data/ip2loc.dat#ip2location"), job.getConfiguration());

We parse and load the data from the DistributedCache in the setup() method of the mapper or the reducer. Files added with symlinks are accessible from the working directory through the provided symlink names.

private IPLookup lookup;

public void setup(Context context) throws IOException {
  File lookupDb = new File("ip2location");
  // Load the IP lookup table to memory
  lookup = IPLookup.LoadData(lookupDb);
}

public void map(...) {
  String location = lookup.getGeoLocation(value.ipAddress);
  ...
}

We can also access the data in the DistributedCache directly using the getLocalCacheFiles() method, without using the symlink.

Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
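For example, a minimal sketch of a setup() method that locates a specific cached file among the returned local paths might look like the following; IPLookup.LoadData is the hypothetical loader used in the earlier examples:

public void setup(Context context) throws IOException {
  Configuration conf = context.getConfiguration();
  Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
  if (cacheFiles != null) {
    for (Path cacheFile : cacheFiles) {
      // Identify the cached file by its name
      if (cacheFile.getName().equals("ip2loc.dat")) {
        // Load the IP lookup table to memory (hypothetical loader)
        lookupTable = IPLookup.LoadData(new File(cacheFile.toString()));
      }
    }
  }
}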

Note

DistributedCache does not work in Hadoop local mode.

There's more...

The following sections show you how to distribute compressed archives using the DistributedCache, how to add resources to the DistributedCache using the command line, and how to use the DistributedCache to add resources to the classpath of the mapper and the reducer.

Distributing archives using the DistributedCache

We can use the DistributedCache to distribute archives as well. Hadoop extracts the archives on the worker nodes. You can also provide symlinks to the archives using URI fragments. In the following example, we use the ip2locationdb symlink for the ip2locationdb.tar.gz archive.

Consider the following MapReduce driver program:

Job job = new Job(getConf(), "log-analysis");
DistributedCache.addCacheArchive(new URI("/data/ip2locationdb.tar.gz#ip2locationdb"), job.getConfiguration());

The extracted directory of the archive can be accessed from the working directory of the mapper or the reducer using the provided symlink.

Consider the following mapper program:

  public void setup(Context context) throws IOException {
    Configuration conf = context.getConfiguration();

    File lookupDbDir = new File("ip2locationdb");
    String[] children = lookupDbDir.list();
    ...
  }

You can also access the non-extracted DistributedCache archived files directly using the following method in the mapper or reducer implementation:

Path[] cachePath;

public void setup(Context context) throws IOException {
  Configuration conf = context.getConfiguration();
  cachePath = DistributedCache.getLocalCacheArchives(conf);
  ...
}

Adding resources to the DistributedCache from the command line

Hadoop supports adding files or archives to the DistributedCache using the command line, provided that your MapReduce driver program implements the org.apache.hadoop.util.Tool interface or utilizes the org.apache.hadoop.util.GenericOptionsParser. Files can be added to the DistributedCache using the -files command-line option, while archives can be added using the -archives command-line option. The files or archives can be in any filesystem accessible to Hadoop, including your local filesystem. These options support a comma-separated list of paths and the creation of symlinks using URI fragments.

> bin/hadoop jar C4LogProcessor.jar LogProcessor -files ip2location.dat#ip2location indir outdir
> bin/hadoop jar C4LogProcessor.jar LogProcessor -archives ip2locationdb.tar.gz#ip2locationdb indir outdir
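A minimal driver skeleton that supports these options might look like the following sketch; the class name and the job configuration details are illustrative:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LogProcessor extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // By this point, ToolRunner's GenericOptionsParser has already processed
    // the -files and -archives options and registered those resources
    // with the DistributedCache.
    Job job = new Job(getConf(), "log-analysis");
    // Configure the mapper, reducer, and input/output paths here
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new LogProcessor(), args));
  }
}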

Adding resources to the classpath using DistributedCache

You can use DistributedCache to distribute JAR files and other dependent libraries to the mapper or reducer. You can use the following methods in your driver program to add the JAR files to the classpath of the JVM running the mapper or the reducer.

public static void addFileToClassPath(Path file, Configuration conf, FileSystem fs)

public static void addArchiveToClassPath(Path archive, Configuration conf, FileSystem fs)
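For example, adding a JAR file stored in HDFS to the task classpath from the driver program might look like the following sketch; the /libs/ip2LocationResolver.jar path is illustrative:

FileSystem fs = FileSystem.get(job.getConfiguration());
// Distribute the JAR through the DistributedCache and add it to the
// classpath of the JVMs running the map and reduce tasks
DistributedCache.addFileToClassPath(new Path("/libs/ip2LocationResolver.jar"), job.getConfiguration(), fs);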

Similar to the -files and -archives command-line options described in the Adding resources to the DistributedCache from the command line subsection, we can also add JAR files to the classpath of our MapReduce computations using the -libjars command-line option. In order for the -libjars command-line option to work, the MapReduce driver program should implement the Tool interface or utilize the GenericOptionsParser.

> bin/hadoop jar C4LogProcessor.jar LogProcessor -libjars ip2LocationResolver.jar indir outdir

See also

  • The Debug scripts – analyzing task failures recipe in Chapter 3, Advanced Hadoop MapReduce Administration.