如何寫一個(gè)轉(zhuǎn)發(fā)日志的fluentd插件霎烙?
上篇介紹了logging-operator
依賴于自定義的fluentd
插件,實(shí)現(xiàn)了根據(jù)指定的namespaces
和labels
轉(zhuǎn)發(fā)日志蕊连,本篇將從以下幾個(gè)方面介紹如何編寫一個(gè)具有該功能集成的fluentd
插件:
確定要擴(kuò)展的插件類型
相關(guān)語法詞法介紹
學(xué)習(xí)如何編寫一個(gè)
fluentd
插件
確定要擴(kuò)展的插件類型
根據(jù)我們的需求悬垃, 需要按照namespaces
和labels
來完成日志的轉(zhuǎn)發(fā),這依賴于kubernetes
元數(shù)據(jù)甘苍。kubernetes
元數(shù)據(jù)的獲取并不在fluentd
階段配置尝蠕,而是在轉(zhuǎn)發(fā)給fluentd
之前,依賴于fluent-bit
的配置载庭。
https://docs.fluentbit.io/manual/pipeline/filters/kubernetes#workflow-of-tail-kubernetes-filter
$ kubectl get secrets defaultlogging-fluentbit -o json | jq '.data."fluent-bit.conf"' | xargs echo | base64 --decode
[SERVICE]
Flush 1
Grace 5
Daemon Off
Log_Level info
Parsers_File parsers.conf
Coro_Stack_Size 24576
storage.path /buffers
[INPUT]
Name tail
DB /tail-db/tail-containers-state.db
Mem_Buf_Limit 5MB
Parser docker
Path /var/log/containers/*.log
Refresh_Interval 5
Skip_Long_Lines On
Tag kubernetes.*
[FILTER]
Name kubernetes
Buffer_Size 0
Kube_CA_File /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
Kube_Tag_Prefix kubernetes.var.log.containers
Kube_Token_File /var/run/secrets/kubernetes.io/serviceaccount/token
Kube_URL https://kubernetes.default.svc:443
Match kubernetes.*
Merge_Log On
在確定好該配置后激活后看彼,我們來到fluentd
這一層,需要編寫一個(gè)output
插件來完成過濾囚聚、轉(zhuǎn)發(fā)功能靖榕。
相關(guān)語法詞法介紹
詳細(xì)樣例參考:https://docs.fluentd.org/plugin-development/api-plugin-output
上面鏈接中搬運(yùn)過來就是這樣的:
require 'fluent/plugin/output'
module Fluent::Plugin
class SomeOutput < Output
# First, register the plugin. 'NAME' is the name of this plugin
# and identifies the plugin in the configuration file.
Fluent::Plugin.register_output('NAME', self)
# Enable threads if you are writing an async buffered plugin.
helpers :thread
# Define parameters for your plugin.
config_param :path, :string
#### Non-Buffered Output #############################
# Implement `process()` if your plugin is non-buffered.
# Read "Non-Buffered output" for details.
######################################################
def process(tag, es)
es.each do |time, record|
# output events to ...
end
end
#### Sync Buffered Output ##############################
# Implement `write()` if your plugin uses normal buffer.
# Read "Sync Buffered Output" for details.
########################################################
def write(chunk)
real_path = extract_placeholders(@path, chunk)
log.debug 'writing data to file', chunk_id: dump_unique_id_hex(chunk.unique_id)
# For standard chunk format (without `#format()` method)
chunk.each do |time, record|
# output events to ...
end
# For custom format (when `#format()` implemented)
# File.open(real_path, 'w+')
# or `#write_to(io)` is available
# File.open(real_path, 'w+') do |file|
# chunk.write_to(file)
# end
end
#### Async Buffered Output #############################
# Implement `try_write()` if you want to defer committing
# chunks. Read "Async Buffered Output" for details.
########################################################
def try_write(chunk)
real_path = extract_placeholders(@path, chunk)
log.debug 'sending data to server', chunk_id: dump_unique_id_hex(chunk.unique_id)
send_data_to_server(@host, real_path, chunk.read)
chunk_id = chunk.unique_id
# Create a thread for deferred commit.
thread_create(:check_send_result) do
while thread_current_running?
sleep SENDDATA_CHECK_INTERVAL # == 5
if check_data_on_server(real_path, chunk_id)
# commit chunk
# chunk will be deleted and not be retried anymore by this call
commit_write(chunk_id)
break
end
end
end
end
# Override `#format` if you want to customize how Fluentd stores
# events. Read the section "How to Customize the Serialization
# Format for Chunks" for details.
def format(tag, time, record)
[tag, time, record].to_json
end
end
end
我將一個(gè)插件的編寫規(guī)范整理為兩類,一類是骨架定義顽铸,一類是子類邏輯實(shí)現(xiàn):
- 骨架定義部分包括
require
茁计、module
、class definition
- 子類邏輯實(shí)現(xiàn)又包括插件注冊(cè)谓松、參數(shù)定義星压、激活配置等前置邏輯和具體接口實(shí)現(xiàn)和內(nèi)置方法調(diào)用的邏輯。
require
根據(jù)需要編寫的插件類型導(dǎo)入依賴:
require 'fluent/plugin/output' # input, filter, output, parser, formatter, storage or buffer
subclass
所有的插件都是Fluent::Plugin::Base
的子類鬼譬。
class definition
module Fluent::Plugin
class SomeOutput < Output
...
end
end
register
注冊(cè)插件的名稱類別娜膘,需要根據(jù)這個(gè)來識(shí)別該插件,這里我們注冊(cè)了一個(gè)名為NAME
類別的output
插件
Fluent::Plugin.register_output('NAME', self)
helpers
以下的語法激活了線程helper
, 可以調(diào)用 thread_create(:check_send_result)
和thread_current_running?
:
# Load thread helper
helpers :thread
----
thread_create(:check_send_result) do
while thread_current_running?
sleep SENDDATA_CHECK_INTERVAL # == 5
if check_data_on_server(real_path, chunk_id)
# commit chunk
# chunk will be deleted and not be retried anymore by this call
commit_write(chunk_id)
break
end
end
end
----
config_param && desc
config_param
定義插件的參數(shù), desc
定義描述:
desc 'The port number'
# `config_param` Defines a parameter. You can refer the following parameter via @port instance variable.
# Without `:default`, a parameter is required.
config_param :port, :integer
config_section
定義一個(gè)可以嵌套的參數(shù)結(jié)構(gòu):
name
: 名稱.
options
:
-
root
: 是否激活為root
配置區(qū)域拧簸,內(nèi)部使用; -
param_name
: 子區(qū)域的名稱; -
final
: 激活后子類無法修改,buffer
配置區(qū)域就是通過這種方法實(shí)現(xiàn)劲绪。 -
init
:激活后,必須要有初始默認(rèn)值; -
required
: 激活后贾富,整個(gè)配置區(qū)域會(huì)被設(shè)為必須配置項(xiàng), 否則會(huì)報(bào)錯(cuò)歉眷; -
multi
: 激活后可以多次配置該配置區(qū)域; -
alias
: Alias for this section.
參考:
config_section :user, param_name: :users, multi: true, required: false do
desc 'Username for authentication'
config_param :username, :string
desc 'Password for authentication'
config_param :password, :string, secret: true
end
接口實(shí)現(xiàn)和內(nèi)置方法調(diào)用
如果output
沒有使用buffer
就需要實(shí)現(xiàn)process(tag, es)
方法颤枪,反之汗捡,則需要實(shí)現(xiàn)write
(同步)和try_write
方法(異步)。
#### Non-Buffered Output #############################
# Implement `process()` if your plugin is non-buffered.
# Read "Non-Buffered output" for details.
######################################################
def process(tag, es)
#### Sync Buffered Output ##############################
# Implement `write()` if your plugin uses normal buffer.
# Read "Sync Buffered Output" for details.
########################################################
def write(chunk)
#### Async Buffered Output #############################
# Implement `try_write()` if you want to defer committing
# chunks. Read "Async Buffered Output" for details.
########################################################
def try_write(chunk)
# Override `#format` if you want to customize how Fluentd stores
# events. Read the section "How to Customize the Serialization
# Format for Chunks" for details.
def format(tag, time, record)
更多接口實(shí)現(xiàn)和內(nèi)置方法可以訪問上文提到的鏈接畏纲。
補(bǔ)充介紹下configure(conf)
方法, conf
是Fluent::Config::Element
的一個(gè)實(shí)例扇住,實(shí)例變量和可訪問的方法需要super
調(diào)用之后才能可用。
def configure(conf)
super
# cache_default_value is created/configured by config_param
@in_memory_cache = Hash.new(@cache_default_value)
end
學(xué)習(xí)如何編寫一個(gè)fluentd
插件
掌握相關(guān)語法后盗胀,我們?cè)囍治鱿律掀恼绿岬降?code>fluentd插件如何實(shí)現(xiàn)根據(jù)namespaces
和labels
轉(zhuǎn)發(fā)日志的功能艘蹋。
require
require "fluent/plugin/output"
require 'prometheus/client'
class定義
按照官方的說法, 這里繼承Output
即可票灰,如果不是做了巨大的改變女阀,一般不推薦直接繼承BareOutput
:
class LabelRouterOutput < BareOutput
register
注冊(cè)了一個(gè)名為label_router
的type
:
Fluent::Plugin.register_output("label_router", self)
helpers
激活event_emitter
和record_accessor
兩個(gè)helper api
:
helpers :event_emitter, :record_accessor
---
# event_emitter
# 1. emit event
router.emit(tag, time, record)
# 2. emit event stream
router.emit_stream(tag, es)
---
# record_accessor
# 1. Call `record_accessor_create` to create object
@accessor = record_accessor_create('$.user.name')
# 2. Call `call` method to get value
value = @accessor.call(record) # With `$.user.name`, access to record["user"]["name"]
---
config_param
emit_mode
: list
類型,可選值為batch
或者record
;
sticky_tags
: bool
類型屑迂,默認(rèn)為true
, 相同的tag
使用相同的方法浸策;
default_route
:string
類型,默認(rèn)為空惹盼,無法匹配時(shí)使用默認(rèn)標(biāo)簽庸汗;
default_tag
:string
類型,默認(rèn)為空, 無法匹配時(shí)使用默認(rèn)tag
手报;
metrics
: bool
類型蚯舱,默認(rèn)為false
,是否激活監(jiān)控昧诱;
config_section
定義了兩層嵌套配置區(qū)域晓淀。
第一層,子嵌套配置區(qū)域名稱為routes
盏档,可以配置多個(gè)route
,route
詳細(xì)參數(shù)如下:
@label
: 類型為string
燥爷,默認(rèn)為nil
蜈亩,如果子區(qū)域的選擇器命中匹配到,則會(huì)新建一個(gè)名為@label
值的label
給該record
;
tag
: 類型為string
, 如果子區(qū)域匹配到前翎,則會(huì)新建一個(gè)名為tag
值的tag
給給該record
稚配,前提是這個(gè)新tag
不為空;
metrics_labels
: 類型為string
, 配置額外的metrics labels
;
第二層子嵌套配置區(qū)域名稱為matches
,可以配置多個(gè)match
港华,match
詳細(xì)參數(shù)如下:
labels
: hash
類型, 例如app:nginx
namespaces
: array
類型道川,默認(rèn)是[]
, 需要過濾的命名空間在這里定義;
hosts
:array
類型,默認(rèn)是[]
, 需要過濾的hosts
在這里定義冒萄;
container_names
: array
類型臊岸,默認(rèn)是[]
, 需要過濾的container_names
在這里定義;
negate
: bool
類型尊流,用來標(biāo)記為反選帅戒,默認(rèn)為false
;
接口實(shí)現(xiàn)和內(nèi)置方法
首先,定義了一個(gè)Route
類共給初始化配置時(shí)調(diào)用崖技,具體的邏輯可以不用看逻住,只需要注意它實(shí)現(xiàn)了兩個(gè)方法,分別用于逐個(gè)處理和批處理迎献,處理完畢后將計(jì)數(shù)器增加size
個(gè)計(jì)數(shù):
下面直接看configure(conf)
部分:
def configure(conf)
super
@registry = (::Prometheus::Client.registry if @metrics)
@route_map = Hash.new { |h, k| h[k] = Set.new }
@mutex = Mutex.new
@routers = []
@default_router = nil
@routes.each do |rule|
route_router = event_emitter_router(rule['@label'])
@routers << Route.new(rule, route_router, @registry)
end
if @default_route != '' or @default_tag != ''
default_rule = { 'matches' => nil, 'tag' => @default_tag, '@label' => @default_route}
@default_router = Route.new(default_rule, event_emitter_router(@default_route), @registry)
end
@access_to_labels = record_accessor_create("$.kubernetes.labels")
@access_to_namespace = record_accessor_create("$.kubernetes.namespace_name")
@access_to_host = record_accessor_create("$.kubernetes.host")
@access_to_container_name = record_accessor_create("$.kubernetes.container_name")
@batch = @emit_mode == :batch
end
這里定義了一些初始化默認(rèn)值和實(shí)例變量瞎访,需要注意的是routers
這個(gè)數(shù)組的值,存放的是定義的Route
實(shí)例吁恍, 其中装诡, event_emitter_router
是helpers api
導(dǎo)入的函數(shù)。
@routes.each do |rule|
route_router = event_emitter_router(rule['@label'])
@routers << Route.new(rule, route_router, @registry)
end
參考上文践盼,由于沒有定義buffer
組件鸦采,只需要實(shí)現(xiàn)process
方法即可:
上面這個(gè)函數(shù)基本上囊括了整個(gè)處理邏輯,無非是做一些匹配以及根據(jù)參數(shù)做一些控制流咕幻,來觸發(fā)router
實(shí)例中emit
和emit_es
方法渔伯。
整個(gè)邏輯很簡(jiǎn)單的。如果開啟了強(qiáng)制匹配tag
的模式肄程,會(huì)在route_map
中尋找該tag
锣吼,做一次快速處理,否則會(huì)拿著組裝的input_metadata
去做匹配蓝厌,如果匹配到則觸發(fā)上面的兩個(gè)emit
方法玄叠,沒有一個(gè)批次全部沒匹配到就會(huì)判斷有沒有默認(rèn)router
來觸發(fā),最后拓提,會(huì)觸發(fā)一次批量emit_es
读恃。
至此,我們探討了一下如果編寫fluentd
插件的流程代态,希望對(duì)你有所幫助寺惫!
PS: 碼字不易,歡迎點(diǎn)贊收藏~
本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布蹦疑!