1. 概述
Airflow是airbnb開源的基于DAG(有向無環(huán)圖)的用Python開發(fā)的任務管理系統(tǒng)倘屹。最簡單的理解就是一個高級版的crontab圆雁,它解決了crontab無法解決的任務依賴問題。
項目于2014年啟動火脉,于2015年春季開源,于2016年加入Apache軟件基金會的孵化計劃。
Airflow提供了豐富的命令行工具用于系統(tǒng)管控斑响,而其web管理界面同樣也可以方便的管控調度任務,并且對任務運行狀態(tài)進行實時監(jiān)控钳榨,方便了系統(tǒng)的運維和管理恋捆。
1.1. 應用場景
- ETL場景,可以管理數(shù)據(jù)Extract重绷、Transform沸停、Load等操作,以及各個Task之間的依賴昭卓,且便于完成Debug愤钾、監(jiān)控和Backfill操作
- 系統(tǒng)運維工作,可以管理服務器端Crontab作業(yè)候醒,以及作業(yè)之間的依賴能颁,從而簡化運維工作的復雜度,提高運維效率
- 大數(shù)據(jù)平臺的任務流管理倒淫,包含數(shù)據(jù)分析伙菊、數(shù)據(jù)交換、數(shù)據(jù)報表生成與發(fā)送敌土,等
1.2. 優(yōu)勢
- 自帶web管理界面镜硕,易上手
- 業(yè)務代碼和調度代碼完全解耦
- 通過python代碼定義任務,并支持各種Operate操作器返干,靈活性大兴枯,能滿足用戶的各種需求
- python開源項目,支持擴展operate等插件矩欠,便于二次開發(fā)
1.3. 劣勢
- 對分布式部署并不友好财剖,例如node之間的dag和task同步問題
2. Concepts
2.1. 術語說明
2.1.1. airflow.DAG悠夯,實例化之后,稱之為Dag Run
或Dag Instance
DAG(Directed Acyclic Graph)是有向無環(huán)圖躺坟,也稱為有向無循環(huán)圖沦补。在Airflow中,一個DAG定義了一個完整的作業(yè)咪橙。同一個DAG中的所有Task擁有相同的調度時間策彤。
代碼樣例
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner' : 'airflow' ,
'depends_on_past' : False ,
'start_date' : datetime ( 2015 , 6 , 1 ),
'email' : [ 'airflow@example.com' ],
'email_on_failure' : False ,
'email_on_retry' : False ,
'retries' : 1 ,
'retry_delay' : timedelta( minutes = 5 ),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_dag', default_args=default_args)
參數(shù)說明
- dag_id: 唯一識別DAG,方便日后管理
- default_args: 默認參數(shù)匣摘,如果當前DAG實例的作業(yè)沒有配置相應參數(shù)店诗,則采用DAG實例的default_args中的相應參數(shù)
- schedule_interval: 配置DAG的執(zhí)行周期,可采用crontab語法
- start_date音榜,作業(yè)開始調度時間
- end_date庞瘸,作業(yè)結束調度時間
- concurrency, the number of task instances allowed to run concurrently
- max_active_runs, maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won't create new active DAG runs
- on_failure_callback, A function to be called when a DagRun of this dag fails.
- on_success_callback, A function to be called when a DagRun of this dag succeeds.
- backfill,填充任務赠叼,手動重跑過去失敗的任務(指定日期)
- catchup擦囊,如果歷史任務出錯,調度器嘗試按調度順序重跑歷史任務(而不是按照當前時間執(zhí)行當前任務)嘴办∷渤。可以在dag中設置dag.catchup = False或者參數(shù)文件中設置catchup_by_default = False來禁用這個功能
2.1.2. airflow.operators,實例化之后涧郊,稱之為Task
操作器贯被,定義任務以哪種方式執(zhí)行。airflow有多種operator,如BashOperator妆艘、DummyOperator彤灶、MySqlOperator、HiveOperator以及社區(qū)貢獻的operator等批旺,其中BaseOperator是所有operator的基礎operator幌陕。
Operator Name | Description |
---|---|
BaseOperator | 基礎operator,設置baseoperator會影響所有的operator |
BashOperator | executes a bash command |
DummyOperator | 空操作 |
PythonOperator | calls an arbitrary Python function |
EmailOperator | sends an email |
HTTPOperator | sends an HTTP request |
SqlOperator | executes a SQL command |
DockerOperator | execute a command inside a docker container |
Sensor | waits for a certain time, file, database row, S3 key, etc… |
代碼樣例
from airflow.operators.bash_operator import BashOperator
t1 = BashOperator (
task_id = 'print_date' ,
bash_command = 'date' ,
dag = dag )
t2 = BashOperator (
task_id = 'sleep' ,
bash_command = 'sleep 5' ,
retries = 3 ,
dag = dag )
t1 >> t2
# t2.set_upstream(t1)
# t1.set_downstream(t2)
Task為DAG中具體的作業(yè)任務汽煮,依賴于DAG搏熄,也就是必須存在于某個DAG中。
Task在DAG中可以配置依賴關系(當然也可以配置跨DAG依賴暇赤,但是并不推薦心例。跨DAG依賴會導致DAG圖的直觀性降低翎卓,并給依賴管理帶來麻煩)契邀。
參數(shù)說明:
- dag: 傳遞一個DAG實例摆寄,以使當前作業(yè)屬于相應DAG
- task_id: 給任務一個標識符(名字)失暴,方便日后管理
- owner: 任務的擁有者坯门,方便日后管理
- start_date: 任務的開始時間,即任務將在這個時間點之后開始調度
- retries: 失敗后重試次數(shù)
- trigger_rule
all_success: (default) all parents have succeeded 父task全success
all_failed: all parents are in a failed or upstream_failed state 父task全failed或者upstream_failed狀態(tài)
all_done: all parents are done with their execution 父task全執(zhí)行過逗扒,不管success or failed
one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done 當父task中有一個是failed狀態(tài)時執(zhí)行古戴,不必等到所有的父task都執(zhí)行
one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done 當父task中有一個是success狀態(tài)時執(zhí)行,不必等到所有的父task都執(zhí)行
dummy: dependencies are just for show, trigger at will 無條件執(zhí)行
2.1.3. airflow.executor矩肩,即調度方式
在配置文件config/airflow.cfg
中现恼,修改executor
變量。
- SequentialExecutor黍檩,順序調度執(zhí)行叉袍,默認執(zhí)行器,通常只用于測試
- LocalExecutor刽酱,多進程本地調度執(zhí)行
- CeleryExecutor喳逛,分布式調度執(zhí)行,生產常用
- DaskExecutor棵里,在Dask分布式群集中運行Airflow任務润文,主要用于數(shù)據(jù)分析
2.1.4. 其他
- JOB,最上層的工作殿怜,分為 SchedulerJob典蝌、BackfillJob 和 LocalTaskJob。SchedulerJob 由 Scheduler 創(chuàng)建头谜,BackfillJob 由 Backfill 創(chuàng)建骏掀,LocalTaskJob 由前面兩種 Job 創(chuàng)建
在早期版本 Airflow 中,DAG 執(zhí)行主要有兩種完全獨立的執(zhí)行途徑:SchedulerJob 和 BackfillJob柱告。在一次較大的重構中增加了 DagRun 方式砖织,以跟蹤 DAG 的執(zhí)行狀態(tài)
2.2. 服務構成
Webserver
Airflow提供了一個可視化的Web界面。啟動 WebServer 后末荐,就可以在 Web 界面上查看定義好的 DAG 并監(jiān)控及改變運行狀況侧纯。也可以在 Web 界面中對一些變量進行配置。
Scheduler
整個 Airflow 的調度由 Scheduler 負責發(fā)起甲脏,每隔一段時間 Scheduler 就會檢查所有定義完成的 DAG 和定義在其中的作業(yè)眶熬,如果有符合運行條件的作業(yè),Scheduler 就會發(fā)起相應的作業(yè)任務以供 Worker 接收块请。
Worker
一般來說我們用 Celery Worker 來執(zhí)行具體的作業(yè)娜氏。Worker 可以部署在多臺機器上,并可以分別設置接收的隊列墩新。當接收的隊列中有作業(yè)任務時贸弥,Worker 就會接收這個作業(yè)任務,并開始執(zhí)行海渊。Airflow 會自動在每個部署 Worker 的機器上同時部署一個 Serve Logs 服務绵疲,這樣我們就可以在 Web 界面上方便的瀏覽分散在不同機器上的作業(yè)日志了哲鸳。
Flower
Flower 提供了一個可視化界面以監(jiān)控所有 Celery Worker 的運行狀況。這個服務并不是必要的盔憨。
3. 單機部署與測試
3.1. Install
pip install apache-airflow
如果遇到ImportError: cannot import name 'resolve_types'
徙菠,解決辦法:
pip3 install cattrs==1.0.0
參考:https://github.com/apache/airflow/issues/11965
3.2. 啟動
# 在airflow目錄初始化數(shù)據(jù)庫和airflow配置
airflow db init
# 啟動 airflow web
airflow webserver
# 開始調度
airflow scheduler
- 其他常用命令
# 測試任務,格式:airflow test dag_id task_id execution_time
airflow test test_task test1 2019-09-10
# 查看生效的 DAGs
airflow list_dags -sd $AIRFLOW_HOME/dags
# 開始運行任務(同 web 界面點 trigger 按鈕)
airflow trigger_dag test_task
# 暫停任務
airflow pause dag_id
# 取消暫停郁岩,等同于在 web 管理界面打開 off 按鈕
airflow unpause dag_id
# 查看 task 列表
airflow list_tasks dag_id 查看task列表
# 清空任務狀態(tài)
airflow clear dag_id
# 運行task
airflow run dag_id task_id execution_date
3.3. 測試
- 通過webserver婿奔,可以查看樣例dag
tutorial
的源碼
# [START tutorial]
# [START import_module]
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# [END import_module]
# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
# [END default_args]
# [START instantiate_dag]
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# [END instantiate_dag]
# t1, t2 and t3 are examples of tasks created by instantiating operators
# [START basic_task]
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
# [END basic_task]
# [START documentation]
dag.doc_md = __doc__
t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
# [END documentation]
# [START jinja_template]
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
# [END jinja_template]
t1 >> [t2, t3]
# [END tutorial]
通過Web頁面啟動
tutorial
dag在
~/airflow/logs
可以查看對應dag的log通過webserver查看task log,和task執(zhí)行情況
4. 分布式部署與測試
采用docker部署airflow分布式調度系統(tǒng)问慎,編排方式可以是k8s萍摊、swarm,這里采用docker-compose簡單實現(xiàn):
https://github.com/puckel/docker-airflow
$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
$ docker-compose -f docker-compose-CeleryExecutor.yml scale worker=2