Requirement: Fix and Enrich the Data Being Written
- In the Tags field, the comma-separated text should be an array, not a single string
- Requirement: we later need to run Aggregations on Tags
PUT tech_blogs/_doc/1
{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You know, for big data"
}
Ingest Node
- A new node type introduced in Elasticsearch 5.0. With the default configuration, every node is an Ingest Node
- Pre-processes data by intercepting Index and Bulk API requests
- Transforms the data, then passes it back to the Index or Bulk API
- Enables data pre-processing without Logstash, for example:
- Setting a default value for a field; renaming a field; applying a Split to a field value
- Supports Painless scripts for more complex transformations
Pipeline & Processor
- Pipeline - a pipeline processes the data (documents) flowing through it, applying each step in order
- Processor - Elasticsearch's abstraction for a single processing step
- Elasticsearch ships with many built-in Processors; custom Processors can also be implemented as plugins
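Conceptually, a pipeline is just an ordered list of processors, each taking a document and returning a modified document. A minimal Python sketch of the idea (not Elasticsearch code; the function names are illustrative):

```python
# A pipeline is an ordered list of processors; each processor
# takes a document (a dict) and returns the modified document.
def split_processor(field, separator):
    def process(doc):
        doc[field] = doc[field].split(separator)
        return doc
    return process

def set_processor(field, value):
    def process(doc):
        doc[field] = value
        return doc
    return process

def run_pipeline(processors, doc):
    for processor in processors:  # processors run in declared order
        doc = processor(doc)
    return doc

blog_pipeline = [
    split_processor("tags", ","),
    set_processor("views", 0),
]

doc = {"title": "Introducing big data", "tags": "hadoop,elasticsearch,spark"}
result = run_pipeline(blog_pipeline, dict(doc))
print(result["tags"])   # ['hadoop', 'elasticsearch', 'spark']
print(result["views"])  # 0
```

This mirrors the `blog_pipeline` built later in this section: a `split` step followed by a `set` step.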
Splitting Strings with a Pipeline
# Simulate splitting tags
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "to split blog tags",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"title": "Introducing big data......",
"tags": "hadoop,elasticsearch,spark",
"content": "You know, for big data"
}
},
{
"_index": "index",
"_id": "idxx",
"_source": {
"title": "Introducing cloud computing",
"tags": "openstack,k8s",
"content": "You know, for cloud"
}
}
]
}
Adding a Field to Documents
# Also add a field to each document: the blog view count
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "to split blog tags",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
},
{
"set":{ // add a "views" field to the document
"field": "views",
"value": 0
}
}
]
},
"docs": [
{
"_index":"index",
"_id":"id",
"_source":{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You know, for big data"
}
},
{
"_index":"index",
"_id":"idxx",
"_source":{
"title":"Introducing cloud computing",
"tags":"openstack,k8s",
"content":"You know, for cloud"
}
}
]
}
Pipeline API
Add a Pipeline and Test It
# Register a Pipeline in Elasticsearch
PUT _ingest/pipeline/blog_pipeline
{
"description": "a blog pipeline",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
},
{
"set":{
"field": "views",
"value": 0
}
}
]
}
# Test the pipeline
POST _ingest/pipeline/blog_pipeline/_simulate
{
"docs": [
{
"_source": {
"title": "Introducing cloud computing",
"tags": "openstack,k8s",
"content": "You know, for cloud"
}
}
]
}
Index & Update By Query
Some Built-in Processors
- https://www.elastic.co/guide/en/elasticsearch/reference/7.1/ingest-processors.html
- Split Processor (e.g., split a given field value into an array)
- Remove / Rename Processor (e.g., remove or rename a field)
- Append (e.g., add a new tag to a product)
- Convert (e.g., convert a product price from a string to a float)
- Date / JSON (e.g., convert date formats; parse a string into a JSON object)
- Date Index Name Processor (e.g., route documents passing through the processor to an index named with the given date format)
Built-in Processors (continued)
https://www.elastic.co/guide/en/elasticsearch/reference/7.1/ingest-processors.html
- Fail Processor (when an exception occurs, the error message specified by the Pipeline is returned to the user)
- Foreach Processor (for an array field, apply the same processor to every element)
- Grok Processor (e.g., pattern-based parsing of log lines)
- Gsub / Join / Split (string replacement / array to string / string to array)
- Lowercase / Uppercase (case conversion)
Ingest Node vs. Logstash
| | Logstash | Ingest Node |
|---|---|---|
| Data input & output | Supports reading from and writing to many different data sources | Takes data from the ES REST API and writes to Elasticsearch |
| Data buffering | Implements a simple data queue; supports rewriting | No buffering |
| Data processing | Supports a large number of plugins, as well as custom development | Built-in processors; can be extended by developing Plugins (updating a Plugin requires a restart) |
| Configuration & usage | Adds some architectural complexity | No extra deployment required |
Introduction to Painless
- Introduced in Elasticsearch 5.x; designed specifically for Elasticsearch, extending Java syntax
- Since 6.0, Painless is the only supported scripting language; Groovy, JavaScript, and Python are no longer supported
- Painless supports all Java data types and a subset of the Java API
- Painless scripts have the following characteristics:
  - High performance / security
  - Support for explicit types or dynamically defined types
Uses of Painless
- Processing document fields
- Updating or deleting fields; processing data for aggregations
- Script Fields: computing fields on the fly before they are returned
- Function Score: adjusting document relevance scores
- Running scripts inside an Ingest Pipeline
- Transforming data during Reindex API and Update By Query operations
Accessing Fields from Painless Scripts
| Context | Syntax |
|---|---|
| Ingestion | ctx.field_name |
| Update | ctx._source.field_name |
| Search & Aggregation | doc['field_name'] |
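The three access contexts can be pictured with plain dictionaries. A toy Python sketch, purely illustrative (the "doc values are list-like" detail is an assumption about how `doc['field'].value` behaves, not real client code):

```python
# Toy documents standing in for the three Painless script contexts.
ingest_ctx = {"tags": "hadoop,elasticsearch,spark"}  # Ingestion: ctx.field_name
update_ctx = {"_source": {"views": 0}}               # Update: ctx._source.field_name
search_doc = {"views": [100]}                        # Search/agg: doc['field_name'] (list-like doc values)

ingest_ctx["tags"] = ingest_ctx["tags"].split(",")   # Ingestion mutates ctx fields directly
update_ctx["_source"]["views"] += 100                # Update goes through _source
first_value = search_doc["views"][0]                 # Search/agg reads doc values (like .value)
```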
Example 1: Script Processor
Example 2: Counting Document Updates
PUT tech_blogs/_doc/1
{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You know, for big data",
"views":0
}
POST tech_blogs/_update/1
{
"script": {
"source": "ctx._source.views += params.new_views", // the script updates the field; data is accessed via ctx._source
"params": {
"new_views":100
}
}
}
Example 3: Script Fields at Search Time
// access data via doc['field_name']
GET tech_blogs/_search
{
"script_fields": {
"rnd_views": {
"script": {
"lang": "painless",
"source": """
java.util.Random rnd = new Random();
doc['views'].value+rnd.nextInt(1000);
"""
}
}
},
"query": {
"match_all": {}
}
}
Script: Inline vs. Stored
Script Caching
- Compilation is relatively expensive
- Elasticsearch caches compiled scripts
- Both inline scripts and stored scripts are cached
- 100 scripts are cached by default
| Parameter | Description |
|---|---|
| script.cache.max_size | Maximum number of cached scripts |
| script.cache.expire | Cache expiry time |
| script.max_compilations_rate | Default: at most 75 compilations per 5 minutes (75/5m) |
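The caching behavior can be pictured as a small bounded cache keyed by script source. A toy Python sketch (the size limit mirrors `script.cache.max_size`; the LRU eviction policy here is an illustrative assumption, not Elasticsearch's documented policy):

```python
from collections import OrderedDict

class ScriptCache:
    """Toy model of a bounded compiled-script cache."""
    def __init__(self, max_size=100):  # default mirrors "100 scripts cached"
        self.max_size = max_size
        self._cache = OrderedDict()
        self.compilations = 0

    def get_compiled(self, source):
        if source in self._cache:
            self._cache.move_to_end(source)  # cache hit: no recompilation
            return self._cache[source]
        self.compilations += 1               # cache miss: "compile" the script
        compiled = f"compiled({source})"
        self._cache[source] = compiled
        if len(self._cache) > self.max_size:
            self._cache.popitem(last=False)  # evict the oldest entry
        return compiled

cache = ScriptCache(max_size=2)
cache.get_compiled("ctx._source.views += params.new_views")
cache.get_compiled("ctx._source.views += params.new_views")  # hit, not recompiled
print(cache.compilations)  # 1
```

This is why parameterized scripts (`params.new_views`) are preferable to interpolating values into the script source: one cached compilation serves every call.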
Key Points of This Section
- Concepts: Ingest Node, Pipeline, and Processor
- Comparison of Ingest Node and Logstash
- Pipeline operations / built-in Processors, explained and demonstrated
- Painless scripts in:
  - Ingestion (Pipeline)
  - Update
  - Search & Aggregation
Course Demo
#########Demo for Pipeline###############
DELETE tech_blogs
# Blog data with 3 fields; tags are comma-separated
PUT tech_blogs/_doc/1
{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You know, for big data"
}
# Simulate splitting tags
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "to split blog tags",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"title": "Introducing big data......",
"tags": "hadoop,elasticsearch,spark",
"content": "You know, for big data"
}
},
{
"_index": "index",
"_id": "idxx",
"_source": {
"title": "Introducing cloud computing",
"tags": "openstack,k8s",
"content": "You know, for cloud"
}
}
]
}
# Also add a field to each document: the blog view count
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "to split blog tags",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
},
{
"set":{
"field": "views",
"value": 0
}
}
]
},
"docs": [
{
"_index":"index",
"_id":"id",
"_source":{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You know, for big data"
}
},
{
"_index":"index",
"_id":"idxx",
"_source":{
"title":"Introducing cloud computing",
"tags":"openstack,k8s",
"content":"You know, for cloud"
}
}
]
}
# 為ES添加一個 Pipeline
PUT _ingest/pipeline/blog_pipeline
{
"description": "a blog pipeline",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
},
{
"set":{
"field": "views",
"value": 0
}
}
]
}
# View the pipeline
GET _ingest/pipeline/blog_pipeline
# Test the pipeline
POST _ingest/pipeline/blog_pipeline/_simulate
{
"docs": [
{
"_source": {
"title": "Introducing cloud computing",
"tags": "openstack,k8s",
"content": "You know, for cloud"
}
}
]
}
# Index a document without the pipeline
PUT tech_blogs/_doc/1
{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You know, for big data"
}
# Index a document with the pipeline
PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{
"title": "Introducing cloud computing",
"tags": "openstack,k8s",
"content": "You know, for cloud"
}
# View both documents: one processed, one not
POST tech_blogs/_search
{}
# update_by_query over all documents causes an error (the already-processed document fails the split processor)
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
}
# Add a query so update_by_query only touches unprocessed documents
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
"query": {
"bool": {
"must_not": {
"exists": {
"field": "views"
}
}
}
}
}
#########Demo for Painless###############
# Add a Script Processor
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "to split blog tags",
"processors": [
{
"split": {
"field": "tags",
"separator": ","
}
},
{
"script": {
"source": """
if(ctx.containsKey("content")){
ctx.content_length = ctx.content.length();
}else{
ctx.content_length=0;
}
"""
}
},
{
"set":{
"field": "views",
"value": 0
}
}
]
},
"docs": [
{
"_index":"index",
"_id":"id",
"_source":{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You know, for big data"
}
},
{
"_index":"index",
"_id":"idxx",
"_source":{
"title":"Introducing cloud computing",
"tags":"openstack,k8s",
"content":"You know, for cloud"
}
}
]
}
DELETE tech_blogs
PUT tech_blogs/_doc/1
{
"title":"Introducing big data......",
"tags":"hadoop,elasticsearch,spark",
"content":"You konw, for big data",
"views":0
}
POST tech_blogs/_update/1
{
"script": {
"source": "ctx._source.views += params.new_views",
"params": {
"new_views":100
}
}
}
# Check the views count
POST tech_blogs/_search
{
}
# Store the script in the cluster state
POST _scripts/update_views
{
"script":{
"lang": "painless",
"source": "ctx._source.views += params.new_views"
}
}
POST tech_blogs/_update/1
{
"script": {
"id": "update_views",
"params": {
"new_views":1000
}
}
}
GET tech_blogs/_search
{
"script_fields": {
"rnd_views": {
"script": {
"lang": "painless",
"source": """
java.util.Random rnd = new Random();
doc['views'].value+rnd.nextInt(1000);
"""
}
}
},
"query": {
"match_all": {}
}
}
Further Reading
- https://www.elastic.co/cn/blog/should-i-use-logstash-or-elasticsearch-ingest-nodes
- https://www.elastic.co/guide/en/elasticsearch/reference/7.1/ingest-apis.html
- https://www.elastic.co/guide/en/elasticsearch/reference/7.1/ingest-processors.html
- https://www.elastic.co/guide/en/elasticsearch/painless/7.1/painless-lang-spec.html
- https://www.elastic.co/guide/en/elasticsearch/painless/7.1/painless-api-reference.html