This chapter is different from the others in that it doesn’t contain patterns for solving a particular problem, but patterns that deal with patterns. The term metapatterns is directly translated to “patterns about patterns.” The first method that will be discussed is job chaining, which is piecing together several patterns to solve complex, multistage problems. The second method is job merging, which is an optimization for performing several analytics in the same MapReduce job, effectively killing multiple birds with one stone.
Job chaining is extremely important to understand and have an operational plan for in your environment. Many people find that they can’t solve a problem with a single MapReduce job. Some jobs in a chain will run in parallel, some will have their output fed into other jobs, and so on. Once you understand how to solve problems as a series of MapReduce jobs, you’ll be able to tackle a whole new class of challenges.
Job chaining is one of the more complicated processes to handle because it’s not a feature out of the box in most MapReduce frameworks. Systems like Hadoop are designed for handling one MapReduce job very well, but handling a multistage job takes a lot of manual coding. There are operational considerations for handling failures in the stages of the job and cleaning up intermediate output. In this section, a few different approaches to job chaining will be discussed. Some will seem more appealing than others for your particular environment, as each has its own pros and cons.
A couple of frameworks and tools have emerged to fill this niche. If you do a lot of job flows and your chaining is pretty complex, you should consider using one of these. The approaches described in this section are more lightweight and need to be implemented on a job-by-job basis. Oozie, an open source Apache project, has functionality for building workflows and coordinating job running. Building job chains is only one of the many features that are useful for operationally running Hadoop MapReduce.
One particularly common pitfall is to use MapReduce for something that is small enough that distributing the job is not necessary. If you think chaining two jobs together is the right choice, think about how much output there is from the first. If there are tons of output data, then by all means use a second MapReduce job. Many times, however, the output of the job is small and can be processed quite effectively on a single node. The two ways of doing this are to either load the data through the file system API in the driver after the job has completed, or incorporate it in some sort of bash script wrapper.
A major problem with MapReduce chains is the size of the temporary files. In some cases, they may be tiny, which will cause a significant amount of overhead in firing up way too many map tasks to load them.
In a nonchained job, the number of reducers typically depends more on the amount of data they are receiving than the amount of data you’d like to output. When chaining, the size of the output files is likely more important, even if the reducers will take a bit longer. Try to shoot for output files about the size of one block on the distributed filesystem. Just play around with the number of reducers and see what the impact is on performance (which is good advice in general).
The other option is to consistently use CombineFileInputFormat for jobs that load intermediate output. CombineFileInputFormat takes smaller blocks and lumps them together to make a larger input split before they are processed by the mapper.
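The block-size advice above comes down to simple arithmetic. The following is a minimal sketch, assuming you can estimate the total output size of the job ahead of time. The class name, the method name, and the 128 MB block size are illustrative assumptions and not part of any Hadoop API.

```java
/** Back-of-the-envelope reducer sizing for a job whose output feeds another job. */
final class ReducerSizing {

    /**
     * Returns the number of reducers that yields output files of roughly one
     * block each, assuming the output is spread evenly across reducers.
     */
    static int reducersFor(long estimatedOutputBytes, long blockSizeBytes) {
        if (estimatedOutputBytes <= 0 || blockSizeBytes <= 0) {
            return 1; // always run at least one reducer
        }
        // Ceiling division so a trailing partial block still gets a reducer
        long reducers = (estimatedOutputBytes + blockSizeBytes - 1) / blockSizeBytes;
        return (int) Math.min(reducers, Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;             // assumed block size
        long expectedOutput = 10L * 1024 * 1024 * 1024;  // assumed 10 GB of output
        System.out.println(reducersFor(expectedOutput, blockSize)); // prints 80
    }
}
```

The result is only a starting point for the number passed to Job.setNumReduceTasks. Skewed keys produce uneven files, so it is still worth measuring a few settings.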
Probably the simplest method for performing job chaining is to have a master driver that simply fires off multiple job-specific drivers. There’s nothing special about a MapReduce driver in Hadoop; it’s pretty generic Java. It doesn’t derive from some sort of special class or anything.
Take the driver for each MapReduce job and call them in the sequence they should run. You’ll have to specifically be sure that the output path of the first job is the input path of the second. You can be sure of this by storing the temporary path string as a variable and sharing it.
In a production scenario, the temporary directories should be cleaned up so they don’t linger past the completion of the job. Lack of discipline here can surprisingly fill up your cluster rather quickly. Also, be careful of how much temporary data you are actually creating because you’ll need storage in your file system to store that data.
You can pretty easily extrapolate this approach to create chains that are much longer than just two jobs. Just be sure to keep track of all of the temporary paths and optionally clean up the data not being used anymore as the job runs.
You can also fire off multiple jobs in parallel by using Job.submit() instead of Job.waitForCompletion(). The submit method returns immediately to the current thread and runs the job in the background. This allows you to run several jobs at once. Use Job.isComplete(), a nonblocking job completion check, to poll periodically to see whether all of the jobs are complete.
The other thing to pay attention to is job success. It’s not good enough to just know that the job completed. You also need to check whether it succeeded or not. If a dependency job failed, you should break out of the entire chain instead of trying to let it continue.
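The points above (share the temporary path between stages, stop at the first failure, and always clean up) can be sketched without any Hadoop dependency. In this sketch each stage is a hypothetical stand-in for a job-specific driver, modeled as a BiPredicate that takes an input path and an output path and returns whether the job succeeded. The class name, the `_tmp` suffix, and the `deleteDir` callback are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

final class ChainSketch {

    /**
     * Runs each stage in order, feeding one stage's output path to the next.
     * Returns 0 if every stage succeeded and 1 as soon as one stage fails.
     */
    static int runChain(String input, String finalOutput,
            List<BiPredicate<String, String>> stages, Consumer<String> deleteDir) {
        List<String> tempDirs = new ArrayList<>();
        String current = input;
        int code = 0;
        try {
            for (int i = 0; i < stages.size(); i++) {
                boolean last = (i == stages.size() - 1);
                // Intermediate stages write to a derived temporary path
                String output = last ? finalOutput : finalOutput + "_tmp" + i;
                if (!last) {
                    tempDirs.add(output);
                }
                if (!stages.get(i).test(current, output)) {
                    code = 1; // a dependency failed: stop the whole chain
                    break;
                }
                current = output;
            }
        } finally {
            // Clean up intermediate output whether the chain succeeded or not
            for (String dir : tempDirs) {
                deleteDir.accept(dir);
            }
        }
        return code;
    }

    public static void main(String[] args) {
        List<BiPredicate<String, String>> stages = new ArrayList<>();
        stages.add((in, out) -> { System.out.println(in + " -> " + out); return true; });
        stages.add((in, out) -> { System.out.println(in + " -> " + out); return true; });
        int code = runChain("posts", "result", stages,
                dir -> System.out.println("delete " + dir));
        System.out.println("exit code " + code);
    }
}
```

In a real driver, each stage would configure a Job and return the result of Job.waitForCompletion(true), and the cleanup callback would delete the directory through the FileSystem API.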
It’s pretty obvious that this process is going to be rather difficult to manage and maintain from a software engineering perspective as the job chains get more complicated. This is where something like JobControl or Oozie comes in.
The goal of this example is to output a list of users along with a couple pieces of information: their reputations and how many posts each has issued. This could be done in a single MapReduce job, but we also want to separate users into those with an above-average number of posts and those with a below-average number. We need one job to perform the counts and another to separate the users into two bins based on the number of posts. Four different patterns are used in this example: numerical summarization, counting, binning, and a replicated join. The final output consists of a user ID, the number of times they posted, and their reputation.
The average number of posts per user is calculated between the two jobs using the framework’s counters. The users data set is put in the DistributedCache in the second job to enrich the output data with the users’ reputations. This enrichment occurs in order to feed into the next example in this section, which calculates the average reputation of the users in the two bins.
The following descriptions of each code section explain the solution to the problem.
Problem: Given a data set of StackOverflow posts, bin users based on whether they are below or above the average number of posts per user. Also, enrich each user with his or her reputation from a separate data set when generating the output.
Before we look at the driver, let’s get an understanding of the mapper and reducer for both jobs. The mapper records the user ID from each record by assigning the value of the OwnerUserId attribute as the output key for the job, with a count of one as the value. It also increments a record counter by one. This value is later used in the driver to calculate the average number of posts per user. The AVERAGE_CALC_GROUP is a public static string at the driver level.
public static class UserIdCountMapper extends
        Mapper<Object, Text, Text, LongWritable> {

    public static final String RECORDS_COUNTER_NAME = "Records";

    private static final LongWritable ONE = new LongWritable(1);
    private Text outkey = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());

        String userId = parsed.get("OwnerUserId");

        if (userId != null) {
            outkey.set(userId);
            context.write(outkey, ONE);
            context.getCounter(AVERAGE_CALC_GROUP,
                    RECORDS_COUNTER_NAME).increment(1);
        }
    }
}
The reducer is also fairly trivial. It simply iterates through the input values (all of which we set to 1) and keeps a running sum, which is output along with the input key. A different counter is also incremented by one for each reduce group, in order to calculate the average.
public static class UserIdSumReducer extends
        Reducer<Text, LongWritable, Text, LongWritable> {

    public static final String USERS_COUNTER_NAME = "Users";

    private LongWritable outvalue = new LongWritable();

    public void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {

        // Increment user counter, as each reduce group represents one user
        context.getCounter(AVERAGE_CALC_GROUP, USERS_COUNTER_NAME)
                .increment(1);

        int sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }

        outvalue.set(sum);
        context.write(key, outvalue);
    }
}
This mapper is more complicated than the previous one. It is doing a few different things to get the desired output. The setup phase accomplishes three different things. The average number of posts per user is pulled from the job configuration, which is reached through the Context object and was set by the driver during job configuration. The MultipleOutputs utility is initialized as well. This is used to write the output to different bins. Finally, the user data set is parsed from the DistributedCache to build a map of user ID to reputation. This map is used for the desired data enrichment during output.
Compared to the setup, the map method is much easier. The input value is parsed to get the user ID and number of times posted. This is done by simply splitting on tabs and getting the first two fields of data. Then the mapper sets the output key to the user ID and the output value to the number of posts along with the user’s reputation, delimited by a tab. The user’s number of posts is then compared to the average, and the user is binned appropriately.
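The parse, enrich, and compare steps just described can be tried in isolation. The sketch below is plain Java with no Hadoop types. The class and method names are made up for illustration, and the bin names reuse the belowavg and aboveavg constants from the driver. It assumes the input lines have the form user ID, tab, number of posts.

```java
import java.util.HashMap;
import java.util.Map;

final class BinningSketch {

    static final String BELOW = "belowavg";
    static final String ABOVE = "aboveavg";

    /** Returns the bin name for one "userId<TAB>posts" line. */
    static String binFor(String line, double average) {
        String[] tokens = line.split("\t");
        int posts = Integer.parseInt(tokens[1]);
        // A user sitting exactly on the average lands in the "above" bin
        return posts < average ? BELOW : ABOVE;
    }

    /** Builds the enriched record: userId, posts, reputation, tab-delimited. */
    static String enrich(String line, Map<String, String> userIdToReputation) {
        String[] tokens = line.split("\t");
        return tokens[0] + "\t" + tokens[1] + "\t"
                + userIdToReputation.get(tokens[0]);
    }

    public static void main(String[] args) {
        Map<String, String> reputations = new HashMap<>();
        reputations.put("17", "120");
        reputations.put("42", "1500");

        double average = 10.0 / 4.0; // e.g., 10 post records from 4 users

        for (String line : new String[] { "17\t2", "42\t3" }) {
            System.out.println(binFor(line, average) + ": "
                    + enrich(line, reputations));
        }
    }
}
```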
An optional fourth parameter of MultipleOutputs.write is used in this example to name each part file. A constant is used to specify the directory for users based on whether they are below or above average in their number of posts. The filename in the folder is named through an extra /part string. This becomes the beginning of the filename, to which the framework will append -m-nnnnn, where nnnnn is the task ID number. With this name, a folder will be created for both bins and the folders will contain a number of part files. This is done for easier input/output management for the next example on parallel jobs. Finally, MultipleOutputs is closed in the cleanup stage.
public static class UserIdBinningMapper extends
        Mapper<Object, Text, Text, Text> {

    public static final String AVERAGE_POSTS_PER_USER = "avg.posts.per.user";

    public static void setAveragePostsPerUser(Job job, double avg) {
        job.getConfiguration().set(AVERAGE_POSTS_PER_USER,
                Double.toString(avg));
    }

    public static double getAveragePostsPerUser(Configuration conf) {
        return Double.parseDouble(conf.get(AVERAGE_POSTS_PER_USER));
    }

    private double average = 0.0;
    private MultipleOutputs<Text, Text> mos = null;
    private Text outkey = new Text(), outvalue = new Text();
    private HashMap<String, String> userIdToReputation =
            new HashMap<String, String>();

    protected void setup(Context context) throws IOException,
            InterruptedException {
        average = getAveragePostsPerUser(context.getConfiguration());

        mos = new MultipleOutputs<Text, Text>(context);

        Path[] files = DistributedCache.getLocalCacheFiles(context
                .getConfiguration());

        // Read all files in the DistributedCache
        for (Path p : files) {
            BufferedReader rdr = new BufferedReader(
                    new InputStreamReader(
                            new GZIPInputStream(new FileInputStream(
                                    new File(p.toString())))));

            String line;
            // For each record in the user file
            while ((line = rdr.readLine()) != null) {
                // Get the user ID and reputation
                Map<String, String> parsed = MRDPUtils
                        .transformXmlToMap(line);

                // Map the user ID to the reputation
                userIdToReputation.put(parsed.get("Id"),
                        parsed.get("Reputation"));
            }
        }
    }

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        // The counting job's output is "userId<TAB>posts"
        String[] tokens = value.toString().split("\t");

        String userId = tokens[0];
        int posts = Integer.parseInt(tokens[1]);

        outkey.set(userId);
        outvalue.set((long) posts + "\t" + userIdToReputation.get(userId));

        if ((double) posts < average) {
            mos.write(MULTIPLE_OUTPUTS_BELOW_NAME, outkey, outvalue,
                    MULTIPLE_OUTPUTS_BELOW_NAME + "/part");
        } else {
            mos.write(MULTIPLE_OUTPUTS_ABOVE_NAME, outkey, outvalue,
                    MULTIPLE_OUTPUTS_ABOVE_NAME + "/part");
        }
    }

    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        mos.close();
    }
}
Now let’s take a look at this more complicated driver. It is broken down into two sections for discussion: the first job and the second job. The first job starts by parsing command-line arguments to create proper input and output directories. It creates an intermediate directory that will be deleted by the driver at the end of the job chain.
A string is tacked on to the name of the output directory here to make our intermediate output directory. This is fine for the most part, but it may be a good idea to come up with a naming convention for any intermediate directories to avoid conflicts. If an output directory already exists during job submission, the job will never start.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path postInput = new Path(args[0]);
    Path userInput = new Path(args[1]);
    Path outputDirIntermediate = new Path(args[2] + "_int");
    Path outputDir = new Path(args[2]);

    // Setup first job to counter user posts
    Job countingJob = new Job(conf, "JobChaining-Counting");
    countingJob.setJarByClass(JobChainingDriver.class);

    // Set our mapper and reducer, we can use the API's long sum reducer for
    // a combiner!
    countingJob.setMapperClass(UserIdCountMapper.class);
    countingJob.setCombinerClass(LongSumReducer.class);
    countingJob.setReducerClass(UserIdSumReducer.class);

    countingJob.setOutputKeyClass(Text.class);
    countingJob.setOutputValueClass(LongWritable.class);

    countingJob.setInputFormatClass(TextInputFormat.class);

    TextInputFormat.addInputPath(countingJob, postInput);

    countingJob.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(countingJob, outputDirIntermediate);

    // Execute job and grab exit code
    int code = countingJob.waitForCompletion(true) ? 0 : 1;
    ...
The first job is checked for success before executing the second job. This seems simple enough, but with more complex job chains it can get a little annoying. Before the second job is configured, we grab the counter values from the first job to get the average posts per user. This value is then added to the job configuration. We set our mapper code and disable the reduce phase, as this is a map-only job. The other key parts to pay attention to are the configuration of MultipleOutputs and the DistributedCache. The job is then executed and the framework takes over from there.
Lastly and most importantly, success or failure, the intermediate output directory is cleaned up. This is an important and often overlooked step. Leaving any intermediate output will fill up a cluster quickly and require you to delete the output by hand. If you won’t be needing the intermediate output for any other analytics, by all means delete it in the code.
    if (code == 0) {
        // Calculate the average posts per user by getting counter values
        double numRecords = (double) countingJob
                .getCounters()
                .findCounter(AVERAGE_CALC_GROUP,
                        UserIdCountMapper.RECORDS_COUNTER_NAME).getValue();
        double numUsers = (double) countingJob
                .getCounters()
                .findCounter(AVERAGE_CALC_GROUP,
                        UserIdSumReducer.USERS_COUNTER_NAME).getValue();

        double averagePostsPerUser = numRecords / numUsers;

        // Setup binning job
        Job binningJob = new Job(new Configuration(), "JobChaining-Binning");
        binningJob.setJarByClass(JobChainingDriver.class);

        // Set mapper and the average posts per user
        binningJob.setMapperClass(UserIdBinningMapper.class);
        UserIdBinningMapper.setAveragePostsPerUser(binningJob,
                averagePostsPerUser);

        binningJob.setNumReduceTasks(0);

        binningJob.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(binningJob, outputDirIntermediate);

        // Add two named outputs for below/above average
        MultipleOutputs.addNamedOutput(binningJob,
                MULTIPLE_OUTPUTS_BELOW_NAME, TextOutputFormat.class,
                Text.class, Text.class);

        MultipleOutputs.addNamedOutput(binningJob,
                MULTIPLE_OUTPUTS_ABOVE_NAME, TextOutputFormat.class,
                Text.class, Text.class);

        MultipleOutputs.setCountersEnabled(binningJob, true);

        TextOutputFormat.setOutputPath(binningJob, outputDir);

        // Add the user files to the DistributedCache
        FileStatus[] userFiles = FileSystem.get(conf).listStatus(userInput);
        for (FileStatus status : userFiles) {
            DistributedCache.addCacheFile(status.getPath().toUri(),
                    binningJob.getConfiguration());
        }

        // Execute job and grab exit code
        code = binningJob.waitForCompletion(true) ? 0 : 1;
    }

    // Clean up the intermediate output
    FileSystem.get(conf).delete(outputDirIntermediate, true);

    System.exit(code);
}
The driver in parallel job chaining is similar to the previous example. The only big enhancement is that jobs are submitted in parallel and then monitored until completion. The two jobs run in this example are independent. (However, they require the previous example to have completed successfully.) Running them simultaneously has the added benefit of making better use of cluster resources.
The following descriptions of each code section explain the solution to the problem.
Problem: Given the previous example’s output of binned users, run parallel jobs over both bins to calculate the average reputation of the users in each bin.
The mapper splits the input value into a string array. The third element of this array is the reputation of the particular user. This reputation is output with a single, fixed key. This key is shared across all map tasks in order to group all the reputations together for the average calculation. NullWritable can be used to group all the records together, but we want the key to have a meaningful value.
This can be expensive for very large data sets, as one reducer is responsible for streaming all the intermediate key/value pairs over the network. The added benefit here over serially reading the data set on one node is that the input splits are read in parallel and the reducers use a configurable number of threads to read each mapper’s output.
public static class AverageReputationMapper extends
        Mapper<LongWritable, Text, Text, DoubleWritable> {

    private static final Text GROUP_ALL_KEY = new Text("Average Reputation:");
    private DoubleWritable outvalue = new DoubleWritable();

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Split the tab-delimited line into tokens
        String[] tokens = value.toString().split("\t");

        // Get the reputation from the third column
        double reputation = Double.parseDouble(tokens[2]);

        // Set the output value and write to context
        outvalue.set(reputation);
        context.write(GROUP_ALL_KEY, outvalue);
    }
}
The reducer simply iterates through the reputation values, summing the numbers and keeping a count. The average is then calculated and output with the input key.
public static class AverageReputationReducer extends
        Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    private DoubleWritable outvalue = new DoubleWritable();

    protected void reduce(Text key, Iterable<DoubleWritable> values,
            Context context) throws IOException, InterruptedException {

        double sum = 0.0;
        double count = 0;
        for (DoubleWritable dw : values) {
            sum += dw.get();
            ++count;
        }

        outvalue.set(sum / count);
        context.write(key, outvalue);
    }
}
The driver code parses command-line arguments to get the input and output directories for both jobs. A helper function is then called to submit the job configuration, which we will look at next. The Job objects for both are then returned and monitored for job completion. So long as either job is still running, the driver goes back to sleep for five seconds. Once both jobs are complete, they are checked for success or failure and an appropriate log message is printed. An exit code is then returned based on job success.
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    Path belowAvgInputDir = new Path(args[0]);
    Path aboveAvgInputDir = new Path(args[1]);
    Path belowAvgOutputDir = new Path(args[2]);
    Path aboveAvgOutputDir = new Path(args[3]);

    Job belowAvgJob = submitJob(conf, belowAvgInputDir, belowAvgOutputDir);
    Job aboveAvgJob = submitJob(conf, aboveAvgInputDir, aboveAvgOutputDir);

    // While both jobs are not finished, sleep
    while (!belowAvgJob.isComplete() || !aboveAvgJob.isComplete()) {
        Thread.sleep(5000);
    }

    if (belowAvgJob.isSuccessful()) {
        System.out.println("Below average job completed successfully!");
    } else {
        System.out.println("Below average job failed!");
    }

    if (aboveAvgJob.isSuccessful()) {
        System.out.println("Above average job completed successfully!");
    } else {
        System.out.println("Above average job failed!");
    }

    System.exit(belowAvgJob.isSuccessful() &&
            aboveAvgJob.isSuccessful() ? 0 : 1);
}
This helper function configures and submits each job. It looks much like any other job configuration, except that Job.submit is used rather than Job.waitForCompletion. This will submit the job and then immediately return, allowing the application to continue. As we saw, the returned Job is monitored in the main method until completion.
private static Job submitJob(Configuration conf, Path inputDir,
        Path outputDir) throws Exception {

    Job job = new Job(conf, "ParallelJobs");
    job.setJarByClass(ParallelJobs.class);

    job.setMapperClass(AverageReputationMapper.class);
    job.setReducerClass(AverageReputationReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(job, inputDir);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, outputDir);

    // Submit job and immediately return, rather than waiting for completion
    job.submit();
    return job;
}
This method of job chaining is very similar to the previous approach of implementing a complex job flow in a master driver that fires off individual job drivers, except that we do it in a shell script. Each job in the chain is fired off separately from inside the shell script, in the same way you would run it from the command line.
This has a few major benefits and a couple of minor downsides. One benefit is that changes to the job flow can be made without having to recompile the code, because the master driver is written in a scripting language instead of Java. This is important if the job is prone to failure and you need to be able to manually rerun or repair failed jobs easily. Also, you’ll be able to use jobs that have already been productionalized to work through a command-line interface, but not a script. Yet another benefit is that the shell script can interact with services, systems, and tools that are not Java centric. For example, later in this chapter we’ll discuss post-processing of output, which may be very natural to do with sed or awk, but less natural to do in Java.
One of the downsides of this approach is it may be harder to implement more complicated job flows in which jobs are running in parallel. You can run jobs in the background and then test for success, but it may not be as clean as in Java.
Wrapping any Hadoop MapReduce job in a script, whether it be a single Java MapReduce job, a Pig job, or whatever, has a number of benefits. These include post-processing, data flows, data preparation, additional logging, and more.
In general, using shell scripting is useful to chain new jobs with existing jobs quickly. For more robust applications, it may make more sense to build a driver-based chaining mechanism that can better interface with Hadoop.
In this example, we use the Bash shell to tie together the basic job chaining and parallel jobs examples. The script is broken into two pieces: setting variables to actually execute the jobs, and then executing them.
Inputs and outputs are stored in variables to create a number of executable commands. There are commands to run both jobs, cat the output to the screen, and clean up all the analytic output.
#!/bin/bash

JAR_FILE="mrdp.jar"
JOB_CHAIN_CLASS="mrdp.ch6.JobChainingDriver"
PARALLEL_JOB_CLASS="mrdp.ch6.ParallelJobs"

HADOOP="$( which hadoop )"

POST_INPUT="posts"
USER_INPUT="users"
JOBCHAIN_OUTDIR="jobchainout"

BELOW_AVG_INPUT="${JOBCHAIN_OUTDIR}/belowavg"
ABOVE_AVG_INPUT="${JOBCHAIN_OUTDIR}/aboveavg"

BELOW_AVG_REP_OUTPUT="belowavgrep"
ABOVE_AVG_REP_OUTPUT="aboveavgrep"

JOB_1_CMD="${HADOOP} jar ${JAR_FILE} ${JOB_CHAIN_CLASS} ${POST_INPUT} ${USER_INPUT} ${JOBCHAIN_OUTDIR}"
JOB_2_CMD="${HADOOP} jar ${JAR_FILE} ${PARALLEL_JOB_CLASS} ${BELOW_AVG_INPUT} ${ABOVE_AVG_INPUT} ${BELOW_AVG_REP_OUTPUT} ${ABOVE_AVG_REP_OUTPUT}"

CAT_BELOW_OUTPUT_CMD="${HADOOP} fs -cat ${BELOW_AVG_REP_OUTPUT}/part-*"
CAT_ABOVE_OUTPUT_CMD="${HADOOP} fs -cat ${ABOVE_AVG_REP_OUTPUT}/part-*"

RMR_CMD="${HADOOP} fs -rmr ${JOBCHAIN_OUTDIR} ${BELOW_AVG_REP_OUTPUT} ${ABOVE_AVG_REP_OUTPUT}"

LOG_FILE="avgrep_`date +%s`.txt"
The next part of the script echoes each command prior to running it. It executes the first job, and then checks the return code to see whether it failed. If it did, output is deleted and the script exits. Upon success, the second job is executed and the same error condition is checked. If the second job completes successfully, the output of each job is written to the log file and all the output is deleted. All the extra output is not required, and since the final output of each file consists of only one line, storing it in the log file is worthwhile, instead of keeping it in HDFS.
{
    echo ${JOB_1_CMD}
    ${JOB_1_CMD}

    if [ $? -ne 0 ]
    then
        echo "First job failed!"
        echo ${RMR_CMD}
        ${RMR_CMD}
        # Exit nonzero explicitly; $? here would hold the cleanup's status
        exit 1
    fi

    echo ${JOB_2_CMD}
    ${JOB_2_CMD}

    if [ $? -ne 0 ]
    then
        echo "Second job failed!"
        echo ${RMR_CMD}
        ${RMR_CMD}
        # Exit nonzero explicitly; $? here would hold the cleanup's status
        exit 1
    fi

    echo ${CAT_BELOW_OUTPUT_CMD}
    ${CAT_BELOW_OUTPUT_CMD}

    echo ${CAT_ABOVE_OUTPUT_CMD}
    ${CAT_ABOVE_OUTPUT_CMD}

    echo ${RMR_CMD}
    ${RMR_CMD}

    exit 0

} &> ${LOG_FILE}
A sample run of the script follows. The MapReduce analytic output is omitted for brevity.
/home/mrdp/hadoop/bin/hadoop jar mrdp.jar mrdp.ch6.JobChainingDriver posts users jobchainout
12/06/10 15:57:43 INFO input.FileInputFormat: Total input paths to process : 5
12/06/10 15:57:43 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/06/10 15:57:43 WARN snappy.LoadSnappy: Snappy native library not loaded
12/06/10 15:57:44 INFO mapred.JobClient: Running job: job_201206031928_0065
...
12/06/10 15:59:14 INFO mapred.JobClient: Job complete: job_201206031928_0065
...
12/06/10 15:59:15 INFO mapred.JobClient: Running job: job_201206031928_0066
...
12/06/10 16:02:02 INFO mapred.JobClient: Job complete: job_201206031928_0066
/home/mrdp/hadoop/bin/hadoop jar mrdp.jar mrdp.ch6.ParallelJobs jobchainout/belowavg jobchainout/aboveavg belowavgrep aboveavgrep
12/06/10 16:02:08 INFO input.FileInputFormat: Total input paths to process : 1
12/06/10 16:02:08 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/06/10 16:02:08 WARN snappy.LoadSnappy: Snappy native library not loaded
12/06/10 16:02:12 INFO input.FileInputFormat: Total input paths to process : 1
Below average job completed successfully!
Above average job completed successfully!
/home/mrdp/hadoop/bin/hadoop fs -cat belowavgrep/part-*
Average Reputation: 275.36385831014724
/home/mrdp/hadoop/bin/hadoop fs -cat aboveavgrep/part-*
Average Reputation: 2375.301960784314
/home/mrdp/hadoop/bin/hadoop fs -rmr jobchainout belowavgrep aboveavgrep
Deleted hdfs://localhost:9000/user/mrdp/jobchainout
Deleted hdfs://localhost:9000/user/mrdp/belowavgrep
Deleted hdfs://localhost:9000/user/mrdp/aboveavgrep
The JobControl and ControlledJob classes make up a system for chaining MapReduce jobs and have some nice features, like being able to track the state of the chain and fire off jobs automatically when they’re ready by declaring their dependencies. Using JobControl is the right way of doing job chaining, but can sometimes be too heavyweight for simpler applications.
To use JobControl, start by wrapping your jobs with ControlledJob. Doing this is relatively simple: you create your job like you usually would, except you also create a ControlledJob that takes in your Job or Configuration as a parameter, along with a list of its dependencies (other ControlledJobs). Then, you add them one-by-one to the JobControl object, which handles the rest.
You still have to keep track of temporary data and clean it up afterwards or in the event of a failure.
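To make the dependency idea concrete, here is a toy model of dependency-driven scheduling in plain Java. It is not Hadoop’s JobControl and does not reflect how that class is implemented. The class, the Node type, and the runAll method are invented for illustration, and jobs are reduced to a BooleanSupplier that reports success or failure. The DEPENDENT_FAILED state name mirrors the idea that a job whose dependency failed is never run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BooleanSupplier;

final class ToyJobControl {

    enum State { WAITING, SUCCESS, FAILED, DEPENDENT_FAILED }

    /** A job plus the jobs it depends on (hypothetical stand-in type). */
    static final class Node {
        final String name;
        final BooleanSupplier work;
        final List<Node> deps = new ArrayList<>();
        State state = State.WAITING;

        Node(String name, BooleanSupplier work) {
            this.name = name;
            this.work = work;
        }

        void dependsOn(Node other) {
            deps.add(other);
        }
    }

    /** Runs every job whose dependencies succeeded; returns the run order. */
    static List<String> runAll(List<Node> nodes) {
        List<String> order = new ArrayList<>();
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            for (Node n : nodes) {
                if (n.state != State.WAITING) {
                    continue;
                }
                boolean depFailed = n.deps.stream().anyMatch(d ->
                        d.state == State.FAILED
                                || d.state == State.DEPENDENT_FAILED);
                boolean depsDone = n.deps.stream()
                        .allMatch(d -> d.state == State.SUCCESS);
                if (depFailed) {
                    n.state = State.DEPENDENT_FAILED; // never executed
                    progressed = true;
                } else if (depsDone) {
                    n.state = n.work.getAsBoolean() ? State.SUCCESS : State.FAILED;
                    order.add(n.name);
                    progressed = true;
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        Node binning = new Node("binning", () -> true);
        Node below = new Node("belowavg", () -> true);
        Node above = new Node("aboveavg", () -> true);
        below.dependsOn(binning);
        above.dependsOn(binning);
        System.out.println(runAll(Arrays.asList(below, above, binning)));
    }
}
```

The toy runs jobs one at a time for simplicity. The point is only the ordering rule: a job becomes eligible once all of its dependencies have succeeded, and is skipped if any of them failed.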
You can use any of the methods we’ve discussed so far to create iterative jobs that run the same job over and over. Typically, each iteration takes the previous iteration’s data as input. This is common practice for algorithms that have some sort of optimization component, such as k-means clustering in MapReduce. This is also common practice in many graph algorithms in MapReduce.
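The shape of such an iterative driver can be sketched in plain Java. Everything here is an assumption made for illustration: the class name, the `_iter` naming scheme, and the convention that one iteration is a function taking an input path and an output path and returning a measure of how much the result changed (for example, how far the k-means centroids moved).

```java
import java.util.function.Consumer;
import java.util.function.ToDoubleBiFunction;

final class IterativeDriverSketch {

    /**
     * Repeats one job, feeding each iteration's output to the next, until the
     * reported change drops below epsilon or maxIterations is reached.
     * Returns the path holding the final iteration's output.
     */
    static String iterate(String initialInput, String outputPrefix,
            int maxIterations, double epsilon,
            ToDoubleBiFunction<String, String> runIteration,
            Consumer<String> deleteDir) {
        String input = initialInput;
        for (int i = 0; i < maxIterations; i++) {
            String output = outputPrefix + "_iter" + i;
            double change = runIteration.applyAsDouble(input, output);
            // An earlier iteration's output is temporary data once consumed;
            // the caller's original input is never deleted
            if (i > 0) {
                deleteDir.accept(input);
            }
            input = output;
            if (change < epsilon) {
                break; // converged
            }
        }
        return input;
    }

    public static void main(String[] args) {
        double[] change = { 2.0 };
        String result = iterate("seed", "kmeans", 10, 0.2,
                (in, out) -> {
                    System.out.println(in + " -> " + out);
                    change[0] /= 2; // pretend each pass halves the movement
                    return change[0];
                },
                dir -> System.out.println("delete " + dir));
        System.out.println("final output: " + result);
    }
}
```

Capping the number of iterations guards against an algorithm that never converges, and deleting each consumed output keeps the temporary data from piling up across iterations.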
For an example of a driver using JobControl, let’s combine the previous two examples of basic job chaining and parallel jobs. We are already familiar with the mapper and reducer code, so there is no need to go over them again. The driver is the main showpiece here for job configuration. It uses basic job chaining to launch the first job, and then uses JobControl to execute the remaining job in the chain and the two parallel jobs. The initial job is not added via JobControl because you need to interrupt the control for the in-between step of using the counters of the first job to assist in configuration of the second job. All jobs must be completely configured before executing the entire job chain, which can be limiting.
Let’s take a look at the main method. Here, we parse the command line arguments and create all the paths we will need for all four jobs to execute. We take special care when naming the variables to know our data flows. The first job is then configured via a helper function and executed. Upon completion of the first job, we invoke Configuration methods in helper functions to create three ControlledJob objects. Each Configuration method determines what mapper, reducer, etc. goes into each job.
The binningControlledJob has no dependencies, other than verifying that the previous job executed and completed successfully. The next two jobs are dependent on the binning ControlledJob. These two jobs will not be executed by JobControl until the binning job completes successfully. If it doesn’t complete successfully, the other jobs won’t be executed at all.
All three ControlledJobs are added to the JobControl object, and then it is run. The call to JobControl.run will block until the group of jobs completes. We then check the failed job list to see if any jobs failed and set our exit code accordingly. Intermediate output is cleaned up prior to exiting.
public static final String AVERAGE_CALC_GROUP = "AverageCalculation";
public static final String MULTIPLE_OUTPUTS_ABOVE_NAME = "aboveavg";
public static final String MULTIPLE_OUTPUTS_BELOW_NAME = "belowavg";

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    Path postInput = new Path(args[0]);
    Path userInput = new Path(args[1]);
    Path countingOutput = new Path(args[3] + "_count");
    Path binningOutputRoot = new Path(args[3] + "_bins");
    Path binningOutputBelow = new Path(binningOutputRoot + "/"
            + JobChainingDriver.MULTIPLE_OUTPUTS_BELOW_NAME);
    Path binningOutputAbove = new Path(binningOutputRoot + "/"
            + JobChainingDriver.MULTIPLE_OUTPUTS_ABOVE_NAME);

    Path belowAverageRepOutput = new Path(args[2]);
    Path aboveAverageRepOutput = new Path(args[3]);

    Job countingJob = getCountingJob(conf, postInput, countingOutput);

    int code = 1;
    if (countingJob.waitForCompletion(true)) {
        ControlledJob binningControlledJob = new ControlledJob(
                getBinningJobConf(countingJob, conf, countingOutput,
                        userInput, binningOutputRoot));

        ControlledJob belowAvgControlledJob = new ControlledJob(
                getAverageJobConf(conf, binningOutputBelow,
                        belowAverageRepOutput));
        belowAvgControlledJob.addDependingJob(binningControlledJob);

        ControlledJob aboveAvgControlledJob = new ControlledJob(
                getAverageJobConf(conf, binningOutputAbove,
                        aboveAverageRepOutput));
        aboveAvgControlledJob.addDependingJob(binningControlledJob);

        JobControl jc = new JobControl("AverageReputation");
        jc.addJob(binningControlledJob);
        jc.addJob(belowAvgControlledJob);
        jc.addJob(aboveAvgControlledJob);

        jc.run();
        code = jc.getFailedJobList().size() == 0 ? 0 : 1;
    }

    FileSystem fs = FileSystem.get(conf);
    fs.delete(countingOutput, true);
    fs.delete(binningOutputRoot, true);

    System.exit(code);
}
Following are all the helper methods used to create the actual Job or Configuration objects. A ControlledJob can be created from either class. There are three separate methods, the final method being used twice to create the identical parallel jobs. The inputs and outputs are all that differentiate them.
public static Job getCountingJob(Configuration conf, Path postInput,
        Path outputDirIntermediate) throws IOException {
    // Set up the first job to count user posts
    Job countingJob = new Job(conf, "JobChaining-Counting");
    countingJob.setJarByClass(JobChainingDriver.class);

    // Set our mapper and reducer, we can use the API's long sum reducer for
    // a combiner!
    countingJob.setMapperClass(UserIdCountMapper.class);
    countingJob.setCombinerClass(LongSumReducer.class);
    countingJob.setReducerClass(UserIdSumReducer.class);

    countingJob.setOutputKeyClass(Text.class);
    countingJob.setOutputValueClass(LongWritable.class);

    countingJob.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(countingJob, postInput);

    countingJob.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(countingJob, outputDirIntermediate);

    return countingJob;
}
public static Configuration getBinningJobConf(Job countingJob,
        Configuration conf, Path jobchainOutdir, Path userInput,
        Path binningOutput) throws IOException {
    // Calculate the average posts per user by getting counter values
    double numRecords = (double) countingJob.getCounters()
            .findCounter(JobChainingDriver.AVERAGE_CALC_GROUP,
                    UserIdCountMapper.RECORDS_COUNTER_NAME).getValue();
    double numUsers = (double) countingJob.getCounters()
            .findCounter(JobChainingDriver.AVERAGE_CALC_GROUP,
                    UserIdSumReducer.USERS_COUNTER_NAME).getValue();
    double averagePostsPerUser = numRecords / numUsers;

    // Set up the binning job
    Job binningJob = new Job(conf, "JobChaining-Binning");
    binningJob.setJarByClass(JobChainingDriver.class);

    // Set mapper and the average posts per user
    binningJob.setMapperClass(UserIdBinningMapper.class);
    UserIdBinningMapper.setAveragePostsPerUser(binningJob,
            averagePostsPerUser);
    binningJob.setNumReduceTasks(0);

    binningJob.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(binningJob, jobchainOutdir);

    // Add two named outputs for below/above average
    MultipleOutputs.addNamedOutput(binningJob,
            JobChainingDriver.MULTIPLE_OUTPUTS_BELOW_NAME,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(binningJob,
            JobChainingDriver.MULTIPLE_OUTPUTS_ABOVE_NAME,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.setCountersEnabled(binningJob, true);

    // Write the named outputs under the binning output directory
    TextOutputFormat.setOutputPath(binningJob, binningOutput);

    // Add the user files to the DistributedCache
    FileStatus[] userFiles = FileSystem.get(conf).listStatus(userInput);
    for (FileStatus status : userFiles) {
        DistributedCache.addCacheFile(status.getPath().toUri(),
                binningJob.getConfiguration());
    }

    // Return the configuration so a ControlledJob can be built from it
    return binningJob.getConfiguration();
}
public static Configuration getAverageJobConf(Configuration conf,
        Path averageOutputDir, Path outputDir) throws IOException {
    Job averageJob = new Job(conf, "ParallelJobs");
    averageJob.setJarByClass(ParallelJobs.class);

    averageJob.setMapperClass(AverageReputationMapper.class);
    averageJob.setReducerClass(AverageReputationReducer.class);

    averageJob.setOutputKeyClass(Text.class);
    averageJob.setOutputValueClass(DoubleWritable.class);

    averageJob.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(averageJob, averageOutputDir);

    averageJob.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(averageJob, outputDir);

    // Return the configuration so a ControlledJob can be built from it
    return averageJob.getConfiguration();
}
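The dependency behavior that the driver relies on can be imitated without Hadoop. The following is a toy model only: DependencyRunnerSketch and its methods are hypothetical names, and the real JobControl and ControlledJob classes do considerably more (threads, job states, polling). It shows just the rule described earlier: a job runs only after everything it depends on has succeeded, and the exit code is derived from the failed list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

/** Toy model of dependency-ordered execution; it mimics the described behavior, not Hadoop's classes. */
final class DependencyRunnerSketch {
    private final Map<String, BooleanSupplier> work = new LinkedHashMap<>();
    private final Map<String, List<String>> dependsOn = new LinkedHashMap<>();
    final List<String> succeeded = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    /** Register a job. Dependencies must already have been added, so insertion order is a valid run order. */
    void addJob(String name, BooleanSupplier body, String... dependencies) {
        work.put(name, body);
        dependsOn.put(name, Arrays.asList(dependencies));
    }

    /** Run every job whose dependencies all succeeded; anything else is recorded as failed without running. */
    void run() {
        for (String name : work.keySet()) {
            boolean ready = succeeded.containsAll(dependsOn.get(name));
            if (ready && work.get(name).getAsBoolean()) {
                succeeded.add(name);
            } else {
                failed.add(name);
            }
        }
    }

    public static void main(String[] args) {
        DependencyRunnerSketch runner = new DependencyRunnerSketch();
        runner.addJob("binning", () -> true);
        runner.addJob("belowAvg", () -> true, "binning");
        runner.addJob("aboveAvg", () -> true, "binning");
        runner.run();
        int code = runner.failed.isEmpty() ? 0 : 1;
        System.out.println("succeeded=" + runner.succeeded + " exit code=" + code);
    }
}
```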
Chain folding is an optimization that is applied to MapReduce job chains. Basically, it is a rule of thumb that says each record can be submitted to multiple mappers, or to a reducer and then a mapper. Such combined processing would save a lot of time reading files and transmitting data. The structure of the jobs often makes this feasible because a map phase is completely shared-nothing: it looks at each record alone, so it doesn’t really matter what the organization of the data is or if it is grouped or not. When building large MapReduce chains, folding the chain to combine map phases can have drastic performance benefits.
The main benefit of chain folding is reducing the amount of data movement in the MapReduce pipeline, whether it be the I/O of loading and storing to disk, or shuffling data over the network. In chained MapReduce jobs, temporary data is stored in HDFS, so if we can reduce the number of times we hit the disks, we’re reducing the total I/O in the chain.
There are a number of patterns in chains to look for to determine what to fold.
Take a look at the map phases in the chain. If multiple map phases are adjacent, merge them into one phase. This would be the case if you had a map-only job (such as a replicated join), followed by a numerical aggregation. In this step, we are reducing the number of times we’re hitting the disks. Consider a two-job chain in which the first job is a map-only job, which is then followed by a traditional MapReduce job with a map phase and a reduce phase. Without this optimization, the first map-only job will write its output out to the distributed file system, and then that data will be loaded by the second job.
Instead, if we merge the map phase of the map-only job and the traditional job, that temporary data never gets written, reducing the I/O significantly. Also, fewer tasks are started, reducing the overhead of task management. Chaining many map tasks together is an even more drastic optimization. In this case, there really isn’t any downside to doing this other than possibly having to alter already existing code.
If the job ends with a map phase (combined or otherwise), push that phase into the reducer right before it. This is a special case with the same performance benefits as the previous step. It removes the I/O of writing temporary data out and then running a map-only job on it. It also reduces the task start-up overhead.
Note that the first map phase of the chain cannot benefit from this next optimization. As much as possible, split up each map phase (combined or otherwise) between operations that decrease the amount of data (e.g., filtering) and operations that increase the amount of data (e.g., enrichment). In some cases, this is not possible because you may need some enrichment data in order to do the filtering. In these cases, look at dependent phases as one larger phase that cumulatively increases or decreases the amount of data. Push the processes that decrease the amount of data into the previous reducer, while keeping the processes that increase the amount of data where they are.
This step is a bit more complex and the difference is more subtle. The gain here is that if you push minimizing map-phase processing into the previous reducer, you will reduce the amount of data written to temporary storage, as well as the amount of data loaded off disk into the next part of the chain. This can be pretty significant if a drastic amount of filtering is done.
Be careful when merging phases that require lots of memory. For example, merging five replicated joins together might not be a good idea because it will exceed the total memory available to the task. In these cases, it might be better to just leave them separate.
Regardless of whether a job is a chain or not, try to filter as much data as early as possible. The most expensive parts of a MapReduce job are typically pushing data through the pipeline: loading the data, the shuffle/sort, and storing the data. For example, if you care only about data from item 2012, apply that filter in the map phase, not after the reducer has grouped the data together.
Let’s run through a couple of examples to help explain the idea and why it is so useful.
To exemplify step one, consider the chain in Figure 6-1. The original chain (on top) is optimized so that the replicated join is folded into the mapper of the second MapReduce job (bottom).
This job performs a word count on comments from teenagers. We do this to find out what topics are interesting to our youngest users. The age of the user isn’t with the comment, which is why we need to do a join. In this case, the map-only replicated join can be merged into the preprocessing of the second job.
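As a rough illustration of step one, the sketch below folds the replicated join into the word-count map logic, so no joined records are ever materialized between the two. It is plain Java rather than Hadoop code; the Comment class, the in-memory age table, and the 13 to 19 age range used for "teenager" are assumptions for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of a replicated join folded into a word-count mapper. */
final class FoldedJoinWordCountSketch {

    /** Hypothetical comment record: who wrote it and what it says. */
    static final class Comment {
        final String userId;
        final String text;

        Comment(String userId, String text) {
            this.userId = userId;
            this.text = text;
        }
    }

    /** Folded map phase: join against the in-memory user table, filter, and tokenize in one pass. */
    static List<String> foldedMap(Comment comment, Map<String, Integer> userAges) {
        List<String> words = new ArrayList<>();
        Integer age = userAges.get(comment.userId); // the replicated join
        if (age == null || age < 13 || age > 19) {
            return words; // not a teenager (assumed range), so emit nothing
        }
        for (String word : comment.text.toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                words.add(word); // what the word-count mapper would emit with a count of 1
            }
        }
        return words;
    }

    /** Stand-in for the shuffle and reduce: sum the occurrences of each word. */
    static Map<String, Integer> wordCount(List<Comment> comments, Map<String, Integer> userAges) {
        Map<String, Integer> counts = new HashMap<>();
        for (Comment comment : comments) {
            for (String word : foldedMap(comment, userAges)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> ages = new HashMap<>();
        ages.put("u1", 15);
        ages.put("u2", 30);
        List<Comment> comments = new ArrayList<>();
        comments.add(new Comment("u1", "hadoop is fun"));
        comments.add(new Comment("u2", "hadoop is work"));
        System.out.println(wordCount(comments, ages));
    }
}
```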
To exemplify step two, consider the following chain in Figure 6-2. The original chain (top) is optimized so that the replicated join is folded into the reducer of the preceding MapReduce job (bottom).
This job enriches each user’s information with the number of comments that user has posted. It uses a generic counting MapReduce job, then uses a replicated join to add in the user information to the count. In this case, the map-only replicated join can be merged into the reducer.
To exemplify step three, consider the following chain in Figure 6-3. The original chain (top) is optimized so that the filter is pushed into the reducer of the first MapReduce job and the replicated join is folded into the map phase of the second MapReduce job (bottom).
This job is a bit more complicated than the others, as is evident from the long chain used to solve it. The intent is to find the most popular tags per age group, which is done by finding a count of each user, enriching their user information onto it, filtering out counts less than 5, then finally grouping by the age group and summing up the original counts. When we look at the map tasks (enrichment and filtering), the replicated join is adding data, while the filter is removing data. Following step three, we are going to move the filtering to the first MapReduce job, and then move the replicated join into the map phase of the second MapReduce job. This gives us the new chain that can be seen at the bottom of Figure 6-3. Now the first MapReduce job will write out significantly less data than before, and it follows that the second MapReduce job loads less data.
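The effect of moving the filter into the first job's reducer can be seen in a small plain-Java sketch. The names and the in-memory map standing in for temporary output are assumptions; the point is only that the filtered variant hands fewer records to the next stage.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch: applying a count filter inside the counting reducer. */
final class FilterInReducerSketch {

    /**
     * Counts records per user ID and keeps only counts of at least minCount.
     * The returned map stands in for the temporary output written between jobs.
     * Passing 0 as minCount models the original, unfolded chain.
     */
    static Map<String, Long> countPerUser(List<String> userIds, long minCount) {
        Map<String, Long> counts = new TreeMap<>();
        for (String id : userIds) {
            counts.merge(id, 1L, Long::sum);
        }
        // The folded filter: drop small counts before they reach temporary storage.
        counts.values().removeIf(count -> count < minCount);
        return counts;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("a", "a", "a", "a", "a", "a", "b", "b");
        System.out.println("unfolded writes " + countPerUser(ids, 0).size() + " records");
        System.out.println("folded writes " + countPerUser(ids, 5).size() + " records");
    }
}
```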
There are two primary methods for implementing chain folding: manually cutting and pasting code together, and a more elegant approach that uses special classes called ChainMapper and ChainReducer. If this is a one-time job and logically has multiple map phases, just implement it in one shot with the manual approach. If several of the map phases are reused (in a software reuse sense), then you should use the ChainMapper and ChainReducer approach to follow good software engineering practice.
ChainMapper and ChainReducer are special mapper and reducer classes that allow you to run multiple map phases in the mapper and multiple map phases after the reducer. You are effectively expanding the traditional map and reduce paradigm into several map phases, followed by a reduce phase, followed by several map phases. However, only one map phase and one reduce phase are ever invoked.
Each chained map phase feeds into the next in the pipeline. The output of the first is then processed by the second, which is then processed by the third, and so on. The map phases on the backend of the reducer take the output of the reducer and do additional computation. This is useful for post-processing operations or additional filtering.
Be sure that the input types and output types between each chain match up. If the first phase outputs a <LongWritable, Text>, be sure the second phase takes its input as <LongWritable, Text>.
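The type-matching requirement is easy to picture with ordinary Java generics. In the sketch below, which does not use the Hadoop chain classes, two phases are composed with Function.andThen, and the composition compiles only because the first phase's output type is the second phase's input type. The Pair class, the phase names, and the default reputation of "0" are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java sketch: chained phases whose input and output types must line up. */
final class TypedChainSketch {

    /** Minimal key/value pair, standing in for an intermediate record. */
    static final class Pair<K, V> {
        final K key;
        final V value;

        Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    /** First phase: user ID in, (user ID, 1) out. */
    static Function<String, Pair<String, Long>> countPhase() {
        return userId -> new Pair<>(userId, 1L);
    }

    /** Second phase: takes exactly what the first phase emits and enriches the key. */
    static Function<Pair<String, Long>, Pair<String, Long>> enrichPhase(Map<String, String> reputations) {
        return in -> new Pair<>(in.key + " " + reputations.getOrDefault(in.key, "0"), in.value);
    }

    /** The chain: this line would not compile if the two phases disagreed on types. */
    static Function<String, Pair<String, Long>> chain(Map<String, String> reputations) {
        return countPhase().andThen(enrichPhase(reputations));
    }

    public static void main(String[] args) {
        Map<String, String> reputations = new HashMap<>();
        reputations.put("42", "7500");
        Pair<String, Long> out = chain(reputations).apply("42");
        System.out.println(out.key + " -> " + out.value);
    }
}
```

Here the compiler does the checking; with separately configured mapper classes, the same check is something you have to make yourself when reading the class signatures.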
This example is a slight modification of the job chaining example. Here, we use two mapper implementations for the initial map phase. The first formats each input XML record and writes out the user ID with a count of one. The second mapper then enriches the user ID with his or her reputation, which is read during the setup phase via the DistributedCache.
These two individual mapper classes are then chained together to feed a single reducer. This reducer is a basic LongSumReducer that simply iterates through all the values and sums the numbers. This sum is then output along with the input key.
Finally, a third mapper is called that will bin the records based on whether their reputation is below or above 5,000. This entire flow is executed in one MapReduce job using ChainMapper and ChainReducer.
This example uses the deprecated mapred API, because ChainMapper and ChainReducer were not available in the mapreduce package when this example was written.
The following descriptions of each code section explain the solution to the problem.
Problem: Given a set of user posts and user information, bin users based on whether their reputation is below or above 5,000.
This mapper implementation gets the user ID from the input post record and outputs it with a count of 1.
public static class UserIdCountMapper extends MapReduceBase implements
        Mapper<Object, Text, Text, LongWritable> {

    public static final String RECORDS_COUNTER_NAME = "Records";

    private static final LongWritable ONE = new LongWritable(1);
    private Text outkey = new Text();

    public void map(Object key, Text value,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());

        // Get the value for the OwnerUserId attribute
        outkey.set(parsed.get("OwnerUserId"));
        output.collect(outkey, ONE);
    }
}
This mapper implementation is fed the output from the previous mapper. It reads the users data set during the setup phase to create a map of user ID to reputation. This map is used in the calls to map to enrich the output key with the user’s reputation. This new key is then output along with the input value.
public static class UserIdReputationEnrichmentMapper extends MapReduceBase
        implements Mapper<Text, LongWritable, Text, LongWritable> {

    private Text outkey = new Text();
    private HashMap<String, String> userIdToReputation =
            new HashMap<String, String>();

    public void configure(JobConf job) {
        try {
            Path[] files = DistributedCache.getLocalCacheFiles(job);

            // Read all files in the DistributedCache
            for (Path p : files) {
                BufferedReader rdr = new BufferedReader(
                        new InputStreamReader(new GZIPInputStream(
                                new FileInputStream(new File(p.toString())))));

                String line;
                // For each record in the user file
                while ((line = rdr.readLine()) != null) {
                    // Get the user ID and reputation
                    Map<String, String> parsed = MRDPUtils
                            .transformXmlToMap(line);

                    // Map the user ID to the reputation
                    userIdToReputation.put(parsed.get("Id"),
                            parsed.get("Reputation"));
                }
                rdr.close();
            }
        } catch (IOException e) {
            // configure cannot throw checked exceptions, so rethrow unchecked
            throw new RuntimeException(e);
        }
    }

    public void map(Text key, LongWritable value,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        String reputation = userIdToReputation.get(key.toString());
        if (reputation != null) {
            outkey.set(key.toString() + " " + reputation);
            output.collect(outkey, value);
        }
    }
}
This reducer implementation sums the values together and outputs this summation with the input key: user ID and reputation.
public static class LongSumReducer extends MapReduceBase implements
        Reducer<Text, LongWritable, Text, LongWritable> {

    private LongWritable outvalue = new LongWritable();

    public void reduce(Text key, Iterator<LongWritable> values,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        // Accumulate in a long so large sums do not overflow an int
        long sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        outvalue.set(sum);
        output.collect(key, outvalue);
    }
}
This mapper uses MultipleOutputs to bin users into two data sets. The input key is parsed to pull out the reputation. This reputation value is then compared to the value 5,000 and the record is binned appropriately.
public static class UserIdBinningMapper extends MapReduceBase implements
        Mapper<Text, LongWritable, Text, LongWritable> {

    private MultipleOutputs mos = null;

    public void configure(JobConf conf) {
        mos = new MultipleOutputs(conf);
    }

    public void map(Text key, LongWritable value,
            OutputCollector<Text, LongWritable> output, Reporter reporter)
            throws IOException {
        // The key is "<user ID> <reputation>"; bin on the reputation
        if (Integer.parseInt(key.toString().split(" ")[1]) < 5000) {
            mos.getCollector(MULTIPLE_OUTPUTS_BELOW_5000, reporter)
                    .collect(key, value);
        } else {
            mos.getCollector(MULTIPLE_OUTPUTS_ABOVE_5000, reporter)
                    .collect(key, value);
        }
    }

    // Closing MultipleOutputs can fail, so the exception is declared here
    public void close() throws IOException {
        mos.close();
    }
}
The driver handles configuration of the ChainMapper and ChainReducer. The most interesting piece here is adding mappers and setting the reducer. The order in which they are added affects the execution of the different mapper implementations. ChainMapper is first used to add the two map implementations that will be called back to back before any sorting and shuffling occurs. Then, the ChainReducer static methods are used to set the reducer implementation, and then finally a mapper on the end. Note that you don’t use ChainMapper to add a mapper after a reducer: use ChainReducer.
The signature of each method takes in the JobConf of the job, the mapper or reducer class, the input and output key/value pair types, a flag, and another JobConf for the mapper or reducer class. This second JobConf can be used in case the mapper or reducer has overlapping configuration parameters. No special configuration is required, so we simply pass in empty JobConf objects. The seventh parameter in the signature is a flag indicating whether to pass values in the chain by reference or by value. Passing by reference is an added optimization you can use if the collector does not modify the keys or values in either the mapper or the reducer. Here, we make these assumptions, so we pass objects by reference (byValue = false).
In addition to configuring the chain mappers and reducers, we also add the user data set to the DistributedCache so our second mapper can perform the enrichment. We also configure the MultipleOutputs and use a NullOutputFormat rather than the typical TextOutputFormat. Use of this output format will prevent the framework from creating the default empty part files.
public static void main(String[] args) throws Exception {
    // JobConf(String) expects a configuration resource, not a job name,
    // so the name is set explicitly
    JobConf conf = new JobConf();
    conf.setJobName("ChainMapperReducer");
    conf.setJarByClass(ChainMapperDriver.class);

    Path postInput = new Path(args[0]);
    Path userInput = new Path(args[1]);
    Path outputDir = new Path(args[2]);

    // Two mappers run back to back before the shuffle
    ChainMapper.addMapper(conf, UserIdCountMapper.class,
            LongWritable.class, Text.class, Text.class,
            LongWritable.class, false, new JobConf(false));
    ChainMapper.addMapper(conf, UserIdReputationEnrichmentMapper.class,
            Text.class, LongWritable.class, Text.class,
            LongWritable.class, false, new JobConf(false));

    // The reducer, followed by one mapper on the reduce side
    ChainReducer.setReducer(conf, LongSumReducer.class, Text.class,
            LongWritable.class, Text.class, LongWritable.class, false,
            new JobConf(false));
    ChainReducer.addMapper(conf, UserIdBinningMapper.class, Text.class,
            LongWritable.class, Text.class, LongWritable.class, false,
            new JobConf(false));

    conf.setCombinerClass(LongSumReducer.class);

    conf.setInputFormat(TextInputFormat.class);
    TextInputFormat.setInputPaths(conf, postInput);

    // Configure multiple outputs
    conf.setOutputFormat(NullOutputFormat.class);
    FileOutputFormat.setOutputPath(conf, outputDir);
    MultipleOutputs.addNamedOutput(conf, MULTIPLE_OUTPUTS_ABOVE_5000,
            TextOutputFormat.class, Text.class, LongWritable.class);
    MultipleOutputs.addNamedOutput(conf, MULTIPLE_OUTPUTS_BELOW_5000,
            TextOutputFormat.class, Text.class, LongWritable.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);

    // Add the user files to the DistributedCache
    FileStatus[] userFiles = FileSystem.get(conf).listStatus(userInput);
    for (FileStatus status : userFiles) {
        DistributedCache.addCacheFile(status.getPath().toUri(), conf);
    }

    RunningJob job = JobClient.runJob(conf);

    while (!job.isComplete()) {
        Thread.sleep(5000);
    }

    System.exit(job.isSuccessful() ? 0 : 1);
}
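The driver passes false for the byValue flag, which is only safe under the assumptions stated earlier. The hazard of sharing one mutable object between phases can be shown in plain Java; this is an illustrative sketch, not the chain classes themselves. MutableText is a hypothetical stand-in for a reusable writable object, and the "downstream" phase here is assumed to hold on to what it receives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of passing a reused, mutable object by reference versus by value. */
final class ByValueSketch {

    /** Mutable holder, standing in for a reusable key or value object. */
    static final class MutableText {
        private String value = "";

        void set(String v) {
            value = v;
        }

        String get() {
            return value;
        }

        MutableText copy() {
            MutableText t = new MutableText();
            t.set(value);
            return t;
        }
    }

    /** The upstream phase reuses one object; the downstream phase retains whatever it is handed. */
    static List<String> run(List<String> inputs, boolean byValue) {
        MutableText reused = new MutableText();
        List<MutableText> retained = new ArrayList<>();
        for (String in : inputs) {
            reused.set(in);
            // By value: hand over a copy. By reference: hand over the shared object.
            retained.add(byValue ? reused.copy() : reused);
        }
        List<String> seen = new ArrayList<>();
        for (MutableText t : retained) {
            seen.add(t.get());
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("a", "b", "c");
        System.out.println("by reference: " + run(inputs, false));
        System.out.println("by value:     " + run(inputs, true));
    }
}
```

When nothing downstream keeps or changes the object after each call, as in the example job, sharing it avoids a copy per record.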
Like chain folding, job merging is another optimization aimed at reducing the amount of I/O through the MapReduce pipeline. Job merging is a process that allows two unrelated jobs that are loading the same data to share the MapReduce pipeline. The main benefit of merging is that the data needs to be loaded and parsed only once. For some large-scale jobs, that task might be the most expensive part of the whole operation. One of the downsides of “schema-on-load” and storing the data in its original form is having to parse it over and over again, which can really impact performance if parsing is complicated (e.g., XML).
Assume we have two jobs that need to run over the exact same massive amount of data. These two jobs both load and parse the data, then perform their computations. With job merging, we’ll have one MapReduce job that logically performs the two jobs at once without mixing the two applications as seen in Figure 6-4. The original chain (top) is optimized so that the two mappers run on the same data, and the two reducers run on the same data (bottom).
Nothing is stopping you from applying job merging to more than two jobs at once. The more the merrier! The more you consolidate a shared burden across jobs, the more compute resources you’ll have available in your cluster.
Likely, this process will be relevant only for important and already existing jobs in a production cluster. Development groups that take the time to consolidate their core analytics will see significant reductions in cluster utilization. When the jobs are merged, they’ll have to run together and the source code will have to be kept together. This is likely not worth it for jobs that are run in an ad hoc manner or are relatively new to the environment.
Unfortunately, you must satisfy a number of prerequisites before applying this pattern. The most obvious one is that both jobs need to have the same intermediate keys and output formats, because they’ll be sharing the pipeline and thus need to use the same data types. Serialization or polymorphism can be used if this is truly a problem, but adds a bit of complexity.
Job merging is a dirty procedure. Some hacks will have to be done to get it to work, but definitely more work can be put into a merging solution to make it a bit cleaner. From a software engineering perspective, this complicates the code organization, because unrelated jobs now share the same code. At a high level, the same map function will now be performing the original duties of the old map functions, while the reduce function will perform one action or another based on a tag on the key that tells which data set it came from. The steps for merging two jobs are as follows:
Bring the code for the two mappers together.
There are a couple of ways to do this. Copying and pasting the code works, but may complicate which piece of code is doing what. Good in-code comments can help you compensate for this. The other method is to separate the code into two helper map functions that process the input for each algorithm.
In the mapper, change the writing of the key and value to “tag” the key with the map source.
Tagging the key to indicate which map it came from is critical so that the data from the different maps don’t get mixed up. There are a few ways to do this depending on the original data type. If it is a string, you can simply make the first character the tag, so for instance you could change “parks” to “Aparks” when it comes from the first map, and “Bparks” when it comes from the second map.
The general way to tag is to make a custom composite tuple-like key that stores the tag separately from the original data. This is definitely the cleaner way of doing things, but takes a bit more work.
In the reducer, parse out the tag and use an if-statement to switch what reducer code actually gets executed.
As in the mapper, you can either just copy and paste the code into an if-statement or have the if-statement call out to helper functions. The if-statement controls the path of execution based on the tag.
Use MultipleOutputs to separate the output for the jobs.
MultipleOutputs is a special output format helper class that allows you to write to different folders of output for the same reducer, instead of just a single folder. Make it so the one reducer path always writes to one folder of the MultipleOutputs, while the other reducer path writes to the other folder.
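The four steps above can be sketched end to end in plain Java. This toy simulation is not Hadoop code: the two merged "jobs" (counting records per place and collecting distinct user IDs), the record layout, and the two output collections standing in for output folders are all assumptions. It uses the simple first-character tagging scheme described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy simulation of a merged job: two analytics share one map pass and one reduce pass. */
final class MergedJobSketch {
    final Map<String, Integer> placeCounts = new TreeMap<>(); // "output folder" for job A
    final List<String> distinctUsers = new ArrayList<>();     // "output folder" for job B

    /** Each record is {userId, place}; both fields are hypothetical. */
    void run(List<String[]> records) {
        // Merged map phase: both helpers see every record and tag their keys.
        Map<String, List<String>> shuffled = new TreeMap<>();
        for (String[] record : records) {
            emit(shuffled, "A" + record[1], "1"); // job A: count records per place
            emit(shuffled, "B" + record[0], "");  // job B: distinct user IDs
        }
        // Merged reduce phase: strip the tag and branch on it.
        for (Map.Entry<String, List<String>> group : shuffled.entrySet()) {
            char tag = group.getKey().charAt(0);
            String originalKey = group.getKey().substring(1);
            if (tag == 'A') {
                placeCounts.put(originalKey, group.getValue().size());
            } else {
                distinctUsers.add(originalKey);
            }
        }
    }

    /** Groups values by key, standing in for the shuffle and sort. */
    private static void emit(Map<String, List<String>> shuffled, String key, String value) {
        shuffled.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    public static void main(String[] args) {
        MergedJobSketch job = new MergedJobSketch();
        job.run(Arrays.asList(
                new String[] {"u1", "parks"},
                new String[] {"u2", "parks"},
                new String[] {"u1", "zoo"}));
        System.out.println(job.placeCounts + " " + job.distinctUsers);
    }
}
```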
This example combines Anonymizing StackOverflow comments and Distinct user IDs. Both examples used the comments data set as input. However, their outputs were very different. One created a distinct set of users, while the other created an anonymized version of each record. The comment portion of the StackOverflow data set is the largest we have, so merging these jobs together will definitely cut our processing time down. This way, the data set needs to be read only once.
The following descriptions of each code section explain the solution to the problem.
Problem: Given a set of comments, generate an anonymized version of the data and a distinct set of user IDs.
A custom WritableComparable object is created to tag a Text with a string. This is a cleaner way of splitting the logic between the two jobs, and saves us some string parsing in the reducer.
This object has two private member variables and getters and setters for each variable. It holds a String that the mapper uses to tag each Text value that is also held by this object. The reducer then examines the tag to find out which reduce logic to execute. The compareTo method is what makes this object also comparable and allowed for use as a key in the MapReduce framework. This method first examines the tag for equality. If they are equal, the text inside the object is then compared and the value immediately returned. If they are not equal, the value of the comparison is then returned. Items are sorted by tag first, and then by the text value. This type of comparison will produce output such as:
A:100004122
A:120019879
D:10
D:22
D:23
public static class TaggedText implements WritableComparable<TaggedText> {

    private String tag = "";
    private Text text = new Text();

    public TaggedText() {
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getTag() {
        return tag;
    }

    public void setText(Text text) {
        this.text.set(text);
    }

    public void setText(String text) {
        this.text.set(text);
    }

    public Text getText() {
        return text;
    }

    public void readFields(DataInput in) throws IOException {
        tag = in.readUTF();
        text.readFields(in);
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(tag);
        text.write(out);
    }

    public int compareTo(TaggedText obj) {
        int compare = tag.compareTo(obj.getTag());
        if (compare == 0) {
            return text.compareTo(obj.getText());
        } else {
            return compare;
        }
    }

    public String toString() {
        return tag.toString() + ":" + text.toString();
    }
}
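The tag-then-text ordering can be checked with a small plain-Java stand-in that follows the same compareTo logic. TaggedKeySketch is a hypothetical class that uses String in place of Text so that it runs without Hadoop on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java stand-in for a tagged key: ordered by tag first, then by text. */
final class TaggedKeySketch implements Comparable<TaggedKeySketch> {
    final String tag;
    final String text;

    TaggedKeySketch(String tag, String text) {
        this.tag = tag;
        this.text = text;
    }

    @Override
    public int compareTo(TaggedKeySketch other) {
        int compare = tag.compareTo(other.tag);
        // Only when the tags match does the text decide the order.
        return compare != 0 ? compare : text.compareTo(other.text);
    }

    @Override
    public String toString() {
        return tag + ":" + text;
    }

    /** Sorts a copy of the keys and renders each one as tag:text. */
    static List<String> sortedStrings(List<TaggedKeySketch> keys) {
        List<TaggedKeySketch> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        List<String> out = new ArrayList<>();
        for (TaggedKeySketch key : copy) {
            out.add(key.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(sortedStrings(Arrays.asList(
                new TaggedKeySketch("D", "23"),
                new TaggedKeySketch("A", "120019879"),
                new TaggedKeySketch("D", "10"),
                new TaggedKeySketch("A", "100004122"),
                new TaggedKeySketch("D", "22"))));
    }
}
```

Note that the text is compared as a string, so in this sketch a key such as D:100 would sort before D:22.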
The map method simply passes the parameters to two helper functions, each of which processes its map logic individually and writes output to context. The map methods were slightly changed from their original respective examples in order to both output Text objects as the key and value. This is a necessary change so we can have the same type of intermediate key/value pairs we had in the separate map logic. The anonymizeMap method generates an anonymous record from the input value, whereas the distinctMap method grabs the user ID from the record and outputs it. Each intermediate key/value pair written out from each helper map method is tagged with either “A” for anonymize or “D” for distinct.
Each helper map method parses the input record, but this parsing should instead be done inside the actual map method. The resulting Map<String,String> can then be passed to both helper methods. Any little optimizations like this can be very beneficial in the long run and should be implemented!
public static class AnonymizeDistinctMergedMapper extends
        Mapper<Object, Text, TaggedText, Text> {

    private static final Text DISTINCT_OUT_VALUE = new Text();

    private Random rndm = new Random();
    private TaggedText anonymizeOutkey = new TaggedText(),
            distinctOutkey = new TaggedText();
    private Text anonymizeOutvalue = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        anonymizeMap(key, value, context);
        distinctMap(key, value, context);
    }

    private void anonymizeMap(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());

        if (parsed.size() > 0) {
            StringBuilder bldr = new StringBuilder();
            bldr.append("<row ");
            for (Entry<String, String> entry : parsed.entrySet()) {
                if (entry.getKey().equals("UserId")
                        || entry.getKey().equals("Id")) {
                    // ignore these fields
                } else if (entry.getKey().equals("CreationDate")) {
                    // Strip out the time, anything after the 'T'
                    // in the value
                    bldr.append(entry.getKey()
                            + "=\""
                            + entry.getValue().substring(0,
                                    entry.getValue().indexOf('T'))
                            + "\" ");
                } else {
                    // Otherwise, output this.
                    bldr.append(entry.getKey() + "=\""
                            + entry.getValue() + "\" ");
                }
            }
            bldr.append(">");

            anonymizeOutkey.setTag("A");
            anonymizeOutkey.setText(Integer.toString(rndm.nextInt()));
            anonymizeOutvalue.set(bldr.toString());
            context.write(anonymizeOutkey, anonymizeOutvalue);
        }
    }

    private void distinctMap(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        Map<String, String> parsed = MRDPUtils.transformXmlToMap(value
                .toString());

        // Otherwise, set our output key to the user's id,
        // tagged with a "D"
        distinctOutkey.setTag("D");
        distinctOutkey.setText(parsed.get("UserId"));

        // Write the user's id with a null value
        context.write(distinctOutkey, DISTINCT_OUT_VALUE);
    }
}
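The parse-once optimization suggested before the mapper listing can be sketched outside of Hadoop. This is a minimal illustration under stated assumptions, not the book's implementation: parseAttributes is a hypothetical stand-in for MRDPUtils.transformXmlToMap, the emitted list stands in for context.write, and plain strings replace Text and TaggedText. The empty-map and missing-field checks in the helpers are additions made for the sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ParseOnceSketch {

    // Each entry is {tag, key, value}; stands in for context.write().
    static final List<String[]> emitted = new ArrayList<>();
    private static final Random RNDM = new Random();
    private static final Pattern ATTR = Pattern.compile("(\\w+)=\"([^\"]*)\"");

    // Hypothetical stand-in for MRDPUtils.transformXmlToMap: collects the
    // name="value" attributes of a single <row ... /> line.
    static Map<String, String> parseAttributes(String xml) {
        Map<String, String> attrs = new LinkedHashMap<>();
        Matcher m = ATTR.matcher(xml);
        while (m.find()) {
            attrs.put(m.group(1), m.group(2));
        }
        return attrs;
    }

    // The actual map method parses the record once and shares the result.
    static void map(String value) {
        Map<String, String> parsed = parseAttributes(value);
        anonymizeMap(parsed);
        distinctMap(parsed);
    }

    // Helper for the anonymize job: receives the parsed map, not raw text.
    static void anonymizeMap(Map<String, String> parsed) {
        if (parsed.isEmpty()) {
            return;
        }
        StringBuilder bldr = new StringBuilder("<row ");
        for (Map.Entry<String, String> entry : parsed.entrySet()) {
            String name = entry.getKey();
            String val = entry.getValue();
            if (name.equals("UserId") || name.equals("Id")) {
                continue; // drop identifying fields
            }
            if (name.equals("CreationDate") && val.indexOf('T') >= 0) {
                val = val.substring(0, val.indexOf('T')); // keep the date only
            }
            bldr.append(name).append("=\"").append(val).append("\" ");
        }
        bldr.append(">");
        emitted.add(new String[] {"A", Integer.toString(RNDM.nextInt()),
                bldr.toString()});
    }

    // Helper for the distinct job: reuses the same parsed map.
    static void distinctMap(Map<String, String> parsed) {
        String userId = parsed.get("UserId");
        if (userId != null) {
            emitted.add(new String[] {"D", userId, ""});
        }
    }

    public static void main(String[] args) {
        map("<row Id=\"1\" UserId=\"42\" "
                + "CreationDate=\"2011-01-01T10:00:00.000\" Text=\"hi\" />");
        for (String[] kv : emitted) {
            System.out.println(kv[0] + "\t" + kv[1] + "\t" + kv[2]);
        }
    }
}
```

With this shape, adding a third merged job costs one more helper call rather than one more parse of every record.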
The reducer’s calls to setup and cleanup handle the creation and
closing of the MultipleOutputs utility. The reduce method checks
the tag of each input key and calls a helper reduce method based on
the tag. The helper reduce methods are passed the Text object inside
the TaggedText.
For the anonymize reduce call, all the input values are iterated over
and written to a named output of anonymize/part. Adding the slash and the
“part” creates a folder under the configured output directory that
contains a number of part files equivalent to the number of reduce
tasks.
For the distinct reduce call, the input key is written to
MultipleOutputs with a NullWritable to a named output of
distinct/part. Again, this will create a folder called distinct
underneath the job’s configured output directory.
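To make the resulting directory layout concrete, the sketch below lists the relative file names that base output paths such as anonymize/part and distinct/part are expected to produce. It is an illustration of the usual Hadoop naming convention (base path, then "-r-" for reduce output, then a zero-padded partition number), not code taken from the job. It assumes uncompressed TextOutputFormat output and that every reduce task writes at least one record to each named output. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class NamedOutputLayoutSketch {

    // Builds the relative part-file names expected for a base output path,
    // one per reduce task (assumption: each task writes to this output).
    static List<String> partFiles(String baseOutputPath, int numReduceTasks) {
        List<String> files = new ArrayList<>();
        for (int partition = 0; partition < numReduceTasks; partition++) {
            // "-r-" marks reduce output; the partition is padded to 5 digits
            files.add(String.format("%s-r-%05d", baseOutputPath, partition));
        }
        return files;
    }

    public static void main(String[] args) {
        // Ten reduce tasks, matching the driver's setNumReduceTasks(10)
        for (String base : new String[] {"anonymize/part", "distinct/part"}) {
            for (String file : partFiles(base, 10)) {
                System.out.println(file);
            }
        }
    }
}
```

All names are relative to the job's configured output directory, so the two merged jobs end up in sibling folders.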
In this example, we are outputting the same essential
format (a Text object and a NullWritable object) from each
of the reduce calls. This won’t always be the case! If your jobs
have conflicting output key/value types, you can use the
Text object to normalize the outputs.
public static class AnonymizeDistinctMergedReducer extends
        Reducer<TaggedText, Text, Text, NullWritable> {

    private MultipleOutputs<Text, NullWritable> mos = null;

    protected void setup(Context context) throws IOException,
            InterruptedException {
        mos = new MultipleOutputs<Text, NullWritable>(context);
    }

    protected void reduce(TaggedText key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {

        if (key.getTag().equals("A")) {
            anonymizeReduce(key.getText(), values, context);
        } else {
            distinctReduce(key.getText(), values, context);
        }
    }

    private void anonymizeReduce(Text key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            mos.write(MULTIPLE_OUTPUTS_ANONYMIZE, value,
                    NullWritable.get(), MULTIPLE_OUTPUTS_ANONYMIZE + "/part");
        }
    }

    private void distinctReduce(Text key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {

        mos.write(MULTIPLE_OUTPUTS_DISTINCT, key, NullWritable.get(),
                MULTIPLE_OUTPUTS_DISTINCT + "/part");
    }

    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        mos.close();
    }
}
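The note before the reducer listing mentions normalizing conflicting output types through Text. One way to read that advice is sketched below in plain Java, with String standing in for Text and a map of lists standing in for MultipleOutputs. The "count" job and its "C" tag are hypothetical and not part of the example above. The sketch assumes each helper serializes its natural output into a single text line, so that every named output shares one key/value type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NormalizedOutputSketch {

    // Named output -> lines written; stands in for MultipleOutputs.
    static final Map<String, List<String>> outputs = new LinkedHashMap<>();

    static void write(String namedOutput, String line) {
        outputs.computeIfAbsent(namedOutput, k -> new ArrayList<>()).add(line);
    }

    // Hypothetical count job: on its own it would emit (key, integer).
    // Here the pair is flattened into one tab-separated text line.
    static void countReduce(String key, List<String> values) {
        int sum = 0;
        for (String value : values) {
            sum += Integer.parseInt(value);
        }
        write("count", key + "\t" + sum);
    }

    // Distinct job: the key alone is already a single text line.
    static void distinctReduce(String key) {
        write("distinct", key);
    }

    // Tag-based dispatch, mirroring the merged reducer's structure.
    static void reduce(String tag, String key, List<String> values) {
        if (tag.equals("C")) {
            countReduce(key, values);
        } else {
            distinctReduce(key);
        }
    }

    public static void main(String[] args) {
        reduce("C", "42", Arrays.asList("1", "1", "1"));
        reduce("D", "42", Arrays.asList(""));
        System.out.println(outputs);
    }
}
```

The trade-off is that downstream consumers have to parse the text lines back into typed values, so this is most attractive when the outputs were headed for text files anyway.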
The driver code looks just like any other driver that uses
MultipleOutputs. All the logic of merging jobs is done inside the
mapper and reducer implementations.
public static void main(String[] args) throws Exception {

    // Configure the merged job
    Job job = new Job(new Configuration(), "MergedJob");
    job.setJarByClass(MergedJobDriver.class);

    job.setMapperClass(AnonymizeDistinctMergedMapper.class);
    job.setReducerClass(AnonymizeDistinctMergedReducer.class);
    job.setNumReduceTasks(10);

    TextInputFormat.setInputPaths(job, new Path(args[0]));
    TextOutputFormat.setOutputPath(job, new Path(args[1]));

    MultipleOutputs.addNamedOutput(job, MULTIPLE_OUTPUTS_ANONYMIZE,
            TextOutputFormat.class, Text.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, MULTIPLE_OUTPUTS_DISTINCT,
            TextOutputFormat.class, Text.class, NullWritable.class);

    job.setOutputKeyClass(TaggedText.class);
    job.setOutputValueClass(Text.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}