Implementing a custom Hadoop key type

Instances of Hadoop MapReduce key types must be comparable with each other for sorting purposes. In order to be used as a key type in a MapReduce computation, a Hadoop Writable data type should implement the org.apache.hadoop.io.WritableComparable<T> interface. The WritableComparable interface extends the org.apache.hadoop.io.Writable interface and adds the compareTo() method to perform the comparisons.
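
For reference, the interface itself simply combines Writable with the standard java.lang.Comparable interface, which is where the compareTo() method comes from:

  // org.apache.hadoop.io.WritableComparable (simplified)
  public interface WritableComparable<T>
      extends Writable, Comparable<T> {
  }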

In this recipe, we modify the LogWritable data type of the Writing a custom Hadoop Writable data type recipe to implement the WritableComparable interface.

How to do it...

The following are the steps to implement a custom Hadoop WritableComparable data type for HTTP server log entries, which uses the request hostname and timestamp for comparison.

  1. Modify the LogWritable class to implement the org.apache.hadoop.io.WritableComparable interface.
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    
    public class LogWritable implements
        WritableComparable<LogWritable> {
    
      private Text userIP, timestamp, request;
      private IntWritable responseSize, status;
    
      public LogWritable() {
        this.userIP = new Text();
        this.timestamp = new Text();
        this.request = new Text();
        this.responseSize = new IntWritable();
        this.status = new IntWritable();    
      }
    
      public void readFields(DataInput in) throws IOException {
        userIP.readFields(in);
        timestamp.readFields(in);
        request.readFields(in);
        responseSize.readFields(in);
        status.readFields(in);
      }
    
      public void write(DataOutput out) throws IOException {
        userIP.write(out);
        timestamp.write(out);
        request.write(out);
        responseSize.write(out);
        status.write(out);
      }
    
      public int compareTo(LogWritable o) {
        if (userIP.compareTo(o.userIP) == 0) {
          return timestamp.compareTo(o.timestamp);
        } else {
          return userIP.compareTo(o.userIP);
        }
      }
    
      public boolean equals(Object o) {
        if (o instanceof LogWritable) {
             LogWritable other = (LogWritable) o;
             return userIP.equals(other.userIP) && timestamp.equals(other.timestamp);
        }
        return false;
      }
    
      public int hashCode() {
        return userIP.hashCode();
      }
    
      // ... getters and setters for the fields
    }
  2. You can use the LogWritable type as either a key type or a value type in your MapReduce computation. In the following example, we use the LogWritable type as the Map output key type.
    public class LogProcessorMap extends
        Mapper<LongWritable, Text, LogWritable, IntWritable> {
      …
    }
    
    public class LogProcessorReduce extends
        Reducer<LogWritable, IntWritable, Text, IntWritable> {
    
      public void reduce(LogWritable key,
          Iterable<IntWritable> values, Context context) {
        …
      }
    }
  3. Configure the output types of the job accordingly.
    Job job = new Job(..);
    …
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapOutputKeyClass(LogWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
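
Before plugging the new key type into a job, it can be worthwhile to sanity-check the serialization round trip and the comparison logic locally. The following standalone sketch is ours, not part of the recipe, and assumes the elided setters follow the setUserIP()/setTimestamp() naming:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  
  import org.apache.hadoop.io.Text;
  
  public class LogWritableCheck {
    public static void main(String[] args) throws IOException {
      LogWritable original = new LogWritable();
      // Assumed setter names; the recipe elides the getters/setters
      original.setUserIP(new Text("192.168.0.2"));
      original.setTimestamp(new Text("01/Jul/1995:00:00:01"));
  
      // Serialize with write() ...
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      original.write(new DataOutputStream(bytes));
  
      // ... and rebuild a copy with readFields()
      LogWritable copy = new LogWritable();
      copy.readFields(new DataInputStream(
          new ByteArrayInputStream(bytes.toByteArray())));
  
      // Same user IP and timestamp, so compareTo() returns 0
      System.out.println(original.compareTo(copy)); // prints 0
    }
  }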

How it works...

The WritableComparable interface introduces the compareTo() method in addition to the readFields() and write() methods of the Writable interface. The compareTo() method should return a negative integer, zero, or a positive integer if this object is less than, equal to, or greater than the object being compared to, respectively. In the LogWritable implementation, we consider the objects equal if both the user's IP address and the timestamp are the same. If the objects are not equal, we decide the sort order first based on the user IP address and then based on the timestamp.

  public int compareTo(LogWritable o) {
    if (userIP.compareTo(o.userIP) == 0) {
      return timestamp.compareTo(o.timestamp);
    } else {
      return userIP.compareTo(o.userIP);
    }
  }

Hadoop uses HashPartitioner as the default Partitioner implementation to calculate the distribution of the intermediate data to the reducers. HashPartitioner requires the hashCode() method of the key objects to satisfy the following two properties:

  • Provide the same hash value across different JVM instances
  • Provide a uniform distribution of hash values

Hence, you must implement a stable hashCode() method for your custom Hadoop key types that satisfies these two requirements. In the LogWritable implementation, we use the hash code of the request hostname/IP address as the hash code of the LogWritable instance. This ensures that the intermediate LogWritable data will be partitioned based on the request hostname/IP address.

  public int hashCode() {
    return userIP.hashCode();
  }
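
For context, HashPartitioner uses this hash code directly to pick the reduce partition for each intermediate record. Its core logic is essentially the following simplified sketch:

  public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
      // Mask off the sign bit to keep the value non-negative,
      // then map the hash into the [0, numReduceTasks) range
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
  }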

See also

  • Implementing a custom Hadoop Writable data type