1 Flume Overview
1.1 Definition
Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and moving massive amounts of log data, provided by Cloudera.
Flume is based on a streaming architecture and is flexible and simple.
1.2 Features
Flume can be integrated with almost any storage process.
When the incoming data rate exceeds the rate at which data can be written to the destination store, Flume buffers the data, reducing the pressure on HDFS.
Transactions in Flume are based on the Channel and use a two-transaction model (sender + receiver) to ensure that messages are delivered reliably.
Flume uses two separate transactions to handle event delivery from the Source to the Channel and from the Channel to the Sink. Only when all of the data in a transaction has been successfully committed to the Channel does the Source consider that data fully read; likewise, only data that the Sink has successfully written out is removed from the Channel.
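As a minimal sketch of the receiver side of this model, the take/commit/rollback pattern below mirrors the custom Sink in section 6; it illustrates the transaction semantics and is not Flume's internal implementation:
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
public class TxnSketch {
    // Sketch: drain one event from a Channel inside a transaction.
    static void drainOnce(Channel channel) {
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();   // read one event from the channel buffer
            if (event != null) {
                // ... write the event to HDFS / the next agent here ...
            }
            txn.commit();                   // only now is the event removed from the channel
        } catch (Throwable t) {
            txn.rollback();                 // on failure the event stays in the channel
        } finally {
            txn.close();
        }
    }
}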
1.3 Architecture
1.3.1 Agent
An Agent is a JVM process that moves data from a source to a destination in the form of events.
An Agent consists mainly of a Source, a Channel, and a Sink.
1.3.2 Source
The Source is the component that receives data into the Agent. It can handle many input types, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.
1.3.3 Channel
The Channel is a buffer sitting between the Source and the Sink, so it allows the Source and the Sink to operate at different rates. The Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.
Flume ships with two Channels:
Memory Channel: an in-memory queue. It is fast and suitable when data loss is not a concern.
File Channel: writes all events to disk, so no data is lost if the process exits or the machine goes down.
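As a hedged example of the second option, a File Channel is selected by setting the channel type to file; checkpointDir and dataDirs are its standard properties, and the paths below are placeholders:
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint
a1.channels.c1.dataDirs = /opt/module/flume/data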
1.3.4 Sink
The Sink continuously polls the Channel for events, removes them in batches, and writes those batches to a storage or indexing system or sends them to another Flume Agent.
The Sink is fully transactional. Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel. Once the batch of events has been successfully written out to the storage system or to the next Flume Agent, the Sink commits the transaction via the Channel, and once the transaction is committed, the Channel deletes those events from its internal buffer.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.
1.3.5 Event
The Event is the transport unit, the basic unit of data transfer in Flume; data travels from the source to the destination in the form of events.
An Event consists of an optional header and a byte array carrying the payload. The header is a HashMap of key-value string pairs.
Usually one record is one Event, and the payload is split into Events of 2048 bytes each.
1.4 Topologies
Chained agents: several Flume agents are connected in sequence, from the first Source through to the storage system that the final Sink delivers to. Chaining too many agents is not recommended: too many agents not only reduce the transfer rate, but if any one of them goes down during transmission, the whole pipeline is affected.
Replicating flows: Flume supports sending an event flow to one or more destinations. In this mode the data from the source is replicated into multiple Channels, each Channel holds the same data, and each Sink can deliver to a different destination.
Sink groups: Flume supports grouping multiple Sinks into one logical Sink group and distributing the data across the Sinks in the group, which mainly addresses load balancing and failover.
Aggregation: this is the most common and most practical mode. Everyday web applications are usually spread across hundreds, sometimes thousands or even tens of thousands, of servers, and the logs they produce are troublesome to handle. This combination of Flume agents solves the problem nicely: a Flume agent is deployed on each server to collect its logs and send them to a central log-collecting Flume agent, which then uploads them to hdfs, hive, hbase, jms, and so on for log analysis.
1.5 Agent Internals
2 Flume Deployment
1. Unpack apache-flume-1.7.0-bin.tar.gz into the /opt/module directory
2. Rename apache-flume-1.7.0-bin to flume
3. Rename the flume-env.sh.template file under flume/conf to flume-env.sh, and configure JAVA_HOME in flume-env.sh
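A hedged sketch of the corresponding commands; the location of the archive and the JDK path are assumptions, so adjust them to your environment:
[djm@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
[djm@hadoop102 module]$ mv apache-flume-1.7.0-bin flume
[djm@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[djm@hadoop102 conf]$ vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144   # example path; point it at your JDK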
3 Enterprise Development Examples
3.1 Monitoring Port Data
Requirements analysis:
The server listens on port 44444 of the local machine
A client uses the netcat tool to send messages to port 44444
Finally, the data is displayed on the console
Implementation steps:
1. Create the Agent configuration file flume-netcat-logger.conf in the job folder
[djm@hadoop102 job]$ vim flume-netcat-logger.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3跳仿、啟動(dòng)任務(wù)
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 –f job/flume-netcat-logger.conf -Dflume.root.logger==INFO,console
Parameter notes:
--conf conf/ (-c): the configuration files are stored in the conf/ directory
--name a1 (-n): names the Agent a1
--conf-file job/flume-netcat-logger.conf (-f): the configuration file Flume reads for this run is flume-netcat-logger.conf in the job folder
-Dflume.root.logger=INFO,console: -D overrides the flume.root.logger property while Flume is running, setting the console log level to INFO
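To exercise the setup, open another terminal and send a few lines to the listening port with netcat (assuming the nc tool is installed); the netcat source replies OK for each line it accepts:
[djm@hadoop102 ~]$ nc localhost 44444
hello
OK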
3.2 Streaming a Local File to HDFS in Real Time
Requirements analysis:
Monitor the Hive log in real time and upload it to HDFS
Implementation steps:
1. Create the Agent configuration file flume-file-hdfs.conf in the job folder
[djm@hadoop102 job]$ vim flume-file-hdfs.conf
2妄辩、添加如下內(nèi)容:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
#whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
#how many time units before creating a new folder
a2.sinks.k2.hdfs.roundValue = 1
#redefine the time unit
a2.sinks.k2.hdfs.roundUnit = hour
#whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#number of Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
#file type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
#how often to roll to a new file (seconds)
a2.sinks.k2.hdfs.rollInterval = 60
#roll size for each file (roughly 128 MB)
a2.sinks.k2.hdfs.rollSize = 134217700
#rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3山上、啟動(dòng)任務(wù)
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 –f job/flume-file-hdfs.conf
Note:
To read a file on a Linux system, the command must follow Linux command conventions. Since the Hive log lives on a Linux system, the source type chosen for reading the file is exec, short for execute, meaning a Linux command is executed to read the file.
3.3 Streaming a Directory of Files to HDFS in Real Time
Requirements analysis:
Use Flume to monitor an entire directory of files
Implementation steps:
1. Create the Agent configuration file flume-dir-hdfs.conf in the job folder
[djm@hadoop102 job]$ vim flume-dir-hdfs.conf
2. Add the following content:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
#whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
#how many time units before creating a new folder
a3.sinks.k3.hdfs.roundValue = 1
#redefine the time unit
a3.sinks.k3.hdfs.roundUnit = hour
#whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#number of Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
#file type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
#how often to roll to a new file (seconds)
a3.sinks.k3.hdfs.rollInterval = 60
#roll size for each file (roughly 128 MB)
a3.sinks.k3.hdfs.rollSize = 134217700
#rolling is independent of the number of Events
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
3. Start the job
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf
Note:
Do not create files in the monitored directory and then keep modifying them
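To try it out, drop a file into the monitored directory; the file name below is a placeholder. Once the file has been picked up, the spooling directory source renames it with the .COMPLETED suffix configured above:
[djm@hadoop102 flume]$ mkdir -p /opt/module/flume/upload
[djm@hadoop102 flume]$ echo "hello flume" > /opt/module/flume/upload/test.txt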
3.4 Single Source, Multiple Outputs (Selector)
Requirements analysis:
Flume-1 monitors file changes and passes the changed content to Flume-2
Flume-2 stores it in HDFS
At the same time, Flume-1 passes the changed content to Flume-3, and Flume-3 writes it to the Local FileSystem
Implementation steps:
1. Create the Agent configuration file flume-file-flume.conf in the group1 folder
[djm@hadoop102 group1]$ vim flume-file-flume.conf
2碎税、添加如下內(nèi)容:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# replicate the data flow to all channels
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# the avro sink acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
3尤慰、在group1
文件夾下創(chuàng)建Agent
配置文件flume-flume-hdfs.conf
[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# the avro source acts as a data-receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
#prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
#whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
#how many time units before creating a new folder
a2.sinks.k1.hdfs.roundValue = 1
#redefine the time unit
a2.sinks.k1.hdfs.roundUnit = hour
#whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#number of Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
#file type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
#how often to roll to a new file (seconds)
a2.sinks.k1.hdfs.rollInterval = 600
#roll size for each file (roughly 128 MB)
a2.sinks.k1.hdfs.rollSize = 134217700
#rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5伟端、在group1
文件夾下創(chuàng)建 Agent 配置文件flume-flume-dir.conf
[djm@hadoop102 group1]$ vim flume-flume-dir.conf
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7匪煌、啟動(dòng)任務(wù)
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
Note:
Avro is a language-neutral data serialization and RPC framework
The local output directory must already exist; if it does not, it will not be created automatically
The jobs containing the Avro Source (the receiving agents) must be started first, as in the command order above
3.5 Single Source, Multiple Outputs (Sink Group)
Requirements analysis:
Flume-1 monitors port data and passes the changed content to Flume-2
Flume-2 displays the data on the console
At the same time, Flume-1 passes the changed content to Flume-3, and Flume-3 also displays the data on the console
Implementation steps:
1. Create the Agent configuration file flume-netcat-flume.conf in the group2 folder
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
3. Create the Agent configuration file flume-flume-console1.conf in the group2 folder
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5创肥、在 group2
文件夾下創(chuàng)建Agent
配置文件flume-flume-console2.conf
6达舒、添加如下內(nèi)容:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the jobs
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
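The sink group above uses the load_balance processor. For failover instead of load balancing, the processor can be switched to the failover type; the priority values below are illustrative:
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000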
3.6 Aggregating Multiple Data Sources
Requirements analysis:
Flume-1 on hadoop103 monitors the file /opt/module/group.log
Flume-2 on hadoop102 monitors a data stream on a port
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console
Implementation steps:
1. Create the Agent configuration file flume1-logger-flume.conf in the group3 folder
[djm@hadoop102 group3]$ vim flume1-logger-flume.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3趾代、在group3
文件夾下創(chuàng)建Agent
配置文件flume2-netcat-flume.conf
[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf
4贯底、添加如下內(nèi)容:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume3-flume-logger.conf in the group3 folder
[djm@hadoop102 group3]$ vim flume3-flume-logger.conf
6撒强、添加如下內(nèi)容:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
7禽捆、分發(fā)配置文件
[djm@hadoop102 group3]$ xsync /opt/module/flume/job
8. Start the jobs
[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
4 Ganglia Deployment
1. Install the httpd service and php
yum -y install httpd php
2胚想、安裝其他依賴
yum -y install rrdtool perl-rrdtool rrdtool-devel
3. Install ganglia
rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web
4芽隆、修改ganglia
配置文件
vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
# Require local
Require all granted
# Require ip 10.1.2.3
# Require host example.org
</Location>
Note in particular: the following configuration does not work
<Location /ganglia>
Order deny,allow
Allow from all
</Location>
5浊服、修改gmetad
配置文件
vim /etc/ganglia/gmetad.conf
data_source "hadoop102" 192.168.1.102
6统屈、修改gmond
配置文件
vim /etc/ganglia/gmond.conf
cluster {
#name = "unspecified"
name = "hadoop102"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
#mcast_join = 239.2.11.71
host = 192.168.10.102
port = 8649
ttl = 1
}
/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
#mcast_join = 239.2.11.71
port = 8649
#bind = 239.2.11.71
bind = 192.168.10.102
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
7. Check the SELinux status
sestatus
If the status is not disabled, modify the following configuration file:
vim /etc/selinux/config
Or temporarily turn SELinux off:
setenforce 0
7愁憔、啟動(dòng)ganglia
systemctl start httpd
systemctl start gmetad
systemctl start gmond
9. Open a browser and visit
http://hadoop102/ganglia/
If you still get a permission error after the steps above, try changing the permissions of the /var/lib/ganglia directory:
chmod -R 777 /var/lib/ganglia
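To have Flume report metrics to this Ganglia instance, the agent can be started with Flume's built-in ganglia monitoring options; the host and port below assume the gmond address configured above:
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.10.102:8649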
5 Custom Source
Requirements analysis:
Implementation:
1. Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
2. Write the code
package com.djm.flume;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
public class MySource extends AbstractSource implements Configurable, PollableSource {
//fields to be read from the configuration file
private Long delay;
private String field;
/**
* Receive data, wrap it into individual events, and write them to the channel
* @return
* @throws EventDeliveryException
*/
public Status process() throws EventDeliveryException {
HashMap<String, String> headerMap = new HashMap<>();
SimpleEvent event = new SimpleEvent();
try {
for (int i = 0; i < 5; i++) {
event.setHeaders(headerMap);
event.setBody((field + i).getBytes());
getChannelProcessor().processEvent(event);
Thread.sleep(delay);
}
} catch (InterruptedException e) {
e.printStackTrace();
return Status.BACKOFF;
}
return Status.READY;
}
public long getBackOffSleepIncrement() {
return 0;
}
public long getMaxBackOffSleepInterval() {
return 0;
}
/**
* Read the configuration file
* @param context
*/
public void configure(Context context) {
delay = context.getLong("delay");
field = context.getString("field", "hello");
}
}
3. Package and test
Use Maven to build the jar and upload it to the /opt/module/flume/lib directory
Create the Agent configuration file mysource.conf in the job folder
[djm@hadoop102 job]$ vim mysource.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the job
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
6 Custom Sink
Requirements analysis:
Implementation:
1. Add the dependency
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
2激蹲、代碼編寫
package com.djm.flume;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
private String prefix;
private String suffix;
@Override
public Status process() throws EventDeliveryException {
Status status = null;
Channel channel = getChannel();
// every take/commit must happen inside a channel transaction
Transaction transaction = channel.getTransaction();
try {
Event event;
transaction.begin();
// poll until an event is available in the channel
while ((event = channel.take()) == null) {
Thread.sleep(200);
}
LOG.info(prefix + new String(event.getBody()) + suffix);
transaction.commit();
status = Status.READY;
} catch (Throwable e) {
transaction.rollback();
status = Status.BACKOFF;
if (e instanceof Error)
throw (Error) e;
} finally {
transaction.close();
}
return status;
}
@Override
public void configure(Context context) {
prefix = context.getString("prefix");
suffix = context.getString("suffix");
}
}
3. Package and test
Use Maven to build the jar and upload it to the /opt/module/flume/lib directory
Create the Agent configuration file mysink.conf in the job folder
[djm@hadoop102 job]$ vim mysink.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the job
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
7 Flume Parameter Tuning
7.1 Source
Increasing the number of Sources increases the capacity to read data. For example, when a single directory produces too many files, it should be split into several directories with a Source configured for each, to make sure the Sources can keep up with newly produced data.
The batchSize parameter determines how many Events the Source ships to the Channel per batch; increasing it appropriately improves the performance of moving Events from the Source to the Channel.
7.2 Channel
With type set to Memory Channel, the Channel performs best, but data may be lost if the Flume process dies unexpectedly.
With type set to File Channel, the Channel is more fault-tolerant but performs worse than a Memory Channel; when using a File Channel, setting dataDirs to directories on several different disks improves performance.
The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events the Source writes to the Channel per transaction and the maximum number the Sink reads from the Channel per transaction; transactionCapacity must be at least as large as the batchSize of the Source and the Sink.
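A hedged sketch of how these parameters relate in an agent configuration; the values are illustrative, the point being batchSize <= transactionCapacity <= capacity:
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# the sink's batch must fit inside one channel transaction
a1.sinks.k1.hdfs.batchSize = 1000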
7.3 Sink
Increasing the number of Sinks increases the capacity to consume Events, but more Sinks is not always better; enough is enough, because extra Sinks occupy system resources and waste them unnecessarily.
The batchSize parameter determines how many Events the Sink reads from the Channel per batch; increasing it appropriately improves the performance of moving Events out of the Channel.