EFK 配置geo-ip落地實(shí)踐(二)fluentd插件編寫

前因

在之前的篇幅中--EFK 配置geo-ip落地實(shí)踐隅忿,已經(jīng)介紹過如何配置geo插件到fluentd中,實(shí)現(xiàn)ip地址轉(zhuǎn)經(jīng)緯度的實(shí)例邦尊。

但是隨之也產(chǎn)生了一個(gè)問題背桐。即,ip出口地址大多數(shù)落在了大城市蝉揍,并且相對(duì)集中链峭,其導(dǎo)致根據(jù)其轉(zhuǎn)換的經(jīng)緯度所繪制的地圖點(diǎn)太集中,不能夠反映真實(shí)的用戶所在位置又沾。

因此弊仪,新的需求從天而降熙卡!我們需要使用客戶端上報(bào)的經(jīng)緯度來繪制地圖。

在瀏覽了所有fluentd的插件后励饵。居然沒有解析request body的插件驳癌。 fluentd插件查詢地址:https://www.fluentd.org/plugins/all

沒辦法,自己動(dòng)手寫一個(gè)役听。

插件編寫

首先颓鲜,我們可以明確,fluentd的filter數(shù)據(jù)是順序流向下一個(gè)的禾嫉。
由于我也是第一次編寫Ruby程序灾杰。所以先看一下項(xiàng)目結(jié)構(gòu):


image.png

其中比較重要的是:

  1. filter_parse_request_body.rb (fluentd的filter插件編寫,用于攔截?cái)?shù)據(jù)熙参,中途修改數(shù)據(jù))

  2. out_parse_request_body.rb (fluentd的out插件編寫艳吠,用于最后格式化輸出數(shù)據(jù))

  3. parse_request_body_extractor.rb (最終執(zhí)行拆解數(shù)據(jù)的類)

  4. fluent-plugin-parse_request_body.gemspec (項(xiàng)目的配置文件,包括項(xiàng)目名稱孽椰,版本號(hào)等)

下面昭娩,直接上代碼:

  1. filter_parse_request_body.rb
require "fluent/plugin/filter"
require "fluent/plugin/parse_request_body_extractor"

module Fluent::Plugin
  class ParseRequestBodyFilter < Fluent::Plugin::Filter

    #注冊(cè)filter名稱#
    Fluent::Plugin.register_filter('parse_request_body', self)

    #request body在record中所對(duì)應(yīng)的key#
    desc "point a key whose value contains body string."
    config_param :key,    :string
    #需要使用array_value替換數(shù)據(jù)的key#
    desc "point a key who will be replaced."
    config_param :replace_key,    :string, default: nil
    #需要被組合在一起的數(shù)據(jù)名稱#
    desc "If set, the key/value will be reformd array whose would be added to the record."
    config_param :array_value,   :string, default: nil
    #最終組合數(shù)據(jù)在record中的key#
    desc "array_value's key in record"
    config_param :array_value_key,   :string, default: nil
    #request body解析數(shù)據(jù)白名單#
    desc "If set, only the key/value whose key is included only will be added to the record."
    config_param :only,   :string, default: nil
    #request body解析數(shù)據(jù)黑名單#
    desc "If set, the key/value whose key is included except will NOT be added to the record."
    config_param :except, :string, default: nil
    #是否只保留最終解析出來的數(shù)據(jù),而刪除request body原數(shù)據(jù)#
    desc "If set to true, the original key url will be discarded from the record."
    config_param :discard_key, :bool, default: false
    #給解析出的數(shù)據(jù)key添加前綴#
    desc "Prefix of fields."
    config_param :add_field_prefix, :string, default: nil
    #是否允許解析空key#
    desc "If set to true, permit blank key."
    config_param :permit_blank_key, :bool, default: false

    #初始化解析器#
    def configure(conf)
      super
      @extractor = Fluent::Plugin::ParseRequestBodyExtractor.new(self, conf)
    end

    #執(zhí)行解析工作#
    def filter(tag, time, record)
      @extractor.add_query_params_field(record)
    end
  end
end
  1. out_parse_request_body.rb (fluentd的out插件編寫黍匾,用于最后格式化輸出數(shù)據(jù))
