Oftentimes we require multiple MapReduce applications to be executed in a workflow-like manner to achieve our objective. Hadoop's ControlledJob and JobControl classes provide a mechanism to execute a simple workflow graph of MapReduce jobs by specifying the dependencies between them.
In this recipe, we execute the log-grep MapReduce computation followed by the log-analysis MapReduce computation on an HTTP server log data set. The log-grep computation filters the input data based on a regular expression. The log-analysis computation analyzes the filtered data. Hence, the log-analysis computation is dependent on the log-grep computation. We use the ControlledJob class to express this dependency and the JobControl class to execute the two related MapReduce computations.
The following steps show you how to add a MapReduce computation as a dependency of another MapReduce computation.
1. Create the Configuration and the Job objects for the first MapReduce job and populate them with the other needed configurations:

   Job job1 = new Job(getConf(), "log-grep");
   job1.setJarByClass(RegexMapper.class);
   job1.setMapperClass(RegexMapper.class);
   FileInputFormat.setInputPaths(job1, new Path(inputPath));
   FileOutputFormat.setOutputPath(job1, new Path(intermedPath));
   ……

2. Create the Configuration and Job objects for the second MapReduce job and populate them with the necessary configurations:

   Job job2 = new Job(getConf(), "log-analysis");
   job2.setJarByClass(LogProcessorMap.class);
   job2.setMapperClass(LogProcessorMap.class);
   job2.setReducerClass(LogProcessorReduce.class);
   FileOutputFormat.setOutputPath(job2, new Path(outputPath));
   ………

3. Set the output of the first job as the input of the second job:

   FileInputFormat.setInputPaths(job2, new Path(intermedPath + "/part*"));

4. Create ControlledJob objects using the above-created Job objects:

   ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
   ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());

5. Add the first job as a dependency of the second job:

   controlledJob2.addDependingJob(controlledJob1);

6. Create the JobControl object for this group of jobs and add the ControlledJob objects created in step 4 to the newly created JobControl object:

   JobControl jobControl = new JobControl("JobControlDemoGroup");
   jobControl.addJob(controlledJob1);
   jobControl.addJob(controlledJob2);

7. Create a new thread to execute the JobControl object. Start the thread and wait for its completion:

   Thread jobControlThread = new Thread(jobControl);
   jobControlThread.start();
   while (!jobControl.allFinished()) {
     Thread.sleep(500);
   }
   jobControl.stop();
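For reference, the preceding steps can be assembled into a single driver class. The following is a minimal sketch, assuming a hypothetical DependentJobsDriver class that extends Configured and implements Tool (which is what makes getConf() available) and reusing the RegexMapper, LogProcessorMap, and LogProcessorReduce classes from this recipe; the elided configuration (output key/value types, the filtering regular expression, and so on) still has to be filled in.

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DependentJobsDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        String inputPath = args[0];
        String intermedPath = args[1];
        String outputPath = args[2];

        // First job: filter the logs using a regular expression (log-grep).
        Job job1 = new Job(getConf(), "log-grep");
        job1.setJarByClass(RegexMapper.class);
        job1.setMapperClass(RegexMapper.class);
        FileInputFormat.setInputPaths(job1, new Path(inputPath));
        FileOutputFormat.setOutputPath(job1, new Path(intermedPath));
        // ... remaining job1 configuration (output types, regex, and so on)

        // Second job: analyze the filtered logs (log-analysis).
        Job job2 = new Job(getConf(), "log-analysis");
        job2.setJarByClass(LogProcessorMap.class);
        job2.setMapperClass(LogProcessorMap.class);
        job2.setReducerClass(LogProcessorReduce.class);
        FileInputFormat.setInputPaths(job2, new Path(intermedPath + "/part*"));
        FileOutputFormat.setOutputPath(job2, new Path(outputPath));
        // ... remaining job2 configuration

        // Wrap the jobs and declare that job2 depends on job1.
        ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
        ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
        controlledJob2.addDependingJob(controlledJob1);

        // Group the jobs and run them from a separate thread.
        JobControl jobControl = new JobControl("JobControlDemoGroup");
        jobControl.addJob(controlledJob1);
        jobControl.addJob(controlledJob2);

        Thread jobControlThread = new Thread(jobControl);
        jobControlThread.start();
        while (!jobControl.allFinished()) {
            Thread.sleep(500);
        }
        jobControl.stop();

        return jobControl.getFailedJobList().isEmpty() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new DependentJobsDriver(), args));
    }
}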
The ControlledJob class encapsulates a MapReduce job and provides the functionality to track its dependencies. A ControlledJob with depending jobs becomes ready for submission only when all of its depending jobs have completed successfully. A ControlledJob fails if any of its depending jobs fail.
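As a side note, the dependencies of a ControlledJob can also be supplied at construction time, and the state of a job can be inspected after the run to see whether it was skipped because a dependency failed. The following fragment continues from the steps above and is only a sketch:

// Equivalent to creating controlledJob2 and then calling addDependingJob():
ControlledJob controlledJob2 =
    new ControlledJob(job2, java.util.Arrays.asList(controlledJob1));

// After the JobControl run finishes, a job whose dependency failed is never
// submitted; it ends up in the DEPENDENT_FAILED state rather than FAILED.
if (controlledJob2.getJobState() == ControlledJob.State.DEPENDENT_FAILED) {
    System.err.println("log-analysis skipped: " + controlledJob2.getMessage());
}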
The JobControl class encapsulates a set of ControlledJobs and their dependencies. JobControl tracks the status of the encapsulated ControlledJobs and contains a thread that submits the jobs that are in the READY state.
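Once allFinished() returns true (as in step 7 of the recipe), the JobControl object can be queried for the outcome of each job. The following fragment is a sketch of how the failed and successful job lists could be reported:

// Report the outcome of each job after the monitoring loop in step 7.
for (ControlledJob failedJob : jobControl.getFailedJobList()) {
    System.err.println("Failed: " + failedJob.getJobName()
        + " - " + failedJob.getMessage());
}
for (ControlledJob successfulJob : jobControl.getSuccessfulJobList()) {
    System.out.println("Succeeded: " + successfulJob.getJobName());
}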
If you want to use the output of a MapReduce job as the input of a dependent job, the input paths of the dependent job have to be set manually. By default, Hadoop names the output files of the reduce tasks with the part prefix. We can specify all the part-prefixed files as input to the dependent job using wildcards:

FileInputFormat.setInputPaths(job2, new Path(job1OutPath + "/part*"));
We can use the JobControl class to execute and track a group of non-dependent jobs as well.
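For example, a group of unrelated jobs can be run in parallel simply by omitting the dependency declarations; jobA and jobB below stand for hypothetical, already configured Job objects:

// No addDependingJob() calls: both jobs become READY immediately and
// JobControl submits them independently of each other.
JobControl independentGroup = new JobControl("IndependentJobsGroup");
independentGroup.addJob(new ControlledJob(jobA.getConfiguration()));
independentGroup.addJob(new ControlledJob(jobB.getConfiguration()));
new Thread(independentGroup).start();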
Apache Oozie is a workflow system for Hadoop MapReduce computations. You can use Oozie to execute Directed Acyclic Graphs (DAGs) of MapReduce computations. You can find more information on Oozie at the project's home page at http://oozie.apache.org/.
The ChainMapper class, available in the older version of the Hadoop MapReduce API, allows us to execute a pipeline of mapper classes inside a single map task. The ChainReducer class provides similar support for reduce tasks.
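A minimal sketch of the old API (the ChainMapper and ChainReducer classes in the org.apache.hadoop.mapred.lib package) is shown below. FilterMapper, TokenizeMapper, and SumReducer are hypothetical classes implementing the old Mapper and Reducer interfaces; the output key/value classes of each stage must match the input key/value classes of the next:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

public class ChainExample {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(ChainExample.class);
        conf.setJobName("chain-example");
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // First mapper in the chain: filters the raw input lines
        // (FilterMapper is a hypothetical class for illustration).
        ChainMapper.addMapper(conf, FilterMapper.class,
            LongWritable.class, Text.class, Text.class, Text.class,
            true, new JobConf(false));

        // Second mapper consumes the first mapper's output key/value types.
        ChainMapper.addMapper(conf, TokenizeMapper.class,
            Text.class, Text.class, Text.class, LongWritable.class,
            true, new JobConf(false));

        // A single reducer ends the chain; further mappers could be
        // appended after it using ChainReducer.addMapper().
        ChainReducer.setReducer(conf, SumReducer.class,
            Text.class, LongWritable.class, Text.class, LongWritable.class,
            true, new JobConf(false));

        JobClient.runJob(conf);
    }
}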