Chapter 5. Workflow Management with Python

The most popular workflow scheduler for managing Hadoop jobs is arguably Apache Oozie. Like many other Hadoop products, Oozie is written in Java and is a server-based web application that runs workflow jobs composed of Hadoop MapReduce and Pig actions. An Oozie workflow is a collection of actions arranged in a control dependency directed acyclic graph (DAG) specified in an XML document. While Oozie has a lot of support in the Hadoop community, configuring workflows and jobs through XML attributes has a steep learning curve.

Luigi is a Python alternative, created by Spotify, that enables complex pipelines of batch jobs to be built and configured. It handles dependency resolution, workflow management, visualization, and much more. It also has a large community and supports many Hadoop technologies.

This chapter begins with the installation of Luigi and a detailed description of a workflow. Multiple examples then show how Luigi can be used to control MapReduce and Pig jobs.

Installation

Luigi is distributed through PyPI and can be installed using pip:

$ pip install luigi

Or it can be installed from source:

$ git clone https://github.com/spotify/luigi
$ cd luigi
$ python setup.py install

Workflows

Within Luigi, a workflow consists of a pipeline of actions, called tasks. Luigi tasks are generic; that is, a task can perform any work that can be expressed in Python. The locations of a task's input and output data are known as targets. Targets typically correspond to locations of files on disk, on HDFS, or in a database. In addition to tasks and targets, Luigi utilizes parameters to customize how tasks are executed.

Tasks

Tasks are the sequences of actions that comprise a Luigi workflow. Each task declares its dependencies on targets created by other tasks. This enables Luigi to build dependency chains that ensure a task will not be executed until all of the tasks it depends on, and all of their dependencies in turn, have been satisfied.

Figure 5-1 depicts a workflow highlighting Luigi tasks and their dependencies.

Figure 5-1. A Luigi task dependency diagram illustrates the flow of work up a pipeline and the dependencies between tasks

Target

Targets are the inputs and outputs of a task. The most common targets are files on a disk, files in HDFS, or records in a database. Luigi wraps the underlying filesystem operations to ensure that interactions with targets are atomic. This allows a workflow to be replayed from the point of failure without having to replay any of the already successfully completed tasks.
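
For example, writing through a LocalTarget stages the data in a temporary file and renames it into place only when the write completes, so a failed task never leaves a partial output that would satisfy the target. A minimal sketch (the path here is illustrative):

import luigi

target = luigi.LocalTarget('/tmp/report.txt')

# The write is staged in a temporary file and only moved to
# /tmp/report.txt when the block exits successfully
with target.open('w') as f:
   f.write('all or nothing\n')

# Luigi calls exists() to decide whether a task still needs to run
print(target.exists())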

Parameters

Parameters allow the customization of tasks by enabling values to be passed into a task from the command line, programmatically, or from another task. For example, the name of a task’s output may be determined by a date passed into the task through a parameter.
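
As a minimal sketch of that pattern (the DailyReport task here is hypothetical, not part of the example that follows), a DateParameter can be interpolated into the output path:

import luigi

class DailyReport(luigi.Task):
   # the date passed on the command line, e.g., --date 2016-01-15
   date = luigi.DateParameter()

   def output(self):
      # the parameter determines the name of the output target
      return luigi.LocalTarget('/tmp/report-{}.txt'.format(self.date))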

An Example Workflow

This section describes a workflow that implements the WordCount algorithm to explain the interaction among tasks, targets, and parameters. The complete workflow is shown in Example 5-1.

Example 5-1. /python/Luigi/wordcount.py
import luigi

class InputFile(luigi.Task):
   """
   A task wrapping a target 
   """
   input_file = luigi.Parameter()

   def output(self):
      """
      Return the target for this task
      """
      return luigi.LocalTarget(self.input_file)

class WordCount(luigi.Task):
   """
   A task that counts the number of words in a file
   """
   input_file = luigi.Parameter()
   output_file = luigi.Parameter(default='/tmp/wordcount')

   def requires(self):
      """
      The task's dependencies:
      """
      return InputFile(self.input_file)

   def output(self):
      """
      The task's output
      """
      return luigi.LocalTarget(self.output_file)

   def run(self):
      """
      The task's logic
      """
      count = {}

      ifp = self.input().open('r')

      for line in ifp:
         for word in line.strip().split():
            count[word] = count.get(word, 0) + 1

      ofp = self.output().open('w')
      for k, v in count.items():
         ofp.write('{}\t{}\n'.format(k, v))
      ofp.close()

if __name__ == '__main__':
   luigi.run()

This workflow contains two tasks: InputFile and WordCount. The InputFile task returns the input file to the WordCount task. The WordCount task then counts the occurrences of each word in the input file and stores the results in the output file.

Within each task, the requires(), output(), and run() methods can be overridden to customize a task’s behavior.

Task.requires

The requires() method is used to specify a task’s dependencies. The WordCount task requires the output of the InputFile task:

def requires(self):
   return InputFile(self.input_file)

It is important to note that the requires() method cannot return a Target object. In this example, the Target object is wrapped in the InputFile task. Calling the InputFile task with the self.input_file argument enables the input_file parameter to be passed to the InputFile task.
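
A task with more than one dependency can return a list or dictionary of tasks from requires(); self.input() then returns the corresponding targets in the same structure. A minimal sketch, assuming a hypothetical second parameter, stopword_file:

def requires(self):
   # hypothetical: this task depends on two wrapped input files
   return {'text': InputFile(self.input_file),
           'stopwords': InputFile(self.stopword_file)}

def run(self):
   # self.input() mirrors the structure returned by requires()
   with self.input()['stopwords'].open('r') as f:
      stopwords = set(f.read().split())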

Task.output

The output() method returns one or more Target objects. The InputFile task returns the Target object that was the input for the WordCount task:

def output(self):
   return luigi.LocalTarget(self.input_file)

The WordCount task returns the Target object that was the output for the workflow:

def output(self):
   return luigi.LocalTarget(self.output_file)

Task.run

The run() method contains the code for a task. Once all of a task's dependencies are satisfied, the run() method is executed. The run() method for the WordCount task reads data from the input file, counts the occurrences of each word, and writes the results to an output file:

def run(self):
   count = {}

   ifp = self.input().open('r')

   for line in ifp:
      for word in line.strip().split():
         count[word] = count.get(word, 0) + 1

   ofp = self.output().open('w')
   for k, v in count.items():
      ofp.write('{}\t{}\n'.format(k, v))
   ofp.close()

The input() method is a helper that returns the Target objects produced by the tasks returned from requires(); together with output(), it allows run() to read from and write to the workflow's targets without hardcoding any paths.
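
Because the file-like objects returned by open() support the context-manager protocol, the same logic can be written with with blocks so that the files are closed, and the atomic write committed, automatically. A minimal restatement of the run() method above:

def run(self):
   count = {}

   # self.input() returns the Target produced by requires()
   with self.input().open('r') as ifp:
      for line in ifp:
         for word in line.strip().split():
            count[word] = count.get(word, 0) + 1

   # leaving the block closes the target and commits the atomic write
   with self.output().open('w') as ofp:
      for k, v in count.items():
         ofp.write('{}\t{}\n'.format(k, v))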

Parameters

Parameters enable values to be passed into a task, customizing the task’s execution. The WordCount task contains two parameters: input_file and output_file:

class WordCount(luigi.Task):
   input_file = luigi.Parameter()
   output_file = luigi.Parameter(default='/tmp/wordcount')

Default values can be set for parameters by using the default argument.

Luigi creates a command-line parser for each Parameter object, enabling values to be passed into the Luigi script on the command line, e.g., --input-file input.txt and --output-file /tmp/output.txt.
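
Parameters can also be set programmatically rather than parsed from the command line; luigi.build() accepts pre-instantiated tasks. A minimal sketch:

import luigi

# equivalent to the command-line invocation shown below;
# local_scheduler=True avoids the need for a central scheduler
luigi.build([WordCount(input_file='input.txt',
                       output_file='/tmp/output.txt')],
            local_scheduler=True)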

Execution

To enable execution from the command line, the following lines must be present in the application:

if __name__ == '__main__':
   luigi.run()

This allows Luigi to parse the command line, instantiate the specified task, and schedule it for execution.
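
The --local-scheduler option used in the commands below runs the workflow with an in-process scheduler, which is convenient for development. Production workflows normally use Luigi's central scheduler, which provides a web-based visualization of the task graph and ensures that two instances of the same task do not run simultaneously. The central scheduler daemon is started with:

$ luigid

When a central scheduler is running, the --local-scheduler option is simply omitted.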

The following command will execute the workflow, reading from input.txt and storing the results in /tmp/wordcount.txt:

$ python wordcount.py WordCount \
--local-scheduler \
--input-file input.txt \
--output-file /tmp/wordcount.txt

Hadoop Workflows

This section contains workflows that control MapReduce and Pig jobs on a Hadoop cluster.

Configuration File

The examples in this section require a Luigi configuration file, client.cfg, to specify the location of the Hadoop streaming jar and the path to the Pig home directory. The configuration file should be in the current working directory; an example is shown in Example 5-2.

Example 5-2. python/Luigi/client.cfg
[hadoop]
streaming-jar: /usr/lib/hadoop-xyz/hadoop-streaming-xyz-123.jar

[pig]
home: /usr/lib/pig

MapReduce in Luigi

Luigi scripts can control the execution of MapReduce jobs on a Hadoop cluster by using Hadoop streaming (Example 5-3).

Example 5-3. python/Luigi/luigi_mapreduce.py
import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs

class InputFile(luigi.ExternalTask):
   """
   A task wrapping the HDFS target
   """
   input_file = luigi.Parameter()

   def output(self):
      """
      Return the target on HDFS
      """
      return luigi.contrib.hdfs.HdfsTarget(self.input_file)

class WordCount(luigi.contrib.hadoop.JobTask):
   """
   A task that uses Hadoop streaming to perform WordCount
   """
   input_file = luigi.Parameter()
   output_file = luigi.Parameter()

   # Set the number of reduce tasks
   n_reduce_tasks = 1

   def requires(self):
      """
      Read from the output of the InputFile task
      """
      return InputFile(self.input_file)

   def output(self):
      """
      Write the output to HDFS
      """
      return luigi.contrib.hdfs.HdfsTarget(self.output_file)

   def mapper(self, line):
      """
      Read each line and produce a word and 1
      """
      for word in line.strip().split():
         yield word, 1

   def reducer(self, key, values):
      """
      Read each word and produce the word and the sum of 
      its values
      """
      yield key, sum(values)

if __name__ == '__main__':
   luigi.run(main_task_cls=WordCount)

Luigi comes packaged with support for Hadoop streaming. The task implementing the MapReduce job must subclass luigi.contrib.hadoop.JobTask. The mapper() and reducer() methods can be overridden to implement the map and reduce phases of a MapReduce job.
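
luigi.contrib.hadoop also allows a combiner to be defined to reduce the volume of intermediate data shuffled between the map and reduce phases. A minimal sketch for the WordCount task above, assuming the installed version of Luigi supports combiners:

   def combiner(self, key, values):
      """
      Combine each mapper's output locally before the shuffle;
      same signature as reducer()
      """
      yield key, sum(values)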

The following command will execute the workflow, reading from /user/hduser/input/input.txt and storing the results in /user/hduser/wordcount on HDFS:

$ python luigi_mapreduce.py --local-scheduler \
--input-file /user/hduser/input/input.txt \
--output-file /user/hduser/wordcount

Pig in Luigi

Luigi can be used to control the execution of Pig on a Hadoop cluster (Example 5-4).

Example 5-4. python/Luigi/luigi_pig.py
import luigi
import luigi.contrib.pig
import luigi.contrib.hdfs

class InputFile(luigi.ExternalTask):
   """
   A task wrapping the HDFS target
   """
   input_file = luigi.Parameter()

   def output(self):
      return luigi.contrib.hdfs.HdfsTarget(self.input_file)

class WordCount(luigi.contrib.pig.PigJobTask):
   """
   A task that uses Pig to perform WordCount
   """
   input_file = luigi.Parameter()
   output_file = luigi.Parameter()
   script_path = luigi.Parameter(default='pig/wordcount.pig')

   def requires(self):
      """
      Read from the output of the InputFile task
      """
      return InputFile(self.input_file)

   def output(self):
      """
      Write the output to HDFS
      """
      return luigi.contrib.hdfs.HdfsTarget(self.output_file)     

   def pig_parameters(self):
      """
      A dictionary of parameters to pass to pig
      """
      return {'INPUT': self.input_file, 'OUTPUT': self.output_file}

   def pig_options(self):
      """
      A list of options to pass to pig
      """
      return ['-x', 'mapreduce']

   def pig_script_path(self):
      """
      The path to the pig script to run
      """
      return self.script_path

if __name__ == '__main__':
   luigi.run(main_task_cls=WordCount)

Luigi comes packaged with support for Pig. The task implementing the Pig job must subclass luigi.contrib.pig.PigJobTask. The pig_script_path() method defines the path to the Pig script to run, pig_options() defines the command-line options to pass to Pig, and pig_parameters() supplies parameters that the script can reference with Pig's parameter-substitution syntax (e.g., $INPUT and $OUTPUT).

The following command will execute the workflow, reading from /user/hduser/input/input.txt and storing the results in /user/hduser/output on HDFS. The --script-path parameter is used to define the Pig script to execute:

$ python luigi_pig.py --local-scheduler \
--input-file /user/hduser/input/input.txt \
--output-file /user/hduser/output \
--script-path pig/wordcount.pig

Chapter Summary

This chapter introduced Luigi as a Python workflow scheduler. It described the components of a Luigi workflow and contained examples of using Luigi to control MapReduce jobs and Pig scripts.
