Python - Airflow任務調度系統(tǒng)初識

1. 概述

Apache Airflow

Airflow是airbnb開源的基于DAG(有向無環(huán)圖)的用Python開發(fā)的任務管理系統(tǒng)倘屹。最簡單的理解就是一個高級版的crontab圆雁,它解決了crontab無法解決的任務依賴問題。

項目于2014年啟動火脉,于2015年春季開源,于2016年加入Apache軟件基金會的孵化計劃。

Airflow提供了豐富的命令行工具用于系統(tǒng)管控斑响,而其web管理界面同樣也可以方便的管控調度任務,并且對任務運行狀態(tài)進行實時監(jiān)控钳榨,方便了系統(tǒng)的運維和管理恋捆。

有向無環(huán)圖

1.1. 應用場景

  • ETL場景,可以管理數(shù)據(jù)Extract重绷、Transform沸停、Load等操作,以及各個Task之間的依賴昭卓,且便于完成Debug愤钾、監(jiān)控和Backfill操作
  • 系統(tǒng)運維工作,可以管理服務器端Crontab作業(yè)候醒,以及作業(yè)之間的依賴能颁,從而簡化運維工作的復雜度,提高運維效率
  • 大數(shù)據(jù)平臺的任務流管理倒淫,包含數(shù)據(jù)分析伙菊、數(shù)據(jù)交換、數(shù)據(jù)報表生成與發(fā)送敌土,等

1.2. 優(yōu)勢

  • 自帶web管理界面镜硕,易上手
  • 業(yè)務代碼和調度代碼完全解耦
  • 通過python代碼定義任務,并支持各種Operate操作器返干,靈活性大兴枯,能滿足用戶的各種需求
  • python開源項目,支持擴展operate等插件矩欠,便于二次開發(fā)

1.3. 劣勢

  • 對分布式部署并不友好财剖,例如node之間的dag和task同步問題

2. Concepts

2.1. 術語說明

2.1.1. airflow.DAG悠夯,實例化之后,稱之為Dag RunDag Instance

DAG(Directed Acyclic Graph)是有向無環(huán)圖躺坟,也稱為有向無循環(huán)圖沦补。在Airflow中,一個DAG定義了一個完整的作業(yè)咪橙。同一個DAG中的所有Task擁有相同的調度時間策彤。

代碼樣例
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner' : 'airflow' ,
    'depends_on_past' : False ,
    'start_date' : datetime ( 2015 , 6 , 1 ),
    'email' : [ 'airflow@example.com' ],
    'email_on_failure' : False ,
    'email_on_retry' : False ,
    'retries' : 1 ,
    'retry_delay' : timedelta( minutes = 5 ),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_dag', default_args=default_args)
參數(shù)說明
  • dag_id: 唯一識別DAG,方便日后管理
  • default_args: 默認參數(shù)匣摘,如果當前DAG實例的作業(yè)沒有配置相應參數(shù)店诗,則采用DAG實例的default_args中的相應參數(shù)
  • schedule_interval: 配置DAG的執(zhí)行周期,可采用crontab語法
  • start_date音榜,作業(yè)開始調度時間
  • end_date庞瘸,作業(yè)結束調度時間
  • concurrency, the number of task instances allowed to run concurrently
  • max_active_runs, maximum number of active DAG runs, beyond this number of DAG runs in a running state, the scheduler won't create new active DAG runs
  • on_failure_callback, A function to be called when a DagRun of this dag fails.
  • on_success_callback, A function to be called when a DagRun of this dag succeeds.
  • backfill,填充任務赠叼,手動重跑過去失敗的任務(指定日期)
  • catchup擦囊,如果歷史任務出錯,調度器嘗試按調度順序重跑歷史任務(而不是按照當前時間執(zhí)行當前任務)嘴办∷渤。可以在dag中設置dag.catchup = False或者參數(shù)文件中設置catchup_by_default = False來禁用這個功能

2.1.2. airflow.operators,實例化之后涧郊,稱之為Task

操作器贯被,定義任務以哪種方式執(zhí)行。airflow有多種operator,如BashOperator妆艘、DummyOperator彤灶、MySqlOperator、HiveOperator以及社區(qū)貢獻的operator等批旺,其中BaseOperator是所有operator的基礎operator幌陕。

