airflow 使用之Operators

Operators 簡介

Operators 允許生成特定類型的任務(wù)片吊,這些任務(wù)在實例化時成為 DAG 中的任務(wù)節(jié)點。所有的 Operator 均派生自 BaseOperator悉稠,并以這種方式繼承許多屬性和方法。
Operator 主要有三種類型:

  • 執(zhí)行一項操作或在遠程機器上執(zhí)行一項操作艘包。
  • 將數(shù)據(jù)從一個系統(tǒng)移動到另一個系統(tǒng)
  • 類似傳感器的猛,是一種特定類型 Operator,它將持續(xù)運行想虎,直到滿足某種條件卦尊。例如在 HDFS 或 S3 中等待特定文件到達,在 Hive 中出現(xiàn)特定的分區(qū)或一天中的特定時間舌厨,繼承自 BaseSensorOperator岂却。

BaseOperator 簡介

所有的 Operator 都是從 BaseOperator 派生而來,并通過繼承獲得更多功能。這也是引擎的核心躏哩,所以有必要花些時間來理解 BaseOperator 的參數(shù)署浩,以了解 Operator 基本特性。

先看一下構(gòu)造函數(shù)的原型:

class airflow.models.BaseOperator(task_id, owner='Airflow', email=None, email_on_retry=True, email_on_failure=True, retries=0, retry_delay=datetime.timedelta(0, 300), retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, schedule_interval=None, depends_on_past=False, wait_for_downstream=False, dag=None, params=None, default_args=None, adhoc=False, priority_weight=1, weight_rule=u'downstream', queue='default', pool=None, sla=None, execution_timeout=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, trigger_rule=u'all_success', resources=None, run_as_user=None, task_concurrency=None, executor_config=None, inlets=None, outlets=None, *args, **kwargs)


需要注意的是參數(shù) start_date扫尺。start_date 決定了任務(wù)第一次運行的時間筋栋,最好的實踐是設(shè)置 start_date 在 schedule_interval 的附近。比如每天跑的任務(wù)開始日期設(shè)為'2018-09-21 00:00:00'器联,每小時跑的任務(wù)設(shè)置為 '2018-09-21 05:00:00'二汛,airflow 將 start_date 加上 schedule_interval 作為執(zhí)行日期。需要注意的是任務(wù)的依賴需要及時排除拨拓,例如任務(wù) A 依賴任務(wù) B肴颊,但由于兩者 start_date 不同導致執(zhí)行日期不同,那么任務(wù) A 的依賴永遠不會被滿足渣磷。如果你需要執(zhí)行一個日常任務(wù)婿着,比如每天下午 2 點開始執(zhí)行,你可以在 DAG中使用 cron 表達式

schedule_interval="0 14 * * *"

BashOperator

官方提供的 DAG 示例-tutorial 就是一個典型的 BashOperator醋界,調(diào)用 bash 命令或腳本竟宋,傳遞模板參數(shù)就可以參考 tutorial

"""
### Tutorial Documentation
Documentation that goes along with the Airflow tutorial located
[here](http://pythonhosted.org/airflow/tutorial.html)
"""
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta


# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',   #這里也可以是一個 bash 腳本文件
    bash_command='date',
    dag=dag)

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""

dag.doc_md = __doc__

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

這里 t1 和 t2 都很容易理解,直接調(diào)用的是 bash 命令形纺,其實也可以傳入帶路徑的 bash 腳本, t3 使用了 Jinja 模板丘侠,"{% %}" 內(nèi)部是 for 標簽,用于循環(huán)操作逐样。"{{ }}" 內(nèi)部是變量蜗字,其中 ds 是執(zhí)行日期,是 airflow 的宏變量脂新,params.my_param 是自定義變量挪捕。根據(jù)官方提供的模板,稍加修改即可滿足我們的日常工作所需争便。

PythonOperator

PythonOperator 可以調(diào)用 Python 函數(shù)级零,由于 Python 基本可以調(diào)用任何類型的任務(wù),如果實在找不到合適的 Operator滞乙,將任務(wù)轉(zhuǎn)為 Python 函數(shù)奏纪,再使用 PythonOperator 也是一種選擇。下面是官方文檔給出的 PythonOperator 使用的樣例斩启。

from __future__ import print_function
from builtins import range
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

import time
from pprint import pprint

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='example_python_operator', default_args=args,
    schedule_interval=None)


def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

# Generate 10 sleeping tasks, sleeping from 0 to 9 seconds respectively
for i in range(10):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag)

    task.set_upstream(run_this)

通過以上代碼我們可以看到亥贸,任務(wù) task 及依賴關(guān)系都是可以動態(tài)生成的,這在實際使用中會減少代碼編寫數(shù)量浇垦,邏輯也非常清晰炕置,非常方便使用。PythonOperator 與 BashOperator 基本類似,不同的是 python_callable 傳入的是 Python 函數(shù)朴摊,而后者傳入的是 bash 指令或腳本默垄。通過 op_kwargs 可以傳入任意多個參數(shù)。

HiveOperator

