I. Description
A plugin for Apache Airflow that exposes REST endpoints for the command-line interface (CLI) commands listed in the Airflow documentation:
http://airflow.apache.org/cli.html
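II. Optional settings
1. You may need to install the flask_jwt_extended module.
2. If token authentication is enabled, every request must include the rest_api_plugin_http_token header.
Example: curl --header "rest_api_plugin_http_token: changeme" http://{HOST}:{PORT}/admin/rest_api/api?api=version
If the header is missing, the response is: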
{
"call_time": "{TIMESTAMP}",
"http_response_code": 403,
"output": "Token Authentication Failed",
"response_time": "{TIMESTAMP}",
"status": "ERROR"
}
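The same token-authenticated call can be made from Python with only the standard library. This is a minimal sketch. The helper names are hypothetical, and the host, port and token are placeholders. It is not certain whether the plugin returns the 403 as the real HTTP status or only inside the JSON body, so call() reads the JSON body in both cases.

```python
import json
import urllib.request
from urllib.error import HTTPError

TOKEN_HEADER = "rest_api_plugin_http_token"  # default header name (see airflow.cfg)


def version_request(base_url: str, token: str) -> urllib.request.Request:
    """Build (but do not send) a GET request for api=version carrying the token header."""
    return urllib.request.Request(
        f"{base_url}/admin/rest_api/api?api=version",
        headers={TOKEN_HEADER: token},
    )


def is_token_failure(body: dict) -> bool:
    """True if the JSON body matches the 'Token Authentication Failed' error shape."""
    return body.get("http_response_code") == 403 and body.get("status") == "ERROR"


def call(req: urllib.request.Request) -> dict:
    """Send the request and return the parsed JSON body, even for HTTP error statuses."""
    try:
        with urllib.request.urlopen(req) as resp:
            return json.load(resp)
    except HTTPError as err:  # an HTTPError is file-like, so its JSON body can still be read
        return json.load(err)
```

Usage: `body = call(version_request("http://localhost:8080", "changeme"))`, then check `is_token_failure(body)`.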
Working with the rest_api_plugin and JWT Auth tokens
Pass the additional Authorization: Bearer <access_token> header in the REST API request:
curl -H 'Authorization:Bearer <access_token>' 'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=version'
If rest_api_plugin_http_token header authentication is also enabled, you need to pass both headers, as shown below.
curl -H 'Authorization:Bearer <access_token>' -H 'rest_api_plugin_http_token: changeme' http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=version
If the JWT Authorization header is missing from the request, the response is (status code 401):
{ "msg": "Missing Authorization Header" }
If the JWT access token has expired, the response is (status code 401):
{ "msg": "Token has expired" }
If the JWT access token is invalid, the response is (status code 422):
{ "msg": "Invalid payload padding" }
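Refer to the Flask-JWT-Extended documentation for more details.
Full example: first log in to obtain an access token (the access_token field of the JSON response), then call the plugin with both headers: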
curl -XPOST http://localhost:8080/api/v1/security/login -H "Content-Type: application/json" -d '{"username":"pdc_admin", "password":"pdc_deploy@jjmatch", "refresh":true, "provider": "db"}'
curl -H 'Authorization:Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1NzczNDczMjksIm5iZiI6MTU3NzM0NzMyOSwianRpIjoiMjRmNTU0YjYtOWJjNS00YWU3LWEzYjYtZTM5N2QwMTNmOGRlIiwiZXhwIjoxNTc3MzQ4MjI5LCJpZGVudGl0eSI6MiwiZnJlc2giOnRydWUsInR5cGUiOiJhY2Nlc3MifQ.917nUhpl-dNCE8wgjhGXjEjSlN8jBDYkKzL2dV8NowQ' -H 'rest_api_plugin_http_token: changeme' http://localhost:8080/admin/rest_api/api?api=version
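The same two-step JWT flow can be written in Python: request a token from the login endpoint, then attach it to every call, along with the plugin token if that is enabled. This is a standard-library sketch. The helper names are hypothetical. It assumes the login response is JSON with an access_token field, as returned by the Flask-AppBuilder security API. Substitute your own credentials.

```python
import json
import urllib.request
from typing import Optional


def login_request(base_url: str, username: str, password: str) -> urllib.request.Request:
    """Build the POST to /api/v1/security/login with the same JSON body as the curl example."""
    payload = {"username": username, "password": password, "refresh": True, "provider": "db"}
    return urllib.request.Request(
        f"{base_url}/api/v1/security/login",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def auth_headers(access_token: str, plugin_token: Optional[str] = None) -> dict:
    """Headers for a plugin call: always the JWT, plus the plugin token when enabled."""
    headers = {"Authorization": f"Bearer {access_token}"}
    if plugin_token is not None:
        headers["rest_api_plugin_http_token"] = plugin_token
    return headers


def get_access_token(base_url: str, username: str, password: str) -> str:
    """Log in and return the access token (assumes an 'access_token' key in the response)."""
    with urllib.request.urlopen(login_request(base_url, username, password)) as resp:
        return json.load(resp)["access_token"]
```

JWT access tokens expire (the 401 "Token has expired" case above), so long-running clients should call get_access_token again when that response appears.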
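III. Deployment steps
1. Create the plugins directory: {AIRFLOW_HOME}/plugins
2. In the [core] section of airflow.cfg, set plugins_folder = /home/{USER_NAME}/airflow/plugins (this should point to the directory created in step 1).
3. Download the plugin: wget https://github.com/teamclairvoyant/airflow-rest-api-plugin/archive/v1.0.7.branch.zip
4. Unzip the archive and copy the plugin files: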
unzip airflow-rest-api-plugin-v1.0.7.branch.zip
cp -r airflow-rest-api-plugin-v1.0.7.branch/plugins/* {AIRFLOW_HOME}/plugins/
5. (Optional: configure API token authentication and plugin logging to avoid excessive log output.) Append the following to airflow.cfg:
[rest_api_plugin]
# Logs global variables used in the REST API plugin when the plugin is loaded. Set to False by default to avoid too many logging messages.
# DEFAULT: False
log_loading = False
# Filters out loading messages from the standard out
# DEFAULT: True
filter_loading_messages_in_cli_response = True
# HTTP Header Name to be used for authenticating REST calls for the REST API Plugin
# DEFAULT: 'rest_api_plugin_http_token'
#rest_api_plugin_http_token_header_name = rest_api_plugin_http_token
# HTTP Token to be used for authenticating REST calls for the REST API Plugin
# DEFAULT: None
# Comment this out to disable Authentication
#rest_api_plugin_expected_http_token = changeme
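To sanity-check an airflow.cfg against the documented defaults, the [rest_api_plugin] section can be read with configparser. This is a sketch only: it mirrors the defaults listed in the comments above and does not reproduce how the plugin itself loads its settings. read_plugin_settings is a hypothetical helper. Interpolation is disabled because airflow.cfg may contain '%' characters (for example in log formats).

```python
import configparser

SECTION = "rest_api_plugin"


def read_plugin_settings(cfg_text: str) -> dict:
    """Return the plugin settings, falling back to the documented defaults."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return {
        "log_loading": parser.getboolean(SECTION, "log_loading", fallback=False),
        "filter_loading_messages": parser.getboolean(
            SECTION, "filter_loading_messages_in_cli_response", fallback=True
        ),
        "header_name": parser.get(
            SECTION, "rest_api_plugin_http_token_header_name",
            fallback="rest_api_plugin_http_token",
        ),
        # None means the option is absent or commented out, i.e. token auth is disabled
        "expected_token": parser.get(SECTION, "rest_api_plugin_expected_http_token", fallback=None),
    }
```

Usage: `read_plugin_settings(open("airflow.cfg").read())["expected_token"]` returns None when the token line is commented out, as in the block above.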
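6. Restart the Airflow web server.
7. If the installation succeeded, a REST API page appears under the Admin tab, where you can test REST requests.
IV. Using the Airflow rest_api plugin
1. Get the Airflow version: http://{HOST}:{PORT}/admin/rest_api/api?api=version
2. Get the rest_api plugin version: http://{HOST}:{PORT}/admin/rest_api/api?api=rest_api_plugin_version
3. Render a task instance's templates: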
http://{HOST}:{PORT}/admin/rest_api/api?api=render
http://{HOST}:{PORT}/admin/rest_api/api?api=render&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05&subdir=value
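Every endpoint in this section is a GET on the same path with api=<name> plus query parameters, so one helper can build all of the URLs. This is a minimal sketch. build_api_url is a hypothetical name. It formats datetime values as YYYY-MM-DDTHH:MM:SS to match the examples here and does not handle timezones.

```python
from datetime import datetime
from urllib.parse import urlencode

API_PATH = "/admin/rest_api/api"


def build_api_url(host: str, port: int, api: str, **params) -> str:
    """Build http://host:port/admin/rest_api/api?api=<api>&<params> (params keep their order)."""
    query = {"api": api}
    for key, value in params.items():
        if isinstance(value, datetime):
            value = value.strftime("%Y-%m-%dT%H:%M:%S")  # e.g. 2017-01-02T03:04:05
        query[key] = value
    # safe=":" keeps execution_date readable, as in the examples above
    return f"http://{host}:{port}{API_PATH}?" + urlencode(query, safe=":")
```

The same helper covers the other GET endpoints, for example `build_api_url("localhost", 8080, "pause", dag_id="test_id")`.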
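4. CRUD operations on Variables. Variables are a global key/value store (somewhat like a cache) that can be read from custom Airflow code and from Jinja templates. They are intended for settings shared across DAGs; for passing data between tasks, Airflow provides XComs instead.
Note: variables can be imported from a JSON file, and JSON-valued variables can be deserialized when they are read.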
http://{HOST}:{PORT}/admin/rest_api/api?api=variables
http://{HOST}:{PORT}/admin/rest_api/api?api=variables&cmd=set&key=value&value=value&get=value&json&default=value&import=value&export=value&delete=value
For example, to set myVar1=myValue1, use:
http://{HOST}:{PORT}/admin/rest_api/api?api=variables&cmd=set&key=myVar1&value=myValue1
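Usage in Python code:
from airflow.models import Variable
foo = Variable.get("foo")
# deserialize_json=True parses a JSON-valued variable into a Python object
bar = Variable.get("bar", deserialize_json=True)
Usage in a Jinja template (for example, a BashOperator bash_command):
echo {{ var.value.<variable_name> }}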
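To set a variable that will later be read with Variable.get(key, deserialize_json=True), its value has to be JSON text. The sketch below builds the cmd=set URL shown above and serializes non-string values to compact JSON first. set_variable_url is a hypothetical helper, and sending JSON through the value parameter this way is an assumption, not documented plugin behavior.

```python
import json
from urllib.parse import urlencode


def set_variable_url(base_url: str, key: str, value) -> str:
    """Build an api=variables&cmd=set URL; non-string values are sent as JSON text."""
    if not isinstance(value, str):
        value = json.dumps(value, separators=(",", ":"))
    query = urlencode({"api": "variables", "cmd": "set", "key": key, "value": value})
    return f"{base_url}/admin/rest_api/api?{query}"
```

Usage: `set_variable_url("http://localhost:8080", "myVar1", "myValue1")` reproduces the example URL above. `set_variable_url(base, "bar", {"a": 1})` stores '{"a":1}', which `Variable.get("bar", deserialize_json=True)` would return as a dict.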
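5. Manage connections to external systems: http://{HOST}:{PORT}/admin/rest_api/api?api=connections
6. Pause a DAG: http://{HOST}:{PORT}/admin/rest_api/api?api=pause&dag_id=test_id
7. Unpause a DAG: http://{HOST}:{PORT}/admin/rest_api/api?api=unpause&dag_id=test_id
8. Get the unmet dependencies of a task instance from the scheduler's perspective (that is, why it is not being scheduled and run):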
http://{HOST}:{PORT}/admin/rest_api/api?api=task_failed_deps&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05
9. Trigger a DAG run:
http://{HOST}:{PORT}/admin/rest_api/api?api=trigger_dag&dag_id=test_id
http://{HOST}:{PORT}/admin/rest_api/api?api=trigger_dag&dag_id=test_id&run_id=run_id_2016_01_01&conf=%7B%22key%22%3A%22value%22%7D
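The conf parameter in the second example is a URL-encoded JSON object ({"key":"value"}). The sketch below shows one way to produce that encoding with the standard library. trigger_dag_query is a hypothetical helper that returns only the query string.

```python
import json
from typing import Optional
from urllib.parse import quote, urlencode


def trigger_dag_query(dag_id: str, run_id: Optional[str] = None, conf: Optional[dict] = None) -> str:
    """Query string for api=trigger_dag with an optional run_id and JSON conf."""
    params = {"api": "trigger_dag", "dag_id": dag_id}
    if run_id is not None:
        params["run_id"] = run_id
    if conf is not None:
        # Compact JSON, then percent-encode every reserved character ({, ", :, })
        params["conf"] = json.dumps(conf, separators=(",", ":"))
    return urlencode(params, quote_via=quote, safe="")
```

`trigger_dag_query("test_id", "run_id_2016_01_01", {"key": "value"})` yields the same query string as the second example URL.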
10. Test a task (runs a single task instance without checking dependencies or recording its state in the database): http://{HOST}:{PORT}/admin/rest_api/api?api=test&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05
11. Get the state of a DAG run: http://{HOST}:{PORT}/admin/rest_api/api?api=dag_state&dag_id=test_id&execution_date=2017-01-02T03:04:05
12. Run a single task instance:
http://{HOST}:{PORT}/admin/rest_api/api?api=run&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05
13. List all tasks in a DAG: http://{HOST}:{PORT}/admin/rest_api/api?api=list_tasks&dag_id=test_id
14. List all DAGs: http://{HOST}:{PORT}/admin/rest_api/api?api=list_dags
15. Get the state of a task instance:
http://{HOST}:{PORT}/admin/rest_api/api?api=task_state&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05
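The state endpoints (dag_state, task_state) return the CLI's text in the output field, so a client that waits for completion has to poll. This sketch makes two assumptions. First, the state is the last non-empty line of output, since the CLI may print loading messages before it. Second, success, failed, upstream_failed and skipped are the states worth stopping on. fetch is any zero-argument function that performs the GET and returns the parsed JSON. All names here are hypothetical.

```python
import time
from typing import Callable

TERMINAL_STATES = {"success", "failed", "upstream_failed", "skipped"}


def extract_state(response: dict) -> str:
    """Return the last non-empty line of the 'output' field ('' if there is none)."""
    lines = [line.strip() for line in response.get("output", "").splitlines() if line.strip()]
    return lines[-1] if lines else ""


def wait_for_state(
    fetch: Callable[[], dict],
    interval: float = 10.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll fetch() until a terminal state appears; raise TimeoutError after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        state = extract_state(fetch())
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no terminal state before timeout, last state: {state!r}")
        sleep(interval)
```

Enabling filter_loading_messages_in_cli_response (the default) keeps loading messages out of output, which makes the last-line heuristic more reliable.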
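16. CRUD operations on pools (a pool caps how many tasks can run at the same time; queued tasks in a pool are ordered by their priority weights):
http://{HOST}:{PORT}/admin/rest_api/api?api=pool
17. Deploy a new DAG: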
curl -X POST -H 'Content-Type: multipart/form-data' -F 'dag_file=@/path/to/dag.py' -F 'force=on' http://{HOST}:{PORT}/admin/rest_api/api?api=deploy_dag
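POST parameters:
1) dag_file - file - the .py DAG file to upload and deploy
2) force (optional) - boolean - whether to overwrite the file if it already exists
3) pause (optional) - boolean - force the DAG to be paused on creation, overriding the 'dags_are_paused_at_creation' setting
4) unpause (optional) - boolean - force the DAG to be unpaused on creation, overriding the 'dags_are_paused_at_creation' setting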
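deploy_dag expects a multipart/form-data POST like the curl command above. The sketch below builds that request with the standard library. encode_multipart and deploy_dag_request are hypothetical helpers. Only the dag_file and force fields come from the documentation, and the "on" value for force mirrors the curl example. pause and unpause are assumed, not confirmed, to accept the same form if you add them to fields.

```python
import urllib.request
import uuid
from typing import Optional


def encode_multipart(fields: dict, file_field: str, filename: str, content: bytes, boundary: str) -> bytes:
    """Encode plain form fields plus one file part as a multipart/form-data body."""
    parts = []
    for name, value in fields.items():
        parts.append(f"--{boundary}\r\n".encode())
        parts.append(f'Content-Disposition: form-data; name="{name}"\r\n\r\n'.encode())
        parts.append(value.encode() + b"\r\n")
    parts.append(f"--{boundary}\r\n".encode())
    parts.append(f'Content-Disposition: form-data; name="{file_field}"; filename="{filename}"\r\n'.encode())
    parts.append(b"Content-Type: application/octet-stream\r\n\r\n")
    parts.append(content + b"\r\n")
    parts.append(f"--{boundary}--\r\n".encode())  # closing boundary
    return b"".join(parts)


def deploy_dag_request(base_url: str, filename: str, content: bytes,
                       force: bool = False, boundary: Optional[str] = None) -> urllib.request.Request:
    """Build (but do not send) the POST to api=deploy_dag."""
    boundary = boundary or uuid.uuid4().hex
    fields = {"force": "on"} if force else {}
    body = encode_multipart(fields, "dag_file", filename, content, boundary)
    return urllib.request.Request(
        f"{base_url}/admin/rest_api/api?api=deploy_dag",
        data=body,
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
        method="POST",
    )
```

Send it with `urllib.request.urlopen(...)`. If token or JWT authentication is enabled, add those headers with `req.add_header(...)` first.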
18. Refresh a DAG in the web server:
http://{HOST}:{PORT}/admin/rest_api/api?api=refresh_dag
V. API Response
All APIs return a common response object: a JSON object with the following entries:
airflow_cmd - String - Airflow CLI command run on the local machine
arguments - Dict - Dictionary with the arguments you passed in and their values
post_arguments - Dict - Dictionary with the post body arguments you passed in and their values
call_time - Timestamp - Time in which the request was received by the server
output - String - Text output from calling the CLI function
response_time - Timestamp - Time in which the response was sent back by the server
status - String - Response Status of the call. (possible values: OK, ERROR)
warning - String - A Warning message that's sent back from the API
http_response_code - Integer - HTTP Response code
Sample (result of calling the version endpoint):
{
"airflow_cmd": "airflow version",
"arguments": {},
"call_time": "Tue, 29 Nov 2016 14:22:26 GMT",
"http_response_code": 200,
"output": "1.7.0",
"response_time": "Tue, 29 Nov 2016 14:27:59 GMT",
"status": "OK"
}
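A client can map the common response object onto a small typed structure. The sketch below uses a dataclass whose fields follow the entries listed above. ApiResponse and parse_response are hypothetical names. Keys not in that list are dropped, so that a plugin version returning extra fields does not break parsing.

```python
import json
from dataclasses import dataclass, field, fields
from typing import Optional


@dataclass
class ApiResponse:
    """Common response object returned by the rest_api_plugin endpoints."""
    status: str                          # "OK" or "ERROR"
    http_response_code: int
    output: str = ""                     # text printed by the Airflow CLI command
    airflow_cmd: Optional[str] = None
    arguments: dict = field(default_factory=dict)
    post_arguments: dict = field(default_factory=dict)
    call_time: Optional[str] = None
    response_time: Optional[str] = None
    warning: Optional[str] = None

    @property
    def ok(self) -> bool:
        return self.status == "OK"


_FIELD_NAMES = {f.name for f in fields(ApiResponse)}


def parse_response(text: str) -> ApiResponse:
    """Parse a JSON response body, ignoring keys that are not documented."""
    data = json.loads(text)
    return ApiResponse(**{k: v for k, v in data.items() if k in _FIELD_NAMES})
```

Applied to the sample above, parse_response gives `ok == True` and `output == "1.7.0"`. Applied to the 403 token error body, it gives `ok == False`.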