Flume Basic Examples

Core Concepts

  • Agent: Flume runs inside a JVM. Each machine runs one agent, but a single agent can contain multiple sources and sinks.
  • Client: produces the data; runs in an independent thread.
  • Source: collects data from the Client and hands it to the Channel. It can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, legacy, and custom sources.
  • Sink: collects data from the Channel and runs in an independent thread; it delivers the data to a destination such as hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, or a custom sink.
  • Channel: connects the sources and sinks, much like a queue. Data collected by the source is staged temporarily in the channel; in other words, the channel is the agent's short-term buffer for collected data, and it can be backed by memory, jdbc, file, and so on.
  • Events: the unit of data flowing through Flume, e.g. a log record or an avro object.

The Agent Concept

The agent is Flume's smallest independent unit of operation. One agent is one JVM: the agent itself is a Java process running on a log-collection node, which is simply a server node.

A single agent is made up of three components, Source, Channel, and Sink, roughly analogous to a producer, a warehouse, and a consumer, as shown below:

(figure: a single node flume agent)

NetCat Source: listens on a specified network port; as soon as an application writes data to that port, this source picks it up.

Create the configuration file a-single-node.conf under /home/hadoop/script/flume with the following contents:

#a-single-node.conf : A single node flume configuration


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Save the file, then start the agent with:

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/a-single-node.conf \
-Dflume.root.logger=INFO,console

Parameter notes:

-n sets the agent name (must match the agent name in the configuration file)

-c sets the Flume configuration directory

-f sets the agent configuration file

-Dflume.root.logger=DEBUG,console sets the log level and output target
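
The short flags are interchangeable with the long forms used above, so the same agent can be started as a one-liner:

flume-ng agent -n a1 -c $FLUME_HOME/conf -f /home/hadoop/script/flume/a-single-node.conf -Dflume.root.logger=INFO,console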

Connect to the listening port with telnet:

telnet localhost 44444

Type any data; in the Flume console you can see output such as:

18/08/02 15:25:29 INFO sink.LoggerSink: Event: { headers:{} body: 61 62 63 0D                                     abc. }

Collecting Data from a File into HDFS

source - channel - sink: exec - memory - hdfs

Configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
# file to tail
a1.sources.r1.command = tail -F /home/hadoop/data/flume/logs/access.log


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Describe the sink
a1.sinks.k1.type = hdfs
# target HDFS path
a1.sinks.k1.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/tail
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.batchSize=10
a1.sinks.k1.hdfs.useLocalTimeStamp=true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
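
To try this pipeline end to end, start the agent and then append to the tailed file; the config file name exec-memory-hdfs.conf is only an assumed name for the block above:

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/exec-memory-hdfs.conf \
-Dflume.root.logger=INFO,console

echo "exec source test" >> /home/hadoop/data/flume/logs/access.log
hdfs dfs -ls hdfs://hadoop002:9000/user/hadoop/flume/tail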

When the following setting is missing:

a1.sinks.k1.hdfs.useLocalTimeStamp=true

the agent throws:

java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

原因是因?yàn)閷?xiě)入到hfds時(shí)使用到了時(shí)間戳來(lái)區(qū)分目錄結(jié)構(gòu)了赌,flume的消息組件event在接受到之后在header中沒(méi)有發(fā)現(xiàn)時(shí)間戳參數(shù)墨榄,導(dǎo)致該錯(cuò)誤發(fā)生,有三種方法可以解決這個(gè)錯(cuò)誤:

  1. agent1.sources.source1.interceptors = t1
     agent1.sources.source1.interceptors.t1.type = timestamp
     Add a timestamp interceptor to the source so a timestamp is written into every event's headers (slightly slower).
  2. agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
     Have the sink use its local time (if the client and the Flume cluster disagree on the time, the recorded event times will be inaccurate).
  3. When sending events to the source, add the timestamp to the event header yourself; the header is a map, and the key must be timestamp (recommended; see the sketch below).
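
For option 3, a minimal sketch using Flume's RPC client SDK (the flume-ng-sdk dependency) is shown below. It assumes an Avro source listening on hadoop002:44444; the class name and event body are made up for illustration.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class TimestampedEventClient {
    public static void main(String[] args) throws EventDeliveryException {
        // Connect to the agent's Avro source (placeholder host/port)
        RpcClient client = RpcClientFactory.getDefaultInstance("hadoop002", 44444);
        try {
            // The HDFS sink reads this exact header key when building time-based paths
            Map<String, String> headers = new HashMap<>();
            headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
            Event event = EventBuilder.withBody(
                    "hello flume".getBytes(StandardCharsets.UTF_8), headers);
            client.append(event);
        } finally {
            client.close();
        }
    }
}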

Collecting a Directory's Contents to the Console

source - channel - sink: spooldir - memory - logger
After a file in the directory has been fully ingested it is renamed with the suffix .COMPLETED, and file names within the directory must not repeat.
Configuration:


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/home/hadoop/temp
a1.sources.r1.fileHeader=true

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Describe the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
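
To exercise the spooling source, start the agent and drop a file into the watched directory; the config file name spooldir-memory-logger.conf is only an assumed name for the block above:

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/spooldir-memory-logger.conf \
-Dflume.root.logger=INFO,console

echo "hello spooldir" > /home/hadoop/temp/demo.txt
# after ingestion the file is renamed to demo.txt.COMPLETED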

The Small-Files Problem

With the HDFS sink's default roll settings (rollInterval=30, rollSize=1024, rollCount=10), a busy source produces a stream of tiny HDFS files, which wastes NameNode memory; the roll* parameters in the config below control when the current file is closed and a new one is started.

Case: collecting a directory's contents into HDFS, taildir - memory - hdfs
Configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/temp/position/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/home/hadoop/temp/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2=/home/hadoop/temp/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.batchSize=15
# Key parameters: the three roll conditions are OR'ed together; whichever fires first rolls the file.
# A value of 0 disables that condition. (Comments must sit on their own lines: in a properties
# file, a trailing # is treated as part of the value.)
# roll by time (seconds)
a1.sinks.k1.hdfs.rollInterval = 0
# roll by file size (bytes)
a1.sinks.k1.hdfs.rollSize = 500
# roll by event count
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.useLocalTimeStamp=true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
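
Start the agent (taildir-memory-hdfs.conf is an assumed file name), append to one of the tracked files, and check the HDFS output; because the read position is persisted in taildir_position.json, the agent resumes where it left off after a restart:

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/taildir-memory-hdfs.conf \
-Dflume.root.logger=INFO,console

echo "taildir test" >> /home/hadoop/temp/flume/test1/example.log
hdfs dfs -ls hdfs://hadoop002:9000/user/hadoop/flume/manager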

Multiple Channels


One channel feeds a sink that logs to the console, and the other feeds a sink that writes to HDFS.
Configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/temp/position/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/home/hadoop/temp/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2=/home/hadoop/temp/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.channels.c2.type=memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger


a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0  
a1.sinks.k2.hdfs.rollSize= 0  
a1.sinks.k2.hdfs.rollCount = 100 

# Bind the source and sink to the channel
a1.sources.r1.channels =c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
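
By default the source replicates every event into both channels, so the logger sink and the HDFS sink each receive the full stream. To route events selectively instead, a multiplexing channel selector can be configured on the source; the header name state and the mapping values below are purely illustrative:

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1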

Sink Processors

There are two main modes: failover and load_balance.

  • failover: the Failover Sink Processor maintains a prioritized list of sinks and provides failover between them. Each sink carries a priority weight; the sink with the highest value is activated first (the larger the value, the higher the priority), and no two sinks may share the same priority.
  • load_balance: selects a sink according to a configurable algorithm. When the output volume is large, load balancing is well worth having, since writing through multiple sinks relieves the output pressure. Flume's built-in selection algorithm defaults to round_robin, and random is also available.

failover configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10 
a1.sinkgroups.g1.processor.maxpenalty = 10000

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1


a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0  
a1.sinks.k2.hdfs.rollSize= 0  
a1.sinks.k2.hdfs.rollCount = 10
a1.sinks.k2.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
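
To watch the failover happen, start the agent and feed it through the netcat port; with both sinks healthy the events go to k2 (priority 10), and if HDFS becomes unreachable the processor backs k2 off (for up to maxpenalty milliseconds) and the events appear on the logger sink k1 instead:

telnet localhost 44444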

load_balance configuration (change the selector strategy to test the alternatives):

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random


a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1


a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0
a1.sinks.k2.hdfs.rollSize= 0
a1.sinks.k2.hdfs.rollCount = 10
a1.sinks.k2.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
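
To test the other built-in strategy, swap the selector line for round_robin, which is also the default when no selector is specified:

a1.sinkgroups.g1.processor.selector = round_robin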

Avro Source and Avro Sink

This example needs two agents: one acts as the data source and produces the data, and the other acts as the receiver.

Data-source agent configuration:

# Name the components on this agent
avro-source-agent.sources = exec-source
avro-source-agent.sinks = avro-sink
avro-source-agent.channels = avro-memory-channel

# Describe/configure the source

avro-source-agent.sources.exec-source.type = exec
avro-source-agent.sources.exec-source.command = tail -F /home/hadoop/data/flume/logs/access.log


# Use a channel which buffers events in memory
avro-source-agent.channels.avro-memory-channel.type = memory
avro-source-agent.channels.avro-memory-channel.capacity = 1000
avro-source-agent.channels.avro-memory-channel.transactionCapacity = 100


avro-source-agent.sinks.avro-sink.type=avro
avro-source-agent.sinks.avro-sink.hostname=hadoop002
avro-source-agent.sinks.avro-sink.port=44444

# Bind the source and sink to the channel
avro-source-agent.sources.exec-source.channels = avro-memory-channel
avro-source-agent.sinks.avro-sink.channel = avro-memory-channel

Receiver agent configuration:

# Name the components on this agent
avro-sink-agent.sources = avro-source
avro-sink-agent.sinks = avro-logger
avro-sink-agent.channels = avro-memory-channel

# Describe/configure the source
avro-sink-agent.sources.avro-source.type = avro
avro-sink-agent.sources.avro-source.bind = hadoop002
avro-sink-agent.sources.avro-source.port = 44444


# Use a channel which buffers events in memory
avro-sink-agent.channels.avro-memory-channel.type = memory
avro-sink-agent.channels.avro-memory-channel.capacity = 1000
avro-sink-agent.channels.avro-memory-channel.transactionCapacity = 100


avro-sink-agent.sinks.avro-logger.type=logger

# Bind the source and sink to the channel
avro-sink-agent.sources.avro-source.channels = avro-memory-channel
avro-sink-agent.sinks.avro-logger.channel = avro-memory-channel

Start avro-sink-agent first, then avro-source-agent:

flume-ng agent \
--name avro-sink-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-sink.conf \
-Dflume.root.logger=INFO,console 


flume-ng agent \
--name avro-source-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-source.conf \
-Dflume.root.logger=INFO,console
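
With both agents running, append a line to the tailed file on the source side; it should appear in the receiving agent's logger output:

echo "avro chain test" >> /home/hadoop/data/flume/logs/access.log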