The instances of Hadoop MapReduce key
types must be comparable to each other for sorting purposes. In order to be used as a key
type in a MapReduce computation, a Hadoop Writable
data type should implement the org.apache.hadoop.io.WritableComparable<T>
interface. The WritableComparable
interface extends the org.apache.hadoop.io.Writable
interface and adds the compareTo()
method to perform the comparisons.
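As a quick orientation, the combined contract can be sketched as follows. The WritableSketch and WritableComparableSketch interfaces below are local stand-ins, defined only so the sketch compiles without Hadoop on the classpath; the real interface is org.apache.hadoop.io.WritableComparable&lt;T&gt;, which extends org.apache.hadoop.io.Writable and java.lang.Comparable&lt;T&gt;.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Local stand-in mirroring org.apache.hadoop.io.Writable
interface WritableSketch {
    void write(DataOutput out) throws IOException;    // serialize the fields
    void readFields(DataInput in) throws IOException; // deserialize the fields
}

// Local stand-in mirroring org.apache.hadoop.io.WritableComparable<T>:
// a key type must support both serialization and ordering
interface WritableComparableSketch<T> extends WritableSketch, Comparable<T> {
    // int compareTo(T o) is inherited from java.lang.Comparable
}
```

A key type therefore implements three methods in total: write(), readFields(), and compareTo().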
In this recipe, we modify the LogWritable
data type of the Writing a custom Hadoop Writable data type recipe to implement the WritableComparable
interface.
The following are the steps to implement a custom Hadoop WritableComparable
data type for the HTTP server log entries, which uses the request host name and timestamp for comparison.
1. Modify the LogWritable class to implement the org.apache.hadoop.io.WritableComparable interface.

public class LogWritable implements WritableComparable&lt;LogWritable&gt; {

    private Text userIP, timestamp, request;
    private IntWritable responseSize, status;

    public LogWritable() {
        this.userIP = new Text();
        this.timestamp = new Text();
        this.request = new Text();
        this.responseSize = new IntWritable();
        this.status = new IntWritable();
    }

    public void readFields(DataInput in) throws IOException {
        userIP.readFields(in);
        timestamp.readFields(in);
        request.readFields(in);
        responseSize.readFields(in);
        status.readFields(in);
    }

    public void write(DataOutput out) throws IOException {
        userIP.write(out);
        timestamp.write(out);
        request.write(out);
        responseSize.write(out);
        status.write(out);
    }

    public int compareTo(LogWritable o) {
        if (userIP.compareTo(o.userIP) == 0) {
            return timestamp.compareTo(o.timestamp);
        } else {
            return userIP.compareTo(o.userIP);
        }
    }

    public boolean equals(Object o) {
        if (o instanceof LogWritable) {
            LogWritable other = (LogWritable) o;
            return userIP.equals(other.userIP) && timestamp.equals(other.timestamp);
        }
        return false;
    }

    public int hashCode() {
        return userIP.hashCode();
    }

    ……… // getters and setters for the fields
}
2. Use the LogWritable type as either a key type or a value type in your MapReduce computation. In the following example, we use the LogWritable type as the Map output key type.

public class LogProcessorMap extends Mapper&lt;LongWritable, Text, LogWritable, IntWritable&gt; {
    …
}

public class LogProcessorReduce extends Reducer&lt;LogWritable, IntWritable, Text, IntWritable&gt; {
    public void reduce(LogWritable key, Iterable&lt;IntWritable&gt; values, Context context) {
        ……
    }
}
3. Configure the job to use LogWritable as the Map output key type:

Job job = new Job(..);
…
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(LogWritable.class);
job.setMapOutputValueClass(IntWritable.class);
The WritableComparable
interface introduces the compareTo()
method in addition to the readFields()
and write()
methods of the Writable
interface. The compareTo()
method should return a negative integer, zero, or a positive integer if this object is less than, equal to, or greater than the object being compared to, respectively. In the LogWritable
implementation, we consider the objects equal if both the user's IP address and the timestamp are the same. If the objects are not equal, we decide the sort order first based on the user IP address and then based on the timestamp.
public int compareTo(LogWritable o) {
    if (userIP.compareTo(o.userIP) == 0) {
        return timestamp.compareTo(o.timestamp);
    } else {
        return userIP.compareTo(o.userIP);
    }
}
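This two-level ordering can be demonstrated without any Hadoop dependency. The LogKeySketch class and its sample values below are hypothetical stand-ins that replace the Text fields with plain Strings but preserve the same comparison logic:

```java
// Standalone sketch of LogWritable's two-level comparison:
// primary ordering by userIP, secondary ordering by timestamp.
class LogKeySketch implements Comparable<LogKeySketch> {
    final String userIP;
    final String timestamp;

    LogKeySketch(String userIP, String timestamp) {
        this.userIP = userIP;
        this.timestamp = timestamp;
    }

    @Override
    public int compareTo(LogKeySketch o) {
        int byIP = userIP.compareTo(o.userIP);
        // Fall back to the timestamp only when the primary field is equal
        return (byIP == 0) ? timestamp.compareTo(o.timestamp) : byIP;
    }

    public static void main(String[] args) {
        LogKeySketch a = new LogKeySketch("10.0.0.1", "2012-01-01");
        LogKeySketch b = new LogKeySketch("10.0.0.1", "2012-01-02");
        LogKeySketch c = new LogKeySketch("10.0.0.2", "2012-01-01");
        System.out.println(a.compareTo(b) < 0); // true: same IP, earlier timestamp sorts first
        System.out.println(a.compareTo(c) < 0); // true: smaller IP wins regardless of timestamp
        System.out.println(a.compareTo(a) == 0); // true: both fields equal
    }
}
```

Returning the precomputed comparison result rather than calling compareTo() twice is a small cleanup over the recipe's version; the ordering it produces is identical.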
Hadoop uses HashPartitioner
as the default Partitioner
implementation to calculate the distribution of the intermediate data to the reducers. HashPartitioner
requires the hashCode()
method of the key objects to satisfy the following two properties:

- It should provide the same hash value across different JVM instances.
- It should provide a uniform distribution of hash values.
Hence, you must implement a stable hashCode()
method for your custom Hadoop key
types satisfying the above mentioned two requirements. In the LogWritable
implementation, we use the hash code of the request hostname/IP address as the hash code of the LogWritable
instance. This ensures that the intermediate LogWritable
data will be partitioned based on the request hostname/IP address.
public int hashCode() {
    return userIP.hashCode();
}
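The effect can be seen in a small standalone sketch of the partition calculation. Hadoop's default HashPartitioner computes (key.hashCode() &amp; Integer.MAX_VALUE) % numReduceTasks; the getPartition helper and the sample IP strings below are illustrative stand-ins, not Hadoop code:

```java
// Sketch of how Hadoop's default HashPartitioner assigns a key to a reducer.
// Because LogWritable.hashCode() delegates to userIP.hashCode(), all records
// with the same user IP land in the same partition (and hence the same reducer).
class PartitionSketch {
    static int getPartition(Object key, int numReduceTasks) {
        // Mask off the sign bit so the modulo result is always non-negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        String ip = "192.168.0.1"; // illustrative user IP
        // Two records with the same user IP always map to the same reducer
        System.out.println(getPartition(ip, reducers) == getPartition(ip, reducers)); // true
    }
}
```

Since String (like Hadoop's Text) computes its hash from the character content, the result is stable across JVM instances, satisfying the first property above.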