Adding dependencies between MapReduce jobs

Often, we need to execute multiple MapReduce applications in a workflow-like manner to achieve our objective. The Hadoop ControlledJob and JobControl classes provide a mechanism to execute a simple workflow graph of MapReduce jobs by specifying the dependencies between them.

In this recipe, we execute the log-grep MapReduce computation followed by the log-analysis MapReduce computation on an HTTP server log data set. The log-grep computation filters the input data based on a regular expression. The log-analysis computation analyses the filtered data. Hence, the log-analysis computation depends on the log-grep computation. We use ControlledJob to express this dependency and JobControl to execute the two related MapReduce computations.

How to do it...

The following steps show you how to add a MapReduce computation as a dependency of another MapReduce computation.

  1. Create the Configuration and Job objects for the first MapReduce job and populate them with the necessary configurations.
    Job job1 = new Job(getConf(), "log-grep");
    job1.setJarByClass(RegexMapper.class);
    job1.setMapperClass(RegexMapper.class);  
    FileInputFormat.setInputPaths(job1, new Path(inputPath));
    FileOutputFormat.setOutputPath(job1, new Path(intermedPath));
    …… 
  2. Create the Configuration and Job objects for the second MapReduce job and populate them with the necessary configurations.
    Job job2 = new Job(getConf(), "log-analysis");
    job2.setJarByClass(LogProcessorMap.class);
    job2.setMapperClass(LogProcessorMap.class);
    job2.setReducerClass(LogProcessorReduce.class);  
    FileOutputFormat.setOutputPath(job2, new Path(outputPath));    
    ………
  3. Set the output directory of the first job as the input directory of the second job.
    FileInputFormat.setInputPaths(job2, new Path(intermedPath + "/part*"));
  4. Create ControlledJob objects using the above-created Job objects.
    ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
    ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
  5. Add the first job as a dependency to the second job.
    controlledJob2.addDependingJob(controlledJob1);
  6. Create the JobControl object for this group of jobs and add the ControlledJob objects created in step 4 to the newly created JobControl object.
    JobControl jobControl = new JobControl("JobControlDemoGroup");
    jobControl.addJob(controlledJob1);
    jobControl.addJob(controlledJob2);
  7. Create a new thread to run the group of jobs added to the JobControl object. Start the thread and wait for its completion. A consolidated sketch of the complete driver follows these steps.
    Thread jobControlThread = new Thread(jobControl);
    jobControlThread.start();    
    while (!jobControl.allFinished()){
      Thread.sleep(500);
    }
    jobControl.stop();
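
Putting the steps together, the following is a minimal sketch of how these pieces could be wired up inside the run() method of a driver class that extends Configured and implements Tool (ControlledJob and JobControl live in the org.apache.hadoop.mapreduce.lib.jobcontrol package). The mapper, reducer, and path names are the ones used in this recipe; the job configuration elided in steps 1 and 2 (input/output formats, key and value types, the grep regular expression, and so on) is left out here as well.

public int run(String[] args) throws Exception {
  String inputPath = args[0];
  String intermedPath = args[1];
  String outputPath = args[2];

  // First job: filter the logs using a regular expression
  Job job1 = new Job(getConf(), "log-grep");
  job1.setJarByClass(RegexMapper.class);
  job1.setMapperClass(RegexMapper.class);
  FileInputFormat.setInputPaths(job1, new Path(inputPath));
  FileOutputFormat.setOutputPath(job1, new Path(intermedPath));
  // ... remaining log-grep configuration as in step 1 ...

  // Second job: analyze the filtered data produced by the first job
  Job job2 = new Job(getConf(), "log-analysis");
  job2.setJarByClass(LogProcessorMap.class);
  job2.setMapperClass(LogProcessorMap.class);
  job2.setReducerClass(LogProcessorReduce.class);
  FileInputFormat.setInputPaths(job2, new Path(intermedPath + "/part*"));
  FileOutputFormat.setOutputPath(job2, new Path(outputPath));
  // ... remaining log-analysis configuration as in step 2 ...

  // Wrap the jobs and declare that job2 depends on job1
  ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
  ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
  controlledJob2.addDependingJob(controlledJob1);

  JobControl jobControl = new JobControl("JobControlDemoGroup");
  jobControl.addJob(controlledJob1);
  jobControl.addJob(controlledJob2);

  // JobControl implements Runnable; run it on its own thread
  Thread jobControlThread = new Thread(jobControl);
  jobControlThread.setDaemon(true);
  jobControlThread.start();

  // Poll until every job in the group has finished, then stop the group
  while (!jobControl.allFinished()) {
    Thread.sleep(500);
  }
  jobControl.stop();

  // Use the exit code to signal whether any job in the group failed
  return jobControl.getFailedJobList().isEmpty() ? 0 : 1;
}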

How it works...

The ControlledJob class encapsulates a MapReduce job and tracks its dependencies. A ControlledJob with depending jobs becomes ready for submission only when all of its depending jobs have completed successfully. A ControlledJob fails if any of its depending jobs fail.
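
For instance, the dependency can also be supplied when constructing the ControlledJob, and the current state of a job can be queried while the group is running. The following short sketch reuses the job2 and controlledJob1 objects from the recipe:

// Alternative to addDependingJob(): pass the depending jobs at construction time
ControlledJob controlledJob2 =
    new ControlledJob(job2, java.util.Arrays.asList(controlledJob1));

// A ControlledJob moves through WAITING -> READY -> RUNNING -> SUCCESS,
// or ends in FAILED / DEPENDENT_FAILED
System.out.println(controlledJob2.getJobState());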

The JobControl class encapsulates a set of ControlledJob objects and their dependencies. JobControl tracks the status of the encapsulated ControlledJob objects and contains a thread that submits the jobs that are in the READY state.
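
Once allFinished() returns true, the JobControl object can be queried for the outcome of the individual jobs. For example:

// Report any jobs that failed (including those whose depending jobs failed)
for (ControlledJob failed : jobControl.getFailedJobList()) {
  System.err.println("Failed: " + failed.getJobName() + " - " + failed.getMessage());
}
System.out.println(jobControl.getSuccessfulJobList().size()
    + " job(s) completed successfully");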

If you want to use the output of a MapReduce job as the input of a dependent job, the input paths of the dependent job have to be set manually. By default, Hadoop writes one output file per reduce task into the job's output folder, named with the part prefix. We can specify all the part-prefixed files as input to the dependent job using a wildcard.

FileInputFormat.setInputPaths(job2, new Path(job1OutPath + "/part*"));

There's more...

We can use the JobControl class to execute and track a group of non-dependent jobs as well.
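
For example, adding jobs to a JobControl group without declaring any dependencies makes all of them READY immediately, and the JobControl thread submits them independently. In the following sketch, jobA and jobB are hypothetical Job objects:

JobControl group = new JobControl("IndependentJobsGroup");
group.addJob(new ControlledJob(jobA.getConfiguration()));  // no dependencies declared
group.addJob(new ControlledJob(jobB.getConfiguration()));
new Thread(group).start();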

Apache Oozie is a workflow system for Hadoop MapReduce computations. You can use Oozie to execute Directed Acyclic Graphs (DAG) of MapReduce computations. You can find more information on Oozie from the project's home page at http://oozie.apache.org/.

The ChainMapper class, available in the older version of the Hadoop MapReduce API, allows us to execute a pipeline of mapper classes inside a single map task. ChainReducer provides similar support for reduce tasks.
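
As a rough illustration of that older (org.apache.hadoop.mapred) API, a chained job could be configured as follows. ChainDriver, AMapper, BMapper, and CReducer are hypothetical classes standing in for your own implementations:

JobConf conf = new JobConf(getConf(), ChainDriver.class);
conf.setJobName("chain-example");

// First mapper in the chain reads the job's input records
ChainMapper.addMapper(conf, AMapper.class,
    LongWritable.class, Text.class,   // input key/value types of this mapper
    Text.class, Text.class,           // output key/value types of this mapper
    true, new JobConf(false));

// Second mapper consumes the output of the first mapper
ChainMapper.addMapper(conf, BMapper.class,
    Text.class, Text.class,
    Text.class, Text.class,
    true, new JobConf(false));

// A single reducer ends the chain
ChainReducer.setReducer(conf, CReducer.class,
    Text.class, Text.class,
    Text.class, Text.class,
    true, new JobConf(false));

JobClient.runJob(conf);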
