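Ingest pipelines in Elasticsearch
Ingest pipelines let you perform common transformations on your data before it is indexed. For example, you can use a pipeline to remove fields, extract values from text, and enrich your data.
A pipeline consists of a series of configurable tasks called processors. Each processor runs in order, making a specific change to the incoming document. After the processors have run, Elasticsearch adds the transformed document to your data stream or index.
When you create or update a pipeline, you can specify an optional version. You can use this version number together with the if_version parameter to update the pipeline conditionally. If the if_version parameter is specified, a successful update increments the pipeline's version number.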
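As a minimal sketch of a conditional update, the two requests below first store a pipeline with an explicit version and then update it only if the stored version is still 1. The pipeline ID is reused from the create example in this post; the field values are placeholders chosen for illustration.
# Store the pipeline with an explicit version (value chosen for illustration)
PUT _ingest/pipeline/my-pipeline-id
{
  "version": 1,
  "processors": [
    { "set": { "field": "ingest_field", "value": "1111111" } }
  ]
}
# Update only if the stored version is still 1; the request is rejected otherwise
PUT _ingest/pipeline/my-pipeline-id?if_version=1
{
  "processors": [
    { "set": { "field": "ingest_field", "value": "2222222" } }
  ]
}
If the second request succeeds, the stored version should then be 2, in line with the increment behavior described above.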
Create
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "My optional pipeline description",
  "processors" : [
    {
      "set": {
        "field": "ingest_field",
        "value": "1111111"
      }
    }
  ]
}
View
GET /_ingest/pipeline/my-pipeline-id
You can use the _meta parameter to attach arbitrary metadata to a pipeline.
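A short sketch of what that can look like follows. The keys and values inside _meta are hypothetical; Elasticsearch stores the object with the pipeline and returns it when the pipeline is retrieved.
# The "_meta" contents are free-form; "owner" and "reason" are made-up keys
PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "My optional pipeline description",
  "processors" : [
    { "set": { "field": "ingest_field", "value": "1111111" } }
  ],
  "_meta": {
    "owner": "search-team",
    "reason": "set ingest_field on every incoming document"
  }
}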
Simulating document execution (simulate)
Simulate running an ingest pipeline against a set of documents
POST /_ingest/pipeline/ingest_test/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}
Running a pipeline on documents
post document
POST dsl_test_index/_doc?pipeline=ingest_test
{
  "id": "5",
  "TITLE": "ingest_test"
}
update by query
POST dsl_test_index/_update_by_query?pipeline=ingest_test
{
  "query": {
    "term": {
      "ID": "1"
    }
  },
  "max_docs": 1
}
reindex
POST _reindex
{
  "source": {
    "index": "dsl_test_index"
  },
  "dest": {
    "index": "dsl_test_index_3",
    "op_type": "create",
    "pipeline": "ingest_test"
  }
}
index.default_pipeline
The default ingest pipeline for the index. If a default pipeline is set and that pipeline does not exist, index requests fail. The default can be overridden with the pipeline request parameter. The special pipeline name _none means that no ingest pipeline should run.
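As an illustration, the requests below set a default pipeline on the index used elsewhere in this post and then bypass it for a single request. The document body in the second request is made up for the example.
# Make ingest_test the default pipeline for the index
PUT dsl_test_index/_settings
{
  "index.default_pipeline": "ingest_test"
}
# Override the default for this one request by passing the special name _none
POST dsl_test_index/_doc?pipeline=_none
{
  "id": "6",
  "TITLE": "indexed without the default pipeline"
}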
index.final_pipeline
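The final ingest pipeline for the index. If a final pipeline is set and that pipeline does not exist, index requests fail. The final pipeline always runs after the request pipeline (if one is specified) and the default pipeline (if one exists). The special pipeline name _none means that no ingest pipeline will run.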
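A sketch of configuring this setting follows. The pipeline name my-final-pipeline is hypothetical and would need to exist before documents are indexed.
# "my-final-pipeline" is a placeholder name, not a pipeline defined in this post
PUT dsl_test_index/_settings
{
  "index.final_pipeline": "my-final-pipeline"
}
# Setting the value to _none turns the final pipeline off again
PUT dsl_test_index/_settings
{
  "index.final_pipeline": "_none"
}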
Advanced usage
script
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "script": {
          "description": "Extract 'tags' from 'env' field",
          "lang": "painless",
          "source": """
            String[] envSplit = ctx['env'].splitOnToken(params['delimiter']);
            ArrayList tags = new ArrayList();
            tags.add(envSplit[params['position']].trim());
            ctx['tags'] = tags;
          """,
          "params": {
            "delimiter": "-",
            "position": 1
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "env": "es01-prod"
      }
    }
  ]
}