7.4-IngestPipeline&PainlessScript

需求:修復與增強寫?的數(shù)據(jù)

  • Tags 字段中萄凤,逗號分隔的?本應該是數(shù)組室抽,?不是? 個字符串

    • 需求:后期需要對 Tags 進? Aggregation 統(tǒng)計
PUT tech_blogs/_doc/1
{
  "title":"Introducing big data......",
  "tags":"hadoop,elasticsearch,spark",
  "content":"You konw, for big data"
}

Ingest Node

  • Elasticsearch 5.0 后,引?的?種新的節(jié)點類型靡努。默認配置下坪圾,每個節(jié)點都是 Ingest Node

    • 具有預處理數(shù)據(jù)的能?,可攔截 Index 或 Bulk API 的請求

    • 對數(shù)據(jù)進?轉(zhuǎn)換惑朦,并重新返回給 Index 或 Bulk API

  • ?需 Logstash兽泄,就可以進?數(shù)據(jù)的預處理,例如

    • 為某個字段設置默認值漾月;重命名某個字段的字段名病梢;對字段值進? Split 操作

    • ?持設置 Painless 腳本,對數(shù)據(jù)進?更加復雜的加?

Pipeline & Processor

  • Pipeline - 管道會對通過的數(shù)據(jù)(?檔)栅屏,按照順序進?加?

  • Processor - Elasticsearch 對?些加?的?為進?了抽象包裝

    • Elasticsearch 有很多內(nèi)置的 Processors飘千。也?持通過插件的?式,實現(xiàn)??的 Processor
image.png

使? Pipeline 切分字符串

# 測試split tags
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You konw, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computering",
        "tags": "openstack,k8s",
        "content": "You konw, for cloud"
      }
    }
  ]
}

image.png

為?檔增加字段

#同時為文檔栈雳,增加一個字段护奈。blog查看量
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },

      {
        "set":{//為?檔增加 Views 字段 
          "field": "views",
          "value": 0
        }
      }
    ]
  },

  "docs": [
    {
      "_index":"index",
      "_id":"id",
      "_source":{
        "title":"Introducing big data......",
  "tags":"hadoop,elasticsearch,spark",
  "content":"You konw, for big data"
      }
    },


    {
      "_index":"index",
      "_id":"idxx",
      "_source":{
        "title":"Introducing cloud computering",
  "tags":"openstack,k8s",
  "content":"You konw, for cloud"
      }
    }

    ]
}
image.png

Pipeline API

image.png

添加 Pipeline 并測試

# 為ES添加一個 Pipeline
PUT _ingest/pipeline/blog_pipeline
{
  "description": "a blog pipeline",
  "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },

      {
        "set":{
          "field": "views",
          "value": 0
        }
      }
    ]
}

#測試pipeline
POST _ingest/pipeline/blog_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "title": "Introducing cloud computering",
        "tags": "openstack,k8s",
        "content": "You konw, for cloud"
      }
    }
  ]
}

Index & Update By Query

image.png

?些內(nèi)置 Processors

  • https://www.elastic.co/guide/en/elasticsearch/reference/7.1/ingest-processors.html

    • Split Processor (例:將給定字段值分成?個數(shù)組)

    • Remove / Rename Processor (例:移除?個重命名字段)

    • Append (例:為商品增加?個新的標簽)

    • Convert(例:將商品價格,從字符串轉(zhuǎn)換成 float 類型)

    • Date / JSON(例:?期格式轉(zhuǎn)換哥纫,字符串轉(zhuǎn) JSON 對象)

    • Date Index Name Processor (例:將通過該處理器的?檔霉旗,分配到指定時間格式的索引中)

內(nèi)置 Processors (續(xù))

  • https://www.elastic.co/guide/en/elasticsearch/reference/7.1/ingest-processors.html

  • Fail Processor (?旦出現(xiàn)異常,該 Pipeline 指定的錯誤信息能返回給?戶)

  • Foreach Process(數(shù)組字段蛀骇,數(shù)組的每個元素都會使?到?個相同的處理器)

  • Grok Processor(?志的?期格式切割)

  • Gsub / Join / Split(字符串替換 / 數(shù)組轉(zhuǎn)字符串/ 字符串轉(zhuǎn)數(shù)組)

  • Lowercase / Upcase(??寫轉(zhuǎn)換)

Ingest Node v.s Logstash

