In previous chapters, we primarily focused on how to use and manage Oozie efficiently. We explained the details of the Oozie service and the various features it supports. Oozie users and administrators were the target audience for those chapters. In this chapter, we cover Oozie from a developer’s perspective. In particular, we discuss how to leverage Oozie’s extensible framework to expand and broaden its feature set. We see how to add custom EL functions and how to develop new synchronous and asynchronous action types.
Oozie provides a set of common action types, which we described in “Action Types”. Depending on how Oozie executes an action, the workflow actions are broadly divided into two categories, synchronous and asynchronous, as explained in “Synchronous Versus Asynchronous Actions”. Oozie executes the synchronous action on the Oozie server and blocks the execution thread until it completes. In this model, each execution instance of the action shares resources with the Oozie server and impacts Oozie’s performance. Moreover, there is no isolation between the execution of Oozie’s services and the execution of the action, which can potentially destabilize the Oozie server. So adding a new synchronous action type is highly discouraged. Users should consider adding it only if the action is simple, runs for a very short period of time, and doesn’t execute any user code.
On the other hand, Oozie starts an asynchronous action and immediately returns without waiting for the action to finish. The actual action execution occurs outside the Oozie server on the Hadoop compute nodes. When the spawned action completes, it informs Oozie through a callback. The asynchronous execution model guarantees isolation between the action, which can run user code, and the execution of Oozie’s core services. So the recommended way of writing a heavy-duty action is to use the asynchronous model. In this section, we cover how to write both types of actions.
Let’s now look at the steps required to support a new synchronous action type.
The first step is to write a new action executor, followed by writing an XML schema (XSD) file for the new action. We should then deploy the new action type onto the Oozie server through oozie-site.xml. Finally, we will show how to write and submit a test workflow using the new action type.
In this example, we will implement a new synchronous action that can execute a SQL statement against any MySQL instance. The output of the SQL can either be stored in a local file on the Oozie server machine or written to stdout.
Note that this is merely an example to demonstrate the required steps to write a new synchronous action. Ideally, this specific kind of action should not be a synchronous type.
Every action executor class should extend Oozie’s ActionExecutor class, which is part of Oozie’s core package. Developers should then implement the following methods required by Oozie:
Constructor
Action developers need to write a no-arg constructor that ultimately calls the super-class constructor, passing the new action name (e.g., super("syncMysql")). End users will use this name to define the new action type in the workflow XML.
start()
Oozie invokes this method when it needs to execute the action. Oozie passes two parameters to this method. The first parameter, context, provides the APIs to access all workflow configurations/variables for this action, set the action status, and return any data to be used in the execution path. The second parameter, action, includes the action’s definition from the workflow XML. All synchronous actions must override this method because it performs the actual execution. At the end, the method needs to call context.setExecutionData(externalStatus, actionData) to pass back the action status and any action-specific data.
check()
Oozie calls this method to check the action status. For synchronous actions, Oozie never calls this method. Therefore, for this example, it’s recommended that this method just throw an UnsupportedOperationException.
kill()
Oozie executes this method when it needs to kill the action for any reason. A typical implementation of this method calls context.setEndData(status, signalValue), passing WorkflowAction.Status.KILLED as the status and ERROR as the signalValue.
end()
Oozie invokes this method when the execution is finished. In this method, the action executor should perform any cleanup required after completion. The implementation usually calls context.setEndData(status, signalValue). The status and signal value determine the next course of action.
isCompleted()
This utility method is used to determine if an action status is in a terminal state.
These methods are required for Oozie to run any action. Additionally, we implement two new methods for this specific action: runMysql() to execute the SQL and writeResultSet() to store the output into a file. As mentioned earlier, the start() method is the entry point for the execution of a synchronous action. At the very beginning of this method, we need to retrieve the action’s definition and parse it using an XML parser. The actual schema of the action definition is defined in the action’s XSD file, which we discuss in the next section:
package com.oreilly.oozie.sync.customaction;

import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.HashSet;
import java.util.Set;

import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;

public class MySQLSyncActionExecutor extends ActionExecutor {
    private static final String SYNC_MYSQL_ACTION_NS =
        "uri:oozie:sync-mysql-action:0.1";
    private static final String ACTION_NAME = "syncMysql";

    protected MySQLSyncActionExecutor() {
        super(ACTION_NAME);
    }

    @Override
    public void start(Context context, WorkflowAction action)
            throws ActionExecutorException {
        context.setStartData("-", "-", "-");
        try {
            // Parse the action definition taken from the workflow XML
            Element actionXml = XmlUtils.parseXml(action.getConf());
            Namespace ns = Namespace.getNamespace(SYNC_MYSQL_ACTION_NS);
            String jdbcUrl = actionXml.getChildTextTrim("jdbcUrl", ns);
            String sql = actionXml.getChildTextTrim("sql", ns);
            String sqlOutputFilePath = actionXml.getChildTextTrim(
                "sql_output_file_path", ns);
            runMysql(jdbcUrl, sql, sqlOutputFilePath);
            context.setExecutionData("OK", null);
        } catch (JDOMException e) {
            throw convertException(e);
        }
    }

    @Override
    public void end(Context context, WorkflowAction action)
            throws ActionExecutorException {
        if (action.getExternalStatus().equals("OK")) {
            context.setEndData(WorkflowAction.Status.OK,
                WorkflowAction.Status.OK.toString());
        } else {
            context.setEndData(WorkflowAction.Status.ERROR,
                WorkflowAction.Status.ERROR.toString());
        }
    }

    @Override
    public void kill(Context context, WorkflowAction action)
            throws ActionExecutorException {
        context.setEndData(WorkflowAction.Status.KILLED, "ERROR");
    }

    @Override
    public void check(Context arg0, WorkflowAction arg1)
            throws ActionExecutorException {
        // Never called for a synchronous action
        throw new UnsupportedOperationException();
    }

    private static Set<String> COMPLETED_STATUS = new HashSet<String>();
    static {
        COMPLETED_STATUS.add("SUCCEEDED");
        COMPLETED_STATUS.add("KILLED");
        COMPLETED_STATUS.add("FAILED");
        COMPLETED_STATUS.add("FAILED_KILLED");
    }

    @Override
    public boolean isCompleted(String externalStatus) {
        return COMPLETED_STATUS.contains(externalStatus);
    }

    /*
     * Execute a sql statement
     */
    private void runMysql(String jdbcUrl, String sql, String sqlOutputFilePath)
            throws ActionExecutorException {
        Connection connect = null;
        Statement statement = null;
        ResultSet resultSet = null;
        try {
            // this will load the MySQL driver, each DB has its own driver
            Class.forName("com.mysql.jdbc.Driver");
            // setup the connection with the DB.
            connect = DriverManager.getConnection(jdbcUrl);
            // statements allow to issue SQL queries to the database
            statement = connect.createStatement();
            // resultSet gets the result of the SQL query
            resultSet = statement.executeQuery(sql);
            writeResultSet(resultSet, sqlOutputFilePath);
        } catch (Exception e) {
            throw convertException(e);
        } finally {
            try {
                if (resultSet != null) resultSet.close();
                if (statement != null) statement.close();
                if (connect != null) connect.close();
            } catch (Exception e) {
                throw convertException(e);
            }
        }
    }

    private void writeResultSet(ResultSet resultSet, String sqlOutputFilePath)
            throws Exception {
        // resultSet is initialised before the first data set
        boolean toFile = sqlOutputFilePath != null
            && sqlOutputFilePath.length() > 0;
        PrintWriter out = toFile
            ? new PrintWriter(sqlOutputFilePath)
            : new PrintWriter(System.out);
        try {
            // Get the metadata
            ResultSetMetaData md = resultSet.getMetaData();
            // Loop through the result set
            while (resultSet.next()) {
                for (int i = 1; i <= md.getColumnCount(); i++) {
                    out.println(md.getColumnName(i) + "="
                        + resultSet.getString(i));
                }
            }
        } finally {
            if (toFile) {
                out.close();
            } else {
                // Only flush: closing would also close the server's System.out
                out.flush();
            }
        }
    }
}
The compilation of any action executor class requires at least two Oozie JARs: oozie-core and oozie-client. The JARs for the most recent versions (starting with version 4.1.0) are published in a Maven repository. If for some reason they are not available, developers will need to get the JARs by building the Oozie source code themselves.
A schema file is required for strict enforcement of the XML syntax of the action definition that the user writes in her workflow.xml file. The details of writing a good XML schema can be found online. The following example XSD file is self-explanatory and very similar to any other Oozie action:
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:sync-mysql="uri:oozie:sync-mysql-action:0.1"
           elementFormDefault="qualified"
           targetNamespace="uri:oozie:sync-mysql-action:0.1">
    <xs:complexType name="SYNC_MYSQL_TYPE">
        <xs:sequence>
            <xs:element name="jdbcUrl" type="xs:string"
                        minOccurs="1" maxOccurs="1"/>
            <xs:element name="sql" type="xs:string"
                        minOccurs="1" maxOccurs="1"/>
            <xs:element name="sql_output_file_path" type="xs:string"
                        minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
    </xs:complexType>
    <xs:element name="syncMysql" type="sync-mysql:SYNC_MYSQL_TYPE"/>
</xs:schema>
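To see what the schema enforces, the following is a minimal sketch that validates two action definitions against it using only the JDK's javax.xml.validation API. It is not how Oozie itself loads schemas; the class name SyncMysqlSchemaCheck and the inlined copy of the XSD are assumptions made so that the example is self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.SAXException;

// Hypothetical helper, not part of Oozie: checks an action definition
// against an inlined copy of the sync-mysql schema shown above.
final class SyncMysqlSchemaCheck {

    static final String XSD =
        "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'"
      + " xmlns:sync-mysql='uri:oozie:sync-mysql-action:0.1'"
      + " elementFormDefault='qualified'"
      + " targetNamespace='uri:oozie:sync-mysql-action:0.1'>"
      + "<xs:complexType name='SYNC_MYSQL_TYPE'><xs:sequence>"
      + "<xs:element name='jdbcUrl' type='xs:string' minOccurs='1' maxOccurs='1'/>"
      + "<xs:element name='sql' type='xs:string' minOccurs='1' maxOccurs='1'/>"
      + "<xs:element name='sql_output_file_path' type='xs:string'"
      + " minOccurs='0' maxOccurs='1'/>"
      + "</xs:sequence></xs:complexType>"
      + "<xs:element name='syncMysql' type='sync-mysql:SYNC_MYSQL_TYPE'/>"
      + "</xs:schema>";

    // Returns true when the given XML conforms to the schema.
    static boolean isValid(String actionXml) {
        try {
            SchemaFactory factory =
                SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema =
                factory.newSchema(new StreamSource(new StringReader(XSD)));
            Validator validator = schema.newValidator();
            // With no ErrorHandler set, validation errors are thrown
            validator.validate(new StreamSource(new StringReader(actionXml)));
            return true;
        } catch (SAXException | IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String complete = "<syncMysql xmlns='uri:oozie:sync-mysql-action:0.1'>"
            + "<jdbcUrl>jdbc:mysql://localhost:3306/oozie_book</jdbcUrl>"
            + "<sql>select 1</sql></syncMysql>";
        // The mandatory <sql> element is missing here
        String missingSql = "<syncMysql xmlns='uri:oozie:sync-mysql-action:0.1'>"
            + "<jdbcUrl>jdbc:mysql://localhost:3306/oozie_book</jdbcUrl>"
            + "</syncMysql>";
        System.out.println("complete definition valid: " + isValid(complete));
        System.out.println("missing <sql> valid: " + isValid(missingSql));
    }
}
```

A check like this can be handy while developing the XSD, before the schema is deployed to the Oozie server.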
Readers might remember that some standard Oozie actions like “Java Action” and “Shell Action” support an element called <capture-output> that can be used to pass the output back to the Oozie context. The example action shown here writes the results to an output file (in the writeResultSet() function) and does not support the <capture-output> feature. This is the right approach for this particular action given that the output generated could be large and not suitable for passing between actions. But developers can implement <capture-output> in their custom actions if the use case demands it. The key steps in supporting this feature are to generate the output in a properties file format and write it to a file defined by the system property oozie.action.output.properties. Refer to the code sample in “Java Action” to see how to implement it (do remember to include the <capture-output> element in the schema just explained if you want your action to support it).
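The two key steps can be sketched roughly as follows, using only the JDK. The class name CaptureOutputSketch, the row_count key, and the temp-file fallback in main() are assumptions for illustration; only the system property name comes from the text above.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;

// Hypothetical sketch of the capture-output convention described above.
final class CaptureOutputSketch {

    static final String OUTPUT_PROP = "oozie.action.output.properties";

    // Writes the result in properties format to the file named by the
    // oozie.action.output.properties system property.
    static File writeOutput(Properties result) throws IOException {
        String path = System.getProperty(OUTPUT_PROP);
        if (path == null) {
            throw new IllegalStateException(
                "Missing system property " + OUTPUT_PROP);
        }
        File outFile = new File(path);
        try (OutputStream os = new FileOutputStream(outFile)) {
            result.store(os, "");
        }
        return outFile;
    }

    public static void main(String[] args) throws IOException {
        // Assumption for local runs only: when the property is not set,
        // point it at a temporary file so the sketch can run standalone.
        if (System.getProperty(OUTPUT_PROP) == null) {
            File tmp = File.createTempFile("action-output", ".properties");
            System.setProperty(OUTPUT_PROP, tmp.getPath());
        }
        Properties result = new Properties();
        result.setProperty("row_count", "42"); // hypothetical output key
        System.out.println("Wrote " + writeOutput(result));
    }
}
```

Keeping the captured output to a handful of small key/value pairs fits the point made above: large query results belong in a file, not in the workflow context.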
After we develop the code and the XSD file, we have to compile the code and package them both into a JAR file. The XSD file should be at the root level of the new JAR file. Since the preceding code uses the MySQL JDBC APIs to run the query, we would need the mysql-connector JAR during compilation and execution. Compilation can use either the IDE’s external classpath or the build file to access the JAR. The MySQL JAR can even be included as part of the new JAR we are creating. We can then inject both the JARs (or one “fat” JAR) when creating the Oozie WAR file (“Install Oozie Server”). The newly created JAR and the MySQL JAR must be copied into the libext/ directory before executing the bin/oozie-setup.sh prepare-war command. However, for testing purposes, we can bypass these standard steps and directly copy the JARs into $CATALINA_BASE/webapps/oozie/WEB-INF/lib/.
Next, we need to configure oozie-site.xml to enable Oozie to use the new action type. There are two key properties to configure. The property oozie.service.ActionService.executor.ext.classes specifies a comma-separated list of new executor classes. For this example, we should append com.oreilly.oozie.sync.customaction.MySQLSyncActionExecutor to that list. The oozie.service.SchemaService.wf.ext.schemas property defines the additional schema files required for new actions. For this example, we add sync_mysql-0.1.xsd. After all these configuration changes, the Oozie server needs to be restarted as always:
<property>
    <name>oozie.service.ActionService.executor.ext.classes</name>
    <value>
        org.apache.oozie.action.email.EmailActionExecutor,
        org.apache.oozie.action.hadoop.HiveActionExecutor,
        org.apache.oozie.action.hadoop.ShellActionExecutor,
        org.apache.oozie.action.hadoop.SqoopActionExecutor,
        org.apache.oozie.action.hadoop.DistcpActionExecutor,
        com.oreilly.oozie.sync.customaction.MySQLSyncActionExecutor
    </value>
</property>
<property>
    <name>oozie.service.SchemaService.wf.ext.schemas</name>
    <value>
        shell-action-0.1.xsd,shell-action-0.2.xsd,shell-action-0.3.xsd,
        email-action-0.1.xsd,
        hive-action-0.2.xsd,hive-action-0.3.xsd,
        hive-action-0.4.xsd,hive-action-0.5.xsd,sqoop-action-0.2.xsd,
        sqoop-action-0.3.xsd,sqoop-action-0.4.xsd,ssh-action-0.1.xsd,
        ssh-action-0.2.xsd,distcp-action-0.1.xsd,distcp-action-0.2.xsd,
        oozie-sla-0.1.xsd,oozie-sla-0.2.xsd,
        sync_mysql-0.1.xsd
    </value>
</property>
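A misspelled class name or a JAR missing from the classpath is a likely deployment mistake at this step. The following is a hypothetical pre-flight check, not an Oozie utility, that splits a comma-separated executor list like the one above and reports the entries that cannot be loaded. The class name ExecutorClassCheck and its methods are assumptions, and the splitting logic is only an approximation of how the server reads the property.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical deployment helper; run it with the same classpath as the
// Oozie web application to spot executor classes that are not packaged.
final class ExecutorClassCheck {

    // Splits a comma-separated value, dropping whitespace and empty entries.
    static List<String> parseClassList(String value) {
        List<String> names = new ArrayList<>();
        for (String part : value.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    // Returns the class names that cannot be found on the current classpath.
    static List<String> findMissing(List<String> classNames) {
        List<String> missing = new ArrayList<>();
        ClassLoader loader = ExecutorClassCheck.class.getClassLoader();
        for (String name : classNames) {
            try {
                // 'false' avoids running static initializers during the check
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String value = args.length > 0 ? args[0]
            : "java.lang.String,\n"
            + "  com.oreilly.oozie.sync.customaction.MySQLSyncActionExecutor";
        System.out.println("Not loadable: "
            + findMissing(parseClassList(value)));
    }
}
```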
Once the new action is deployed, it’s time to verify it. We will use the following simple workflow.xml to test it. The goal here is to execute an SQL query and write the output to a local file on the Oozie server. The corresponding job.properties file is shown here as well. This job assumes that the MySQL server is configured and running properly. Let’s also assume that a database named oozie_book and a table named income are already in place with the right schema and a few records. The name of the DB, login, password, and the table are all defined in the job.properties file. The output file for this Oozie job is configured as $OOZIE_HOME/my_sync_sqloutput.txt. Again, this is just an example and a synchronous action is not the best approach for an operation like this:
<workflow-app xmlns="uri:oozie:workflow:0.4" name="sync-mysql-wf">
    <start to="sync-mysql-node"/>
    <action name="sync-mysql-node">
        <syncMysql xmlns="uri:oozie:sync-mysql-action:0.1">
            <jdbcUrl>${jdbcURL}</jdbcUrl>
            <sql>${sql}</sql>
            <sql_output_file_path>${SQL_OUTPUT_PATH}</sql_output_file_path>
        </syncMysql>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>syncMysql failed, error message [${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

$ cat job.properties
nameNode=hdfs://localhost:8020
queueName=default
examplesRoot=examples
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/sync_mysql
oozie.use.system.libpath=true
jdbcURL=jdbc:mysql://localhost:3306/oozie_book?user=oozie_book_user&password=oozie_book_pw
sql=select count(*) from oozie_book.income;
SQL_OUTPUT_PATH=my_sync_sqloutput.txt
Let’s quickly recap Oozie’s execution model as described in “Action Execution Model”. For an asynchronous action, Oozie submits a launcher mapper task to Hadoop along with all the required JARs and configuration for executing the actual action. In particular, there are two important classes involved in executing any asynchronous action in Oozie. The class derived from ActionExecutor submits the launcher job to Hadoop. It also includes the required JARs and configuration for the actual action. ActionExecutor uses Hadoop’s distributed cache to pass the JARs and configuration to the correct compute node. The launcher job ultimately starts a single map task that is implemented by Oozie to execute any action type. This mapper is widely known as the LauncherMapper. The map task (LauncherMapper) invokes the main() method of the action execution class that runs on the compute node. This class is known as LauncherMain. Different action types extend this class to create their own main class that executes action-specific code. For example, the PigMain class runs the Pig script and MapReduceMain submits the actual MapReduce job.
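The hand-off from the launcher to the action's main class can be pictured with a small, JDK-only sketch. This is a simplified illustration of invoking a main() method by class name through reflection, not Oozie's actual LauncherMapper code; LauncherSketch and EchoMain are hypothetical names.

```java
import java.lang.reflect.Method;

// Simplified illustration: a launcher that only knows the main class by
// name loads it and calls its static main() method.
final class LauncherSketch {

    // Hypothetical stand-in for an action-specific main class.
    public static final class EchoMain {
        static String lastArg;

        public static void main(String[] args) {
            lastArg = args.length > 0 ? args[0] : null;
            System.out.println("EchoMain ran with argument: " + lastArg);
        }
    }

    // Loads the named class and invokes its public static main(String[]).
    static void invokeMain(String className, String[] args) throws Exception {
        Class<?> mainClass = Class.forName(className);
        Method main = mainClass.getMethod("main", String[].class);
        // The cast keeps the array from being expanded into varargs
        main.invoke(null, (Object) args);
    }

    public static void main(String[] args) throws Exception {
        invokeMain(EchoMain.class.getName(), new String[] {"hello"});
    }
}
```

Because the main class is resolved by name at run time, swapping in a different class requires only a configuration change, which is what makes overriding a main class possible without touching the server.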
In this section, we discuss how to override these Main classes. Users can write these custom main classes to override the default implementation and package them with their application. This doesn’t require any modification on the server side of Oozie. The content in this section will help us understand the next section, where we describe how to write a new asynchronous action from the ground up using the ActionExecutor and the associated main class.
The out-of-the-box <map-reduce> action supports only the old Hadoop API. In other words, it supports mapper and reducer classes written using the mapred (old) API. However, there is a way to use the new mapreduce API for the Hadoop job using some special configuration as described in “Supporting New API in MapReduce Action”. There is also another way to execute a MapReduce job that’s written using the new API by overriding the default Main class of the <map-reduce> action. In this example, we will see how to replace the old API-based job submission with the new API.
As explained earlier, the default <map-reduce> main class (MapReduceMain) supports executing jobs that are written using the org.apache.hadoop.mapred package, also known as the old API. In this example, we will create a new main class called MapReduceNewMain that submits jobs written using the new API (org.apache.hadoop.mapreduce). However, we will still use the default <map-reduce> action executor. This allows the users to use the existing <map-reduce> action type with minimal changes. This implementation is completely in the user domain and this change doesn’t require any modification or restart of the Oozie server. The user can simply replace the original Main class with the new one during job submission.
The implementation of the MapReduceNewMain class below is based on the existing MapReduceMain class. A lot of boilerplate code is borrowed from the existing implementation. The main difference is to use the new mapreduce API for job submission. In particular, we replace the JobClient class with the Job class and the JobConf with the Configuration class. The complete implementation is shown here:
package com.oreilly.oozie.custommain;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.oozie.action.hadoop.LauncherMain;

import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.io.IOException;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.File;

public class MapReduceNewMain extends LauncherMain {

    public static final String OOZIE_MAPREDUCE_UBER_JAR =
        "oozie.mapreduce.uber.jar";

    public static void main(String[] args) throws Exception {
        run(MapReduceNewMain.class, args);
    }

    protected void run(String[] args) throws Exception {
        System.out.println();
        System.out.println("Oozie Map-Reduce action configuration");
        System.out.println("=======================");

        // Loading the action conf prepared by Oozie
        // This is the same action configuration defined in the workflow
        // XML file as part of the <configuration> section.
        Configuration actionConf = new Configuration(false);
        actionConf.addResource(new Path("file:///", System
            .getProperty("oozie.action.conf.xml")));

        logMasking("New Map-Reduce job configuration:",
            new HashSet<String>(), actionConf);

        System.out.println("Submitting Oozie action Map-Reduce job");
        System.out.println();

        // submitting job
        Job job = submitJob(actionConf);
        System.out.println("After job submission");

        // propagating job id back to Oozie
        String jobId = job.getJobID().toString();
        System.out.println("Job ID is :" + jobId);
        Properties props = new Properties();
        props.setProperty("id", jobId);
        File idFile = new File(
            System.getProperty("oozie.action.newId.properties"));
        OutputStream os = new FileOutputStream(idFile);
        props.store(os, "");
        os.close();

        System.out.println("=======================");
        System.out.println();
    }

    protected void addActionConf(Configuration conf,
            Configuration actionConf) {
        for (Map.Entry<String, String> entry : actionConf) {
            conf.set(entry.getKey(), entry.getValue());
        }
    }

    protected Job submitJob(Configuration actionConf) throws Exception {
        Configuration conf = new Configuration();
        addActionConf(conf, actionConf);

        // Propagate delegation related props from the launcher job to the MR job.
        // This is critical on secure Hadoop where delegation tokens
        // will be made available through these settings.
        if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
            conf.set("mapreduce.job.credentials.binary",
                System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
        }
        Job job = null;
        try {
            job = createJob(conf);
            job.submit();
        } catch (Exception ex) {
            throw ex;
        }
        return job;
    }

    protected Job createJob(Configuration conf) throws IOException {
        @SuppressWarnings("deprecation")
        Job job = new Job(conf);
        // Set for uber Jar
        String uberJar = conf.get(OOZIE_MAPREDUCE_UBER_JAR);
        if (uberJar != null && uberJar.trim().length() > 0) {
            job.setJar(uberJar);
        }
        return job;
    }
}
After we compile the new main class and package it into a JAR file, we are ready to test this new functionality. The following workflow.xml defines the mapper and reducer written using the new mapreduce API. Readers are advised to pay close attention to the property names corresponding to the new API in addition to the oozie.launcher.action.main.class. This workflow is based on the wordcount example that comes with the Hadoop distribution:
<workflow-app xmlns="uri:oozie:workflow:0.5" name="map-reduce-wf">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}"/>
            </prepare>
            <configuration>
                <!-- Using a custom MapReduceMain for new API class-->
                <property>
                    <name>oozie.launcher.action.main.class</name>
                    <value>com.oreilly.oozie.custommain.MapReduceNewMain</value>
                </property>
                <property>
                    <name>mapreduce.job.queuename</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.job.map.class</name>
                    <value>org.apache.hadoop.examples.WordCount$TokenizerMapper</value>
                </property>
                <property>
                    <name>mapreduce.job.reduce.class</name>
                    <value>org.apache.hadoop.examples.WordCount$IntSumReducer</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/input-data/text</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>/user/${wf:user()}/${examplesRoot}/output-data/${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
Next we need to include the required JARs in the workflow lib/ directory. We need to copy the JAR that includes the MapReduceNewMain class. As always, we also need to copy the JAR that includes the mapper (TokenizerMapper) and reducer (IntSumReducer) classes. Since we reuse the Apache Hadoop example, we have to copy the hadoop-mapreduce-examples JAR into the lib/ directory to be able to run this example:
$ hdfs dfs -put my-custom-main
$ hdfs dfs -lsr my-custom-main
-rw-r--r--   1 joe jgrp     172 2014-11-15 14:48 my-custom-main/job.properties
drwxr-xr-x   - joe jgrp       0 2014-11-15 14:48 my-custom-main/lib
-rw-r--r--   1 joe jgrp  270261 2014-11-15 14:48 my-custom-main/lib/hadoop-mapreduce-examples-2.3.0.jar
-rw-r--r--   1 joe jgrp   19949 2014-11-15 14:48 my-custom-main/lib/oozie-extensions.jar
-rw-r--r--   1 joe jgrp    2452 2014-11-15 14:48 my-custom-main/workflow.xml
We will use the following job.properties file and commands to run the workflow:
$ cat job.properties
nameNode=hdfs://localhost:8020
#Actually RM endpoint for Hadoop 2.x
jobTracker=localhost:8032
queueName=default
oozie.wf.application.path=${nameNode}/user/${user.name}/my-custom-main
outputDir=map-reduce-new

$ oozie job -config job.properties -run
job: 0000001-141115153201961-oozie-joe-W
In this section, we discuss the steps required to develop a completely new asynchronous action. Most of the steps are already discussed in the previous two sections and we tweak them to develop a new action type. In “Creating a Custom Synchronous Action”, we described how to implement a synchronous action. In short, we created a new ActionExecutor, a new XSD file, and then ultimately deployed it on the Oozie server. Then, in “Overriding an Asynchronous Action Type”, we explained the steps required to override the ActionMain of an existing asynchronous action. To be more specific, the previous example only overrides the ActionMain class that runs on the compute node. It doesn’t modify the ActionExecutor that runs on the Oozie server and is responsible for launching the LauncherMapper.
The asynchronous action executor usually extends the JavaActionExecutor that comes with the oozie-core package. Most of the common functionalities are already implemented in the JavaActionExecutor. This important class basically packages the required classes (e.g., ActionMain class) and configuration and kicks off the launcher job on Hadoop. It also passes the JARs and configuration to the launcher mapper through the Hadoop distributed cache. The launcher mapper ultimately invokes the main() method of the ActionMain class. The main() method implements the actual action such as executing a Pig script for the <pig> action.
Any action executor that extends JavaActionExecutor needs to be implemented under the org.apache.oozie.action.hadoop package, though the code can reside outside the Oozie code base. This is required because of a bug/constraint in the JavaActionExecutor class where some of the required methods are declared with “package” scope instead of “protected”. So any derived class needs to be under the same package name.
Here we implement the same use case of executing a MySQL query that we described in “Creating a Custom Synchronous Action” but using an asynchronous action:
package org.apache.oozie.action.hadoop;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.action.hadoop.LauncherMain;
import org.apache.oozie.action.hadoop.LauncherMapper;
import org.apache.oozie.action.hadoop.MapReduceMain;
import org.jdom.Element;
import org.jdom.Namespace;

public class MySQLActionExecutor extends JavaActionExecutor {

    private static final String MYSQL_MAIN_CLASS_NAME =
        "org.apache.oozie.action.hadoop.MySqlMain";
    public static final String JDBC_URL = "oozie.mysql.jdbc.url";
    public static final String SQL_COMMAND = "oozie.mysql.sql.command";
    public static final String SQL_OUTPUT_PATH =
        "oozie.mysql.sql.output.path";

    public MySQLActionExecutor() {
        super("mysql");
    }

    @Override
    protected List<Class> getLauncherClasses() {
        List<Class> classes = super.getLauncherClasses();
        classes.add(LauncherMain.class);
        classes.add(MapReduceMain.class);
        try {
            classes.add(Class.forName(MYSQL_MAIN_CLASS_NAME));
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found", e);
        }
        return classes;
    }

    @Override
    protected String getLauncherMain(Configuration launcherConf,
            Element actionXml) {
        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS,
            MYSQL_MAIN_CLASS_NAME);
    }

    @Override
    @SuppressWarnings("unchecked")
    Configuration setupActionConf(Configuration actionConf, Context context,
            Element actionXml, Path appPath) throws ActionExecutorException {
        super.setupActionConf(actionConf, context, actionXml, appPath);
        Namespace ns = actionXml.getNamespace();

        String sql = actionXml.getChild("sql", ns).getTextTrim();
        String jdbcUrl = actionXml.getChild("jdbcUrl", ns).getTextTrim();
        String sqlOutPath = actionXml.getChild("sql_output_file_path", ns)
            .getTextTrim();

        actionConf.set(JDBC_URL, jdbcUrl);
        actionConf.set(SQL_COMMAND, sql);
        actionConf.set(SQL_OUTPUT_PATH, sqlOutPath);
        return actionConf;
    }

    @Override
    protected String getDefaultShareLibName(Element actionXml) {
        return "mysql";
    }
}
In the preceding class, the constructor just calls its super-class constructor passing the new action name (i.e., mysql). This name is what will be used as an action type when the user writes the workflow definition. The method getLauncherClasses returns the list of classes required to be executed by the launcher mapper. Oozie server makes these classes available to the launcher mapper through the distributed cache. It includes the LauncherMain base class and the actual action main class (MySqlMain, described next). The MapReduceMain class is included only to support a few utility methods.
The method getLauncherMain returns the ActionMain class (org.apache.oozie.action.hadoop.MySqlMain). The launcher map code calls the main() method of this class. The setupActionConf method adds the configuration that is passed to the ActionMain class through a configuration file. The last method getDefaultShareLibName returns the name of the subdirectory under the system sharelib directory. This subdirectory hosts most of the JARs required to execute this action. In this example, the mysql-connector-java-*.jar file needs to be copied to the mysql/ subdirectory under the sharelib directory.
This example implements the same MySQL use case described in “Implementing the New ActionMain Class”. The main difference is that this action is asynchronous while the previous example implemented a synchronous action. The main class in this example reuses most of the MySQL query execution code from the previous example:
package org.apache.oozie.action.hadoop;

import java.io.BufferedWriter;
import java.io.File;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.action.hadoop.LauncherMain;
import org.apache.oozie.action.hadoop.LauncherSecurityManager;

public class MySqlMain extends LauncherMain {

    public static void main(String[] args) throws Exception {
        run(MySqlMain.class, args);
    }

    protected void run(String[] args) throws Exception {
        System.out.println();
        System.out.println("Oozie MySql action configuration");
        System.out
            .println("=============================================");

        // loading action conf prepared by Oozie
        Configuration actionConf = new Configuration(false);
        String actionXml = System.getProperty("oozie.action.conf.xml");
        if (actionXml == null) {
            throw new RuntimeException(
                "Missing Java System Property [oozie.action.conf.xml]");
        }
        if (!new File(actionXml).exists()) {
            throw new RuntimeException("Action Configuration XML file ["
                + actionXml + "] does not exist");
        }
        actionConf.addResource(new Path("file:///", actionXml));

        String jdbcUrl = actionConf.get(MySQLActionExecutor.JDBC_URL);
        if (jdbcUrl == null) {
            throw new RuntimeException("Action Configuration does not have "
                + MySQLActionExecutor.JDBC_URL + " property");
        }
        String sqlCommand = actionConf.get(MySQLActionExecutor.SQL_COMMAND);
        if (sqlCommand == null) {
            throw new RuntimeException("Action Configuration does not have "
                + MySQLActionExecutor.SQL_COMMAND + " property");
        }
        String sqlOutputPath = actionConf
            .get(MySQLActionExecutor.SQL_OUTPUT_PATH);
        if (sqlOutputPath == null) {
            throw new RuntimeException("Action Configuration does not have "
                + MySQLActionExecutor.SQL_OUTPUT_PATH + " property");
        }

        System.out.println("Mysql commands :" + sqlCommand
            + " with JDBC url :" + jdbcUrl
            + " sqlOutputPath " + sqlOutputPath);
        System.out
            .println("====================================================");
        System.out.println();
        System.out.println(">>> Connecting to MySQL and executing sql now >>>");
        System.out.println();
        System.out.flush();

        try {
            runMysql(jdbcUrl, sqlCommand, sqlOutputPath);
        } catch (SecurityException ex) {
            if (LauncherSecurityManager.getExitInvoked()) {
                if (LauncherSecurityManager.getExitCode() != 0) {
                    throw ex;
                }
            }
        }

        System.out.println();
        System.out.println("<<< Invocation of MySql command completed <<<");
        System.out.println();
    }

    public void runMysql(String jdbcUrl, String sql, String sqlOutputPath)
            throws Exception {
        Connection connect = null;
        Statement statement = null;
        ResultSet resultSet = null;
        try {
            // this will load the MySQL driver, each DB has its own driver
            Class.forName("com.mysql.jdbc.Driver");
            // setup the connection with the DB.
            System.out.println("JDBC URL :" + jdbcUrl);
            connect = DriverManager.getConnection(jdbcUrl);
            // statements allow to issue SQL queries to the database
            statement = connect.createStatement();
            // resultSet gets the result of the SQL query
            resultSet = statement.executeQuery(sql);
            writeResultSet(resultSet, sqlOutputPath);
        } finally {
            if (resultSet != null) resultSet.close();
            if (statement != null) statement.close();
            if (connect != null) connect.close();
        }
    }

    private void writeResultSet(ResultSet resultSet, String sqlOutputFilePath)
            throws Exception {
        Configuration configuration = new Configuration();
        Path outPath = new Path(sqlOutputFilePath);
        BufferedWriter out = null;
        FileSystem fs = null;
        try {
            fs = outPath.getFileSystem(configuration);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }
            fs.mkdirs(outPath);
            Path outFile = new Path(outPath, "sql.out");
            System.out.print("Writing output to :" + outFile);
            out = new BufferedWriter(new OutputStreamWriter(fs.create(outFile),
                "UTF-8"));
            // Get the metadata
            ResultSetMetaData md = resultSet.getMetaData();
            int recNo = 1;
            // Loop through the result set
            while (resultSet.next()) {
                out.write("Record_No=" + recNo++ + ",");
                for (int i = 1; i <= md.getColumnCount(); i++) {
                    out.write(md.getColumnName(i) + "="
                        + resultSet.getString(i) + ",");
                }
                // Terminate each record with a newline
                out.write("\n");
            }
        } finally {
            if (out != null) {
                out.close();
            }
            if (fs != null) {
                fs.close();
            }
        }
    }
}
The ActionMain class, MySqlMain, is derived from the LauncherMain class.
The run() method executes the MySQL query. All of the user-defined
properties from the workflow.xml are passed to the MySqlMain class through
a file, and the name of that action configuration file is passed as a
system property called oozie.action.conf.xml. After loading the
configuration, the main class executes the query and writes the result to
a file named sql.out under the HDFS directory passed in as a job property.
Alternatively, users can implement and use Oozie’s capture-output
feature explained in “Java Action”.
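As a rough illustration of the capture-output alternative, the sketch below writes a Java properties file to the location named by the oozie.action.output.properties system property, which is how the Java action hands captured output back to Oozie. The class name CaptureOutputSketch and the method writeCapturedOutput are hypothetical, and whether a custom action type picks this file up depends on how its executor is written, so treat this as an assumption rather than the book's implementation.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Properties;

// Hypothetical helper; not part of Oozie.
final class CaptureOutputSketch {

    // System property through which the Java action passes the output file name.
    static final String OUTPUT_PROPERTIES = "oozie.action.output.properties";

    // Stores the given key/value pairs in Java properties format at the
    // location named by the system property and returns the written file.
    static File writeCapturedOutput(Properties results) throws IOException {
        String fileName = System.getProperty(OUTPUT_PROPERTIES);
        if (fileName == null) {
            throw new IllegalStateException(
                    "Missing Java System Property [" + OUTPUT_PROPERTIES + "]");
        }
        File outputFile = new File(fileName);
        try (OutputStream os = new FileOutputStream(outputFile)) {
            results.store(os, "");
        }
        return outputFile;
    }
}
```

A main class could call writeCapturedOutput() with a small summary (for example, a record count) instead of, or in addition to, writing the full result set to HDFS.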
The ActionMain class requires a Hadoop token if it wants to
communicate with any service (such as the NameNode or the
JobTracker/ResourceManager) running with Kerberos security enabled. The
ActionMain class will need to pass the location of the token file with the
property name mapreduce.job.credentials.binary in the job configuration, as
shown in the example code in “Implementing the New ActionMain Class”. If a user wants
to pass other kinds of credentials, such as a Hive meta-store or HBase
token, she can follow the instructions provided in “Supporting Custom Credentials”.
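The token handoff described above can be sketched roughly as follows. This is a minimal sketch, assuming the token file location is available in the HADOOP_TOKEN_FILE_LOCATION environment variable of the launcher task. The class TokenPropagationSketch and its method are hypothetical names, and the configuration is abstracted as a setter so the example stays free of Hadoop dependencies.

```java
import java.util.function.BiConsumer;

// Hypothetical helper; not part of Oozie or Hadoop.
final class TokenPropagationSketch {

    // Environment variable assumed to hold the token file path.
    static final String TOKEN_FILE_ENV = "HADOOP_TOKEN_FILE_LOCATION";

    // Property name mentioned in the text above.
    static final String CREDENTIALS_BINARY = "mapreduce.job.credentials.binary";

    // Copies the token file location into the job configuration through the
    // supplied setter. Returns false (and sets nothing) when no token file is
    // present, which is the expected case on a cluster without Kerberos.
    static boolean propagateToken(String tokenFile,
                                  BiConsumer<String, String> confSetter) {
        if (tokenFile == null || tokenFile.isEmpty()) {
            return false;
        }
        confSetter.accept(CREDENTIALS_BINARY, tokenFile);
        return true;
    }
}
```

With a Hadoop Configuration object named jobConf, the call would look like propagateToken(System.getenv(TokenPropagationSketch.TOKEN_FILE_ENV), jobConf::set).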
The XSD file defined in this section is very similar to the one in “Writing the XML schema”:
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
           xmlns:mysql="uri:oozie:mysql-action:0.1"
           elementFormDefault="qualified"
           targetNamespace="uri:oozie:mysql-action:0.1">
    <xs:complexType name="MYSQL_TYPE">
        <xs:sequence>
            <xs:element name="job-tracker" type="xs:string"
                        minOccurs="1" maxOccurs="1"/>
            <xs:element name="name-node" type="xs:string"
                        minOccurs="1" maxOccurs="1"/>
            <xs:element name="job-xml" type="xs:string"
                        minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="mysql:CONFIGURATION"
                        minOccurs="0" maxOccurs="1"/>
            <xs:element name="jdbcUrl" type="xs:string"
                        minOccurs="1" maxOccurs="1"/>
            <xs:element name="sql" type="xs:string"
                        minOccurs="1" maxOccurs="1"/>
            <xs:element name="sql_output_file_path" type="xs:string"
                        minOccurs="1" maxOccurs="1"/>
            <xs:element name="file" type="xs:string"
                        minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="archive" type="xs:string"
                        minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="CONFIGURATION">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" minOccurs="1" maxOccurs="1"
                                    type="xs:string"/>
                        <xs:element name="value" minOccurs="1" maxOccurs="1"
                                    type="xs:string"/>
                        <xs:element name="description" minOccurs="0"
                                    maxOccurs="1" type="xs:string"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:element name="mysql" type="mysql:MYSQL_TYPE"/>
</xs:schema>
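Before deploying a schema like this, it can be handy to check locally that a sample action element actually validates against it. The following is a minimal sketch using the JDK's standard javax.xml.validation API. The class XsdCheckSketch and the method isValid are hypothetical names and are not part of Oozie, which performs its own schema validation on the server.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.SAXException;

// Hypothetical helper for local checks; not part of Oozie.
final class XsdCheckSketch {

    // Returns true if the XML document validates against the XSD text.
    // A malformed schema is reported as an exception rather than as "false".
    static boolean isValid(String xsd, String xml)
            throws SAXException, IOException {
        SchemaFactory factory =
                SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
        Schema schema =
                factory.newSchema(new StreamSource(new StringReader(xsd)));
        Validator validator = schema.newValidator();
        try {
            validator.validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException e) {
            // The document violates the schema or is not well-formed.
            return false;
        }
    }
}
```

Passing the contents of the XSD file and a standalone mysql element (with its xmlns set to uri:oozie:mysql-action:0.1) would catch a missing required element, such as jdbcUrl, before the workflow is ever submitted.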
The deployment of a new asynchronous action is very similar to that of the
synchronous action described in “Deploying the new action type”. If the
action is written outside of Oozie’s code base, the first step is to
create a JAR that includes MySQLActionExecutor, MySqlMain, and the
mysql-0.1.xsd file. Then include this JAR in
the Oozie WAR file. After that, we modify the conf/oozie-site.xml file to include the new
action executor and the new XSD file as shown here:
<property>
    <name>oozie.service.ActionService.executor.ext.classes</name>
    <value>
        org.apache.oozie.action.email.EmailActionExecutor,
        org.apache.oozie.action.hadoop.HiveActionExecutor,
        org.apache.oozie.action.hadoop.ShellActionExecutor,
        org.apache.oozie.action.hadoop.SqoopActionExecutor,
        org.apache.oozie.action.hadoop.DistcpActionExecutor,
        org.apache.oozie.action.hadoop.MySQLActionExecutor
    </value>
</property>
<property>
    <name>oozie.service.SchemaService.wf.ext.schemas</name>
    <value>
        shell-action-0.1.xsd,shell-action-0.2.xsd,shell-action-0.3.xsd,
        email-action-0.1.xsd,hive-action-0.2.xsd,hive-action-0.3.xsd,
        hive-action-0.4.xsd,hive-action-0.5.xsd,sqoop-action-0.2.xsd,
        sqoop-action-0.3.xsd,sqoop-action-0.4.xsd,ssh-action-0.1.xsd,
        ssh-action-0.2.xsd,distcp-action-0.1.xsd,distcp-action-0.2.xsd,
        oozie-sla-0.1.xsd,oozie-sla-0.2.xsd,mysql-0.1.xsd
    </value>
</property>
If the new action is developed within the Oozie code base, most of these steps are not required. The files are then automatically included as part of the standard Oozie build and deployment process.
In addition, we need to upload the MySQL connector JAR into the sharelib
directory on HDFS using the following commands and then restart the
Oozie server. Note that the commands must be executed as the Oozie
service user (that is, oozie):
$ hdfs dfs -mkdir share/lib/mysql
$ hdfs dfs -put \
    ./oozie-server/webapps/oozie/WEB-INF/lib/mysql-connector-java-5.1.25-bin.jar \
    share/lib/mysql/
$ bin/oozied.sh stop
$ bin/oozied.sh start
We follow the same steps explained in “Using the new action type” to build a workflow that uses
the new mysql action. The relevant workflow.xml and job.properties files are shown here:
$ cat workflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.4" name="my-mysql-wf">
    <start to="my-mysql-node"/>
    <action name="my-mysql-node">
        <mysql xmlns="uri:oozie:mysql-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <jdbcUrl>${jdbcURL}</jdbcUrl>
            <sql>${sql}</sql>
            <sql_output_file_path>${SQL_OUTPUT_PATH}</sql_output_file_path>
        </mysql>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>mysql failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

$ cat job.properties
nameNode=hdfs://localhost:8020
jobTracker=localhost:8032
queueName=default
examplesRoot=examples
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/mysql
oozie.use.system.libpath=true
jdbcURL=jdbc:mysql://localhost:3306/oozie_book?user=oozie_book_user&password=oozie_book_pw
sql=select * from oozie_book.custom_actions;
SQL_OUTPUT_PATH=my_sqloutput
It’s fairly common to see failures with the following exception in the launcher mapper log:
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.MySqlMain],
main() threw exception, com.mysql.jdbc.Driver
java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
This is most likely due to missing JARs.
In this example, the mysql-connector JAR was not copied into the
mysql/ subdirectory under the sharelib on HDFS. Uploading the correct
version of the JAR to the correct location will solve this problem.
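One way to surface this problem with a clearer message is to probe for the driver class before opening the connection. The following is a minimal sketch. DriverClasspathCheck and isClassAvailable are hypothetical names, and the check only reports whether the class can be found by the class loader that loaded the helper itself.

```java
// Hypothetical helper; not part of Oozie.
final class DriverClasspathCheck {

    // Returns true if the named class can be located on the classpath.
    // The class is looked up without running its static initializers.
    static boolean isClassAvailable(String className) {
        try {
            Class.forName(className, false,
                    DriverClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```

A main class such as MySqlMain could call isClassAvailable("com.mysql.jdbc.Driver") early in run() and fail with a message that points at the sharelib directory, rather than leaving the user to interpret a bare ClassNotFoundException.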
In this chapter, we focused on Oozie developers and showed how to extend Oozie’s existing functionality. More specifically, we elaborated on the steps required to write a custom EL function, a new synchronous action type, and a new asynchronous action type. In the next chapter, we will cover the operational aspects of Oozie and other sundry topics concerning managing and debugging Oozie.