Flume 日志收集系統(tǒng)
#安裝
在node01下
在hadoop用戶下
cd ~/apps
在此路徑下 解壓后是 flume-1.8.0
ll
cd conf
再把演示用的各種conf文件上傳給node01
再通過scp命令把 這個flume-1.8.0傳給另外4給node
cat example.conf
sources = r1 #多個source可以用空格隔開
channels = c1
sinks = k1
sources.r1.type = netcat #綁定一個本地端口,往flume里面?zhèn)鬏敂?shù)據(jù)
sources.r1.bind = localhost
sources.r1.port = 44444
sources.r1.channels = c1 #把source和channel關(guān)聯(lián)起來
channels.c1.type = memory ¥在內(nèi)存中存儲
channels.c1.capacity = 1000 #可以存1000個event
channels.c1.transactionCapacity = 100 #一次事務(wù)提交多少給event
sinks.k1.type =logger #以日志的形式在黑窗口中打出來
sinks.k1.channel = c1 #在哪個channel中拉取數(shù)據(jù)
啟動flume
再新開一個node01
telnet localhost 44444
訪問這個端口
輸入 哈哈哈
#詳細(xì)介紹flume的組件
一個event在一個agent中的傳輸處理流程如下:source--interceptor--selector->channel->sink processor--sink->中心存儲/下一級
agent
#1.avro source
Avro Source:支持Avro協(xié)議留美,接收RPC事件請求谎砾。Avro Source通過監(jiān)
聽Avro端口接收外部Avro客戶端流事件(event)奕枝,在Flume的多層架
構(gòu)中經(jīng)常被使用接收上游Avro Sink發(fā)送的event
? 關(guān)鍵參數(shù)說明
? type:類型名稱avro
? bind :綁定的IP
? port :監(jiān)聽的端口
? threads:接收請求的線程數(shù)隘道,當(dāng)需要接收多個avro客戶端的數(shù)據(jù)流時要設(shè)置合
適的線程數(shù)谭梗,否則會造成avro客戶端數(shù)據(jù)流積壓
? compression-type:是否使用壓縮,如果使用壓縮設(shè)則值為“deflate”设塔,avro
source一般用于多個Agent組成的數(shù)據(jù)流闰蛔,接收來自avro sink的event序六,如果avro
source設(shè)置了壓縮蚤吹,name上一階段的avro sink也要設(shè)置壓縮。默認(rèn)值none
? channels:Source對接的Channel名稱
#2.exec source
Exec Source:支持Linux命令繁涂,收集標(biāo)準(zhǔn)輸?shù)姆绞奖O(jiān)聽指定文件扔罪。
Exec Source可以實現(xiàn)實時的消息傳輸步势,但文件的位置,不支持?jǐn)帱c續(xù)傳盅抚,當(dāng)Exec Source后續(xù)增加的消息丟失妄均,一般在測試環(huán)境使用
關(guān)鍵參數(shù)說明
? type :source類型為exec
? command :Linux命令
? channels :Source對接的Channel名稱丰包。
-----演示Avro Source和exec Source
在啟動前 先vim avrosource.conf
修改綁定的IP為192.16.183.101
在node01中啟動 bin/flume-ng agent --conf conf --conf-file conf/avrosource.conf --name avroagent -Dflume.root.logger=INFO,console
接著新開一個node01(1)的窗口
mkdir -p /home/hadoop/apps/flume/execsource/
touch exectest.log
echo 123 > exectest.log
echo 34567 >> exectest.log
啟動execagent
再新建一個窗口node01(2)邑彪,啟動execagent
再回到node01(1)的窗口
cd /home/hadoop/apps/flume/execsource/
echo 8040 >>? exectest.log
原理是 execsource.conf 監(jiān)聽node01/home/hadoop/apps/flume/execsource/exectest.log中日志文件的變化寄症,收集日志里面的數(shù)據(jù)再傳給avrosource
他接收后矩动,在黑窗口中打印出數(shù)據(jù)
----演示Spooling Directory Source
Spooling Directory Source:監(jiān)聽一個文件夾悲没,收集文件夾下文件數(shù)據(jù)示姿,收集完文件數(shù)據(jù)會將文件名稱的后綴改為.COMPLETED
缺點不支持已存在文件新增數(shù)據(jù)的收集,且不能夠?qū)η短孜募A遞歸監(jiān)聽
關(guān)鍵參數(shù)說明
? type :source類型為spooldir
? spoolDir:source監(jiān)聽的文件夾
? fileHeader :是否添加文件的絕對路徑到event的header中渗鬼,默認(rèn)值false
? fileHeaderKey:添加到event header中文件絕對路徑的鍵值,默認(rèn)值file
? fileSuffix:收集完新文件數(shù)據(jù)給文件添加的后綴名稱命锄,默認(rèn)值:
.COMPLETED
? channels :Source對接的Channel名稱
先做好預(yù)備工作
在node01下
cd ~/apps/flume
mkdir spoolDir
mkdir selector
mkdir taildir
mkdir filechannel
mkdir multiplexing
啟動Spooling Directory Source
在node01的一個新窗口中 bin/flume-ng agent --conf conf --conf-file conf/spooldirsource.conf --name a1 -Dflume.root.logger=INFO,console
換一個窗口
cd ~/apps/flume/spoolDir
echo 134 > test1
echo 477 >> test1? ? 不會響應(yīng)脐恩,也不能監(jiān)聽子文件夾下面的數(shù)據(jù)
echo 477 >> test2? 新的文件就會收集到
---演示Kafka source
先創(chuàng)建kafka 主題 先在node03,node04,node05上啟動kafka
在node03上苟翻, cd ~/apps/kafka_2.11-0.10.2.1
bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181,192.168.183.102:2181,192.168.183.103:2181,192.168.183.104:2181,192.168.183.105:2181, --replication-factor 2 --partitions 3 --topic flumetopictest1
在node01上先vim kafkasource.conf的主機名稱
然后
bin/flume-ng agent --conf conf --conf-file conf/kafkasource.conf --name kafkasourceagent -Dflume.root.logger=INFO,console
在node03上啟動kafka的客戶端
cd /home/hadoop/apps/kafka_2.11-0.10.2.1
bin/kafka-console-producer.sh --broker-list 192.168.183.103:9092,192.168.183.104:9092 --topic flumetopictest1
寫入12345
node01上就會輸出寫入的結(jié)果
---演示taildir source
Taildir Source:監(jiān)聽一個文件夾或者文件崇猫,通過正則表達式匹配需要監(jiān)聽的數(shù)據(jù)源文件诅炉,Taildir Source通過將監(jiān)聽的文件位置寫入到文件中來實現(xiàn)斷點
續(xù)傳屋厘,并且能夠保證沒有重復(fù)數(shù)據(jù)的讀取
關(guān)鍵參數(shù)說明
? type:source類型TAILDIR
? positionFile:保存監(jiān)聽文件讀取位置的文件路徑
? idleTimeout:關(guān)閉空閑文件延遲時間汗洒,如果有新的記錄添加到已關(guān)閉的空閑文件,taildir srouce將繼續(xù)打開該空閑文件溢谤,默認(rèn)值120000毫秒(2分鐘)
? writePosInterval:向保存讀取位置文件中寫入讀取文件位置的時間間隔,默認(rèn)值
3000毫秒
? batchSize:批量寫入channel最大event數(shù)鲫构,默認(rèn)值100
? maxBackoffSleep:每次最后一次嘗試沒有獲取到監(jiān)聽文件最新數(shù)據(jù)的最大延遲時間,默認(rèn)值5000毫秒
先做好預(yù)備工作 在node01下
cd ~/apps/flume/taildir
mkdir test1
mkdir test2
mkdir position
在node01中啟動 在flume-1.8.0文件夾下
bin/flume-ng agent --n a1 --conf conf --conf-file conf/taildirsource.conf --name taildiragent -Dflume.root.logger=INFO,console
在另一個窗口中
cd ~/apps/flume/taildir/test1
echo 123 > test.log
cd ~/apps/flume/taildir/test2
echo 4590? > file2.log
----演示filechannel
先做好預(yù)備工作 在node01下
cd? /home/hadoop/apps/flume/filechannel
mkdir data
mkdir checkpoint
mkdir backup
cd /home/hadoop/apps/flume-1.8.0
bin/flume-ng agent --conf conf --conf-file conf/filechannle.conf --name a1 -Dflume.root.logger=INFO,console
在node01的另一個窗口
telnet localhost 44444
發(fā)送12345
再開一個node01的窗口
cd /home/hadoop/apps/flume/filechannel/data 發(fā)現(xiàn)已經(jīng)創(chuàng)建了文件。
cd /home/hadoop/apps/flume/filechannel/checkpoint 已經(jīng)創(chuàng)建了檢查點文件
----演示kafkachannel(首選)
存儲容量更大湿镀,容錯更好
關(guān)鍵參數(shù)說明:
? type:Kafka Channel類型org.apache.flume.channel.kafka.KafkaChannel
? kafka.bootstrap.servers:Kafka broker列表勉痴,格式為ip1:port1, ip2:port2…,建
議配置多個值提高容錯能力瀑罗,多個值之間用逗號隔開
? kafka.topic:topic名稱,默認(rèn)值“flume-channel”
? kafka.consumer.group.id:Consumer Group Id劣像,全局唯一
? parseAsFlumeEvent:是否以Avro FlumeEvent模式寫入到Kafka Channel中摧玫,
默認(rèn)值true,event的header信息與event body都寫入到kafka中
? pollTimeout:輪詢超時時間屋群,默認(rèn)值500毫秒
? kafka.consumer.auto.offset.reset:earliest表示從最早的偏移量開始拉取芍躏,latest
表示從最新的偏移量開始拉取纸肉,none表示如果沒有發(fā)現(xiàn)該Consumer組之前拉
取的偏移量則拋異常
首先做好預(yù)備工作
在node01上
cd /home/hadoop/apps/flume-1.8.0/conf
vim kafkachannel.conf(修改kafka broker的機器號)
在node03上
創(chuàng)建一個topic
cd /home/hadoop/apps/kafka_2.11-0.10.2.1
bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181 --replication-factor 1 --partitions 3 --topic flumechannel2
在node01上啟動agent
cd /home/hadoop/apps/flume-1.8.0
bin/flume-ng agent --conf conf --conf-file conf/kafkachannel.conf --name a1 -Dflume.root.logger=INFO,console
在node01的一個新窗口中
telnet localhost 44444? 發(fā)送數(shù)據(jù)123456789
----演示HDFSsink
在node01上喊熟,cd /home/hadoop/apps/flume-1.8.0
bin/flume-ng agent --conf conf --conf-file conf/hdfssink.conf --name a1 -Dflume.root.logger=INFO,console
在node01上
使用telnet發(fā)送數(shù)據(jù)
telnet localhost 44444? ? 發(fā)送12345555
在node01的一個新窗口上芥牌,
hadoop fs -ls /data/flume/20180811 可以發(fā)現(xiàn)一個前綴為hdfssink的文件
---演示kafkasink(略)
---演示replicating seletor
cd /home/hadoop/apps/flume-1.8.0/conf
先修改 vim replicating_selector.conf? kafka的server
修好后
在node03上創(chuàng)建kakfka的 topic
cd /home/hadoop/apps/kafka_2.11-0.10.2.1
bin/kafka-topics.sh --create --zookeeper 192.168.183.101:2181 --replication-factor 1 --partitions 3 --topic FlumeSelectorTopic1
在node01上啟動agent
cd /home/hadoop/apps/flume-1.8.0/
bin/flume-ng agent --conf conf --conf-file conf/replicating_selector.conf --name a1
新開一個node01的窗口
telnet localhost 44444?
再在一個新窗口中可以發(fā)現(xiàn)
cd /home/hadoop/apps/flume/selector? 發(fā)現(xiàn)已經(jīng)寫入數(shù)據(jù)
在node03 啟動kafka客戶端監(jiān)聽主題
bin/kafka-console-consumer.sh --zookeeper 192.168.183.101:2181 --from-beginning --topic FlumeSelectorTopic1
回到node01的窗口
telnet localhost 44444? 發(fā)送數(shù)據(jù) 78834? ? 發(fā)送node03的kafka已經(jīng)讀到了
----演示Multiplexing? Channel Selector
在node01上 cd ~/apps/flume/multiplexing
mkdir k11
mkdir k22
mkdir k33
修改四個配置文件綁定的端口號
avro_sink1.conf avro_sink2.conf avro_sink3.conf multiplexing.conf
在node01上
bin/flume-ng agent --conf conf --conf-file conf/multiplexing_selector.conf --name a3 -Dflume.root.logger=INFO,console
再分別啟動三個階段的agent
在node01的一個新窗口下
cd /home/hadoop/apps/flume-1.8.0
bin/flume-ng agent --conf conf --conf-file conf/avro_sink1.conf --name agent1 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink2.conf --name agent2 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink3.conf --name agent3 >/dev/null 2>&1 &
jps后
發(fā)現(xiàn)的application進程就是新的agent進程
看看端口在不在
lsof -i:44444
lsof -i:44445
lsof -i:44446 發(fā)現(xiàn)端口正常監(jiān)聽
在node01的一個新窗口中
telnet localhost 44444? 發(fā)送4444444444
telnet localhost 44445? 發(fā)送5555555555
telnet localhost 44446? 發(fā)送6666666666
查看
cd /home/hadoop/apps/flume/multiplexing/k11
cat 1533997666003-10
發(fā)現(xiàn)了4444444444這個數(shù)據(jù)
cd /home/hadoop/apps/flume/multiplexing/k12
cat 1533997666003-10
發(fā)現(xiàn)5555555555這個數(shù)據(jù)
cd /home/hadoop/apps/flume/multiplexing/k12
cat 1533997666003-10
發(fā)現(xiàn)6666666666這個數(shù)據(jù)
---sink processor
多個sink processor需要
負(fù)載均衡或者容錯的processor