Logstash Ingest Node
數(shù)據(jù)輸?與輸出 ?持從不同的數(shù)據(jù)源讀取厌秒, 并寫?不同的數(shù)據(jù)源 ?持從 ES REST API 獲取數(shù)據(jù), 并且寫? Elasticsearch
數(shù)據(jù)緩沖 實現(xiàn)了簡單的數(shù)據(jù)隊列擅憔,? 持重寫 不?持緩沖
數(shù)據(jù)處理 ?持?量的插件鸵闪,也?持定 制開發(fā) 內(nèi)置的插件,可以開發(fā) Plugin 進 ?擴展(Plugin 更新需要重啟)
配置和使? 增加了?定的架構(gòu)復雜度 ?需額外部署

Painless 簡介

  • ? Elasticsearch 5.x 后引?暑诸,專?為 Elasticsearch 設計蚌讼,擴展了 Java 的語法。

  • 6.0 開始个榕,ES 只?持 Painless篡石。Groovy, JavaScript 和 Python 都不再?持

  • Painless ?持所有 Java 的數(shù)據(jù)類型及 Java API ?集

  • Painless Script 具備以下特性

    • ?性能 / 安全

    • ?持顯示類型或者動態(tài)定義類型

Painless 的?途

  • 可以對?檔字段進?加?處理

    • 更新或刪除字段西采,處理數(shù)據(jù)聚合操作

    • Script Field:對返回的字段提前進?計算

    • Function Score:對?檔的算分進?處理

  • 在 Ingest Pipeline 中執(zhí)?腳本

  • 在 Reindex API凰萨,Update By Query 時胖眷,對數(shù)據(jù)進?處理

通過 Painless 腳本訪問字段

上下? 語法
Ingestion ctx.field_name
Update ctx._source.field_name
Search & Aggregation doc[“field_name”]

案例 1:Script Processor

image.png

案例 2:?檔更新計數(shù)

PUT tech_blogs/_doc/1
{
  "title":"Introducing big data......",
  "tags":"hadoop,elasticsearch,spark",
  "content":"You konw, for big data",
  "views":0
}

POST tech_blogs/_update/1
{
  "script": {
    "source": "ctx._source.views += params.new_views", //腳本控制字段错沽。通過 ctx._source 訪問數(shù)據(jù) 
    "params": {
      "new_views":100
    }
  }
}

案例 3:搜索時的 Script 字段

//通過 doc[‘field_name’]訪問數(shù)據(jù) 
GET tech_blogs/_search
{
  "script_fields": {
    "rnd_views": {
      "script": {
        "lang": "painless",
        "source": """
          java.util.Random rnd = new Random();
          doc['views'].value+rnd.nextInt(1000);
        """
      }
    }
  },
  "query": {
    "match_all": {}
  }
}
image.png

Script: Inline v.s Stored

image.png

腳本緩存

  • 編譯的開銷相較?

  • Elasticsearch 會將腳本編譯后緩存在 Cache 中

    • Inline scripts 和 Stored Scripts 都會被緩 存

    • 默認緩存 100 個腳本

參數(shù) 說明
script.cache.max_size 設置最?緩存數(shù)
script.cache.expire 設置緩存超時
script.max_compilations_rate 默認5分鐘最多75 次編譯 (75/5m)

本節(jié)知識點

  • 概念講解:Ingest Node,Pipeline 與 Processor

  • Ingest Node 與 Logstash 的?較

  • Pipeline 的 相關(guān)操作 / 內(nèi)置 Processor 講解與演示

  • Painless 腳本與

    • Ingestion (Pipeline)

    • Update

    • Search & Aggregation

課程demo

#########Demo for Pipeline###############

DELETE tech_blogs

#Blog數(shù)據(jù)底哥,包含3個字段翰守,tags用逗號間隔
PUT tech_blogs/_doc/1
{
  "title":"Introducing big data......",
  "tags":"hadoop,elasticsearch,spark",
  "content":"You konw, for big data"
}


# 測試split tags
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You konw, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computering",
        "tags": "openstack,k8s",
        "content": "You konw, for cloud"
      }
    }
  ]
}


#同時為文檔,增加一個字段。blog查看量
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },

      {
        "set":{
          "field": "views",
          "value": 0
        }
      }
    ]
  },

  "docs": [
    {
      "_index":"index",
      "_id":"id",
      "_source":{
        "title":"Introducing big data......",
  "tags":"hadoop,elasticsearch,spark",
  "content":"You konw, for big data"
      }
    },


    {
      "_index":"index",
      "_id":"idxx",
      "_source":{
        "title":"Introducing cloud computering",
  "tags":"openstack,k8s",
  "content":"You konw, for cloud"
      }
    }

    ]
}



# 為ES添加一個 Pipeline
PUT _ingest/pipeline/blog_pipeline
{
  "description": "a blog pipeline",
  "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },

      {
        "set":{
          "field": "views",
          "value": 0
        }
      }
    ]
}

