簡介
Flume是Cloudera提供的一個高可用的套么,高可靠的培己,分布式的海量日志采集、聚合和傳輸的系統(tǒng)胚泌,Flume支持在日志系統(tǒng)中定制各類數據發(fā)送方省咨,用于收集數據;同時玷室,Flume提供對數據進行簡單處理零蓉,并寫到各種數據接受方(可定制)的能力。
Flume 作為 cloudera 開發(fā)的實時日志收集系統(tǒng)穷缤,受到了業(yè)界的認可與廣泛應用敌蜂。Flume 初始的發(fā)行版本目前被統(tǒng)稱為 Flume OG(original generation),屬于 cloudera津肛。但隨著 FLume 功能的擴展章喉,Flume OG 代碼工程臃腫、核心組件設計不合理身坐、核心配置不標準等缺點暴露出來秸脱,尤其是在 Flume OG 的最后一個發(fā)行版本 0.94.0 中,日志傳輸不穩(wěn)定的現象尤為嚴重部蛇,為了解決這些問題摊唇,2011 年 10 月 22 號,cloudera 完成了 Flume-728涯鲁,對 Flume 進行了里程碑式的改動:重構核心組件巷查、核心配置以及代碼架構有序,重構后的版本統(tǒng)稱為 Flume NG(next generation);改動的另一原因是將 Flume 納入 apache 旗下吮便,cloudera Flume 改名為 Apache Flume笔呀。
Flume的核心概念
l Event:是Flume數據傳輸的基本單元。flume以事件的形式將數據從源頭傳送到最終的目的髓需。Event由可選的hearders和載有數據的一個byte array構成许师。
l Clinet:是一個將原始數據包裝成events并且發(fā)送它們到一個或多個agent的實體
l Agent:一個Agent包含Sources, Channels, Sinks和其他組件,它利用這些組件將events從一個節(jié)點傳輸到另一個節(jié)點或最終目的僚匆。
l Source:負責接收events或通過特殊機制產生events微渠,并將events批量的放到一個或多個Channels。
l Channel:位于Source和Sink之間咧擂,用于緩存進來的events逞盆,當Sink成功的將events發(fā)送到下一跳的channel或最終目的,events從Channel移除松申。
l Sink:負責將events傳輸到下一跳或最終目的云芦,成功完成后將events從channel移除。
其中Event是Flume數據傳輸的基本單元贸桶。flume以事件的形式將數據從源頭傳送到最終的目的舅逸。Event由可選的hearders和載有數據的一個byte array構成。
載有的數據對flume是不透明的
Headers是容納了key-value字符串對的無序集合皇筛,key在集合內是唯一的琉历。
Headers可以在上下文路由中使用擴展
Flume以agent為最小的獨立運行單位。一個agent就是一個JVM水醋。單agent由Source旗笔、Sink和Channel三大組件構成
Flume的數據流由事件(Event)貫穿始終。事件是Flume的基本數據單位拄踪,它攜帶日志數據(字節(jié)數組形式)并且攜帶有頭信息蝇恶,這些Event由Agent外部的Source,比如上圖中的Web Server生成宫蛆。當Source捕獲事件后會進行特定的格式化艘包,然后Source會把事件推入(單個或多個)Channel中。你可以把Channel看作是一個緩沖區(qū)耀盗,它將保存事件直到Sink處理完該事件想虎。Sink負責持久化日志或者把事件推向另一個Source。
Flume的數據流細化一下則是首先Source捕捉到外部進來的Events叛拷,然后把Events提交給ChannelProcessor舌厨,ChannelProcessor先走一遍Interceptor(進行一些過濾處理),然后通過ChannelSelector引用對象獲得Channel列表忿薇,使用事務方式把Events提交到Channel裙椭,因此Source的Events提交到Channel實際上是在ChannelProcessor中進行的躏哩;而Sink則通過SinkProcessor去Channel中獲得Events并消費Events,整個過程就是一個生產者消費者模式揉燃。
高可靠傳輸實現原理
Flume使用事務的辦法來保證event的可靠傳遞扫尺。Source和Sink分別被封裝在事務中,這些事務由保存event的存儲提供或者由 Channel提供炊汤。這就保證了event在數據流的點對點傳輸中是可靠的正驻。在多級數據流中,如下圖抢腐,上一級的Sink和下一級的Source都被包含在事務中姑曙,保證數據可靠地從一個Channel到另一個Channel轉移
其次,數據流中 Channel的持久性迈倍。Flume中MemoryChannel是可能丟失數據的(當Agent死掉時)伤靠,而FileChannel是持久性的,提供類似mysql的日志機制啼染,保證數據不丟失宴合。
當一個正常的Flow運行時,每個Agent中的Channel中的Events數量是均衡(消費速度大于生產速度的情況)迹鹅,而一旦Agent直接出現故障形纺,那么Channel就會暫時持有Events,直到故障恢復(MemoryChannel可能會丟失Events)徒欣。
Flume啟動分析
Flume的Agent啟動是從Application的main函數開始的,首先把自己的實例注冊到了EventBus蜗字,然后通過LifecycleAware模式(類似Tomcat的開始結束模式)打肝,創(chuàng)建PollingPropertiesFileConfigurationProvider對象并執(zhí)行start()函數,在start()函數中挪捕,通過線程池啟動了一個FileWatcherRunnable任務去不斷的檢查啟動文件是否修改粗梭,第一次或者發(fā)現文件修改了的時候就去讀取配置文件,并通過EvenBus的post()方法響應讀取結果级零,而Application的主線程因為注冊過EventBus断医,handleConfigurationEvent()函數獲得post()事件消息后就會執(zhí)行先stopAllComponents(),然后startAllComponents(conf)奏纪,當執(zhí)行startAllComponents函數的時候鉴嗤,就會啟動channel、sink和source這三個核心組件序调,注意啟動順序是先channel醉锅,后sink,最后source发绢,這次才不會有消息丟失問題發(fā)生硬耍。整個啟動過程如圖所示垄琐。
簡化來說:
第一步:Application主線程啟動,通過PollingPropertiesFileConfigurationProvider獲取source经柴、sink狸窘、channel等配置信息
第二步:PollingPropertiesFileConfigurationProvider把配置信息通過事件總線廣播給Application主線程
第三步:Application主線程重新啟動channel、sink和source
Source分析
Source的繼承關系類圖如圖所示坯认,所有source均實現自source接口翻擒,接口方法只有兩個:setChannelProcessor()和getChannelProcessor(),所以所source具體的業(yè)務實現(比如把Events發(fā)送給Channel以及過程中的事務實現都是在ChannelProcessor中實現的)鹃操。
source又分為兩種類型:EventDrivenSource和PollableSource韭寸,PollableSource主要用于接收外部驅動程序的Events,比如來自Kafka的消息等荆隘,而其他source基本都是實現于EventDrivenSource恩伺,這種source不需要外部的驅動程序pollEvents,而是有自己的事件監(jiān)控獲得Events椰拒,比如SpoolDirectorySource晶渠,它可以從磁盤中某個文件獲取文件更新數據。
以SpoolDirectorySource為例燃观,其創(chuàng)建啟動過程如下:
第一步:Application主線程啟動的時候褒脯,通過AbstractConfigurationProvider(前面提到的PollingPropertiesFileConfigurationProvider的父類)獲取配置信息
第二步:當判斷配置為SpoolDirectorySource時,則通過SourceFactory實例化一個SpoolDirectorySource
第三步:AbstractConfigurationProvider調用實例化的SpoolDirectorySource對象的configure()進行初始化配置
第四步:Application主線程通過SpoolDirectoryRunnable(SpoolDirectorySource的內部類)啟動source并且500毫秒執(zhí)行一次
如下圖所示:
注:Application.startAllComponents()啟動source的時候其實最先啟動的是SourceRunner缆毁,Flume有兩類SourceRunner:EventDrivenSourceRunner和PollableSourceRunner番川,EventDrivenSourceRunner再啟動具體類型的source。
SpoolDirectorySource實例的每次執(zhí)行脊框,則會讀取具體目錄下的文件颁督,生成Event數據,通過ChannelProcessor把Event放入Channel浇雹,同時對文件的讀取位置進行標記沉御,下次則從標記位置進行讀取。如下圖所示昭灵。
Source提交Channel事務處理
之前也有說明吠裆,source是通過channelProcessor來提交Events的,如下圖ChannelProcessor.class所示烂完,channelprocessor先獲得一個事務试疙,然后開啟事務,之后才進行event的提交操作窜护,最后提交事務效斑,如果中間出現異常則進行事務回滾。(finally中還有一個事務關閉)
ChannelProcessor.class
FileChannel內部類FileBackedTransaction處理put代碼參考
FileChannel處理Event的時序
Channel有兩種:MemoryChannel和FileChannel,這里以FileChannel為例缓屠,其調用時序如下圖所示:
BasicTransactionSemantics.put()->BasicTransactionSemantics.put()->FileBackedTransaction.doPut()奇昙,在FileBackedTransaction執(zhí)行doPut()操作的時候執(zhí)行了兩步操作:
1、 調用Log.put()敌完,把Event寫入實體文件
2储耐、 調用FlumeEventQueue.addWithoutCommit(),把Event寫入隊列以便sink獲取
Sink分析
Sink從Channel消費Event滨溉,然后進行轉移到收集/聚合層或存儲層什湘,它的啟動過程和source類似是從Application的main主線程開始的,通過AbstractConfigurationProvider獲取配置信息晦攒,然后通過SinkFactory實例化具體Sink闽撤,然后調用sink實例的configure進行實例的初始化配置,最后通過SinkRunner啟動Sink實例脯颜。
和Source不同的是SinkRunner不直接啟動Sink實例哟旗,而是通過SinkProcessor異步啟動的。
SinkProcessor主要有三種:
l DefaultSinkProcessor:默認實現栋操,用于單個Sink的場景使用
l FailoverSinkProcessor:故障轉移實現
l LoadBalanceSinkProcessor:用于實現Sink的負載均衡
其類的繼承關系如圖所示:
多個Sink可以構成一個SinkGroup闸餐。一個Sink Processor負責從一個指定的Sink Group中激活一個Sink。Sink Processor可以通過組中所有Sink實現負載均衡矾芙;也可以在一個Sink失敗時轉移到另一個舍沙。
? Flume通過Sink Processor實現負載均衡(Load Balancing)和故障轉移(failover)
? 內建的SinkProcessors:
? Load Balancing Sink Processor – 使用RANDOM, ROUND_ROBIN或定制的選擇算法
? Failover Sink Processor
? Default Sink Processor(單Sink)
? 所有的Sink都是采取輪詢(polling)的方式從Channel上獲取events。這個動作是通過SinkRunner激活的
? Sink Processor充當Sink的一個代理
Sink Group
Groups配置可以實現sink的負載均衡和失敗重試機制
? 負載均衡配置示例
a1.sinkgroups =g1
a1.sinkgroups.g1.sinks= k1 k2
a1.sinkgroups.g1.processor.type= load_balance
a1.sinkgroups.g1.processor.backoff= true
a1.sinkgroups.g1.processor.selector= random
? 失敗重試配置示例
a1.sinkgroups =g1
a1.sinkgroups.g1.sinks= k1 k2
a1.sinkgroups.g1.processor.type= failover
a1.sinkgroups.g1.processor.priority.k1= 5
a1.sinkgroups.g1.processor.priority.k2= 10
a1.sinkgroups.g1.processor.maxpenalty= 10000
Sink事務實現
以HBaseSink.class為例剔宪,首先從channel獲取一個事務拂铡,然后事務開啟后進行take操作,即從channel獲取Event葱绒,然后對Event進行消費處理(putEventsAndCommit即提交事務)和媳,處理完成后關閉事務。
引用美團的使用場景
參考自:http://www.aboutyun.com/thread-8317-1-1.html
a. 整個系統(tǒng)分為三層:Agent層哈街,Collector層和Store層。其中Agent層每個機器部署一個進程拒迅,負責對單機的日志收集工作骚秦;Collector層部署在中心服務器上,負責接收Agent層發(fā)送的日志璧微,并且將日志根據路由規(guī)則寫到相應的Store層中作箍;Store層負責提供永久或者臨時的日志存儲服務,或者將日志流導向其它服務器前硫。
b. Agent到Collector使用LoadBalance策略胞得,將所有的日志均衡地發(fā)到所有的Collector上,達到負載均衡的目標屹电,同時并處理單個Collector失效的問題阶剑。
c. Collector層的目標主要有三個:SinkHdfs,SinkKafka和SinkBypass跃巡。分別提供離線的數據到Hdfs,和提供實時的日志流到Kafka和Bypass牧愁。其中SinkHdfs又根據日志量的大小分為SinkHdfs_b素邪,SinkHdfs_m和SinkHdfs_s三個Sink,以提高寫入到Hdfs的性能猪半,具體見后面介紹兔朦。
d. 對于Store來說,Hdfs負責永久地存儲所有日志磨确;Kafka存儲最新的7天日志沽甥,并給Storm系統(tǒng)提供實時日志流;Bypass負責給其它服務器和應用提供實時日志流乏奥。
a. 模塊命名規(guī)則:所有的Source以src開頭摆舟,所有的Channel以ch開頭,所有的Sink以sink開頭英融;
b. Channel統(tǒng)一使用美團開發(fā)的DualChannel盏檐,具體原因后面詳述;對于過濾掉的日志使用NullChannel驶悟,具體原因后面詳述胡野;
c. 模塊之間內部通信統(tǒng)一使用Avro接口;
(DualChannel:基于 MemoryChannel和 FileChannel開發(fā)痕鳍。當堆積在Channel中的events數小于閾值時硫豆,所有的events被保存在MemoryChannel中,Sink從MemoryChannel中讀取數據笼呆;當堆積在Channel中的events數大于閾值時熊响,所有的events被自動存放在FileChannel中,Sink從FileChannel中讀取數據诗赌。這樣當系統(tǒng)正常運行時汗茄,我們可以使用MemoryChannel的高吞吐特性;當系統(tǒng)有異常時铭若,我們可以利用FileChannel的大緩存的特性洪碳。)
參考:(基于Flume的美團日志收集系統(tǒng)(一)架構和設計)http://www.aboutyun.com/thread-8317-1-1.html
參考:(基于Flume的美團日志收集系統(tǒng)(二)改進和優(yōu)化)http://www.aboutyun.com/thread-8318-1-1.html