apisix-1.5: supporting Layer 4 (stream) Kafka log collection

Revision history:
2022.12.01 initial version
2022.12.13 added the apisix/router.lua changes

Output format

[{
    "upstream_bytes_received": "132",
    "upstream_connect_time": "0.002",
    "protocol": "TCP",
    "service_id": "",
    "status": "200",
    "session_time": "0.007",
    "bytes_sent": "132",
    "client_ip": "127.0.0.1",
    "route_id": "790206901602246656",
    "upstream_addr": "10.209.151.28:8099",
    "upstream_bytes_sent": "79",
    "bytes_received": "79"
}]

Code changes

apisix/init.lua

Original code

function _M.stream_log_phase()
    core.log.info("enter stream_log_phase")
    -- core.ctx.release_vars(api_ctx)
    run_plugin("log")
end

Modified code

function _M.stream_log_phase()
    local ngx_ctx = ngx.ctx
    local api_ctx = ngx_ctx.api_ctx
    core.ctx.set_vars_meta(api_ctx)
    if router.global_rules and router.global_rules.values
       and #router.global_rules.values > 0 then
        local plugins = core.tablepool.fetch("plugins", 32, 0)
        local values = router.global_rules.values
        for _, global_rule in config_util.iterate_values(values) do
            api_ctx.conf_type = "global_rule"
            api_ctx.conf_version = global_rule.modifiedIndex
            api_ctx.conf_id = global_rule.value.id
            api_ctx.plugins = plugin.stream_filter(global_rule, plugins)
            
            run_plugin("log", plugins, api_ctx)
        end

        core.tablepool.release("plugins", plugins)

        api_ctx.global_rules = router.global_rules
    end
    run_plugin("log")
end
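
The modified stream_log_phase references plugin, router and config_util. In apisix-1.5 these modules should already be required near the top of apisix/init.lua; if they are missing in your tree, the requires look roughly like the sketch below (check against the source rather than copying blindly):

local plugin      = require("apisix.plugin")
local router      = require("apisix.router")
local config_util = require("apisix.core.config_util")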

apisix/router.lua

Original code

function _M.stream_init_worker()
    local router_stream = require("apisix.stream.router.ip_port")
    router_stream.stream_init_worker(filter)
    _M.router_stream = router_stream
end

Modified code

function _M.stream_init_worker()
    local router_stream = require("apisix.stream.router.ip_port")
    router_stream.stream_init_worker(filter)
    _M.router_stream = router_stream
    local global_rules, err = core.config.new("/global_rules", {
            automatic = true,
            item_schema = core.schema.global_rule
        })
    if not global_rules then
        error("failed to create etcd instance for fetching /global_rules : "
              .. err)
    end
    _M.global_rules = global_rules
end

apisix/utils/log-util.lua

Add the following function and export it:

local function get_stream_full_log(ngx, conf)
    local ctx = ngx.ctx.api_ctx
    local var = ctx.var
    local service_id
    local route_id
  
    local matched_route = ctx.matched_route and ctx.matched_route.value

    if matched_route then
        service_id = matched_route.service_id or ""
        route_id = matched_route.id
    else
        service_id = var.host
    end

    local log = {
        upstream_addr = var.upstream_addr,
        service_id = service_id,
        route_id = route_id,
        consumer = ctx.consumer,
        client_ip = var.remote_addr,
        session_time = var.session_time,
        protocol = var.protocol,
        status = var.status,
        bytes_sent = var.bytes_sent,
        bytes_received = var.bytes_received,
        upstream_bytes_sent = var.upstream_bytes_sent,
        upstream_bytes_received = var.upstream_bytes_received,
        upstream_connect_time = var.upstream_connect_time
    }

    return log
end


_M.get_stream_full_log = get_stream_full_log

apisix/stream/plugins/kafka-logger.lua (new file)

--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements.  See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License.  You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core     = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local pairs    = pairs
local type     = type
local table    = table
local ipairs   = ipairs
local plugin_name = "kafka-logger"
local stale_timer_running = false
local timer_at = ngx.timer.at
local tostring = tostring
local ngx = ngx
local buffers = {}

local schema = {
    type = "object",
    properties = {
        broker_list = {
            type = "object"
        },
        kafka_topic = {type = "string"},
        key = {type = "string"},
        timeout = {type = "integer", minimum = 1, default = 3},
        name = {type = "string", default = "kafka logger"},
        max_retry_count = {type = "integer", minimum = 0, default = 0},
        retry_delay = {type = "integer", minimum = 0, default = 1},
        buffer_duration = {type = "integer", minimum = 1, default = 60},
        inactive_timeout = {type = "integer", minimum = 1, default = 5},
        batch_max_size = {type = "integer", minimum = 1, default = 1000},
        include_req_body = {type = "boolean", default = false}
    },
    required = {"broker_list", "kafka_topic", "key"}
}

local _M = {
    version = 0.1,
    priority = 403,
    name = plugin_name,
    schema = schema,
}


function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end


local function send_kafka_data(conf, log_message)
    if core.table.nkeys(conf.broker_list) == 0 then
        return nil, "failed to identify the broker specified"
    end

    local broker_list = {}
    local broker_config = {}

    for host, port  in pairs(conf.broker_list) do
        if type(host) == 'string'
            and type(port) == 'number' then

            local broker = {
                host = host, port = port
            }
            table.insert(broker_list,broker)
        end
    end

    broker_config["request_timeout"] = conf.timeout * 1000

    local prod, err = producer:new(broker_list,broker_config)
    if err then
        return nil, "failed to identify the broker specified: " .. err
    end

    local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
    if not ok then
        return nil, "failed to send data to Kafka topic" .. err
    end
end

-- remove stale objects from the memory after timer expires
local function remove_stale_objects(premature)
    if premature then
        return
    end

    for key, batch in pairs(buffers) do
        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
            core.log.debug("removing batch processor stale object, route id:", tostring(key))
            buffers[key] = nil
        end
    end

    stale_timer_running = false
end


function _M.log(conf)
    local entry = log_util.get_stream_full_log(ngx, conf)

    if not entry.route_id then
        core.log.error("failed to obtain the route id for kafka logger")
        return
    end

    local log_buffer = buffers[entry.route_id]

    if not stale_timer_running then
        -- run the timer every 30 mins if any log is present
        timer_at(1800, remove_stale_objects)
        stale_timer_running = true
    end

    if log_buffer then
        log_buffer:push(entry)
        return
    end

    -- Generate a function to be executed by the batch processor
    local func = function(entries, batch_max_size)
        local data, err
        if batch_max_size == 1 then
            data, err = core.json.encode(entries[1]) -- encode as single {}
        else
            data, err = core.json.encode(entries) -- encode as array [{}]
        end

        if not data then
            return false, 'error occurred while encoding the data: ' .. err
        end

        return send_kafka_data(conf, data)
    end

    local config = {
        name = conf.name,
        retry_delay = conf.retry_delay,
        batch_max_size = conf.batch_max_size,
        max_retry_count = conf.max_retry_count,
        buffer_duration = conf.buffer_duration,
        inactive_timeout = conf.inactive_timeout,
    }

    local err
    log_buffer, err = batch_processor:new(func, config)

    if not log_buffer then
        core.log.error("error when creating the batch processor: ", err)
        return
    end

    buffers[entry.route_id] = log_buffer
    log_buffer:push(entry)
end

return _M
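
For reference, a plugin configuration that passes the schema above could look like the sketch below. Note that broker_list is an object mapping broker host to port, not an array; the broker address, topic and key here are placeholders rather than values from the original setup.

{
    "kafka-logger": {
        "broker_list": {
            "127.0.0.1": 9092
        },
        "kafka_topic": "test",
        "key": "stream-log",
        "batch_max_size": 1000
    }
}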

Configuration file changes

config.yaml

stream_plugins:
  - mqtt-proxy

Change it to:

stream_plugins:
  - mqtt-proxy
  - kafka-logger
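
With the plugin loaded, it still has to be attached to traffic. Given the global_rules changes to apisix/init.lua and apisix/router.lua above, one option is a global rule; a minimal sketch of the request body for PUT /apisix/admin/global_rules/1 follows (the rule id, broker address, topic and key are assumptions, not taken from the original environment):

{
    "plugins": {
        "kafka-logger": {
            "broker_list": {
                "127.0.0.1": 9092
            },
            "kafka_topic": "test",
            "key": "stream-log"
        }
    }
}

Because stream_log_phase still ends with run_plugin("log"), the same kafka-logger block should also work when set in the plugins field of an individual stream route.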