背景
在日常工作中邑闺,可能會有這種需求,類似于監(jiān)控一個目錄下新文件的產(chǎn)生昼蛀,并且這些文件會實時的追加內(nèi)容宴猾,例如ngnix的切割日至,或者某些服務(wù)器上的仿真日志等叼旋。
實現(xiàn)
在之前公司工作中仇哆,有過類似需求,也自己實現(xiàn)過夫植,后來Flume 1.7正式發(fā)布了 TaildirSource讹剔。我們來看下Flume內(nèi)部是怎么實現(xiàn)這個功能的。
源碼
首先先思考一下大體的實現(xiàn)思路是怎樣的详民,最簡單的方法是記錄下該目錄下每個文件上一次的讀取位置延欠。在讀取之后更新最新的讀取位置。
在process方法中沈跨,
Paste_Image.png
通過對每個符合要求的文件進行處理由捎,其中的updateTailFiles方法是獲取當前的更新的inode 列表,具體的內(nèi)部實現(xiàn)是判斷文件最后的更新時間之類的一大坨饿凛,看看注釋就好了狞玛,
主要邏輯不難软驰, 但它是怎么容錯的呢,在Flume宕機重啟之后心肪,是如何知道上一次傳輸?shù)奈恢玫哪囟Э鳎@里,F(xiàn)lume將相應(yīng)的記錄位置保存在文件中硬鞍,來看源碼慧瘤。
其中的有兩個定時的單線程executor service,會定時保存當前的位置固该,重啟時會load這個文件锅减,這樣其實會有一個小問題,就是當channel已經(jīng)處理過event蹬音,然后在兩次executor service啟動期間上煤,系統(tǒng)宕機了,這樣再重啟之后呢著淆,會有這部分數(shù)據(jù)的重傳劫狠。也就是說這里保證的at least once,
還有一個可能出現(xiàn)不一致的點永部,existingInodes 是一個copyOnWriteList, 在executor service 運行過程中會有不一致的情況独泞。
如果要保證exactly once, 要怎么做呢。如果是比較簡單的實現(xiàn)方式的話苔埋,如果是我個人來做懦砂,會為每個event分配個遞增的id,通過在保存在channel端最新的處理的event的id來比較组橄,如果event的id比channel端的id舊荞膘,那就丟棄,否則就更新channel的id玉工。
可能是flume這樣的日志傳輸工具都不是為了金錢交易的場景設(shè)計的羽资,所以就沒有嚴格的執(zhí)行exactly once語義,個人猜測啊遵班。