Airflow rest_api 插件部署

一.描述

A plugin for Apache Airflow that exposes REST endpoints for the Command Line Interfaces listed in the airflow documentation:

http://airflow.apache.org/cli.html

二.可選操作

1.可能需要安裝flask_jwt_extended模塊
2.如果開啟tooken驗證,用戶的request請求頭需要加入rest_api_plugin_http_token

Example :curl --header "rest_api_plugin_http_token: changeme" http://{HOST}:{PORT}/admin/rest_api/api?api=version
如果未攜帶會返回:

  "call_time": "{TIMESTAMP}",
  "http_response_code": 403,
  "output": "Token Authentication Failed",
  "response_time": "{TIMESTAMP}",
  "status": "ERROR"
}

Working with the rest_api_plugin and JWT Auth tokens

Pass the additional Authorization:Bearer <access_token> header in the rest API request

curl -H 'Authorization:Bearer <access_token>' 'http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=version'  

If you have rest_api_plugin_http_token_header authentication enabled then you need to pass both headers as shown below.

curl -H 'Authorization:Bearer <access_token>'  -H 'rest_api_plugin_http_token: changeme' http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=version

If JWT Authorization header is missing in the request you will get the following response. (Status Code: 401)

{ "msg": "Missing Authorization Header" }

If JWT access token expires you will get the following response. (Status Code: 401)

{ "msg": "Token has expired" }

If JWT access token is invalid you will get the following response. (Status Code: 422)

{ "msg": "Invalid payload padding" }

Please refer Flask-JWT-Extended module documentation for more details

curl -XPOST http://localhost:8080/api/v1/security/login -H "Content-Type: application/json" -d '{"username":"pdc_admin", "password":"pdc_deploy@jjmatch", "refresh":true, "provider": "db"}' 
curl -H 'Authorization:Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1NzczNDczMjksIm5iZiI6MTU3NzM0NzMyOSwianRpIjoiMjRmNTU0YjYtOWJjNS00YWU3LWEzYjYtZTM5N2QwMTNmOGRlIiwiZXhwIjoxNTc3MzQ4MjI5LCJpZGVudGl0eSI6MiwiZnJlc2giOnRydWUsInR5cGUiOiJhY2Nlc3MifQ.917nUhpl-dNCE8wgjhGXjEjSlN8jBDYkKzL2dV8NowQ'  -H 'rest_api_plugin_http_token: changeme' http://localhost:8080/admin/rest_api/api?api=version

三.部署步驟

1.新增plugins{AIRFLOW_HOME}/plugins

2.在airflow.cfg中追加plugins_folder = /home/{USER_NAME}/airflow/plugins

3.wget https://github.com/teamclairvoyant/airflow-rest-api-plugin/archive/v1.0.7.branch.zip

4.解壓

