Flume高可靠傳輸實現

簡介

Flume是Cloudera提供的一個高可用的套么,高可靠的培己,分布式的海量日志采集、聚合和傳輸的系統(tǒng)胚泌,Flume支持在日志系統(tǒng)中定制各類數據發(fā)送方省咨,用于收集數據;同時玷室,Flume提供對數據進行簡單處理零蓉,并寫到各種數據接受方(可定制)的能力。

image.png

Flume 作為 cloudera 開發(fā)的實時日志收集系統(tǒng)穷缤,受到了業(yè)界的認可與廣泛應用敌蜂。Flume 初始的發(fā)行版本目前被統(tǒng)稱為 Flume OG(original generation),屬于 cloudera津肛。但隨著 FLume 功能的擴展章喉,Flume OG 代碼工程臃腫、核心組件設計不合理身坐、核心配置不標準等缺點暴露出來秸脱,尤其是在 Flume OG 的最后一個發(fā)行版本 0.94.0 中,日志傳輸不穩(wěn)定的現象尤為嚴重部蛇,為了解決這些問題摊唇,2011 年 10 月 22 號,cloudera 完成了 Flume-728涯鲁,對 Flume 進行了里程碑式的改動:重構核心組件巷查、核心配置以及代碼架構有序,重構后的版本統(tǒng)稱為 Flume NG(next generation);改動的另一原因是將 Flume 納入 apache 旗下吮便,cloudera Flume 改名為 Apache Flume笔呀。

image.png

Flume的核心概念

l Event:是Flume數據傳輸的基本單元。flume以事件的形式將數據從源頭傳送到最終的目的髓需。Event由可選的hearders和載有數據的一個byte array構成许师。

l Clinet:是一個將原始數據包裝成events并且發(fā)送它們到一個或多個agent的實體

l Agent:一個Agent包含Sources, Channels, Sinks和其他組件,它利用這些組件將events從一個節(jié)點傳輸到另一個節(jié)點或最終目的僚匆。

l Source:負責接收events或通過特殊機制產生events微渠,并將events批量的放到一個或多個Channels。

l Channel:位于Source和Sink之間咧擂,用于緩存進來的events逞盆,當Sink成功的將events發(fā)送到下一跳的channel或最終目的,events從Channel移除松申。

l Sink:負責將events傳輸到下一跳或最終目的云芦,成功完成后將events從channel移除。

其中Event是Flume數據傳輸的基本單元贸桶。flume以事件的形式將數據從源頭傳送到最終的目的舅逸。Event由可選的hearders和載有數據的一個byte array構成。

載有的數據對flume是不透明的

Headers是容納了key-value字符串對的無序集合皇筛,key在集合內是唯一的琉历。

Headers可以在上下文路由中使用擴展

image.png
image.png

Flume以agent為最小的獨立運行單位。一個agent就是一個JVM水醋。單agent由Source旗笔、Sink和Channel三大組件構成

image.png

Flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位拄踪,它攜帶日志數據(字節(jié)數組形式)并且攜帶有頭信息蝇恶,這些Event由Agent外部的Source,比如上圖中的Web Server生成宫蛆。當Source捕獲事件后會進行特定的格式化艘包,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區(qū)耀盗,它將保存事件直到Sink處理完該事件想虎。Sink負責持久化日志或者把事件推向另一個Source。

image.png

Flume的數據流細化一下則是首先Source捕捉到外部進來的Events叛拷,然后把Events提交給ChannelProcessor舌厨,ChannelProcessor先走一遍Interceptor(進行一些過濾處理),然后通過ChannelSelector引用對象獲得Channel列表忿薇,使用事務方式把Events提交到Channel裙椭,因此Source的Events提交到Channel實際上是在ChannelProcessor中進行的躏哩;而Sink則通過SinkProcessor去Channel中獲得Events并消費Events,整個過程就是一個生產者消費者模式揉燃。

高可靠傳輸實現原理

Flume使用事務的辦法來保證event的可靠傳遞扫尺。Source和Sink分別被封裝在事務中,這些事務由保存event的存儲提供或者由 Channel提供炊汤。這就保證了event在數據流的點對點傳輸中是可靠的正驻。在多級數據流中,如下圖抢腐,上一級的Sink和下一級的Source都被包含在事務中姑曙,保證數據可靠地從一個Channel到另一個Channel轉移

其次,數據流中 Channel的持久性迈倍。Flume中MemoryChannel是可能丟失數據的(當Agent死掉時)伤靠,而FileChannel是持久性的,提供類似mysql的日志機制啼染,保證數據不丟失宴合。

image.png

