如何寫一個(gè)轉(zhuǎn)發(fā)日志的fluentd插件早抠?

如何寫一個(gè)轉(zhuǎn)發(fā)日志的fluentd插件霎烙?

上篇介紹了logging-operator依賴于自定義的fluentd插件,實(shí)現(xiàn)了根據(jù)指定的namespaceslabels轉(zhuǎn)發(fā)日志蕊连,本篇將從以下幾個(gè)方面介紹如何編寫一個(gè)具有該功能集成的fluentd插件:

  • 確定要擴(kuò)展的插件類型

  • 相關(guān)語法詞法介紹

  • 學(xué)習(xí)如何編寫一個(gè)fluentd插件

確定要擴(kuò)展的插件類型

根據(jù)我們的需求悬垃, 需要按照namespaceslabels來完成日志的轉(zhuǎn)發(fā),這依賴于kubernetes元數(shù)據(jù)甘苍。kubernetes元數(shù)據(jù)的獲取并不在fluentd階段配置尝蠕,而是在轉(zhuǎn)發(fā)給fluentd之前,依賴于fluent-bit的配置载庭。

https://docs.fluentbit.io/manual/pipeline/filters/kubernetes#workflow-of-tail-kubernetes-filter

$ kubectl get secrets defaultlogging-fluentbit  -o json | jq '.data."fluent-bit.conf"' | xargs echo | base64 --decode
[SERVICE]
    Flush        1
    Grace        5
    Daemon       Off
    Log_Level    info
    Parsers_File parsers.conf
    Coro_Stack_Size    24576
    storage.path  /buffers

