目前是公司正在搞得項(xiàng)目, 所以只說思路, 具體代碼就不貼了
修改配置中的寫出類型
suricata.yaml配置中, 修改eve.json的filetype為kafka, 并在下面新增幾個(gè)配置傳入kafka的broker, topic, clientid(若有)等信息
增加對(duì)kafka寫出類型的識(shí)別
主要修改 output-json.c 文件
可以搜索 LOGFILE_TYPE_REDIS 關(guān)鍵字, 仿照類似的方式增加對(duì)Kafka類型的識(shí)別, 并設(shè)置 json_ctx->json_out 的值, 方便后面的代碼流程判斷寫出類型
讀kafka配置也是一樣的方法, 根據(jù) LOGFILE_TYPE_REDIS 關(guān)鍵字找到相關(guān)的 ConfNodeLookupChild 方法, 從配置中讀內(nèi)容存到指針節(jié)點(diǎn)中, 方便后面的函數(shù)從指針取配置信息
響應(yīng)kafka類型輸出事件
主要修改 util-logopenfile.c 文件
關(guān)鍵字 HAVE_LIBHIREDIS 和 LOGFILE_TYPE_REDIS, 找到緩沖區(qū)內(nèi)容寫到Redis的相關(guān)判斷邏輯以及主函數(shù), 增加寫Kafka的判斷邏輯, 之后就只剩下實(shí)現(xiàn)一個(gè)寫kafka的主函數(shù)了
緩沖區(qū)寫Kafka函數(shù)
如果直接無腦用一個(gè)函數(shù)把緩沖區(qū)的內(nèi)容打到Kafka, 會(huì)導(dǎo)致連接池爆滿(每個(gè)流量寫到kafka時(shí)都會(huì)創(chuàng)建新的client), 所以還是要參考Redis的寫出方法, 先在前面用init方法將kafka對(duì)象放到指針里, 以file like的方式實(shí)現(xiàn)函數(shù)將數(shù)據(jù)寫到kafka.
kafka consumer參考:
https://www.cnblogs.com/GnibChen/p/8604544.html
https://blog.csdn.net/qq849635649/article/details/77915615
其他還有一些比較細(xì)的小步驟上文沒有提到, 比如新增報(bào)錯(cuò)類型, header文件增加對(duì)應(yīng)結(jié)構(gòu)體, 修改Makefile文件加入源文件等.