(2022.11.20 Sun)
基本運行模式(pattern)是data pipeline使用Airflow的DAG的不同結(jié)構(gòu)焙蚓,基本模式有如下四種 :
- 序列Sequence
- 平行拆分Parallel split
- 同步Synchronisation
- 單選Exclusive choice
序列模式
序列模式即若干task按先后順序依次執(zhí)行糙麦,在運行代碼上 表示為task_1 >> task_2 >> ...
译隘。
dag = DAG(
dag_id='sequential_pattern',
default_args={
'start_date': utils.dates.days_ago(1),
},
schedule_interval=None,
)
with dag:
read_input = DummyOperator(task_id='read_input')
aggregate_data = DummyOperator(task_id='generate_data')
write_to_redshift = DummyOperator(task_id='write_to_redshift')
read_input >> aggregate_data >> write_to_redshift
Parallel split
parallel split
parallel split模式用于在分支的情況粗恢。比如當(dāng)數(shù)據(jù)集備好之后,需要被加載進(jìn)入多個不同的tasks片酝,且都是同一個pipeline中囚衔,如同數(shù)據(jù)進(jìn)入不同的分支。
分支在DAG中的表示為task_1 >> [task_2, task_3]
案例如
dag = DAG(
dag_id='pattern_parallel_split',
default_args={
'start_date': utils.dates.days_ago(1),
},
schedule_interval=None,
)
with dag:
read_input = DummyOperator(task_id='read_input')
aggregate_data = DummyOperator(task_id='generate_data')
convert_to_parquet = DummyOperator(task_id='convert_to_parquet')
convert_to_avro = DummyOperator(task_id='convert_to_avro')
read_input >> aggregate_data >> [convert_to_parquet, convert_to_avro]
Sychronisation
與parallel split相似雕沿,在同步模式中练湿,不同branch的結(jié)果匯聚(reconciliation)在一個task中,不同的branch執(zhí)行并行計算审轮,并將結(jié)果整合肥哎。
synchronization
DAG的代碼表達(dá)中,同步模式可拆解為在每個for loop
中執(zhí)行順序模式疾渣,即
for xx in xxx:
task_0 >> task_i >> task_2
代碼實例
dag = DAG(
dag_id='pattern_synchronization',
default_args={
'start_date': utils.dates.days_ago(1),
},
schedule_interval=None,
)
with dag:
convert_to_parquet = DummyOperator(task_id='convert_to_parquet')
for hour in range(0, 24):
read_input = DummyOperator(task_id='read_input_hour_{}'.format(hour))
aggregate_data = DummyOperator(task_id='generate_data_hour_{}'.format(hour))
read_input >> aggregate_data >> convert_to_parquet
單選
根據(jù)預(yù)先設(shè)定的條件篡诽,在分支部分選擇不同的task執(zhí)行。
exclusive choice
在Apache Airflow中稳衬,可通過BranchOpertor
對象執(zhí)行分支單選命令霞捡。BranchOperator
對象指定的方法,其返回值可用于指定對分支的選擇薄疚,而task_id
用于標(biāo)識分支的名字碧信。參考如下案例。
dag = DAG(
dag_id='pattern_exclusive_choice',
default_args={
'start_date': utils.dates.days_ago(1),
},
schedule_interval=None,
)
with dag:
def route_task():
execution_date = context['execution_date']
return 'convert_to_parquet'if execution_date.minute % 2 == 0 else 'convert_to_avro'
read_input = DummyOperator(task_id='read_input')
aggregate_data = DummyOperator(task_id='generate_data')
route_to_format = BranchPythonOperator(task_id='route_to_format', python_callable=route_task)
convert_to_parquet = DummyOperator(task_id='convert_to_parquet')
convert_to_avro = DummyOperator(task_id='convert_to_avro')
read_input >> aggregate_data >> route_to_format >> [convert_to_parquet, convert_to_avro]
Reference
1 ETL data patterns with Apache Airflow, waitingforcode, by BARTOSZ KONIECZNY