什么是Airflow柑船?
Airflow 是一個(gè)使用 python 語(yǔ)言編寫(xiě)的 data pipeline 調(diào)度和監(jiān)控工作流的平臺(tái)究孕。Airflow 被 Airbnb 內(nèi)部用來(lái)創(chuàng)建洽洁、監(jiān)控和調(diào)整數(shù)據(jù)管道绞惦。任何工作流都可以在這個(gè)使用 Python 來(lái)編寫(xiě)的平臺(tái)上運(yùn)行飒箭。Airflow是通過(guò)DAG(Directed acyclic graph)來(lái)管理任務(wù)流程的任務(wù)調(diào)度工具氛雪,她不需要知道業(yè)務(wù)數(shù)據(jù)的具體內(nèi)容,設(shè)置任務(wù)的依賴(lài)關(guān)系實(shí)現(xiàn)任務(wù)調(diào)度卦尊。
這個(gè)平臺(tái)擁有和 Hive叛拷、Presto、MySQL岂却、HDFS忿薇、Postgres 和 S3 交互的能力,并且提供了鉤子(hook)使得系統(tǒng)擁有很好地?cái)U(kuò)展性躏哩。除了一個(gè)命令行界面署浩,該工具還提供了一個(gè)基于 Web 的用戶(hù)界面讓您可以可視化管道的依賴(lài)關(guān)系、監(jiān)控進(jìn)度扫尺、觸發(fā)任務(wù)等筋栋。
Airflow 的架構(gòu)
在一個(gè)可擴(kuò)展的生產(chǎn)環(huán)境中,Airflow 含有以下組件:
- 一個(gè)元數(shù)據(jù)庫(kù)(MySQL 或 Postgres)
- 一組 Airflow 工作節(jié)點(diǎn)
- 一個(gè)調(diào)節(jié)器(Redis 或 RabbitMQ)
-
一個(gè) Airflow Web 服務(wù)器
功能簡(jiǎn)介
任務(wù)依賴(lài)
通常正驻,在一個(gè)運(yùn)維系統(tǒng)弊攘,數(shù)據(jù)分析系統(tǒng),或測(cè)試系統(tǒng)等大型系統(tǒng)中姑曙,我們會(huì)有各種各樣的依賴(lài)需求襟交。比如:
- 時(shí)間依賴(lài):任務(wù)需要等待某一個(gè)時(shí)間點(diǎn)觸發(fā)。
- 外部系統(tǒng)依賴(lài):任務(wù)依賴(lài) Mysql 中的數(shù)據(jù)伤靠,HDFS 中的數(shù)據(jù)等等捣域,這些不同的外部系統(tǒng)需要調(diào)用接口去訪問(wèn)。
- 機(jī)器依賴(lài):任務(wù)的執(zhí)行只能在特定的某一臺(tái)機(jī)器的環(huán)境中宴合,可能這臺(tái)機(jī)器內(nèi)存比較大焕梅,也可能只有那臺(tái)機(jī)器上有特殊的庫(kù)文件。
- 任務(wù)間依賴(lài):任務(wù) A 需要在任務(wù) B 完成后啟動(dòng)卦洽,兩個(gè)任務(wù)互相間會(huì)產(chǎn)生影響贞言。
- 資源依賴(lài):任務(wù)消耗資源非常多,使用同一個(gè)資源的任務(wù)需要被限制逐样,比如跑個(gè)數(shù)據(jù)轉(zhuǎn)換任務(wù)要10個(gè) G蜗字,機(jī)器一共就30個(gè) G打肝,最多只能跑兩個(gè)脂新,我希望類(lèi)似的任務(wù)排個(gè)隊(duì)。
- 權(quán)限依賴(lài):某種任務(wù)只能由某個(gè)權(quán)限的用戶(hù)啟動(dòng)粗梭。
也許大家會(huì)覺(jué)得這些是在任務(wù)程序中的邏輯需要處理的部分争便,但是我認(rèn)為,這些邏輯可以抽象為任務(wù)控制邏輯的部分断医,和實(shí)際任務(wù)執(zhí)行邏輯解耦合滞乙。
Airflow的處理依賴(lài)的方式
Airflow 和crontab不同奏纪。crontab 可以很好地處理定時(shí)執(zhí)行任務(wù)的需求,它是一種依賴(lài)管理系統(tǒng)斩启,而且只管理時(shí)間上的依賴(lài)序调。Airflow 的核心概念,是 DAG (有向無(wú)環(huán)圖)兔簇,DAG 由一個(gè)或多個(gè) TASK 組成发绢,而這個(gè) DAG 正是解決了上文所說(shuō)的任務(wù)間依賴(lài)。Task A 執(zhí)行完成后才能執(zhí)行 Task B垄琐,多個(gè)Task之間的依賴(lài)關(guān)系可以很好的用DAG表示完善边酒。
Airflow 完整的支持 crontab 表達(dá)式,也支持直接使用 python 的 datatime 表述時(shí)間狸窘,還可以用 datatime 的 delta 表述時(shí)間差墩朦。這樣可以解決任務(wù)的時(shí)間依賴(lài)問(wèn)題。
Airflow 在 CeleryExecuter 下可以使用不同的用戶(hù)啟動(dòng) Worke r翻擒,不同的 Worker 監(jiān)聽(tīng)不同的 Queue 氓涣,這樣可以解決用戶(hù)權(quán)限依賴(lài)問(wèn)題。Worker 也可以啟動(dòng)在多個(gè)不同的機(jī)器上陋气,解決機(jī)器依賴(lài)的問(wèn)題春哨。
Airflow 可以為任意一個(gè) Task 指定一個(gè)抽象的 Pool,每個(gè) Pool 可以指定一個(gè) Slot 數(shù)恩伺。每當(dāng)一個(gè) Task 啟動(dòng)時(shí)赴背,就占用一個(gè) Slot ,當(dāng) Slot 數(shù)占滿(mǎn)時(shí)晶渠,其余的任務(wù)就處于等待狀態(tài)凰荚。這樣就解決了資源依賴(lài)問(wèn)題。
Airflow 中有 Hook 機(jī)制(其實(shí)我覺(jué)得不應(yīng)該叫 Hook )褒脯,作用時(shí)建立一個(gè)與外部數(shù)據(jù)系統(tǒng)之間的連接便瑟,比如 Mysql,HDFS番川,本地文件系統(tǒng)(文件系統(tǒng)也被認(rèn)為是外部系統(tǒng))等到涂,通過(guò)拓展 Hook 能夠接入任意的外部系統(tǒng)的接口進(jìn)行連接,這樣就解決的外部系統(tǒng)依賴(lài)問(wèn)題颁督。
Airflow的命令
- airflow webserver -p 8080 打開(kāi)webserver
- airflow scheduler 調(diào)度器践啄,必須啟動(dòng),不然dag沒(méi)法run起來(lái)(使用CeleryExecutor沉御、LocalExecutor時(shí))
- airflow run dagid [time] run task instance
- airflow backfill [dagid] -s[startTime] -e [endTime] run a backfill over 2 days
run的demo
# run your first task instance
airflow run example_bash_operator runme_0 2018-01-11
# run a backfill over 2 days
airflow backfill example_bash_operator -s 2018-01-10 -e 2018-01-11
基于CeleryExecutor方式的系統(tǒng)架構(gòu)
基于CeleryExecutor方式的系統(tǒng)架構(gòu)
使用celery方式的系統(tǒng)架構(gòu)圖(官方推薦使用這種方式屿讽,同時(shí)支持mesos方式部署)。turing為外部系統(tǒng)吠裆,GDags服務(wù)幫助拼接成dag伐谈,可以忽略烂完。
1.master節(jié)點(diǎn)webui管理dags、日志等信息诵棵。scheduler負(fù)責(zé)調(diào)度抠蚣,只支持單節(jié)點(diǎn),多節(jié)點(diǎn)啟動(dòng)scheduler可能會(huì)掛掉
2.worker負(fù)責(zé)執(zhí)行具體dag中的task履澳。這樣不同的task可以在不同的環(huán)境中執(zhí)行柱徙。
基于LocalExecutor方式的系統(tǒng)架構(gòu)圖
另一種啟動(dòng)方式的思考,一個(gè)dag分配到1臺(tái)機(jī)器上執(zhí)行奇昙。如果task不復(fù)雜同時(shí)task環(huán)境相同护侮,可以采用這種方式,方便擴(kuò)容储耐、管理羊初,同時(shí)沒(méi)有master單點(diǎn)問(wèn)題。
從安裝airflow到成功運(yùn)行什湘,踩了很多坑长赞,憑借著回憶簡(jiǎn)單記錄一下。
運(yùn)行常用命令
- Initialize the db 第一次安裝完初始化airflow
airflow initdb
- 開(kāi)啟webserver闽撤,登陸網(wǎng)頁(yè)http://localhost:8080可以看到dag運(yùn)行狀況
airflow webserver --port 8080
- test the 單個(gè)DAG
$ python airflow_dag.py #先編譯python
編譯成功之后test:
#格式:airflow test dag_id task_id execution_time
$ airflow test airflow_tutorial print_world 2017-07-01
- 測(cè)試成功之后運(yùn)行
#開(kāi)啟調(diào)度任務(wù)器
$ airflow scheduler
# 開(kāi)始運(yùn)行任務(wù)
$ airflow trigger {$dag}.py
這一步也可以在web界面點(diǎn)trigger按鈕
一點(diǎn)總結(jié)
我用airflow執(zhí)行的任務(wù)是從一個(gè)數(shù)據(jù)庫(kù)中同步數(shù)據(jù)到另一個(gè)數(shù)據(jù)庫(kù)得哆。本來(lái)想用airflow里的hook連數(shù)據(jù)庫(kù),進(jìn)行同步哟旗,但是程序死卡在running死活不運(yùn)行贩据。這樣研究了好久也沒(méi)研究出來(lái)原因。網(wǎng)上關(guān)于airflow的文檔都停留在介紹和入門(mén)闸餐,很少有實(shí)際運(yùn)行的例子饱亮。
后來(lái)才發(fā)現(xiàn),其實(shí)沒(méi)必要非去使用airflow連數(shù)據(jù)庫(kù)的方法舍沙。airflow相當(dāng)于一個(gè)工具近上,一個(gè)嵌入在python文件中的工具。所以在寫(xiě)airflow時(shí)拂铡,其他部分功能定義正常寫(xiě)壹无,連數(shù)據(jù)庫(kù)啊,分割字符串啊感帅,這些都按照python里的方法來(lái)寫(xiě)就可以了斗锭,最后再用airflow定義task前后依賴(lài)關(guān)系。
搞清楚這點(diǎn)留瞳,就會(huì)發(fā)現(xiàn)其實(shí)airflow非常好用拒迅。