Airflow 是 Airbnb 公司開源的任務(wù)調(diào)度系統(tǒng), 通過使用 Python 開發(fā) DAG, 非常方便的調(diào)度計算任務(wù). 介紹一下在 Airflow 提供的 Operator 不滿足需求的場景下, 如何自己開發(fā) Operator.
0x00 DAG 的最基本執(zhí)行單元: Operator
在 Airflow 的一個 DAG 中, 最基本的執(zhí)行單元是 Operator
. 例如如下示例 DAG 中, 使用的都是 BashOperator
, 執(zhí)行一個 bash 腳本.
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('tutorial', default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
Airflow 實現(xiàn)了很多 Operator(參見 Airflow 源代碼), 涵蓋了常用的功能, 例如執(zhí)行 Hive 查詢, 執(zhí)行 bash 腳本等. 有幾種特殊的 Operator:
-
XXXSensor
用作其他外界條件的 sensor, 實現(xiàn)也很簡單, 在 Operator 的execute
方法中進行 long poll, 直到poke
方法返回 True 則完成執(zhí)行.
# BaseSensorOperator 部分源碼
def poke(self, context):
'''
Function that the sensors defined while deriving this class should
override.
'''
raise AirflowException('Override me.')
def execute(self, context):
started_at = datetime.now()
while not self.poke(context):
sleep(self.poke_interval)
if (datetime.now() - started_at).seconds > self.timeout:
raise AirflowSensorTimeout('Snap. Time is OUT.')
logging.info("Success criteria met. Exiting.")
-
PythonOperator
用來執(zhí)行 Python 函數(shù), 這也是使用 Python 代碼來定義 DAG 的好處 -
BranchPythonOperator
用來支持分支, 通過函數(shù)返回要執(zhí)行的分支
Airflow Operator 相關(guān) class 繼承關(guān)系如下:
.
└── BaseOperator
├── BaseSensorOperator
│ └── ...Sensor
├── PythonOperator
│ ├── BranchPythonOperator
│ └── ShortCircuitOperator
└── ...Operator
0x01 Operator 開發(fā)
如果官方的 Operator 都不滿足需求, 那么我們就要來開發(fā)一個 Operator. 開發(fā) Operator 也很簡單, 直接繼承 BaseOperator
并實現(xiàn) execute
方法即可.
from airflow.models import BaseOperator
class DemoOperator(BaseOperator):
def __init__(*args, **kwargs):
super(DemoOperator, self).__init__(*args, **kwargs)
def execute(self, context):
print "hello"
除了 execute
方法必須實現(xiàn)外, 還有一個 hook 方法:
-
pre_execute
: 在execute
方法前調(diào)用, 實現(xiàn)點兒準(zhǔn)備邏輯 -
post_execute
: 在execute
方法完成后調(diào)用, cleanup 一下 -
on_kill
: 在 task 被 kill 的時候執(zhí)行.
Operator 獲取模板變量
Aiflow 是支持 Templating with Jinja 功能的, 具體來說就是 Operator 中支持模板語言 Jinja, 寫一些 for 循環(huán), 或者通過 {{param}}
語法替換一些變量等(例如 {{ds}}
被替換成執(zhí)行任務(wù)的日期)
# 官方示例的 jinja 語句
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
echo "{{ params.my_param }}"
{% endfor %}
那么, 自己開發(fā)的 Operator 中如何使用這個功能呢?
其實也很簡單, 在自己的 Operator
中添加屬性 template_fields = (attributes_to_be_rendered_with_jinja)
. 在任務(wù)被執(zhí)行前, Airflow 會自動渲染 template_fields
中的屬性再執(zhí)行任務(wù).
# 節(jié)選自 Airflow 中 BashOperator 源碼
class BashOperator(BaseOperator):
# 這里定義需要被渲染的屬性名稱
template_fields = ('bash_command', 'env')
Operator 部署
開發(fā)的 Operator
代碼作為一個 Python 的 Package, 使用 distutil 打包安裝到 Airflow 對應(yīng)的服務(wù)器上即可.
0x02 Operator 跟其他系統(tǒng)交互
Airflow 考慮到為了跟外界環(huán)境隔離, 提出了 Connection
概念: 將配置信息放到 Airflow Server 本身配置, 在 DAG 中使用 connection_id
來引用. 例如, 我們配置一個 HiveServer 的 Connection
, 使用 liulishuo_hiveserver1
作為 connection_id
, 這樣同一個 DAG 文件就可以在測試環(huán)境和生成環(huán)境調(diào)用對應(yīng)環(huán)境中的 HiveServer 服務(wù)了. 總結(jié)來說, 這就是架構(gòu)設(shè)計模式中的 External Configuration Store Pattern 的標(biāo)準(zhǔn)實現(xiàn).
那么如果自己開發(fā)的 Operator
如何調(diào)用這些 Connection
呢? 這里 Airflow 又引入了一個 Hook 的概念:
Hooks are interfaces to external platforms and databases like Hive, S3, MySQL, Postgres, HDFS, and Pig. Hooks implement a common interface when possible, and act as a building block for operators. They also use the airflow.models.Connection model to retrieve hostnames and authentication information. Hooks keep authentication code and information out of pipelines, centralized in the metadata database.
我個人覺得這個概念的引入有點兒臃腫了, 沒有任何意義. 開發(fā)者在 Airflow Web 中看到的也是 Connection
的配置, 那么開發(fā) DAG 時第一個想到的就是去找 Connection
相關(guān)的 class, 再多一個 Hook
的概念有點兒繞.
那么 Operator
中想使用對應(yīng)的 Connection
, 直接根據(jù) connection_id
創(chuàng)建對應(yīng)的 Hook
就好了(講真, 真繞), 例如, 想使用 HiveServer2
的 Connection
, 創(chuàng)建一個 HiveServer2Hook
即可.
# Operator 調(diào)用 Connection 示例代碼
class LiulishuoDemoOperator(BaseOperator):
def __init__(self, hive_server2_connection_id, *args, **kwargs):
super(LiulishuoDemoOperator, self).__init__(*args, **kwargs)
self.hive_server2_connection_id = hive_server2_connection_id
def execute(self, context):
hive_server2 = HiveServer2Hook(self.hive_server2_connection_id)
hive_serve2.get_records('SELECT * FROM testdb.table1 LIMIT 20')
# ....
HiveServer2Hook
設(shè)計有還有一個貼心之處就是, 在創(chuàng)建 HiveServer2Hook
時根本不涉及真正連接 HiveServer2 的邏輯, 只有真正調(diào)用其get_records
等方法時才會真正去連接 HiveServer2, 這樣就給單元測試 mock 帶來很大的方便, 畢竟在 CI 環(huán)境中構(gòu)建一個隔離的專門用于跑自己的 test-case 的 HiveServer2 也不是那么容易的.
def test_operator_with_mock(self):
with mock.patch.object(HiveServer2Hook, 'get_records') as mock_get_records:
# 這里設(shè)置 mock 的返回值
mock_get_records.return_value = [['Location: ', 's3://test-bucket/valid_table']]
hive_server_id = 'test-hive-server'
# 這里測試對應(yīng)的 Operator 代碼
0x03 總結(jié)
Airflow 通過精簡的抽象, 將 DAG 開發(fā)簡化到了會寫 Python 基本就沒問題的程度, 還是值得點贊的. 自己開發(fā)一個 Operator
也是很簡單, 不過自己開發(fā) Operator
也僅僅是技術(shù)選型的其中一個方案而已, 復(fù)雜邏輯也可以通過暴露一個 Restful API 的形式, 使用 Airflow 提供的 SimpleHttpOperator
調(diào)用對應(yīng)的服務(wù)執(zhí)行任務(wù).