當一個正常的Flow運行時,每個Agent中的Channel中的Events數量是均衡(消費速度大于生產速度的情況)迹鹅,而一旦Agent直接出現故障形纺,那么Channel就會暫時持有Events,直到故障恢復(MemoryChannel可能會丟失Events)徒欣。

image.png

Flume啟動分析

image.png

Flume的Agent啟動是從Application的main函數開始的,首先把自己的實例注冊到了EventBus蜗字,然后通過LifecycleAware模式(類似Tomcat的開始結束模式)打肝,創(chuàng)建PollingPropertiesFileConfigurationProvider對象并執(zhí)行start()函數,在start()函數中挪捕,通過線程池啟動了一個FileWatcherRunnable任務去不斷的檢查啟動文件是否修改粗梭,第一次或者發(fā)現文件修改了的時候就去讀取配置文件,并通過EvenBus的post()方法響應讀取結果级零,而Application的主線程因為注冊過EventBus断医,handleConfigurationEvent()函數獲得post()事件消息后就會執(zhí)行先stopAllComponents(),然后startAllComponents(conf)奏纪,當執(zhí)行startAllComponents函數的時候鉴嗤,就會啟動channel、sink和source這三個核心組件序调,注意啟動順序是先channel醉锅,后sink,最后source发绢,這次才不會有消息丟失問題發(fā)生硬耍。整個啟動過程如圖所示垄琐。

簡化來說:

第一步:Application主線程啟動,通過PollingPropertiesFileConfigurationProvider獲取source经柴、sink狸窘、channel等配置信息

第二步:PollingPropertiesFileConfigurationProvider把配置信息通過事件總線廣播給Application主線程

第三步:Application主線程重新啟動channel、sink和source

image.png

Source分析

image.png

Source的繼承關系類圖如圖所示坯认,所有source均實現自source接口翻擒,接口方法只有兩個:setChannelProcessor()和getChannelProcessor(),所以所source具體的業(yè)務實現(比如把Events發(fā)送給Channel以及過程中的事務實現都是在ChannelProcessor中實現的)鹃操。

image.png

source又分為兩種類型:EventDrivenSource和PollableSource韭寸,PollableSource主要用于接收外部驅動程序的Events,比如來自Kafka的消息等荆隘,而其他source基本都是實現于EventDrivenSource恩伺,這種source不需要外部的驅動程序pollEvents,而是有自己的事件監(jiān)控獲得Events椰拒,比如SpoolDirectorySource晶渠,它可以從磁盤中某個文件獲取文件更新數據。

以SpoolDirectorySource為例燃观,其創(chuàng)建啟動過程如下:

第一步:Application主線程啟動的時候褒脯,通過AbstractConfigurationProvider(前面提到的PollingPropertiesFileConfigurationProvider的父類)獲取配置信息

第二步:當判斷配置為SpoolDirectorySource時,則通過SourceFactory實例化一個SpoolDirectorySource

第三步:AbstractConfigurationProvider調用實例化的SpoolDirectorySource對象的configure()進行初始化配置

第四步:Application主線程通過SpoolDirectoryRunnable(SpoolDirectorySource的內部類)啟動source并且500毫秒執(zhí)行一次

如下圖所示:

image.png

注:Application.startAllComponents()啟動source的時候其實最先啟動的是SourceRunner缆毁,Flume有兩類SourceRunner:EventDrivenSourceRunner和PollableSourceRunner番川,EventDrivenSourceRunner再啟動具體類型的source。

SpoolDirectorySource實例的每次執(zhí)行脊框,則會讀取具體目錄下的文件颁督,生成Event數據,通過ChannelProcessor把Event放入Channel浇雹,同時對文件的讀取位置進行標記沉御,下次則從標記位置進行讀取。如下圖所示昭灵。

image.png

Source提交Channel事務處理

image.png

之前也有說明吠裆,source是通過channelProcessor來提交Events的,如下圖ChannelProcessor.class所示烂完,channelprocessor先獲得一個事務试疙,然后開啟事務,之后才進行event的提交操作窜护,最后提交事務效斑,如果中間出現異常則進行事務回滾。(finally中還有一個事務關閉)

image.png

ChannelProcessor.class

image.png

FileChannel內部類FileBackedTransaction處理put代碼參考

FileChannel處理Event的時序

Channel有兩種:MemoryChannel和FileChannel,這里以FileChannel為例缓屠,其調用時序如下圖所示:

image.png

BasicTransactionSemantics.put()->BasicTransactionSemantics.put()->FileBackedTransaction.doPut()奇昙,在FileBackedTransaction執(zhí)行doPut()操作的時候執(zhí)行了兩步操作:

1、 調用Log.put()敌完,把Event寫入實體文件

2储耐、 調用FlumeEventQueue.addWithoutCommit(),把Event寫入隊列以便sink獲取

Sink分析