require "fluent/plugin/output"
require "fluent/plugin/parse_request_body_extractor"

module Fluent::Plugin
  class ParseRequestBodyOutput < Fluent::Plugin::Output
    include Fluent::HandleTagNameMixin

    #注冊(cè)output名稱#
    Fluent::Plugin.register_output('parse_request_body', self)

    helpers :event_emitter

    #request body在record中所對(duì)應(yīng)的key#
    desc "point a key whose value contains body string."
    config_param :key,    :string
    #需要使用array_value替換數(shù)據(jù)的key#
    desc "point a key who will be replaced."
    config_param :replace_key,    :string, default: nil
    #需要被組合在一起的數(shù)據(jù)名稱#
    desc "If set, the key/value will be reformd array whose would be added to the record."
    config_param :array_value,   :string, default: nil
    #最終組合數(shù)據(jù)在record中的key#
    desc "array_value's key in record"
    config_param :array_value_key,   :string, default: nil
    #request body解析數(shù)據(jù)白名單#
    desc "If set, only the key/value whose key is included only will be added to the record."
    config_param :only,   :string, default: nil
    #request body解析數(shù)據(jù)黑名單#
    desc "If set, the key/value whose key is included except will NOT be added to the record."
    config_param :except, :string, default: nil
    #是否只保留最終解析出來的數(shù)據(jù)栏渺,而刪除request body原數(shù)據(jù)#
    desc "If set to true, the original key url will be discarded from the record."
    config_param :discard_key, :bool, default: false
    #給解析出的數(shù)據(jù)key添加前綴#
    desc "Prefix of fields."
    config_param :add_field_prefix, :string, default: nil
    #是否允許解析空key#
    desc "If set to true, permit blank key."
    config_param :permit_blank_key, :bool, default: false

    #初始化解析器#
    def configure(conf)
      super
      @extractor = Fluent::Plugin::ParseRequestBodyExtractor.new(self, conf)
    end

    def multi_workers_ready?
      true
    end

    #執(zhí)行解析工作#
    def filter_record(tag, time, record)
      record = @extractor.add_query_params_field(record)
      super(tag, time, record)
    end

    def process(tag, es)
      es.each do |time, record|
        t = tag.dup
        filter_record(t, time, record)
        router.emit(t, time, record)
      end
    end
  end
end
  1. parse_request_body_extractor.rb (最終執(zhí)行拆解數(shù)據(jù)的類)
require 'uri'
require 'cgi/util'
require 'webrick'

module Fluent::Plugin
  class ParseRequestBodyExtractor

    attr_reader :log
        
    #初始化解析器#
    def initialize(plugin, conf)
      @log = plugin.log

      if plugin.is_a?(Fluent::Plugin::Output)
        unless have_tag_option?(plugin)
          raise Fluent::ConfigError, "out_parse_request_body: At least one of remove_tag_prefix/remove_tag_suffix/add_tag_prefix/add_tag_suffix is required to be set."
        end
      end

      #從配置中讀取配置選項(xiàng)#
      @key = plugin.key
      @only = plugin.only
      @except = plugin.except
      @discard_key = plugin.discard_key
      @add_field_prefix = plugin.add_field_prefix
      @permit_blank_key = plugin.permit_blank_key
      @array_value = plugin.array_value
      @array_value_key = plugin.array_value_key
      @replace_key = plugin.replace_key

      #初始化白名單#
      if @only
        @include_keys = @only.split(/\s*,\s*/).inject({}) do |hash, i|
          hash[i] = true
          hash
        end
      end

      #初始化黑名單#
      if @except
        @exclude_keys = @except.split(/\s*,\s*/).inject({}) do |hash, i|
          hash[i] = true
          hash
        end
      end

      #初始化需要被組合的key#
      if @array_value_key
        if @array_value
          @include_array_value = @array_value.split(/\s*,\s*/).inject({}) do |hash, i|
            hash[i] = true
            hash
          end
        end
      end

    end

    #解析方法#
    def add_query_params_field(record)
      return record unless record[@key]
      add_query_params(record[@key], record)
      replace_record_by_key(record) if @replace_key
      record.delete(@key) if @discard_key
      record
    end

    private

    #替換record中某一個(gè)鍵值#
    def replace_record_by_key(record)
      return record unless record[@replace_key]
      replace_value = record[@array_value_key]
      empty_value = replace_value.select {|item| item == 0 }
      if replace_value && (empty_value.size != replace_value.size)
        record[@replace_key] = replace_value
      end
    end

    def have_tag_option?(plugin)
      plugin.remove_tag_prefix ||
        plugin.remove_tag_suffix ||
        plugin.add_tag_prefix    ||
        plugin.add_tag_suffix
    end

    def create_field_key(field_key)
      if add_field_prefix?
        "#{@add_field_prefix}#{field_key}"
      else
        field_key
      end
    end

    def add_field_prefix?
      !!@add_field_prefix
    end

    def permit_blank_key?
      @permit_blank_key
    end

    def add_query_params(body, record)
      return if body.nil?
      placeholder = []
      body.split('&').each do |pair|
        key, value = pair.split('=', 2).map { |i| CGI.unescape(i) }
        next if (key.nil? || key.empty?) && (!permit_blank_key? || value.nil? || value.empty?)
        key ||= ''
        value ||= ''

        new_key = create_field_key(key)
        if @only
          record[new_key] = value if @include_keys.has_key?(key)
        elsif @except
          record[new_key] = value if !@exclude_keys.has_key?(key)
        else
          record[new_key] = value
        end

        if @include_array_value
          placeholder[placeholder.size] = value.to_f if @include_array_value.has_key?(key)
        end
      end

      unless placeholder.empty?
        record[@array_value_key] = placeholder
      end
    end
  end
