?Flume 是Cloudera提供的一個高可用的呵晨,高可靠的诡右,分布式的海量日志采集稠项、聚合和傳輸?shù)南到y(tǒng), Flume支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方撑柔,用于收集數(shù)據(jù)瘸爽;同時,F(xiàn)lume提供對數(shù)據(jù)進行簡單處理铅忿,并寫到各種數(shù)據(jù)接受方(可定制)的能力剪决。(Flume使用java編寫,支持Java1.6及以上檀训。)
由原來的Flume OG到現(xiàn)在的Flume NG柑潦,進行了架構(gòu)重構(gòu),并且現(xiàn)在NG版本完全不兼容原來的OG版本峻凫。經(jīng)過架構(gòu)重構(gòu)后渗鬼,F(xiàn)lume NG更像是一個輕量的小工具,非常簡單荧琼,容易適應(yīng)各種方式日志收集譬胎,并支持failover和負(fù)載均衡肛循。
1.主要的核心概念:
Event:flume最基本的數(shù)據(jù)單元,帶有一個可選的消息頭(headers)银择。如果是文本,event通常是一行記錄累舷,event也是事務(wù)的基本單位浩考。
Flow:Event從源點到達目的點的遷移的抽象。
Client:操作位于源點處的Event被盈,將其發(fā)送到Flume Agent析孽。
Agent:一個獨立的Flume進程,包含組件Source只怎、Channel袜瞬、Sink。
Source:用來消費傳遞到該組件的Event身堡,完成對數(shù)據(jù)的收集邓尤,分成transtion和event打入到channel之中。不同的source贴谎,可以接受不同的數(shù)據(jù)格式汞扎。
Channel:主要提供一個隊列的功能,對source提供中的數(shù)據(jù)進行簡單緩存擅这,作用是保證source到sink的數(shù)據(jù)傳輸過程一定能成功澈魄。
Sink:取出Channel中的數(shù)據(jù),進行相應(yīng)的存儲文件系統(tǒng)仲翎、數(shù)據(jù)庫等痹扇。
Flume邏輯上分三層架構(gòu):agent,collector溯香,storage鲫构。
agent?用于采集數(shù)據(jù),agent是flume中產(chǎn)生數(shù)據(jù)流的地方玫坛,同時芬迄,agent會將產(chǎn)生的數(shù)據(jù)流傳輸?shù)絚ollector。
collector?的作用是將多個agent的數(shù)據(jù)匯總后昂秃,加載到storage中禀梳。
storage?是存儲系統(tǒng),可以是一個普通file肠骆,也可以是HDFS算途,HIVE,HBase等蚀腿。
source:client端操作消費數(shù)據(jù)的來源嘴瓤,支持的類型有Avro扫外、log4j、tailDir廓脆、http post筛谚、Thrift、JMS停忿、Spooling Directory等類型驾讲。
? ? ? ?對原程序影響最小的方式是直接讀取程序原來記錄的日志文件,基本可以實現(xiàn)無縫接入席赂,不需要對現(xiàn)有程序進行任何改動吮铭。對于直接讀取文件 Source,有兩種方式:
ExecSource: 以運行 Linux 命令的方式,持續(xù)的輸出最新的數(shù)據(jù)颅停,如tail -F文件名指令谓晌,在這種方式下,取的文件名必須是指定的癞揉。 ExecSource 可以實現(xiàn)對日志的實時收集纸肉,但是存在Flume不運行、指令執(zhí)行出錯或者channel爆倉喊熟,將導(dǎo)致event傳送失敗毁靶,無法保證日志數(shù)據(jù)的完整性。
SpoolSource: 監(jiān)測配置的目錄下新增的文件逊移,并將文件中的數(shù)據(jù)讀取出來预吆。需要注意兩點:拷貝到 spool 目錄下的文件不可以再打開編輯;spool 目錄下不可包含相應(yīng)的子目錄胳泉。
SpoolSource雖然無法實現(xiàn)實時的收集數(shù)據(jù)拐叉,但是可以使用以分鐘的方式分割文件,趨近于實時扇商。
? ? 使用SpoolingDirectorySource的時候凤瘦,一定要避免同時讀寫一個文件的情況。 可以通過 source1.ignorePattern=^(.)*\\.tmp$ 配置案铺,讓spoolingsource不讀取該格式的文件蔬芥。
?RPC:?RPC 的主要功能目標(biāo)是讓構(gòu)建分布式計算(應(yīng)用)更容易,在提供強大的遠程調(diào)用能力時不損失本地調(diào)用的語義簡潔性控汉。
? ? ? ? 比如服務(wù)器A笔诵、B,一個應(yīng)用部署在A服務(wù)器上姑子,相應(yīng)調(diào)用B服務(wù)器上應(yīng)用提供的函數(shù)\方法乎婿,由于不在一個內(nèi)存空間,不能直接調(diào)用街佑,需要通過網(wǎng)絡(luò)來表達調(diào)用的語義和傳達調(diào)用的數(shù)據(jù)谢翎。1.首先解決通訊問題捍靠,建立TCP連接,解決尋址問題森逮。通過序列化利用二進制傳輸榨婆,收到數(shù)據(jù)后反序列化,恢復(fù)內(nèi)存中的表達方式褒侧,找到對應(yīng)方法后執(zhí)行得到返回值良风。
? ? Avro: ?是一個基于二進制數(shù)據(jù)傳輸?shù)母咝阅艿闹虚g件,avro可以將數(shù)據(jù)結(jié)構(gòu)或?qū)ο筠D(zhuǎn)化為便于存儲或傳輸?shù)母袷搅眩m合遠程或本地大規(guī)模的存儲和交。Avro支持兩種序列化編碼方式:二進制編碼和JSON編碼鳞上。使用二進制編碼會高效序列化这吻,并且序列化后得到的結(jié)果會比較小篙议;而JSON一般用于調(diào)試系統(tǒng)或是基于WEB的應(yīng)用唾糯。Avro也被作為一種RPC框架來使用。
Thrift:?Thrift是一個跨語言的服務(wù)部署框架鬼贱,最初由Facebook于2007年開發(fā)移怯,2008年進入Apache開源項目。Thrift通過IDL(Interface Definition Language这难,接口定義語言)來定義RPC(Remote Procedure Call舟误,遠程過程調(diào)用)的接口和數(shù)據(jù)類型,然后通過thrift編譯器生成不同語言的代碼(目前支持C++,Java, Python, PHP, Ruby, Erlang, Perl,Haskell, C#, Cocoa, Smalltalk和OCaml)姻乓,并由生成的代碼負(fù)責(zé)RPC協(xié)議層和傳輸層的實現(xiàn)嵌溢。
?Netcat: NetCat是一個非常簡單的Unix工具,可以讀蹋岩、寫TCP或UDP網(wǎng)絡(luò)連接(network connection)赖草。最簡單的使用方法,”nc host port”剪个,能建立一個TCP連接秧骑,連向指定的主機和端口。接下來扣囊,你的從標(biāo)準(zhǔn)輸入中輸入的任何內(nèi)容都會被發(fā)送到指定的主機乎折,任何通過連接返回來的信息都被顯示在你的標(biāo)準(zhǔn)輸出上。這個連接會一直持續(xù)下去侵歇,至到連接兩端的程序關(guān)閉連接笆檀。
channel:??有MemoryChannel, JDBC Channel, File Channel, Kafka Channel. 比較常用的是前三種。有capacity盒至、transactionCapacity酗洒、keep-alive等屬性士修。
MemoryChannel可以實現(xiàn)高速的吞吐,但是無法保證數(shù)據(jù)的完整性樱衷。
FileChannel保證數(shù)據(jù)的完整性與一致性棋嘲。在具體配置FileChannel時,建議FileChannel設(shè)置的目錄和程序日志文件保存的目錄設(shè)成不同的磁盤矩桂,以便提高效率沸移。
Sink:支持的數(shù)據(jù)類型:HDFS Sink,Logger Sink, Kafka Sink,Avro Sink, Thrift, IPC, File Roll 等侄榴。
Sink在設(shè)置存儲數(shù)據(jù)時雹锣,可以向文件系統(tǒng)、數(shù)據(jù)庫癞蚕、hadoop存數(shù)據(jù)蕊爵,在日志數(shù)據(jù)較少時,可以將數(shù)據(jù)存儲在文件系中桦山,并且設(shè)定一定的時間間隔保存數(shù)據(jù)攒射。在日志數(shù)據(jù)較多時,可以將相應(yīng)的日志數(shù)據(jù)存儲到Hadoop中恒水,便于日后進行相應(yīng)的數(shù)據(jù)分析会放。
多個 agent 順序連接:
可以將多個Agent順序連接起來,將最初的數(shù)據(jù)源經(jīng)過收集钉凌,存儲到最終的存儲系統(tǒng)中咧最。這是最簡單的情況,一般情況下御雕,應(yīng)該控制這種順序連接的Agent的數(shù)量窗市,因為數(shù)據(jù)流經(jīng)的路徑變長了,如果不考慮failover的話饮笛,出現(xiàn)故障將影響整個Flow上的Agent收集服務(wù)咨察。
多個Agent的數(shù)據(jù)匯聚到同一個Agent:
? ? ? ?這種情況應(yīng)用的場景比較多,比如要收集Web網(wǎng)站的用戶行為日志福青,Web網(wǎng)站為了可用性使用的負(fù)載均衡的集群模式摄狱,每個節(jié)點都產(chǎn)生用戶行為日志,可以為每個節(jié)點都配置一個Agent來單獨收集日志數(shù)據(jù)无午,然后多個Agent將數(shù)據(jù)最終匯聚到一個agent上用來存儲數(shù)據(jù)存儲系統(tǒng)媒役,如HDFS上。
多路(Multiplexing)Agent
這種模式宪迟,有兩種方式酣衷,一種是用來復(fù)制(Replication),另一種是用來分流(Multiplexing)次泽。Replication方式穿仪,可以將最前端的數(shù)據(jù)源復(fù)制多份席爽,分別傳遞到多個channel中,每個channel接收到的數(shù)據(jù)都是相同的啊片。Multiplexing方式只锻,selector可以根據(jù)header的值來確定數(shù)據(jù)傳遞到哪一個channel。
實現(xiàn)load balance功能
? ? ? ? Load balancing Sink Processor能夠?qū)崿F(xiàn)load balance功能紫谷,上圖Agent1是一個路由節(jié)點齐饮,負(fù)責(zé)將Channel暫存的Event均衡到對應(yīng)的多個Sink組件上,而每個Sink組件分別連接到一個獨立的Agent上笤昨。
實現(xiàn)failover能
? ? ? ?Failover Sink Processor能夠?qū)崿F(xiàn)failover功能祖驱,具體流程類似load balance(可參考load balance圖),但是內(nèi)部處理機制與load balance完全不同:Failover Sink Processor維護一個優(yōu)先級Sink組件列表瞒窒,只要有一個Sink組件可用捺僻,Event就被傳遞到下一個組件。如果一個Sink能夠成功處理Event根竿,則會加入到一個Pool中陵像,否則會被移出Pool并計算失敗次數(shù)就珠,設(shè)置一個懲罰因子寇壳。
? ? ? 代碼實例:
?
啟動agent的shell操作:
??? flume-ng?agent -n a1? -c? ../conf?-f? ../conf/example.file
??? -Dflume.root.logger=DEBUG,console?
參數(shù)說明: ?-n 指定agent名稱(與配置文件中代理的名字相同)
-c 指定flume中配置文件的目錄?????????? -f 指定配置文件
-Dflume.root.logger=DEBUG,console 設(shè)置日志等級
例:flume-ng agent -n a1 -c/usr/local/flume/conf -f /usr/local/flume/conf/avro.conf
?-Dflume.root.logger=INFO,console
Kafka、Flume區(qū)別:都可以實現(xiàn)數(shù)據(jù)傳輸妻怎,但側(cè)重點不同壳炎。
Kafka追求的是高吞吐量、高負(fù)載(topic下可以有多個partition)
Flume追求的是數(shù)據(jù)的多樣性:數(shù)據(jù)來源的多樣性逼侦、數(shù)據(jù)流向的多樣性匿辩。
如果數(shù)據(jù)來源很單一、想要高吞吐的話可以使用Kafka榛丢;如果數(shù)據(jù)來源很多铲球、數(shù)據(jù)流向很多的話可以使用Flume;也可以將Kafka和Flume結(jié)合起來使用晰赞。
關(guān)于flume和kafka的集成
Flume的kafka Sink目前只支持Kafka 0.9及以上的版本稼病,目前kafka最新版本是1.0
Flume的source、channel掖鱼、sink均支持kafka接口然走。
參考:Flume官方文檔