Operator Name Description
BaseOperator 基礎operator,設置baseoperator會影響所有的operator
BashOperator executes a bash command
DummyOperator 空操作
PythonOperator calls an arbitrary Python function
EmailOperator sends an email
HTTPOperator sends an HTTP request
SqlOperator executes a SQL command
DockerOperator execute a command inside a docker container
Sensor waits for a certain time, file, database row, S3 key, etc…
代碼樣例
from airflow.operators.bash_operator import BashOperator

t1 = BashOperator (
    task_id = 'print_date' ,
    bash_command = 'date' ,
    dag = dag )

t2 = BashOperator (
    task_id = 'sleep' ,
    bash_command = 'sleep 5' ,
    retries = 3 ,
    dag = dag )

t1 >> t2
# t2.set_upstream(t1)
# t1.set_downstream(t2)

Task為DAG中具體的作業(yè)任務汽煮,依賴于DAG搏熄,也就是必須存在于某個DAG中。

Task在DAG中可以配置依賴關系(當然也可以配置跨DAG依賴暇赤,但是并不推薦心例。跨DAG依賴會導致DAG圖的直觀性降低翎卓,并給依賴管理帶來麻煩)契邀。

參數(shù)說明:

  • dag: 傳遞一個DAG實例摆寄,以使當前作業(yè)屬于相應DAG
  • task_id: 給任務一個標識符(名字)失暴,方便日后管理
  • owner: 任務的擁有者坯门,方便日后管理
  • start_date: 任務的開始時間,即任務將在這個時間點之后開始調度
  • retries: 失敗后重試次數(shù)
  • trigger_rule

all_success: (default) all parents have succeeded 父task全success

all_failed: all parents are in a failed or upstream_failed state 父task全failed或者upstream_failed狀態(tài)

all_done: all parents are done with their execution 父task全執(zhí)行過逗扒,不管success or failed

one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done 當父task中有一個是failed狀態(tài)時執(zhí)行古戴,不必等到所有的父task都執(zhí)行

one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done 當父task中有一個是success狀態(tài)時執(zhí)行,不必等到所有的父task都執(zhí)行

dummy: dependencies are just for show, trigger at will 無條件執(zhí)行

2.1.3. airflow.executor矩肩,即調度方式

在配置文件config/airflow.cfg中现恼,修改executor變量。

  • SequentialExecutor黍檩,順序調度執(zhí)行叉袍,默認執(zhí)行器,通常只用于測試
  • LocalExecutor刽酱,多進程本地調度執(zhí)行
  • CeleryExecutor喳逛,分布式調度執(zhí)行,生產常用
  • DaskExecutor棵里,在Dask分布式群集中運行Airflow任務润文,主要用于數(shù)據(jù)分析

2.1.4. 其他

  • JOB,最上層的工作殿怜,分為 SchedulerJob典蝌、BackfillJob 和 LocalTaskJob。SchedulerJob 由 Scheduler 創(chuàng)建头谜,BackfillJob 由 Backfill 創(chuàng)建骏掀,LocalTaskJob 由前面兩種 Job 創(chuàng)建
    在早期版本 Airflow 中,DAG 執(zhí)行主要有兩種完全獨立的執(zhí)行途徑:SchedulerJob 和 BackfillJob柱告。在一次較大的重構中增加了 DagRun 方式砖织,以跟蹤 DAG 的執(zhí)行狀態(tài)

2.2. 服務構成

單節(jié)點部署

Webserver

Airflow提供了一個可視化的Web界面。啟動 WebServer 后末荐,就可以在 Web 界面上查看定義好的 DAG 并監(jiān)控及改變運行狀況侧纯。也可以在 Web 界面中對一些變量進行配置。

image.png

Scheduler

整個 Airflow 的調度由 Scheduler 負責發(fā)起甲脏,每隔一段時間 Scheduler 就會檢查所有定義完成的 DAG 和定義在其中的作業(yè)眶熬,如果有符合運行條件的作業(yè),Scheduler 就會發(fā)起相應的作業(yè)任務以供 Worker 接收块请。

Worker

一般來說我們用 Celery Worker 來執(zhí)行具體的作業(yè)娜氏。Worker 可以部署在多臺機器上,并可以分別設置接收的隊列墩新。當接收的隊列中有作業(yè)任務時贸弥,Worker 就會接收這個作業(yè)任務,并開始執(zhí)行海渊。Airflow 會自動在每個部署 Worker 的機器上同時部署一個 Serve Logs 服務绵疲,這樣我們就可以在 Web 界面上方便的瀏覽分散在不同機器上的作業(yè)日志了哲鸳。