end
  1. fluent-plugin-parse_request_body.gemspec (項(xiàng)目的配置文件,包括項(xiàng)目名稱锐涯,版本號(hào)等)
Gem::Specification.new do |gem|
  gem.name          = 'fluent-plugin-parse_request_body'
  gem.version       = '0.0.18'
  gem.authors       = ['EkiSong']
  gem.email         = ['yifriday0614@gmail.com']
  gem.homepage      = 'https://github.com/yifriday/fluent-plugin-parse_request_body.git'
  gem.description   = %q{Fluentd plugin to parse request body.}
  gem.summary       = %q{Fluentd plugin to parse request body}
  gem.license       = 'MIT'

  gem.files         = `git ls-files`.split($\)
  gem.executables   = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
  gem.test_files    = gem.files.grep(%r{^(test|spec|features)/})
  gem.require_paths = ['lib']

  if defined?(RUBY_VERSION) && RUBY_VERSION > '2.2'
    gem.add_development_dependency "test-unit", '~> 3'
  end

  gem.add_development_dependency 'rake'
  gem.add_development_dependency 'appraisal'
  gem.add_runtime_dependency     'fluentd', ['>= 0.14.8', '< 2']
end

插件制作

在結(jié)束編碼工作之后磕诊,使用gem制作上傳插件。

ruby所寫的插件纹腌,都可以上傳到這個(gè)網(wǎng)站https://rubygems.org/

下面是在命令行中的相關(guān)操作

?  fluent-plugin-parse_request_body git:(master) gem build fluent-plugin-parse_request_body.gemspec 
  Successfully built RubyGem
  Name: fluent-plugin-parse_request_body
  Version: 0.0.18
  File: fluent-plugin-parse_request_body-0.0.18.gem
?  fluent-plugin-parse_request_body git:(master) gem push fluent-plugin-parse_request_body-0.0.18.gem
Pushing gem to https://rubygems.org...
Successfully registered gem: fluent-plugin-parse_request_body (0.0.18)

在成功上傳插件后霎终。我們就可以使用gem工具,在我們的服務(wù)器中安裝了

插件安裝和配置

安裝直接執(zhí)行下面語句即可:

/usr/sbin/td-agent-gem install fluent-plugin-parse_request_body

而在td-agent.conf中的配置如下:

<filter nginx.**>
  @type geoip
    geoip_lookup_key remote
    geoip_database /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-geoip-1.2.0/data/GeoLiteCity.dat
    geoip2_database /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-geoip-1.2.0/data/GeoLite2-City.mmdb
    <record>
     city         ${city.names.zh-CN["remote"]}     # skip adding fields if this field is null
     country      ${country.iso_code["remote"]}
     country_name ${country.names.zh-CN["remote"]}
     location '[${location.longitude["remote"]},${location.latitude["remote"]}]'
    </record>
