初識Ingest Pipeline

初識Ingest Pipeline

ES在具備node.ingest:true的節(jié)點上提供了pipeline功能,可以在文檔真正寫入ES之前進行一些轉換庞钢,插入新值,修改文檔內容等功能。pipeline會攔截index和bulk請求骗炉,請求經處理后再寫入ES。用戶可以定義一系列的"處理器"材失,在處理文檔時痕鳍,pipeline會按照processor的定義順序執(zhí)行processor。定義個格式:

PUT _ingest/pipeline/my_pipeline_id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "new"
      }
    }
  ]
}

官方示例:在PIPELINE中使用Scripts Processor

以ES 官方文檔中的這個例子進行試驗:https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-context-examples.html

其提供了一份數據龙巨,里面包含了歌劇的場次信息統(tǒng)計笼呆。我們首先要把樣例數據導入到ES中。其中一條是這樣:

{ "create" : { "_index" : "seats", "_type" : "seat", "_id" : "36203" } }
{ "theatre" : "Skyline", "play" : "Auntie Jo", "actors": [ "Jo Hangum", "Jon Hittle", "Rob Kettleman", "Laura Conrad", "Simon Hower", "Nora Blue" ], "date": "2018-12-14", "time": "5:40PM", "row": 11, "number": 14, "cost": 17.5, "sold": false }

我們可以看到旨别,其內容是有問題的诗赌,_type類型在目前的版本中即將被廢棄,只能是doc類型秸弛。因此如果按照原文的方法導入铭若,是會失敗的。

https://www.elastic.co/guide/en/elasticsearch/reference/7.6/mapping-type-field.html

我們需要對_type進行修改递览。在painless腳本中叼屠,可以使用ctx['_type']來修改插入操作的type.

ctx['_index']

The name of the index.

ctx['_type']

The type of document within an index.

增加一條ingest的處理器。

PUT _ingest/pipeline/seats
{
  "description": "seats-ingest",
  "processors": [
    {"script": {
      "lang": "painless",
      "source": """
      ctx['_type'] = "_doc";
      """
    }}
  ]
}

在kibana上嘗試插入一條:

POST _bulk?pipeline=seats
{ "create" : { "_index" : "seats", "_type" : "seat", "_id" : "36203" } }
{ "theatre" : "Skyline", "play" : "Auntie Jo", "actors": [ "Jo Hangum", "Jon Hittle", "Rob Kettleman", "Laura Conrad", "Simon Hower", "Nora Blue" ], "date": "2018-12-14", "time": "5:40PM", "row": 11, "number": 14, "cost": 17.5, "sold": false }

#! Deprecation: [types removal] Specifying types in bulk requests is deprecated.
{
  "took" : 4960,
  "ingest_took" : 373,
  "errors" : false,
  "items" : [
    {
      "create" : {
        "_index" : "seats",
        "_type" : "_doc",
        "_id" : "36203",
        "_version" : 1,
        "result" : "created",
        "_shards" : {
          "total" : 2,
          "successful" : 1,
          "failed" : 0
        },
        "_seq_no" : 0,
        "_primary_term" : 1,
        "status" : 201
      }
    }
  ]
}

可以看到绞铃,是有_bulk請求插入時指定_doc類型的做法已經廢棄了镜雨。可以看到_type已經被改成了_doc儿捧,而如果沒有這個pipeline荚坞,結果將是_bulk請求中指定的seats類型。

修改后腳本改為如下:

