Hadoop intermediate (map to reduce) data partitioning

Hadoop partitions the intermediate data generated by the Map tasks across the Reduce tasks of the computation. A proper partitioning function that ensures a balanced load for each Reduce task is crucial to the performance of MapReduce computations. Partitioning can also be used to group related sets of records together and route them to specific Reduce tasks, where you want certain outputs to be processed or grouped together.

Hadoop partitions the intermediate data based on the key space of the intermediate data and decides which reduce task will receive which intermediate record. The sorted set of keys of a partition and their values would be the input for a reduce task. In Hadoop, the total number of partitions should be equal to the number of reduce tasks for the MapReduce computation. Hadoop Partitioners should extend the org.apache.hadoop.mapreduce.Partitioner<KEY,VALUE> abstract class. Hadoop uses org.apache.hadoop.mapreduce.lib.partition.HashPartitioner as the default Partitioner for MapReduce computations. HashPartitioner partitions the keys based on their hashCode(), using the formula (key.hashCode() & Integer.MAX_VALUE) mod r, where r is the number of reduce tasks; the bit mask keeps the result non-negative. The following diagram illustrates HashPartitioner for a computation with two reduce tasks:

[Diagram: HashPartitioner distributing map output keys across two reduce tasks]
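
For reference, the logic of the default HashPartitioner reduces to a single expression. The following is a minimal sketch of the equivalent implementation:

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  @Override
  public int getPartition(K key, V value, int numReduceTasks) {
    // Mask the sign bit so that a negative hashCode() still maps
    // to a non-negative partition index in [0, numReduceTasks).
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}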

There can be scenarios where our computation's logic requires, or can be better implemented using, an application-specific data-partitioning scheme. In this recipe, we implement a custom Partitioner for our HTTP log processing application, which partitions the keys (IP addresses) based on their geographic regions.

How to do it...

The following steps show you how to implement a custom Partitioner that partitions the intermediate data based on the geographic location of the request's IP address or hostname.

  1. Implement the IPBasedPartitioner class by extending the Partitioner abstract class. The getGeoLocation() helper used here is discussed in the How it works... section.
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class IPBasedPartitioner extends Partitioner<Text, IntWritable> {

      @Override
      public int getPartition(Text ipAddress,
          IntWritable value, int numPartitions) {
        String region = getGeoLocation(ipAddress);

        if (region != null && !region.isEmpty()) {
          // Mask the sign bit to keep the partition index non-negative.
          return (region.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
        return 0;
      }
    }
  2. Set the Partitioner class parameter in the Job object of your driver program (a fuller driver sketch follows these steps).
    Job job = new Job(getConf(), "log-analysis");
    ……
    job.setPartitionerClass(IPBasedPartitioner.class);
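
Putting both steps together, a minimal driver sketch might look like the following. The LogProcessor, LogMapper, and LogReducer classes, the reduce task count, and the input/output paths are illustrative assumptions, not part of the recipe. Since the number of partitions equals the number of reduce tasks, job.setNumReduceTasks() effectively sets the partition count:

Job job = new Job(getConf(), "log-analysis");
job.setJarByClass(LogProcessor.class);   // hypothetical driver class
job.setMapperClass(LogMapper.class);     // hypothetical Mapper emitting <Text, IntWritable>
job.setReducerClass(LogReducer.class);   // hypothetical Reducer
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setPartitionerClass(IPBasedPartitioner.class);
// One partition per reduce task; choose based on the expected number of regions.
job.setNumReduceTasks(4);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));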

How it works...

In the preceding example, we partition the intermediate data such that requests from the same geographic region are sent to the same Reducer instance. The getGeoLocation() method returns the geographic location of the given IP address. We omit the implementation details of getGeoLocation() as it is not essential for understanding this example; an illustrative sketch of one possible implementation appears after the following listing. We then obtain the hashCode() of the geographic location and perform a modulo operation to choose the Reducer bucket for the request.

@Override
public int getPartition(Text ipAddress, IntWritable value, int numPartitions) {
  String region = getGeoLocation(ipAddress);

  if (region != null && !region.isEmpty()) {
    return (region.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
  return 0;
}
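
Although the recipe leaves getGeoLocation() unspecified, a purely hypothetical implementation could, for example, look up the IP prefix in a preloaded region table. Everything below (the table, the two-octet prefix scheme, and the region names) is an illustrative assumption that would live inside the IPBasedPartitioner class (and requires java.util.Map and java.util.HashMap imports); a real implementation would typically query a GeoIP database:

// Hypothetical helper: maps a two-octet IP prefix to a region name.
private static final Map<String, String> REGION_TABLE =
    new HashMap<String, String>();
static {
  REGION_TABLE.put("192.168", "us-east");  // illustrative entries only
  REGION_TABLE.put("10.1", "eu-west");
}

private String getGeoLocation(Text ipAddress) {
  String ip = ipAddress.toString();
  int secondDot = ip.indexOf('.', ip.indexOf('.') + 1);
  if (secondDot < 0) {
    return null;  // malformed address; caller falls back to partition 0
  }
  return REGION_TABLE.get(ip.substring(0, secondDot));  // null if unknown
}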

There's more...

TotalOrderPartitioner and KeyFieldBasedPartitioner are two of the several built-in Partitioner implementations provided by Hadoop.

TotalOrderPartitioner

org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner<K,V> extends the org.apache.hadoop.mapreduce.Partitioner<K,V> abstract class. The set of input records to a reducer is in sorted order, ensuring proper ordering within an input partition. However, the Hadoop default partitioning strategy (HashPartitioner) does not enforce an ordering when partitioning the intermediate data, and it scatters the keys among the partitions. In use cases where we want to ensure a global order, we can use TotalOrderPartitioner to enforce a total order on the reduce input records across all the reduce tasks. TotalOrderPartitioner requires a partition file as input, defining the ranges of the partitions. The org.apache.hadoop.mapreduce.lib.partition.InputSampler utility allows us to generate a partition file for the TotalOrderPartitioner by sampling the input data. TotalOrderPartitioner is used in the Hadoop TeraSort benchmark.

Job job = new Job(getConf(), "Sort");
……
job.setPartitionerClass(TotalOrderPartitioner.class);
TotalOrderPartitioner.setPartitionFile(
    job.getConfiguration(), partitionFile);
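
The partition file itself can be generated with InputSampler. The following sketch assumes the job's input format and key/value types (Text here) are already configured, and uses illustrative sampling parameters:

// Sample with frequency 0.1, keeping at most 10000 samples from at most
// 10 input splits (illustrative parameters), then write the resulting
// partition boundaries to the partition file configured above.
InputSampler.Sampler<Text, Text> sampler =
    new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
InputSampler.writePartitionFile(job, sampler);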

KeyFieldBasedPartitioner

org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedPartitioner<K,V> can be used to partition the intermediate data based on parts of the key. A key can be split into a set of fields using a separator string. We can specify the indexes of the fields to be considered when partitioning, and we can also specify the indexes of characters within those fields.
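
As an illustration, the following sketch partitions on the first two '.'-separated fields of the key, so keys such as 192.168.1.1 and 192.168.5.9 land in the same partition. The configuration property names shown are from the newer MapReduce API and may differ across Hadoop versions, so treat them as assumptions to verify against your distribution:

Job job = new Job(getConf(), "key-field-partitioning");
job.setPartitionerClass(KeyFieldBasedPartitioner.class);
// Split keys into fields on '.' (property name may vary by Hadoop version).
job.getConfiguration().set(
    "mapreduce.map.output.key.field.separator", ".");
// Partition on fields 1 through 2 of the key.
job.getConfiguration().set(
    "mapreduce.partition.keypartitioner.options", "-k1,2");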
