本機Airflow 安裝
# 1. Configure the Airflow environment variable.
#    Append with '>>' so the existing ~/.bashrc is preserved ('>' would
#    truncate it), and use printf because bash's builtin echo writes a
#    literal "\n" instead of a newline unless -e is given.
printf '\nexport AIRFLOW_HOME=~/opt/airflow\n' >> ~/.bashrc
source ~/.bashrc
# 2. Install Airflow and initialise its metadata database.
#    NOTE(review): `airflow initdb` is the Airflow 1.10.x CLI; on Airflow 2.x
#    the equivalent command is `airflow db init` - confirm the installed version.
pip install apache-airflow
airflow initdb
# 3. Start Airflow: launch the webserver and the scheduler with -D (daemon mode).
airflow webserver -D
airflow scheduler -D
# 4. Create the dags directory.
#    -p creates missing parent directories and does not fail if it already exists.
mkdir -p ~/opt/airflow/dags
Pycharm 配置
設置Interpreter
python env 選擇安裝了 airflow 的 python 解釋器，這樣 pycharm 的環境才會包含 airflow 的依賴。
image-20201110140343519.png
image-20201110140410115.png
新建Dag
pycharm打開 ${AIRFLOW_HOME} 目錄,在dags目錄下新建hello dag
image-20201110141137000.png
# hello.py
# -*- coding: utf-8 -*-
from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
# Arguments handed to the DAG as `default_args`.
args = dict(owner='airflow')

# The 'hello' DAG: no schedule interval is set (schedule_interval=None),
# the start date is one day ago, and each DAG run times out after 60 minutes.
dag = DAG(
    dag_id='hello',
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(1),
    dagrun_timeout=timedelta(hours=1),
)
def print_hello():
    """Print the greeting to stdout and return it.

    Used as the `python_callable` of the 'hello' task.

    Returns:
        str: the text 'Hello airflow!!'.
    """
    greeting = 'Hello airflow!!'
    print(greeting)
    return greeting
# 'start' task: a DummyOperator attached to the DAG, configured with 3 retries.
dummy_operator = DummyOperator(
    task_id='start',
    retries=3,
    dag=dag,
)

# 'hello' task: calls print_hello when executed.
hello_operator = PythonOperator(
    task_id='hello',
    python_callable=print_hello,
    dag=dag,
)

# Dependency: 'start' runs before 'hello'.
dummy_operator >> hello_operator
Airflow test
# airflow test: invoke one task of a DAG from the command line.
# usage: airflow test [-h] [-sd SUBDIR] [-dr] [-tp TASK_PARAMS] [-pm] dag_id task_id execution_date
# Below: dag_id=hello, task_id=hello, execution_date=2020-10-10.
airflow test hello hello 2020-10-10
image-20201110141334555.png
Pycharm run
image-20201110142744922.png
然后run hello.py
image-20201110142905241.png