[Research] Using Airflow

Related documentation:

Official documentation

GitHub repository

Airflow manages and schedules all kinds of offline scheduled jobs; it can replace crontab.

A self-learning / batch-prediction demo

1. Installation and initialization

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080
# nohup airflow webserver -p 8080 > ~/airflow/active.log 2>&1 &

# start the scheduler
airflow scheduler

# delete a dag (you must first delete its py file from ~/airflow/dags)
airflow delete_dag -y {dag_id}

Install path: /root/anaconda3/envs/he_test/lib/python3.7/site-packages/

# run the webserver as a daemon
airflow webserver -D -p 8080
# run the scheduler as a daemon
airflow scheduler -D

After adding a .py file under ~/airflow/dags, it takes a while before it shows up in the web UI.
Restarting airflow scheduler makes the new py file show up in the web UI immediately.
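A minimal sketch of such a DAG file (the dag_id and task_id match the airflow test example in section 4; the owner and schedule here are assumptions):

# -*- coding: utf-8 -*-
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'demo',
    'start_date': airflow.utils.dates.days_ago(1),
}

dag = DAG(
    dag_id='example_hello_world_dag',
    default_args=args,
    schedule_interval='@daily',
)


def say_hello():
    print('hello airflow')


hello_task = PythonOperator(
    task_id='hello_task',
    python_callable=say_hello,
    dag=dag,
)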

2. XCOM

XCom can be used to pass variables between different operators (it seems it can also pass parameters between different DAGs; that still needs testing, see the sketch after the example below)
xcom_pull(self, task_ids, dag_id, key, include_prior_dates)

def processing_data(**kwargs):
    # X and str_with_trx_with_retail_with_corporate_with_account are computed
    # earlier in this function (see the full demo in section 9)
    kwargs['ti'].xcom_push(key='X', value=X)
    kwargs['ti'].xcom_push(key='str_with_trx_with_retail_with_corporate_with_account', value=str_with_trx_with_retail_with_corporate_with_account)


processing_data_operator = PythonOperator(
    task_id='processing_data_operator',
    provide_context=True,
    python_callable=processing_data,
    dag=dag,
)


def predict(**kwargs):
    ti = kwargs['ti']
    X = ti.xcom_pull(key='X', task_ids='processing_data_operator')
    
predict_operator = PythonOperator(
    task_id='predict_operator',
    provide_context=True,
    python_callable=predict,
    dag=dag,
)
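A sketch of the cross-DAG case mentioned above (untested): xcom_pull also accepts a dag_id, and since the other DAG usually has different execution dates, include_prior_dates=True is needed to see its earlier XComs. The ids below are only illustrative, and the callable would run in a PythonOperator with provide_context=True as above.

def read_from_other_dag(**kwargs):
    value = kwargs['ti'].xcom_pull(
        dag_id='aml_sl_with_config',          # the DAG that pushed the XCom
        task_ids='processing_data_operator',  # the task that pushed it
        key='X',
        include_prior_dates=True,             # also look at earlier execution dates
    )
    print(value)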

3. A few concepts

https://www.cnblogs.com/piperck/p/10101423.html

  • DAG
  • A DAG (directed acyclic graph) defines a complete job in Airflow. All tasks in the same DAG share the same schedule.
  • Task
  • A Task is a concrete piece of work and must live inside a DAG. Task dependencies are configured within the DAG; cross-DAG dependencies are possible but not recommended, since they make the DAG graph less intuitive and complicate dependency management.
  • DAG Run
  • A DAG Run is created whenever a DAG reaches its scheduled time or is triggered externally. Think of it as an instance of the DAG.
  • Task Instance
  • A Task Instance is created when a Task is scheduled to start. Think of it as an instance of the Task.

4. Common commands

airflow test dag_id task_id execution_date   test a task
Example: airflow test example_hello_world_dag hello_task 20180516

airflow run dag_id task_id execution_date   run a task

airflow run -A dag_id task_id execution_date   run a task, ignoring its dependencies

airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE   run a whole dag file

airflow webserver -D   run the webserver as a daemon

airflow scheduler -D   run the scheduler as a daemon

airflow worker -D   run a celery worker as a daemon

airflow worker -c 1 -D   run a celery worker as a daemon with task concurrency set to 1

airflow pause dag_id   pause a dag

airflow unpause dag_id   unpause a dag; equivalent to flipping the on/off switch in the web UI

airflow list_tasks dag_id   list a dag's tasks

airflow clear dag_id   clear the dag's task instances

5. Steps to run a DAG

  • Add the .py file under ~/airflow/dags (it takes a while before it shows up in the web UI; the DAG can run even if the webserver is not started)
  • airflow unpause dag_id (unpause the DAG; it will then run on its configured schedule)
  • airflow trigger_dag dag_id (run the whole DAG immediately)

6. Steps to restart a DAG

rm -rf ~/airflow/dags/aml_sl_with_config.py
airflow delete_dag -y aml_sl_with_config
ps -ef |grep "airflow scheduler" |awk '{print $2}'|xargs kill -9
vi ~/airflow/dags/aml_sl_with_config.py
nohup airflow scheduler &

7. JSON config file format

{
  "hdfs_url": "localhost:50070",
  "hdfs_user": "hdfs",
  "daily_dir_list": [
    "trx",
    "str"
  ],
  "static_dir_list": [
    "retail",
    "corporate",
    "account"
  ],
  "base_local_path": "/root/airflow/aml_data/sl_data/{}/",
  "base_local_metrics_path": "/root/airflow/aml_data/sl_data/{}/for_metrics/",
  "base_local_model_path": "/root/airflow/aml_data/model/{}",
  "base_local_predict_res_path": "/root/airflow/aml_data/bp_data/res/{}",
  "model_prefix": "he_test_xgboost",
  "predict_res_prefix": "pred_full_table",
  "base_remote_daily_path": "/anti-money/daily_data_group/{}/daily/{}",
  "base_remote_static_path": "/anti-money/daily_data_group/{}/all",
  "base_remote_model_path": "/anti-money/he_test/model/{}",
  "base_remote_predict_res_path": "/anti-money/he_test/predict_res/{}",
  "specified_model_path":"",
  "start_time": "2018-05-01",
  "end_time": "2018-05-27",
  "metrics_start_time": "2018-05-28",
  "metrics_end_time": "2018-05-30"
}

8. Configuring parameters

  • Menu -> Admin -> Variables
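Reading the JSON from section 7 back out of a Variable is then straightforward; a small sketch (assuming it was saved under the key sl_config, as in the demo below):

from airflow.models import Variable

# deserialize_json parses the stored JSON string into a dict;
# default_var is returned when the Variable has not been set
sl_config = Variable.get('sl_config', deserialize_json=True, default_var={})
hdfs_url = sl_config.get('hdfs_url', 'localhost:50070')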


9. A self-learning demo with configurable parameters

# -*- coding: utf-8 -*-

from __future__ import print_function

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable

from hdfs import *
import datetime
import pandas as pd
from sklearn.preprocessing import LabelEncoder
import xgboost as xgb
import os
import json
import shutil
from sklearn import metrics

args = {
    'owner': 'aml',
    'start_date': airflow.utils.dates.days_ago(0, hour=0),
    'retries': 3,
    'retry_delay': datetime.timedelta(minutes=2),
    'email': ['maotao@4paradigm.com', 'maopengyu@4paradigm.com'],
    'email_on_failure': True,
    'email_on_retry': True,
}

dag = DAG(
    dag_id='aml_sl_with_config',
    catchup=False,
    default_args=args,
    schedule_interval='0 * * * *',
    dagrun_timeout=datetime.timedelta(minutes=60),
)


def clear_local_path(local_path):
    if os.path.exists(local_path):
        if os.path.isfile(local_path):
            os.remove(local_path)
        else:
            # remove the directory and everything under it, then recreate it
            shutil.rmtree(local_path)
            os.makedirs(local_path)
    else:
        os.makedirs(local_path)


def get_config_from_variables(**kwargs):
    # fetch parameters from the config Variable
    sl_config = Variable.get('sl_config', deserialize_json=True, default_var={"hdfs_url":"localhost:50070","hdfs_user":"hdfs","daily_dir_list":["trx","str"],"static_dir_list":["retail","corporate","account"],"base_local_path":"/root/airflow/aml_data/sl_data/{}/","base_local_metrics_path":"/root/airflow/aml_data/sl_data/{}/for_metrics/","base_local_model_path":"/root/airflow/aml_data/model/{}","model_prefix":"he_test_xgboost","base_remote_daily_path":"/anti-money/daily_data_group/{}/daily/{}","base_remote_static_path":"/anti-money/daily_data_group/{}/all","base_remote_model_path":"/anti-money/he_test/model/{}","start_time":"2018-05-01","end_time":"2018-05-27","metrics_start_time":"2018-05-28","metrics_end_time":"2018-05-30"})
    print('config: {}'.format(sl_config))
    hdfs_url = sl_config['hdfs_url']
    hdfs_user = sl_config['hdfs_user']
    daily_dir_list = sl_config['daily_dir_list']
    static_dir_list = sl_config['static_dir_list']
    base_local_path = sl_config['base_local_path']
    base_remote_daily_path = sl_config['base_remote_daily_path']
    base_remote_static_path = sl_config['base_remote_static_path']
    start_time = sl_config['start_time']
    end_time = sl_config['end_time']
    base_local_model_path = sl_config['base_local_model_path']
    model_prefix = sl_config['model_prefix']
    base_remote_model_path = sl_config['base_remote_model_path']
    metrics_start_time = sl_config['metrics_start_time']
    metrics_end_time = sl_config['metrics_end_time']
    base_local_metrics_path = sl_config['base_local_metrics_path']

    kwargs['ti'].xcom_push(key='hdfs_url', value=hdfs_url)
    kwargs['ti'].xcom_push(key='hdfs_user', value=hdfs_user)
    kwargs['ti'].xcom_push(key='static_dir_list', value=static_dir_list)
    kwargs['ti'].xcom_push(key='daily_dir_list', value=daily_dir_list)
    kwargs['ti'].xcom_push(key='base_local_path', value=base_local_path)

    kwargs['ti'].xcom_push(key='base_remote_daily_path', value=base_remote_daily_path)
    kwargs['ti'].xcom_push(key='base_remote_static_path', value=base_remote_static_path)
    kwargs['ti'].xcom_push(key='start_time', value=start_time)
    kwargs['ti'].xcom_push(key='end_time', value=end_time)
    kwargs['ti'].xcom_push(key='metrics_start_time', value=metrics_start_time)
    kwargs['ti'].xcom_push(key='metrics_end_time', value=metrics_end_time)
    kwargs['ti'].xcom_push(key='base_local_metrics_path', value=base_local_metrics_path)

    kwargs['ti'].xcom_push(key='base_local_model_path', value=base_local_model_path)
    kwargs['ti'].xcom_push(key='model_prefix', value=model_prefix)
    kwargs['ti'].xcom_push(key='base_remote_model_path', value=base_remote_model_path)


get_config_operator = PythonOperator(
    task_id='get_config_operator',
    provide_context=True,
    python_callable=get_config_from_variables,
    dag=dag,
)


def get_data_from_hdfs(**kwargs):
    start_time = kwargs['ti'].xcom_pull(key='start_time', task_ids='get_config_operator')
    end_time = kwargs['ti'].xcom_pull(key='end_time', task_ids='get_config_operator')
    base_local_path = kwargs['ti'].xcom_pull(key='base_local_path', task_ids='get_config_operator')
    download_data_from_hdfs(kwargs, start_time, end_time, base_local_path)


def download_data_from_hdfs(kwargs, start_time, end_time, base_local_path):
    hdfs_url = kwargs['ti'].xcom_pull(key='hdfs_url', task_ids='get_config_operator')
    hdfs_user = kwargs['ti'].xcom_pull(key='hdfs_user', task_ids='get_config_operator')
    daily_dir_list = kwargs['ti'].xcom_pull(key='daily_dir_list', task_ids='get_config_operator')
    static_dir_list = kwargs['ti'].xcom_pull(key='static_dir_list', task_ids='get_config_operator')
    base_remote_daily_path = kwargs['ti'].xcom_pull(key='base_remote_daily_path', task_ids='get_config_operator')
    base_remote_static_path = kwargs['ti'].xcom_pull(key='base_remote_static_path', task_ids='get_config_operator')
    hdfs_client = InsecureClient(url=hdfs_url, user=hdfs_user)
    start_date = datetime.datetime.strptime(start_time, '%Y-%m-%d')
    end_date = datetime.datetime.strptime(end_time, '%Y-%m-%d')
    date_list = []
    for i in range((end_date - start_date).days + 1):
        range_date = start_date + datetime.timedelta(days=i)
        date_list.append(range_date.strftime('%Y-%m-%d'))

    kwargs['ti'].xcom_push(key='date_list', value=date_list)

    # download files within the start/end date range
    for sl_dir in daily_dir_list:
        clear_local_path(base_local_path.format(sl_dir))
        for date in date_list:
            try:
                print("downloading {} from {}".format(base_local_path.format(sl_dir), base_remote_daily_path.format(sl_dir, date)))
                hdfs_client.download(base_remote_daily_path.format(sl_dir, date), base_local_path.format(sl_dir))
            except HdfsError:
                # this hdfs library cannot check whether a file exists, so this crude approach is used
                print(base_remote_daily_path.format(sl_dir, date) + " does not exist on hdfs")
    for sl_dir in static_dir_list:
        clear_local_path(base_local_path.format(sl_dir))
        print("downloading {} from {}".format(base_local_path.format(sl_dir), base_remote_static_path.format(sl_dir)))
        hdfs_client.download(base_remote_static_path.format(sl_dir), base_local_path.format(sl_dir))

    return date_list, hdfs_client


get_data_operator = PythonOperator(
    task_id='get_data_operator',
    provide_context=True,
    python_callable=get_data_from_hdfs,
    dag=dag,
)


def process_data(**kwargs):
    date_list = kwargs['ti'].xcom_pull(key='date_list', task_ids='get_data_operator')
    base_local_path = kwargs['ti'].xcom_pull(key='base_local_path', task_ids='get_config_operator')
    X, y = process_data_from_local(kwargs, date_list, base_local_path)
    kwargs['ti'].xcom_push(key='X', value=X)
    kwargs['ti'].xcom_push(key='y', value=y)


def process_data_from_local(kwargs, date_list, base_local_path):
    daily_dir_list = kwargs['ti'].xcom_pull(key='daily_dir_list', task_ids='get_config_operator')
    static_dir_list = kwargs['ti'].xcom_pull(key='static_dir_list', task_ids='get_config_operator')
    # read the daily-updated tables
    daily_dir_dict = {}
    for sl_dir in daily_dir_list:
        # initialize
        daily_dir_dict[sl_dir] = pd.DataFrame()
        for date in date_list:
            if os.path.exists(base_local_path.format(sl_dir) + date):
                print("read daily table from path: {}".format(base_local_path.format(sl_dir) + date))
                daily_dir_dict[sl_dir] = daily_dir_dict[sl_dir].append(pd.read_parquet(base_local_path.format(sl_dir) + date))
    # read the static tables
    static_dir_dict = {}
    for sl_dir in static_dir_list:
        # initialize
        print("read static table from path: {}".format(base_local_path.format(sl_dir) + 'all'))
        static_dir_dict[sl_dir] = pd.read_parquet(base_local_path.format(sl_dir) + 'all', engine='auto')
    # join the tables
    # the join keys are still hard-coded here, unfortunately
    str_with_trx = pd.merge(daily_dir_dict['str'], daily_dir_dict['trx'], how='left', on=['trx_id'])
    print("merged str_with_trx")
    str_with_trx_with_retail = pd.merge(str_with_trx, static_dir_dict['retail'], how='left', on=['cust_id'])
    print("merged str_with_trx_with_retail")
    str_with_trx_with_retail_with_corporate = pd.merge(str_with_trx_with_retail, static_dir_dict['corporate'], how='left', on=['cust_id'])
    print("merged str_with_trx_with_retail_with_corporate")
    str_with_trx_with_retail_with_corporate_with_account = pd.merge(str_with_trx_with_retail_with_corporate, static_dir_dict['account'], how='left', on=['account_id'])
    print("merged str_with_trx_with_retail_with_corporate_with_account")
    # label-encode the categorical features
    le = LabelEncoder()
    # labeled_data = str_with_trx_with_retail_with_corporate_with_account.copy()
    labeled_data = str_with_trx_with_retail_with_corporate_with_account.astype(str).apply(le.fit_transform)
    X = labeled_data.drop(['label', 'trx_id', 'ins_id', 'cust_id', 'account_id', 'target_cust_id', 'target_account_id'], axis=1)
    y = labeled_data['label']
    return X, y


process_data_operator = PythonOperator(
    task_id='process_data_operator',
    provide_context=True,
    python_callable=process_data,
    dag=dag,
)


def train_model(**kwargs):
    hdfs_url = kwargs['ti'].xcom_pull(key='hdfs_url', task_ids='get_config_operator')
    hdfs_user = kwargs['ti'].xcom_pull(key='hdfs_user', task_ids='get_config_operator')
    X = kwargs['ti'].xcom_pull(key='X', task_ids='process_data_operator')
    y = kwargs['ti'].xcom_pull(key='y', task_ids='process_data_operator')
    base_local_model_path = kwargs['ti'].xcom_pull(key='base_local_model_path', task_ids='get_config_operator')
    model_prefix = kwargs['ti'].xcom_pull(key='model_prefix', task_ids='get_config_operator')
    base_remote_model_path = kwargs['ti'].xcom_pull(key='base_remote_model_path', task_ids='get_config_operator')

    local_model_path = base_local_model_path.format('{}.model'.format(model_prefix))

    # train the model
    dtrain = xgb.DMatrix(X, label=y)
    watchlist = [(dtrain, 'train')]

    params = {'booster': 'gbtree',
              'objective': 'binary:logistic',
              'eval_metric': 'auc',
              'max_depth': 4,
              'lambda': 10,
              'subsample': 0.75,
              'colsample_bytree': 0.75,
              'min_child_weight': 2,
              'eta': 0.025,
              'seed': 0,
              'nthread': 8,
              'silent': 1}
    bst = xgb.train(params, dtrain, num_boost_round=1000, evals=watchlist)
    # make sure the model directory exists and remove any stale model file
    # (clear_local_path on the file path would create a directory there instead)
    if not os.path.exists(base_local_model_path.format('')):
        os.makedirs(base_local_model_path.format(''))
    if os.path.isfile(local_model_path):
        os.remove(local_model_path)
    bst.save_model(local_model_path)
    model_id = '{}_{}'.format(model_prefix, datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S'))
    kwargs['ti'].xcom_push(key='model_id', value=model_id)
    hdfs_client = InsecureClient(url=hdfs_url, user=hdfs_user)
    hdfs_client.upload(base_remote_model_path.format(model_id)+'.model', local_model_path)
    print('uploaded local model {} to {} successfully'.format(local_model_path, base_remote_model_path.format(model_id)+'.model'))


train_model_operator = PythonOperator(
    task_id='train_model_operator',
    provide_context=True,
    python_callable=train_model,
    dag=dag,
)


def get_metrics(**kwargs):
    metrics_start_time = kwargs['ti'].xcom_pull(key='metrics_start_time', task_ids='get_config_operator')
    metrics_end_time = kwargs['ti'].xcom_pull(key='metrics_end_time', task_ids='get_config_operator')
    base_local_metrics_path = kwargs['ti'].xcom_pull(key='base_local_metrics_path', task_ids='get_config_operator')
    # download the data
    date_list_for_metrics, hdfs_client = download_data_from_hdfs(kwargs, metrics_start_time, metrics_end_time, base_local_metrics_path)
    # process the data
    X, y = process_data_from_local(kwargs, date_list_for_metrics, base_local_metrics_path)

    dtrain = xgb.DMatrix(X)
    model = xgb.Booster(model_file='/root/airflow/aml_data/model/he_test_xgboost.model')
    y_pred = model.predict(dtrain)
    y_pred_binary = (y_pred >= 0.5) * 1

    auc = metrics.roc_auc_score(y, y_pred)
    acc = metrics.accuracy_score(y, y_pred_binary)
    recall = metrics.recall_score(y, y_pred_binary)
    F1_score = metrics.f1_score(y, y_pred_binary)
    precision = metrics.precision_score(y, y_pred_binary)

    print('AUC: %.4f' % auc)
    print('ACC: %.4f' % acc)
    print('Recall: %.4f' % recall)
    print('F1-score: %.4f' % F1_score)
    print('Precision: %.4f' % precision)
    metrics.confusion_matrix(y, y_pred_binary)

    ti = kwargs['ti']
    model_id = ti.xcom_pull(key='model_id', task_ids='train_model_operator')

    res_dict = {}
    res_dict['model_id'] = model_id
    res_dict['auc'] = auc
    res_dict['acc'] = acc
    res_dict['recall'] = recall
    res_dict['f1_score'] = F1_score
    res_dict['precision'] = precision
    res_json = json.dumps(res_dict)

    base_local_model_path = kwargs['ti'].xcom_pull(key='base_local_model_path', task_ids='get_config_operator')
    model_prefix = kwargs['ti'].xcom_pull(key='model_prefix', task_ids='get_config_operator')
    base_remote_model_path = kwargs['ti'].xcom_pull(key='base_remote_model_path', task_ids='get_config_operator')
    local_model_meta_path = base_local_model_path.format('{}.meta.json'.format(model_prefix))
    # make sure the directory exists and remove any stale meta file
    # (clear_local_path on the file path would create a directory there instead)
    if not os.path.exists(base_local_model_path.format('')):
        os.makedirs(base_local_model_path.format(''))
    if os.path.isfile(local_model_meta_path):
        os.remove(local_model_meta_path)
    with open(local_model_meta_path, "w") as f:
        f.write(res_json)

    hdfs_client.upload(base_remote_model_path.format(model_id)+'.meta.json', local_model_meta_path)
    print('uploaded local meta {} to {} successfully'.format(local_model_meta_path, base_remote_model_path.format(model_id)+'.meta.json'))


get_metrics_operator = PythonOperator(
    task_id='get_metrics_operator',
    provide_context=True,
    python_callable=get_metrics,
    dag=dag,
)


get_data_operator.set_upstream(get_config_operator)
process_data_operator.set_upstream(get_data_operator)
train_model_operator.set_upstream(process_data_operator)
get_metrics_operator.set_upstream(train_model_operator)

10. sensor

A sensor simply polls at the interval given by poke_interval; each concrete sensor (e.g. hdfs_sensor, http_sensor) implements its own poke method.
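A minimal sketch of that contract (an illustrative custom sensor, not one that ships with Airflow): BaseSensorOperator calls poke() every poke_interval seconds until it returns True or the timeout is hit.

import os

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class LocalFileSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, filepath, *args, **kwargs):
        super(LocalFileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # True means the condition is met; False means poke again later
        self.log.info('poking for %s', self.filepath)
        return os.path.exists(self.filepath)

It would be instantiated like the HdfsSensor in the demo below, e.g. LocalFileSensor(task_id='wait_for_file', filepath='/tmp/ready', poke_interval=60, timeout=60 * 60, dag=dag).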


base_sensor source code

It turns out the built-in hdfs_sensor does not support Python 3:

*** Reading local file: /root/airflow/logs/aml_sensor/hdfs_sensor/2019-06-03T13:00:00+00:00/1.log
[2019-06-03 22:38:03,089] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: aml_sensor.hdfs_sensor 2019-06-03T13:00:00+00:00 [queued]>
[2019-06-03 22:38:03,096] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: aml_sensor.hdfs_sensor 2019-06-03T13:00:00+00:00 [queued]>
[2019-06-03 22:38:03,096] {__init__.py:1353} INFO - 
--------------------------------------------------------------------------------
[2019-06-03 22:38:03,096] {__init__.py:1354} INFO - Starting attempt 1 of 2
[2019-06-03 22:38:03,096] {__init__.py:1355} INFO - 
--------------------------------------------------------------------------------
[2019-06-03 22:38:03,105] {__init__.py:1374} INFO - Executing <Task(HdfsSensor): hdfs_sensor> on 2019-06-03T13:00:00+00:00
[2019-06-03 22:38:03,106] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'aml_sensor', 'hdfs_sensor', '2019-06-03T13:00:00+00:00', '--job_id', '484', '--raw', '-sd', 'DAGS_FOLDER/aml_senor.py', '--cfg_path', '/tmp/tmp3rq2m6ky']
[2019-06-03 22:38:03,995] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor [2019-06-03 22:38:03,994] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-06-03 22:38:04,243] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor [2019-06-03 22:38:04,242] {__init__.py:305} INFO - Filling up the DagBag from /root/airflow/dags/aml_senor.py
[2019-06-03 22:38:04,350] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor [2019-06-03 22:38:04,349] {cli.py:517} INFO - Running <TaskInstance: aml_sensor.hdfs_sensor 2019-06-03T13:00:00+00:00 [running]> on host m7-notebook-gpu01
[2019-06-03 22:38:04,442] {__init__.py:1580} ERROR - This HDFSHook implementation requires snakebite, but snakebite is not compatible with Python 3 (as of August 2015). Please use Python 2 if you require this hook  -- or help by submitting a PR!
Traceback (most recent call last):
  File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/sensors/base_sensor_operator.py", line 108, in execute
    while not self.poke(context):
  File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/sensors/hdfs_sensor.py", line 105, in poke
    sb = self.hook(self.hdfs_conn_id).get_conn()
  File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/hooks/hdfs_hook.py", line 49, in __init__
    'This HDFSHook implementation requires snakebite, but '
ImportError: This HDFSHook implementation requires snakebite, but snakebite is not compatible with Python 3 (as of August 2015). Please use Python 2 if you require this hook  -- or help by submitting a PR!
[2019-06-03 22:38:04,496] {__init__.py:1603} INFO - Marking task as UP_FOR_RETRY
[2019-06-03 22:38:04,512] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor Traceback (most recent call last):
[2019-06-03 22:38:04,512] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor   File "/root/anaconda3/envs/he_test/bin/airflow", line 32, in <module>
[2019-06-03 22:38:04,512] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor     args.func(args)
[2019-06-03 22:38:04,512] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor   File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-06-03 22:38:04,512] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor     return f(*args, **kwargs)
[2019-06-03 22:38:04,512] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor   File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/bin/cli.py", line 523, in run
[2019-06-03 22:38:04,512] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor     _run(args, dag, ti)
[2019-06-03 22:38:04,512] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor   File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/bin/cli.py", line 442, in _run
[2019-06-03 22:38:04,513] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor     pool=args.pool,
[2019-06-03 22:38:04,513] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor   File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
[2019-06-03 22:38:04,513] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor     return func(*args, **kwargs)
[2019-06-03 22:38:04,513] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor   File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
[2019-06-03 22:38:04,513] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor     result = task_copy.execute(context=context)
[2019-06-03 22:38:04,513] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor   File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/sensors/base_sensor_operator.py", line 108, in execute
[2019-06-03 22:38:04,513] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor     while not self.poke(context):
[2019-06-03 22:38:04,513] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor   File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/sensors/hdfs_sensor.py", line 105, in poke
[2019-06-03 22:38:04,513] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor     sb = self.hook(self.hdfs_conn_id).get_conn()
[2019-06-03 22:38:04,513] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor   File "/root/anaconda3/envs/he_test/lib/python3.7/site-packages/airflow/hooks/hdfs_hook.py", line 49, in __init__
[2019-06-03 22:38:04,513] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor     'This HDFSHook implementation requires snakebite, but '
[2019-06-03 22:38:04,514] {base_task_runner.py:101} INFO - Job 484: Subtask hdfs_sensor ImportError: This HDFSHook implementation requires snakebite, but snakebite is not compatible with Python 3 (as of August 2015). Please use Python 2 if you require this hook  -- or help by submitting a PR!
[2019-06-03 22:38:08,084] {logging_mixin.py:95} INFO - [2019-06-03 22:38:08,084] {jobs.py:2562} INFO - Task exited with return code 1

Workaround for now

pip install snakebite-py3

It is not an official package, and the repo has only one star, so it is hard to say how reliable it is.

https://pypi.org/project/snakebite-py3/

https://github.com/kirklg/snakebite/tree/feature/python3

  • No hdfs_sensor demo could be found; the only way to figure out how to use it was to read the source.
hdfs_conn_id='AML_HDFS'
  • The string passed in here is resolved in one of two ways: from an environment variable, or from the database
@classmethod
@provide_session
def _get_connections_from_db(cls, conn_id, session=None):
    db = (
        session.query(Connection)
        .filter(Connection.conn_id == conn_id)
        .all()
    )
    session.expunge_all()
    if not db:
        raise AirflowException(
            "The conn_id `{0}` isn't defined".format(conn_id))
    return db

@classmethod
def _get_connection_from_env(cls, conn_id):
    environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
    conn = None
    if environment_uri:
        conn = Connection(conn_id=conn_id, uri=environment_uri)
    return conn

  • Here "AIRFLOW_CONN_AML_HDFS" was stuffed into the environment manually
export AIRFLOW_CONN_AML_HDFS="m7-solution-cpu01:8020"
  • The Connection constructor takes the URI, parses out its host and port, and passes them on to the hdfs client (see the sketch at the end of this section)
  • You can also declare env in default_args when defining the DAG, but beware: once env is set, airflow no longer reads the system environment, so the env set here must contain every environment variable the program needs, or things will break
import os
local_env = os.environ
local_env['PATH'] = os.environ['PATH'] + ":" + Variable.get('PATH')
local_env['JAVA_HOME'] = Variable.get('JAVA_HOME')
 
# add 'env': dict(local_env) to the dag's default_args
  • The custom environment variable approach did not seem to work!
  • The approach below, configuring the parameters in the web UI, does take effect.


Menu -> Admin -> Connections is where the parameters needed for a connection can be configured.
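A rough sketch of what the Connection constructor does with such a URI (untested, based on reading the 1.10.x source; note that host/port parsing expects a scheme such as hdfs://, so the bare host:port value exported above may be part of why the env-var route failed):

from airflow.models import Connection

conn = Connection(conn_id='aml_hdfs', uri='hdfs://m7-solution-cpu01:8020')
print(conn.conn_type, conn.host, conn.port)  # expected: hdfs m7-solution-cpu01 8020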

11. Self-learning and batch-prediction design

(figure: airflow_sl&bp.jpg)

12. Findings

  • A node runs only after all of its upstream dependency nodes have finished.
  • Code defined outside of the operators is re-executed every time any operator runs (see the sketch below).
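A tiny sketch illustrating the second point (all names are illustrative): the module-level print fires on every scheduler parse and again for every task execution, because each task run re-imports the DAG file.

# -*- coding: utf-8 -*-
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

print('module level: runs on every parse and on every task execution')

dag = DAG(
    dag_id='side_effect_demo',
    default_args={'owner': 'demo',
                  'start_date': airflow.utils.dates.days_ago(1)},
    schedule_interval=None,
)


def task_fn():
    print('task body: runs only when this task executes')


only_task = PythonOperator(task_id='only_task', python_callable=task_fn, dag=dag)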

13. Managing it with k8s

docker build -t aml_env .
# create a new namespace
kubectl create namespace aml
kubectl create -f ./aml.yaml -n aml
[root@m7-notebook-gpu01] dockerfile # pwd
/root/dockerfile
[root@m7-notebook-gpu01] dockerfile # ll
total 32
-rw-r--r-- 1 root root 16488 Jun 10 17:59 aml_flow.py
-rw-r--r-- 1 root root   716 Jun 10 22:37 aml.yaml
-rw-r--r-- 1 root root   907 Jun 10 22:41 Dockerfile
-rw-r--r-- 1 root root   178 Jun 10 22:19 start.sh

Dockerfile

FROM conda/miniconda3-centos7
MAINTAINER Mao Pengyu <maopengyu@4paradigm.com>
 
ENV CONDA_AML he_test
RUN conda create -n $CONDA_AML python=3.7
RUN echo "source activate $CONDA_AML" > ~/.bashrc
ENV PATH /opt/conda/envs/env/bin:$PATH
# switch to the he_test environment
RUN yum -y install \
    curl \
    build-essential \
    gcc \
    graphviz
RUN source activate he_test && \
    conda install -y \
        jupyter \
        requests==2.19.1 \
        ipykernel \
        matplotlib \
        numpy \
        pandas \
        scipy \
        scikit-learn \
        fastparquet \
        snappy \
        pyarrow &&\
    conda install -y -c conda-forge \
        python-hdfs \
        xgboost \
        lightgbm && \
    export AIRFLOW_HOME=/root/airflow && \
    pip --no-cache-dir install apache-airflow snakebite-py3 && \
    # create the he_test kernel for notebook
    python -m ipykernel install --user --name he_test --display-name "he_test"
ADD ./aml_flow.py /root/airflow/dags/
ADD ./start.sh /root/
 
ENTRYPOINT ["bash", "/root/start.sh"]

start.sh

#!/bin/bash

source activate $CONDA_AML
airflow initdb
nohup airflow webserver -p 8080 &
nohup airflow scheduler &
# configure connections
airflow connections --add --conn_id aml_hdfs --conn_type 'hdfs' --conn_host m7-solution-cpu01 --conn_login hdfs --conn_port 8020
# configure variables
airflow variables -s sl_config $SL_CONFIG
# disable the token/password requirement for accessing notebook
jupyter notebook --generate-config
echo "c.NotebookApp.token = ''" >> /root/.jupyter/jupyter_notebook_config.py
mkdir -p /root/notebook
# the locally mounted path
cd /root/notebook
# the last process in the script must run in the foreground, i.e. with no trailing & (& means run in background); otherwise the container exits
nohup jupyter notebook --allow-root --ip=0.0.0.0 --port=8888

aml_flow.py

# -*- coding: utf-8 -*-
 
from __future__ import print_function
 
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
from airflow.operators.sensors import HdfsSensor
 
from hdfs import *
import datetime
import pandas as pd
from sklearn.preprocessing import LabelEncoder
import xgboost as xgb
import os
import json
import shutil
from sklearn import metrics
import logging
 
args = {
    'owner': 'aml',
    'start_date': airflow.utils.dates.days_ago(0, hour=0),
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
    'email': ['maotao@4paradigm.com', 'maopengyu@4paradigm.com'],
    'email_on_failure': True,
    'email_on_retry': True,
}
 
dag = DAG(
    dag_id='aml_flow',
    catchup=False,
    default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=datetime.timedelta(minutes=60),
)
#############################################
# global variables
# fetch parameters from the config Variable
sl_config = Variable.get('sl_config', deserialize_json=True, default_var={"hdfs_url":"localhost","hdfs_user":"hdfs","daily_dir_list":["trx","str"],"static_dir_list":["retail","corporate","account"],"base_local_path":"/root/airflow/aml_data/sl_data/{}/","base_local_metrics_path":"/root/airflow/aml_data/sl_data/{}/for_metrics/","base_local_model_path":"/root/airflow/aml_data/model/{}","base_local_model_meta_path":"/root/airflow/aml_data/model_meta/{}","base_local_predict_res_path":"/root/airflow/aml_data/bp_data/res/{}","model_prefix":"he_test_xgboost","predict_res_prefix":"pred_full_table","base_remote_daily_path":"/anti-money/daily_data_group/{}/daily/{}","base_remote_static_path":"/anti-money/daily_data_group/{}/all","base_remote_model_path":"/anti-money/he_test/model/{}","base_remote_model_meta_path":"/anti-money/he_test/model_meta/{}","base_remote_predict_res_path":"/anti-money/he_test/predict_res/{}","specified_model_path":"","start_time":"2018-05-01","end_time":"2018-05-27","metrics_start_time":"2018-05-28","metrics_end_time":"2018-05-30"})
logging.info('config: {}'.format(sl_config))
hdfs_url = sl_config['hdfs_url']
hdfs_user = sl_config['hdfs_user']
daily_dir_list = sl_config['daily_dir_list']
static_dir_list = sl_config['static_dir_list']
base_local_path = sl_config['base_local_path']
base_remote_daily_path = sl_config['base_remote_daily_path']
base_remote_static_path = sl_config['base_remote_static_path']
start_time = sl_config['start_time']
end_time = sl_config['end_time']
base_local_model_path = sl_config['base_local_model_path']
base_local_model_meta_path = sl_config['base_local_model_meta_path']
base_local_predict_res_path = sl_config['base_local_predict_res_path']
model_prefix = sl_config['model_prefix']
base_remote_model_path = sl_config['base_remote_model_path']
metrics_start_time = sl_config['metrics_start_time']
metrics_end_time = sl_config['metrics_end_time']
base_local_metrics_path = sl_config['base_local_metrics_path']
predict_res_prefix = sl_config['predict_res_prefix']
base_remote_predict_res_path = sl_config['base_remote_predict_res_path']
base_remote_model_meta_path = sl_config['base_remote_model_meta_path']
specified_model_path = sl_config['specified_model_path']
today = datetime.date.today().strftime('%Y-%m-%d')
poke_interval = 60
model_id = '{}_{}'.format(model_prefix, today)
uploaded_model_path = base_remote_model_path.format(model_id)+'.model'
local_model_path = base_local_model_path.format('{}.model'.format(model_prefix))
hdfs_client = InsecureClient(url=hdfs_url, user=hdfs_user)
#############################################
 
 
def clear_local_path(local_path):
    if os.path.exists(local_path):
        if os.path.isfile(local_path):
            os.remove(local_path)
        else:
            # remove the directory and everything under it, then recreate it
            shutil.rmtree(local_path)
            os.makedirs(local_path)
    else:
        os.makedirs(local_path)
 
 
def get_data_from_hdfs(**kwargs):
    date_list = download_data_from_hdfs(start_time, end_time, base_local_path)
    kwargs['ti'].xcom_push(key='date_list', value=date_list)
    # download today's data
    download_daily_data_from_hdfs(base_local_path, today, today)
 
 
def download_data_from_hdfs(start_time, end_time, base_local_path):
    date_list = download_daily_data_from_hdfs(base_local_path, start_time, end_time)
    download_static_data_from_hdfs(base_local_path)
    return date_list
 
 
def download_daily_data_from_hdfs(base_local_path, start_time, end_time):
    start_date = datetime.datetime.strptime(start_time, '%Y-%m-%d')
    end_date = datetime.datetime.strptime(end_time, '%Y-%m-%d')
    date_list = []
    for i in range((end_date - start_date).days + 1):
        range_date = start_date + datetime.timedelta(days=i)
        date_list.append(range_date.strftime('%Y-%m-%d'))
    # download files within the start/end date range
    for sl_dir in daily_dir_list:
        # clear_local_path(base_local_path.format(sl_dir))
        for date in date_list:
            # download only when the local file does not exist
            if not os.path.exists(base_local_path.format(sl_dir)+'{}/'.format(date)):
                if not os.path.exists(base_local_path.format(sl_dir)):
                    os.makedirs(base_local_path.format(sl_dir))
                try:
                    logging.info("downloading {} from {}".format(base_local_path.format(sl_dir), base_remote_daily_path.format(sl_dir, date)))
                    # with this client's download, given local target /a/b/ and remote /c/d/: if /a/b/ exists the result is /a/b/d/; if /a/b/ does not exist the result is /a/b/
                    hdfs_client.download(base_remote_daily_path.format(sl_dir, date), base_local_path.format(sl_dir))
                except HdfsError as e:
                    # this hdfs library cannot check whether a file exists, so this crude approach is used
                    logging.info(base_remote_daily_path.format(sl_dir, date) + " download failed: " + e.message)
            else:
                logging.info(base_local_path.format(sl_dir)+'{}/'.format(date) + " already exists, skipping download")
    return date_list
 
 
def download_static_data_from_hdfs(base_local_path):
    for sl_dir in static_dir_list:
        if not os.path.exists(base_local_path.format(sl_dir)):
            # clear_local_path(base_local_path.format(sl_dir))
            if not os.path.exists(base_local_path.format(sl_dir)):
                os.makedirs(base_local_path.format(sl_dir))
            logging.info("downloading {} from {}".format(base_local_path.format(sl_dir), base_remote_static_path.format(sl_dir)))
            hdfs_client.download(base_remote_static_path.format(sl_dir), base_local_path.format(sl_dir))
        else:
            logging.info(base_local_path.format(sl_dir) + " already exists, skipping download")
 
 
get_data_operator = PythonOperator(
    task_id='get_data_operator',
    provide_context=True,
    python_callable=get_data_from_hdfs,
    dag=dag,
)
 
 
# how is a sensor supposed to pull data from xcom? for now the values are taken from the global variables
def get_daily_file_path(filename):
    return base_remote_daily_path.format(filename, today)
 
 
# generate multiple sensors in a loop
for daily_dir in daily_dir_list:
    daily_file_sensor = HdfsSensor(
        task_id='daily_{}_file_sensor'.format(daily_dir),
        poke_interval=poke_interval,  # (seconds)
        timeout=60 * 60 * 24,  # timeout in 24 hours
        filepath=get_daily_file_path(daily_dir),
        hdfs_conn_id='aml_hdfs',
        dag=dag
    )
    daily_file_sensor >> get_data_operator
 
 
def process_data(**kwargs):
    date_list = kwargs['ti'].xcom_pull(key='date_list', task_ids='get_data_operator')
    X, y, str_with_trx_with_retail_with_corporate_with_account = process_data_from_local(date_list, base_local_path)
    kwargs['ti'].xcom_push(key='X', value=X)
    kwargs['ti'].xcom_push(key='y', value=y)
 
 
def process_data_from_local(date_list, base_local_path):
    # read the daily-updated tables
    daily_dir_dict = {}
    for sl_dir in daily_dir_list:
        # initialize
        daily_dir_dict[sl_dir] = pd.DataFrame()
        for date in date_list:
            if os.path.exists(base_local_path.format(sl_dir) + date):
                logging.info("read daily table from path: {}".format(base_local_path.format(sl_dir) + date))
                daily_dir_dict[sl_dir] = daily_dir_dict[sl_dir].append(pd.read_parquet(base_local_path.format(sl_dir) + date))
    # read the static tables
    static_dir_dict = {}
    for sl_dir in static_dir_list:
        # initialize
        logging.info("read static table from path: {}".format(base_local_path.format(sl_dir) + 'all'))
        static_dir_dict[sl_dir] = pd.read_parquet(base_local_path.format(sl_dir) + 'all', engine='auto')
    # join the tables
    # the join keys are still hard-coded here, unfortunately
    str_with_trx = pd.merge(daily_dir_dict['str'], daily_dir_dict['trx'], how='left', on=['trx_id'])
    logging.info("merged str_with_trx")
    str_with_trx_with_retail = pd.merge(str_with_trx, static_dir_dict['retail'], how='left', on=['cust_id'])
    logging.info("merged str_with_trx_with_retail")
    str_with_trx_with_retail_with_corporate = pd.merge(str_with_trx_with_retail, static_dir_dict['corporate'], how='left', on=['cust_id'])
    logging.info("merged str_with_trx_with_retail_with_corporate")
    str_with_trx_with_retail_with_corporate_with_account = pd.merge(str_with_trx_with_retail_with_corporate, static_dir_dict['account'], how='left', on=['account_id'])
    logging.info("merged str_with_trx_with_retail_with_corporate_with_account")
    # label-encode the categorical features
    le = LabelEncoder()
    # labeled_data = str_with_trx_with_retail_with_corporate_with_account.copy()
    labeled_data = str_with_trx_with_retail_with_corporate_with_account.astype(str).apply(le.fit_transform)
    X = labeled_data.drop(['label', 'trx_id', 'ins_id', 'cust_id', 'account_id', 'target_cust_id', 'target_account_id'], axis=1)
    y = labeled_data['label']
    return X, y, str_with_trx_with_retail_with_corporate_with_account
 
 
process_data_operator = PythonOperator(
    task_id='process_data_operator',
    provide_context=True,
    python_callable=process_data,
    dag=dag,
)
 
 
def train_model(**kwargs):
    X = kwargs['ti'].xcom_pull(key='X', task_ids='process_data_operator')
    y = kwargs['ti'].xcom_pull(key='y', task_ids='process_data_operator')
 
    # train the model
    dtrain = xgb.DMatrix(X, label=y)
    watchlist = [(dtrain, 'train')]
 
    params = {'booster': 'gbtree',
              'objective': 'binary:logistic',
              'eval_metric': 'auc',
              'max_depth': 4,
              'lambda': 10,
              'subsample': 0.75,
              'colsample_bytree': 0.75,
              'min_child_weight': 2,
              'eta': 0.025,
              'seed': 0,
              'nthread': 8,
              'silent': 1}
    bst = xgb.train(params, dtrain, num_boost_round=1000, evals=watchlist)
    clear_local_path(base_local_model_path.format(''))
    bst.save_model(local_model_path)
    # the second argument is strict=False: status() returns None when the file does not exist, and the file's info when it does
    if not hdfs_client.status(uploaded_model_path, False):
        hdfs_client.upload(uploaded_model_path, local_model_path)
        logging.info('uploaded local model {} to {} successfully'.format(local_model_path, uploaded_model_path))
    else:
        logging.info('{} already exists'.format(uploaded_model_path))
 
 
train_model_operator = PythonOperator(
    task_id='train_model_operator',
    provide_context=True,
    python_callable=train_model,
    dag=dag,
)
 
 
def get_metrics(**kwargs):
    # download the data
    date_list_for_metrics = download_data_from_hdfs(metrics_start_time, metrics_end_time, base_local_metrics_path)
    # process the data
    X, y, str_with_trx_with_retail_with_corporate_with_account = process_data_from_local(date_list_for_metrics, base_local_metrics_path)
 
    dtrain = xgb.DMatrix(X)
    model = xgb.Booster(model_file=local_model_path)
    y_pred = model.predict(dtrain)
    y_pred_binary = (y_pred >= 0.5) * 1
 
    auc = metrics.roc_auc_score(y, y_pred)
    acc = metrics.accuracy_score(y, y_pred_binary)
    recall = metrics.recall_score(y, y_pred_binary)
    F1_score = metrics.f1_score(y, y_pred_binary)
    precision = metrics.precision_score(y, y_pred_binary)
 
    logging.info('AUC: %.4f' % auc)
    logging.info('ACC: %.4f' % acc)
    logging.info('Recall: %.4f' % recall)
    logging.info('F1-score: %.4f' % F1_score)
    logging.info('Precision: %.4f' % precision)
    metrics.confusion_matrix(y, y_pred_binary)
 
    res_dict = {}
    res_dict['model_id'] = model_id
    res_dict['auc'] = auc
    res_dict['acc'] = acc
    res_dict['recall'] = recall
    res_dict['f1_score'] = F1_score
    res_dict['precision'] = precision
    res_json = json.dumps(res_dict)
 
    local_model_meta_path = base_local_model_meta_path.format('{}.meta.json'.format(model_prefix))
    clear_local_path(base_local_model_meta_path.format(''))
    with open(local_model_meta_path, "w") as f:
        f.write(res_json)
 
    # the second argument is strict=False: status() returns None when the file does not exist, and the file's info when it does
    if not hdfs_client.status(base_remote_model_meta_path.format(model_id) + '.meta.json', False):
        hdfs_client.upload(base_remote_model_meta_path.format(model_id) + '.meta.json', local_model_meta_path)
        logging.info('uploaded local meta {} to {} successfully'.format(local_model_meta_path, base_remote_model_meta_path.format( model_id) + '.meta.json'))
    else:
        logging.info('{} already exists'.format(base_remote_model_meta_path.format(model_id) + '.meta.json'))
 
 
get_metrics_operator = PythonOperator(
    task_id='get_metrics_operator',
    provide_context=True,
    python_callable=get_metrics,
    dag=dag,
)
 
 
def bp_process_data(**kwargs):
    bp_X, y, str_with_trx_with_retail_with_corporate_with_account = process_data_from_local([today], base_local_path)
    kwargs['ti'].xcom_push(key='bp_X', value=bp_X)
    kwargs['ti'].xcom_push(key='str_with_trx_with_retail_with_corporate_with_account', value=str_with_trx_with_retail_with_corporate_with_account)
 
 
bp_process_data_operator = PythonOperator(
    task_id='bp_process_data_operator',
    provide_context=True,
    python_callable=bp_process_data,
    dag=dag,
)
 
if specified_model_path.strip():
    model_sensor = HdfsSensor(
        task_id='model_sensor',
        poke_interval=poke_interval,  # (seconds)
        timeout=60 * 60 * 24,  # timeout in 24 hours
        filepath=specified_model_path,
        hdfs_conn_id='aml_hdfs',
        dag=dag
    )
else:
    model_sensor = HdfsSensor(
        task_id='model_sensor',
        poke_interval=poke_interval,  # (seconds)
        timeout=60 * 60 * 24,  # timeout in 24 hours
        filepath=uploaded_model_path,
        hdfs_conn_id='aml_hdfs',
        dag=dag
    )
 
 
def predict(**kwargs):
    X = kwargs['ti'].xcom_pull(key='bp_X', task_ids='bp_process_data_operator')
    str_with_trx_with_retail_with_corporate_with_account = kwargs['ti'].xcom_pull(key='str_with_trx_with_retail_with_corporate_with_account', task_ids='bp_process_data_operator')
 
    dtrain = xgb.DMatrix(X)
    model = xgb.Booster(model_file=local_model_path)
    y_pred = model.predict(dtrain)
    pred_full_table = str_with_trx_with_retail_with_corporate_with_account.copy()
    pred_full_table['pred_score'] = y_pred
 
    predict_res_name = predict_res_prefix + today + '.csv'
 
    clear_local_path(base_local_predict_res_path.format(''))
    pred_full_table.to_csv(base_local_predict_res_path.format(predict_res_name))
 
    # the second argument is strict=False: status() returns None when the file does not exist, and the file's info when it does
    if not hdfs_client.status(base_remote_predict_res_path.format(predict_res_name), False):
        hdfs_client.upload(base_remote_predict_res_path.format(predict_res_name), base_local_predict_res_path.format(predict_res_name))
        logging.info('uploaded predict res {} to {} successfully'.format(base_local_predict_res_path.format(predict_res_name), base_remote_predict_res_path.format(predict_res_name)))
    else:
        logging.info('{} already exists'.format(base_remote_predict_res_path.format(predict_res_name)))
 
 
predict_operator = PythonOperator(
    task_id='predict_operator',
    provide_context=True,
    python_callable=predict,
    dag=dag,
)
 
get_data_operator >> process_data_operator >> train_model_operator >> get_metrics_operator
get_data_operator >> bp_process_data_operator >> model_sensor >> predict_operator

aml.yaml
/root/he_test is mounted at /root/notebook/local_file
A configMap is added and exposed to the container through an environment variable

apiVersion: v1
data:
  sl.config: '{"hdfs_url":"http://172.27.128.237:50070","hdfs_user":"hdfs","daily_dir_list":["trx","str"],"static_dir_list":["retail","corporate","account"],"base_local_path":"/root/airflow/aml_data/sl_data/{}/","base_local_metrics_path":"/root/airflow/aml_data/sl_data/{}/for_metrics/","base_local_model_path":"/root/airflow/aml_data/model/{}","base_local_model_meta_path":"/root/airflow/aml_data/model_meta/{}","base_local_predict_res_path":"/root/airflow/aml_data/bp_data/res/{}","model_prefix":"he_test_xgboost","predict_res_prefix":"pred_full_table","base_remote_daily_path":"/anti-money/daily_data_group/{}/daily/{}","base_remote_static_path":"/anti-money/daily_data_group/{}/all","base_remote_model_path":"/anti-money/he_test/model/{}","base_remote_model_meta_path":"/anti-money/he_test/model_meta/{}","base_remote_predict_res_path":"/anti-money/he_test/predict_res/{}","specified_model_path":"","start_time":"2018-05-01","end_time":"2018-05-27","metrics_start_time":"2018-05-28","metrics_end_time":"2018-05-30"}'
kind: ConfigMap
metadata:
  name: aml-config
  namespace: aml
---
apiVersion: v1
kind: Service
metadata:
  name: aml
  labels:
    app: aml
spec:
  type: NodePort
  ports:
  - nodePort: 31234
    port: 8080
    targetPort: 8080
    protocol: TCP
    name: airflow-port
  - nodePort: 31235
    port: 8888
    targetPort: 8888
    protocol: TCP
    name: notebook-port
  selector:
    app: aml
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: aml
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: aml
    spec:
      containers:
      - name: aml
        image: aml_env:0.0.6
        resources:
          requests:
            memory: "0"
            cpu: "0"
          limits:
            memory: "128Gi"
            cpu: "20"
        ports:
        - containerPort: 80
        volumeMounts:
        - mountPath: /root/notebook/local_file
          name: notebook-path
        env:
        - name: SL_CONFIG
          valueFrom:
            configMapKeyRef:
              name: aml-config
              key: sl.config
      volumes:
      - name: notebook-path
        hostPath:
          path: /root/he_test