Core Concepts
- Agent: a JVM process running Flume. Each machine runs one agent, but a single agent can hold multiple sources and sinks.
- Client: produces the data; runs in its own thread.
- Source: collects data from the Client and hands it to a Channel. It can handle logs of many types and formats; built-in sources include avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, legacy, and custom sources.
- Sink: drains data from a Channel in its own thread and delivers it to a destination such as hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, or a custom sink.
- Channel: connects sources and sinks, rather like a queue. Data collected by a source is staged temporarily in the channel; that is, the channel is the agent's short-term buffer for collected data, and it can be backed by memory, jdbc, file, and so on.
- Events: the unit of data flowing through, e.g. a log record or an avro object.
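To make the Event concrete, here is a minimal Java sketch (an illustration, not from the original text) that builds an event with Flume's SDK. An event is just a byte-array body plus a map of string headers; the header key/value here are made up for the demo.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class EventDemo {
    public static void main(String[] args) {
        // headers are arbitrary key/value strings; this key is illustrative only
        Map<String, String> headers = new HashMap<>();
        headers.put("source-host", "hadoop002");
        // the body is an opaque byte array - here, one log line
        Event event = EventBuilder.withBody("127.0.0.1 GET /index.html",
                StandardCharsets.UTF_8, headers);
        System.out.println(event.getHeaders() + " / "
                + new String(event.getBody(), StandardCharsets.UTF_8));
    }
}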
The Agent
Flume's smallest independently running unit is the agent. An agent is one JVM: a Java process that runs on a log-collection node, which simply means a server node.
A single agent is built from the three components Source, Sink, and Channel, in an architecture resembling producer, warehouse, and consumer.
A single-node Flume example
NetCat Source: listens on a given network port; whatever an application writes to that port, this source picks up.
Create a configuration file named a-single-node.conf under /home/hadoop/script/flume:
#a-single-node.conf : A single node flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Save the file, then start the agent:
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/a-single-node.conf \
-Dflume.root.logger=INFO,console
Parameter notes:
-n / --name: the agent's name (must match the agent name in the configuration file)
-c / --conf: the directory containing Flume's configuration files
-f / --conf-file: the agent configuration file to run
-Dflume.root.logger=DEBUG,console: sets the log level and output target
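Equivalently, the earlier command can be written with the short flags:

flume-ng agent -n a1 -c $FLUME_HOME/conf -f /home/hadoop/script/flume/a-single-node.conf -Dflume.root.logger=INFO,console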
Probe the port with telnet:
telnet localhost 44444
Type anything; the Flume console prints the event:
18/08/02 15:25:29 INFO sink.LoggerSink: Event: { headers:{} body: 61 62 63 0D abc. }
Collecting a file into HDFS
source - channel - sink: exec - memory - hdfs
The configuration file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
# file to tail
a1.sources.r1.command = tail -F /home/hadoop/data/flume/logs/access.log
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = hdfs
# HDFS target path
a1.sinks.k1.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/tail
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.batchSize=10
a1.sinks.k1.hdfs.useLocalTimeStamp=true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
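To exercise this flow (a sketch: the config file name exec-memory-hdfs.conf is an assumption, the other paths come from the config above):

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/exec-memory-hdfs.conf \
-Dflume.root.logger=INFO,console

# in a second shell: append a line, then check the sink's target directory
echo "hello flume $(date)" >> /home/hadoop/data/flume/logs/access.log
hdfs dfs -ls /user/hadoop/flume/tail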
When the setting
a1.sinks.k1.hdfs.useLocalTimeStamp=true
is missing, the agent throws:
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
The HDFS sink uses a timestamp to resolve time-based parts of the target directory, and the event arrived with no timestamp in its headers, hence the error. There are three ways to fix it:
- Add a timestamp interceptor to the source so that every event gets a timestamp header (slightly slower):
agent1.sources.source1.interceptors = t1
agent1.sources.source1.interceptors.t1.type = timestamp
- Set agent1.sinks.sink1.hdfs.useLocalTimeStamp = true on the sink (if the client's clock and the Flume cluster's clock disagree, event times will be off)
- Have the client put the timestamp into the event's header map itself, under the key timestamp (recommended); see the sketch after this list
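A hedged sketch of that third option: a Java client that stamps the header itself before sending the event over Avro RPC (the host and port are assumptions; point them at an avro source):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class TimestampedClient {
    public static void main(String[] args) throws Exception {
        RpcClient client = RpcClientFactory.getDefaultInstance("hadoop002", 44444);
        try {
            Map<String, String> headers = new HashMap<>();
            // the HDFS sink looks up this exact header key to resolve time-based paths
            headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
            Event event = EventBuilder.withBody("a log line",
                    StandardCharsets.UTF_8, headers);
            client.append(event);  // throws EventDeliveryException on failure
        } finally {
            client.close();
        }
    }
}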
Collecting a directory's contents to the console
source - channel - sink: spooldir - memory - logger
Once a file in the directory has been fully ingested, it is renamed with the suffix .COMPLETED; file names in the directory must not repeat.
The configuration file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/home/hadoop/temp
a1.sources.r1.fileHeader=true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
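To try it (a sketch; the timestamped file name simply guarantees the uniqueness the source requires):

# drop a new file into the spooled directory
cp /etc/hosts /home/hadoop/temp/hosts-$(date +%s)
# each line is printed by the logger sink; afterwards the file is renamed
ls /home/hadoop/temp   # e.g. hosts-1541939331.COMPLETED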
The small-files problem
Case: collect a directory's contents into HDFS (taildir - memory - hdfs). The sink's roll settings below decide when the current HDFS file is closed and a new one started; tuning them is how you keep the sink from littering HDFS with small files.
The configuration file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/temp/position/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/home/hadoop/temp/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2=/home/hadoop/temp/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.batchSize=15
# Key parameters: the three roll triggers are OR'ed - meeting any one rolls the file
# roll by time in seconds; 0 disables this trigger
a1.sinks.k1.hdfs.rollInterval = 0
# roll by file size in bytes; 0 disables this trigger
a1.sinks.k1.hdfs.rollSize = 500
# roll by number of events; 0 disables this trigger
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp=true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
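To see the taildir source at work (a sketch; app.log.1 is a hypothetical name chosen to match the f2 pattern .*log.*):

echo "from filegroup f1" >> /home/hadoop/temp/flume/test1/example.log
echo "from filegroup f2" >> /home/hadoop/temp/flume/test2/app.log.1
# the position file records the byte offset reached in every tailed file,
# so the source resumes without re-reading after a restart
cat /home/hadoop/temp/position/taildir_position.json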
Multiple channels
One channel feeds a sink that logs to the console; the other feeds a sink that writes to HDFS.
The configuration file:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/temp/position/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/home/hadoop/temp/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2=/home/hadoop/temp/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type=memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0
a1.sinks.k2.hdfs.rollSize= 0
a1.sinks.k2.hdfs.rollCount = 100
# Bind the source and sink to the channel
a1.sources.r1.channels =c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
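Fanning one source out to both channels relies on the source's channel selector. The default selector is replicating, which copies every event into both c1 and c2, so each event is logged and written to HDFS; to make that explicit you can add:

a1.sources.r1.selector.type = replicating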
Sink processors
There are two main types: failover and load_balance.
- failover: the Failover Sink Processor keeps a prioritized list of sinks and fails over between them. Each sink carries a priority value; the sink with the highest value is activated first, and larger values mean higher priority. Two sinks must not share the same priority.
- load_balance: picks a sink according to a load-balancing algorithm. With a large output volume, balancing across several sinks relieves the write pressure. Flume's built-in default is round_robin; random is also available.
The failover configuration:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0
a1.sinks.k2.hdfs.rollSize= 0
a1.sinks.k2.hdfs.rollCount = 10
a1.sinks.k2.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
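One way to watch the failover (a sketch; it assumes HDFS is managed by the standard start-dfs.sh/stop-dfs.sh scripts): while both sinks are healthy, events go to k2 because its priority 10 beats k1's 5; once HDFS is down, the processor falls back to the logger sink k1.

telnet localhost 44444            # type a few lines; files land in /user/hadoop/flume/manager
$HADOOP_HOME/sbin/stop-dfs.sh     # simulate failure of the high-priority sink
# type more lines; after the backoff they are printed on the console via k1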
The load_balance configuration (swap the selector strategy to test both):
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0
a1.sinks.k2.hdfs.rollSize= 0
a1.sinks.k2.hdfs.rollCount = 10
a1.sinks.k2.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
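To test the other built-in strategy, switch the selector to round_robin (also the default when none is set) and restart the agent:

a1.sinkgroups.g1.processor.selector = round_robin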
Avro source and avro sink
This case uses two agents: one acts as the data source and produces the data, the other acts as the receiver and consumes it.
The source agent's configuration:
# Name the components on this agent
avro-source-agent.sources = exec-source
avro-source-agent.sinks = avro-sink
avro-source-agent.channels = avro-memory-channel
# Describe/configure the source
avro-source-agent.sources.exec-source.type = exec
avro-source-agent.sources.exec-source.command = tail -F /home/hadoop/data/flume/logs/access.log
# Use a channel which buffers events in memory
avro-source-agent.channels.avro-memory-channel.type = memory
avro-source-agent.channels.avro-memory-channel.capacity = 1000
avro-source-agent.channels.avro-memory-channel.transactionCapacity = 100
avro-source-agent.sinks.avro-sink.type=avro
avro-source-agent.sinks.avro-sink.hostname=hadoop002
avro-source-agent.sinks.avro-sink.port=44444
# Bind the source and sink to the channel
avro-source-agent.sources.exec-source.channels = avro-memory-channel
avro-source-agent.sinks.avro-sink.channel = avro-memory-channel
The receiver agent's configuration:
# Name the components on this agent
avro-sink-agent.sources = avro-source
avro-sink-agent.sinks = avro-logger
avro-sink-agent.channels = avro-memory-channel
# Describe/configure the source
avro-sink-agent.sources.avro-source.type = avro
avro-sink-agent.sources.avro-source.bind = hadoop002
avro-sink-agent.sources.avro-source.port = 44444
# Use a channel which buffers events in memory
avro-sink-agent.channels.avro-memory-channel.type = memory
avro-sink-agent.channels.avro-memory-channel.capacity = 1000
avro-sink-agent.channels.avro-memory-channel.transactionCapacity = 100
avro-sink-agent.sinks.avro-logger.type=logger
# Bind the source and sink to the channel
avro-sink-agent.sources.avro-source.channels = avro-memory-channel
avro-sink-agent.sinks.avro-logger.channel = avro-memory-channel
Start avro-sink-agent first, then avro-source-agent (the receiver must be listening before the source's avro sink connects):
flume-ng agent \
--name avro-sink-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-sink.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name avro-source-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-source.conf \
-Dflume.root.logger=INFO,console
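To verify the pipeline end to end (a sketch reusing the paths above):

# feed the file tailed by avro-source-agent
echo "cross-agent test $(date)" >> /home/hadoop/data/flume/logs/access.log
# the avro-sink-agent console should print the event, e.g.
# INFO sink.LoggerSink: Event: { headers:{} body: ... cross-agent test ... }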