Sink從Channel消費Event滨溉,然后進行轉移到收集/聚合層或存儲層什湘,它的啟動過程和source類似是從Application的main主線程開始的,通過AbstractConfigurationProvider獲取配置信息晦攒,然后通過SinkFactory實例化具體Sink闽撤,然后調用sink實例的configure進行實例的初始化配置,最后通過SinkRunner啟動Sink實例脯颜。

和Source不同的是SinkRunner不直接啟動Sink實例哟旗,而是通過SinkProcessor異步啟動的。

image.png

SinkProcessor主要有三種:

l DefaultSinkProcessor:默認實現栋操,用于單個Sink的場景使用

l FailoverSinkProcessor:故障轉移實現

l LoadBalanceSinkProcessor:用于實現Sink的負載均衡

其類的繼承關系如圖所示:

image.png

多個Sink可以構成一個SinkGroup闸餐。一個Sink Processor負責從一個指定的Sink Group中激活一個Sink。Sink Processor可以通過組中所有Sink實現負載均衡矾芙;也可以在一個Sink失敗時轉移到另一個舍沙。

? Flume通過Sink Processor實現負載均衡(Load Balancing)和故障轉移(failover)

? 內建的SinkProcessors:

? Load Balancing Sink Processor – 使用RANDOM, ROUND_ROBIN或定制的選擇算法

? Failover Sink Processor

? Default Sink Processor(單Sink)

? 所有的Sink都是采取輪詢(polling)的方式從Channel上獲取events。這個動作是通過SinkRunner激活的

? Sink Processor充當Sink的一個代理

Sink Group

     Groups配置可以實現sink的負載均衡和失敗重試機制

? 負載均衡配置示例

a1.sinkgroups =g1

a1.sinkgroups.g1.sinks= k1 k2

a1.sinkgroups.g1.processor.type= load_balance

a1.sinkgroups.g1.processor.backoff= true

a1.sinkgroups.g1.processor.selector= random

? 失敗重試配置示例

a1.sinkgroups =g1

a1.sinkgroups.g1.sinks= k1 k2

a1.sinkgroups.g1.processor.type= failover

a1.sinkgroups.g1.processor.priority.k1= 5

a1.sinkgroups.g1.processor.priority.k2= 10

a1.sinkgroups.g1.processor.maxpenalty= 10000

Sink事務實現

image.png

以HBaseSink.class為例剔宪,首先從channel獲取一個事務拂铡,然后事務開啟后進行take操作,即從channel獲取Event葱绒,然后對Event進行消費處理(putEventsAndCommit即提交事務)和媳,處理完成后關閉事務。

image.png

引用美團的使用場景

參考自:http://www.aboutyun.com/thread-8317-1-1.html

image.png

a. 整個系統(tǒng)分為三層:Agent層哈街,Collector層和Store層。其中Agent層每個機器部署一個進程拒迅,負責對單機的日志收集工作骚秦;Collector層部署在中心服務器上,負責接收Agent層發(fā)送的日志璧微,并且將日志根據路由規(guī)則寫到相應的Store層中作箍;Store層負責提供永久或者臨時的日志存儲服務,或者將日志流導向其它服務器前硫。

b. Agent到Collector使用LoadBalance策略胞得,將所有的日志均衡地發(fā)到所有的Collector上,達到負載均衡的目標屹电,同時并處理單個Collector失效的問題阶剑。

c. Collector層的目標主要有三個:SinkHdfs,SinkKafka和SinkBypass跃巡。分別提供離線的數據到Hdfs,和提供實時的日志流到Kafka和Bypass牧愁。其中SinkHdfs又根據日志量的大小分為SinkHdfs_b素邪,SinkHdfs_m和SinkHdfs_s三個Sink,以提高寫入到Hdfs的性能猪半,具體見后面介紹兔朦。

d. 對于Store來說,Hdfs負責永久地存儲所有日志磨确;Kafka存儲最新的7天日志沽甥,并給Storm系統(tǒng)提供實時日志流;Bypass負責給其它服務器和應用提供實時日志流乏奥。

image.png

a. 模塊命名規(guī)則:所有的Source以src開頭摆舟,所有的Channel以ch開頭,所有的Sink以sink開頭英融;

b. Channel統(tǒng)一使用美團開發(fā)的DualChannel盏檐,具體原因后面詳述;對于過濾掉的日志使用NullChannel驶悟,具體原因后面詳述胡野;

c. 模塊之間內部通信統(tǒng)一使用Avro接口;

