1. 什么是Airflow
?Airflow是Airbnb開源的data pipeline調(diào)度和監(jiān)控工作流的平臺,用于用來創(chuàng)建、監(jiān)控和調(diào)整data pipeline(ETL)。
2. 簡單的定時任務(wù)cron
?假設(shè)我們想要定時調(diào)用一個程序棚辽,比如說:每天定時從Web抓數(shù)據(jù),我們可以使用cron。cron是一個Linux下的后臺服務(wù)诅诱,用來定期的執(zhí)行一些任務(wù),在/etc/crontab中設(shè)置后即可送朱,基本寫法如下:
# 分鐘 小時 日 月 周 用戶 命令
17 * * * * root date >> /tmp/time.log
?它的意思是每個小時的第18分鐘娘荡,將當前時間寫入log文件,注意各值的取值范圍(分鐘 0 - 59驶沼,小時0 - 23炮沐,天1 - 31,月1 - 12回怜,星期0 - 6大年,0表示星期天) 修改/etc/crontab后,還需要用 $ sudo service cron restart命令重啟crontab任務(wù)玉雾,才能生效翔试。
3. 為什么要用Airflow
?有了cron為什么還需要airflow?以抓取web數(shù)據(jù)為例抹凳,可能在某天抓取數(shù)據(jù)時遏餐,網(wǎng)斷或者關(guān)機了,當天的數(shù)據(jù)沒抓進來赢底,這種情況下失都,只能通過寫日志定時分析日志,以及在程序中定時重連的方式保證數(shù)據(jù)完整幸冻,相對比較零碎和麻煩粹庞。另外,如果crontab設(shè)置文件中有幾十上百條任務(wù)時洽损,就比較頭疼了庞溜。
?Airflow支持圖形界面和命令行兩種方式,管理起來比較方便,另外流码,它可以把幾個相互依賴的任務(wù)編成一組又官,并監(jiān)督執(zhí)行是否正常,如果不正常漫试,調(diào)用程序重試等等六敬。
?當然,Airflow也不全是優(yōu)點驾荣,比如需要使用python腳本來定義任務(wù)間的依賴關(guān)系外构,相對于手動編輯crontab文件,相對難一些播掷。因此审编,如果只調(diào)用簡單的任務(wù),使用cron即可歧匈,復(fù)雜的再考慮airflow垒酬。
4. Airflow的基礎(chǔ)概念
?Airflow 中最基本的兩個概念是:DAG 和 task。DAG 的全稱是 Directed Acyclic Graph 是所有你想執(zhí)行的任務(wù)的集合眯亦,在這個集合中可以定義了他們的依賴關(guān)系伤溉,一個 DAG object可以用 Python 腳本中配置完成般码。每個 DAG object 代表了一個 workflow妻率,每個 workflow 都可以包含任意個 task,task就是具體的任務(wù)板祝。
5. Airflow安裝和使用
(1) 安裝airflow
$ sudo pip install airflow
?可以通過環(huán)境變量AIRFLOW_HOME 設(shè)置airflow的工作目錄宫静,默認為$HOME/airflow/
(2) Mysql支持
?如果想使用mysql存儲airflow內(nèi)容,請按如下方法設(shè)置mysql券时;如果不設(shè)置孤里,airflow在其工作目錄下建立db文件,以sqlite方式存儲橘洞。
$ mysql -u root -p
mysql> create database airflow default charset utf8 collate utf8_general_ci;
mysql> create user airflow@'localhost' identified by 'airflow';
mysql> grant all on airflow.* to airflow@'localhost';
mysql> flush privileges;
?修改配置文件 $AIRFLOW_HOME/airflow.cfg捌袜,把sql_alchemy_conn對應(yīng)語句替換成:
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
(3) 運行
$ airflow initdb
$ airflow worker
$ airflow webserver -p 8080 # 一直運行
$ airflow scheduler # 一直運行
?此時在瀏覽器中輸入:http://localhost:8080,即可看到airflow界面炸枣,其中有很多demo可以參考虏等。
(4) 建立第一個DAG:Hellow world
$ mkdir $AIRFLOW_HOME/dags/
$ vi $AIRFLOW_HOME/dags/hello_word.py # 內(nèi)容如下:
# -*- coding: utf-8 -*-
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta
default_args = {
'owner': 'yan.xie',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'retries': 5, # 重試次數(shù)
'retry_delay': timedelta(minutes=1), # 運行間隔時間
}
dag = DAG(
'test_my_dag', # DAG名字
default_args=default_args,
description='my first DAG',
schedule_interval=timedelta(days=1))
task1 = BashOperator(
task_id='task_1', # TASK名
bash_command='date', # 運行命令
dag=dag)
task2 = BashOperator(
task_id='task_2',
depends_on_past=False,
bash_command='sleep 5',
dag=dag)
def print_hello():
return 'Hello world!'
test3 = PythonOperator(
task_id='task_3',
python_callable=print_hello, # 運行python程序
dag=dag)
task2.set_upstream(task1) # 設(shè)置依賴關(guān)系
test3.set_upstream(task1)
?保存之后,再瀏覽器刷新一下界面适肠,即可在list中看到該DAG霍衫,點On后,即可運行侯养。
?點開DAG可以看到各Task間的依賴關(guān)系
?以及樹型關(guān)系
(5) 調(diào)試
?有時候敦跌,怕不能一次寫對,可以運行以下命令調(diào)試單個Task
$ airflow test test_my_dag task_3 20181027
(6) 清除全部DAG重置數(shù)據(jù)庫
$ airflow resetdb
?并刪除 $AIRFLOW_HOME/dags/ 下所有DAG文件逛揩,然后重啟webserver柠傍。
?在Airflow中麸俘,如果改了一個DAG的名字,它會新建一個DAG惧笛,而不僅是改名疾掰,所以舊的DAG還在數(shù)據(jù)庫和列表中存在,可以用 “$ airflow delete_dag DAG名” 的方式刪除它徐紧,但不是每個airflow版本都支持delete_dag命令静檬。此時可以只用resetdb不刪除dags目錄下文件的方式,刪除目錄中沒有對應(yīng)文件的DAG(刪除有風(fēng)險并级,操作須謹慎)拂檩。
6. 參考
(1) Ubuntu下crontab命令的用法
https://www.cnblogs.com/daxian2012/articles/2589894.html
(2) 使用 Airflow 替代你的 crontab
https://www.juhe.cn/news/index/id/2365