Emitting data of different value types from a mapper

Emitting values belonging to multiple types from a mapper is useful when performing reducer-side joins, as well as when we need to avoid the complexity of running multiple MapReduce computations to summarize different types of properties in a data set. However, Hadoop reducers do not allow multiple input value types. In these scenarios, we can use the GenericWritable class to wrap value instances belonging to different data types.

In this recipe, we reuse the HTTP server log analysis sample from the Implementing a custom Hadoop Writable data type recipe. However, instead of using a custom data type, in the current recipe we output multiple value types from the mapper. This sample aggregates the total number of bytes served from the web server to a particular host, and also outputs a tab-separated list of URLs requested by that host. We use IntWritable to output the number of bytes from the mapper and Text to output the request URL.

How to do it...

The following steps show how to implement a Hadoop GenericWritable data type that can wrap instances of either IntWritable or Text data types.

  1. Write a class extending the org.apache.hadoop.io.GenericWritable class. Implement the getTypes() method to return an array of the Writable classes that you will be using. If you are adding a custom constructor, make sure to add a parameter-less default constructor as well.
    public class MultiValueWritable extends GenericWritable {
    
      private static Class[] CLASSES = new Class[]{
        IntWritable.class,
        Text.class
      };
      
      public MultiValueWritable(){    
      }
      
      public MultiValueWritable(Writable value){
        set(value);
      }
      
      protected Class[] getTypes() {
        return CLASSES;
      }
    }
  2. Set MultiValueWritable as the output value type of the mapper. Wrap the output Writable values of the mapper with instances of the MultiValueWritable class.
    public class LogProcessorMap extends
        Mapper<Object, Text, Text, MultiValueWritable> {
      private Text userHostText = new Text();
      private Text requestText = new Text();
      private IntWritable responseSize = new IntWritable();
    
      public void map(Object key, Text value,
          Context context) throws IOException, InterruptedException {
        // parse the value (log entry) using a regex to extract
        // the userHost, request, and bytes fields
        userHostText.set(userHost);
        requestText.set(request);
        responseSize.set(bytes);
    
        context.write(userHostText,
            new MultiValueWritable(requestText));
        context.write(userHostText,
            new MultiValueWritable(responseSize));
      }
    }
  3. Set the reducer input value type as MultiValueWritable. Implement the reduce() method to handle multiple value types.
    public class LogProcessorReduce extends
        Reducer<Text, MultiValueWritable, Text, Text> {
      private Text result = new Text();
    
      public void reduce(Text key, Iterable<MultiValueWritable> values,
          Context context) throws IOException, InterruptedException {
        int sum = 0;
        StringBuilder requests = new StringBuilder();
        for (MultiValueWritable multiValueWritable : values) {
          Writable writable = multiValueWritable.get();
          if (writable instanceof IntWritable) {
            sum += ((IntWritable) writable).get();
          } else {
            requests.append(((Text) writable).toString());
            requests.append("\t");
          }
        }
        result.set(sum + "\t" + requests);
        context.write(key, result);
      }
    }
  4. Set MultiValueWritable as the Map output value class of this computation; a fuller driver sketch follows these steps.
        Configuration conf = new Configuration();
        Job job = new Job(conf, "log-analysis");
        …
        job.setMapOutputValueClass(MultiValueWritable.class);
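
The driver configuration elided above typically also sets the mapper, reducer, and output types, and the input/output paths. The following sketch shows one possible completion; the LogProcessor driver class name and the command-line path arguments are illustrative assumptions, and only the setMapOutputValueClass() call is specific to this recipe:

    Configuration conf = new Configuration();
    Job job = new Job(conf, "log-analysis");
    job.setJarByClass(LogProcessor.class);   // hypothetical driver class
    job.setMapperClass(LogProcessorMap.class);
    job.setReducerClass(LogProcessorReduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(MultiValueWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);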

How it works...

A GenericWritable implementation should extend org.apache.hadoop.io.GenericWritable and should specify the set of Writable value types it wraps by returning an array of those classes from the getTypes() method. GenericWritable serializes and de-serializes the data using the index of the wrapped value's class in this array.

  private static Class[] CLASSES =  new Class[]{
    IntWritable.class,
    Text.class
  };

  protected Class[] getTypes() {
    return CLASSES;
  }
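
Conceptually, the base class performs this serialization along the following lines, where type is the index of the wrapped value's class in the getTypes() array. This is a simplified sketch of Hadoop's implementation, not the verbatim source:

  public void write(DataOutput out) throws IOException {
    out.writeByte(type);     // index of the wrapped class in getTypes()
    instance.write(out);     // delegate to the wrapped Writable
  }

  public void readFields(DataInput in) throws IOException {
    type = in.readByte();
    instance = ReflectionUtils.newInstance(getTypes()[type], getConf());
    instance.readFields(in); // deserialize using the resolved class
  }

Because only a single byte of type information is stored with each record, the wrapping adds very little serialization overhead.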

In the mapper, you wrap each of your values with instances of the GenericWritable implementation.

private Text requestText = new Text();
context.write(userHostText, new MultiValueWritable(requestText));

The reducer implementation has to take care of the different value types manually.

if (writable instanceof IntWritable) {
  sum += ((IntWritable) writable).get();
} else {
  requests.append(((Text) writable).toString());
  requests.append("\t");
}

There's more...

org.apache.hadoop.io.ObjectWritable is another class that can be used to achieve the same objective as GenericWritable. The ObjectWritable class can handle Java primitive types, strings, and arrays without the need for a Writable wrapper. However, Hadoop serializes ObjectWritable instances by writing the class name of the instance with each serialized entry, making it inefficient compared to a GenericWritable-based implementation.
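
For example, a mapper could emit both value types using ObjectWritable without defining a wrapper class. The following is a minimal illustrative sketch reusing the variable names from this recipe; ObjectWritable accepts both the String request and the IntWritable responseSize directly:

  // Mapper output value class would be ObjectWritable
  context.write(userHostText, new ObjectWritable(request));      // wraps a String
  context.write(userHostText, new ObjectWritable(responseSize)); // wraps a Writable

  // In the reducer, retrieve and inspect the wrapped instance
  Object value = objectWritable.get();

Each of these records carries the fully qualified class name of the wrapped instance, which is what makes this approach bulkier than the single-byte type index used by GenericWritable.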

See also

  • Implementing a custom Hadoop Writable data type