Revision history:
2022.12.01 Initial version
2022.12.13 Added the required changes to apisix/router.lua
Output format
A sample log entry as delivered to Kafka (the batch processor encodes a batch as a JSON array, or a single object when batch_max_size is 1):
[{
    "upstream_bytes_received": "132",
    "upstream_connect_time": "0.002",
    "protocol": "TCP",
    "service_id": "",
    "status": "200",
    "session_time": "0.007",
    "bytes_sent": "132",
    "client_ip": "127.0.0.1",
    "route_id": "790206901602246656",
    "upstream_addr": "10.209.151.28:8099",
    "upstream_bytes_sent": "79",
    "bytes_received": "79"
}]
Code changes
apisix/init.lua
Original code
function _M.stream_log_phase()
    core.log.info("enter stream_log_phase")
    -- core.ctx.release_vars(api_ctx)
    run_plugin("log")
end
Modified code
function _M.stream_log_phase()
    local ngx_ctx = ngx.ctx
    local api_ctx = ngx_ctx.api_ctx
    core.ctx.set_vars_meta(api_ctx)

    -- run the log phase of any stream plugins bound via global rules,
    -- mirroring how the HTTP subsystem handles global rules in its log phase
    if router.global_rules and router.global_rules.values
       and #router.global_rules.values > 0 then
        local plugins = core.tablepool.fetch("plugins", 32, 0)
        local values = router.global_rules.values
        for _, global_rule in config_util.iterate_values(values) do
            api_ctx.conf_type = "global_rule"
            api_ctx.conf_version = global_rule.modifiedIndex
            api_ctx.conf_id = global_rule.value.id
            api_ctx.plugins = plugin.stream_filter(global_rule, plugins)
            run_plugin("log", plugins, api_ctx)
        end
        -- release the table back to the "plugins" pool it was fetched from
        core.tablepool.release("plugins", plugins)
        api_ctx.global_rules = router.global_rules
    end

    run_plugin("log")
end
apisix/router.lua
Original code
function _M.stream_init_worker()
    local router_stream = require("apisix.stream.router.ip_port")
    router_stream.stream_init_worker(filter)
    _M.router_stream = router_stream
end
Modified code
function _M.stream_init_worker()
    local router_stream = require("apisix.stream.router.ip_port")
    router_stream.stream_init_worker(filter)
    _M.router_stream = router_stream

    -- watch /global_rules so that stream_log_phase can see global rules
    local global_rules, err = core.config.new("/global_rules", {
        automatic = true,
        item_schema = core.schema.global_rule
    })
    if not global_rules then
        error("failed to create etcd instance for fetching /global_rules : "
              .. err)
    end
    _M.global_rules = global_rules
end
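With /global_rules now watched in the stream subsystem, a global rule can attach the logger (added below) to every stream route; plugin.stream_filter keeps only the stream plugins listed in the rule. A minimal sketch via the Admin API, assuming the default listen address and that your version's schema check accepts the plugin name (stock APISIX also ships an HTTP kafka-logger with the same required fields); the admin key, brokers, and topic are placeholders:
curl http://127.0.0.1:9080/apisix/admin/global_rules/1 \
  -H 'X-API-KEY: <your-admin-key>' -X PUT -d '
{
    "plugins": {
        "kafka-logger": {
            "broker_list": {"127.0.0.1": 9092},
            "kafka_topic": "apisix-stream-log",
            "key": "stream-log-key"
        }
    }
}'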
apisix/utils/log-util.lua
Add the following function and export it:
local function get_stream_full_log(ngx, conf)
    local ctx = ngx.ctx.api_ctx
    local var = ctx.var

    local service_id
    local route_id
    local matched_route = ctx.matched_route and ctx.matched_route.value

    if matched_route then
        service_id = matched_route.service_id or ""
        route_id = matched_route.id
    else
        service_id = var.host
    end

    local log = {
        upstream_addr = var.upstream_addr,
        service_id = service_id,
        route_id = route_id,
        consumer = ctx.consumer,
        client_ip = var.remote_addr,
        session_time = var.session_time,
        protocol = var.protocol,
        status = var.status,
        bytes_sent = var.bytes_sent,
        bytes_received = var.bytes_received,
        upstream_bytes_sent = var.upstream_bytes_sent,
        upstream_bytes_received = var.upstream_bytes_received,
        upstream_connect_time = var.upstream_connect_time
    }

    return log
end

_M.get_stream_full_log = get_stream_full_log
apisix/stream/plugins/kafka-logger.lua (new file)
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local pairs = pairs
local type = type
local table = table
local plugin_name = "kafka-logger"
local stale_timer_running = false
local timer_at = ngx.timer.at
local tostring = tostring
local ngx = ngx
local buffers = {}
local schema = {
    type = "object",
    properties = {
        broker_list = {
            type = "object"
        },
        kafka_topic = {type = "string"},
        key = {type = "string"},
        timeout = {type = "integer", minimum = 1, default = 3},
        name = {type = "string", default = "kafka logger"},
        max_retry_count = {type = "integer", minimum = 0, default = 0},
        retry_delay = {type = "integer", minimum = 0, default = 1},
        buffer_duration = {type = "integer", minimum = 1, default = 60},
        inactive_timeout = {type = "integer", minimum = 1, default = 5},
        batch_max_size = {type = "integer", minimum = 1, default = 1000},
        include_req_body = {type = "boolean", default = false}
    },
    required = {"broker_list", "kafka_topic", "key"}
}
local _M = {
    version = 0.1,
    priority = 403,
    name = plugin_name,
    schema = schema,
}

function _M.check_schema(conf)
    return core.schema.check(schema, conf)
end
local function send_kafka_data(conf, log_message)
    if core.table.nkeys(conf.broker_list) == 0 then
        return nil, "failed to identify the broker specified"
    end

    local broker_list = {}
    local broker_config = {}

    -- broker_list in the conf maps host (string) to port (number);
    -- entries of any other shape are skipped
    for host, port in pairs(conf.broker_list) do
        if type(host) == 'string'
           and type(port) == 'number' then
            local broker = {
                host = host, port = port
            }
            table.insert(broker_list, broker)
        end
    end

    broker_config["request_timeout"] = conf.timeout * 1000

    local prod, err = producer:new(broker_list, broker_config)
    if err then
        return nil, "failed to identify the broker specified: " .. err
    end

    local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
    if not ok then
        return nil, "failed to send data to Kafka topic: " .. err
    end

    -- the batch processor treats a truthy first return value as success
    return true
end
-- remove stale objects from the memory after timer expires
local function remove_stale_objects(premature)
    if premature then
        return
    end

    -- buffers is keyed by route id (a string), so iterate with pairs
    for key, batch in pairs(buffers) do
        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
            core.log.debug("removing batch processor stale object, route id:", tostring(key))
            buffers[key] = nil
        end
    end

    stale_timer_running = false
end
function _M.log(conf)
    local entry = log_util.get_stream_full_log(ngx, conf)

    if not entry.route_id then
        core.log.error("failed to obtain the route id for kafka logger")
        return
    end

    local log_buffer = buffers[entry.route_id]

    if not stale_timer_running then
        -- run the timer every 30 mins if any log is present
        timer_at(1800, remove_stale_objects)
        stale_timer_running = true
    end

    if log_buffer then
        log_buffer:push(entry)
        return
    end

    -- generate a function to be executed by the batch processor
    local func = function(entries, batch_max_size)
        local data, err
        if batch_max_size == 1 then
            data, err = core.json.encode(entries[1]) -- encode as single {}
        else
            data, err = core.json.encode(entries) -- encode as array [{}]
        end

        if not data then
            return false, 'error occurred while encoding the data: ' .. err
        end

        return send_kafka_data(conf, data)
    end

    local config = {
        name = conf.name,
        retry_delay = conf.retry_delay,
        batch_max_size = conf.batch_max_size,
        max_retry_count = conf.max_retry_count,
        buffer_duration = conf.buffer_duration,
        inactive_timeout = conf.inactive_timeout,
    }

    local err
    log_buffer, err = batch_processor:new(func, config)
    if not log_buffer then
        core.log.error("error when creating the batch processor: ", err)
        return
    end

    buffers[entry.route_id] = log_buffer
    log_buffer:push(entry)
end

return _M
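For reference, an illustrative plugin configuration with the schema defaults spelled out (hosts, topic, and key are placeholders). Note that broker_list is an object mapping broker host to port, matching the pairs(conf.broker_list) loop in send_kafka_data:
{
    "kafka-logger": {
        "broker_list": {"127.0.0.1": 9092},
        "kafka_topic": "apisix-stream-log",
        "key": "stream-log-key",
        "timeout": 3,
        "name": "kafka logger",
        "max_retry_count": 0,
        "retry_delay": 1,
        "buffer_duration": 60,
        "inactive_timeout": 5,
        "batch_max_size": 1000
    }
}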
Configuration file changes
config.yaml
stream_plugins:
  - mqtt-proxy

Change it to:

stream_plugins:
  - mqtt-proxy
  - kafka-logger
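After updating config.yaml and reloading APISIX, the logger can also be bound to an individual stream route. A minimal sketch via the Admin API, reusing the sample upstream address from the output above; the route id, server_port, admin key, and Kafka settings are placeholders. Note that _M.log drops entries without a route_id, so only connections matched to a stream route are logged:
curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 \
  -H 'X-API-KEY: <your-admin-key>' -X PUT -d '
{
    "server_port": 9100,
    "upstream": {
        "type": "roundrobin",
        "nodes": {"10.209.151.28:8099": 1}
    },
    "plugins": {
        "kafka-logger": {
            "broker_list": {"127.0.0.1": 9092},
            "kafka_topic": "apisix-stream-log",
            "key": "stream-log-key"
        }
    }
}'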