Airflow Development Guide

Airflow is a workflow management system that organizes task flows as Directed Acyclic Graphs (DAGs), which define both task dependencies and time-based scheduling. The Airflow service starts two components on the master1 node: AirflowScheduler, which manages all DAGs, Tasks, and the scheduling between Tasks, and AirflowWebserver, Airflow's web service used for visual management.
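
To verify that both components are running, you can log in to master1 and check their processes. A minimal check (the exact process names may vary by Airflow version):

ps aux | grep -E 'airflow.*(scheduler|webserver)' | grep -v grep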

If you select Airflow when creating a cluster, Airflow will be installed on the uhadoop-******-master1 node. The Airflow web service is accessed at port 8999 of the master1 node's external network IP (you must first open port 8999 in the external network firewall bound to the master1 node).
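
Once port 8999 is open, a quick way to confirm the webserver is reachable (illustrative; replace the placeholder with the actual external IP of master1):

curl -I http://<master1-external-ip>:8999/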

1. Airflow Examples

For details, please refer to the introduction on the official Airflow website.

Please place the following code in the /home/hadoop/airflow/dags directory of uhadoop-******-master1 (the node where AirflowScheduler runs).

If this directory does not exist, you need to create it manually and change its owner and group to hadoop, as shown below.
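
A minimal sketch of the setup commands, assuming the hadoop user and group from this guide (run on master1):

mkdir -p /home/hadoop/airflow/dags
chown -R hadoop:hadoop /home/hadoop/airflow/dags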

cat tutorial.py
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
 
 
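# These args will be passed on to each operator and can be overridden per task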
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
 
dag = DAG('tutorial', default_args=default_args)
 
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
 
t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)
 
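# Jinja-templated command: {{ ds }} is the execution date stamp, and
# macros.ds_add(ds, 7) shifts it forward by 7 days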
templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""
 
t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)
 
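# set_upstream declares dependencies: t2 and t3 will run only after t1 succeeds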
t2.set_upstream(t1)
t3.set_upstream(t1)

After refreshing the Airflow web page, the tutorial DAG will appear in the DAG list.
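
You can also validate the DAG from the command line on master1 before using the web UI. A sketch using the Airflow 1.x CLI that matches this tutorial (newer Airflow versions use airflow dags list and airflow tasks test instead):

python tutorial.py        # if this raises no exception, the DAG file is valid
airflow list_dags         # 'tutorial' should appear in the list
airflow test tutorial print_date 2015-06-01   # run a single task without recording state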

For detailed usage, please refer to the official Airflow website.