#查看Pipleline
GET _ingest/pipeline/blog_pipeline


#測試pipeline
POST _ingest/pipeline/blog_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "title": "Introducing cloud computering",
        "tags": "openstack,k8s",
        "content": "You konw, for cloud"
      }
    }
  ]
}

#不使用pipeline更新數(shù)據(jù)
PUT tech_blogs/_doc/1
{
  "title":"Introducing big data......",
  "tags":"hadoop,elasticsearch,spark",
  "content":"You konw, for big data"
}

#使用pipeline更新數(shù)據(jù)
PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{
  "title": "Introducing cloud computering",
  "tags": "openstack,k8s",
  "content": "You konw, for cloud"
}


#查看兩條數(shù)據(jù),一條被處理驻呐,一條未被處理
POST tech_blogs/_search
{}

#update_by_query 會導致錯誤
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
}

#增加update_by_query的條件
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
    "query": {
        "bool": {
            "must_not": {
                "exists": {
                    "field": "views"
                }
            }
        }
    }
}


#########Demo for Painless###############

# 增加一個 Script Prcessor
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
      {
        "script": {
          "source": """
          if(ctx.containsKey("content")){
            ctx.content_length = ctx.content.length();
          }else{
            ctx.content_length=0;
          }


          """
        }
      },

      {
        "set":{
          "field": "views",
          "value": 0
        }
      }
    ]
  },

  "docs": [
    {
      "_index":"index",
      "_id":"id",
      "_source":{
        "title":"Introducing big data......",
  "tags":"hadoop,elasticsearch,spark",
  "content":"You konw, for big data"
      }
    },


    {
      "_index":"index",
      "_id":"idxx",
      "_source":{
        "title":"Introducing cloud computering",
  "tags":"openstack,k8s",
  "content":"You konw, for cloud"
      }
    }

    ]
}


DELETE tech_blogs
PUT tech_blogs/_doc/1
{
  "title":"Introducing big data......",
  "tags":"hadoop,elasticsearch,spark",
  "content":"You konw, for big data",
  "views":0
}

POST tech_blogs/_update/1
{
  "script": {
    "source": "ctx._source.views += params.new_views",
    "params": {
      "new_views":100
    }
  }
}

# 查看views計數(shù)
POST tech_blogs/_search
{

}

#保存腳本在 Cluster State
POST _scripts/update_views
{
  "script":{
    "lang": "painless",
    "source": "ctx._source.views += params.new_views"
  }
}

POST tech_blogs/_update/1
{
  "script": {
    "id": "update_views",
    "params": {
      "new_views":1000
    }
  }
}


GET tech_blogs/_search
{
  "script_fields": {
    "rnd_views": {
      "script": {
        "lang": "painless",
        "source": """
          java.util.Random rnd = new Random();
          doc['views'].value+rnd.nextInt(1000);
        """
      }
    }
  },
  "query": {
    "match_all": {}
  }
}

相關(guān)閱讀

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末灌诅,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子含末,更是在濱河造成了極大的恐慌猜拾,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件佣盒,死亡現(xiàn)場離奇詭異挎袜,居然都是意外死亡,警方通過查閱死者的電腦和手機肥惭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進店門盯仪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蜜葱,你說我怎么就攤上這事全景。” “怎么了牵囤?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵蚪燕,是天一觀的道長。 經(jīng)常有香客問我奔浅,道長馆纳,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任汹桦,我火速辦了婚禮鲁驶,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘舞骆。我一直安慰自己钥弯,他們只是感情好,可當我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布督禽。 她就那樣靜靜地躺著脆霎,像睡著了一般。 火紅的嫁衣襯著肌膚如雪狈惫。 梳的紋絲不亂的頭發(fā)上睛蛛,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天,我揣著相機與錄音,去河邊找鬼忆肾。 笑死荸频,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的客冈。 我是一名探鬼主播旭从,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼场仲!你這毒婦竟也來了和悦?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤渠缕,失蹤者是張志新(化名)和其女友劉穎摹闽,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體褐健,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年澜汤,在試婚紗的時候發(fā)現(xiàn)自己被綠了蚜迅。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡俊抵,死狀恐怖谁不,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情徽诲,我是刑警寧澤刹帕,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站谎替,受9級特大地震影響偷溺,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜钱贯,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一挫掏、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧秩命,春花似錦尉共、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至霹菊,卻和暖如春剧蚣,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工券敌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留唾戚,地道東北人。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓待诅,卻偏偏與公主長得像叹坦,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子卑雁,可洞房花燭夜當晚...
    茶點故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容