PUT _ingest/pipeline/seats
{
  "description": "seats-ingest",
  "processors": [
    {"script": {
      "lang": "painless",
      "source": """
      String[] split(String str, char delimiter)
      {
        int count = 0;
        for (char c: str.toCharArray())
        {
          if (c == delimiter)
          {
            ++ count;
          }
        }
        
        if (count == 0)
        {
          return new String[] {str};
        }
        
        String[] r = new String[count + 1];
        int i0 = 0, i1 = 0, n = 0;
        for (char c: str.toCharArray())
        {
          if (c == delimiter)
          {
            r[n] = str.substring(i0, i1);
            ++n;
            i0 = i1 + 1;
          }
          ++ i1;
        }
        r[count] = str.substring(i0, i1);
        
        return r;
      }
      
      ctx['_type'] = "_doc";
      
      String[] date_array = split(ctx.date, (char)"-");
      
      String year = date_array[0].trim();
      String month = date_array[1].trim();
      if (month.length() == 1)
      {
        month = "0" + month;
      }
      String day = date_array[2].trim();
      if (day.length() == 1)
      {
        day = "0" + day;
      }
      
      boolean is_pm = ctx.time.substring(ctx.time.length() - 2).equals("PM");
      
      String[] time_array = split(ctx.time.substring(0, ctx.time.length()-2), (char)":");
      
      int hour = Integer.parseInt(time_array[0].trim());
  
      if (is_pm)
      {
        hour += 12;
      }
      String hour_str = "";
      if (hour < 10)
      {
        hour_str += "0";
      }
      hour_str += hour;
      
      int min = Integer.parseInt(time_array[1].trim());
      String min_str = "";
      if (min < 10)
      {
        min_str += "0";
      }
      min_str += min;
      
      String date_time = year + "-" + month + "-" + day + "T" + hour_str + ":" + min + ":00+08:00";
      
      ZonedDateTime dt = ZonedDateTime.parse(
         date_time, DateTimeFormatter.ISO_OFFSET_DATE_TIME);               
      ctx.datetime = dt.getLong(ChronoField.INSTANT_SECONDS)*1000L;
      ctx["_type"] = "_doc";
      """
    }}
  ]
}

對應的導入命令也做一下修改菲盾,改為:

curl -k -XPOST https://elastic:elastic@localhost:9200/_bulk?pipeline=seats -H "Content-Type: application/x-ndjson" --data-binary "@/home/DATA/seats-init.json"

之前試了好幾次颓影,都在指定輸入文件這一步出錯了,最后發(fā)現是需要在路徑最前面加一個@懒鉴。

可能是我的機器不是特別好诡挂,導入時間有點長。7萬多條記錄花了很長時間临谱。事實上最后導入失敗了咆畏,只導入了3000多條。錯誤顯示主分片不可訪問吴裤。

對比ClickHouse旧找,8000萬條記錄只花了幾分鐘就搞定。用來進行分析麦牺,ClickHouse各方面指標要來得更好钮蛛。

對pipeline的添加鞭缭,查看,修改魏颓,模擬調試API

ES 提供了一些API對ingest 的pipeline進行調試.

添加

PUT _ingest/pipeline/my-pipeline-id
{
  "description" : "describe pipeline",
  "processors" : [
    {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}

查看

GET _ingest/pipeline/my-pipeline-id

刪除

DELETE _ingest/pipeline/my-pipeline-id

模擬

POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    // pipeline definition here
  },
  "docs" : [
    { "_source": {/** first document **/} },
    { "_source": {/** second document **/} },
    // ...
  ]
}

也可以這樣岭辣,在POST行中指定索引名稱進行處理:

POST _ingest/pipeline/seats/_simulate
{
  "docs": [
    {
      "_source": {
        "theatre": "Skyline",
        "play": "Auntie Jo",
        "actors": [
          "Jo Hangum",
          "Jon Hittle",
          "Rob Kettleman",
          "Laura Conrad",
          "Simon Hower",
          "Nora Blue"
        ],
        "date": "2018-12-14",
        "time": "5:40PM",
        "row": 11,
        "number": 14,
        "cost": 17.5,
        "sold": false
      }
    }
  ]
}

如何在Pipeline中操作使用文檔的數據

文檔的"_source"數據

上文的例子中,我們在Pipeline中使用script處理器時甸饱,可以使用painless的內置數據結構ctx對文檔source進行處理沦童。

