airflow 介紹

聲明:
本文轉(zhuǎn)自我的個人博客绑改,有興趣的可以查看原文。
轉(zhuǎn)發(fā)請注明來源逊谋。

最近工作需要擂达,使用airflow搭建了公司的ETL系統(tǒng),順帶在公司分享了一次airflow胶滋,整理成文板鬓,Enjoy!

1. airflow 介紹

1.1 airflow 是什么

Airflow is a platform to programmatically author, schedule and monitor workflows.

airflow 是一個編排究恤、調(diào)度和監(jiān)控workflow的平臺俭令,由Airbnb開源,現(xiàn)在在Apache Software Foundation 孵化部宿。airflow 將workflow編排為tasks組成的DAGs抄腔,調(diào)度器在一組workers上按照指定的依賴關(guān)系執(zhí)行tasks。同時理张,airflow 提供了豐富的命令行工具和簡單易用的用戶界面以便用戶查看和操作赫蛇,并且airflow提供了監(jiān)控和報警系統(tǒng)。

1.2 airflow 核心概念

  1. DAGs:即有向無環(huán)圖(Directed Acyclic Graph)雾叭,將所有需要運行的tasks按照依賴關(guān)系組織起來悟耘,描述的是所有tasks執(zhí)行的順序。
  2. Operators:可以簡單理解為一個class织狐,描述了DAG中一個具體的task具體要做的事暂幼。其中,airflow內(nèi)置了很多operators移迫,如BashOperator 執(zhí)行一個bash 命令旺嬉,PythonOperator 調(diào)用任意的Python 函數(shù),EmailOperator 用于發(fā)送郵件厨埋,HTTPOperator 用于發(fā)送HTTP請求邪媳, SqlOperator 用于執(zhí)行SQL命令...同時,用戶可以自定義Operator,這給用戶提供了極大的便利性悲酷。
  3. Tasks:Task 是 Operator的一個實例套菜,也就是DAGs中的一個node。
  4. Task Instance:task的一次運行设易。task instance 有自己的狀態(tài),包括"running", "success", "failed", "skipped", "up for retry"等蛹头。
  5. Task Relationships:DAGs中的不同Tasks之間可以有依賴關(guān)系顿肺,如 TaskA >> TaskB,表明TaskB依賴于TaskA渣蜗。

通過將DAGs和Operators結(jié)合起來屠尊,用戶就可以創(chuàng)建各種復(fù)雜的 workflow了。

1.3 其它概念

  1. Connections: 管理外部系統(tǒng)的連接信息耕拷,如外部MySQL讼昆、HTTP服務(wù)等,連接信息包括conn_idhostnameloginpasswordschema 等骚烧,可以通過界面查看和管理浸赫,編排workflow時,使用conn_id 進(jìn)行使用赃绊。
  2. Pools: 用來控制tasks執(zhí)行的并行數(shù)既峡。將一個task賦給一個指定的pool,并且指明priority_weight碧查,可以干涉tasks的執(zhí)行順序运敢。
  3. XComs:在airflow中,operator一般(not always)是原子的忠售,也就是說传惠,他們一般獨立執(zhí)行,同時也不需要和其他operator共享信息稻扬,如果兩個operators需要共享信息卦方,如filename之類的, 推薦將這兩個operators組合成一個operator腐螟。如果實在不能避免愿汰,則可以使用XComs (cross-communication)來實現(xiàn)。XComs用來在不同tasks之間交換信息乐纸。
  4. Trigger Rules:指task的觸發(fā)條件衬廷。默認(rèn)情況下是task的直接上游執(zhí)行成功后開始執(zhí)行,airflow允許更復(fù)雜的依賴設(shè)置汽绢,包括all_success(所有的父節(jié)點執(zhí)行成功)吗跋,all_failed(所有父節(jié)點處于failed或upstream_failed狀態(tài)),all_done(所有父節(jié)點執(zhí)行完成),one_failed(一旦有一個父節(jié)點執(zhí)行失敗就觸發(fā)跌宛,不必等所有父節(jié)點執(zhí)行完成)酗宋,one_success(一旦有一個父節(jié)點執(zhí)行成功就觸發(fā),不必等所有父節(jié)點執(zhí)行完成)疆拘,dummy(依賴關(guān)系只是用來查看的蜕猫,可以任意觸發(fā))。另外哎迄,airflow提供了depends_on_past回右,設(shè)置為True時,只有上一次調(diào)度成功了漱挚,才可以觸發(fā)翔烁。

