Oozie Development Guide
Oozie defines workflows, which are a fundamental organization of multiple Hadoop Jobs in a certain order and a whole that runs along a predetermined path. By starting a workflow, multiple Hadoop Jobs in the workflow will be executed until they are completed. This is the lifecycle of a workflow.
Oozie introduced the concept of a Coordinator, which can run each Job of the workflow as an Action, which is equivalent to an execution node in the workflow. This way, multiple workflow Jobs can be organized into a Coordinator Job, and the triggering time and frequency can be set, as well as the dataset, concurrency, etc. A Coordinator Job contains the semantics of setting the execution cycle and frequency outside the Job. It is similar to having a coordinator outside the workflow to manage the running of these workflow Jobs.
If Hue is installed on the cluster, the workflow can also be configured through page operation, for the specific operation steps, click here to view. The following introduces the method of configuring the workflow through the background:
1. Run the Coordinator Job
Let’s first look at a simple example included in the official distribution package oozie/examples/src/main/apps/cron. It can implement timed scheduling of a workflow Job. A blank workflow Job is given in this example, also to demonstrate that the Coordinator system can be used for scheduling.
There are 3 configuration files in this example. After modification, they are as follows:
- job.properties configuration
nameNode=hdfs://uhadoop-XXXXXX-master1:8020
jobTracker=uhadoop-XXXXXX-master1:23140
queueName=default
examplesRoot=examples
oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/cron
start=2016-12-01T19:00Z
end=2016-12-31T01:00Z
workflowAppUri=${nameNode}/user/${user.name}/${examplesRoot}/apps/cron
The configuration of the Hadoop cluster was modified, as well as the scheduling start and end time range.
- wordflow.xml
<workflow-app xmlns="uri:oozie:workflow:0.5" name="one-op-wf">
<start to="action1"/>
<action name="action1">
<fs/>
<ok to="end"/>
<error to="end"/>
</action>
<end name="end"/>
</workflow-app>
This is an empty Job, no modifications were made.
- coordinator.xml configuration
<coordinator-app name="cron-coord" frequency="${coord:minutes(2)}" start="${start}" end="${end}" timezone="UTC" xmlns="uri:oozie:coordinator:0.2">
<action>
<workflow>
<app-path>${workflowAppUri}</app-path>
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>queueName</name>
<value>${queueName}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
The scheduled frequency was changed to 2 minutes. Then, upload these 3 files to HDFS.
Starting a Coordinator Job is similar to starting an Oozie workflow Job. Simply execute the following command:
bin/oozie job -oozie http://uhadoop-XXXXXX-master2:11000/oozie -config /home/hadoop/oozie/examples/src/main/apps/cron/job.properties -run
Running the above command will return the ID of this Job on the console. We can also view it through the Oozie Web console.
2. Coordinator Action
A Coordinator Job will create and execute Coordinator Actions. Usually, a Coordinator Action is a workflow Job, and this workflow Job will generate a dataset instance and process this dataset. Once a Coordinator Action is created, it will wait for the completion of all input events that meet the execution conditions and then execute, or it will time out.
Each Coordinator Job has a driving event, which determines the initialization of the Coordinator Actions it contains. For synchronous Coordinator Jobs, the trigger execution frequency is a driving event. Similarly, the basic unit that makes up a Coordinator Job is a Coordinator Action. Unlike the Oozie workflow Job, which only has OK and Error execution results, a Coordinator Action status set, as shown below:
WAITING
READY
SUBMITTED
TIMEDOUT
RUNNING
KILLED
SUCCEEDED
FAILED
3. Coordinator Application
When certain conditions are met, the Coordinator Application will trigger the Oozie workflow. Among them, the triggering conditions can be a time frequency, whether a dataset instance is available, or possibly other external events. A Coordinator Job is a running instance of a Coordinator application, which runs on the Coordinator engine provided by Oozie, and this instance starts at a specified time and runs until it ends. A Coordinator Job has the following statuses:
PREP
RUNNING
RUNNINGWITHERROR
PREPSUSPENDED
SUSPENDED
SUSPENDEDWITHERROR
PREPPAUSED
PAUSED
PAUSEDWITHERROR
SUCCEEDED
DONEWITHERROR
KILLED
FAILED
The status of the Coordinator Job is much more complex than that of a basic Oozie workflow Job. Because the basic execution unit of the Coordinator Job may be a basic Oozie Job, and some scheduling information is added, it is necessary to increase additional states to describe it.
4. Definition of Coordinator Application
The syntax format of a synchronous Coordinator Appliction definition is as follows:
<coordinator-app name="[NAME]" frequency="[FREQUENCY]" start="[DATETIME]" end="[DATETIME]" timezone="[TIMEZONE]" xmlns="uri:oozie:coordinator:0.1">
<controls>
<timeout>[TIME_PERIOD]</timeout>
<concurrency>[CONCURRENCY]</concurrency>
<execution>[EXECUTION_STRATEGY]</execution>
</controls>
<datasets>
<include>[SHARED_DATASETS]</include>
...
<!-- Synchronous datasets -->
<dataset name="[NAME]" frequency="[FREQUENCY]" initial-instance="[DATETIME]" timezone="[TIMEZONE]">
<uri-template>[URI_TEMPLATE]</uri-template>
</dataset>
...
</datasets>
<input-events>
<data-in name="[NAME]" dataset="[DATASET]">
<instance>[INSTANCE]</instance>
...
</data-in>
...
<data-in name="[NAME]" dataset="[DATASET]">
<start-instance>[INSTANCE]</start-instance>
<end-instance>[INSTANCE]</end-instance>
</data-in>
...
</input-events>
<output-events>
<data-out name="[NAME]" dataset="[DATASET]">
<instance>[INSTANCE]</instance>
</data-out>
...
</output-events>
<action>
<workflow>
<app-path>[WF-APPLICATION-PATH]</app-path>
<configuration>
<property>
<name>[PROPERTY-NAME]</name>
<value>[PROPERTY-VALUE]</value>
</property>
...
</configuration>
</workflow>
</action>
</coordinator-app>
Based on the above definition syntax format, we respectively explain the meaning of the corresponding elements, as follows:
- control elements
Element Name | Description |
---|---|
timeout | Timeout time, in minutes. When a Coordinator Job is started, multiple Coordinator actions will be initialized, timeout is used to restrict this initialization process. The default value is -1, which means never timeout, if 0 it always times out. |
concurrency | The number of parallel executions, indicating the concurrent execution of multiple Coordinator Jobs, the default value is 1. |
execution | Configure the strategy of concurrent execution of multiple coordinator Jobs: default is FIFO. There are two other types: LIFO (the latest ones are executed first), LAST_ONLY (only the latest Coordinator Job is executed, all others are discarded). |
throttle | The maximum number of Coordinator actions allowed to be in the WAITING state when a Coordinator Job is initialized. |
- dataset elements
In the Coordinator Job, there is a concept of Dataset, which can provide computing data for actual computation, mainly referring to data directories or files on HDFS, and can configure the frequency of dataset generation (Frequency), URI template, time and other information. The syntax format of the dataset is as follows:
<dataset name="[NAME]" frequency="[FREQUENCY]" initial-instance="[DATETIME]" timezone="[TIMEZONE]">
<uri-template>[URI TEMPLATE]</uri-template>
<done-flag>[FILE NAME]</done-flag>
</dataset>
An example is as follows:
<dataset name="stats_hive_table" frequency="${coord:days(1)}" initial-instance="2016-12-25T00:00Z" timezone="America/Los_Angeles">
<uri-template>
hdfs://m1:9000/hive/warehouse/user_events/${YEAR}${MONTH}/${DAY}/data
</uri-template>
<done-flag>donefile.flag</done-flag>
</dataset>
The above example will generate a user event table every day, which can be queried and analyzed by Hive. The location of this dataset is specified here, and subsequent calculations will use this part of the data. The uri-template specifies a matching template, and paths that meet this template will be used as the basis for computation. There is another way to define a dataset collection, to combine multiple datasets into a group to define, the syntax format is as follows:
<datasets>
<include>[SHARED_DATASETS]</include>
...
<dataset name="[NAME]" frequency="[FREQUENCY]" initial-instance="[DATETIME]" timezone="[TIMEZONE]">
<uri-template>[URI TEMPLATE]</uri-template>
</dataset>
...
</datasets>
- input-events and output-events elements
The input event of a Coordinator Application specifies the input conditions that must be met to execute a Coordinator Action. In the current version of Oozie, only dataset instances are supported.
A Coordinator Action may generate one or more dataset instances. In the current version of Oozie, the output event only supports the output of dataset instances.
Constant expression | Description |
---|---|
${coord:minutes(int n)} | Returns the datetime:and starts from one, executes every n minutes |
${coord:hours(int n)} | Returns the datetime: and starts from one, performs every n * 60 minutes |
${coord:days(int n)} | Returns the datetime: and starts from one, performs every n * 24 * 60 minutes |
${coord:months(int n)} | Returns the datetime: and starts from one, performs every n * M * 24 * 60 minutes (M is the number of days in a month) |
${coord:endOfDays(int n)} | Returns the datetime: starts from the latest time of the day (i.e., the next day), carries out every n * 24 * 60 minutes |
${coord:endOfMonths(1)} | Returns the datetime: starts from the last time of this month (that is, the beginning of next month), performs every n * 24 * 60 minutes |
${coord:current(int n)} | Returns the datetime: start calculating from the time a Coordinator Action (Action) is created, the execution time of the nth dataset Instance |
${coord:dataIn(String name)} | In the input event (input-events), parse all URIs contained in the dataset instance |
${coord:dataOut(String name)} | In the output event (output-events), parse all URIs contained in the dataset instance. |
${coord:offset(int n, String timeUnit)} | Indicates the time offset, if the creation time of a Coordinator Action is T, n is a positive number indicates offset to the moment after T, n is a negative number indicates offset to the moment before T, timeUnit indicates the time unit (options are MINUTE, HOUR, DAY, MONTH, YEAR) |
${coord:hoursInDay(int n)} | The number of hours on the specified n-th day, n > 0 indicates the number of hours on the n-th day counting backwards, n = 0 indicates the number of hours today, n < 0 indicates the number of hours on the n-th day counting forward |
${coord:daysInMonth(int n)} | The number of days in the specified n-th month, n > 0 indicates the number of days in the n-th month counting backwards, n = 0 indicates the number of days in this month, n < 0 indicates the number of days in the n-th month counting forward |
${coord:tzOffset()} | The number of minutes difference between the time zone corresponding to the dataset and the time zone of the Coordinator Job |
${coord:latest(int n)} | The nth dataset instance that is currently available |
${coord:future(int n, int limit)} | Dataset instances after the current time, n >= 0, when n = 0 it means the dataset instance is immediately available, limit represents the number of dataset instances |
${coord:nominalTime()} | The nominal time is equal to the start time of the Coordinator Job, plus the datetime obtained by the frequency of multiple Coordinator Jobs. For example: start = “2009-01-01T24:00Z”, end = “2009-12-31T24:00Z”, frequency = ”${coord:days(1)}”, frequency = ”${coord:days(1)}, then the nominal time is: 2009-01-02T00:00Z, 2009-01-03T00:00Z, 2009-01-04T00:00Z, …, 2010-01-01T00:00Z |
${coord:actualTime()} | The actual creation time of the Coordinator Action. For example: start = “2011-05-01T24:00Z”, end = “2011-12-31T24:00Z”, frequency = ”${coord:days(1)}”, then the actual time is: 2011-05-01, 2011-05-02, 2011-05-03, …, 2011-12-31 |
${coord:user()} | The user name who started the current Coordinator Job |
${coord:dateOffset(String baseDate, int instance, String timeUnit)} | The formula for calculating the new datetime: newDate = baseDate + instance * timeUnit, such as: baseDate = ’2009-01-01T00:00Z’, instance = ’2’, timeUnit = ’MONTH’, then the calculated new datetime is ’2009-03-01T00:00Z’ |
${coord:formatTime(String timeStamp, String format)} | Formats a time string, format specifies the pattern. |