unzip airflow-rest-api-plugin-v1.0.7.branch.zip
cp -r airflow-rest-api-plugin-v1.0.7.branch/plugins/* {AIRFLOW_HOME}/plugins/

5.(可選操作apitoken驗證 log配置 避免過多日志信息).在airflow.cfg中追加內(nèi)容

 [rest_api_plugin]
 
 # Logs global variables used in the REST API plugin when the plugin is loaded. Set to False by default to avoid too many logging messages.
 # DEFAULT: False
 log_loading = False
 
 # Filters out loading messages from the standard out 
 # DEFAULT: True
 filter_loading_messages_in_cli_response = True
 
 # HTTP Header Name to be used for authenticating REST calls for the REST API Plugin
 # DEFAULT: 'rest_api_plugin_http_token'
 #rest_api_plugin_http_token_header_name = rest_api_plugin_http_token
    
 # HTTP Token  to be used for authenticating REST calls for the REST API Plugin
 # DEFAULT: None
 # Comment this out to disable Authentication
 #rest_api_plugin_expected_http_token = changeme

6.重啟Airflow Web Server

7.如果安裝成功將會在Admin Tap 下面看到 REST API 的頁面用來測試rest請求

四.Airflow rest_api 插件使用

1.獲取airflow版本: http://{HOST}:{PORT}/admin/rest_api/api?api=version

2.獲取rest_api插件版本: http://{HOST}:{PORT}/admin/rest_api/api?api=rest_api_plugin_version

3.渲染任務(wù)實例模板: 
http://{HOST}:{PORT}/admin/rest_api/api?api=render
http://{HOST}:{PORT}/admin/rest_api/api?api=render&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05&subdir=value

4.CRUD操作變量 (目前不明白能做什么事情) 類似緩存 Context可以在自定義airflow腳本中使用 也可以在jinjia模板中使用
但它是專門用于任務(wù)之間的通信而不是全局設(shè)置

note: 可將json格式的文件 導(dǎo)入 系統(tǒng)自動識變量 可序列化json

http://{HOST}:{PORT}/admin/rest_api/api?api=variables

http://{HOST}:{PORT}/admin/rest_api/api?api=variables&cmd=set&key=value&value=value&get=value&json&default=value&import=value&export=value&delete=value

For setting a variable like myVar1=myValue1 use

http://{HOST}:{PORT}/admin/rest_api/api?api=variables&cmd=set&key=myVar1&value=myValue1

使用方法:
from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)

echo {{ var.value.<variable_name> }}

5.操作外部connections:http://{HOST}:{PORT}/admin/rest_api/api?api=connections

6.暫停DAG:http://{HOST}:{PORT}/admin/rest_api/api?api=pause&dag_id=test_id

7.停止暫停DAG: http://{HOST}:{PORT}/admin/rest_api/api?api=unpause&dag_id=test_id

8.獲取Dag中任務(wù)失敗后 所以依賴的task :
http://{HOST}:{PORT}/admin/rest_api/api?api=task_failed_deps&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05

9.觸發(fā)Dag運行: 
http://{HOST}:{PORT}/admin/rest_api/api?api=trigger_dag&dag_id=test_id
http://{HOST}:{PORT}/admin/rest_api/api?api=trigger_dag&dag_id=test_id&run_id=run_id_2016_01_01&conf=%7B%22key%22%3A%22value%22%7D

10.測試Dag:http://{HOST}:{PORT}/admin/rest_api/api?api=test&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05

11.獲取Dag執(zhí)行狀態(tài):http://{HOST}:{PORT}/admin/rest_api/api?api=dag_state&dag_id=test_id&execution_date=2017-01-02T03:04:05

12.運行單個task:
http://{HOST}:{PORT}/admin/rest_api/api?api=run&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05

13.展示一個Dag的所有Task:http://{HOST}:{PORT}/admin/rest_api/api?api=list_tasks&dag_id=test_id

14.列出所有Dags : http://{HOST}:{PORT}/admin/rest_api/api?api=list_dags

15.獲取Task運行狀態(tài) : 
http://{HOST}:{PORT}/admin/rest_api/api?api=task_state&dag_id=value&task_id=value&execution_date=2017-01-02T03:04:05

16.CRUD對pool操作:

 http://{HOST}:{PORT}/admin/rest_api/api?api=pool 類似調(diào)度池 可以分配任務(wù)權(quán)重

17.部署新的dag:

curl -X POST -H 'Content-Type: multipart/form-data' -F 'dag_file=@/path/to/dag.py' -F 'force=on' http://{HOST}:{PORT}/admin/rest_api/api?api=deploy_dag

post參數(shù):
1)dag_file:file-上傳部署的py文件
2)force:可選慕嚷,布爾型,文件已存在,是否強制上傳
3)pause:可選喝检,布爾型嗅辣,在創(chuàng)建時強制暫停dag并覆蓋'dags_are_paused_at_creation'配置
4)unpause(可選) - 布爾值 -  DAG在創(chuàng)建時將被強制取消暫停,并覆蓋'dags_are_paused_at_creation'配置

18.刷新Dag:

http://{HOST}:{PORT}/admin/rest_api/api?api=refresh_dag




Api 響應(yīng)參數(shù)

API Response
The API's will all return a common response object. It is a JSON object with the following entries in it:

airflow_cmd - String - Airflow CLI command being ran on the local machine
arguments - Dict - Dictionary with the arguments you passed in and their values
post_arguments - Dict - Dictionary with the post body arguments you passed in and their values
call_time - Timestamp - Time in which the request was received by the server
output - String - Text output from calling the CLI function
response_time - Timestamp - Time in which the response was sent back by the server
status - String - Response Status of the call. (possible values: OK, ERROR)
warning - String - A Warning message that's sent back from the API
http_response_code - Integer - HTTP Response code
Sample (Result of calling the versions endpoint)

{
  "airflow_cmd": "airflow version",
  "arguments": {},
  "call_time": "Tue, 29 Nov 2016 14:22:26 GMT",
  "http_response_code": 200,
  "output": "1.7.0",
  "response_time": "Tue, 29 Nov 2016 14:27:59 GMT",
  "status": "OK"
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末挠说,一起剝皮案震驚了整個濱河市澡谭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌损俭,老刑警劉巖蛙奖,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異杆兵,居然都是意外死亡雁仲,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進(jìn)店門琐脏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來攒砖,“玉大人,你說我怎么就攤上這事日裙〖礼茫” “怎么了?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵阅签,是天一觀的道長掐暮。 經(jīng)常有香客問我,道長政钟,這世上最難降的妖魔是什么路克? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮养交,結(jié)果婚禮上精算,老公的妹妹穿的比我還像新娘。我一直安慰自己碎连,他們只是感情好灰羽,可當(dāng)我...
    茶點故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著鱼辙,像睡著了一般廉嚼。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上倒戏,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天怠噪,我揣著相機與錄音,去河邊找鬼杜跷。 笑死傍念,一個胖子當(dāng)著我的面吹牛矫夷,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播憋槐,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼双藕,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了阳仔?” 一聲冷哼從身側(cè)響起蔓彩,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎驳概,沒想到半個月后赤嚼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡顺又,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年更卒,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片稚照。...
    茶點故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡蹂空,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出果录,到底是詐尸還是另有隱情上枕,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布弱恒,位于F島的核電站辨萍,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏返弹。R本人自食惡果不足惜锈玉,卻給世界環(huán)境...
    茶點故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望义起。 院中可真熱鬧拉背,春花似錦、人聲如沸默终。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽齐蔽。三九已至两疚,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間肴熏,已是汗流浹背鬼雀。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工顷窒, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蛙吏,地道東北人源哩。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓,卻偏偏與公主長得像鸦做,于是被迫代替她去往敵國和親励烦。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容