Hadoop partitions the intermediate data generated by the Map tasks across the reduce tasks of the computation. A proper partitioning function that ensures a balanced load for each reduce task is crucial to the performance of MapReduce computations. Partitioning can also be used to route related sets of records to specific reduce tasks, when certain outputs need to be processed or grouped together.
Hadoop partitions the intermediate data based on its key space and decides which reduce task will receive which intermediate record. The sorted set of keys of a partition, together with their values, forms the input for a reduce task. In Hadoop, the total number of partitions must equal the number of reduce tasks of the MapReduce computation. Hadoop Partitioners should extend the org.apache.hadoop.mapreduce.Partitioner<KEY,VALUE> abstract class. Hadoop uses org.apache.hadoop.mapreduce.lib.partition.HashPartitioner as the default Partitioner for MapReduce computations. HashPartitioner partitions the keys based on their hashCode(), using the formula key.hashCode() mod r, where r is the number of reduce tasks. The following diagram illustrates HashPartitioner for a computation with two reduce tasks:
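The HashPartitioner formula can be sketched with plain String keys. This is a simplification for illustration only: the actual Hadoop class operates on Writable keys, but the arithmetic is the same.

```java
// Minimal sketch of HashPartitioner's partitioning formula, applied to
// plain String keys (Hadoop's version operates on Writable keys, but
// the arithmetic is identical).
public class HashPartitionDemo {
    static int getPartition(String key, int numReduceTasks) {
        // Mask off the sign bit so negative hash codes still map
        // to a non-negative partition index.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int r = 2; // two reduce tasks
        System.out.println(getPartition("192.168.0.1", r));
        System.out.println(getPartition("10.0.0.5", r));
    }
}
```

Every key deterministically maps to a value in the range [0, r), so all records sharing a key reach the same reduce task.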
There can be scenarios where our computation's logic requires, or can be better implemented using, an application-specific data-partitioning schema. In this recipe, we implement a custom Partitioner for our HTTP log processing application, which partitions the keys (IP addresses) based on their geographic regions.
The following steps show you how to implement a custom Partitioner that partitions the intermediate data based on the location of the request IP address or hostname.
Implement the IPBasedPartitioner class by extending the Partitioner abstract class:

```java
public class IPBasedPartitioner extends Partitioner<Text, IntWritable> {

    public int getPartition(Text ipAddress, IntWritable value, int numPartitions) {
        String region = getGeoLocation(ipAddress);
        if (region != null) {
            return (region.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
        return 0;
    }
}
```
Set the Partitioner class parameter in the Job object:

```java
Job job = new Job(getConf(), "log-analysis");
……
job.setPartitionerClass(IPBasedPartitioner.class);
```
In the above example, we partition the intermediate data such that requests from the same geographic region are sent to the same reducer instance. The getGeoLocation() method returns the geographic location of the given IP address; we omit its implementation details, as they are not essential for understanding this example. We then obtain the hashCode() of the geographic location and perform a modulo operation to choose the reducer bucket for the request.
```java
public int getPartition(Text ipAddress, IntWritable value, int numPartitions) {
    String region = getGeoLocation(ipAddress);
    if (region != null && !region.isEmpty()) {
        return (region.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
    return 0;
}
```
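The bitwise AND with Integer.MAX_VALUE in the snippets above deserves a note: Java's % operator can return a negative result for a negative left operand, and hashCode() values can be negative, so the raw modulo could yield an invalid partition number. A minimal standalone illustration:

```java
public class SignMaskDemo {
    public static void main(String[] args) {
        int hash = -7;  // stands in for a negative hashCode() value
        // Raw modulo of a negative hash yields a negative, invalid partition:
        System.out.println(hash % 3);                        // -1
        // Masking off the sign bit keeps the result in [0, numPartitions):
        System.out.println((hash & Integer.MAX_VALUE) % 3);  // 1
    }
}
```

Masking the sign bit (rather than calling Math.abs()) also avoids the edge case where the hash equals Integer.MIN_VALUE, whose absolute value overflows.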
TotalOrderPartitioner and KeyFieldBasedPartitioner are two of the several built-in Partitioner implementations provided by Hadoop.
TotalOrderPartitioner (org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner<K,V>): The set of input records to a reducer is in sorted order, ensuring proper ordering within each input partition. However, Hadoop's default partitioning strategy (HashPartitioner) does not enforce an ordering when partitioning the intermediate data and scatters the keys among the partitions. In use cases where we want to ensure a global order, we can use the TotalOrderPartitioner to enforce a total order on the reduce input records across all the reduce tasks. TotalOrderPartitioner requires a partition file as input, defining the ranges of the partitions. The org.apache.hadoop.mapreduce.lib.partition.InputSampler utility allows us to generate a partition file for the TotalOrderPartitioner by sampling the input data. TotalOrderPartitioner is used in the Hadoop TeraSort benchmark.
```java
Job job = new Job(getConf(), "Sort");
……
job.setPartitionerClass(TotalOrderPartitioner.class);
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
```
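To make the idea concrete, the following standalone sketch shows the range-based lookup that a total-order partitioner performs. The split points here are arbitrary example values standing in for the contents of the partition file; Hadoop's actual implementation can additionally build a trie for binary keys.

```java
import java.util.Arrays;

// Illustrative sketch of total-order partitioning: each key is compared
// against a sorted list of split points, and the resulting range index
// is the partition number. Partition i therefore receives only keys
// that sort before the keys of partition i+1, yielding a global order.
public class RangePartitionDemo {
    static int getPartition(String key, String[] splitPoints) {
        // binarySearch returns (-(insertionPoint) - 1) when not found.
        int pos = Arrays.binarySearch(splitPoints, key);
        return pos < 0 ? -pos - 1 : pos + 1;
    }

    public static void main(String[] args) {
        String[] splits = {"g", "p"};  // 3 partitions: [..g), [g..p), [p..)
        System.out.println(getPartition("apple", splits));  // 0
        System.out.println(getPartition("mango", splits));  // 1
        System.out.println(getPartition("zebra", splits));  // 2
    }
}
```

Concatenating the sorted outputs of the reduce tasks in partition order then yields a fully sorted result, which is exactly how TeraSort produces its globally sorted output.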
KeyFieldBasedPartitioner (org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedPartitioner<K,V>) can be used to partition the intermediate data based on parts of the key. A key can be split into a set of fields by using a separator string, and we can specify the indexes of the fields to be considered when partitioning. We can also specify the indexes of characters within fields.
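The field-splitting behavior can be sketched in plain Java as follows. This is a simplified stand-in for KeyFieldBasedPartitioner, not its actual implementation; the separator, field index, and partition count are arbitrary example values.

```java
// Simplified sketch of key-field-based partitioning: split the key on a
// separator and hash only the selected field, so that all keys sharing
// that field land in the same partition.
public class KeyFieldDemo {
    static int getPartition(String key, String separatorRegex,
                            int fieldIndex, int numPartitions) {
        String[] fields = key.split(separatorRegex);
        String selected = fieldIndex < fields.length ? fields[fieldIndex] : "";
        return (selected.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // Both keys share the first field "us", so they are guaranteed
        // to land in the same partition regardless of the other fields.
        int p1 = getPartition("us|nyc|user1", "\\|", 0, 4);
        int p2 = getPartition("us|sfo|user2", "\\|", 0, 4);
        System.out.println(p1 == p2);  // true
    }
}
```

This is useful when keys are composite (for example, region|city|user) and we want all records for one leading component to be reduced together.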