hive 是基于 Hadoop 的一個數(shù)據(jù)倉庫工具甚纲,可以將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫表口锭,并提供簡單的 sql 查詢功能,可以將 sql 語句轉(zhuǎn)換為 MapReduce 任
務(wù)進行運行介杆。在 airflow 中調(diào)用 hive 任務(wù)鹃操,首先需要安裝依賴

pip install apache-airflow[hive]

下面是使用示例:

t1 = HiveOperator(
    task_id='simple_query',
    hql='select * from cities',
    dag=dag)

常見的 Operator 還有 DockerOperator,OracleOperator春哨,MysqlOperator荆隘,DummyOperator,SimpleHttpOperator 等使用方法類似赴背,不再一一介紹椰拒。

如何自定義Operator

如果官方的 Operator 仍不滿足需求, 那么我們就自己開發(fā)一個 Operator。 開發(fā) Operator 比較簡單凰荚,繼承 BaseOperator 并實現(xiàn) execute 方法即可:

from airflow.models import BaseOperator

class MyOperator(BaseOperator):

    def __init__(*args, **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
    
    def execute(self, context):
        ###do something here

除了 execute 方法外燃观,還可以實現(xiàn)以下方法:
on_kill: 在 task 被 kill 的時候執(zhí)行。

airflow 是支持Jinjia模板語言的便瑟,那么如何在自定義的 Operator 中加入Jinjia模板語言的支持呢缆毁?
其實非常簡單,只需要在自定義的Operator類中加入屬性

template_fields = (attributes_to_be_rendered_with_jinja)

即可到涂,例如官方的 bash_operator中是這樣的:

template_fields = ('bash_command', 'env')

這樣积锅,在任務(wù)執(zhí)行之前,airflow 會自動渲染 bash_command 或 env 中的屬性再執(zhí)行任務(wù)养盗。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市适篙,隨后出現(xiàn)的幾起案子往核,更是在濱河造成了極大的恐慌,老刑警劉巖嚷节,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件聂儒,死亡現(xiàn)場離奇詭異,居然都是意外死亡硫痰,警方通過查閱死者的電腦和手機衩婚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來效斑,“玉大人非春,你說我怎么就攤上這事。” “怎么了奇昙?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵护侮,是天一觀的道長。 經(jīng)常有香客問我储耐,道長羊初,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任什湘,我火速辦了婚禮长赞,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘闽撤。我一直安慰自己得哆,他們只是感情好,可當我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布腹尖。 她就那樣靜靜地躺著柳恐,像睡著了一般。 火紅的嫁衣襯著肌膚如雪热幔。 梳的紋絲不亂的頭發(fā)上乐设,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天,我揣著相機與錄音绎巨,去河邊找鬼近尚。 笑死,一個胖子當著我的面吹牛场勤,可吹牛的內(nèi)容都是我干的戈锻。 我是一名探鬼主播,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼和媳,長吁一口氣:“原來是場噩夢啊……” “哼格遭!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起留瞳,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤拒迅,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后她倘,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體璧微,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年硬梁,在試婚紗的時候發(fā)現(xiàn)自己被綠了前硫。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡荧止,死狀恐怖屹电,靈堂內(nèi)的尸體忽然破棺而出阶剑,到底是詐尸還是另有隱情,我是刑警寧澤嗤详,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布个扰,位于F島的核電站,受9級特大地震影響葱色,放射性物質(zhì)發(fā)生泄漏递宅。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一苍狰、第九天 我趴在偏房一處隱蔽的房頂上張望办龄。 院中可真熱鬧,春花似錦淋昭、人聲如沸俐填。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽英融。三九已至,卻和暖如春歇式,著一層夾襖步出監(jiān)牢的瞬間驶悟,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工材失, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留痕鳍,地道東北人。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓龙巨,卻偏偏與公主長得像笼呆,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子旨别,可洞房花燭夜當晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內(nèi)容

  • 本文將介紹 Airflow 這一款優(yōu)秀的調(diào)度工具诗赌。主要包括 Airflow 的服務(wù)構(gòu)成、Airflow 的 Web...
    a7f00a9019ae閱讀 62,020評論 6 42
  • Airflow 是 Airbnb 公司開源的任務(wù)調(diào)度系統(tǒng), 通過使用 Python 開發(fā) DAG, 非常方便的調(diào)度...
    haitaoyao閱讀 9,775評論 2 7
  • 1. 聽說鳳凰古城是“艷遇”頻率高發(fā)的一個地方秸弛,作為大齡資深單身狗一枚铭若,朵朵對它傾注了各種憧憬和向往,就想著某天胆屿,...
    花子魚閱讀 768評論 1 0
  • 六、無心插柳柳成蔭 早晨的陽光直射到窗臺上偶宫,窗外的小麻雀嘰嘰喳喳叫了起來非迹。夏滿荷像往常一樣長長地伸了一個懶腰后,揮...
    蝸牛也是牛6267閱讀 361評論 0 3
  • 那是一條吵鬧的大街纯趋,熙熙攘攘的人群一直川流不息憎兽。有一個胖乎乎的小男孩冷离,一邊走一邊吃著橘子〈棵“小胖西剥,把核都吐手里拿著...
    穗拾閱讀 714評論 1 8