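This post covers several common Flume log sources: watching a directory for new files, watching a file for newly appended content, TCP, and HTTP.
Spool type
Monitors a specified directory for changes. When a new file appears, its contents are read and uploaded.
This case is covered at the end of the article "Build a Flume Distributed Log System in One Step".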
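To make the spooling idea concrete, here is a minimal Python sketch of the behaviour described above. It is not Flume code: `poll_spool_dir` and `demo` are hypothetical names, and the only Flume detail borrowed is the `.COMPLETED` suffix that the spooling directory source appends to finished files by default.

```python
import os
import tempfile

COMPLETED_SUFFIX = ".COMPLETED"  # Flume's default marker for fully ingested files


def poll_spool_dir(spool_dir):
    """Read every not-yet-processed file once, then mark it as completed."""
    events = []
    for name in sorted(os.listdir(spool_dir)):
        path = os.path.join(spool_dir, name)
        if name.endswith(COMPLETED_SUFFIX) or not os.path.isfile(path):
            continue
        with open(path, encoding="utf-8") as fh:
            events.extend(line.rstrip("\n") for line in fh)
        os.rename(path, path + COMPLETED_SUFFIX)
    return events


def demo():
    """Drop one file into a temporary spool directory and poll it twice."""
    with tempfile.TemporaryDirectory() as spool_dir:
        with open(os.path.join(spool_dir, "app.log"), "w", encoding="utf-8") as fh:
            fh.write("first line\nsecond line\n")
        first = poll_spool_dir(spool_dir)
        second = poll_spool_dir(spool_dir)  # nothing new, so no events
        return first, second, sorted(os.listdir(spool_dir))
```

The second poll returns nothing because the file has already been renamed, which is why files dropped into a spool directory should be complete and should not be modified afterwards.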
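Exec
The exec source runs a given command and uses its output as the event stream. If you use the tail command, the file must grow large enough before any output becomes visible.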
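As a rough illustration of what "run a command and use its output as the source" means, the Python sketch below turns each line of a command's standard output into one event. The function names are hypothetical, and a short-lived Python one-liner stands in for `tail -F` so that the example terminates.

```python
import subprocess
import sys


def events_from_command(argv):
    """Run argv and yield each line of its standard output as one event body."""
    with subprocess.Popen(argv, stdout=subprocess.PIPE, text=True) as proc:
        for line in proc.stdout:
            yield line.rstrip("\n")


def demo():
    """Use a command that prints three lines and exits as a stand-in for tail."""
    argv = [sys.executable, "-c", "for i in range(1, 4): print('exec tail%d' % i)"]
    return list(events_from_command(argv))
```

With a real `tail -F` the generator would never finish, which matches how the exec source keeps running for as long as the command does.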
Create the agent configuration file
vi /usr/local/flume170/conf/exec_tail.conf
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1 c2
a1.sources.r1.command = tail -F /var/log/haproxy.log
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /usr/local/flume170/checkpoint
a1.channels.c2.dataDirs = /usr/local/flume170/data
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k2.type = FILE_ROLL
a1.sinks.k2.channel = c2
a1.sinks.k2.sink.directory = /usr/local/flume170/files
a1.sinks.k2.sink.rollInterval = 0</pre>
Start Flume agent a1
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
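Write enough lines to a file to trigger output. The source only reports lines from the file named in a1.sources.r1.command (/var/log/haproxy.log in the configuration above), so the loop below is only picked up if that command points at /usr/local/flume170/log_exec_tail instead.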
# for i in {1..100};do echo "exec tail$i" >> /usr/local/flume170/log_exec_tail;echo $i;sleep 0.1;done
The H32 console then shows the received events.
HTTP
JSONHandler type
A data source based on HTTP POST or GET requests. It supports JSON and BLOB representations.
Create the agent configuration file
vi /usr/local/flume170/conf/post_json.conf
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5142
a1.sources.r1.channels = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1</pre>
Start Flume agent a1
/usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console
Send a JSON-formatted POST request to the port set in a1.sources.r1.port (5142 in the configuration above)
curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:5142
The H32 console then shows the received event.
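The same request can be built programmatically. The Python sketch below assembles the JSON array that the JSONHandler accepts (a list of objects with `headers` and `body`) and posts it with the standard library. `build_payload` and `post_events` are hypothetical helper names, and the URL passed to `post_events` must use whatever port a1.sources.r1.port is set to.

```python
import json
import urllib.request


def build_payload(events):
    """Serialize (headers, body) pairs into the JSON array JSONHandler expects."""
    return json.dumps(
        [{"headers": headers, "body": body} for headers, body in events]
    ).encode("utf-8")


def post_events(url, events):
    """POST the events; only call this while the agent is listening at url."""
    request = urllib.request.Request(
        url,
        data=build_payload(events),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

For example, `post_events("http://localhost:5142", [({"a": "a1", "b": "b1"}, "idoall.org_body")])` sends the same event as the curl command, assuming the agent runs locally on port 5142.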
TCP
The syslogtcp source listens on a TCP port and uses what it receives as the data source.
Create the agent configuration file
vi /usr/local/flume170/conf/syslog_tcp.conf
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = H32
a1.sources.r1.channels = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1</pre>
Start Flume agent a1
/usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
Generate a test syslog message
echo "hello idoall.org syslog" | nc H32 5140
The H32 console then shows the received event.
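The `nc` test sends bare text with no syslog header, which is why the LoggerSink output later in this post reports `flume.syslog.status=Invalid` with `Severity=0` and `Facility=0`. A syslog message normally starts with `<PRI>`, where PRI = facility * 8 + severity. The Python sketch below builds such a prefix and sends the line over TCP. The helper names are hypothetical, and whether Flume then treats the message as fully valid also depends on the rest of the header (timestamp, hostname), which this sketch does not add.

```python
import socket


def syslog_line(facility, severity, message):
    """Prefix message with <PRI>, where PRI = facility * 8 + severity."""
    if not (0 <= facility <= 23 and 0 <= severity <= 7):
        raise ValueError("facility must be 0-23 and severity 0-7")
    return "<%d>%s\n" % (facility * 8 + severity, message)


def send_syslog(host, port, line):
    """Send one newline-terminated line to a TCP syslog listener."""
    with socket.create_connection((host, port), timeout=5) as conn:
        conn.sendall(line.encode("utf-8"))
```

For example, `send_syslog("H32", 5140, syslog_line(1, 5, "hello idoall.org syslog"))` sends a user-level (facility 1) notice (severity 5), assuming the agent above is listening on H32.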
Flume Sink Processors and the Avro type
Avro can send a given file to Flume. The Avro source uses the Avro RPC mechanism.
With failover, events are always sent to one sink. When that sink becomes unavailable, they are automatically sent to the next sink. The channel's transactionCapacity must not be smaller than the sink's batch size.
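The selection rule can be sketched in a few lines of Python. This is only a model of the idea (highest-priority available sink wins), not Flume's implementation; `pick_sink` is a hypothetical name, the priorities match the configuration used in this section, and the penalty timer controlled by `maxpenalty` is deliberately left out.

```python
def pick_sink(priorities, available):
    """Return the available sink with the highest priority, or None."""
    candidates = [name for name in priorities if name in available]
    if not candidates:
        return None
    return max(candidates, key=lambda name: priorities[name])


# Same priorities as the failover configuration in this section.
PRIORITIES = {"k1": 5, "k2": 10}
```

While both sinks are up, `pick_sink(PRIORITIES, {"k1", "k2"})` picks k2. Once k2 is gone it falls back to k1, and it returns to k2 as soon as k2 is available again.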
Create the Flume_Sink_Processors configuration file on H32
# vi /usr/local/flume170/conf/Flume_Sink_Processors.conf
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = H32
a1.sinks.k1.port = 5141
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = H33
a1.sinks.k2.port = 5141
# This is the key to configuring failover: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is failover
a1.sinkgroups.g1.processor.type = failover
# Priority: the larger the number, the higher the priority; every sink must have a different priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Set to 10 seconds; make it shorter or longer to suit your situation
a1.sinkgroups.g1.processor.maxpenalty = 10000
</pre>
Create the Flume_Sink_Processors_avro configuration file on H32
# vi /usr/local/flume170/conf/Flume_Sink_Processors_avro.conf
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5141
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1</pre>
Copy both configuration files to H33
/usr/local/flume170# scp -r /usr/local/flume170/conf/Flume_Sink_Processors.conf H33:/usr/local/flume170/conf/Flume_Sink_Processors.conf
/usr/local/flume170# scp -r /usr/local/flume170/conf/Flume_Sink_Processors_avro.conf H33:/usr/local/flume170/conf/Flume_Sink_Processors_avro.conf
Open four terminal windows and start both Flume agents on H32 and on H33
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Then, on either H32 or H33, generate a test log message
# echo "idoall.org test1 failover" | nc H32 5140
Because H33 has the higher priority, the event shows up in the sink window on H33 and not on H32.
Now stop the sink on H33 (Ctrl+C) and send test data again
# echo "idoall.org test2 failover" | nc localhost 5140
The sink window on H32 now shows both test messages sent so far.
Next, start the sink again in the H33 sink window:
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Send two more batches of test data:
# echo "idoall.org test3 failover" | nc localhost 5140 && echo "idoall.org test4 failover" | nc localhost 5140
The events show up in the H33 sink window: because of the priority setting, log messages land on H33 again.
Load balancing Sink Processor
Unlike failover, the load_balance type offers two selection modes: round robin and random. In both cases, if the selected sink is unavailable, the processor automatically tries the next available sink.
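The two modes can be modelled with a short Python sketch. It only illustrates the selection and fall-through described above, not Flume's actual implementation; `round_robin_order` and `deliver` are hypothetical names, and the backoff timing enabled by `processor.backoff` is not modelled.

```python
import random


def round_robin_order(sinks, start):
    """Sinks in the order they would be tried, beginning at index start."""
    return [sinks[(start + i) % len(sinks)] for i in range(len(sinks))]


def deliver(events, sinks, available, selector="round_robin", rng=None):
    """Assign each event to the first available sink in the selector's order."""
    if selector not in ("round_robin", "random"):
        raise ValueError("selector must be 'round_robin' or 'random'")
    rng = rng or random.Random()
    assignment = {}
    for index, event in enumerate(events):
        if selector == "round_robin":
            order = round_robin_order(sinks, index)
        else:
            order = rng.sample(sinks, len(sinks))  # a fresh random order per event
        # Fall through to the next sink when the preferred one is unavailable.
        assignment[event] = next((s for s in order if s in available), None)
    return assignment
```

With both sinks available, four events alternate between k1 and k2. With only k2 available, every event falls through to k2.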
Create the Load_balancing_Sink_Processors configuration file on H32
# vi /usr/local/flume170/conf/Load_balancing_Sink_Processors.conf
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = H32
a1.sinks.k1.port = 5141
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = H33
a1.sinks.k2.port = 5141
# This is the key to configuring load balancing: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is load_balance
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin</pre>
Create the Load_balancing_Sink_Processors_avro configuration file on H32
# vi /usr/local/flume170/conf/Load_balancing_Sink_Processors_avro.conf
<pre style="margin: 0px; padding: 0px; white-space: pre-wrap; word-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5141
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1</pre>
Copy both configuration files to H33
/usr/local/flume170# scp -r /usr/local/flume170/conf/Load_balancing_Sink_Processors.conf H33:/usr/local/flume170/conf/Load_balancing_Sink_Processors.conf
/usr/local/flume170# scp -r /usr/local/flume170/conf/Load_balancing_Sink_Processors_avro.conf H33:/usr/local/flume170/conf/Load_balancing_Sink_Processors_avro.conf
Open four terminal windows and start both Flume agents on H32 and on H33
/usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
/usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Then, on either H32 or H33, generate test log messages. Enter them one line at a time: if they are sent too quickly, they tend to land on the same machine.
echo "idoall.org test1" | nc H32 5140
echo "idoall.org test2" | nc H32 5140
echo "idoall.org test3" | nc H32 5140
echo "idoall.org test4" | nc H32 5140
The H32 sink window shows the following:
14/08/10 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
14/08/10 15:35:33 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
The H33 sink window shows the following:
14/08/10 15:35:27 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
14/08/10 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
This shows that round-robin mode is working.
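When comparing the two windows it can help to decode the hex dump that LoggerSink prints after `body:`. The Python sketch below does that for lines in the format shown above. `decode_body` is a hypothetical helper, and it assumes the readable text after the dump does not itself begin with two hex digits followed by a space.

```python
import re

# One or more two-digit hex bytes, each followed by whitespace, after "body:".
EVENT_BODY = re.compile(r"body:\s+((?:[0-9A-Fa-f]{2}\s)+)")


def decode_body(log_line):
    """Extract the hex byte dump after 'body:' and decode it as UTF-8 text."""
    match = EVENT_BODY.search(log_line)
    if match is None:
        raise ValueError("no hex body found")
    return bytes.fromhex(match.group(1)).decode("utf-8")
```

Applied to the first H32 line above, the bytes `69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32` decode to `idoall.org test2`, which matches the text printed at the end of that line.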
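All of the above assumes that H32 and H33 can reach each other and that Flume is configured correctly, and every example is a very simple scenario. One point is worth noting: although Flume is described as a log collector, "logs" can be understood more broadly as any stream of information, not only logs in the conventional sense.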