ctx是一個mapping對象,內部存儲了包括文檔的meta數據和_source數據:

ingest情況下
_index: ctx._index or ctx["_index"]
_doc: ctx._doc or ctx["_doc"]
_op: ctx._op or ctx["_op"]
xxxx: ctx.xxxx or ctx["xxxx"]

Update情況下
xxxx: ctx._source.xxxx

在不使用script的情況下叹话,同樣可以對_source中的數據進行訪問偷遗。

比如說使用set處理器的情況下:

PUT _ingest/pipeline/bbb
{
  "description": "test",
  "processors": [
    {"set": {
      "field": "_source.foo",
      "value": "bar"
    }}
  ]
}

PUT _ingest/pipeline/aaa
{
  "description": "test",
  "processors": [
    {"set": {
      "field": "foo",
      "value": "bar"
    }}
  ]
}

可以直接通過數據名或者_source.數據嗎進行處理。

文檔的metadata

文檔的metadata包括_index, _type, _routing, _id驼壶,在pipeline中均可以直接訪問氏豌。

Ingest的metadata

PUT _ingest/pipeline/ccc
{
  "description": "test",
  "processors": [
    {"set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
    }}
  ]
}

不同于metadata里的成員,_ingest可以是一個_source中的合法成員热凹,因此泵喘,訪問ingest的meta數據需要使用這樣的方式{{_ingest.timestamp}}。

template 中的數據與metadata

在模擬中般妙,也可以使用pipeline纪铺,同樣,訪問template中的數據也需要使用{{ }}碟渺。

{
  "set": {
    "field": "field_c",
    "value": "{{field_a}} {{field_b}}"
  }
}

上面的這個set processor鲜锚,將原有的 field_a 和 field_b 做了拼接之后,將其賦給了一個新字段field_c止状。

同時烹棉,template中也支持動態(tài)設置字段攒霹。

{
  "set": {
    "field": "{{service}}",
    "value": "{{code}}"
  }
}

要說明的是怯疤,如果想在template中使用,需要設置動態(tài)索引參數:

index.default_pipeline

The default ingest node pipeline for this index. Index requests will fail if the default pipeline is set and the pipeline does not exist. The default may be overridden using the pipeline parameter. The special pipeline name _none indicates no ingest pipeline should be run.

index.final_pipeline

The final ingest node pipeline for this index. Index requests will fail if the final pipeline is set and the pipeline does not exist. The final pipeline always runs after the request pipeline (if specified) and the default pipeline (if it exists). The special pipeline name _none indicates no ingest pipeline will run.

需要注意的是: final_pipeline是7.5版本之后才支持的催束。之前版本沒有集峦,比如我用的7.4版本,這個參數就不支持抠刺。

使用方法塔淤,在index的setting中指定:

  "settings": {
      "number_of_shards": 1,
      "default_pipeline": "_none"
  },

default_pipeline會被命令行中指定的pipeline覆蓋,而final_pipeline會在最后執(zhí)行速妖。

條件執(zhí)行

Pipeline中的每個processor都支持一個if參數高蜂,滿足條件的processor才會執(zhí)行。

PUT _ingest/pipeline/drop_guests_network
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network_name == 'Guest'"
      }
    }
  ]
}

這是官方文檔中的一個例子罕容,當network_name為Guest時备恤,丟棄這條記錄稿饰。

同時,if參數也支持復雜的腳本露泊。

PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                  if (tag.toLowerCase().contains('prod')) {
                      return false;
                  }
              }
            }
            return true;
        """
      }
    }
  ]
}

編寫一段painless腳本喉镰,返回true or false即可。

Processors中可以使用嵌套對象進行判斷惭笑,但是為了防止產生null point exception侣姆,可以使用?.來訪問成員以防止成員不存在。

PUT _ingest/pipeline/not_prod_dropper
{
  "processors": [
    {
      "drop": {
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                  if (tag.toLowerCase().contains('prod')) {
                      return false;
                  }
              }
            }
            return true;
        """
      }
    }
  ]
}

