There are several cities, with daily temperatures provided for each city, and we have to calculate the average temperature for each city. After calculating the city-wise total, we can compute the average temperature per city. The following table shows how the map, combine, and reduce stages transform the input:
| Input files (several files) | Map (parallel) (Key = cityID, Value = temperature/name) | Combiner (parallel) | Reducer (not parallel) | Output |
|---|---|---|---|---|
| City 1 | 1 <10,20,25,45,15,45,25,20> 2 <10,30,20,25,35> | 1 <250,20> 2 <120,10> | 1 Boston, <250,20,155,10,90,90,30> 2 New York, <120,10,175,10,135,10,110,10,130,10> | Boston <645> New York <720> |
| City 2 | 1 <Boston> 2 <New York> | 1 <Boston> 2 <New York> | | |
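The combiner column above pre-aggregates each split into a (sum, count) pair, and the reducer then merges these pairs and divides once to produce the final average. A minimal, Hadoop-free sketch of that merge step (the class and method names are illustrative, and the partial values are loosely based on the Boston row):

```java
public class CombinerSketch {
    // Merge per-split (sum, count) pairs, as the combiner column emits,
    // then divide once to get the true average across all splits.
    static int average(int[][] partials) {
        int sum = 0, count = 0;
        for (int[] p : partials) { // each p = {partialSum, partialCount}
            sum += p[0];
            count += p[1];
        }
        return sum / count; // integer division, matching the job's IntWritable output
    }

    public static void main(String[] args) {
        // Hypothetical partial aggregates from two input splits for one city
        int[][] partials = { {250, 20}, {155, 10} };
        System.out.println(average(partials)); // (250 + 155) / (20 + 10) = 13
    }
}
```

Merging (sum, count) pairs is safe in a combiner because addition is associative and commutative, so it does not matter how many times, or in what order, the framework applies the combine step.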
Now, let's look at a complete example of a SingleMapperCombinerReducer job. For this, we will simply output the cityID and the average temperature from the temperature.csv file seen earlier.
The following is the code:
package io.somethinglikethis;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class SingleMapperCombinerReducer
{
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "City Temperature Job");
        job.setJarByClass(SingleMapperCombinerReducer.class);
        job.setMapperClass(TemperatureMapper.class);
        // The reducer doubles as the combiner; its input and output
        // types (Text, IntWritable) match the mapper's output types,
        // as the combiner contract requires
        job.setCombinerClass(TemperatureReducer.class);
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /*
    Date,Id,Temperature
    2018-01-01,1,21
    2018-01-01,2,22
    */
    public static class TemperatureMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            String id = tokens[1].trim();
            String temperature = tokens[2].trim();
            // Skip the CSV header row
            if (!temperature.equals("Temperature")) {
                context.write(new Text(id),
                        new IntWritable(Integer.parseInt(temperature)));
            }
        }
    }

    public static class TemperatureReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            int n = 0;
            for (IntWritable val : values) {
                sum += val.get();
                n += 1;
            }
            result.set(sum / n); // integer average
            context.write(key, result);
        }
    }
}
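One subtlety in the listing above is that TemperatureReducer is registered as both the combiner and the reducer. Because it emits an average rather than a (sum, count) pair, the reducer may end up averaging already-averaged partial results, which only equals the true average when every split contributes the same number of records per city. A small, Hadoop-free sketch (the splits and values below are made up purely for illustration) shows the discrepancy:

```java
public class AverageOfAverages {
    // True average over all values, using integer division as the job does
    static int trueAverage(int[] a, int[] b) {
        int sum = 0, n = 0;
        for (int v : a) { sum += v; n++; }
        for (int v : b) { sum += v; n++; }
        return sum / n;
    }

    // What happens if each split is averaged first and the averages are
    // averaged again (i.e., the reducer applied on top of combiner output)
    static int averageOfAverages(int[] a, int[] b) {
        int avgA = 0, avgB = 0;
        for (int v : a) avgA += v;
        for (int v : b) avgB += v;
        avgA /= a.length;
        avgB /= b.length;
        return (avgA + avgB) / 2;
    }

    public static void main(String[] args) {
        int[] splitA = {10, 20, 30}; // one split's temperatures, average 20
        int[] splitB = {40};         // another split, average 40
        System.out.println(trueAverage(splitA, splitB));       // 25
        System.out.println(averageOfAverages(splitA, splitB)); // 30
    }
}
```

A more robust design is to have the combiner emit partial sums and counts (for example, via a composite writable) and divide only once in the reducer, as sketched earlier; the listing above keeps the simpler reducer-as-combiner form for clarity.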
Now, run the job using the following command:
hadoop jar target/uber-mapreduce-1.0-SNAPSHOT.jar io.somethinglikethis.SingleMapperCombinerReducer /user/normal/temperatures.csv /user/normal/output/SingleMapperCombinerReducer
The job will run, and you should see counters similar to the following in the output:
Map-Reduce Framework
Map input records=28
Map output records=27
Map output bytes=162
Map output materialized bytes=54
Input split bytes=115
Combine input records=27
Combine output records=6
Reduce input groups=6
Reduce shuffle bytes=54
Reduce input records=6
Reduce output records=6
Spilled Records=12
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=11
Total committed heap usage (bytes)=1077936128
This shows that 27 records were output from the mapper and there are 6 output records from the reducer. Note, however, that there is now a combiner, which takes the 27 mapper output records and emits only 6 records, clearly demonstrating the performance gain from reducing the number of records shuffled from mappers to reducers. You can check this using the HDFS browser by opening http://localhost:9870 and navigating to the output directory under /user/normal/output, as shown in the following screenshot:
Now find the SingleMapperCombinerReducer folder, go into this directory, and drill down as in the SingleMapper section earlier. Using the head/tail option in the preceding screen, you can view the content of the file, as shown in the following screenshot:
This shows the output of the SingleMapperCombinerReducer job, which writes each cityID together with the average temperature for that cityID:
The output file contents are shown in the following code:
1 22
2 23
3 23
4 23
5 22
6 22
This concludes the SingleMapperCombinerReducer job execution and the output is as expected.
Next, we will look into more details on the patterns used in writing MapReduce jobs.