Flume構(gòu)建日志采集系統(tǒng)


title: Flume構(gòu)建日志采集系統(tǒng)
date: 2018-02-03 19:45
tags: [flume,kafka]


一、Flume介紹

1.Flume特點
  • Flume是一個分布式的、可靠的、高可用的海量日志采集
    岁疼、聚合和傳輸?shù)南到y(tǒng)
  • 數(shù)據(jù)流模型:Source-Channel-Sink
  • 事務(wù)機(jī)制保證消息傳遞的可靠性
  • 內(nèi)置豐富插件餐禁,輕松與其他系統(tǒng)集成
  • Java實現(xiàn)传睹,優(yōu)秀的系統(tǒng)框架設(shè)計乾戏,模塊分明,易于開發(fā)

2.Flume原型圖

Flume原型圖.png

3.Flume基本組件

  • Event:消息的基本單位斑粱,有header和body組成
  • Agent:JVM進(jìn)程弃揽,負(fù)責(zé)將一端外部來源產(chǎn)生的消息轉(zhuǎn) 發(fā)到另一端外部的目的地
    • Source:從外部來源讀入event,并寫入channel
    • Channel:event暫存組件则北,source寫入后矿微,event將會 一直保存,
    • Sink:從channel讀入event,并寫入目的地

3.Flume事件流

Flume事件流.png

4.Flumes數(shù)據(jù)流

Flume數(shù)據(jù)流.png

Flume數(shù)據(jù)流2.png

二尚揣、Flume搭建

1.下載二進(jìn)制安裝包

下載地址:http://flume.apache.org/download.html

2.安裝Flume

解壓縮安裝包文件

[hadoop@hadoop01 apps]$ tar -zxvf apache-flume-1.8.0-bin.tar.gz 
[hadoop@hadoop01 apps]$ cd apache-flume-1.8.0-bin/
[hadoop@hadoop01 apache-flume-1.8.0-bin]$ ll
總用量 148
drwxr-xr-x.  2 hadoop hadoop    62 1月  21 14:31 bin
-rw-r--r--.  1 hadoop hadoop 81264 9月  15 20:26 CHANGELOG
drwxr-xr-x.  2 hadoop hadoop   127 1月  21 14:31 conf
-rw-r--r--.  1 hadoop hadoop  5681 9月  15 20:26 DEVNOTES
-rw-r--r--.  1 hadoop hadoop  2873 9月  15 20:26 doap_Flume.rdf
drwxr-xr-x. 10 hadoop hadoop  4096 9月  15 20:48 docs
drwxr-xr-x.  2 hadoop hadoop  8192 1月  21 14:31 lib
-rw-r--r--.  1 hadoop hadoop 27663 9月  15 20:26 LICENSE
-rw-r--r--.  1 hadoop hadoop   249 9月  15 20:26 NOTICE
-rw-r--r--.  1 hadoop hadoop  2483 9月  15 20:26 README.md
-rw-r--r--.  1 hadoop hadoop  1588 9月  15 20:26 RELEASE-NOTES
drwxr-xr-x.  2 hadoop hadoop    68 1月  21 14:31 tools
[hadoop@hadoop01 apache-flume-1.8.0-bin]$ 

3.創(chuàng)建軟連接【此步驟可省略】

[root@hadoop01 bin]# ln -s /home/hadoop/apps/apache-flume-1.8.0-bin /usr/local/flume

4.配置環(huán)境變量

編輯 /etc/profile文件涌矢,增加以下內(nèi)容:

export FLUME_HOME=/usr/local/flume
export PATH=$PATH:${JAVA_HOME}/bin:${ZOOKEEPER_HOME}/bin:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:${HIVE_HOME}/bin:${FLUME_HOME}/bin

4.啟動flume

使用example.conf 配置文件啟動一個實例

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

啟動命令如下:

[root@hadoop01 conf]# pwd
/home/hadoop/apps/apache-flume-1.8.0-bin/conf
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file  example.conf --name a1 -Dflume.root.logger=INFO,console

啟動成功后如下圖所示:

........略
18/01/27 18:17:25 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
18/01/27 18:17:25 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@20470f counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
18/01/27 18:17:25 INFO node.Application: Starting Channel c1
18/01/27 18:17:25 INFO node.Application: Waiting for channel: c1 to start. Sleeping for 500 ms
18/01/27 18:17:25 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/01/27 18:17:25 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/01/27 18:17:26 INFO node.Application: Starting Sink k1
18/01/27 18:17:26 INFO node.Application: Starting Source r1
18/01/27 18:17:26 INFO source.NetcatSource: Source starting
18/01/27 18:17:26 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]

使用telnet發(fā)送數(shù)據(jù)

[root@hadoop01 apps]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Are you OK ?
OK

控制臺打印如下:

Impl[/127.0.0.1:44444]
18/01/27 18:21:00 INFO sink.LoggerSink: Event: { headers:{} body: 41 72 65 20 79 6F 75 20 4F 4B 20 3F 0D          Are you OK ?. }

如無法使用telnet,請先安裝telnet工具

[root@hadoop01 apps]# yum -y install telnet

三快骗、Flume實踐

1.Source組件清單

  • Source:對接各種外部數(shù)據(jù)源娜庇,將收集到的事件發(fā)送到Channel中,一個source可以向多個channel發(fā)送event方篮,F(xiàn)lume內(nèi)置非常豐富的Source思灌,同時用戶可以自定義Source
Source類型 Type 用途
Avro Source avro 啟動一個Avro Server,可與上一級Agent連接
HTTP Source http 啟動一個HttpServer
Exec Source exec 執(zhí)行unix command恭取,獲取標(biāo)準(zhǔn)輸出熄守,如tail -f
Taildir Source TAILDIR 監(jiān)聽目錄或文件
Spooling Directory Source spooldir 監(jiān)聽目錄下的新增文件
Kafka Source org.apache.flume.sourc e.kafka.KafkaSource 讀取Kafka數(shù)據(jù)
JMS Source jms 從JMS源讀取數(shù)據(jù)

2.avro Source Agent 和Exec Source Agent

  • 配置一個avroagent,avrosource.conf 配置文件如下:
//avrosource.conf
avroagent.sources = r1
avroagent.channels = c1
avroagent.sinks = k1 
avroagent.sources.r1.type = avro
avroagent.sources.r1.bind = 192.168.43.20
avroagent.sources.r1.port = 8888
avroagent.sources.r1.threads= 3
avroagent.sources.r1.channels = c1
avroagent.channels.c1.type = memory
avroagent.channels.c1.capacity = 10000 
avroagent.channels.c1.transactionCapacity = 1000
avroagent.sinks.k1.type = logger
avroagent.sinks.k1.channel = c1
  • 啟動一個avrosource的agent
[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file avrosource.conf  --name avroagent -Dflume.root.logger=INFO,console

啟動成功入下圖所示:

...略
18/01/27 18:46:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
18/01/27 18:46:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
18/01/27 18:46:36 INFO node.Application: Starting Sink k1
18/01/27 18:46:36 INFO node.Application: Starting Source r1
18/01/27 18:46:36 INFO source.AvroSource: Starting Avro source r1: { bindAddress: 192.168.43.20, port: 8888 }...
18/01/27 18:46:37 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
18/01/27 18:46:37 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
18/01/27 18:46:37 INFO source.AvroSource: Avro source r1 started
  • 配置一個execAgent,實現(xiàn)與sourceAgent實現(xiàn)串聯(lián)云头,execsource.conf 配置文件如下:
execagent.sources = r1 
execagent.channels = c1
execagent.sinks = k1
execagent.sources.r1.type = exec 
execagent.sources.r1.command = tail -F /home/hadoop/apps/flume/execsource/exectest.log
execagent.sources.r1.channels = c1
execagent.channels.c1.type = memory
execagent.channels.c1.capacity = 10000 
execagent.channels.c1.transactionCapacity = 1000
execagent.sinks.k1.type = avro
execagent.sinks.k1.channel = c1
execagent.sinks.k1.hostname = 192.168.43.20
execagent.sinks.k1.port = 8888
  • 啟動一個execAgent,并實現(xiàn)execagent監(jiān)控文件變化影涉,sourceAgent接收變化內(nèi)容

啟動 execAgent

[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file execsource.conf --name execagent

啟動成功如下下圖所示:

18/01/27 18:58:43 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
18/01/27 18:58:43 INFO sink.AbstractRpcSink: Rpc sink k1: Building RpcClient with hostname: 192.168.43.20, port: 8888
18/01/27 18:58:43 INFO sink.AvroSink: Attempting to create Avro Rpc client.
18/01/27 18:58:43 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
18/01/27 18:58:44 INFO sink.AbstractRpcSink: Rpc sink k1 started.

在execAgent監(jiān)控的文件下寫入內(nèi)容豁陆,觀察sourceagent是否接收到變化內(nèi)容

[root@hadoop01 execsource]# echo 222 > exectest.log 
[root@hadoop01 execsource]# echo 5555 >> exectest.log 
[root@hadoop01 execsource]# cat exectest.log 
222
5555

在sourceagent控制打印臺下查看監(jiān)控消息如下:

18/01/27 18:58:50 INFO sink.LoggerSink: Event: { headers:{} body: 31 32 33                                        123 }
18/01/27 18:59:55 INFO sink.LoggerSink: Event: { headers:{} body: 35 35 35 35                                     5555 }

則說明2個串聯(lián)agent傳遞信息成功。

說明:
avroagent 配置文件配置項起始名稱需要與服務(wù)啟動 -name 名稱相一致。

3.Source組件- Spooling Directory Source

  • 配置一個Spooling Directory Source ,spooldirsource.conf 配置文件內(nèi)容如下:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /home/hadoop/apps/flume/spoolDir
a1.sources.r1.fileHeader = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

/home/hadoop/apps/flume/spoolDir 必須已經(jīng)創(chuàng)建且具有用戶讀寫權(quán)限竭宰。

啟動 SpoolDirsourceAgent

[hadoop@hadoop01 conf]$ flume-ng agent --conf conf --conf-file spooldirsource.conf  --name a1 -Dflume.root.logger=INFO,console

在spoolDir文件夾下創(chuàng)建文件并寫入文件內(nèi)容空郊,觀察控制臺消息:

18/01/28 17:06:54 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/apps/flume/spoolDir/test to /home/hadoop/apps/flume/spoolDir/test.COMPLETED
18/01/28 17:06:55 INFO sink.LoggerSink: Event: { headers:{file=/home/hadoop/apps/flume/spoolDir/test} body: 32 32 32                                        222 }

此時監(jiān)測到SpoolDirSourceAgent 可以監(jiān)控到文件變化。

值得說明的是:Spooling Directory Source Agent 并不能監(jiān)聽子級文件夾的文件變化,也不支持已存在的文件更新數(shù)據(jù)變化.

4.Source組件- Kafka Source

  • 配置一個Kafa Source , kafasource.conf 配置文件內(nèi)容如下:
kafkasourceagent.sources = r1
kafkasourceagent.channels = c1
kafkasourceagent.sinks = k1
kafkasourceagent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource 
kafkasourceagent.sources.r1.channels = c1 
kafkasourceagent.sources.r1.batchSize = 100
kafkasourceagent.sources.r1.batchDurationMillis = 1000
kafkasourceagent.sources.r1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092
kafkasourceagent.sources.r1.kafka.topics = flumetopictest1
kafkasourceagent.sources.r1.kafka.consumer.group.id = flumekafkagroupid
kafkasourceagent.channels.c1.type = memory
kafkasourceagent.channels.c1.capacity = 10000 
kafkasourceagent.channels.c1.transactionCapacity = 1000
kafkasourceagent.sinks.k1.type = logger
kafkasourceagent.sinks.k1.channel = c1

首先啟動3個節(jié)點的kafka節(jié)點服務(wù)切揭,在每個kafka節(jié)點執(zhí)行狞甚,以后臺方式運(yùn)行

[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

在kafka節(jié)點上創(chuàng)建一個配置好的Topic flumetoptest1,命令如下:

[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic flumetopictest1
Created topic "flumetopictest1".

創(chuàng)建成功后,啟動一個kafka Source Agent廓旬,命令如下:

[root@hadoop01 conf]# flume-ng  agent --conf conf --conf-file kafkasource.conf --name kafkasourceagent -Dflume.root.logger=INFO,console

創(chuàng)建一個Kafka 生產(chǎn)者哼审,進(jìn)行消息發(fā)送

root@hadoop03 bin]# ./kafka-console-producer.sh --broker-list 192.168.43.22:9092,192.168.43.23:9092 --topic flumetopictest1

發(fā)送消息,此時kafka 就可以接收到消息:

18/02/03 20:36:57 INFO sink.LoggerSink: Event: { headers:{topic=flumetopictest1, partition=2, timestamp=1517661413068} body: 31 32 33 31 33 32 32 31                         12313221 }
18/02/03 20:37:09 INFO sink.LoggerSink: Event: { headers:{topic=flumetopictest1, partition=1, timestamp=1517661428930} body: 77 69 20 61 69 79 6F 75 08 08 08                wi aiyou... }

5.Source 組件 -Taildir source

監(jiān)聽一個文件夾或者文件孕豹,通過正則表達(dá)式匹配需要監(jiān)聽的 數(shù)據(jù)源文件涩盾,Taildir Source通過將監(jiān)聽的文件位置寫入到文件中來實現(xiàn)斷點續(xù)傳,并且能夠保證沒有重復(fù)數(shù)據(jù)的讀取.

  • 重要參數(shù)

    type:source類型TAILDIR

    positionFile:保存監(jiān)聽文件讀取位置的文件路徑

    idleTimeout:關(guān)閉空閑文件延遲時間励背,如果有新的記錄添加到已關(guān)閉的空閑文件

    taildir srouce將繼續(xù)打開該空閑文件春霍,默認(rèn)值120000毫秒

    writePosInterval:向保存讀取位置文件中寫入讀取文件位置的時間間隔,默認(rèn)值
    3000毫秒

    batchSize:批量寫入channel最大event數(shù)叶眉,默認(rèn)值100

    maxBackoffSleep:每次最后一次嘗試沒有獲取到監(jiān)聽文件最新數(shù)據(jù)的最大延遲時 間址儒,默認(rèn)值5000毫秒

    cachePatternMatching:對于監(jiān)聽的文件夾下通過正則表達(dá)式匹配的文件可能數(shù)量 會很多,將匹配成功的監(jiān)聽文件列表和讀取文件列表的順序都添加到緩存中衅疙,可以提高性能莲趣,默認(rèn)值true

    fileHeader :是否添加文件的絕對路徑到event的header中,默認(rèn)值false

    fileHeaderKey:添加到event header中文件絕對路徑的鍵值饱溢,默認(rèn)值file

    filegroups:監(jiān)聽的文件組列表喧伞,taildirsource通過文件組監(jiān)聽多個目錄或文件

    filegroups.<filegroupName>:文件正則表達(dá)式路徑或者監(jiān)聽指定文件路徑

    channels:Source對接的Channel名稱
  • 配置一個taildir Source,具體taildirsource.conf 配置文件內(nèi)容如下:
taildiragent.sources=r1
taildiragent.channels=c1
taildiragent.sinks=k1
taildiragent.sources.r1.type=TAILDIR
taildiragent.sources.r1.positionFile=/home/hadoop/apps/flume/taildir/position/taildir_position.json
taildiragent.sources.r1.filegroups=f1 f2
taildiragent.sources.r1.filegroups.f1=/home/hadoop/apps/flume/taildir/test1/test.log
taildiragent.sources.r1.filegroups.f2=/home/hadoop/apps/flume/taildir/test2/.*log.*
taildiragent.sources.r1.channels=c1
taildiragent.channels.c1.type=memory
taildiragent.channels.c1.transcationCapacity=1000
taildiragent.sinks.k1.type=logger
taildiragent.sinks.k1.channel=c1

啟動一個taildirSource agent ,代碼如下:

[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file taildirsource.conf --name taildiragent -Dflume.root.logger=INFO,console

開始在test1和test2文件夾寫入文件,觀察agent消息接收。

6.Channel組件

  • Channel:Channel被設(shè)計為event中轉(zhuǎn)暫存區(qū)絮识,存儲Source 收集并且沒有被Sink消費(fèi)的event 绿聘,為了平衡Source收集 和Sink讀取數(shù)據(jù)的速度,可視為Flume內(nèi)部的消息隊列次舌。
  • Channel是線程安全的并且具有事務(wù)性,支持source寫失 敗重復(fù)寫和sink讀失敗重復(fù)讀等操作
  • 常用的Channel類型有:Memory Channel兽愤、File Channel彼念、
    Kafka Channel、JDBC Channel等

7.Channel組件- Memory Channel

  • Memory Channel:使用內(nèi)存作為Channel浅萧,Memory Channel讀寫速度 快逐沙,但是存儲數(shù)據(jù)量小,F(xiàn)lume進(jìn)程掛掉洼畅、服務(wù)器停機(jī)或者重啟都會 導(dǎo)致數(shù)據(jù)丟失吩案。部署Flume Agent的線上服務(wù)器內(nèi)存資源充足、不關(guān) 心數(shù)據(jù)丟失的場景下可以使用
    關(guān)鍵參數(shù):
type :channel類型memory
capacity :channel中存儲的最大event數(shù)帝簇,默認(rèn)值100
transactionCapacity :一次事務(wù)中寫入和讀取的event最大數(shù)徘郭,默認(rèn)值100。
keep-alive:在Channel中寫入或讀取event等待完成的超時時間丧肴,默認(rèn)值3秒
byteCapacityBufferPercentage:緩沖空間占Channel容量(byteCapacity)的百分比残揉,為event中的頭信息保留了空間,默認(rèn)值20(單位百分比)
byteCapacity :Channel占用內(nèi)存的最大容量芋浮,默認(rèn)值為Flume堆內(nèi)存的80%

8. Channel組件- File Channel

  • File Channel:將event寫入到磁盤文件中抱环,與Memory Channel相比存 儲容量大,無數(shù)據(jù)丟失風(fēng)險纸巷。
  • File Channle數(shù)據(jù)存儲路徑可以配置多磁盤文件路徑镇草,提高寫入文件性能
  • Flume將Event順序?qū)懭氲紽ile Channel文件的末尾,在配置文件中通
    過設(shè)置maxFileSize參數(shù)設(shè)置數(shù)據(jù)文件大小上限
  • 當(dāng)一個已關(guān)閉的只讀數(shù)據(jù)文件中的Event被完全讀取完成瘤旨,并且Sink已經(jīng)提交讀取完成的事務(wù)梯啤,則Flume將刪除存儲該數(shù)據(jù)文件
  • 通過設(shè)置檢查點和備份檢查點在Agent重啟之后能夠快速將File Channle中的數(shù)據(jù)按順序回放到內(nèi)存中
    關(guān)鍵參數(shù)如下:
 type:channel類型為file 
 checkpointDir:檢查點目錄,默認(rèn)在啟動flume用戶目錄下創(chuàng)建裆站,建 議單獨(dú)配置磁盤路徑 
 useDualCheckpoints:是否開啟備份檢查點条辟,默認(rèn)false,建議設(shè)置為true開啟備份檢查點宏胯,備份檢查點的作用是當(dāng)Agent意外出錯導(dǎo)致寫 入檢查點文件異常羽嫡,在重新啟動File  Channel時通過備份檢查點將數(shù)據(jù)回放到內(nèi)存中,如果不開啟備份檢查點肩袍,在數(shù)據(jù)回放的過程中發(fā)現(xiàn)檢查點文件異常會對所數(shù)據(jù)進(jìn)行全回放杭棵,全回放的過程相當(dāng)耗時 
 backupCheckpointDir:備份檢查點目錄,最好不要和檢查點目錄在同 一塊磁盤上 
 checkpointInterval:每次寫檢查點的時間間隔,默認(rèn)值30000毫秒 
 dataDirs:數(shù)據(jù)文件磁盤存儲路徑魂爪,建議配置多塊盤的多個路徑先舷,通過磁盤的并行寫入來提高file channel性能,多個磁盤路徑用逗號隔開
 transactionCapacity:一次事務(wù)中寫入和讀取的event最大數(shù)滓侍,默認(rèn)值 10000
 maxFileSize:每個數(shù)據(jù)文件的最大大小蒋川,默認(rèn)值:2146435071字節(jié)
 minimumRequiredSpace:磁盤路徑最小剩余空間,如果磁盤剩余空 間小于設(shè)置值撩笆,則不再寫入數(shù)據(jù)
 capacity:file channel可容納的最大event數(shù)
 keep-alive:在Channel中寫入或讀取event等待完成的超時時間捺球,默認(rèn)值3秒

配置一個FileChannel,filechannel.conf 的配置內(nèi)容如下:

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /home/hadoop/apps/flume/filechannel/data
a1.channels.c1.checkpointDir = /home/hadoop/apps/flume/filechannel/checkpoint 
a1.channels.c1.useDualCheckpoints = true
a1.channels.c1.backupCheckpointDir = /home/hadoop/apps/flume/filechannel/backup
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

啟動一個FileChannel,啟動命令如下:

[root@hadoop01 bin]# flume-ng agent --conf conf --conf-file filechannle.conf --name a1 -Dflume.root.logger=INFO,console

向配置文件端口44444發(fā)送數(shù)據(jù),觀察Channel記錄情況

telnet localhost asdfasd

此時可以觀察到控制臺打印監(jiān)控結(jié)果

18/02/04 21:15:44 INFO sink.LoggerSink: Event: { headers:{} body: 61 64 66 61 64 66 61 64 66 61 73 66 0D          adfadfadfasf. }
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Start checkpoint for /home/hadoop/apps/flume/filechannel/checkpoint/checkpoint, elements to sync = 1
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Updating checkpoint metadata: logWriteOrderID: 1517749968978, queueSize: 0, queueHead: 0
18/02/04 21:15:48 INFO file.EventQueueBackingStoreFile: Attempting to back up checkpoint.
18/02/04 21:15:48 INFO file.Serialization: Skipping in_use.lock because it is in excludes set
18/02/04 21:15:48 INFO file.Serialization: Deleted the following files: , checkpoint, checkpoint.meta, inflightputs, inflighttakes.
18/02/04 21:15:48 INFO file.Log: Updated checkpoint for file: /home/hadoop/apps/flume/filechannel/data/log-2 position: 170 logWriteOrderID: 1517749968978
18/02/04 21:15:49 INFO file.EventQueueBackingStoreFile: Checkpoint backup completed.

9.Channel組件- Kafka Channel

Kafka Channel:將分布式消息隊列kafka作為channel相對于Memory Channel和File Channel存儲容量更大夕冲、 容錯能力更強(qiáng)氮兵,彌補(bǔ)了其他兩種Channel的短板,如果合理利用Kafka的性能歹鱼,能夠達(dá)到事半功倍的效果泣栈。
關(guān)鍵參數(shù)如下:

type:Kafka Channel類型org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers:Kafka broker列表,格式為ip1:port1, ip2:port2…弥姻,建 議配置多個值提高容錯能力南片,多個值之間用逗號隔開
kafka.topic:topic名稱,默認(rèn)值“flume-channel”
kafka.consumer.group.id:Consumer Group Id蚁阳,全局唯一
parseAsFlumeEvent:是否以Avro FlumeEvent模式寫入到Kafka Channel中铃绒,  默認(rèn)值true,event的header信息與event body都寫入到kafka中
pollTimeout:輪詢超時時間螺捐,默認(rèn)值500毫秒
kafka.consumer.auto.offset.reset:earliest表示從最早的偏移量開始拉取颠悬,latest表示從最新的偏移量開始拉取,none表示如果沒有發(fā)現(xiàn)該Consumer組之前拉 取的偏移量則拋異常

配置一個KafakChannel定血, kafkachannel.conf 配置內(nèi)容如下:

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092
a1.channels.c1.kafka.topic = flumechannel2
a1.channels.c1.kafka.consumer.group.id = flumecgtest1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

啟動kafak服務(wù)赔癌,創(chuàng)建一個kafka主題,命令如下:

[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic flumechannel2

查看創(chuàng)建的主題信息

[root@hadoop03 bin]# ./kafka-topics.sh --list --zookeeper 192.168.43.20:2181
__consumer_offsets
flumechannel2
topicnewtest1

啟動kafka agent,使用telnet發(fā)送數(shù)據(jù)

[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file kafkachannel.conf --name a1 -Dflume.root.logger=INFO,console
[root@hadoop01 flume]# clear
[root@hadoop01 flume]# telnet localhost 44444 
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
abc
OK

監(jiān)聽信息如下:

18/02/04 21:39:33 INFO sink.LoggerSink: Event: { headers:{} body: 61 62 63 0D                                     abc. }

10.Sink組件

  • Sink:從Channel消費(fèi)event澜沟,輸出到外部存儲灾票,或者輸出到下一個階段的agent
  • 一個Sink只能從一個Channel中消費(fèi)event
  • 當(dāng)Sink寫出event成功后,就會向Channel提交事務(wù)茫虽。Sink 事務(wù)提交成功刊苍,處理完成的event將會被Channel刪除。否 則Channel會等待Sink重新消費(fèi)處理失敗的event
  • Flume提供了豐富的Sink組件濒析,如Avro Sink正什、HDFS Sink、Kafka Sink号杏、File Roll Sink婴氮、HTTP Sink等

11.Sink組件- Avro Sink

  • Avro Sink常用于對接下一層的Avro Source,通過發(fā)送RPC請求將Event發(fā)送到下一層的Avro Source
  • 為了減少Event傳輸占用大量的網(wǎng)絡(luò)資源, Avro Sink提供了端到端的批量壓縮數(shù)據(jù)傳輸

關(guān)鍵參數(shù)說明

type:Sink類型為avro主经。
hostname:綁定的目標(biāo)Avro Souce主機(jī)名稱或者IP
port:綁定的目標(biāo)Avro Souce端口號
batch-size:批量發(fā)送Event數(shù)荣暮,默認(rèn)值100
compression-type:是否使用壓縮,如果使用壓縮設(shè)則值為
“deflate”罩驻, Avro Sink設(shè)置了壓縮那么Avro Source也應(yīng)設(shè)置相同的 壓縮格式穗酥,目前支持zlib壓縮,默認(rèn)值none
compression-level:壓縮級別惠遏,0表示不壓縮迷扇,從1到9數(shù)字越大壓縮
效果越好,默認(rèn)值6

12.Sink組件- HDFS Sink

  • HDFS Sink將Event寫入到HDFS中持久化存儲
  • HDFS Sink提供了強(qiáng)大的時間戳轉(zhuǎn)義功能爽哎,根據(jù)Event頭信息中的
  • timestamp時間戳信息轉(zhuǎn)義成日期格式,在HDFS中以日期目錄分層存儲

關(guān)鍵參數(shù)信息說明如下:

type:Sink類型為hdfs器一。
hdfs.path:HDFS存儲路徑课锌,支持按日期時間分區(qū)。
hdfs.filePrefix:Event輸出到HDFS的文件名前綴祈秕,默認(rèn)前綴FlumeData
hdfs.fileSuffix:Event輸出到HDFS的文件名后綴
hdfs.inUsePrefix:臨時文件名前綴
hdfs.inUseSuffix:臨時文件名后綴渺贤,默認(rèn)值.tmp
hdfs.rollInterval:HDFS文件滾動生成時間間隔,默認(rèn)值30秒请毛,該值設(shè)置 為0表示文件不根據(jù)時間滾動生成

配置一個hdfsink.conf文件志鞍,配置內(nèi)容如下:

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000 
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /data/flume/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = hdfssink
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.callTimeout = 60000

啟動一個hdfssink agent,命令如下:

[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file hdfssink.conf --name a1 -Dflume.root.logger=INFO,console

使用telnet 向44444發(fā)送數(shù)據(jù)方仿,觀察數(shù)據(jù)寫入結(jié)果

[hadoop@hadoop01 root]$ telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
abc
OK
2323444
OK

此時控制臺打印固棚,在HDFS文件系統(tǒng)生成一個臨時文件

8/02/04 22:41:52 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/02/04 22:41:52 INFO hdfs.BucketWriter: Creating /data/flume/20180204/hdfssink.1517755312242.tmp
18/02/04 22:42:24 INFO hdfs.BucketWriter: Closing /data/flume/20180204/hdfssink.1517755312242.tmp
18/02/04 22:42:24 INFO hdfs.BucketWriter: Renaming /data/flume/20180204/hdfssink.1517755312242.tmp to /data/flume/20180204/hdfssink.1517755312242
18/02/04 22:42:24 INFO hdfs.HDFSEventSink: Writer callback called.

值得注意的是:請使用hadoop用戶來執(zhí)行agent的創(chuàng)建和消息的發(fā)送,避免因權(quán)限導(dǎo)致HDFS文件無法寫入

13.Sink組件- Kafka Sink

Flume通過KafkaSink將Event寫入到Kafka指定的主題中
主要參數(shù)說明如下:

 type:Sink類型仙蚜,值為KafkaSink類路徑  org.apache.flume.sink.kafka.KafkaSink此洲。
 kafka.bootstrap.servers:Broker列表,定義格式host:port委粉,多個Broker之間用逗號隔開呜师,可以配置一個也可以配置多個,用于Producer發(fā)現(xiàn)集群中的Broker贾节,建議配置多個汁汗,防止當(dāng)個Broker出現(xiàn)問題連接 失敗。
 kafka.topic:Kafka中Topic主題名稱栗涂,默認(rèn)值flume-topic知牌。
 flumeBatchSize:Producer端單次批量發(fā)送的消息條數(shù),該值應(yīng)該根據(jù)實際環(huán)境適當(dāng)調(diào)整戴差,增大批量發(fā)送消息的條數(shù)能夠在一定程度上提高性能送爸,但是同時也增加了延遲和Producer端數(shù)據(jù)丟失的風(fēng)險。 默認(rèn)值100。
 kafka.producer.acks:設(shè)置Producer端發(fā)送消息到Borker是否等待接收Broker返回成功送達(dá)信號袭厂。0表示Producer發(fā)送消息到Broker之后不需要等待Broker返回成功送達(dá)的信號墨吓,這種方式吞吐量高,但是存 在數(shù)據(jù)丟失的風(fēng)險纹磺。1表示Broker接收到消息成功寫入本地log文件后向Producer返回成功接收的信號帖烘,不需要等待所有的Follower全部同步完消息后再做回應(yīng),這種方式在數(shù)據(jù)丟失風(fēng)險和吞吐量之間做了平衡橄杨。all(或者-1)表示Broker接收到Producer的消息成功寫入本 地log并且等待所有的Follower成功寫入本地log后向Producer返回成功接收的信號秘症,這種方式能夠保證消息不丟失,但是性能最差式矫。默 認(rèn)值1乡摹。
 useFlumeEventFormat:默認(rèn)值false,Kafka Sink只會將Event body內(nèi) 容發(fā)送到Kafka Topic中采转。如果設(shè)置為true聪廉,Producer發(fā)送到KafkaTopic中的Event將能夠保留Producer端頭信息

配置一個kafkasink.conf,具體配置內(nèi)容如下:

a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000 
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.kafka.topic = FlumeKafkaSinkTopic1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.43.23:9092
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1

啟動kafka Broker節(jié)點22和Broker節(jié)點23

[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties 

按配置文件創(chuàng)建主題信息

[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 1 --partitions 3 --topic FlumeKafkaSinkTopic1
Created topic "FlumeKafkaSinkTopic1".

啟動一個kafkasink agent,啟動命令如下:

[root@hadoop01 conf]# flume-ng agent --conf conf --conf-file kafkasink.conf --name a1 >/dev/null 2>&1 &

14.Interceptor攔截器

  • Source將event寫入到Channel之前調(diào)用攔截器
  • Source和Channel之間可以有多個攔截器故慈,不同的攔截器使用不同的 規(guī)則處理Event
  • 可選跟伏、輕量級猛频、可插拔的插件
  • 通過實現(xiàn)Interceptor接口實現(xiàn)自定義的攔截器
  • 內(nèi)置攔截器:Timestamp Interceptor、Host Interceptor、UUID Interceptor蕊蝗、Static Interceptor镀裤、Regex Filtering Interceptor等

15.Timestamp Interceptor

  • Flume使用時間戳攔截器在event頭信息中添加時間戳信息镜悉, Key為timestamp辉词,Value為攔截器攔截Event時的時間戳
  • 頭信息時間戳的作用,比如HDFS存儲的數(shù)據(jù)采用時間分區(qū)存儲情萤,Sink可以根據(jù)Event頭信息中的時間戳將Event按照時間分區(qū)寫入到 HDFS
  • 關(guān)鍵參數(shù)說明:
    • type:攔截器類型為timestamp
    • preserveExisting:如果頭信息中存在timestamp時間戳信息是否保留原來的時間戳信息鸭蛙,true保留,false使用新的時間戳替換已經(jīng)存在的時間戳筋岛,默認(rèn)值為false

16.Host Interceptor

  • Flume使用主機(jī)戳攔截器在Event頭信息中添加主機(jī)名稱或者IP
  • 主機(jī)攔截器的作用:比如Source將Event按照主機(jī)名稱寫入到不同的Channel中便于后續(xù)的Sink對不同Channnel中的數(shù)據(jù)分開處理
  • 關(guān)鍵參數(shù)說明:
    • type:攔截器類型為host
    • preserveExisting:如果頭信息中存在timestamp時間戳信息是否保留原來的時間戳信息娶视,true保留,false使用新的時間戳替換已經(jīng)存在的時間戳睁宰,默認(rèn)值為false
    • useIP:是否使用IP作為主機(jī)信息寫入都信息肪获,默認(rèn)值為false
    • hostHeader:設(shè)置頭信息中主機(jī)信息的Key,默認(rèn)值為host

17.Host InterceptorStatic Interceptor

  • Flume使用static interceptor靜態(tài)攔截器在evetn頭信息添加靜態(tài)信息
  • 關(guān)鍵參數(shù)說明:
  • type:攔截器類型為static
    • preserveExisting:如果頭信息中存在timestamp時間戳信息是否保留原來的時間戳信息柒傻,true保留孝赫,false使用新的時間戳替換已經(jīng) 存在的時間戳,默認(rèn)值為false
    • key:頭信息中的鍵
    • value:頭信息中鍵對應(yīng)的值

18.Selector選擇器

  • Source將event寫入到Channel之前調(diào)用攔截器红符,如果配置了Interceptor攔截器青柄,則Selector在攔截器全部處理完之后調(diào)用伐债。通過
    selector決定event寫入Channel的方式
  • 內(nèi)置Replicating Channel Selector復(fù)制Channel選擇器、 Multiplexing Channel Selector復(fù)用Channel選擇器

19.Replicating Channel Selector

  • 如果Channel選擇器沒有指定致开,默認(rèn)是Replicating Channel Selector峰锁。即一個Source以復(fù)制的方式將一個event同時寫入到多個Channel中,不同的Sink可以從不同的Channel中獲取相同的event双戳。
  • 關(guān)鍵參數(shù)說明:
    • selector.type:Channel選擇器類型為replicating
    • selector.optional:定義可選Channel虹蒋,當(dāng)寫入event到可選Channel失敗時,不會向Source拋出異常飒货,繼續(xù)執(zhí)行魄衅。多個可選Channel之 間用空格隔開

一個source將一個event拷貝到多個channel,通過不同的sink消費(fèi)不同的channel塘辅,將相同的event輸出到不同的地方
配置文件:replicating_selector.conf

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
#定義source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#設(shè)置復(fù)制選擇器
a1.sources.r1.selector.type = replicating
#設(shè)置required channel
a1.sources.r1.channels = c1 c2
#設(shè)置channel c1
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
#設(shè)置channel c2
a1.channels.c2.type = memory 
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 1000
#設(shè)置kafka sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = FlumeSelectorTopic1
a1.sinks.k1.kafka.bootstrap.servers = 192.168.43.22:9092,192.168.23.103:9092
a1.sinks.k1.kafka.flumeBatchSize = 5
a1.sinks.k1.kafka.producer.acks = 1
#設(shè)置file sink
a1.sinks.k2.channel = c2
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /home/hadoop/apps/flume/selector
a1.sinks.k2.sink.rollInterval = 60

分別寫入到kafka和文件中

創(chuàng)建主題FlumeKafkaSinkTopic1

bin/kafka-topics.sh --create --zookeeper 192.168.183.100:2181 --replication-factor 1 --partitions 3 --topic FlumeSelectorTopic1

啟動flume agent

bin/flume-ng agent --conf conf --conf-file conf/replicating_selector.conf --name a1

使用telnet發(fā)送數(shù)據(jù)

telnet localhost 44444

查看/home/hadoop/apps/flume/selector路徑下的數(shù)據(jù)

查看kafka FlumeSelectorTopic1主題數(shù)據(jù)

bin/kafka-console-consumer.sh --zookeeper 192.168.183.100:2181 --from-beginning --topic FlumeSelectorTopic1

20.Multiplexing Channel Selector

-Multiplexing Channel Selector多路復(fù)用選擇器根據(jù)event的頭信息中不
同鍵值數(shù)據(jù)來判斷Event應(yīng)該被寫入到哪個Channel中

  • 三種級別的Channel晃虫,分別是必選channle、可選channel扣墩、默認(rèn)channel
  • 關(guān)鍵參數(shù)說明:
selector.type:Channel選擇器類型為multiplexing
selector.header:設(shè)置頭信息中用于檢測的headerName
selector.default:默認(rèn)寫入的Channel列表
selector.mapping.*:headerName對應(yīng)的不同值映射的不同Channel列表
selector.optional:可選寫入的Channel列表

配置文件multiplexing_selector.conf傲茄、avro_sink1.conf、avro_sink2.conf沮榜、avro_sink3.conf
向不同的avro_sink對應(yīng)的配置文件的agent發(fā)送數(shù)據(jù),不同的avro_sink配置文件通過static interceptor在event頭信息中寫入不同的靜態(tài)數(shù)據(jù)
multiplexing_selector根據(jù)event頭信息中不同的靜態(tài)數(shù)據(jù)類型分別發(fā)送到不同的目的地

multiplexing_selector.conf

a3.sources = r1
a3.channels = c1 c2 c3
a3.sinks = k1 k2 k3
a3.sources.r1.type = avro
a3.sources.r1.bind = 192.168.183.100
a3.sources.r1.port = 8888
a3.sources.r1.threads= 3
#設(shè)置multiplexing selector
a3.sources.r1.selector.type = multiplexing
a3.sources.r1.selector.header = logtype
#通過header中l(wèi)ogtype鍵對應(yīng)的值來選擇不同的sink
a3.sources.r1.selector.mapping.ad = c1
a3.sources.r1.selector.mapping.search = c2
a3.sources.r1.selector.default = c3
a3.sources.r1.channels = c1 c2 c3
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 1000
a3.channels.c2.type = memory
a3.channels.c2.capacity = 10000
a3.channels.c2.transactionCapacity = 1000
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 1000
#分別設(shè)置三個sink的不同輸出
a3.sinks.k1.type = file_roll
a3.sinks.k1.channel = c1
a3.sinks.k1.sink.directory = /home/hadoop/apps/flume/multiplexing/k11
a3.sinks.k1.sink.rollInterval = 60
a3.sinks.k2.channel = c2
a3.sinks.k2.type = file_roll
a3.sinks.k2.sink.directory = /home/hadoop/apps/flume/multiplexing/k12
a3.sinks.k2.sink.rollInterval = 60
a3.sinks.k3.channel = c3
a3.sinks.k3.type = file_roll
a3.sinks.k3.sink.directory = /home/hadoop/apps/flume/multiplexing/k13
a3.sinks.k3.sink.rollInterval = 60

avro_sink1.conf

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = localhost
agent1.sources.r1.port = 44444
agent1.sources.r1.interceptors = i1
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = logtype
agent1.sources.r1.interceptors.i1.value = ad
agent1.sources.r1.interceptors.i1.preserveExisting = false
agent1.sources.r1.channels = c1
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 10000 
agent1.channels.c1.transactionCapacity = 1000
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hostname = 192.168.183.100
agent1.sinks.k1.port = 8888

avro_sink2.conf

agent2.sources = r1
agent2.channels = c1
agent2.sinks = k1
agent2.sources.r1.type = netcat
agent2.sources.r1.bind = localhost
agent2.sources.r1.port = 44445
agent2.sources.r1.interceptors = i1
agent2.sources.r1.interceptors.i1.type = static
agent2.sources.r1.interceptors.i1.key = logtype
agent2.sources.r1.interceptors.i1.value = search
agent2.sources.r1.interceptors.i1.preserveExisting = false
agent2.sources.r1.channels = c1
agent2.channels.c1.type = memory
agent2.channels.c1.capacity = 10000 
agent2.channels.c1.transactionCapacity = 1000
agent2.sinks.k1.type = avro
agent2.sinks.k1.channel = c1
agent2.sinks.k1.hostname = 192.168.183.100
agent2.sinks.k1.port = 8888

avro_sink3.conf

agent3.sources = r1
agent3.channels = c1
agent3.sinks = k1
agent3.sources.r1.type = netcat
agent3.sources.r1.bind = localhost
agent3.sources.r1.port = 44446
agent3.sources.r1.interceptors = i1
agent3.sources.r1.interceptors.i1.type = static
agent3.sources.r1.interceptors.i1.key = logtype
agent3.sources.r1.interceptors.i1.value = other
agent3.sources.r1.interceptors.i1.preserveExisting = false
agent3.sources.r1.channels = c1
agent3.channels.c1.type = memory
agent3.channels.c1.capacity = 10000 
agent3.channels.c1.transactionCapacity = 1000
agent3.sinks.k1.type = avro
agent3.sinks.k1.channel = c1
agent3.sinks.k1.hostname = 192.168.183.100
agent3.sinks.k1.port = 8888

在/home/hadoop/apps/flume/multiplexing目錄下分別創(chuàng)建看k1 k2 k3目錄

bin/flume-ng agent --conf conf --conf-file conf/multiplexing_selector.conf --name a3 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf --conf-file conf/avro_sink1.conf --name agent1 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink2.conf --name agent2 >/dev/null 2>&1 &
bin/flume-ng agent --conf conf --conf-file conf/avro_sink3.conf --name agent3 >/dev/null 2>&1 &

使用telnet發(fā)送數(shù)據(jù)
telnet localhost 44444

21.Sink Processor

  • Sink Processor協(xié)調(diào)多個sink間進(jìn)行l(wèi)oad balance和fail over
  • Default Sink Processor只有一個sink喻粹,無需創(chuàng)建Sink Processor
  • Sink Group:將多個sink放到一個組內(nèi)蟆融,要求組內(nèi)一個sink消費(fèi)channel
  • Load-Balancing Sink Processor(負(fù)載均衡處理器)round_robin(默認(rèn))或 random
  • Failover Sink Processor(容錯處理器)可定義一個sink優(yōu)先級列表,根據(jù)優(yōu)先級選擇使用的sink

22.Load-Balancing Sink Processor

關(guān)鍵參數(shù)說明:

sinks:sink組內(nèi)的子Sink守呜,多個子sink之間用空格隔開
processor.type:設(shè)置負(fù)載均衡類型load_balance
processor.backoff:設(shè)置為true時型酥,如果在系統(tǒng)運(yùn)行過程中執(zhí)行的Sink失敗,會將失敗的Sink放進(jìn)一個冷卻池中查乒。默認(rèn)值false
processor.selector.maxTimeOut:失敗sink在冷卻池中最大駐留時間弥喉,默認(rèn)值30000ms
processor.selector:負(fù)載均衡選擇算法,可以使用輪詢“round_robin”玛迄、隨機(jī)“random”或者是繼承AbstractSinkSelector類的自定義負(fù)載均衡實現(xiàn)類
示例

23.Failover Sink Processor

關(guān)鍵參數(shù)說明:

sinks:sink組內(nèi)的子Sink由境,多個子sink之間用空格隔開
processor.type:設(shè)置故障轉(zhuǎn)移類型“failover”
processor.priority.<sinkName>:指定Sink組內(nèi)各子Sink的優(yōu)先級別,優(yōu)先級從高到低蓖议,數(shù)值越大優(yōu)先級越高
processor.maxpenalty:等待失敗的Sink恢復(fù)的最長時間虏杰,默認(rèn)值30000毫秒
示例

24.Failover應(yīng)用場景

  • 分布式日志收集場景
  • 多個agent收集不同機(jī)器上相同類型的日志數(shù)據(jù),為了保障高可用勒虾,采用分層部署纺阔,日志收集層Collector部署兩個甚至多個,Agent通過Failover SinkProcessor實現(xiàn)其中任何一個collector掛掉不影響系統(tǒng)的日志收集服務(wù)


    示例

總結(jié)

總結(jié)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末修然,一起剝皮案震驚了整個濱河市笛钝,隨后出現(xiàn)的幾起案子质况,更是在濱河造成了極大的恐慌,老刑警劉巖玻靡,帶你破解...
    沈念sama閱讀 221,406評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件结榄,死亡現(xiàn)場離奇詭異,居然都是意外死亡啃奴,警方通過查閱死者的電腦和手機(jī)潭陪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,395評論 3 398
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來最蕾,“玉大人依溯,你說我怎么就攤上這事∥猎颍” “怎么了黎炉?”我有些...
    開封第一講書人閱讀 167,815評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長醋拧。 經(jīng)常有香客問我慷嗜,道長,這世上最難降的妖魔是什么丹壕? 我笑而不...
    開封第一講書人閱讀 59,537評論 1 296
  • 正文 為了忘掉前任庆械,我火速辦了婚禮,結(jié)果婚禮上菌赖,老公的妹妹穿的比我還像新娘缭乘。我一直安慰自己,他們只是感情好琉用,可當(dāng)我...
    茶點故事閱讀 68,536評論 6 397
  • 文/花漫 我一把揭開白布堕绩。 她就那樣靜靜地躺著,像睡著了一般邑时。 火紅的嫁衣襯著肌膚如雪奴紧。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,184評論 1 308
  • 那天晶丘,我揣著相機(jī)與錄音黍氮,去河邊找鬼。 笑死浅浮,一個胖子當(dāng)著我的面吹牛滤钱,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播脑题,決...
    沈念sama閱讀 40,776評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼件缸,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了叔遂?” 一聲冷哼從身側(cè)響起他炊,我...
    開封第一講書人閱讀 39,668評論 0 276
  • 序言:老撾萬榮一對情侶失蹤争剿,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后痊末,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蚕苇,經(jīng)...
    沈念sama閱讀 46,212評論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,299評論 3 340
  • 正文 我和宋清朗相戀三年凿叠,在試婚紗的時候發(fā)現(xiàn)自己被綠了涩笤。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,438評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡盒件,死狀恐怖蹬碧,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情炒刁,我是刑警寧澤恩沽,帶...
    沈念sama閱讀 36,128評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站翔始,受9級特大地震影響罗心,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜城瞎,卻給世界環(huán)境...
    茶點故事閱讀 41,807評論 3 333
  • 文/蒙蒙 一渤闷、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧脖镀,春花似錦肤晓、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,279評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽漫萄。三九已至卷员,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間腾务,已是汗流浹背毕骡。 一陣腳步聲響...
    開封第一講書人閱讀 33,395評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留岩瘦,地道東北人未巫。 一個月前我還...
    沈念sama閱讀 48,827評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像启昧,于是被迫代替她去往敵國和親叙凡。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,446評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 博客原文 翻譯作品密末,水平有限握爷,如有錯誤跛璧,煩請留言指正。原文請見 官網(wǎng)英文文檔 引言 概述 Apache Flume...
    rabbitGYK閱讀 11,481評論 13 34
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理新啼,服務(wù)發(fā)現(xiàn)追城,斷路器,智...
    卡卡羅2017閱讀 134,695評論 18 139
  • 介紹 概述 Apache Flume是為有效收集聚合和移動大量來自不同源到中心數(shù)據(jù)存儲而設(shè)計的可分布燥撞,可靠的座柱,可用...
    ximengchj閱讀 3,525評論 0 13
  • 轉(zhuǎn)自http://www.aboutyun.com/thread-8317-1-1.html 問題導(dǎo)讀: 1.Fl...
    大時代_f479閱讀 2,100評論 0 6
  • 沒有什么驚天動地的事情色洞,沒有什么驚世駭俗的生離死別 那些夢想的夢想,依然是夢想 那些承諾的承諾茶鉴,慢慢失去了它的有效...
    悲傷的嗜好閱讀 202評論 0 0