Flower

Flower 提供了一個可視化界面以監(jiān)控所有 Celery Worker 的運行狀況。這個服務并不是必要的盔憨。

image.png

3. 單機部署與測試

3.1. Install

pip install apache-airflow

如果遇到ImportError: cannot import name 'resolve_types'徙菠,解決辦法:

pip3 install cattrs==1.0.0

參考:https://github.com/apache/airflow/issues/11965

3.2. 啟動

# 在airflow目錄初始化數(shù)據(jù)庫和airflow配置
airflow db init
# 啟動 airflow web
airflow webserver
# 開始調度
airflow scheduler
  • 其他常用命令
# 測試任務,格式:airflow test dag_id task_id execution_time
airflow test test_task test1 2019-09-10
# 查看生效的 DAGs
airflow list_dags -sd $AIRFLOW_HOME/dags
# 開始運行任務(同 web 界面點 trigger 按鈕)
airflow trigger_dag test_task    
# 暫停任務
airflow pause dag_id      
# 取消暫停郁岩,等同于在 web 管理界面打開 off 按鈕
airflow unpause dag_id     
# 查看 task 列表
airflow list_tasks dag_id  查看task列表
# 清空任務狀態(tài)
airflow clear dag_id       
# 運行task
airflow run dag_id task_id execution_date

3.3. 測試

  • 通過webserver婿奔,可以查看樣例dagtutorial的源碼
# [START tutorial]
# [START import_module]
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
# [END default_args]

# [START instantiate_dag]
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)
# [END instantiate_dag]

# t1, t2 and t3 are examples of tasks created by instantiating operators
# [START basic_task]
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
# [END basic_task]

# [START documentation]
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
# [END documentation]

# [START jinja_template]
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)
# [END jinja_template]

t1 >> [t2, t3]
# [END tutorial]
  • 通過Web頁面啟動tutorialdag

  • ~/airflow/logs可以查看對應dag的log

  • 通過webserver查看task log,和task執(zhí)行情況

4. 分布式部署與測試

分布式部署

采用docker部署airflow分布式調度系統(tǒng)问慎,編排方式可以是k8s萍摊、swarm,這里采用docker-compose簡單實現(xiàn):
https://github.com/puckel/docker-airflow

$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
$ docker-compose -f docker-compose-CeleryExecutor.yml scale worker=2

5. 參考資料

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末如叼,一起剝皮案震驚了整個濱河市记餐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌薇正,老刑警劉巖片酝,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異挖腰,居然都是意外死亡雕沿,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進店門猴仑,熙熙樓的掌柜王于貴愁眉苦臉地迎上來审轮,“玉大人,你說我怎么就攤上這事辽俗〖苍” “怎么了?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵崖飘,是天一觀的道長榴捡。 經常有香客問我,道長朱浴,這世上最難降的妖魔是什么吊圾? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮翰蠢,結果婚禮上项乒,老公的妹妹穿的比我還像新娘。我一直安慰自己梁沧,他們只是感情好檀何,可當我...
    茶點故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般频鉴。 火紅的嫁衣襯著肌膚如雪栓辜。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天砚殿,我揣著相機與錄音啃憎,去河邊找鬼芝囤。 笑死似炎,一個胖子當著我的面吹牛,可吹牛的內容都是我干的悯姊。 我是一名探鬼主播羡藐,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼悯许!你這毒婦竟也來了仆嗦?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤先壕,失蹤者是張志新(化名)和其女友劉穎瘩扼,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體垃僚,經...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡集绰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了谆棺。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片栽燕。...
    茶點故事閱讀 39,711評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖改淑,靈堂內的尸體忽然破棺而出碍岔,到底是詐尸還是另有隱情,我是刑警寧澤朵夏,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布蔼啦,位于F島的核電站,受9級特大地震影響仰猖,放射性物質發(fā)生泄漏询吴。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一亮元、第九天 我趴在偏房一處隱蔽的房頂上張望猛计。 院中可真熱鬧,春花似錦爆捞、人聲如沸奉瘤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽盗温。三九已至藕赞,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間卖局,已是汗流浹背斧蜕。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留砚偶,地道東北人批销。 一個月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像染坯,于是被迫代替她去往敵國和親均芽。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內容