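Background
In the previous article, "EFK: Configuring geo-ip in Practice", I showed how to add the geoip plugin to fluentd so that IP addresses are converted into latitude and longitude.
That approach raised a problem of its own. Egress IP addresses mostly resolve to big cities and are fairly concentrated, so the map points drawn from the converted coordinates cluster too tightly and do not reflect where users really are.
So a new requirement dropped out of the sky: we need to draw the map from the latitude and longitude reported by the client.
After browsing all of the fluentd plugins, I was surprised to find none that parses the request body. The fluentd plugin list is at: https://www.fluentd.org/plugins/all
Nothing for it but to write one myself.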
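Writing the plugin
First, one thing is clear: in fluentd, data passes through the filters in order, each one handing its result to the next.
This is also my first time writing a Ruby program, so let's look at the project structure first:
The most important files are:
filter_parse_request_body.rb (the fluentd filter plugin; intercepts records and modifies them in flight)
out_parse_request_body.rb (the fluentd output plugin; formats the data on final output)
parse_request_body_extractor.rb (the class that actually breaks the data apart)
fluent-plugin-parse_request_body.gemspec (the project's configuration file: project name, version number, and so on)
Now, straight to the code: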
- filter_parse_request_body.rb
require "fluent/plugin/filter"
require "fluent/plugin/parse_request_body_extractor"
module Fluent::Plugin
  class ParseRequestBodyFilter < Fluent::Plugin::Filter
    # Register the filter name
    Fluent::Plugin.register_filter('parse_request_body', self)
    # Key in the record that holds the request body
    desc "point a key whose value contains body string."
    config_param :key, :string
    # Key whose value will be replaced by the array_value data
    desc "point a key who will be replaced."
    config_param :replace_key, :string, default: nil
    # Names of the fields to be combined together
    desc "If set, the key/value will be reformd array whose would be added to the record."
    config_param :array_value, :string, default: nil
    # Key in the record for the combined data
    desc "array_value's key in record"
    config_param :array_value_key, :string, default: nil
    # Whitelist of parsed request body fields
    desc "If set, only the key/value whose key is included only will be added to the record."
    config_param :only, :string, default: nil
    # Blacklist of parsed request body fields
    desc "If set, the key/value whose key is included except will NOT be added to the record."
    config_param :except, :string, default: nil
    # Whether to keep only the parsed data and delete the original request body
    desc "If set to true, the original key url will be discarded from the record."
    config_param :discard_key, :bool, default: false
    # Prefix to add to the keys of the parsed fields
    desc "Prefix of fields."
    config_param :add_field_prefix, :string, default: nil
    # Whether blank keys are permitted
    desc "If set to true, permit blank key."
    config_param :permit_blank_key, :bool, default: false
    # Initialize the extractor
    def configure(conf)
      super
      @extractor = Fluent::Plugin::ParseRequestBodyExtractor.new(self, conf)
    end
    # Run the parsing
    def filter(tag, time, record)
      @extractor.add_query_params_field(record)
    end
  end
end
- out_parse_request_body.rb (the fluentd output plugin; formats the data on final output)
require "fluent/plugin/output"
require "fluent/plugin/parse_request_body_extractor"
module Fluent::Plugin
  class ParseRequestBodyOutput < Fluent::Plugin::Output
    include Fluent::HandleTagNameMixin
    # Register the output name
    Fluent::Plugin.register_output('parse_request_body', self)
    helpers :event_emitter
    # Key in the record that holds the request body
    desc "point a key whose value contains body string."
    config_param :key, :string
    # Key whose value will be replaced by the array_value data
    desc "point a key who will be replaced."
    config_param :replace_key, :string, default: nil
    # Names of the fields to be combined together
    desc "If set, the key/value will be reformd array whose would be added to the record."
    config_param :array_value, :string, default: nil
    # Key in the record for the combined data
    desc "array_value's key in record"
    config_param :array_value_key, :string, default: nil
    # Whitelist of parsed request body fields
    desc "If set, only the key/value whose key is included only will be added to the record."
    config_param :only, :string, default: nil
    # Blacklist of parsed request body fields
    desc "If set, the key/value whose key is included except will NOT be added to the record."
    config_param :except, :string, default: nil
    # Whether to keep only the parsed data and delete the original request body
    desc "If set to true, the original key url will be discarded from the record."
    config_param :discard_key, :bool, default: false
    # Prefix to add to the keys of the parsed fields
    desc "Prefix of fields."
    config_param :add_field_prefix, :string, default: nil
    # Whether blank keys are permitted
    desc "If set to true, permit blank key."
    config_param :permit_blank_key, :bool, default: false
    # Initialize the extractor
    def configure(conf)
      super
      @extractor = Fluent::Plugin::ParseRequestBodyExtractor.new(self, conf)
    end
    def multi_workers_ready?
      true
    end
    # Run the parsing
    def filter_record(tag, time, record)
      record = @extractor.add_query_params_field(record)
      super(tag, time, record)
    end
    def process(tag, es)
      es.each do |time, record|
        t = tag.dup
        filter_record(t, time, record)
        router.emit(t, time, record)
      end
    end
  end
end
- parse_request_body_extractor.rb (the class that actually breaks the data apart)
require 'uri'
require 'cgi/util'
require 'webrick'
module Fluent::Plugin
  class ParseRequestBodyExtractor
    attr_reader :log
    # Initialize the extractor
    def initialize(plugin, conf)
      @log = plugin.log
      if plugin.is_a?(Fluent::Plugin::Output)
        unless have_tag_option?(plugin)
          raise Fluent::ConfigError, "out_parse_request_body: At least one of remove_tag_prefix/remove_tag_suffix/add_tag_prefix/add_tag_suffix is required to be set."
        end
      end
      # Read the options from the plugin configuration
      @key = plugin.key
      @only = plugin.only
      @except = plugin.except
      @discard_key = plugin.discard_key
      @add_field_prefix = plugin.add_field_prefix
      @permit_blank_key = plugin.permit_blank_key
      @array_value = plugin.array_value
      @array_value_key = plugin.array_value_key
      @replace_key = plugin.replace_key
      # Build the whitelist
      if @only
        @include_keys = @only.split(/\s*,\s*/).inject({}) do |hash, i|
          hash[i] = true
          hash
        end
      end
      # Build the blacklist
      if @except
        @exclude_keys = @except.split(/\s*,\s*/).inject({}) do |hash, i|
          hash[i] = true
          hash
        end
      end
      # Build the set of keys whose values are to be combined
      if @array_value_key
        if @array_value
          @include_array_value = @array_value.split(/\s*,\s*/).inject({}) do |hash, i|
            hash[i] = true
            hash
          end
        end
      end
    end
    # Entry point for parsing
    def add_query_params_field(record)
      return record unless record[@key]
      add_query_params(record[@key], record)
      replace_record_by_key(record) if @replace_key
      record.delete(@key) if @discard_key
      record
    end
    private
    # Replace the value of one key in the record
    def replace_record_by_key(record)
      return record unless record[@replace_key]
      replace_value = record[@array_value_key]
      # Guard before calling select: the body may contain none of the array_value keys
      return record unless replace_value
      empty_value = replace_value.select { |item| item == 0 }
      # Skip the replacement when every collected value is 0
      if empty_value.size != replace_value.size
        record[@replace_key] = replace_value
      end
    end
    def have_tag_option?(plugin)
      plugin.remove_tag_prefix ||
        plugin.remove_tag_suffix ||
        plugin.add_tag_prefix ||
        plugin.add_tag_suffix
    end
    def create_field_key(field_key)
      if add_field_prefix?
        "#{@add_field_prefix}#{field_key}"
      else
        field_key
      end
    end
    def add_field_prefix?
      !!@add_field_prefix
    end
    def permit_blank_key?
      @permit_blank_key
    end
    def add_query_params(body, record)
      return if body.nil?
      placeholder = []
      body.split('&').each do |pair|
        key, value = pair.split('=', 2).map { |i| CGI.unescape(i) }
        next if (key.nil? || key.empty?) && (!permit_blank_key? || value.nil? || value.empty?)
        key ||= ''
        value ||= ''
        new_key = create_field_key(key)
        if @only
          record[new_key] = value if @include_keys.has_key?(key)
        elsif @except
          record[new_key] = value if !@exclude_keys.has_key?(key)
        else
          record[new_key] = value
        end
        if @include_array_value
          placeholder[placeholder.size] = value.to_f if @include_array_value.has_key?(key)
        end
      end
      unless placeholder.empty?
        record[@array_value_key] = placeholder
      end
    end
  end
end
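To make the core loop of the extractor easier to follow, here is a minimal standalone sketch of the same idea, runnable without fluentd. The helper name `parse_request_body` and its keyword arguments are hypothetical, made up for this illustration, and it uses `URI.decode_www_form_component` from the standard library in place of `CGI.unescape`. It only covers the whitelist and array collection paths, not the blacklist or blank-key handling.

```ruby
require 'uri'

# Hypothetical helper mirroring the extractor's core loop (not the plugin's API).
# only:       keys allowed into the result (whitelist)
# array_keys: keys whose values are collected, as floats, into one array
# prefix:     string prepended to every kept key
def parse_request_body(body, only:, array_keys:, prefix: '')
  fields = {}
  collected = []
  body.to_s.split('&').each do |pair|
    key, value = pair.split('=', 2).map { |part| URI.decode_www_form_component(part) }
    next if key.nil? || key.empty?
    value ||= ''
    fields["#{prefix}#{key}"] = value if only.include?(key)
    collected << value.to_f if array_keys.include?(key)
  end
  [fields, collected]
end

fields, coords = parse_request_body(
  'lat=31.23&lng=121.47&token=abc',
  only: %w[lat lng], array_keys: %w[lat lng], prefix: 'body.'
)
```

With this input, `fields` holds `body.lat` and `body.lng` as strings, `token` is dropped by the whitelist, and `coords` is the float array `[31.23, 121.47]`. Note that the array follows the order in which the pairs appear in the body, not the order in which the keys are listed in the configuration; the plugin code above behaves the same way.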
- fluent-plugin-parse_request_body.gemspec (the project's configuration file: project name, version number, and so on)
Gem::Specification.new do |gem|
  gem.name = 'fluent-plugin-parse_request_body'
  gem.version = '0.0.18'
  gem.authors = ['EkiSong']
  gem.email = ['yifriday0614@gmail.com']
  gem.homepage = 'https://github.com/yifriday/fluent-plugin-parse_request_body.git'
  gem.description = %q{Fluentd plugin to parse request body.}
  gem.summary = %q{Fluentd plugin to parse request body}
  gem.license = 'MIT'
  gem.files = `git ls-files`.split($\)
  gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
  gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
  gem.require_paths = ['lib']
  if defined?(RUBY_VERSION) && RUBY_VERSION > '2.2'
    gem.add_development_dependency "test-unit", '~> 3'
  end
  gem.add_development_dependency 'rake'
  gem.add_development_dependency 'appraisal'
  gem.add_runtime_dependency 'fluentd', ['>= 0.14.8', '< 2']
end
Building the plugin
Once the coding is done, use gem to build and publish the plugin.
Any plugin written in Ruby can be published to https://rubygems.org/
These are the commands, run from the command line:
➜ fluent-plugin-parse_request_body git:(master) gem build fluent-plugin-parse_request_body.gemspec
Successfully built RubyGem
Name: fluent-plugin-parse_request_body
Version: 0.0.18
File: fluent-plugin-parse_request_body-0.0.18.gem
➜ fluent-plugin-parse_request_body git:(master) gem push fluent-plugin-parse_request_body-0.0.18.gem
Pushing gem to https://rubygems.org...
Successfully registered gem: fluent-plugin-parse_request_body (0.0.18)
Once the plugin has been published, we can install it on our servers with the gem tool.
Installing and configuring the plugin
To install, just run:
/usr/sbin/td-agent-gem install fluent-plugin-parse_request_body
The configuration in td-agent.conf looks like this:
<filter nginx.**>
  @type geoip
  geoip_lookup_key remote
  geoip_database /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-geoip-1.2.0/data/GeoLiteCity.dat
  geoip2_database /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-geoip-1.2.0/data/GeoLite2-City.mmdb
  <record>
    city ${city.names.zh-CN["remote"]} # skip adding fields if this field is null
    country ${country.iso_code["remote"]}
    country_name ${country.names.zh-CN["remote"]}
    location '[${location.longitude["remote"]},${location.latitude["remote"]}]'
  </record>
</filter>
<filter nginx.**>
  @type parse_request_body
  key request_body
  #discard_key true
  add_field_prefix body.
  only lat,lng
  array_value lat,lng
  array_value_key locatonGPS
  replace_key location
</filter>
Note: we still need to fall back to the IP-based conversion when the client sends no coordinates, and filters form a one-way data flow, so parse_request_body must be configured below geoip.
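The ordering constraint can be illustrated with a small simulation. This is only a sketch: the two lambdas are hypothetical stand-ins for the real filters, the coordinates are made-up sample values, and `locatonGPS` is supplied directly in the record rather than parsed from `request_body`. The second lambda mirrors the rule in `replace_record_by_key`: it only overwrites an existing `location`, and only when client coordinates are present and not all zero.

```ruby
# Hypothetical stand-in for the geoip filter: always resolves the IP
# to a (made-up) city-level coordinate.
geoip = lambda do |record|
  record.merge('location' => [121.47, 31.23])
end

# Hypothetical stand-in for parse_request_body with replace_key location.
parse_request_body = lambda do |record|
  coords = record['locatonGPS']
  # Nothing to replace if location is not set yet or the client sent no coordinates.
  return record unless record['location'] && coords
  # Keep the IP-based value when every client coordinate is 0.
  return record if coords.all?(&:zero?)
  record.merge('location' => coords)
end

# Filters run in order, each receiving the previous one's output.
run = ->(filters, record) { filters.reduce(record) { |rec, f| f.call(rec) } }

with_gps    = run.call([geoip, parse_request_body], { 'locatonGPS' => [121.5, 31.2] })
without_gps = run.call([geoip, parse_request_body], { 'remote' => '1.2.3.4' })
reversed    = run.call([parse_request_body, geoip], { 'locatonGPS' => [121.5, 31.2] })
```

In this sketch `with_gps` ends up with the client coordinates, `without_gps` keeps the IP-based fallback, and `reversed` loses the client coordinates because the geoip stand-in runs last and overwrites `location`.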
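That completes the second phase of the data processing work for the data map.
Looking ahead, given the current requirement, the plugin needs one improvement:
The combined data should be configured dynamically with a <record> tag, the way location is configured in geoip, instead of the current hard-coded and rather crude approach.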
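As one possible direction for that improvement, the sketch below expands `${name}` placeholders from the parsed body fields. It is purely hypothetical: `expand_template` and the placeholder syntax are assumptions made for illustration, and neither exists in the plugin or matches how geoip implements its `<record>` section.

```ruby
# Hypothetical: replace each "${name}" in the template with the matching
# parsed field, or an empty string when the field is missing.
def expand_template(template, fields)
  template.gsub(/\$\{(\w+)\}/) { fields.fetch(Regexp.last_match(1), '') }
end

parsed = { 'lat' => '31.23', 'lng' => '121.47' }
location = expand_template('[${lng},${lat}]', parsed)
```

With a template like this, the field order in the output is fixed by the configuration rather than by the order of the pairs in the request body.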
If you are interested, take a look at my GitHub:
https://github.com/yifriday/fluent-plugin-parse_request_body.git