</filter>

<filter nginx.**>
  @type parse_request_body
  key                request_body
  #discard_key        true
  add_field_prefix   body.
  only lat,lng
  array_value lat,lng
  array_value_key locatonGPS
  replace_key location
</filter>

注意:由于我們需要在沒有客戶端經(jīng)緯度的情況下升薯,依然使用ip轉(zhuǎn)換莱褒,并且filter是單向數(shù)據(jù)流,所以 parse_request_body一定要配置在geo下方涎劈。

至此广凸,數(shù)據(jù)地圖的數(shù)據(jù)處理部分第二階段工作結(jié)束。

后續(xù)蛛枚,在目前的需求上谅海,此插件需要優(yōu)化一點(diǎn):

組合數(shù)據(jù),使用<record>標(biāo)簽動(dòng)態(tài)配置蹦浦,想geo中配置location一樣扭吁,而不是現(xiàn)在死配置這種比較low的做法。

感興趣的同學(xué),可以看我的github:

https://github.com/yifriday/fluent-plugin-parse_request_body.git

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末智末,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子徒河,更是在濱河造成了極大的恐慌系馆,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件顽照,死亡現(xiàn)場(chǎng)離奇詭異由蘑,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)代兵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門尼酿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人植影,你說我怎么就攤上這事裳擎。” “怎么了思币?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵鹿响,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我谷饿,道長(zhǎng)惶我,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任博投,我火速辦了婚禮绸贡,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘毅哗。我一直安慰自己听怕,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布黎做。 她就那樣靜靜地躺著叉跛,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蒸殿。 梳的紋絲不亂的頭發(fā)上筷厘,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音宏所,去河邊找鬼酥艳。 笑死,一個(gè)胖子當(dāng)著我的面吹牛爬骤,可吹牛的內(nèi)容都是我干的充石。 我是一名探鬼主播,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼霞玄,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼骤铃!你這毒婦竟也來了拉岁?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤惰爬,失蹤者是張志新(化名)和其女友劉穎喊暖,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體撕瞧,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡陵叽,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了丛版。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片巩掺。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖页畦,靈堂內(nèi)的尸體忽然破棺而出胖替,到底是詐尸還是另有隱情,我是刑警寧澤寇漫,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布刊殉,位于F島的核電站,受9級(jí)特大地震影響州胳,放射性物質(zhì)發(fā)生泄漏记焊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一栓撞、第九天 我趴在偏房一處隱蔽的房頂上張望遍膜。 院中可真熱鬧,春花似錦瓤湘、人聲如沸瓢颅。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽挽懦。三九已至,卻和暖如春木人,著一層夾襖步出監(jiān)牢的瞬間信柿,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工醒第, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留渔嚷,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓稠曼,卻偏偏與公主長(zhǎng)得像形病,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理漠吻,服務(wù)發(fā)現(xiàn)量瓜,斷路器,智...
    卡卡羅2017閱讀 134,659評(píng)論 18 139
  • 這次接到的需求是途乃,可以根據(jù)用戶的ip地址榔至,實(shí)時(shí)展示在我們大中國(guó)的地圖上。 被飛哥告知可以在EFK上實(shí)現(xiàn)欺劳,再經(jīng)過一番...
    宋奕Ekis閱讀 1,741評(píng)論 0 1
  • 如果生命的初次排練就已經(jīng)是生命本身,那么生命到底會(huì)有什么價(jià)值铅鲤? 正因?yàn)檫@樣划提,生命才總是像一張草圖。 ——米蘭昆德拉...
    樂活人生百態(tài)閱讀 1,337評(píng)論 0 5
  • 深夜罪惡感跑出來嚇了我一跳然后它睡了我還醒著
    七九月貓與CD閱讀 161評(píng)論 0 0
  • 我們?yōu)槭裁纯鞓沸舷恚恳驗(yàn)榭鞓返哪芰ι鷣砥降取? 記得丁立梅的《風(fēng)會(huì)記得一朵花的香》中又一篇文章就叫做“我為什...
    章燚周閱讀 405評(píng)論 1 1