The MapReduce framework provides Counters as an efficient mechanism for tracking the occurrences of global events within the map and reduce phases of jobs. For example, a typical MapReduce job will kick off several mapper instances, one for each block of the input data, all running the same code. These instances are part of the same job but run independently of one another. Counters allow a developer to track aggregated events from all of those separate instances.
A more concrete use of Counters can be found in the MapReduce framework itself. Each MapReduce job defines several standard Counters, whose values can be viewed on the job details page of the Job Tracker web UI. For each Counter, the UI shows the Counter group, name, mapper total, reducer total, and job total.
Counters should be limited to tracking metadata about the job, and the standard Counters are good examples of this. The Map input records counter, for instance, reports how many records the mappers read during a particular execution of the job. If Counters did not exist, these kinds of statistics would have to be part of the job's main output, where they don't belong, or, more likely, part of a secondary output, which would complicate the logic of the job.
The following recipe is a simple map-only job that filters out bad records and uses counters to record how many records were filtered, broken down by the reason they were rejected.
You will need to download the weblog_entries_bad_records.txt dataset from the Packt website, http://www.packtpub.com/support.
Copy the weblog_entries_bad_records.txt file from the local file system into the /data/weblogs folder in HDFS (create the folder first if it does not already exist):
hadoop fs -copyFromLocal weblog_entries_bad_records.txt /data/weblogs
Run the CountersExample job:
hadoop jar ./CountersExample.jar com.packt.hadoop.solutions.CounterExample /data/weblogs/weblog_entries_bad_records.txt /data/weblogs/weblog_entries_clean.txt
Open the Job Tracker web UI at http://localhost:50030 and scroll down to the Completed Jobs section. Locate the CounterExample job; the most recent jobs are at the bottom of the table. Once you have located the job, click its Jobid. This page shows high-level statistics about the job, including the Counters.
Counters are defined in groups. In Java, each Counter group is an Enum. The CounterExample job defines an Enum to track the count of each type of bad record:
static enum BadRecords{INVALID_NUMBER_OF_COLUMNS, INVALID_IP_ADDRESS};
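To make the enum-as-group idea concrete outside of Hadoop, here is a minimal plain-Java sketch. The class EnumCounterGroupSketch and its increment and get methods are hypothetical stand-ins that only mimic the shape of context.getCounter(...).increment(1); they are not Hadoop's API. The enum type plays the role of the counter group, and each constant is one counter within it.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for Hadoop's counter lookup (not the Hadoop API):
// the enum type acts as the counter group, each constant as one counter.
public class EnumCounterGroupSketch {

    // Same counter group as in the CounterExample job.
    enum BadRecords { INVALID_NUMBER_OF_COLUMNS, INVALID_IP_ADDRESS }

    // One running total per counter; EnumMap iterates in declaration order.
    private final Map<BadRecords, Long> counts = new EnumMap<>(BadRecords.class);

    // Mirrors the shape of context.getCounter(counter).increment(amount).
    public void increment(BadRecords counter, long amount) {
        counts.merge(counter, amount, Long::sum);
    }

    // Counters that were never incremented read as zero.
    public long get(BadRecords counter) {
        return counts.getOrDefault(counter, 0L);
    }

    public static void main(String[] args) {
        EnumCounterGroupSketch group = new EnumCounterGroupSketch();
        group.increment(BadRecords.INVALID_IP_ADDRESS, 1);
        group.increment(BadRecords.INVALID_IP_ADDRESS, 1);
        group.increment(BadRecords.INVALID_NUMBER_OF_COLUMNS, 1);
        for (BadRecords c : BadRecords.values()) {
            // Printed as group.counter; the group label Hadoop displays may differ.
            System.out.println(BadRecords.class.getSimpleName() + "." + c + " = " + group.get(c));
        }
    }
}
```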
In the map function, there are two checks for valid data. The first check splits the record on tab characters. For this example, a properly formed record has five columns. If a record does not have five columns, the mapper calls getCounter() on the Context object to get the counter for BadRecords.INVALID_NUMBER_OF_COLUMNS, increments that counter by 1, and returns without emitting the record.
String record = value.toString();
// Columns are tab-delimited
String[] columns = record.split("\t");
// Check for valid number of columns
if (columns.length != 5) {
    context.getCounter(BadRecords.INVALID_NUMBER_OF_COLUMNS).increment(1);
    return;
}
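The column check can be tried without a cluster. This sketch assumes, as the recipe does, that a well-formed record has exactly five tab-separated columns; ColumnCheckSketch and hasValidColumnCount are hypothetical names used only for illustration.

```java
// Standalone sketch of the first check, outside of Hadoop.
public class ColumnCheckSketch {

    // Assumption taken from the recipe: five tab-separated columns per record.
    static final int EXPECTED_COLUMNS = 5;

    static boolean hasValidColumnCount(String record) {
        // String.split drops trailing empty strings, so a record ending in
        // an empty column yields fewer than five elements.
        return record.split("\t").length == EXPECTED_COLUMNS;
    }

    public static void main(String[] args) {
        System.out.println(hasValidColumnCount("a\tb\tc\td\te")); // true
        System.out.println(hasValidColumnCount("a\tb\tc\td"));    // false
    }
}
```

One consequence worth knowing: because String.split discards trailing empty strings, a record whose last column is empty is counted as INVALID_NUMBER_OF_COLUMNS. Calling record.split("\t", -1) keeps trailing empty columns if that behavior is not wanted.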
The second check validates the IP address. A regular expression, VALID_IP_ADDRESS, is defined; as its name implies, it matches valid IPv4 addresses written in dotted-decimal form.
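// Backslashes are doubled because this is a Java string literal
private static final String VALID_IP_ADDRESS =
    "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\." +
    "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])$";
// Compiled once (java.util.regex.Pattern) and reused for every record
private static final Pattern pattern = Pattern.compile(VALID_IP_ADDRESS);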
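A Java string literal cannot contain \d or \. as written, so the backslashes in the expression must be doubled. The following sketch compiles the same expression once and checks a few sample values; IpRegexSketch, isValidIp, and the sample strings are illustrative assumptions, not part of the recipe.

```java
import java.util.regex.Pattern;

public class IpRegexSketch {

    // Same expression as VALID_IP_ADDRESS, with backslashes doubled for Java.
    static final String VALID_IP_ADDRESS =
        "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
        + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])$";

    // Compile once; Pattern instances are immutable and safe to share across threads.
    static final Pattern PATTERN = Pattern.compile(VALID_IP_ADDRESS);

    static boolean isValidIp(String candidate) {
        return PATTERN.matcher(candidate).matches();
    }

    public static void main(String[] args) {
        String[] samples = {"10.0.0.1", "255.255.255.255", "256.1.1.1", "1.2.3", "abc"};
        for (String s : samples) {
            System.out.println(s + " -> " + isValidIp(s));
        }
    }
}
```

Because matches() requires the entire input to match, the ^ and $ anchors are redundant with it, though harmless. Note also that the expression accepts octets with leading zeros, such as 01.1.1.1.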
The VALID_IP_ADDRESS regular expression is used to check every record's IP address column (the fifth column, columns[4]) for a match. For each record that does not match, the INVALID_IP_ADDRESS counter is incremented and the record is skipped.
// Check for a valid IP address in the fifth column
Matcher matcher = pattern.matcher(columns[4]);
if (!matcher.matches()) {
    context.getCounter(BadRecords.INVALID_IP_ADDRESS).increment(1);
    return;
}
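Putting the two checks together, the sketch below filters records in plain Java and keeps one count per BadRecords constant. Because each check returns early, a rejected record increments exactly one counter, so the counter totals add up to the number of filtered records. The class BadRecordFilterSketch, its EnumMap-based counters, and the sample records are hypothetical; in the real job the counts live in Hadoop's Counters, not in a map.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Plain-Java sketch of the map-side filtering: each bad record bumps one
// counter and is dropped; good records pass through unchanged.
public class BadRecordFilterSketch {

    enum BadRecords { INVALID_NUMBER_OF_COLUMNS, INVALID_IP_ADDRESS }

    // Same octet rule as VALID_IP_ADDRESS; matches() supplies the anchoring.
    private static final String OCTET = "([01]?\\d\\d?|2[0-4]\\d|25[0-5])";
    private static final Pattern IP =
        Pattern.compile(OCTET + "\\." + OCTET + "\\." + OCTET + "\\." + OCTET);

    // Hypothetical local counters standing in for Hadoop's Counters.
    final Map<BadRecords, Long> counters = new EnumMap<>(BadRecords.class);

    // Returns true if the record should be emitted.
    boolean accept(String record) {
        String[] columns = record.split("\t");
        if (columns.length != 5) {
            counters.merge(BadRecords.INVALID_NUMBER_OF_COLUMNS, 1L, Long::sum);
            return false; // early return: the IP check is never reached
        }
        if (!IP.matcher(columns[4]).matches()) {
            counters.merge(BadRecords.INVALID_IP_ADDRESS, 1L, Long::sum);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        BadRecordFilterSketch filter = new BadRecordFilterSketch();
        List<String> clean = new ArrayList<>();
        // Hypothetical sample records: one good, one short, one with a bad IP.
        String[] input = {
            "id1\t/a.html\t2012-05-10\t21:25:44\t148.113.13.214",
            "id2\t/b.html\t2012-05-10",
            "id3\t/c.html\t2012-05-10\t21:11:20\t999.1.1.1"
        };
        for (String r : input) {
            if (filter.accept(r)) {
                clean.add(r);
            }
        }
        System.out.println(clean.size() + " clean, counters = " + filter.counters);
    }
}
```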
Each increment of a counter is first stored locally by each mapper. The counter values are then sent to the Task Tracker for a second level of aggregation. Finally, the values are sent to the Job Tracker where the global aggregation takes place.
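The three levels of aggregation can be modeled with plain sums. In this sketch the per-mapper counts and their assignment to Task Trackers are made-up numbers, and the method names are hypothetical; the point is only that summing in stages gives the same total as summing every increment directly, because addition is associative.

```java
import java.util.Arrays;
import java.util.List;

public class CounterAggregationSketch {

    // Level 2: a Task Tracker sums the local totals reported by the tasks it ran.
    static long taskTrackerTotal(List<Long> localTotals) {
        long sum = 0;
        for (long t : localTotals) {
            sum += t;
        }
        return sum;
    }

    // Level 3: the Job Tracker sums the per-Task-Tracker totals into the job total.
    static long jobTotal(List<List<Long>> localTotalsByTracker) {
        long sum = 0;
        for (List<Long> trackerTasks : localTotalsByTracker) {
            sum += taskTrackerTotal(trackerTasks);
        }
        return sum;
    }

    public static void main(String[] args) {
        // Level 1: hypothetical local INVALID_IP_ADDRESS counts from five mappers,
        // three running on one Task Tracker and two on another.
        List<List<Long>> byTracker = Arrays.asList(
                Arrays.asList(3L, 0L, 2L),
                Arrays.asList(1L, 4L));
        System.out.println("job total = " + jobTotal(byTracker)); // prints "job total = 10"
    }
}
```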