Flume是一種分布式暂题,可靠且可用的服務(wù)啥纸,用于有效地收集裹唆,聚合和移動(dòng)大量日志數(shù)據(jù)誓斥。 它具有基于流數(shù)據(jù)流的簡(jiǎn)單靈活的架構(gòu)。 它具有可靠的可靠性機(jī)制和許多故障轉(zhuǎn)移和恢復(fù)機(jī)制许帐,具有強(qiáng)大的容錯(cuò)能力劳坑。 它使用簡(jiǎn)單的可擴(kuò)展數(shù)據(jù)模型,允許在線(xiàn)分析應(yīng)用程序成畦。本文講述如何使用Flume搜集Nginx的日志距芬,并給出了幾個(gè)使用示例
主要內(nèi)容:
- 1.運(yùn)行機(jī)制
- 2.部署Flume
相關(guān)文章:
1.CentOS7安裝Nginx
2.Flume之采集Nginx的日志
3.Flume之自定義Intercept
1.運(yùn)行機(jī)制
Flume 的核心是把數(shù)據(jù)從數(shù)據(jù)源(source)收集過(guò)來(lái),在將收集到的數(shù)據(jù)送到指定的目的地(sink)循帐。為了保證輸送的過(guò)程一定成功框仔,在送到目的地(sink)之前,會(huì)先緩存數(shù)據(jù)(channel),待數(shù)據(jù)真正到達(dá)目的地(sink)后拄养,flume 在刪除自己緩存的數(shù)據(jù)离斩。 Flume 分布式系統(tǒng)中核心的角色是 agent,agent 本身是一個(gè) Java 進(jìn)程瘪匿,一般運(yùn)行在日志收集節(jié)點(diǎn)跛梗。
flume 采集系統(tǒng)就是由一個(gè)個(gè) agent 所連接起來(lái)形成。
每一個(gè) agent 相當(dāng)于一個(gè)數(shù)據(jù)傳遞員棋弥,內(nèi)部有三個(gè)組件:
Source:采集源核偿,用于跟數(shù)據(jù)源對(duì)接,以獲取數(shù)據(jù)顽染;
Sink:下沉地漾岳,采集數(shù)據(jù)的傳送目的轰绵,用于往下一級(jí) agent 傳遞數(shù)據(jù)或者往最終存儲(chǔ)系統(tǒng)傳遞數(shù)據(jù);
Channel:agent 內(nèi)部的數(shù)據(jù)傳輸通道尼荆,用于從 source 將數(shù)據(jù)傳遞到 sink左腔;在整個(gè)數(shù)據(jù)的傳輸?shù)倪^(guò)程中,流動(dòng)的是 event耀找,它是 Flume 內(nèi)部數(shù)據(jù)傳輸?shù)淖罨締卧栌啤vent 將傳輸?shù)臄?shù)據(jù)進(jìn)行封裝。如果是文本文件野芒,通常是一行記錄蓄愁,event 也是事務(wù)的基本單位。event 從 source狞悲,流向 channel撮抓,再到 sink,本身為一個(gè)字節(jié)數(shù)組摇锋,并可攜帶headers(頭信息)信息丹拯。event 代表著一個(gè)數(shù)據(jù)的最小完整單元,從外部數(shù)據(jù)源來(lái)荸恕,向外部的目的地去乖酬。 一個(gè)完整的 event 包括:event headers、event body融求、event 信息咬像,其中event 信息就是 flume 收集到的日記記錄。
2.部署Flume
2.1.下載Flume
下載地址:傳輸門(mén)
2.2.部署Flume
2.2.1.解壓
將下載的.tar.gz解壓到任意目錄
2.2.2.配置環(huán)境變量
在Path后面加入解壓后的路徑
tar -zxvf apache-flume-1.8.0-bin.tar.gz -C .
2.2.3.實(shí)例1:采集Nginx的日志在控制臺(tái)顯示
在根目錄的conf下新建nginx-logger.conf文件生宛,文件內(nèi)容如下:
# 定義一個(gè)名為a1的agent中各組件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置 source 組件:r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/soft/nginx-1.14.0/logs/access.log
# 描述和配置 sink 組件:k1
a1.sinks.k1.type = logger
# 描述和配置 channel 組件县昂,此處使用是內(nèi)存緩存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置 source、channel陷舅、sink 之間的連接關(guān)系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
這里我們定義了一個(gè)source監(jiān)聽(tīng)access.log文件的變化倒彰,并將采集到的日志文件打印在控制臺(tái)
2.2.4.啟動(dòng)Flume
./bin/flume-ng agent -n a1 -c conf -f conf/nginx-logger.conf -Dflume.root.logger=INFO,console
-a a1 指定 agent 的名字
-c conf 指定配置文件目錄
-f conf/nginx-logger.conf 指定配置文件
訪問(wèn)Nginx測(cè)試
2.2.4.實(shí)例2:采集到kafka
# 定義一個(gè)名為a1的agent中各組件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置 source 組件:r1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/soft/nginx-1.14.0/logs/access.log
# 描述和配置 sink 組件:k1
#設(shè)置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#設(shè)置Kafka的broker地址和端口號(hào)
a1.sinks.k1.brokerList=127.0.0.1:9092, 127.0.0.1:9092, 127.0.0.1:9092
#設(shè)置Kafka的Topic
a1.sinks.k1.topic=App
#設(shè)置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
# 描述和配置 channel 組件:c1,此處使用是內(nèi)存緩存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置 source莱睁、channel待讳、sink 之間的連接關(guān)系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.2.5.實(shí)例3:采集到HDFS
切分日志腳本
# /bin/bash
_prefix="/opt/soft/nginx-1.14.0"
time=`date +%Y%m%d%H`
mv ${_prefix}/logs/access.log ${_prefix}/logs/flume/access-${time}.log
kill -USR1 `cat ${_prefix}/logs/nginx.pid`
定時(shí)切分日志,每個(gè)小時(shí)的59分切分日志
# 編輯crontab文件
vi /etc/crontab
# 加入,每個(gè)小時(shí)的59分切分一次日志
59 * * * * root /opt/soft/nginx-1.14.0/log_spilt.sh
# 重啟cron服務(wù)
systemctl restart crond.service
nginx-hdfs.conf
# 定義這個(gè) agent 中各組件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置 source 組件:r1
##注意:不能往監(jiān)控目中重復(fù)丟同名文件
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/soft/nginx-1.14.0/logs/flume
a1.sources.r1.fileHeader = true
# 描述和配置 channel 組件仰剿,此處使用是內(nèi)存緩存的方式
a1.channels.c1.type = memory
# 默認(rèn)該通道中最大的可以存儲(chǔ)的 event 數(shù)量
a1.channels.c1.capacity = 1000
# 每次最大可以從 source 中拿到或者送到 sink 中的 event 數(shù)量
a1.channels.c1.transactionCapacity = 100
# 描述和配置 sink 組件:k1
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.2.111:9000/business/%Y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = logs
a1.sinks.k1.hdfs.inUsePrefix = .
# 默認(rèn)值:30; hdfs sink 間隔多長(zhǎng)將臨時(shí)文件滾動(dòng)成最終目標(biāo)文件耙箍,單位:秒; 如果設(shè)置成 0,則表示不根據(jù)時(shí)間來(lái)滾動(dòng)文件
a1.sinks.k1.hdfs.rollInterval = 0
# 默認(rèn)值:1024; 當(dāng)臨時(shí)文件達(dá)到該大兴肘伞(單位:bytes)時(shí),滾動(dòng)成目標(biāo)文件; 如果設(shè)置成 0阅酪,則表示不根據(jù)臨時(shí)文件大小來(lái)滾動(dòng)文件
a1.sinks.k1.hdfs.rollSize = 16777216
# 默認(rèn)值:10; 當(dāng) events 數(shù)據(jù)達(dá)到該數(shù)量時(shí)候旨袒,將臨時(shí)文件滾動(dòng)成目標(biāo)文件; 如果設(shè)置成 0汁针,則表示不根據(jù) events 數(shù)據(jù)來(lái)滾動(dòng)文件
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.batchSize = 1000
a1.sinks.k1.hdfs.writeFormat = text
# 生成的文件類(lèi)型,默認(rèn)是 Sequencefile砚尽,可用 DataStream施无,則為普通文本
a1.sinks.k1.hdfs.fileType = DataStream
# 操作 hdfs 超時(shí)時(shí)間
a1.sinks.k1.callTimeout =10000
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 描述和配置 source channel sink 之間的連接關(guān)系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.2.6.實(shí)例4:flume-flume-kafka
flume1
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/soft/nginx-1.14.0/logs/access.log
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 41414
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
flume2
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port =41414
# Describe the sink
#設(shè)置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#設(shè)置Kafka的broker地址和端口號(hào)
a1.sinks.k1.brokerList=127.0.0.1:9092, 127.0.0.1:9092, 127.0.0.1:9092
#設(shè)置Kafka的Topic
a1.sinks.k1.topic=App
#設(shè)置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100