2. 示例

先來看一個簡單的DAG。圖中每個節(jié)點表示一個task旨涝,所有tasks組成一個DAG蹬屹,各個tasks之間的依賴關(guān)系可以根據(jù)節(jié)點之間的線看出來。

DAGs

2.1 實例化DAG

# -*- coding: UTF-8 -*-

## 導(dǎo)入airflow需要的modules
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'lxwei',
    'depends_on_past': False, # 如上文依賴關(guān)系所示
    'start_date': datetime(2018, 1, 17), # DAGs都有個參數(shù)start_date白华,表示調(diào)度器調(diào)度的起始時間
    'email': ['lxwei@github.com'], # 用于alert
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3, # 重試策略
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('example-dag', default_args=default_args, schedule_interval='0 0 * * *')

在創(chuàng)建DAGs時慨默,我們可以顯示的給每個Task傳遞參數(shù),但通過default_args衬鱼,我們可以定義一個默認(rèn)參數(shù)用于創(chuàng)建tasks业筏。

注意,schedule_interval 跟官方文檔不一致鸟赫,官方文檔的方式已經(jīng)被deprecated蒜胖。

2.2 定義依賴關(guān)系

這個依賴關(guān)系是我自己定義的,key表示某個taskId抛蚤,value里的每個元素也表示一個taskId台谢,其中,key依賴value里的所有task岁经。

"dependencies": {
    "goods_sale_2": ["goods_sale_1"], # goods_sale_2 依賴 goods_sale1
    "shop_sale_1_2": ["shop_sale_1_1"],
    "shop_sale_2_2": ["shop_sale_2_1"],
    "shop_sale_2_3": ["shop_sale_2_2"],
    "etl_task": ["shop_info", "shop_sale_2_3", "shop_sale_realtime_1", "goods_sale_2", "shop_sale_1_2"],
    "goods_sale_1": ["timelySalesCheck", "productDaySalesCheck"],
    "shop_sale_1_1": ["timelySalesCheck", "productDaySalesCheck"],
    "shop_sale_realtime_1": ["timelySalesCheck", "productDaySalesCheck"],
    "shop_sale_2_1": ["timelySalesCheck", "productDaySalesCheck"],
    "shop_info": ["timelySalesCheck", "productDaySalesCheck"]
}

2.3 定義tasks和依賴關(guān)系

首先朋沮,實例化operators,構(gòu)造tasks缀壤。如代碼所示樊拓,其中,EtlTask塘慕、MySQLToWebDataTransfer筋夏、MySQLSelector 是自定義的三種Operator,根據(jù)taskType實例化operator图呢,并存放到taskDict中条篷,便于后期建立tasks之間的依賴關(guān)系骗随。

for taskConf in tasksConfs:
    taskType = taskConf.get("taskType")
    if taskType == "etlTask":
        task = EtlTask(
            task_id=taskConf.get("taskId"),
            httpConnId=httpConn,
            etlId=taskConf.get("etlId"),
            dag=dag)
        taskDict[taskConf.get("taskId")] = task
    elif taskType == "MySQLToWebDataTransfer":
        task = MySqlToWebdataTransfer(
            task_id = taskConf.get("taskId"),
            sql= taskConf.get("sql"),
            tableName=taskConf.get("tableName"),
            mysqlConnId =mysqlConn,
            httpConnId=httpConn,
            dag=dag
        )
        taskDict[taskConf.get("taskId")] = task
    elif taskType == "MySQLSelect":
        task = StatusChecker(
            task_id = taskConf.get("taskId"),
            mysqlConnId = mysqlConn,
            sql = taskConf.get("sql"),
            dag = dag
        )
        taskDict[taskConf.get("taskId")] = task
    else:
        logging.error("error. TaskType is illegal.")

構(gòu)建tasks之間的依賴關(guān)系,其中赴叹,dependencies中定義了上面的依賴關(guān)系鸿染,A >> B 表示A是B的父節(jié)點,相應(yīng)的乞巧,A << B 表示A是B的子節(jié)點涨椒。

