聲明:
本文轉(zhuǎn)自我的個人博客绑改,有興趣的可以查看原文。
轉(zhuǎn)發(fā)請注明來源逊谋。
最近工作需要擂达,使用airflow搭建了公司的ETL系統(tǒng),順帶在公司分享了一次airflow胶滋,整理成文板鬓,Enjoy!
1. airflow 介紹
1.1 airflow 是什么
Airflow is a platform to programmatically author, schedule and monitor workflows.
airflow 是一個編排究恤、調(diào)度和監(jiān)控workflow的平臺俭令,由Airbnb開源,現(xiàn)在在Apache Software Foundation 孵化部宿。airflow 將workflow編排為tasks組成的DAGs抄腔,調(diào)度器在一組workers上按照指定的依賴關(guān)系執(zhí)行tasks。同時理张,airflow 提供了豐富的命令行工具和簡單易用的用戶界面以便用戶查看和操作赫蛇,并且airflow提供了監(jiān)控和報警系統(tǒng)。
1.2 airflow 核心概念
- DAGs:即有向無環(huán)圖(Directed Acyclic Graph)雾叭,將所有需要運行的tasks按照依賴關(guān)系組織起來悟耘,描述的是所有tasks執(zhí)行的順序。
- Operators:可以簡單理解為一個class织狐,描述了DAG中一個具體的task具體要做的事暂幼。其中,airflow內(nèi)置了很多operators移迫,如
BashOperator
執(zhí)行一個bash 命令旺嬉,PythonOperator
調(diào)用任意的Python 函數(shù),EmailOperator
用于發(fā)送郵件厨埋,HTTPOperator
用于發(fā)送HTTP請求邪媳,SqlOperator
用于執(zhí)行SQL命令...同時,用戶可以自定義Operator,這給用戶提供了極大的便利性悲酷。 - Tasks:Task 是 Operator的一個實例套菜,也就是DAGs中的一個node。
- Task Instance:task的一次運行设易。task instance 有自己的狀態(tài),包括"running", "success", "failed", "skipped", "up for retry"等蛹头。
- Task Relationships:DAGs中的不同Tasks之間可以有依賴關(guān)系顿肺,如
TaskA >> TaskB
,表明TaskB依賴于TaskA渣蜗。
通過將DAGs和Operators結(jié)合起來屠尊,用戶就可以創(chuàng)建各種復(fù)雜的 workflow了。
1.3 其它概念
- Connections: 管理外部系統(tǒng)的連接信息耕拷,如外部MySQL讼昆、HTTP服務(wù)等,連接信息包括
conn_id
/hostname
/login
/password
/schema
等骚烧,可以通過界面查看和管理浸赫,編排workflow時,使用conn_id
進(jìn)行使用赃绊。 - Pools: 用來控制tasks執(zhí)行的并行數(shù)既峡。將一個task賦給一個指定的
pool
,并且指明priority_weight
碧查,可以干涉tasks的執(zhí)行順序运敢。 - XComs:在airflow中,operator一般(not always)是原子的忠售,也就是說传惠,他們一般獨立執(zhí)行,同時也不需要和其他operator共享信息稻扬,如果兩個operators需要共享信息卦方,如filename之類的, 推薦將這兩個operators組合成一個operator腐螟。如果實在不能避免愿汰,則可以使用XComs (cross-communication)來實現(xiàn)。XComs用來在不同tasks之間交換信息乐纸。
- Trigger Rules:指task的觸發(fā)條件衬廷。默認(rèn)情況下是task的直接上游執(zhí)行成功后開始執(zhí)行,airflow允許更復(fù)雜的依賴設(shè)置汽绢,包括
all_success
(所有的父節(jié)點執(zhí)行成功)吗跋,all_failed
(所有父節(jié)點處于failed或upstream_failed狀態(tài)),all_done
(所有父節(jié)點執(zhí)行完成),one_failed
(一旦有一個父節(jié)點執(zhí)行失敗就觸發(fā)跌宛,不必等所有父節(jié)點執(zhí)行完成)酗宋,one_success
(一旦有一個父節(jié)點執(zhí)行成功就觸發(fā),不必等所有父節(jié)點執(zhí)行完成)疆拘,dummy
(依賴關(guān)系只是用來查看的蜕猫,可以任意觸發(fā))。另外哎迄,airflow提供了depends_on_past
回右,設(shè)置為True時,只有上一次調(diào)度成功了漱挚,才可以觸發(fā)翔烁。
2. 示例
先來看一個簡單的DAG。圖中每個節(jié)點表示一個task旨涝,所有tasks組成一個DAG蹬屹,各個tasks之間的依賴關(guān)系可以根據(jù)節(jié)點之間的線看出來。
2.1 實例化DAG
# -*- coding: UTF-8 -*-
## 導(dǎo)入airflow需要的modules
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'lxwei',
'depends_on_past': False, # 如上文依賴關(guān)系所示
'start_date': datetime(2018, 1, 17), # DAGs都有個參數(shù)start_date白华,表示調(diào)度器調(diào)度的起始時間
'email': ['lxwei@github.com'], # 用于alert
'email_on_failure': True,
'email_on_retry': False,
'retries': 3, # 重試策略
'retry_delay': timedelta(minutes=5)
}
dag = DAG('example-dag', default_args=default_args, schedule_interval='0 0 * * *')
在創(chuàng)建DAGs時慨默,我們可以顯示的給每個Task傳遞參數(shù),但通過default_args衬鱼,我們可以定義一個默認(rèn)參數(shù)用于創(chuàng)建tasks业筏。
注意,schedule_interval 跟官方文檔不一致鸟赫,官方文檔的方式已經(jīng)被deprecated蒜胖。
2.2 定義依賴關(guān)系
這個依賴關(guān)系是我自己定義的,key表示某個taskId抛蚤,value里的每個元素也表示一個taskId台谢,其中,key依賴value里的所有task岁经。
"dependencies": {
"goods_sale_2": ["goods_sale_1"], # goods_sale_2 依賴 goods_sale1
"shop_sale_1_2": ["shop_sale_1_1"],
"shop_sale_2_2": ["shop_sale_2_1"],
"shop_sale_2_3": ["shop_sale_2_2"],
"etl_task": ["shop_info", "shop_sale_2_3", "shop_sale_realtime_1", "goods_sale_2", "shop_sale_1_2"],
"goods_sale_1": ["timelySalesCheck", "productDaySalesCheck"],
"shop_sale_1_1": ["timelySalesCheck", "productDaySalesCheck"],
"shop_sale_realtime_1": ["timelySalesCheck", "productDaySalesCheck"],
"shop_sale_2_1": ["timelySalesCheck", "productDaySalesCheck"],
"shop_info": ["timelySalesCheck", "productDaySalesCheck"]
}
2.3 定義tasks和依賴關(guān)系
首先朋沮,實例化operators,構(gòu)造tasks缀壤。如代碼所示樊拓,其中,EtlTask
塘慕、MySQLToWebDataTransfer
筋夏、MySQLSelector
是自定義的三種Operator,根據(jù)taskType實例化operator图呢,并存放到taskDict中条篷,便于后期建立tasks之間的依賴關(guān)系骗随。
for taskConf in tasksConfs:
taskType = taskConf.get("taskType")
if taskType == "etlTask":
task = EtlTask(
task_id=taskConf.get("taskId"),
httpConnId=httpConn,
etlId=taskConf.get("etlId"),
dag=dag)
taskDict[taskConf.get("taskId")] = task
elif taskType == "MySQLToWebDataTransfer":
task = MySqlToWebdataTransfer(
task_id = taskConf.get("taskId"),
sql= taskConf.get("sql"),
tableName=taskConf.get("tableName"),
mysqlConnId =mysqlConn,
httpConnId=httpConn,
dag=dag
)
taskDict[taskConf.get("taskId")] = task
elif taskType == "MySQLSelect":
task = StatusChecker(
task_id = taskConf.get("taskId"),
mysqlConnId = mysqlConn,
sql = taskConf.get("sql"),
dag = dag
)
taskDict[taskConf.get("taskId")] = task
else:
logging.error("error. TaskType is illegal.")
構(gòu)建tasks之間的依賴關(guān)系,其中赴叹,dependencies中定義了上面的依賴關(guān)系鸿染,A >> B
表示A是B的父節(jié)點,相應(yīng)的乞巧,A << B
表示A是B的子節(jié)點涨椒。
for sourceKey in dependencies:
destTask = taskDict.get(sourceKey)
sourceTaskKeys = dependencies.get(sourceKey)
for key in sourceTaskKeys:
sourceTask = taskDict.get(key)
if (sourceTask != None and destTask != None):
sourceTask >> destTask
3. 常用命令
命令行輸入airflow -h
,得到幫助文檔
backfill Run subsections of a DAG for a specified date range list_tasks List the tasks within a DAG clear Clear a set of task instance, as if they never ran pause Pause a DAG unpause Resume a paused DAG trigger_dag Trigger a DAG run pool CRUD operations on pools variables CRUD operations on variables kerberos Start a kerberos ticket renewer render Render a task instance's template(s) run Run a single task instance initdb Initialize the metadata database list_dags List all the DAGs dag_state Get the status of a dag run task_failed_deps Returns the unmet dependencies for a task instance from the perspective of the scheduler. In other words, why a task instance doesn't get scheduled and then queued by the scheduler, and then run by an executor). task_state Get the status of a task instance serve_logs Serve logs generate by worker test Test a task instance. This will run a task without checking for dependencies or recording it's state in the database. webserver Start a Airflow webserver instance resetdb Burn down and rebuild the metadata database upgradedb Upgrade the metadata database to latest version scheduler Start a scheduler instance worker Start a Celery worker node flower Start a Celery Flower version Show the version connections List/Add/Delete connections
其中摊欠,使用較多的是backfill丢烘、run、test些椒、webserver、scheduler掸刊。其他操作在web界面操作更方便免糕。另外,initdb 用于初始化metadata忧侧,使用一次即可石窑;resetdb會重置metadata,清除掉數(shù)據(jù)(如connection數(shù)據(jù)), 需要慎用蚓炬。
4. 問題
在使用airflow過程中松逊,曾把DAGs里的task拆分得很細(xì),這樣的話肯夏,如果某個task失敗经宏,重跑的代價會比較低。但是驯击,在實踐中發(fā)現(xiàn)烁兰,tasks太多時,airflow在調(diào)度tasks會很低效徊都,airflow一直處于選擇待執(zhí)行的task的過程中沪斟,會長時間沒有具體task在執(zhí)行,從而整體執(zhí)行效率大幅降低暇矫。
5. 總結(jié)
airflow 很好很強(qiáng)大主之。如果只是簡單的ETL之類的工作,可以很容易的編排李根。調(diào)度靈活槽奕,而且監(jiān)控和報警系統(tǒng)完備,可以很方便的投入生產(chǎn)環(huán)節(jié)朱巨。