Getting Started with Ingest Pipeline
On nodes with node.ingest: true, ES provides the pipeline feature, which can transform documents before they are actually written to ES: inserting new fields, modifying existing content, and so on. A pipeline intercepts index and bulk requests, and only the processed result is written to ES. Users define a series of "processors"; when a document is processed, the pipeline executes the processors in the order they are defined. The definition format:
PUT _ingest/pipeline/my_pipeline_id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "new"
      }
    }
  ]
}
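A pipeline takes effect when its id is referenced on a write request. As a minimal sketch (my-index is a placeholder name, not from the original article):

PUT my-index/_doc/1?pipeline=my_pipeline_id
{
  "message": "hello"
}

The document passes through my_pipeline_id first, so the indexed version should carry foo: new.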
Official example: using the Script Processor in a pipeline
Let's experiment with this example from the official ES docs: https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-context-examples.html
It comes with a data set of theater seating statistics for plays. First we need to load the sample data into ES. A single record looks like this:
{ "create" : { "_index" : "seats", "_type" : "seat", "_id" : "36203" } }
{ "theatre" : "Skyline", "play" : "Auntie Jo", "actors": [ "Jo Hangum", "Jon Hittle", "Rob Kettleman", "Laura Conrad", "Simon Hower", "Nora Blue" ], "date": "2018-12-14", "time": "5:40PM", "row": 11, "number": 14, "cost": 17.5, "sold": false }
As we can see, the content has a problem: the _type field is deprecated in current versions and may only be _doc, so importing the data exactly as the original article describes will fail.
https://www.elastic.co/guide/en/elasticsearch/reference/7.6/mapping-type-field.html
We need to rewrite _type. In a painless script, ctx['_type'] can be used to modify the type of the insert operation.
ctx['_index']
The name of the index.
ctx['_type']
The type of document within an index.
Add an ingest pipeline with a script processor:
PUT _ingest/pipeline/seats
{
  "description": "seats-ingest",
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": """
          ctx['_type'] = "_doc";
        """
      }
    }
  ]
}
Try inserting one record from Kibana:
POST _bulk?pipeline=seats
{ "create" : { "_index" : "seats", "_type" : "seat", "_id" : "36203" } }
{ "theatre" : "Skyline", "play" : "Auntie Jo", "actors": [ "Jo Hangum", "Jon Hittle", "Rob Kettleman", "Laura Conrad", "Simon Hower", "Nora Blue" ], "date": "2018-12-14", "time": "5:40PM", "row": 11, "number": 14, "cost": 17.5, "sold": false }
#! Deprecation: [types removal] Specifying types in bulk requests is deprecated.
{
  "took" : 4960,
  "ingest_took" : 373,
  "errors" : false,
  "items" : [
    {
      "create" : {
        "_index" : "seats",
        "_type" : "_doc",
        "_id" : "36203",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    }
  ]
}
As the deprecation warning shows, specifying a type in a _bulk insert request is deprecated. We can also see that _type has been rewritten to _doc; without this pipeline, the result would have kept the seat type specified in the _bulk request.
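To double-check, fetching the document back (a quick sanity check, not from the original article) should show it stored under the _doc type:

GET seats/_doc/36203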
The modified script looks like this:
PUT _ingest/pipeline/seats
{
  "description": "seats-ingest",
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": """
          // Painless does not allow String.split by default (regex is
          // disabled), so split on a single character by hand.
          String[] split(String str, char delimiter) {
            int count = 0;
            for (char c : str.toCharArray()) {
              if (c == delimiter) {
                ++count;
              }
            }
            if (count == 0) {
              return new String[] {str};
            }
            String[] r = new String[count + 1];
            int i0 = 0, i1 = 0, n = 0;
            for (char c : str.toCharArray()) {
              if (c == delimiter) {
                r[n] = str.substring(i0, i1);
                ++n;
                i0 = i1 + 1;
              }
              ++i1;
            }
            r[count] = str.substring(i0, i1);
            return r;
          }

          ctx['_type'] = "_doc";

          // Normalize the "2018-12-14" date into zero-padded parts.
          String[] date_array = split(ctx.date, (char)"-");
          String year = date_array[0].trim();
          String month = date_array[1].trim();
          if (month.length() == 1) {
            month = "0" + month;
          }
          String day = date_array[2].trim();
          if (day.length() == 1) {
            day = "0" + day;
          }

          // Convert the "5:40PM" style time to 24-hour parts.
          boolean is_pm = ctx.time.substring(ctx.time.length() - 2).equals("PM");
          String[] time_array = split(ctx.time.substring(0, ctx.time.length() - 2), (char)":");
          int hour = Integer.parseInt(time_array[0].trim());
          if (is_pm && hour != 12) { // 12PM is already 12 in 24-hour time
            hour += 12;
          }
          String hour_str = "";
          if (hour < 10) {
            hour_str += "0";
          }
          hour_str += hour;
          int min = Integer.parseInt(time_array[1].trim());
          String min_str = "";
          if (min < 10) {
            min_str += "0";
          }
          min_str += min;

          // Assemble an ISO-8601 timestamp and store epoch milliseconds.
          String date_time = year + "-" + month + "-" + day + "T" + hour_str + ":" + min_str + ":00+08:00";
          ZonedDateTime dt = ZonedDateTime.parse(
              date_time, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
          ctx.datetime = dt.getLong(ChronoField.INSTANT_SECONDS) * 1000L;
        """
      }
    }
  ]
}
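Before re-running the full import, the rewritten script can be sanity-checked with the _simulate API (covered in more detail below). A sketch using only the two fields the script reads; for this record, datetime should come back as the epoch milliseconds of 2018-12-14T17:40:00+08:00:

POST _ingest/pipeline/seats/_simulate
{
  "docs": [
    {
      "_source": {
        "date": "2018-12-14",
        "time": "5:40PM"
      }
    }
  ]
}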
The import command needs a corresponding change:
curl -k -XPOST https://elastic:elastic@localhost:9200/_bulk?pipeline=seats -H "Content-Type: application/x-ndjson" --data-binary "@/home/DATA/seats-init.json"
I tried several times and kept failing at the step that specifies the input file; it turns out the path must be prefixed with an @.
Perhaps because my machine is not particularly powerful, the import took a long time. The 70,000-plus records ran for ages, and in the end the import actually failed after only 3,000 or so documents, with an error saying the primary shard was unreachable.
By comparison, ClickHouse loaded 80 million records in just a few minutes. For analytics, ClickHouse's numbers are better across the board.
APIs for adding, viewing, modifying, and simulating pipelines
ES provides several APIs for managing and debugging ingest pipelines.
Add
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}
View
GET _ingest/pipeline/my-pipeline-id
Delete
DELETE _ingest/pipeline/my-pipeline-id
Simulate
POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
  },
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}
Alternatively, you can name an existing pipeline in the POST line and simulate against it:
POST _ingest/pipeline/seats/_simulate
{
  "docs": [
    {
      "_source": {
        "theatre": "Skyline",
        "play": "Auntie Jo",
        "actors": [
          "Jo Hangum",
          "Jon Hittle",
          "Rob Kettleman",
          "Laura Conrad",
          "Simon Hower",
          "Nora Blue"
        ],
        "date": "2018-12-14",
        "time": "5:40PM",
        "row": 11,
        "number": 14,
        "cost": 17.5,
        "sold": false
      }
    }
  ]
}
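For pipelines with several processors, adding the verbose flag makes _simulate report each processor's output individually, which helps pinpoint the step that mangles a document. A sketch:

POST _ingest/pipeline/seats/_simulate?verbose
{
  "docs": [
    {
      "_source": {
        "date": "2018-12-14",
        "time": "5:40PM"
      }
    }
  ]
}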
How to work with document data in a pipeline
The document's _source data
In the example above, when using the script processor in a pipeline, we used painless's built-in ctx structure to manipulate the document source.
ctx is a map object that holds both the document's metadata and its _source data:
In the ingest context:
_index: ctx._index or ctx["_index"]
_doc: ctx._doc or ctx["_doc"]
_op: ctx._op or ctx["_op"]
xxxx: ctx.xxxx or ctx["xxxx"]
In the update context:
xxxx: ctx._source.xxxx
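For instance, in an update script the same field is reached through ctx._source. A minimal sketch against the seats data imported earlier:

POST seats/_update/36203
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.sold = true"
  }
}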
Even without script, data in _source can be accessed the same way.
For example, with the set processor:
PUT _ingest/pipeline/bbb
{
  "description": "test",
  "processors": [
    {
      "set": {
        "field": "_source.foo",
        "value": "bar"
      }
    }
  ]
}
PUT _ingest/pipeline/aaa
{
  "description": "test",
  "processors": [
    {
      "set": {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}
Either form works: reference a field directly by name, or as _source.<field name>.
The document's metadata
A document's metadata consists of _index, _type, _routing, and _id, all of which can be accessed directly in a pipeline.
Ingest metadata
PUT _ingest/pipeline/ccc
{
  "description": "test",
  "processors": [
    {
      "set": {
        "field": "received",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
Unlike the metadata fields, _ingest may itself be a legitimate member of _source, so reading ingest metadata requires the {{_ingest.timestamp}} syntax.
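Simulating the pipeline is a quick way to see the substitution happen; the returned document should carry a received field holding the ingest timestamp. A sketch with a throwaway document:

POST _ingest/pipeline/ccc/_simulate
{
  "docs": [
    { "_source": { "foo": "bar" } }
  ]
}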
Template data and metadata
Pipelines can also be used in simulations, and accessing template data likewise requires {{ }}.
{
  "set": {
    "field": "field_c",
    "value": "{{field_a}} {{field_b}}"
  }
}
The set processor above concatenates the existing field_a and field_b values and assigns the result to a new field, field_c.
Templates also support setting field names dynamically.
{
  "set": {
    "field": "{{service}}",
    "value": "{{code}}"
  }
}
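A simulate call makes the dynamic-field behavior concrete. In this sketch (the service and code values are made up), the processor writes the value of code into a field named after service, producing apache_httpd: "200":

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "{{service}}",
          "value": "{{code}}"
        }
      }
    ]
  },
  "docs": [
    { "_source": { "service": "apache_httpd", "code": "200" } }
  ]
}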
Note that to use a pipeline this way from a template, the dynamic index settings must be configured:
index.default_pipeline
The default ingest node pipeline for this index. Index requests will fail if the default pipeline is set and the pipeline does not exist. The default may be overridden using the pipeline parameter. The special pipeline name _none indicates no ingest pipeline should be run.
index.final_pipeline
The final ingest node pipeline for this index. Index requests will fail if the final pipeline is set and the pipeline does not exist. The final pipeline always runs after the request pipeline (if specified) and the default pipeline (if it exists). The special pipeline name _none indicates no ingest pipeline will run.
Be aware that final_pipeline is only supported from version 7.5 onward; earlier versions, such as the 7.4 I am using, do not support this parameter.
Usage: specify it in the index settings:
"settings": {
"number_of_shards": 1,
"default_pipeline": "_none"
},
default_pipeline is overridden by a pipeline specified in the request, while final_pipeline always runs last.
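For example, to bind the seats pipeline as the default for a new index (a sketch; seats_v2 is a hypothetical index name):

PUT seats_v2
{
  "settings": {
    "number_of_shards": 1,
    "default_pipeline": "seats"
  }
}

With this in place, plain _bulk requests against seats_v2 no longer need the ?pipeline= query parameter.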
Conditional execution
Every processor in a pipeline supports an if parameter; the processor runs only when the condition evaluates to true.
PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network_name == 'Guest'"
      }
    }
  ]
}
This example from the official docs drops the record when network_name is Guest.
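The behavior is easy to verify with _simulate (a sketch; the network_name values are made up): the first document below is dropped, the second passes through.

POST _ingest/pipeline/drop_guests_network/_simulate
{
  "docs": [
    { "_source": { "network_name": "Guest" } },
    { "_source": { "network_name": "Corp" } }
  ]
}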
The if parameter also supports more complex scripts.
PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": """
          Collection tags = ctx.tags;
          if (tags != null) {
            for (String tag : tags) {
              if (tag.toLowerCase().contains('prod')) {
                return false;
              }
            }
          }
          return true;
        """
      }
    }
  ]
}
Just write a painless script that returns true or false.
Processor conditions can drill into nested objects, but to guard against a null pointer exception when a member may not exist, use the null-safe ?. operator, as in the sketch below.
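For instance (a sketch with a hypothetical nested app.env field), ctx.app?.env evaluates to null instead of throwing when app is missing, so documents without the field are simply kept:

PUT _ingest/pipeline/drop_dev
{
  "processors": [
    {
      "drop": {
        "if": "ctx.app?.env == 'dev'"
      }
    }
  ]
}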
There is also a processor type named pipeline, which, combined with an if condition, dispatches the document to the named pipeline.
curl -X PUT "localhost:9200/_ingest/pipeline/logs_pipeline?pretty" -H 'Content-Type: application/json' -d'
{
  "description": "A pipeline of pipelines for log files",
  "version": 1,
  "processors": [
    {
      "pipeline": {
        "if": "ctx.service?.name == \u0027apache_httpd\u0027",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.service?.name == \u0027syslog\u0027",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
'
Each pipeline processor above takes two parameters: if is the condition the processor must satisfy, and name is the pipeline to execute when it does.
In addition, pipeline conditions can use regular expressions.
PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url =~ /^http[^s]/",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}
Exception handling
Normally a pipeline runs its processors in order and aborts processing of the current document at the first exception. Sometimes, though, you want to handle errors yourself; for that, give the processor an on_failure parameter. When the processor hits an error, it executes the contents of on_failure and then moves on to the next processor.
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}
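A _simulate run with a document that lacks foo shows the handler in action (a sketch): the rename fails, on_failure fills in the error field, and the document continues through the pipeline instead of being rejected.

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "rename": {
          "field": "foo",
          "target_field": "bar",
          "on_failure": [
            {
              "set": {
                "field": "error",
                "value": "field \"foo\" does not exist, cannot rename to \"bar\""
              }
            }
          ]
        }
      }
    ]
  },
  "docs": [
    { "_source": { "baz": 1 } }
  ]
}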
Exceptions can also be handled at the pipeline level:
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar"
      }
    }
  ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}
Note that a given exception can be caught only once, consistent with Java's exception semantics. If both processor-level and pipeline-level handlers are defined, the exception is caught by the processor-level handler only.
You can also have a processor ignore errors outright by setting the ignore_failure parameter to true.
{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "ignore_failure" : true
      }
    }
  ]
}