有一種processor名為pipeline沉噩,可以結合if條件指定對應的pipeline名稱捺宗。

curl -X PUT "localhost:9200/_ingest/pipeline/logs_pipeline?pretty" -H 'Content-Type: application/json' -d'
{
  "description": "A pipeline of pipelines for log files",
  "version": 1,
  "processors": [
    {
      "pipeline": {
        "if": "ctx.service?.name == \u0027apache_httpd\u0027",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.service?.name == \u0027syslog\u0027",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}
'

上面的pipeline processor指定了兩個參數,if是該processor滿足的條件屁擅,name是對應需要執(zhí)行的pipeline的名稱偿凭。

此外,pipeline的條件中還可以使用正則表達式派歌。

PUT _ingest/pipeline/check_url
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url =~ /^http[^s]/",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}

異常處理

正常情況下弯囊,pipeline會順序執(zhí)行所有的processor,并在遇到第一個異常時胶果,中斷當前文檔的處理匾嘱。但是有時候,用戶希望自定義異常的處理早抠,這時候就需要為processor設置on_failure參數霎烙。這樣當process遇到error的時候,會執(zhí)行on_failure里的內容蕊连,然后繼續(xù)執(zhí)行下一個處理器悬垃。

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "field \"foo\" does not exist, cannot rename to \"bar\""
            }
          }
        ]
      }
    }
  ]
}

也可以進行索引級別的異常處理:

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar"
        ]
      }
    }
  ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}

值得注意的是,同一個異常只能被捕獲一次甘苍,這個和java的異常處理機制是一致的尝蠕。如果同時定義了processor和pipeline級別的異常處理模塊,則異常只會被processor級別所捕獲载庭。

同時看彼,我們也可以讓processor忽略此處的錯誤。只要將參數ignore_failure置為true即可囚聚。

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "ignore_failure" : true
      }
    }
  ]
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末靖榕,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子顽铸,更是在濱河造成了極大的恐慌茁计,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谓松,死亡現場離奇詭異星压,居然都是意外死亡瓶蝴,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進店門租幕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來舷手,“玉大人,你說我怎么就攤上這事劲绪∧锌撸” “怎么了?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵贾富,是天一觀的道長歉眷。 經常有香客問我,道長颤枪,這世上最難降的妖魔是什么汗捡? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮畏纲,結果婚禮上扇住,老公的妹妹穿的比我還像新娘。我一直安慰自己盗胀,他們只是感情好艘蹋,可當我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著票灰,像睡著了一般女阀。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上屑迂,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天浸策,我揣著相機與錄音,去河邊找鬼惹盼。 笑死庸汗,一個胖子當著我的面吹牛,可吹牛的內容都是我干的逻锐。 我是一名探鬼主播夫晌,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼雕薪,長吁一口氣:“原來是場噩夢啊……” “哼昧诱!你這毒婦竟也來了?” 一聲冷哼從身側響起所袁,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤盏档,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后燥爷,有當地人在樹林里發(fā)現了一具尸體蜈亩,經...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡懦窘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了稚配。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片畅涂。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖道川,靈堂內的尸體忽然破棺而出午衰,到底是詐尸還是另有隱情,我是刑警寧澤冒萄,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布臊岸,位于F島的核電站,受9級特大地震影響尊流,放射性物質發(fā)生泄漏帅戒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一崖技、第九天 我趴在偏房一處隱蔽的房頂上張望逻住。 院中可真熱鬧,春花似錦迎献、人聲如沸鄙信。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽装诡。三九已至,卻和暖如春践盼,著一層夾襖步出監(jiān)牢的瞬間鸦采,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工咕幻, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留渔伯,地道東北人。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓肄程,卻偏偏與公主長得像锣吼,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子蓝厌,可洞房花燭夜當晚...
    茶點故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內容