(DualChannel:基于 MemoryChannel和 FileChannel開發(fā)痕鳍。當堆積在Channel中的events數小于閾值時硫豆,所有的events被保存在MemoryChannel中,Sink從MemoryChannel中讀取數據笼呆;當堆積在Channel中的events數大于閾值時熊响,所有的events被自動存放在FileChannel中,Sink從FileChannel中讀取數據诗赌。這樣當系統(tǒng)正常運行時汗茄,我們可以使用MemoryChannel的高吞吐特性;當系統(tǒng)有異常時铭若,我們可以利用FileChannel的大緩存的特性洪碳。)

參考:(基于Flume的美團日志收集系統(tǒng)(一)架構和設計)http://www.aboutyun.com/thread-8317-1-1.html
參考:(基于Flume的美團日志收集系統(tǒng)(二)改進和優(yōu)化)http://www.aboutyun.com/thread-8318-1-1.html

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市叼屠,隨后出現的幾起案子瞳腌,更是在濱河造成了極大的恐慌,老刑警劉巖镜雨,帶你破解...
    沈念sama閱讀 211,376評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嫂侍,死亡現場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機挑宠,發(fā)現死者居然都...
    沈念sama閱讀 90,126評論 2 385
  • 文/潘曉璐 我一進店門菲盾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人痹栖,你說我怎么就攤上這事亿汞。” “怎么了揪阿?”我有些...
    開封第一講書人閱讀 156,966評論 0 347
  • 文/不壞的土叔 我叫張陵疗我,是天一觀的道長。 經常有香客問我南捂,道長吴裤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,432評論 1 283
  • 正文 為了忘掉前任溺健,我火速辦了婚禮麦牺,結果婚禮上,老公的妹妹穿的比我還像新娘鞭缭。我一直安慰自己剖膳,他們只是感情好,可當我...
    茶點故事閱讀 65,519評論 6 385
  • 文/花漫 我一把揭開白布岭辣。 她就那樣靜靜地躺著吱晒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪沦童。 梳的紋絲不亂的頭發(fā)上仑濒,一...
    開封第一講書人閱讀 49,792評論 1 290
  • 那天,我揣著相機與錄音偷遗,去河邊找鬼墩瞳。 笑死,一個胖子當著我的面吹牛氏豌,可吹牛的內容都是我干的喉酌。 我是一名探鬼主播,決...
    沈念sama閱讀 38,933評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼泵喘,長吁一口氣:“原來是場噩夢啊……” “哼瞭吃!你這毒婦竟也來了?” 一聲冷哼從身側響起涣旨,我...
    開封第一講書人閱讀 37,701評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎股冗,沒想到半個月后紧索,有當地人在樹林里發(fā)現了一具尸體裤园,經...
    沈念sama閱讀 44,143評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡但骨,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,488評論 2 327
  • 正文 我和宋清朗相戀三年莉炉,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片岖圈。...
    茶點故事閱讀 38,626評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內的尸體忽然破棺而出催束,到底是詐尸還是另有隱情,我是刑警寧澤伏社,帶...
    沈念sama閱讀 34,292評論 4 329
  • 正文 年R本政府宣布抠刺,位于F島的核電站,受9級特大地震影響摘昌,放射性物質發(fā)生泄漏速妖。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,896評論 3 313
  • 文/蒙蒙 一聪黎、第九天 我趴在偏房一處隱蔽的房頂上張望罕容。 院中可真熱鬧,春花似錦稿饰、人聲如沸锦秒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽旅择。三九已至,卻和暖如春梧喷,著一層夾襖步出監(jiān)牢的瞬間砌左,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工铺敌, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留汇歹,地道東北人。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓偿凭,卻偏偏與公主長得像产弹,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子弯囊,可洞房花燭夜當晚...
    茶點故事閱讀 43,494評論 2 348

推薦閱讀更多精彩內容

  • 這里主要介紹幾種常見的日志的source來源痰哨,包括監(jiān)控文件型,監(jiān)控文件內容增量匾嘱,TCP和HTTP斤斧。 Spool類型...
    里仁有鄰閱讀 1,080評論 0 1
  • title: Flume構建日志采集系統(tǒng)date: 2018-02-03 19:45tags: [flume,k...
    溯水心生閱讀 16,126評論 3 25
  • 閱讀目錄(Content) 一、Flume簡介 二霎烙、Flume特點 三撬讽、Flume的一些核心概念 3.1蕊连、Agen...
    達微閱讀 4,715評論 0 9
  • 一、Flume簡介 flume 作為 cloudera 開發(fā)的實時日志收集系統(tǒng)游昼,受到了業(yè)界的認可與廣泛應用甘苍。Flu...
    superxcp閱讀 939評論 0 2
  • Flume架構與實踐 Flume是一款在線數據采集的系統(tǒng),典型的應用場景是作為數據的總線烘豌,在線的進行日志的采集载庭、分...
    mike_zhangliang閱讀 2,090評論 0 2