Scenario

There are several cities, with a daily temperature provided for each city, and we have to calculate each city's average temperature. However, there are certain rules for calculating the average: after calculating the city-wise temperature total for each city, we can compute the average temperature for each city:

Input files (several files) -> Map (parallel) -> Combiner (parallel) -> Reducer (not parallel) -> Output

City 1 file (Key = cityID, Value = temperature):
    Map:      1 <10,20,25,45,15,45,25,20>
              2 <10,30,20,25,35>
    Combiner: 1 <250,20>
              2 <120,10>

City 2 file (Key = cityID, Value = name):
    Map:      1 <Boston>
              2 <New York>
    Combiner: 1 <Boston>
              2 <New York>

Reducer (input gathered from all files):
    1 Boston,   <250,20,155,10,90,90,30>
    2 New York, <120,10,175,10,135,10,110,10,130,10>

Output:
    Boston   <645>
    New York <720>

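To see where the Output column comes from, note that the reducer adds up everything it receives for a key before computing the average; the following throwaway Java snippet (a sanity check, not part of the job) confirms the city-wise totals shown above:

public class TableTotals {
    public static void main(String[] args) {
        // Values taken from the Reducer column of the preceding table.
        int[] boston  = {250, 20, 155, 10, 90, 90, 30};
        int[] newYork = {120, 10, 175, 10, 135, 10, 110, 10, 130, 10};
        System.out.println("Boston total: " + java.util.Arrays.stream(boston).sum());    // 645
        System.out.println("New York total: " + java.util.Arrays.stream(newYork).sum()); // 720
    }
}

The averages in the scenario would then be these totals divided by the number of readings per city.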
Now, let's look at the complete example of a SingleMapperCombinerReducer job. For this, we will simply output the cityID and the average temperature from the temperatures.csv file seen earlier.

The following is the code:

package io.somethinglikethis;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SingleMapperCombinerReducer {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "City Temperature Job");
        job.setJarByClass(SingleMapperCombinerReducer.class);
        job.setMapperClass(TemperatureMapper.class);
        // The same reducer class doubles as the combiner.
        job.setCombinerClass(TemperatureReducer.class);
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /*
    Input rows look like this:
    Date,Id,Temperature
    2018-01-01,1,21
    2018-01-01,2,22
    */
    public static class TemperatureMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            String id = tokens[1].trim();
            String temperature = tokens[2].trim();
            // Skip the CSV header row.
            if (!temperature.equals("Temperature")) {
                context.write(new Text(id), new IntWritable(Integer.parseInt(temperature)));
            }
        }
    }

    public static class TemperatureReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            int n = 0;
            for (IntWritable val : values) {
                sum += val.get();
                n += 1;
            }
            result.set(sum / n);
            context.write(key, result);
        }
    }
}

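Note one caveat in the preceding code: because TemperatureReducer is reused as the combiner, each combiner emits a per-split average, and the reducer then averages those averages. An average of averages is not, in general, equal to the true mean (it depends on how the records are split across mappers), although the job still demonstrates the shuffle savings a combiner provides. The following is a minimal sketch of an average-preserving variant, assuming we are free to carry "sum,count" pairs as Text through the shuffle; the class names (AvgMapper, SumCountCombiner, AvgReducer) are illustrative and not from the original source:

// Uses the same imports as the SingleMapperCombinerReducer listing above.
public static class AvgMapper
        extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(",");
        String temperature = tokens[2].trim();
        if (!temperature.equals("Temperature")) {   // skip the CSV header row
            // Each reading starts life as a (sum, count) pair of (temp, 1).
            context.write(new Text(tokens[1].trim()), new Text(temperature + ",1"));
        }
    }
}

public static class SumCountCombiner
        extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0, count = 0;
        for (Text val : values) {
            String[] parts = val.toString().split(",");
            sum += Integer.parseInt(parts[0]);
            count += Integer.parseInt(parts[1]);
        }
        // Emit the partial sum and count, not a partial average,
        // so the reducer can still compute the exact mean.
        context.write(key, new Text(sum + "," + count));
    }
}

public static class AvgReducer
        extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0, count = 0;
        for (Text val : values) {
            String[] parts = val.toString().split(",");
            sum += Integer.parseInt(parts[0]);
            count += Integer.parseInt(parts[1]);
        }
        context.write(key, new IntWritable(sum / count));
    }
}

With this variant, the driver would also call job.setMapOutputKeyClass(Text.class) and job.setMapOutputValueClass(Text.class), since the map output value type (Text) now differs from the final output value type (IntWritable). Because the combiner emits the same (sum, count) shape it consumes, the result stays exact whether the combiner runs zero, one, or many times.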
Now, run the command, as shown in the following code:

hadoop jar target/uber-mapreduce-1.0-SNAPSHOT.jar io.somethinglikethis.SingleMapperCombinerReducer /user/normal/temperatures.csv /user/normal/output/SingleMapperCombinerReducer

The job will run, and you should see output counters similar to the following:

Map-Reduce Framework
Map input records=28
Map output records=27
Map output bytes=162
Map output materialized bytes=54
Input split bytes=115
Combine input records=27
Combine output records=6
Reduce input groups=6
Reduce shuffle bytes=54
Reduce input records=6
Reduce output records=6
Spilled Records=12
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=11
Total committed heap usage (bytes)=1077936128

This shows that 27 records were output from the mapper and that there are 6 output records from the reducer. However, note that there is now a combiner, which takes the 27 map output records and reduces them to 6, clearly demonstrating the performance gain from cutting down the records shuffled from mappers to reducers (compare Map output bytes=162 with Map output materialized bytes=54). You can check this using the HDFS browser by going to http://localhost:9870 and opening the output directory shown under /user/normal/output, as shown in the following screenshot:

Now find the SingleMapperCombinerReducer folder, go into this directory, and drill down as in the SingleMapper section earlier; then, using the head/tail option in the preceding screen, you can view the contents of the file, as shown in the following screenshot:

Figure: Check output in output directory

This shows the output of the SingleMapperCombinerReducer job, which writes each cityID along with the average temperature for that cityID:

Figure: Screenshot showing output of the SingleMapperCombinerReducer

You can also use the command line to view the contents of the output:

hdfs dfs -cat /user/normal/output/SingleMapperCombinerReducer/part-r-00000

The output file contents are shown in the following code:

1 22
2 23
3 23
4 23
5 22
6 22

This concludes the SingleMapperCombinerReducer job execution, and the output is as expected.

Next, we will look into more details on the patterns used in writing MapReduce jobs.
