Operators 簡介
Operators 允許生成特定類型的任務(wù)片吊,這些任務(wù)在實例化時成為 DAG 中的任務(wù)節(jié)點。所有的 Operator 均派生自 BaseOperator悉稠,并以這種方式繼承許多屬性和方法。
Operator 主要有三種類型:
- 執(zhí)行一項操作或在遠程機器上執(zhí)行一項操作艘包。
- 將數(shù)據(jù)從一個系統(tǒng)移動到另一個系統(tǒng)
- 類似傳感器的猛,是一種特定類型 Operator,它將持續(xù)運行想虎,直到滿足某種條件卦尊。例如在 HDFS 或 S3 中等待特定文件到達,在 Hive 中出現(xiàn)特定的分區(qū)或一天中的特定時間舌厨,繼承自 BaseSensorOperator岂却。
BaseOperator 簡介
所有的 Operator 都是從 BaseOperator 派生而來,并通過繼承獲得更多功能。這也是引擎的核心躏哩,所以有必要花些時間來理解 BaseOperator 的參數(shù)署浩,以了解 Operator 基本特性。
先看一下構(gòu)造函數(shù)的原型:
class airflow.models.BaseOperator(task_id, owner='Airflow', email=None, email_on_retry=True, email_on_failure=True, retries=0, retry_delay=datetime.timedelta(0, 300), retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, schedule_interval=None, depends_on_past=False, wait_for_downstream=False, dag=None, params=None, default_args=None, adhoc=False, priority_weight=1, weight_rule=u'downstream', queue='default', pool=None, sla=None, execution_timeout=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, trigger_rule=u'all_success', resources=None, run_as_user=None, task_concurrency=None, executor_config=None, inlets=None, outlets=None, *args, **kwargs)
需要注意的是參數(shù) start_date扫尺。start_date 決定了任務(wù)第一次運行的時間筋栋,最好的實踐是設(shè)置 start_date 在 schedule_interval 的附近。比如每天跑的任務(wù)開始日期設(shè)為'2018-09-21 00:00:00'器联,每小時跑的任務(wù)設(shè)置為 '2018-09-21 05:00:00'二汛,airflow 將 start_date 加上 schedule_interval 作為執(zhí)行日期。需要注意的是任務(wù)的依賴需要及時排除拨拓,例如任務(wù) A 依賴任務(wù) B肴颊,但由于兩者 start_date 不同導致執(zhí)行日期不同,那么任務(wù) A 的依賴永遠不會被滿足渣磷。如果你需要執(zhí)行一個日常任務(wù)婿着,比如每天下午 2 點開始執(zhí)行,你可以在 DAG中使用 cron 表達式
schedule_interval="0 14 * * *"
BashOperator
官方提供的 DAG 示例-tutorial 就是一個典型的 BashOperator醋界,調(diào)用 bash 命令或腳本竟宋,傳遞模板參數(shù)就可以參考 tutorial
"""
### Tutorial Documentation
Documentation that goes along with the Airflow tutorial located
[here](http://pythonhosted.org/airflow/tutorial.html)
"""
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'adhoc':False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'trigger_rule': u'all_success'
}
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date', #這里也可以是一個 bash 腳本文件
bash_command='date',
dag=dag)
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
dag.doc_md = __doc__
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
這里 t1 和 t2 都很容易理解,直接調(diào)用的是 bash 命令形纺,其實也可以傳入帶路徑的 bash 腳本, t3 使用了 Jinja 模板丘侠,"{% %}" 內(nèi)部是 for 標簽,用于循環(huán)操作逐样。"{{ }}" 內(nèi)部是變量蜗字,其中 ds 是執(zhí)行日期,是 airflow 的宏變量脂新,params.my_param 是自定義變量挪捕。根據(jù)官方提供的模板,稍加修改即可滿足我們的日常工作所需争便。
PythonOperator
PythonOperator 可以調(diào)用 Python 函數(shù)级零,由于 Python 基本可以調(diào)用任何類型的任務(wù),如果實在找不到合適的 Operator滞乙,將任務(wù)轉(zhuǎn)為 Python 函數(shù)奏纪,再使用 PythonOperator 也是一種選擇。下面是官方文檔給出的 PythonOperator 使用的樣例斩启。
from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
import time
from pprint import pprint
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_python_operator', default_args=args,
schedule_interval=None)
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
dag=dag)
# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
for i in range(10):
task = PythonOperator(
task_id='sleep_for_' + str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
dag=dag)
task.set_upstream(run_this)
通過以上代碼我們可以看到亥贸,任務(wù) task 及依賴關(guān)系都是可以動態(tài)生成的,這在實際使用中會減少代碼編寫數(shù)量浇垦,邏輯也非常清晰炕置,非常方便使用。PythonOperator 與 BashOperator 基本類似,不同的是 python_callable 傳入的是 Python 函數(shù)朴摊,而后者傳入的是 bash 指令或腳本默垄。通過 op_kwargs 可以傳入任意多個參數(shù)。
HiveOperator
hive 是基于 Hadoop 的一個數(shù)據(jù)倉庫工具甚纲,可以將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫表口锭,并提供簡單的 sql 查詢功能,可以將 sql 語句轉(zhuǎn)換為 MapReduce 任
務(wù)進行運行介杆。在 airflow 中調(diào)用 hive 任務(wù)鹃操,首先需要安裝依賴
pip install apache-airflow[hive]
下面是使用示例:
t1 = HiveOperator(
task_id='simple_query',
hql='select * from cities',
dag=dag)
常見的 Operator 還有 DockerOperator,OracleOperator春哨,MysqlOperator荆隘,DummyOperator,SimpleHttpOperator 等使用方法類似赴背,不再一一介紹椰拒。
如何自定義Operator
如果官方的 Operator 仍不滿足需求, 那么我們就自己開發(fā)一個 Operator。 開發(fā) Operator 比較簡單凰荚,繼承 BaseOperator 并實現(xiàn) execute 方法即可:
from airflow.models import BaseOperator
class MyOperator(BaseOperator):
def __init__(*args, **kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
def execute(self, context):
###do something here
除了 execute 方法外燃观,還可以實現(xiàn)以下方法:
on_kill: 在 task 被 kill 的時候執(zhí)行。
airflow 是支持Jinjia模板語言的便瑟,那么如何在自定義的 Operator 中加入Jinjia模板語言的支持呢缆毁?
其實非常簡單,只需要在自定義的Operator類中加入屬性
template_fields = (attributes_to_be_rendered_with_jinja)
即可到涂,例如官方的 bash_operator中是這樣的:
template_fields = ('bash_command', 'env')
這樣积锅,在任務(wù)執(zhí)行之前,airflow 會自動渲染 bash_command 或 env 中的屬性再執(zhí)行任務(wù)养盗。