Chapter 10. Developer Topics

In previous chapters, we primarily focused on how to use and manage Oozie efficiently. We explained the details of the Oozie service and the various features it supports. Oozie users and administrators were the target audience for those chapters. In this chapter, we cover Oozie from a developer’s perspective. In particular, we discuss how to leverage Oozie’s extensible framework to expand and broaden its feature set. We see how to add custom EL functions and how to develop new synchronous and asynchronous action types.

Developing Custom EL Functions

The parameterization framework of Oozie enables users to easily build reusable and manageable applications. This feature includes variable substitution and EL functions for workflows, coordinators, and bundles. We discussed this in detail in “Parameterization”. More specifically, Oozie provides a rich set of built-in EL functions for most of the common use cases. However, users often need EL functions for new or specialized use cases. The parameterization framework of Oozie is extensible and allows new functions to be added with minimal effort. In this section, we describe the steps needed to add an EL function.

Requirements for a New EL Function

Before deciding to write a new EL function, users must first determine whether it is even a good idea to develop a new function. It is highly recommended that the new EL function be simple, fast, and robust. This is critical because Oozie executes the EL functions on the Oozie server. A poorly written function can add unnecessary overhead to the server and threaten Oozie’s stability. Also, if the function runs for a long time, it could slow down the Oozie server.

In general, the new function should not perform any resource-intensive operation or depend on external systems and services. For these types of requirements, users are advised to use either the Java or shell action described in “Java Action” and “Shell Action” instead of an EL function. If, after due consideration, implementing a new EL function is still the right way to go, the user must make sure the function is robust, stable, reviewed, and tested. For instance, it would be highly inconvenient if an Oozie application failed because of an unhandled exception caused by some edge case in the new EL function.

Typically, the Oozie administrator in the organization has to approve any new EL function. The Oozie web application archive (oozie.war) should contain the new JAR supporting the new EL function. In most organizations, it would be necessary for the administrator to help inject the new JAR into the Oozie server and to configure it accordingly. An Oozie server restart is also required to make the new EL function available. In other words, end users cannot write and deploy their own EL functions without the help of the Oozie administrator.

Implementing a New EL Function

In this section, we describe how to develop and deploy a new EL function into Oozie. We also present a workflow demonstrating the usage of the new EL function.

Writing a new EL function

Oozie doesn’t require users to implement a specific interface or extend a particular base class in order to write a new EL function. However, the function should be a public static method in a Java class. For example, let’s implement a basic utility method to tokenize a string using a specific pattern and return a token at a particular position.

The following Java code shows a sample implementation. We need to compile and package this class into a JAR file (say oozie-custom-el.jar):

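 package com.oreilly.oozie.customel;

 public class CustomELFunctions {
   // Splits str around the regular expression expr and returns the token at
   // the 0-based index pos, or null if the input is invalid or too short.
   public static String splitAndGet(String str, String expr, int pos) {
     if (str == null || expr == null || pos < 0) {
       return null;
     }
     // A limit of pos + 2 keeps the token at index pos whole; any unsplit
     // remainder of the string ends up at index pos + 1.
     String[] splitArr = str.split(expr, pos + 2);
     if (splitArr.length <= pos) {
       return null;
     }
     return splitArr[pos];
   }
 }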
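Before handing the JAR to an administrator, the function can be exercised with a plain main method. The sketch below copies the logic into a hypothetical SplitAndGetCheck class so it runs without Oozie on the classpath. It uses a split limit of pos + 2, which keeps the token at index pos intact; with a limit of pos + 1, the last slot would hold the unsplit remainder of the string.

```java
// Hypothetical standalone harness; mirrors splitAndGet so it runs without Oozie.
class SplitAndGetCheck {
    static String splitAndGet(String str, String expr, int pos) {
        if (str == null || expr == null || pos < 0) {
            return null;
        }
        // expr is a regular expression; a limit of pos + 2 leaves any unsplit
        // remainder at index pos + 1, so index pos always holds a single token.
        String[] parts = str.split(expr, pos + 2);
        return parts.length <= pos ? null : parts[pos];
    }

    public static void main(String[] args) {
        String s = "I Installed Apache Oozie!";
        System.out.println(splitAndGet(s, " ", 2));    // Apache
        System.out.println(splitAndGet(s, " ", 3));    // Oozie!
        System.out.println(splitAndGet(s, " ", 9));    // null (no such token)
        System.out.println(splitAndGet(null, " ", 0)); // null (invalid input)
        // With a limit of pos + 1 the last slot keeps the rest of the string:
        System.out.println(s.split(" ", 3)[2]);        // Apache Oozie!
    }
}
```

Since expr is interpreted as a regular expression, a literal delimiter such as . or | must be escaped by the caller.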
 

Deploy the new EL function

To deploy the new EL function on the Oozie server, we need to inject the JAR created in the previous step. The ideal way to do this is to follow the steps described in “Install Oozie Server”. The only additional step is to copy the custom JAR (oozie-custom-el.jar) to the libext/ directory before executing the command bin/oozie-setup.sh prepare-war.

Next, we need to configure the Oozie server by adding the following property to the conf/oozie-site.xml file. The property oozie.service.ELService.ext.functions.workflow must list every extension EL function, mapping each function name to its implementing class and method. Multiple entries are separated by commas. The following example shows how to add the new function splitAndGet to the Oozie system:

 <property>
   <name>oozie.service.ELService.ext.functions.workflow</name>
   <value> 
     splitAndGet=com.oreilly.oozie.customel.CustomELFunctions#splitAndGet
   </value>
   <description>
     EL functions declarations, separated by commas, format is 
      [PREFIX:]NAME=CLASS#METHOD. This property is a convenience 
      property to add extensions to the built in executors without 
      having to include all the built in ones.
   </description>
 </property>
Note

An optional prefix can be added to the function name in the declaration to help organize the functions into logical groups. For example, we could have used custom:splitAndGet as the name. If you choose to use this optional prefix, you’ll need to use it in your workflow.xml file as well.
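To make the [PREFIX:]NAME=CLASS#METHOD format concrete, the sketch below parses a declaration and resolves the named public static method by reflection. It is a hypothetical helper (ElDeclaration), not Oozie's ELService implementation, and it simply takes the first static method with a matching name, so overloaded methods would need more careful matching. Only a JDK method is invoked, because the CustomELFunctions JAR is not on the classpath here.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical helper that parses one "[PREFIX:]NAME=CLASS#METHOD" entry and
// looks up the static method; an illustration, not Oozie's ELService code.
class ElDeclaration {
    final String prefix;      // "" when the declaration has no prefix
    final String name;
    final String className;
    final String methodName;

    private ElDeclaration(String prefix, String name, String className, String methodName) {
        this.prefix = prefix;
        this.name = name;
        this.className = className;
        this.methodName = methodName;
    }

    static ElDeclaration parse(String declaration) {
        String d = declaration.trim();
        int eq = d.indexOf('=');
        int hash = d.indexOf('#', eq + 1);
        if (eq <= 0 || hash < 0) {
            throw new IllegalArgumentException("Expected [PREFIX:]NAME=CLASS#METHOD: " + declaration);
        }
        String fullName = d.substring(0, eq).trim();
        int colon = fullName.indexOf(':');
        return new ElDeclaration(
                colon >= 0 ? fullName.substring(0, colon) : "",
                colon >= 0 ? fullName.substring(colon + 1) : fullName,
                d.substring(eq + 1, hash).trim(),
                d.substring(hash + 1).trim());
    }

    // Returns the first public static method with the declared name.
    Method resolve() throws ReflectiveOperationException {
        for (Method m : Class.forName(className).getMethods()) {
            if (m.getName().equals(methodName) && Modifier.isStatic(m.getModifiers())) {
                return m;
            }
        }
        throw new NoSuchMethodException(className + "#" + methodName);
    }

    // Resolves the declaration and calls the static method (receiver is null).
    static Object call(String declaration, Object... args) {
        try {
            return parse(declaration).resolve().invoke(null, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot call " + declaration, e);
        }
    }

    public static void main(String[] args) {
        // Two comma-separated entries, as in an oozie-site.xml value
        String value = "custom:splitAndGet=com.oreilly.oozie.customel.CustomELFunctions#splitAndGet,"
                + " bin=java.lang.Integer#toBinaryString";
        for (String entry : value.split(",")) {
            ElDeclaration e = parse(entry);
            System.out.println("prefix='" + e.prefix + "' name=" + e.name
                    + " class=" + e.className + " method=" + e.methodName);
        }
        System.out.println(call("bin=java.lang.Integer#toBinaryString", 5)); // 101
    }
}
```
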

After including the JAR and updating the configuration, we need to restart the Oozie server. Alternatively, during development, we can skip the JAR injection and the re-creation of the oozie.war file: for testing purposes, we can copy the JAR directly into the WEB-INF/lib directory of the deployed Oozie web application. We still need to update the configuration and restart the Oozie server, though.

Tip

If users want to support a new EL function at the coordinator level, the corresponding coordinator-specific properties in the oozie-site.xml file must be modified. Check the oozie-default.xml file for details on the various coordinator-related properties available to support such extensions.

Using the new function

Now that we have seen how a developer can implement a new function and how an administrator can add it to the Oozie server, we will see how any user on the system can use this EL function in her workflow definition. We borrowed the following example from the java-main example that comes with the Oozie distribution. We have added the line <arg>${splitAndGet("I Installed Apache Oozie!", " ", 2)}</arg> here to demonstrate the usage of the new EL function. In this example, the EL function returns Apache and that is passed as the second argument to the Java main class:

<workflow-app xmlns="uri:oozie:workflow:0.5" name="java-main-wf">
  <start to="java-node"/>
  <action name="java-node">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <configuration>
        <property>
          <name>mapred.job.queue.name</name>
          <value>${queueName}</value>
        </property>
      </configuration>
      <main-class>org.apache.oozie.example.DemoJavaMain</main-class>
      <arg>Hello</arg>
      <arg>${splitAndGet("I Installed Apache Oozie!", " ", 2)}</arg>
    </java>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail">
    <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <end name="end"/>
</workflow-app>

Supporting Custom Action Types

Oozie provides a set of common action types, which we described in “Action Types”. Depending on how Oozie executes an action, the workflow actions are broadly divided into two categories, synchronous and asynchronous, as explained in “Synchronous Versus Asynchronous Actions”. Oozie executes the synchronous action on the Oozie server and blocks the execution thread until it completes. In this model, each execution instance of the action shares resources with the Oozie server and impacts Oozie’s performance. Moreover, there is no isolation between the execution of Oozie’s services and the execution of the action, which can potentially destabilize the Oozie server. So adding a new synchronous action type is highly discouraged. Users should consider adding it only if the action is simple, runs for a very short period of time, and doesn’t execute any user code.

On the other hand, Oozie starts an asynchronous action and immediately returns without waiting for the action to finish. The actual action execution occurs outside the Oozie server on the Hadoop compute nodes. When the spawned action completes, it informs Oozie through a callback. The asynchronous execution model guarantees isolation between the action, which can run user code, and the execution of Oozie’s core services. So the recommended way of writing a heavy-duty action is to use the asynchronous model. In this section, we cover how to write both types of actions.

Creating a Custom Synchronous Action

Let’s now look at the steps required to support a new synchronous action type. The first step is to write a new action executor, followed by an XML schema (XSD) file for the new action. We then deploy the new action type on the Oozie server and register it in oozie-site.xml. Finally, we will show how to write and submit a test workflow using the new action type.

In this example, we will implement a new synchronous action that can execute a SQL statement against any MySQL instance. The output of the SQL can either be stored in a local file on the Oozie server machine or written to stdout.

Caution

Note that this is merely an example to demonstrate the required steps to write a new synchronous action. Ideally, this specific kind of action should not be a synchronous type.

Writing an ActionExecutor

Every action executor class should extend Oozie’s ActionExecutor class, which is part of Oozie’s core package. Developers should then implement the following methods required by Oozie:

Constructor

Action developers need to write a no-arg constructor that ultimately calls the super-class constructor passing the new action name (e.g., super("syncMysql")). End users will use this name to define the new action type in the workflow XML.

start(ActionExecutor.Context context, WorkflowAction action)

Oozie invokes this method when it needs to execute the action. Oozie passes two parameters to this method. The first parameter context provides the APIs to access all workflow configurations/variables for this action, set the action status, and return any data to be used in the execution path. The second parameter action includes the action’s definition from the workflow XML. All synchronous actions must override this method because this method performs the actual execution. At the end, the method needs to call context.setExecutionData(externalStatus, actionData) to pass back the action status and any action-specific data.

check(ActionExecutor.Context context, WorkflowAction action)

Oozie calls this method to check the action status. For synchronous actions, Oozie does not need or call this method. Therefore, for this example, it’s recommended this method just throw an UnsupportedOperationException.

kill(ActionExecutor.Context context, WorkflowAction action)

Oozie executes this method when it needs to kill the action for any reason. A typical implementation of this method calls context.setEndData(status, signalValue), passing WorkflowAction.Status.KILLED as the status and ERROR as the signalValue.

end(ActionExecutor.Context context, WorkflowAction action)

Oozie invokes this method when the execution is finished. In this method, the action executor should perform any cleanup required after completion. The implementation usually calls context.setEndData(status, signalValue). The status and signal value determine the next course of action.

isCompleted(String externalStatus)

This utility method is used to determine if an action status is in a terminal state.
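To make the order of these calls concrete, here is a toy model of a synchronous action's life cycle. It uses a hypothetical SyncActionFlow class built only on the JDK, not Oozie's engine: start() does the work and records an external status, end() maps that status to a final status and signal value, and in this model the signal value selects the <ok> or <error> transition. Failures are modeled as a boolean here, whereas a real executor typically throws ActionExecutorException.

```java
import java.util.function.BooleanSupplier;

// Toy model, NOT Oozie's engine: illustrates how a synchronous action's
// outcome flows from start() to end() to the next workflow node.
class SyncActionFlow {
    enum Status { OK, ERROR }

    // Like MySQLSyncActionExecutor.end(): external status "OK" maps to
    // status OK; anything else maps to ERROR.
    static Status endStatus(String externalStatus) {
        return "OK".equals(externalStatus) ? Status.OK : Status.ERROR;
    }

    // In this model, signal value "OK" follows <ok to="...">, and every
    // other value (e.g., "ERROR") follows <error to="...">.
    static String nextNode(String signalValue, String okTo, String errorTo) {
        return "OK".equals(signalValue) ? okTo : errorTo;
    }

    // start() does all of the work and records an external status; Oozie then
    // proceeds to end() without any check() polling.
    static String runSync(BooleanSupplier work, String okTo, String errorTo) {
        String externalStatus = work.getAsBoolean() ? "OK" : "FAILED"; // start()
        Status status = endStatus(externalStatus);                       // end()
        return nextNode(status.name(), okTo, errorTo);                   // signal
    }

    public static void main(String[] args) {
        System.out.println(runSync(() -> true, "end", "fail"));  // end
        System.out.println(runSync(() -> false, "end", "fail")); // fail
    }
}
```
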

These methods are required for Oozie to run any action. Additionally, we implement two new methods for this specific action: runMysql() to execute the SQL and writeResultSet() to store the output into a file. As mentioned earlier, the start() method is the entry point for the execution of a synchronous action. At the very beginning of this method, we need to retrieve the action’s definition and parse it using an XML parser. The actual schema of the action definition is defined in the action’s XSD file, which we discuss in the next section:

package com.oreilly.oozie.sync.customaction;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Set;

import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;

public class MySQLSyncActionExecutor extends ActionExecutor {

  private static final String SYNC_MYSQL_ACTION_NS =
      "uri:oozie:sync-mysql-action:0.1";
  private static final String ACTION_NAME = "syncMysql";

  // Oozie instantiates executors reflectively through this no-arg constructor
  public MySQLSyncActionExecutor() {
    super(ACTION_NAME);
  }

  @Override
  public void start(Context context, WorkflowAction action)
      throws ActionExecutorException {
    context.setStartData("-", "-", "-");
    try {
      // Parse the <syncMysql> element from the workflow definition
      Element actionXml = XmlUtils.parseXml(action.getConf());
      Namespace ns = Namespace.getNamespace(SYNC_MYSQL_ACTION_NS);

      String jdbcUrl = actionXml.getChildTextTrim("jdbcUrl", ns);
      String sql = actionXml.getChildTextTrim("sql", ns);
      String sqlOutputFilePath = actionXml.getChildTextTrim(
          "sql_output_file_path", ns);
      runMysql(jdbcUrl, sql, sqlOutputFilePath);
      // The work is done: report the external status (no action data)
      context.setExecutionData("OK", null);
    } catch (JDOMException e) {
      throw convertException(e);
    }
  }

  @Override
  public void end(Context context, WorkflowAction action)
      throws ActionExecutorException {
    if ("OK".equals(action.getExternalStatus())) {
      context.setEndData(WorkflowAction.Status.OK,
          WorkflowAction.Status.OK.toString());
    } else {
      context.setEndData(WorkflowAction.Status.ERROR,
          WorkflowAction.Status.ERROR.toString());
    }
  }

  @Override
  public void kill(Context context, WorkflowAction action)
      throws ActionExecutorException {
    context.setEndData(WorkflowAction.Status.KILLED, "ERROR");
  }

  @Override
  public void check(Context context, WorkflowAction action)
      throws ActionExecutorException {
    // Never called for synchronous actions
    throw new UnsupportedOperationException();
  }

  private static final Set<String> COMPLETED_STATUS = new HashSet<String>();
  static {
    COMPLETED_STATUS.add("OK"); // external status reported by start()
    COMPLETED_STATUS.add("SUCCEEDED");
    COMPLETED_STATUS.add("KILLED");
    COMPLETED_STATUS.add("FAILED");
    COMPLETED_STATUS.add("FAILED_KILLED");
  }

  @Override
  public boolean isCompleted(String externalStatus) {
    return COMPLETED_STATUS.contains(externalStatus);
  }

  /*
   * Execute a SQL query and write its result set
   */
  private void runMysql(String jdbcUrl, String sql, String sqlOutputFilePath)
      throws ActionExecutorException {
    Connection connect = null;
    Statement statement = null;
    ResultSet resultSet = null;
    try {
      // Load the MySQL JDBC driver (each database has its own driver)
      Class.forName("com.mysql.jdbc.Driver");
      // Set up the connection to the database
      connect = DriverManager.getConnection(jdbcUrl);
      // executeQuery() expects a statement that returns rows (e.g., SELECT)
      statement = connect.createStatement();
      resultSet = statement.executeQuery(sql);
      writeResultSet(resultSet, sqlOutputFilePath);
    } catch (Exception e) {
      throw convertException(e);
    } finally {
      try {
        if (resultSet != null)
          resultSet.close();
        if (statement != null)
          statement.close();
        if (connect != null)
          connect.close();
      } catch (Exception e) {
        throw convertException(e);
      }
    }
  }

  private void writeResultSet(ResultSet resultSet, String sqlOutputFilePath)
      throws Exception {
    boolean toFile = sqlOutputFilePath != null && sqlOutputFilePath.length() > 0;
    // Write to the given local file, or to the Oozie server's standard output
    PrintWriter out = toFile ? new PrintWriter(sqlOutputFilePath)
        : new PrintWriter(System.out);
    try {
      ResultSetMetaData md = resultSet.getMetaData();
      // The cursor starts before the first row; print each column as name=value
      while (resultSet.next()) {
        for (int i = 1; i <= md.getColumnCount(); i++) {
          out.println(md.getColumnName(i) + "=" + resultSet.getString(i));
        }
      }
    } finally {
      if (toFile) {
        out.close();
      } else {
        out.flush(); // never close System.out
      }
    }
  }
}
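The runMysql() method above closes its JDBC resources by hand, and an exception thrown from its finally block can hide the original failure. On Java 7 or later, try-with-resources is a more compact alternative. The sketch below (hypothetical class QueryRunnerSketch, JDK classes only) writes the same column=value lines to a caller-supplied PrintWriter, so the caller decides whether that writer is closed. Driver registration (the Class.forName() call in the original) is left to the caller.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;

// Sketch of runMysql()/writeResultSet() using try-with-resources.
// The class name and method signatures are illustrative, not Oozie APIs.
class QueryRunnerSketch {
    // Resources declared in the try header are closed automatically, in
    // reverse order, even if executeQuery() or writing the output fails.
    static void runQuery(String jdbcUrl, String sql, PrintWriter out) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            writeResultSet(rs, out);
        }
        out.flush(); // flush only; the caller owns (and may close) the writer
    }

    // Same "column=value" output format as the executor above.
    static void writeResultSet(ResultSet rs, PrintWriter out) throws SQLException {
        ResultSetMetaData md = rs.getMetaData();
        int columns = md.getColumnCount();
        while (rs.next()) {
            for (int i = 1; i <= columns; i++) {
                out.println(md.getColumnName(i) + "=" + rs.getString(i));
            }
        }
    }

    public static void main(String[] args) {
        StringWriter buffer = new StringWriter();
        try {
            // No registered JDBC driver accepts this URL, so getConnection() fails fast.
            runQuery("jdbc:no-such-driver://localhost/db", "select 1", new PrintWriter(buffer));
        } catch (SQLException e) {
            System.out.println("Query failed: " + e.getMessage());
        }
    }
}
```
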
Tip

The compilation of any action executor class requires at least two Oozie JARs: oozie-core and oozie-client. The JARs for the most recent versions (starting with version 4.1.0) are published in a Maven repository. If for some reason it is not available, developers will need to get the JARs by building the Oozie source code themselves.

Writing the XML schema

A schema file is required to strictly enforce the XML syntax of the action definition that the user writes in her workflow.xml file. The W3C XML Schema documentation covers schema design in detail. The following example XSD file is self-explanatory and closely follows the schemas of Oozie’s built-in actions:

<?xml version="1.0" encoding="UTF-8"?>
   <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
     xmlns:sync-mysql="uri:oozie:sync-mysql-action:0.1"
     elementFormDefault="qualified"
     targetNamespace="uri:oozie:sync-mysql-action:0.1">
                    
     <xs:complexType name="SYNC_MYSQL_TYPE">
      <xs:sequence>
       <xs:element name="jdbcUrl" type="xs:string" minOccurs="1" maxOccurs="1"/>
       <xs:element name="sql" type="xs:string" minOccurs="1" maxOccurs="1"/>
       <xs:element name="sql_output_file_path" type="xs:string" minOccurs="0"
         maxOccurs="1"/>
      </xs:sequence>
     </xs:complexType>
     <xs:element name="syncMysql" type="sync-mysql:SYNC_MYSQL_TYPE"/>
   </xs:schema>
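Before packaging, it can be handy to check sample action definitions against the schema. The sketch below uses only the JDK's javax.xml.validation API; the class name SyncMysqlSchemaCheck is hypothetical, and the schema is inlined (omitting the default minOccurs/maxOccurs values of 1) so the example is self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.SAXException;

// Hypothetical helper: validates a <syncMysql> action definition against
// the sync-mysql schema (inlined here for a self-contained example).
class SyncMysqlSchemaCheck {
    static final String XSD =
        "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'"
        + " xmlns:sync-mysql='uri:oozie:sync-mysql-action:0.1'"
        + " elementFormDefault='qualified'"
        + " targetNamespace='uri:oozie:sync-mysql-action:0.1'>"
        + "<xs:complexType name='SYNC_MYSQL_TYPE'><xs:sequence>"
        + "<xs:element name='jdbcUrl' type='xs:string'/>"
        + "<xs:element name='sql' type='xs:string'/>"
        + "<xs:element name='sql_output_file_path' type='xs:string' minOccurs='0'/>"
        + "</xs:sequence></xs:complexType>"
        + "<xs:element name='syncMysql' type='sync-mysql:SYNC_MYSQL_TYPE'/>"
        + "</xs:schema>";

    static boolean isValid(String actionXml) {
        try {
            Validator validator = SchemaFactory
                .newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
                .newSchema(new StreamSource(new StringReader(XSD)))
                .newValidator();
            try {
                validator.validate(new StreamSource(new StringReader(actionXml)));
                return true;
            } catch (SAXException invalid) {
                System.out.println("Rejected: " + invalid.getMessage());
                return false;
            }
        } catch (SAXException | IOException e) {
            throw new IllegalStateException("Cannot compile schema or read input", e);
        }
    }

    public static void main(String[] args) {
        String ns = "xmlns='uri:oozie:sync-mysql-action:0.1'";
        System.out.println(isValid("<syncMysql " + ns + ">"
            + "<jdbcUrl>jdbc:mysql://localhost:3306/oozie_book</jdbcUrl>"
            + "<sql>select count(*) from oozie_book.income</sql>"
            + "</syncMysql>"));   // true
        System.out.println(isValid("<syncMysql " + ns + ">"
            + "<jdbcUrl>jdbc:mysql://localhost:3306/oozie_book</jdbcUrl>"
            + "</syncMysql>"));   // false: <sql> is missing
    }
}
```
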

Readers might remember that some standard Oozie actions like “Java Action” and “Shell Action” support an element called <capture-output> that can be used to pass the output back to the Oozie context. The example action shown here writes the results to an output file (in the writeResultSet() function) and does not support the <capture-output> feature. This is the right approach for this particular action given that the output generated could be large and not suitable for passing between actions. But developers can implement <capture-output> in their custom actions if the use case demands it. The key steps in supporting this feature are to generate the output in a properties file format and write it to a file defined by the system property oozie.action.output.properties. Refer to the code sample in “Java Action” to see how to implement it (do remember to include the <capture-output> element in the schema just explained if you want your action to support it).
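A minimal sketch of the output side of <capture-output>, assuming the action code runs where the oozie.action.output.properties system property is set (as it is for launcher-based actions such as the Java action). The class name CaptureOutputSketch and the rowCount key are hypothetical. A downstream node could then read the value with an expression such as ${wf:actionData('node-name')['rowCount']}.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;

// Hypothetical helper for a custom action that supports <capture-output>:
// writes results in java.util.Properties format to the file named by the
// oozie.action.output.properties system property.
class CaptureOutputSketch {
    static final String OUTPUT_PROPS = "oozie.action.output.properties";

    static void writeCapturedOutput(Properties results) throws IOException {
        String path = System.getProperty(OUTPUT_PROPS);
        if (path == null) {
            throw new IllegalStateException(OUTPUT_PROPS + " is not set");
        }
        try (OutputStream os = new FileOutputStream(new File(path))) {
            results.store(os, "");
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate the launcher by pointing the property at a temporary file
        File tmp = File.createTempFile("action-output", ".properties");
        tmp.deleteOnExit();
        System.setProperty(OUTPUT_PROPS, tmp.getAbsolutePath());

        Properties results = new Properties();
        results.setProperty("rowCount", "42"); // hypothetical key and value
        writeCapturedOutput(results);

        // Read the file back, as a consumer of the output would
        Properties readBack = new Properties();
        try (InputStream in = new FileInputStream(tmp)) {
            readBack.load(in);
        }
        System.out.println(readBack.getProperty("rowCount")); // 42
    }
}
```
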

Deploying the new action type

After we develop the code and the XSD file, we compile the code and package the compiled classes and the XSD file into a JAR file, with the XSD file at the root level of the JAR. Since the preceding code uses the MySQL JDBC APIs to run the query, the mysql-connector JAR is needed during both compilation and execution. Compilation can access it through either the IDE’s external classpath or the build file, and it can even be bundled into the new JAR. As described in “Install Oozie Server”, we then inject both JARs (or one “fat” JAR) when creating the Oozie WAR file by copying them into the libext/ directory before executing the bin/oozie-setup.sh prepare-war command. However, for testing purposes, we can bypass these standard steps and directly copy the JARs into $CATALINA_BASE/webapps/oozie/WEB-INF/lib/.
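Because the XSD file has to sit at the root level of the JAR, a quick pre-deployment check can catch a packaging mistake before the Oozie server is restarted. The sketch below (hypothetical class XsdPlacementCheck) uses java.util.jar.JarFile to confirm that an entry named exactly sync_mysql-0.1.xsd, with no directory prefix, exists; listing the JAR with jar tf gives the same information by eye.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical pre-deployment check: does the JAR contain the schema file
// at its root (an entry with no directory prefix)?
class XsdPlacementCheck {
    static boolean hasRootEntry(File jar, String entryName) throws IOException {
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java XsdPlacementCheck <path-to-custom-action.jar>");
            return;
        }
        String xsd = "sync_mysql-0.1.xsd";
        System.out.println(hasRootEntry(new File(args[0]), xsd)
                ? xsd + " found at the JAR root"
                : xsd + " is missing from the JAR root");
    }
}
```
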

Next, we need to configure oozie-site.xml to enable Oozie to use the new action type. There are two key properties to configure. The property oozie.service.ActionService.executor.ext.classes specifies a comma-separated list of new executor classes. For this example, we should append com.oreilly.oozie.sync.customaction.MySQLSyncActionExecutor to that list. The oozie.service.SchemaService.wf.ext.schemas property defines the additional schema files required for new actions. For this example, we add sync_mysql-0.1.xsd. After all these configuration changes, the Oozie server needs to be restarted as always:

<property>
  <name>oozie.service.ActionService.executor.ext.classes</name>
  <value>
    org.apache.oozie.action.email.EmailActionExecutor,
    org.apache.oozie.action.hadoop.HiveActionExecutor,
    org.apache.oozie.action.hadoop.ShellActionExecutor,
    org.apache.oozie.action.hadoop.SqoopActionExecutor,
    org.apache.oozie.action.hadoop.DistcpActionExecutor,
    com.oreilly.oozie.sync.customaction.MySQLSyncActionExecutor
  </value>
</property>
                
<property>
  <name>oozie.service.SchemaService.wf.ext.schemas</name>
  <value>
    shell-action-0.1.xsd,shell-action-0.2.xsd,shell-action-0.3.xsd,
    email-action-0.1.xsd, hive-action-0.2.xsd,hive-action-0.3.xsd,
    hive-action-0.4.xsd,hive-action-0.5.xsd,sqoop-action-0.2.xsd,
    sqoop-action-0.3.xsd,sqoop-action-0.4.xsd,ssh-action-0.1.xsd,
    ssh-action-0.2.xsd,distcp-action-0.1.xsd,distcp-action-0.2.xsd,
    oozie-sla-0.1.xsd,oozie-sla-0.2.xsd, sync_mysql-0.1.xsd
  </value>
</property>

Using the new action type

Once the new action is deployed, it’s time to verify it. We will use the following simple workflow.xml to test it. The goal here is to execute an SQL query and write the output to a local file on the Oozie server. The corresponding job.properties file is shown here as well. This job assumes that the MySQL server is configured and running properly. Let’s also assume that a database named oozie_book and a table named income are already in place with the right schema and a few records. The database name, login, password, and table name are all defined in the job.properties file. The output file is configured as my_sync_sqloutput.txt; because this is a relative path, it is resolved against the working directory of the Oozie server process (in this setup, $OOZIE_HOME/my_sync_sqloutput.txt). Again, this is just an example; a synchronous action is not the best approach for an operation like this:

<workflow-app xmlns="uri:oozie:workflow:0.4" name="sync-mysql-wf">
   <start to="sync-mysql-node"/>
   <action name="sync-mysql-node">
     <syncMysql xmlns="uri:oozie:sync-mysql-action:0.1">
       <jdbcUrl>${jdbcURL}</jdbcUrl>
       <sql>${sql}</sql>
       <sql_output_file_path>${SQL_OUTPUT_PATH}</sql_output_file_path>
     </syncMysql>
     <ok to="end"/>
     <error to="fail"/>
   </action>
   <kill name="fail">
     <message>syncMysql failed, error message
        [${wf:errorMessage(wf:lastErrorNode())}]
     </message>
   </kill>
   <end name="end"/>
</workflow-app>
                              
$ cat job.properties
 nameNode=hdfs://localhost:8020
 queueName=default
 examplesRoot=examples

 oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/sync_mysql
 oozie.use.system.libpath=true
 jdbcURL=jdbc:mysql://localhost:3306/oozie_book?user=oozie_book_user&password=oozie_book_pw
 sql=select count(*) from oozie_book.income;
 SQL_OUTPUT_PATH=my_sync_sqloutput.txt

Overriding an Asynchronous Action Type

Let’s quickly recap Oozie’s execution model as described in “Action Execution Model”. For an asynchronous action, Oozie submits a launcher job with a single mapper task to Hadoop, along with all the JARs and configuration required for executing the actual action. In particular, two important classes are involved in executing any asynchronous action in Oozie. The class derived from ActionExecutor submits the launcher job to Hadoop, using Hadoop’s distributed cache to ship the required JARs and configuration to the correct compute node. The launcher job ultimately starts a single map task, implemented by Oozie, that can execute any action type. This mapper is known as the LauncherMapper.

The map task (LauncherMapper) invokes the main() method of the action execution class that runs on the compute node. This class is known as LauncherMain. Different action types extend this class to create their own main class that executes action-specific code. For example, the PigMain class runs the Pig script and MapReduceMain submits the actual MapReduce job. In this section, we discuss how to override these Main classes. Users can write these custom main classes to override the default implementation and package them with their application. This doesn’t require any modification on the server side of Oozie. The content in this section will help us understand the next section, where we describe how to write a new asynchronous action from the ground up using the ActionExecutor and the associated main class.
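The following simplified model (hypothetical LauncherModel class, JDK only, not Oozie's LauncherMapper code) illustrates the mechanism this section relies on: the launcher reads the main class name from the action configuration, falling back to the action type's default, and invokes its static main(String[]) by reflection. The property name oozie.launcher.action.main.class is the one used later in this section to plug in a custom main class.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Simplified model, NOT Oozie's LauncherMapper: pick the main class from the
// action configuration (falling back to a default) and call its static
// main(String[]) by reflection.
class LauncherModel {
    static final String MAIN_CLASS_KEY = "oozie.launcher.action.main.class";

    static void launch(Map<String, String> actionConf, String defaultMainClass, String[] args)
            throws ReflectiveOperationException {
        String mainClass = actionConf.getOrDefault(MAIN_CLASS_KEY, defaultMainClass);
        Method main = Class.forName(mainClass).getMethod("main", String[].class);
        // Cast to Object so the array is passed as one argument, not spread as varargs
        main.invoke(null, (Object) args);
    }

    // Stand-ins for an action type's default main class and a user override
    public static class DefaultMain {
        static String lastRun = "";
        public static void main(String[] args) { lastRun = "default"; }
    }

    public static class CustomMain {
        public static void main(String[] args) {
            DefaultMain.lastRun = "custom with " + args.length + " args";
        }
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Map<String, String> conf = new HashMap<>();
        launch(conf, DefaultMain.class.getName(), new String[0]);
        System.out.println(DefaultMain.lastRun); // default

        conf.put(MAIN_CLASS_KEY, CustomMain.class.getName());
        launch(conf, DefaultMain.class.getName(), new String[] {"in", "out"});
        System.out.println(DefaultMain.lastRun); // custom with 2 args
    }
}
```
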

The out-of-the-box <map-reduce> action supports only the old Hadoop API. In other words, it supports mapper and reducer classes written using the mapred (old) API. However, the new mapreduce API can be used for the Hadoop job through some special configuration, as described in “Supporting New API in MapReduce Action”. Another way to execute a MapReduce job written with the new API is to override the default Main class of the <map-reduce> action. In this example, we will see how to replace the old API-based job submission with the new API.

Implementing the New ActionMain Class

As explained earlier, the default <map-reduce> main class (MapReduceMain) supports executing jobs that are written using the org.apache.hadoop.mapred package, also known as the old API. In this example, we will create a new main class called MapReduceNewMain that submits jobs written using the new API (org.apache.hadoop.mapreduce). However, we will still use the default <map-reduce> action executor. This allows users to use the existing <map-reduce> action type with minimal changes. This implementation is completely in the user domain, and this change doesn’t require any modification or restart of the Oozie server. The user can simply replace the original Main class with the new one during job submission.

The implementation of the MapReduceNewMain class is based on the existing MapReduceMain class, and a lot of boilerplate code is borrowed from that implementation. The main difference is that it uses the new mapreduce API for job submission. In particular, we replace the JobClient class with the Job class and the JobConf class with the Configuration class. The complete implementation is shown here:

      
 package com.oreilly.oozie.custommain;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.oozie.action.hadoop.LauncherMain;

 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;

 public class MapReduceNewMain extends LauncherMain {

   public static final String OOZIE_MAPREDUCE_UBER_JAR =
       "oozie.mapreduce.uber.jar";

   public static void main(String[] args) throws Exception {
     run(MapReduceNewMain.class, args);
   }

   protected void run(String[] args) throws Exception {
     System.out.println();
     System.out.println("Oozie Map-Reduce action configuration");
     System.out.println("=======================");

     // Load the action configuration prepared by Oozie. This is the same
     // configuration defined in the <configuration> section of the
     // workflow XML file.
     Configuration actionConf = new Configuration(false);
     actionConf.addResource(new Path("file:///",
         System.getProperty("oozie.action.conf.xml")));

     logMasking("New Map-Reduce job configuration:", new HashSet<String>(),
         actionConf);

     System.out.println("Submitting Oozie action Map-Reduce job");
     System.out.println();
     // Submit the job through the new (mapreduce) API
     Job job = submitJob(actionConf);
     System.out.println("After job submission");

     // Propagate the job ID back to Oozie
     String jobId = job.getJobID().toString();
     System.out.println("Job ID is: " + jobId);
     Properties props = new Properties();
     props.setProperty("id", jobId);
     File idFile = new File(
         System.getProperty("oozie.action.newId.properties"));
     OutputStream os = new FileOutputStream(idFile);
     try {
       props.store(os, "");
     } finally {
       os.close();
     }

     System.out.println("=======================");
     System.out.println();
   }

   // Copy every entry of the action configuration into the job configuration
   protected void addActionConf(Configuration conf, Configuration actionConf) {
     for (Map.Entry<String, String> entry : actionConf) {
       conf.set(entry.getKey(), entry.getValue());
     }
   }

   protected Job submitJob(Configuration actionConf) throws Exception {
     Configuration conf = new Configuration();
     addActionConf(conf, actionConf);

     // Propagate delegation-related props from the launcher job to the MR job.
     // This is critical on secure Hadoop, where delegation tokens are made
     // available through these settings.
     if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
       conf.set("mapreduce.job.credentials.binary",
           System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
     }
     Job job = createJob(conf);
     job.submit();
     return job;
   }

   protected Job createJob(Configuration conf) throws IOException {
     @SuppressWarnings("deprecation")
     Job job = new Job(conf);
     // Use the uber JAR, if one is configured
     String uberJar = conf.get(OOZIE_MAPREDUCE_UBER_JAR);
     if (uberJar != null && uberJar.trim().length() > 0) {
       job.setJar(uberJar);
     }
     return job;
   }
 }

Testing the New Main Class

After we compile the new main class and package it into a JAR file, we are ready to test this new functionality. The following workflow.xml defines the mapper and reducer written using the new mapreduce API. Readers are advised to pay close attention to the property names corresponding to the new API in addition to the oozie.launcher.action.main.class. This workflow is based on the wordcount example that comes with the Hadoop distribution:

  
 <workflow-app xmlns="uri:oozie:workflow:0.5" name="map-reduce-wf">
   <start to="mr-node"/>
   <action name="mr-node">
     <map-reduce>
       <job-tracker>${jobTracker}</job-tracker>
       <name-node>${nameNode}</name-node>
       <prepare>
         <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}"/>
       </prepare>
       <configuration>
         <!-- Using a custom MapReduceMain for new API class-->
         <property>
           <name>oozie.launcher.action.main.class</name>
           <value>com.oreilly.oozie.custommain.MapReduceNewMain</value>
         </property>
         <property>
           <name>mapreduce.job.queuename</name>
           <value>${queueName}</value>
         </property>
         <property>
           <name>mapreduce.job.map.class</name>
           <value>org.apache.hadoop.examples.WordCount$TokenizerMapper</value>
         </property>
         <property>
           <name>mapreduce.job.reduce.class</name>
           <value>org.apache.hadoop.examples.WordCount$IntSumReducer</value>
         </property>
         <property>
           <name>mapreduce.job.output.key.class</name>
           <value>org.apache.hadoop.io.Text</value>
         </property>
         <property>
           <name>mapreduce.job.output.value.class</name>
           <value>org.apache.hadoop.io.IntWritable</value>
         </property>
         <property>
           <name>mapreduce.input.fileinputformat.inputdir</name>
           <value>/user/${wf:user()}/${examplesRoot}/input-data/text</value>
         </property>
         <property>
           <name>mapreduce.output.fileoutputformat.outputdir</name>
           <value>/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}</value>
         </property>
       </configuration>
     </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail">
    <message>Map/Reduce failed, error message[${wf:errorMessage
    (wf:lastErrorNode())}]</message>
  </kill>
  <end name="end"/>
</workflow-app>

Next we need to include the required JARs in the workflow lib/ directory. We need to copy the JAR that includes the MapReduceNewMain class and, as always, the JAR that includes the mapper (TokenizerMapper) and reducer (IntSumReducer) classes. Since we reuse the Apache Hadoop example, we copy the hadoop-mapreduce-examples JAR into the lib/ directory to be able to run this example:

$ hdfs dfs -put my-custom-main
$ hdfs dfs -lsr my-custom-main
  -rw-r--r--   1 joe jgrp    172 2014-11-15 14:48 my-custom-main/job.properties
  drwxr-xr-x   - joe jgrp      0 2014-11-15 14:48 my-custom-main/lib
  -rw-r--r--   1 joe jgrp 270261 2014-11-15 14:48 my-custom-main/lib/hadoop-mapreduce-examples-2.3.0.jar
  -rw-r--r--   1 joe jgrp  19949 2014-11-15 14:48 my-custom-main/lib/oozie-extensions.jar
  -rw-r--r--   1 joe jgrp   2452 2014-11-15 14:48 my-custom-main/workflow.xml

We will use the following job.properties file and commands to run the workflow:

$ cat job.properties
   nameNode=hdfs://localhost:8020
   # Actually the ResourceManager endpoint in Hadoop 2.x
   jobTracker=localhost:8032
   queueName=default
   examplesRoot=examples

   oozie.wf.application.path=${nameNode}/user/${user.name}/my-custom-main
   outputDir=map-reduce-new
        
$ oozie job -config job.properties -run
   job: 0000001-141115153201961-oozie-joe-W

Creating a New Asynchronous Action

In this section, we discuss the steps required to develop a completely new asynchronous action. Most of these steps were already covered in the previous two sections; here we adapt them to build a new action type. In “Creating a Custom Synchronous Action”, we described how to implement a synchronous action: we created a new ActionExecutor and a new XSD file, and then deployed them on the Oozie server. Then, in “Overriding an Asynchronous Action Type”, we explained how to override the ActionMain of an existing asynchronous action. That example overrides only the ActionMain class that runs on the compute node; it does not modify the ActionExecutor, which runs on the Oozie server and is responsible for launching the LauncherMapper.

Writing an Asynchronous Action Executor

An asynchronous action executor usually extends the JavaActionExecutor class that comes with the oozie-core package, which already implements most of the common functionality. JavaActionExecutor packages the required classes (e.g., the ActionMain class) and configuration and kicks off the launcher job on Hadoop. It passes the JARs and configuration to the launcher mapper through the Hadoop distributed cache. The launcher mapper ultimately invokes the main() method of the ActionMain class, which implements the actual action, such as executing a Pig script for the <pig> action.
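To make the last step concrete, the following JDK-only sketch shows how a launcher can call the static main() method of a class it knows only by name. It is not Oozie's LauncherMapper code: the class names DemoActionMain and LauncherSketch are hypothetical, and the real launcher also handles configuration, security, and error reporting.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an ActionMain class such as MySqlMain.
class DemoActionMain {
    // Records the arguments main() received so callers can inspect them.
    static final List<String> received = new ArrayList<>();

    public static void main(String[] args) {
        for (String arg : args) {
            received.add(arg);
        }
    }
}

// Hypothetical launcher-side helper: resolve the main class by name and
// call its public static main(String[]) method reflectively.
class LauncherSketch {
    static void invokeMain(String mainClassName, String[] args) {
        try {
            Class<?> mainClass = Class.forName(mainClassName);
            Method main = mainClass.getMethod("main", String[].class);
            // main() is static, so the target object is null. The array is cast
            // to Object so it is passed as one argument rather than as varargs.
            main.invoke(null, (Object) args);
        } catch (InvocationTargetException e) {
            // main() itself threw: report its exception, not the reflection wrapper.
            throw new RuntimeException("main() threw " + e.getCause(), e.getCause());
        } catch (ReflectiveOperationException e) {
            // Class not found, no main(String[]) method, or main() not accessible.
            throw new RuntimeException("Cannot invoke main() of " + mainClassName, e);
        }
    }
}
```

Calling LauncherSketch.invokeMain(DemoActionMain.class.getName(), new String[] {"a", "b"}) runs DemoActionMain.main() with those two arguments. Unwrapping InvocationTargetException matters because reflection wraps whatever main() throws, and the original exception is the one worth reporting.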

Tip

Any action executor that extends JavaActionExecutor needs to be implemented in the org.apache.oozie.action.hadoop package, though the code can reside outside the Oozie code base. This is required because of a bug/constraint in the JavaActionExecutor class: some of the required methods are declared with package-private (default) access instead of protected, so any derived class needs to be in the same package.

Here we implement the same use case of executing a MySQL query that we described in “Creating a Custom Synchronous Action” but using an asynchronous action:

 package org.apache.oozie.action.hadoop;
    
 import java.util.List;
    
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.action.hadoop.JavaActionExecutor;
 import org.apache.oozie.action.hadoop.LauncherMain;
 import org.apache.oozie.action.hadoop.LauncherMapper;
 import org.apache.oozie.action.hadoop.MapReduceMain;
 import org.jdom.Element;
 import org.jdom.Namespace;
    
 public class MySQLActionExecutor extends JavaActionExecutor {
    
   private static final String MYSQL_MAIN_CLASS_NAME = 
     "org.apache.oozie.action.hadoop.MySqlMain";
   public static final String JDBC_URL = "oozie.mysql.jdbc.url";
   public static final String SQL_COMMAND = "oozie.mysql.sql.command";
   public static final String SQL_OUTPUT_PATH = "oozie.mysql.sql.output.path";
    
   public MySQLActionExecutor() {
     super("mysql");
   }

   @Override
   protected List<Class> getLauncherClasses() {
     List<Class> classes = super.getLauncherClasses();
     classes.add(LauncherMain.class);
     classes.add(MapReduceMain.class);
     try {
       classes.add(Class.forName(MYSQL_MAIN_CLASS_NAME));
     } catch (ClassNotFoundException e) {
       throw new RuntimeException("Class not found", e);
     }
     return classes;
   }
        
   @Override
   protected String getLauncherMain(Configuration launcherConf,
     Element actionXml) {
     return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS,
       MYSQL_MAIN_CLASS_NAME);
   }
        
   @Override
   @SuppressWarnings("unchecked")
   Configuration setupActionConf(Configuration actionConf, Context context,
     Element actionXml, Path appPath) throws ActionExecutorException {
     super.setupActionConf(actionConf, context, actionXml, appPath);
     Namespace ns = actionXml.getNamespace();
        
     String sql = actionXml.getChild("sql", ns).getTextTrim();
     String jdbcUrl = actionXml.getChild("jdbcUrl", ns).getTextTrim();
     String sqlOutPath = actionXml.getChild("sql_output_file_path", ns)
       .getTextTrim();
        
     actionConf.set(JDBC_URL, jdbcUrl);
     actionConf.set(SQL_COMMAND, sql);
     actionConf.set(SQL_OUTPUT_PATH, sqlOutPath);
     return actionConf;
   }
        
   @Override
   protected String getDefaultShareLibName(Element actionXml) {
     return "mysql";
   }
 }

In the preceding class, the constructor simply calls the superclass constructor, passing the new action name (i.e., mysql). This is the name users specify as the action type when they write a workflow definition. The getLauncherClasses method returns the list of classes that the launcher mapper needs at runtime; the Oozie server makes these classes available to the launcher mapper through the distributed cache. The list includes the LauncherMain base class and the actual action main class (MySqlMain, described next). The MapReduceMain class is included only for a few of its utility methods.

The getLauncherMain method returns the name of the ActionMain class: the main class set in the launcher configuration under LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, or org.apache.oozie.action.hadoop.MySqlMain if none is set. The launcher mapper calls the main() method of this class. The setupActionConf method copies the jdbcUrl, sql, and sql_output_file_path elements of the action XML into the action configuration, which is passed to the ActionMain class through a configuration file. The last method, getDefaultShareLibName, returns the name of the subdirectory under the system sharelib directory that hosts most of the JARs required to execute this action. In this example, the mysql-connector-java-*.jar file needs to be copied to the mysql/ subdirectory under the sharelib directory.
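The following sketch mirrors what setupActionConf does with the action XML. To keep it self-contained, it uses the JDK's DOM parser instead of JDOM and a Map in place of Hadoop's Configuration; both substitutions are assumptions of this sketch. The configuration keys are the constants defined in MySQLActionExecutor, and the class name MySqlActionConfSketch is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical sketch of setupActionConf: copy the <mysql> action elements
// into configuration keys. A Map stands in for Hadoop's Configuration.
class MySqlActionConfSketch {
    static final String NS = "uri:oozie:mysql-action:0.1";

    static Map<String, String> toActionConf(String actionXml) {
        Element root;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Namespace-aware parsing, so lookups match on the action's namespace
            // the way JDOM's getChild(name, ns) does.
            factory.setNamespaceAware(true);
            root = factory.newDocumentBuilder()
                .parse(new ByteArrayInputStream(actionXml.getBytes(StandardCharsets.UTF_8)))
                .getDocumentElement();
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Cannot parse action XML", e);
        }
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("oozie.mysql.jdbc.url", text(root, "jdbcUrl"));
        conf.put("oozie.mysql.sql.command", text(root, "sql"));
        conf.put("oozie.mysql.sql.output.path", text(root, "sql_output_file_path"));
        return conf;
    }

    // Trimmed text of the first descendant element with this local name in the
    // action namespace, roughly getChild(name, ns).getTextTrim().
    private static String text(Element root, String localName) {
        NodeList nodes = root.getElementsByTagNameNS(NS, localName);
        if (nodes.getLength() == 0) {
            throw new IllegalArgumentException("Missing <" + localName + "> element");
        }
        return nodes.item(0).getTextContent().trim();
    }
}
```

Unlike the original, which relies on the XSD to guarantee the elements exist, the sketch reports a missing element explicitly instead of failing with a NullPointerException.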

Writing the ActionMain Class

This example implements the same MySQL use case described in “Creating a Custom Synchronous Action”. The main difference is that this action is asynchronous, while the previous example implemented a synchronous action. The main class in this example reuses most of the MySQL query execution code from that example:

 package org.apache.oozie.action.hadoop;
      
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.OutputStreamWriter;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.Statement;
      
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.action.hadoop.LauncherMain;
 import org.apache.oozie.action.hadoop.LauncherSecurityManager;
      
 public class MySqlMain extends LauncherMain {
      
   public static void main(String[] args) throws Exception {
     run(MySqlMain.class, args);
   }
      
   protected void run(String[] args) throws Exception {
     System.out.println();
     System.out.println("Oozie MySql action configuration");
     System.out
      .println("=============================================");
     // loading action conf prepared by Oozie
     Configuration actionConf = new Configuration(false);
      
     String actionXml = System.getProperty("oozie.action.conf.xml");
     if (actionXml == null) {
       throw new RuntimeException(
        "Missing Java System Property [oozie.action.conf.xml]");
     }
     if (!new File(actionXml).exists()) {
       throw new RuntimeException("Action Configuration XML file ["
        + actionXml + "] does not exist");
     }
      
     actionConf.addResource(new Path("file:///", actionXml));
     String jdbcUrl = actionConf.get(MySQLActionExecutor.JDBC_URL);
     if (jdbcUrl == null) {
       throw new RuntimeException("Action Configuration does not have "
         + MySQLActionExecutor.JDBC_URL + " property");
     }
      
     String sqlCommand = actionConf.get(MySQLActionExecutor.SQL_COMMAND);
     if (sqlCommand == null) {
       throw new RuntimeException("Action Configuration does not have "
        + MySQLActionExecutor.SQL_COMMAND + " property");
     }
      
     String sqlOutputPath = actionConf
       .get(MySQLActionExecutor.SQL_OUTPUT_PATH);
     if (sqlOutputPath == null) {
       throw new RuntimeException("Action Configuration does not have "
        + MySQLActionExecutor.SQL_OUTPUT_PATH + " property");
     }
      
     System.out.println("MySQL command: " + sqlCommand + " with JDBC URL: "
       + jdbcUrl + ", SQL output path: " + sqlOutputPath);
     System.out
      .println("====================================================");
     System.out.println();
     System.out.println(">>> Connecting to MySQL and executing sql now >>>");
     System.out.println();
     System.out.flush();
      
     try {
       runMysql(jdbcUrl, sqlCommand, sqlOutputPath);
     } catch (SecurityException ex) {
       if (LauncherSecurityManager.getExitInvoked()) {
         if (LauncherSecurityManager.getExitCode() != 0) {
           throw ex;
         }
       }
     }
      
     System.out.println();
     System.out.println("<<< Invocation of MySql command completed <<<");
     System.out.println();
   }
      
   public void runMysql(String jdbcUrl, String sql, String sqlOutputPath)
     throws Exception {
     Connection connect = null;
     Statement statement = null;
     ResultSet resultSet = null;
     try {
       // this will load the MySQL driver, each DB has its own driver
       Class.forName("com.mysql.jdbc.Driver");
       // setup the connection with the DB.
       System.out.println("JDBC URL :" + jdbcUrl);
       connect = DriverManager.getConnection(jdbcUrl);
      
       // statements allow to issue SQL queries to the database
       statement = connect.createStatement();
       // resultSet gets the result of the SQL query
       resultSet = statement.executeQuery(sql);
       writeResultSet(resultSet, sqlOutputPath);
     } finally {
       if (resultSet != null)
         resultSet.close();
       if (statement != null)
         statement.close();
       if (connect != null)
         connect.close();
     }
   }
      
   private void writeResultSet(ResultSet resultSet, String sqlOutputFilePath)
     throws Exception {
     Configuration configuration = new Configuration();
     Path outPath = new Path(sqlOutputFilePath);
      
     BufferedWriter out = null;
     FileSystem fs = null;
     try {
      fs = outPath.getFileSystem(configuration);
      if (fs.exists(outPath)) {
       fs.delete(outPath, true);
      }
      fs.mkdirs(outPath);
      Path outFile = new Path(outPath, "sql.out");
      System.out.print("Writing output to :" + outFile);
      out = new BufferedWriter(new OutputStreamWriter(fs.create(outFile),
       "UTF-8"));
      
      // Get the metadata
      ResultSetMetaData md = resultSet.getMetaData();
      int recNo = 1;
      // Loop through the result set
      while (resultSet.next()) {
        out.write("Record_No=" + recNo++ + ",");
        for (int i = 1; i <= md.getColumnCount(); i++) {
          out.write(md.getColumnName(i) + "="
            + resultSet.getString(i) + ",");
        }
         out.write('\n');
      }
    } finally {
      if (out != null) {
        out.close();
      }
      if (fs != null) {
        fs.close();
      }
    }
   }
 }

The ActionMain class MySqlMain extends the LauncherMain class. Its run() method loads the action configuration and then executes the MySQL query. Oozie writes the action configuration, including the user-defined properties from workflow.xml, to an XML file and passes the path of that file to MySqlMain in the oozie.action.conf.xml system property. After loading the configuration, the main class executes the query and writes the result to a file named sql.out under the HDFS directory given by the sql_output_file_path element. Alternatively, users can implement and use Oozie’s capture-output feature explained in “Java Action”.
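To see the configuration hand-off in isolation, here is a JDK-only sketch of the first half of run(): it locates the file through the oozie.action.conf.xml system property, reads the <property> name/value pairs, and fails fast on missing keys. It assumes the file uses Hadoop's configuration XML layout; ActionConfLoaderSketch is a hypothetical name, and the real MySqlMain uses Hadoop's Configuration.addResource() instead of parsing the file by hand.

```java
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical sketch of how MySqlMain obtains its settings: find the action
// configuration file via a system property, read it, and fail fast on missing keys.
class ActionConfLoaderSketch {
    static Map<String, String> load(String... requiredKeys) {
        String path = System.getProperty("oozie.action.conf.xml");
        if (path == null) {
            throw new RuntimeException("Missing Java System Property [oozie.action.conf.xml]");
        }
        File file = new File(path);
        if (!file.exists()) {
            throw new RuntimeException("Action Configuration XML file [" + path + "] does not exist");
        }
        Map<String, String> conf = new HashMap<>();
        try {
            Element root = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder().parse(file).getDocumentElement();
            // Assumes Hadoop's layout: <configuration><property><name/><value/></property>...
            NodeList props = root.getElementsByTagName("property");
            for (int i = 0; i < props.getLength(); i++) {
                Element prop = (Element) props.item(i);
                String name = prop.getElementsByTagName("name").item(0).getTextContent().trim();
                String value = prop.getElementsByTagName("value").item(0).getTextContent();
                conf.put(name, value);
            }
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new RuntimeException("Cannot parse " + path, e);
        }
        for (String key : requiredKeys) {
            if (!conf.containsKey(key)) {
                throw new RuntimeException("Action Configuration does not have " + key + " property");
            }
        }
        return conf;
    }
}
```

In MySqlMain terms, a call such as load(MySQLActionExecutor.JDBC_URL, MySQLActionExecutor.SQL_COMMAND, MySQLActionExecutor.SQL_OUTPUT_PATH) would take the place of the three separate null checks.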

Tip

The ActionMain class requires a Hadoop token to communicate with any service (such as the NameNode or the JobTracker/ResourceManager) running with Kerberos security enabled. The ActionMain class needs to pass the location of the token file in the mapreduce.job.credentials.binary property of the job configuration, as shown in the example code in “Implementing the New ActionMain Class”. If a user wants to pass other kinds of credentials, such as Hive metastore or HBase tokens, she can follow the instructions provided in “Supporting Custom Credentials”.
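A minimal sketch of that hand-off follows. It assumes, as in Hadoop 2.x, that the task environment exposes the token file path in the HADOOP_TOKEN_FILE_LOCATION variable. A Map stands in for the job configuration, so with Hadoop you would call conf.set() instead of put(); TokenFileSketch is a hypothetical name.

```java
import java.util.Map;

// Hypothetical sketch: copy the task's token file location into a job
// configuration. A Map stands in for Hadoop's Configuration.
class TokenFileSketch {
    // Environment variable that, in Hadoop 2.x, points a task at its token file.
    static final String TOKEN_FILE_ENV = "HADOOP_TOKEN_FILE_LOCATION";
    static final String CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";

    static void propagateTokenFile(Map<String, String> env, Map<String, String> jobConf) {
        String tokenFile = env.get(TOKEN_FILE_ENV);
        // When the variable is absent (e.g., outside a task container),
        // leave the configuration unchanged.
        if (tokenFile != null) {
            jobConf.put(CREDENTIALS_BINARY, tokenFile);
        }
    }
}
```

Inside an ActionMain, a Configuration-based version of this helper would be called with System.getenv() before the configuration is used to contact the secured service.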

Writing Action’s Schema

The XSD file defined in this section is very similar to the one in “Writing the XML schema”:

 <?xml version="1.0" encoding="UTF-8"?>
   <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
   xmlns:mysql="uri:oozie:mysql-action:0.1" elementFormDefault="qualified"
   targetNamespace="uri:oozie:mysql-action:0.1">
        
   <xs:complexType name="MYSQL_TYPE">
     <xs:sequence>
       <xs:element name="job-tracker" type="xs:string" minOccurs="1" 
               maxOccurs="1"/>
       <xs:element name="name-node" type="xs:string" minOccurs="1" 
               maxOccurs="1"/>
       <xs:element name="job-xml" type="xs:string" minOccurs="0" 
               maxOccurs="unbounded"/>
       <xs:element name="configuration" type="mysql:CONFIGURATION" 
               minOccurs="0" maxOccurs="1"/>
       <xs:element name="jdbcUrl" type="xs:string" minOccurs="1" maxOccurs="1"/>
       <xs:element name="sql" type="xs:string" minOccurs="1" maxOccurs="1"/>
       <xs:element name="sql_output_file_path" type="xs:string" minOccurs="1" 
               maxOccurs="1"/>
       <xs:element name="file" type="xs:string" minOccurs="0" 
               maxOccurs="unbounded"/>
       <xs:element name="archive" type="xs:string" minOccurs="0" 
               maxOccurs="unbounded"/>
     </xs:sequence>
   </xs:complexType>
   <xs:complexType name="CONFIGURATION">
     <xs:sequence>
      <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
       <xs:complexType>
        <xs:sequence>
          <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
          <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
          <xs:element name="description" minOccurs="0" maxOccurs="1" 
                  type="xs:string"/>
        </xs:sequence>
      </xs:complexType>
     </xs:element>
    </xs:sequence>
   </xs:complexType>
   <xs:element name="mysql" type="mysql:MYSQL_TYPE"/>
 </xs:schema>
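Before deploying, it can help to check a sample action against the schema locally. This JDK-only sketch uses javax.xml.validation with a trimmed copy of the schema above that keeps only the required elements (the optional job-xml, configuration, file, and archive elements are omitted, so XML that uses them would be rejected here). MySqlSchemaCheckSketch is a hypothetical name.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.SAXException;

// Hypothetical helper that validates <mysql> action XML against a schema.
class MySqlSchemaCheckSketch {
    // Trimmed copy of mysql-0.1.xsd: only the required elements, in schema order.
    // Omitted minOccurs/maxOccurs default to 1, matching the original.
    static final String XSD =
        "<xs:schema xmlns:xs=\"http://www.w3.org/2001/XMLSchema\""
        + " xmlns:mysql=\"uri:oozie:mysql-action:0.1\" elementFormDefault=\"qualified\""
        + " targetNamespace=\"uri:oozie:mysql-action:0.1\">"
        + "<xs:complexType name=\"MYSQL_TYPE\"><xs:sequence>"
        + "<xs:element name=\"job-tracker\" type=\"xs:string\"/>"
        + "<xs:element name=\"name-node\" type=\"xs:string\"/>"
        + "<xs:element name=\"jdbcUrl\" type=\"xs:string\"/>"
        + "<xs:element name=\"sql\" type=\"xs:string\"/>"
        + "<xs:element name=\"sql_output_file_path\" type=\"xs:string\"/>"
        + "</xs:sequence></xs:complexType>"
        + "<xs:element name=\"mysql\" type=\"mysql:MYSQL_TYPE\"/>"
        + "</xs:schema>";

    // Returns null if the action XML is valid, otherwise the parser's message.
    // Errors in the schema itself would also be reported here.
    static String validate(String actionXml) {
        try {
            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = factory.newSchema(new StreamSource(new StringReader(XSD)));
            Validator validator = schema.newValidator();
            validator.validate(new StreamSource(new StringReader(actionXml)));
            return null;
        } catch (SAXException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Because the schema uses xs:sequence, the elements must also appear in the declared order; a workflow that lists <sql> before <jdbcUrl> fails validation even though every element is present.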

Deploying the New Action Type

The deployment of a new asynchronous action is very similar to that of the synchronous action described in “Deploying the new action type”. If the action is written outside of the Oozie code base, the first step is to create a JAR that includes MySQLActionExecutor, MySqlMain, and the mysql-0.1.xsd file. Then include this JAR in the Oozie WAR file. After that, we modify the conf/oozie-site.xml file to include the new action executor and the new XSD file as shown here:

<property>
  <name>oozie.service.ActionService.executor.ext.classes</name>
  <value>
    org.apache.oozie.action.email.EmailActionExecutor,
    org.apache.oozie.action.hadoop.HiveActionExecutor,
    org.apache.oozie.action.hadoop.ShellActionExecutor,
    org.apache.oozie.action.hadoop.SqoopActionExecutor,
    org.apache.oozie.action.hadoop.DistcpActionExecutor,
    org.apache.oozie.action.hadoop.MySQLActionExecutor
  </value>
</property>
      
<property>
  <name>oozie.service.SchemaService.wf.ext.schemas</name>
  <value>
    shell-action-0.1.xsd,shell-action-0.2.xsd,shell-action-0.3.xsd,
    email-action-0.1.xsd,hive-action-0.2.xsd, hive-action-0.3.xsd,
    hive-action-0.4.xsd,hive-action-0.5.xsd,sqoop-action-0.2.xsd,
    sqoop-action-0.3.xsd,sqoop-action-0.4.xsd,ssh-action-0.1.xsd,
    ssh-action-0.2.xsd,distcp-action-0.1.xsd,distcp-action-0.2.xsd,
    oozie-sla-0.1.xsd,oozie-sla-0.2.xsd,mysql-0.1.xsd
  </value>
</property>
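Both values are comma-separated lists spread over several lines, so a missing comma can silently merge two entries. The sketch below checks an edited value by splitting on commas and trimming surrounding whitespace; it assumes whitespace around the commas is insignificant to Oozie, as the multi-line layout above implies. SiteListCheckSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split a multi-line, comma-separated oozie-site.xml
// value into entries and check that a given entry is present.
class SiteListCheckSketch {
    static List<String> entries(String value) {
        List<String> result = new ArrayList<>();
        for (String piece : value.split(",")) {
            // Drop the newlines and indentation around each entry.
            String trimmed = piece.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    static boolean isRegistered(String value, String entry) {
        return entries(value).contains(entry);
    }
}
```

For example, isRegistered on the executor list should return true for org.apache.oozie.action.hadoop.MySQLActionExecutor, while a value in which the comma before mysql-0.1.xsd is missing yields a single merged entry and returns false.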

Tip

If the new action is developed within the Oozie code base, most of these steps are not required; the files are then included automatically by the standard Oozie build and deployment process.

In addition, we need to upload the MySQL JAR into the sharelib directory and then restart the Oozie server, using the following commands. Note that these commands must be executed as the Oozie service user (i.e., oozie):

$ hdfs dfs -mkdir share/lib/mysql
$ hdfs dfs -put ./oozie-server/webapps/oozie/WEB-INF/lib/mysql-connector-java-5.1.25-bin.jar share/lib/mysql/
$ bin/oozied.sh stop
$ bin/oozied.sh start

Using the New Action Type

We follow the same steps explained in “Using the new action type” to build a workflow that uses the new mysql action. The relevant workflow.xml and job.properties files are shown here:

$ cat workflow.xml
  <workflow-app xmlns="uri:oozie:workflow:0.4" name="my-mysql-wf">
    <start to="my-mysql-node"/>
    <action name="my-mysql-node">
      <mysql xmlns="uri:oozie:mysql-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <jdbcUrl>${jdbcURL}</jdbcUrl>
        <sql>${sql}</sql>
        <sql_output_file_path>${SQL_OUTPUT_PATH}</sql_output_file_path>
      </mysql>
      <ok to="end"/>
      <error to="fail"/>
    </action>
    <kill name="fail">
      <message>mysql failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
  </workflow-app>
 
$ cat job.properties
  nameNode=hdfs://localhost:8020
  jobTracker=localhost:8032
  queueName=default
  examplesRoot=examples

  oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/mysql
  oozie.use.system.libpath=true
  jdbcURL=jdbc:mysql://localhost:3306/oozie_book?user=oozie_book_user&password=oozie_book_pw
  sql=select * from oozie_book.custom_actions;
  SQL_OUTPUT_PATH=my_sqloutput

Tip

It’s fairly common to see failures with the following exception in the launcher mapper log:

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.MySqlMain], main() threw exception, com.mysql.jdbc.Driver
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)

This is most likely caused by missing JARs. In this example, the mysql-connector JAR was not copied into the mysql/ subdirectory under the sharelib directory on HDFS. Uploading the correct version of the JAR to the correct location solves this problem.
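One way to turn this failure into a clearer message is to check for the driver class before connecting, for example at the start of runMysql(). The helper below is a hypothetical sketch, not part of Oozie. Note that Class.forName also initializes the class, which for the MySQL driver registers it with DriverManager, just as the Class.forName call in runMysql() does.

```java
// Hypothetical helper: check that a class the action needs at runtime, such as
// the MySQL JDBC driver from the mysql/ sharelib directory, is on the classpath.
class ClasspathCheckSketch {
    static boolean isLoadable(String className) {
        try {
            // Loads and initializes the class (runs its static initializer).
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // ClassNotFoundException: the JAR is missing. LinkageError: the JAR is
            // present but broken or incompatible (e.g., NoClassDefFoundError).
            return false;
        }
    }

    static void requireClass(String className, String hint) {
        if (!isLoadable(className)) {
            throw new IllegalStateException(
                "Class " + className + " is not on the launcher classpath; " + hint);
        }
    }
}
```

A call such as ClasspathCheckSketch.requireClass("com.mysql.jdbc.Driver", "copy mysql-connector-java-*.jar into share/lib/mysql/ on HDFS") would then fail with a message that points directly at the fix.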

In this chapter, we focused on Oozie developers and showed how to extend Oozie's existing functionality. More specifically, we walked through the steps required to write a custom EL function, a new synchronous action type, and a new asynchronous action type. In the next chapter, we cover the operational aspects of Oozie and other sundry topics concerning managing and debugging Oozie.