[INPUT]
    Name         tail
    DB  /tail-db/tail-containers-state.db
    Mem_Buf_Limit  5MB
    Parser  docker
    Path  /var/log/containers/*.log
    Refresh_Interval  5
    Skip_Long_Lines  On
    Tag  kubernetes.*
[FILTER]
    Name        kubernetes
    Buffer_Size  0
    Kube_CA_File  /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Tag_Prefix  kubernetes.var.log.containers
    Kube_Token_File  /var/run/secrets/kubernetes.io/serviceaccount/token
    Kube_URL  https://kubernetes.default.svc:443
    Match  kubernetes.*
    Merge_Log  On

在確定好該配置后激活后看彼,我們來到fluentd這一層,需要編寫一個(gè)output插件來完成過濾囚聚、轉(zhuǎn)發(fā)功能靖榕。

相關(guān)語法詞法介紹

詳細(xì)樣例參考:https://docs.fluentd.org/plugin-development/api-plugin-output

上面鏈接中搬運(yùn)過來就是這樣的:

require 'fluent/plugin/output'

module Fluent::Plugin
  class SomeOutput < Output
    # First, register the plugin. 'NAME' is the name of this plugin
    # and identifies the plugin in the configuration file.
    Fluent::Plugin.register_output('NAME', self)

    # Enable threads if you are writing an async buffered plugin.
    helpers :thread

    # Define parameters for your plugin.
    config_param :path, :string

    #### Non-Buffered Output #############################
    # Implement `process()` if your plugin is non-buffered.
    # Read "Non-Buffered output" for details.
    ######################################################
    def process(tag, es)
      es.each do |time, record|
        # output events to ...
      end
    end

    #### Sync Buffered Output ##############################
    # Implement `write()` if your plugin uses normal buffer.
    # Read "Sync Buffered Output" for details.
    ########################################################
    def write(chunk)
      real_path = extract_placeholders(@path, chunk)

      log.debug 'writing data to file', chunk_id: dump_unique_id_hex(chunk.unique_id)

      # For standard chunk format (without `#format()` method)
      chunk.each do |time, record|
        # output events to ...
      end

      # For custom format (when `#format()` implemented)
      # File.open(real_path, 'w+')

      # or `#write_to(io)` is available
      # File.open(real_path, 'w+') do |file|
      #   chunk.write_to(file)
      # end
    end

    #### Async Buffered Output #############################
    # Implement `try_write()` if you want to defer committing
    # chunks. Read "Async Buffered Output" for details.
    ########################################################
    def try_write(chunk)
      real_path = extract_placeholders(@path, chunk)

      log.debug 'sending data to server', chunk_id: dump_unique_id_hex(chunk.unique_id)

      send_data_to_server(@host, real_path, chunk.read)

      chunk_id = chunk.unique_id

      # Create a thread for deferred commit.
      thread_create(:check_send_result) do
        while thread_current_running?
          sleep SENDDATA_CHECK_INTERVAL # == 5

          if check_data_on_server(real_path, chunk_id)
            # commit chunk
            # chunk will be deleted and not be retried anymore by this call
            commit_write(chunk_id)
            break
          end
        end
      end
    end

    # Override `#format` if you want to customize how Fluentd stores
    # events. Read the section "How to Customize the Serialization
    # Format for Chunks" for details.
    def format(tag, time, record)
      [tag, time, record].to_json
    end
  end
end

我將一個(gè)插件的編寫規(guī)范整理為兩類,一類是骨架定義顽铸,一類是子類邏輯實(shí)現(xiàn):

  • 骨架定義部分包括require茁计、moduleclass definition
  • 子類邏輯實(shí)現(xiàn)又包括插件注冊(cè)谓松、參數(shù)定義星压、激活配置等前置邏輯和具體接口實(shí)現(xiàn)和內(nèi)置方法調(diào)用的邏輯。
require

根據(jù)需要編寫的插件類型導(dǎo)入依賴:

require 'fluent/plugin/output' # input, filter, output, parser, formatter, storage or buffer
subclass

所有的插件都是Fluent::Plugin::Base的子類鬼譬。

class definition
module Fluent::Plugin
    class SomeOutput < Output
        ...
    end
end
register

注冊(cè)插件的名稱類別娜膘,需要根據(jù)這個(gè)來識(shí)別該插件,這里我們注冊(cè)了一個(gè)名為NAME類別的output插件

Fluent::Plugin.register_output('NAME', self)
helpers

https://docs.fluentd.org/plugin-helper-overview

以下的語法激活了線程helper, 可以調(diào)用 thread_create(:check_send_result)thread_current_running?

# Load thread helper
helpers :thread
----
 thread_create(:check_send_result) do
   while thread_current_running?
     sleep SENDDATA_CHECK_INTERVAL # == 5
     if check_data_on_server(real_path, chunk_id)
        # commit chunk
        # chunk will be deleted and not be retried anymore by this call
        commit_write(chunk_id)
        break
     end
   end
 end
----
config_param && desc

config_param定義插件的參數(shù), desc定義描述:

desc 'The port number'
# `config_param` Defines a parameter. You can refer the following parameter via @port instance variable.
# Without `:default`, a parameter is required.
config_param :port, :integer
config_section

定義一個(gè)可以嵌套的參數(shù)結(jié)構(gòu):

name: 名稱.

options:

  • root: 是否激活為root配置區(qū)域拧簸,內(nèi)部使用;
  • param_name: 子區(qū)域的名稱;
  • final: 激活后子類無法修改, buffer配置區(qū)域就是通過這種方法實(shí)現(xiàn)劲绪。
  • init:激活后,必須要有初始默認(rèn)值;
  • required: 激活后贾富,整個(gè)配置區(qū)域會(huì)被設(shè)為必須配置項(xiàng), 否則會(huì)報(bào)錯(cuò)歉眷;
  • multi: 激活后可以多次配置該配置區(qū)域;
  • alias: Alias for this section.

參考:

config_section :user, param_name: :users, multi: true, required: false do
  desc 'Username for authentication'
  config_param :username, :string
  desc 'Password for authentication'
  config_param :password, :string, secret: true
end
接口實(shí)現(xiàn)和內(nèi)置方法調(diào)用

如果output沒有使用buffer就需要實(shí)現(xiàn)process(tag, es)方法颤枪,反之汗捡,則需要實(shí)現(xiàn)write(同步)和try_write方法(異步)。

#### Non-Buffered Output #############################
# Implement `process()` if your plugin is non-buffered.
# Read "Non-Buffered output" for details.
######################################################
def process(tag, es)

#### Sync Buffered Output ##############################
# Implement `write()` if your plugin uses normal buffer.
# Read "Sync Buffered Output" for details.
########################################################
def write(chunk)

#### Async Buffered Output #############################
# Implement `try_write()` if you want to defer committing
# chunks. Read "Async Buffered Output" for details.
########################################################
def try_write(chunk)

# Override `#format` if you want to customize how Fluentd stores
# events. Read the section "How to Customize the Serialization
# Format for Chunks" for details.
def format(tag, time, record)

更多接口實(shí)現(xiàn)和內(nèi)置方法可以訪問上文提到的鏈接畏纲。

補(bǔ)充介紹下configure(conf)方法, confFluent::Config::Element的一個(gè)實(shí)例扇住,實(shí)例變量和可訪問的方法需要super調(diào)用之后才能可用。

def configure(conf)
  super

  # cache_default_value is created/configured by config_param
  @in_memory_cache = Hash.new(@cache_default_value)
end

學(xué)習(xí)如何編寫一個(gè)fluentd插件

掌握相關(guān)語法后盗胀,我們?cè)囍治鱿律掀恼绿岬降?code>fluentd插件如何實(shí)現(xiàn)根據(jù)namespaceslabels轉(zhuǎn)發(fā)日志的功能艘蹋。

https://github.com/banzaicloud/fluent-plugin-label-router/blob/master/lib/fluent/plugin/out_label_router.rb#L22:11

require
require "fluent/plugin/output"
require 'prometheus/client'
class定義

按照官方的說法, 這里繼承Output即可票灰,如果不是做了巨大的改變女阀,一般不推薦直接繼承BareOutput

class LabelRouterOutput < BareOutput
register

注冊(cè)了一個(gè)名為label_routertype

Fluent::Plugin.register_output("label_router", self)
helpers

激活event_emitterrecord_accessor兩個(gè)helper api

helpers :event_emitter, :record_accessor
---
# event_emitter
# 1. emit event
router.emit(tag, time, record)
# 2. emit event stream
router.emit_stream(tag, es)
---
# record_accessor
# 1. Call `record_accessor_create` to create object
 @accessor = record_accessor_create('$.user.name')
# 2. Call `call` method to get value
value = @accessor.call(record) # With `$.user.name`, access to record["user"]["name"]
---
config_param

emit_mode: list類型,可選值為batch或者record;

sticky_tags: bool類型屑迂,默認(rèn)為true, 相同的tag使用相同的方法浸策;

default_routestring類型,默認(rèn)為空惹盼,無法匹配時(shí)使用默認(rèn)標(biāo)簽庸汗;

default_tagstring類型,默認(rèn)為空, 無法匹配時(shí)使用默認(rèn)tag手报;

metrics: bool類型蚯舱,默認(rèn)為false,是否激活監(jiān)控昧诱;

config_section

定義了兩層嵌套配置區(qū)域晓淀。

image-20210703114719912

第一層,子嵌套配置區(qū)域名稱為routes盏档,可以配置多個(gè)routeroute詳細(xì)參數(shù)如下:

@label: 類型為string燥爷,默認(rèn)為nil蜈亩,如果子區(qū)域的選擇器命中匹配到,則會(huì)新建一個(gè)名為@label值的label給該record;

tag: 類型為string, 如果子區(qū)域匹配到前翎,則會(huì)新建一個(gè)名為tag值的tag給給該record稚配,前提是這個(gè)新tag不為空;

metrics_labels: 類型為string, 配置額外的metrics labels;

第二層子嵌套配置區(qū)域名稱為matches,可以配置多個(gè)match港华,match詳細(xì)參數(shù)如下:

labels : hash 類型, 例如app:nginx

namespaces: array類型道川,默認(rèn)是[], 需要過濾的命名空間在這里定義;

hostsarray類型,默認(rèn)是[], 需要過濾的hosts在這里定義冒萄;

container_names: array類型臊岸,默認(rèn)是[], 需要過濾的container_names在這里定義;

negate: bool類型尊流,用來標(biāo)記為反選帅戒,默認(rèn)為false;

接口實(shí)現(xiàn)和內(nèi)置方法

首先,定義了一個(gè)Route類共給初始化配置時(shí)調(diào)用崖技,具體的邏輯可以不用看逻住,只需要注意它實(shí)現(xiàn)了兩個(gè)方法,分別用于逐個(gè)處理和批處理迎献,處理完畢后將計(jì)數(shù)器增加size個(gè)計(jì)數(shù):

image-20210703134141141

下面直接看configure(conf)部分:

def configure(conf)
    super
    @registry = (::Prometheus::Client.registry if @metrics)
    @route_map = Hash.new { |h, k| h[k] = Set.new }
    @mutex = Mutex.new
    @routers = []
    @default_router = nil
    @routes.each do |rule|
       route_router = event_emitter_router(rule['@label'])
       @routers << Route.new(rule, route_router, @registry)
    end

    if @default_route != '' or @default_tag != ''
       default_rule = { 'matches' => nil, 'tag' => @default_tag, '@label' => @default_route}
       @default_router = Route.new(default_rule, event_emitter_router(@default_route), @registry)
    end

    @access_to_labels = record_accessor_create("$.kubernetes.labels")
    @access_to_namespace = record_accessor_create("$.kubernetes.namespace_name")
    @access_to_host = record_accessor_create("$.kubernetes.host")
    @access_to_container_name = record_accessor_create("$.kubernetes.container_name")

    @batch = @emit_mode == :batch
end

這里定義了一些初始化默認(rèn)值和實(shí)例變量瞎访,需要注意的是routers這個(gè)數(shù)組的值,存放的是定義的Route實(shí)例吁恍, 其中装诡, event_emitter_routerhelpers api導(dǎo)入的函數(shù)。

https://github.com/fluent/fluentd/blob/5844f7209fec154a4e6807eb1bee6989d3f3297f/lib/fluent/plugin_helper/event_emitter.rb#L71

 @routes.each do |rule|
    route_router = event_emitter_router(rule['@label'])
    @routers << Route.new(rule, route_router, @registry)
 end

參考上文践盼,由于沒有定義buffer組件鸦采,只需要實(shí)現(xiàn)process方法即可:

image-20210703133736264

上面這個(gè)函數(shù)基本上囊括了整個(gè)處理邏輯,無非是做一些匹配以及根據(jù)參數(shù)做一些控制流咕幻,來觸發(fā)router實(shí)例中emitemit_es方法渔伯。

整個(gè)邏輯很簡(jiǎn)單的。如果開啟了強(qiáng)制匹配tag的模式肄程,會(huì)在route_map中尋找該tag锣吼,做一次快速處理,否則會(huì)拿著組裝的input_metadata去做匹配蓝厌,如果匹配到則觸發(fā)上面的兩個(gè)emit方法玄叠,沒有一個(gè)批次全部沒匹配到就會(huì)判斷有沒有默認(rèn)router來觸發(fā),最后拓提,會(huì)觸發(fā)一次批量emit_es读恃。

至此,我們探討了一下如果編寫fluentd插件的流程代态,希望對(duì)你有所幫助寺惫!

PS: 碼字不易,歡迎點(diǎn)贊收藏~

本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布蹦疑!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末西雀,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子歉摧,更是在濱河造成了極大的恐慌艇肴,老刑警劉巖腔呜,帶你破解...
    沈念sama閱讀 217,826評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異再悼,居然都是意外死亡核畴,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門帮哈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來膛檀,“玉大人,你說我怎么就攤上這事娘侍】校” “怎么了?”我有些...
    開封第一講書人閱讀 164,234評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵憾筏,是天一觀的道長嚎杨。 經(jīng)常有香客問我,道長氧腰,這世上最難降的妖魔是什么枫浙? 我笑而不...
    開封第一講書人閱讀 58,562評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮古拴,結(jié)果婚禮上箩帚,老公的妹妹穿的比我還像新娘。我一直安慰自己黄痪,他們只是感情好紧帕,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著桅打,像睡著了一般是嗜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上挺尾,一...
    開封第一講書人閱讀 51,482評(píng)論 1 302
  • 那天鹅搪,我揣著相機(jī)與錄音,去河邊找鬼遭铺。 笑死丽柿,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的掂僵。 我是一名探鬼主播航厚,決...
    沈念sama閱讀 40,271評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼锰蓬!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起眯漩,我...
    開封第一講書人閱讀 39,166評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤芹扭,失蹤者是張志新(化名)和其女友劉穎麻顶,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體舱卡,經(jīng)...
    沈念sama閱讀 45,608評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡辅肾,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了轮锥。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片矫钓。...
    茶點(diǎn)故事閱讀 39,926評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖舍杜,靈堂內(nèi)的尸體忽然破棺而出新娜,到底是詐尸還是另有隱情,我是刑警寧澤既绩,帶...
    沈念sama閱讀 35,644評(píng)論 5 346
  • 正文 年R本政府宣布概龄,位于F島的核電站,受9級(jí)特大地震影響饲握,放射性物質(zhì)發(fā)生泄漏私杜。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評(píng)論 3 329
  • 文/蒙蒙 一救欧、第九天 我趴在偏房一處隱蔽的房頂上張望衰粹。 院中可真熱鬧,春花似錦笆怠、人聲如沸铝耻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽田篇。三九已至,卻和暖如春箍铭,著一層夾襖步出監(jiān)牢的瞬間泊柬,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評(píng)論 1 269
  • 我被黑心中介騙來泰國打工诈火, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留兽赁,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,063評(píng)論 3 370
  • 正文 我出身青樓冷守,卻偏偏與公主長得像刀崖,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子拍摇,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容