for sourceKey in dependencies:
    destTask = taskDict.get(sourceKey)
    sourceTaskKeys = dependencies.get(sourceKey)
    for key in sourceTaskKeys:
        sourceTask = taskDict.get(key)
        if (sourceTask != None and destTask != None):
            sourceTask >> destTask

3. 常用命令

命令行輸入airflow -h,得到幫助文檔

backfill            Run subsections of a DAG for a specified date range
list_tasks          List the tasks within a DAG
clear               Clear a set of task instance, as if they never ran
pause               Pause a DAG
unpause             Resume a paused DAG
trigger_dag         Trigger a DAG run
pool                CRUD operations on pools
variables           CRUD operations on variables
kerberos            Start a kerberos ticket renewer
render              Render a task instance's template(s)
run                 Run a single task instance
initdb              Initialize the metadata database
list_dags           List all the DAGs
dag_state           Get the status of a dag run
task_failed_deps    Returns the unmet dependencies for a task instance
                    from the perspective of the scheduler. In other words,
                    why a task instance doesn't get scheduled and then
                    queued by the scheduler, and then run by an executor).
task_state          Get the status of a task instance
serve_logs          Serve logs generate by worker
test                Test a task instance. This will run a task without
                    checking for dependencies or recording it's state in
                    the database.
webserver           Start a Airflow webserver instance
resetdb             Burn down and rebuild the metadata database
upgradedb           Upgrade the metadata database to latest version
scheduler           Start a scheduler instance
worker              Start a Celery worker node
flower              Start a Celery Flower
version             Show the version
connections         List/Add/Delete connections

其中摊欠,使用較多的是backfill丢烘、runtest些椒、webserverscheduler掸刊。其他操作在web界面操作更方便免糕。另外,initdb 用于初始化metadata忧侧,使用一次即可石窑;resetdb會重置metadata,清除掉數(shù)據(jù)(如connection數(shù)據(jù)), 需要慎用蚓炬。

4. 問題

在使用airflow過程中松逊,曾把DAGs里的task拆分得很細(xì),這樣的話肯夏,如果某個task失敗经宏,重跑的代價會比較低。但是驯击,在實踐中發(fā)現(xiàn)烁兰,tasks太多時,airflow在調(diào)度tasks會很低效徊都,airflow一直處于選擇待執(zhí)行的task的過程中沪斟,會長時間沒有具體task在執(zhí)行,從而整體執(zhí)行效率大幅降低暇矫。

5. 總結(jié)

airflow 很好很強(qiáng)大主之。如果只是簡單的ETL之類的工作,可以很容易的編排李根。調(diào)度靈活槽奕,而且監(jiān)控和報警系統(tǒng)完備,可以很方便的投入生產(chǎn)環(huán)節(jié)朱巨。

6. 參閱

airflow 官網(wǎng)
github

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末史翘,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌琼讽,老刑警劉巖必峰,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異钻蹬,居然都是意外死亡吼蚁,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進(jìn)店門问欠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來肝匆,“玉大人,你說我怎么就攤上這事顺献∑旃” “怎么了钝满?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵扰肌,是天一觀的道長。 經(jīng)常有香客問我凌简,道長肿轨,這世上最難降的妖魔是什么寿冕? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮椒袍,結(jié)果婚禮上驼唱,老公的妹妹穿的比我還像新娘。我一直安慰自己驹暑,他們只是感情好玫恳,可當(dāng)我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著岗钩,像睡著了一般纽窟。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上兼吓,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天臂港,我揣著相機(jī)與錄音,去河邊找鬼视搏。 笑死审孽,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的浑娜。 我是一名探鬼主播佑力,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼筋遭!你這毒婦竟也來了打颤?” 一聲冷哼從身側(cè)響起暴拄,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎编饺,沒想到半個月后乖篷,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡透且,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年撕蔼,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片秽誊。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡鲸沮,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出锅论,到底是詐尸還是另有隱情讼溺,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布最易,位于F島的核電站肾胯,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏耘纱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一毕荐、第九天 我趴在偏房一處隱蔽的房頂上張望束析。 院中可真熱鬧,春花似錦憎亚、人聲如沸员寇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蝶锋。三九已至,卻和暖如春什往,著一層夾襖步出監(jiān)牢的瞬間扳缕,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工别威, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留躯舔,地道東北人。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓省古,卻偏偏與公主長得像粥庄,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子豺妓,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,515評論 2 359