Getting Started with Flume

1 Flume Overview

1.1 Definition

Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive volumes of log data, provided by Cloudera.

Flume is built on a streaming architecture and is flexible and simple.

1.2 Features

Flume can integrate with virtually any storage process.

When the input data rate exceeds the rate at which data can be written to the destination store, Flume buffers the data, reducing the pressure on HDFS.

Transactions in Flume are based on the Channel and use a two-transaction model (sender + receiver) to ensure that messages are delivered reliably.

Flume uses two independent transactions to deliver events from Source to Channel and from Channel to Sink. Only once all the data in a transaction has been successfully committed to the Channel does the Source consider the data read; likewise, only data that has been successfully written out by the Sink is removed from the Channel.
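
A minimal sketch of the take-side transaction, as a consumer of a Channel (for example a custom sink's process() method, shown in full in section 6) would run it; the channel variable is assumed to be an already-wired Channel:

Transaction txn = channel.getTransaction();
txn.begin();
try {
    Event event = channel.take();   // read one event inside the transaction
    // deliver the event to the destination here; commit only on success
    txn.commit();                   // on commit the event is removed from the channel
} catch (Throwable t) {
    txn.rollback();                 // on rollback the event stays in the channel for a retry
} finally {
    txn.close();
}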

1.3 Architecture

[Figure: composition of a Flume Agent: Source, Channel, Sink]

1.3.1 Agent

An Agent is a JVM process that moves data from a source to a destination in the form of events.

An Agent is composed mainly of a Source, a Channel, and a Sink.

1.3.2 Source

The Source is the component that receives data into the Agent. It can handle many input types, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.

1.3.3 Channel

The Channel is a buffer between the Source and the Sink, which allows them to operate at different rates. A Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.

Flume ships with two built-in Channels:

Memory Channel: an in-memory queue. It is fast and suitable for scenarios where you do not need to worry about data loss.

File Channel: writes all events to disk, so no data is lost if the process exits or the machine goes down.
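
A minimal File Channel configuration sketch (the agent/channel names and the directories are illustrative assumptions):

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint
a1.channels.c1.dataDirs = /opt/module/flume/data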

1.3.4 Sink

The Sink continuously polls the Channel for events, removes them in batches, and writes each batch to a storage or indexing system or forwards it to another Flume Agent.

The Sink is fully transactional. Before removing a batch of data from the Channel, each Sink opens a transaction with the Channel. Once the batch has been successfully written out to the storage system or to the next Flume Agent, the Sink commits the transaction via the Channel; on commit, the Channel deletes those events from its internal buffer.

Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.

1.3.5 Event

The Event is the transport unit, the basic unit of data transfer in Flume; data is carried from source to destination in the form of events.

An Event consists of an optional header plus a byte array carrying the payload; the header is a HashMap of key-value string pairs.

Typically one record is one Event, with an Event cut every 2048 bytes.
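
A small sketch of building an Event programmatically with Flume's EventBuilder; the header key-value pair is an illustrative assumption:

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventDemo {
    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("hostname", "hadoop102");   // optional header entries (assumed key)
        Event event = EventBuilder.withBody(
                "one log line".getBytes(StandardCharsets.UTF_8), headers);
        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
    }
}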

1.4 Topologies

[Figure: multiple agents chained in sequence]

In this topology, multiple Flume agents are chained in sequence, from the first Source all the way to the storage system at the final Sink. Bridging too many Flume agents is not recommended: too many agents reduce transfer speed, and if any agent along the path goes down, the whole pipeline is affected.

[Figure: a single source replicated to multiple channels and destinations]

Flume supports sending an event stream to one or more destinations. In this topology the source data is replicated into multiple Channels; each Channel holds the same data, and each Sink can deliver to a different destination.

[Figure: a sink group distributing events across multiple sinks]

Flume supports logically grouping multiple Sinks into one sink group and distributing data across the Sinks in the group, which mainly addresses load balancing and failover.
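
For the failover half, a minimal sink-group sketch (the agent/group/sink names a1, g1, k1, k2 are illustrative; the sink with the higher priority receives events until it fails, then the group falls back to the other):

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000

Section 3.5 below shows the load_balance counterpart in a complete example.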

[Figure: many collection agents aggregating into one central agent]

This is the most common and most practical topology. Everyday web applications are typically spread across hundreds of servers, in large cases even thousands or tens of thousands, and the logs they produce are very cumbersome to process. This Flume arrangement solves the problem well: each server runs a Flume agent that collects its logs and sends them to a central log-collecting Flume agent, which then uploads them to hdfs, hive, hbase, jms, etc. for log analysis.

1.5 How an Agent Works

[Figure: internal event flow of a Flume Agent]

2 Deploying Flume

1. Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module directory

2. Rename apache-flume-1.7.0-bin to flume

3. Rename the flume-env.sh.template file under flume/conf to flume-env.sh and configure JAVA_HOME in flume-env.sh; a sketch of all three steps follows
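
A sketch of these steps as shell commands, assuming the tarball sits in /opt/software and the JDK lives at /opt/module/jdk1.8.0_144 (both paths are assumptions):

[djm@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
[djm@hadoop102 module]$ mv apache-flume-1.7.0-bin flume
[djm@hadoop102 flume]$ mv conf/flume-env.sh.template conf/flume-env.sh
[djm@hadoop102 flume]$ vim conf/flume-env.sh
# in flume-env.sh: export JAVA_HOME=/opt/module/jdk1.8.0_144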

3 Enterprise Development Cases

3.1 Monitoring Port Data

Requirements:

The server listens on port 44444 of the local machine

A client uses the netcat tool to send messages to port 44444

Finally, the data is displayed on the console

Steps:

1. Create the Agent configuration file flume-netcat-logger.conf in the job directory

[djm@hadoop102 job]$ vim flume-netcat-logger.conf

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3跳仿、啟動(dòng)任務(wù)

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 –f job/flume-netcat-logger.conf -Dflume.root.logger==INFO,console

Parameter notes:

--conf conf/ indicates that the configuration files are stored in the conf/ directory

--name a1 names this Agent a1

--conf-file job/flume-netcat-logger.conf specifies the configuration file Flume reads for this run, located in the job directory

-Dflume.root.logger=INFO,console uses -D to override the flume.root.logger property at runtime, setting the console log level to INFO
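
To exercise the agent, open another terminal and send text to the monitored port with netcat (this assumes the nc tool is installed; the netcat source acknowledges each accepted line with OK):

[djm@hadoop102 ~]$ nc localhost 44444
hello
OK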

3.2 Streaming a Local File to HDFS in Real Time

Requirements:

Monitor the Hive log in real time and upload it to HDFS

Steps:

1. Create the Agent configuration file flume-file-hdfs.conf in the job directory

[djm@hadoop102 job]$ vim flume-file-hdfs.conf

2妄辩、添加如下內(nèi)容:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# How many time units before creating a new folder
a2.sinks.k2.hdfs.roundValue = 1
# The unit for time-based rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a2.sinks.k2.hdfs.rollInterval = 60
# Roll size for each file (about 128 MB)
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the Event count
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

3山上、啟動(dòng)任務(wù)

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 –f job/flume-file-hdfs.conf

Note:

To read a file on a Linux system, the file must be read according to the rules of Linux commands. Since the Hive log lives on Linux, the source type chosen is exec, short for execute: it runs a Linux command to read the file. You can test it as shown below.
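
To generate test data, append a line to the monitored log from another terminal (the path matches the tail -F command in the configuration above):

[djm@hadoop102 ~]$ echo "hello flume" >> /opt/module/hive/logs/hive.log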

3.3 Streaming Directory Files to HDFS in Real Time

Requirements:

Use Flume to monitor all files in a directory

Steps:

1. Create the Agent configuration file flume-dir-hdfs.conf in the job directory

[djm@hadoop102 job]$ vim flume-dir-hdfs.conf

2. Add the following content:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# How many time units before creating a new folder
a3.sinks.k3.hdfs.roundValue = 1
# The unit for time-based rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll size for each file, about 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the Event count
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

3. Start the job

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf

Note:

Do not create and keep modifying files directly inside the monitored directory; finish files elsewhere and then move them in, as sketched below
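
For example, copy a completed file into the monitored directory (the file name test.txt is an illustrative assumption):

[djm@hadoop102 ~]$ cp test.txt /opt/module/flume/upload/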

3.4 Single Source, Multiple Outputs (Selector)

Requirements:

Use Flume-1 to monitor file changes; Flume-1 passes the changes to Flume-2

Flume-2 stores them in HDFS

At the same time, Flume-1 passes the changes to Flume-3, and Flume-3 writes them to the local file system

1. Create the Agent configuration file flume-file-flume.conf in the group1 directory

[djm@hadoop102 group1]$ vim flume-file-flume.conf

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# An avro sink is a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102 
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

3尤慰、在group1文件夾下創(chuàng)建Agent配置文件flume-flume-hdfs.conf

[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf

4、添加如下內(nèi)容:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# An avro source is a data-receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# How many time units before creating a new folder
a2.sinks.k1.hdfs.roundValue = 1
# The unit for time-based rounding
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a2.sinks.k1.hdfs.rollInterval = 600
# Roll size for each file, about 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the Event count
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5伟端、在group1文件夾下創(chuàng)建 Agent 配置文件flume-flume-dir.conf

[djm@hadoop102 group1]$ vim flume-flume-dir.conf

6、添加如下內(nèi)容:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

7匪煌、啟動(dòng)任務(wù)

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf

Notes:

Avro is a language-independent data serialization and RPC framework

The local output directory must already exist; if it does not, Flume will not create it

The downstream jobs (the agents whose Avro sources receive the data) must be started before the upstream job whose Avro sinks connect to them, which is why a3 and a2 are started before a1 above

3.5 Single Source, Multiple Outputs (Sink Group)

Requirements:

Use Flume-1 to monitor port data; Flume-1 passes new content to Flume-2

Flume-2 displays the data on the console

At the same time, Flume-1 passes the content to Flume-3, and Flume-3 also displays it on the console

Steps:

1. Create the Agent configuration file flume-netcat-flume.conf in the group2 directory

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

3. Create the Agent configuration file flume-flume-console1.conf in the group2 directory

4. Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5创肥、在 group2文件夾下創(chuàng)建Agent配置文件flume-flume-console2.conf

6达舒、添加如下內(nèi)容:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

7. Start the jobs

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf

3.6 Aggregating Multiple Data Sources

Requirements:

Flume-1 on hadoop103 monitors the file /opt/module/group.log

Flume-2 on hadoop102 monitors the data stream on a port

Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console

Steps:

1. Create the Agent configuration file flume1-logger-flume.conf in the group3 directory

[djm@hadoop102 group3]$ vim flume1-logger-flume.conf

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3趾代、在group3文件夾下創(chuàng)建Agent配置文件flume2-netcat-flume.conf

[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf

4贯底、添加如下內(nèi)容:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume3-flume-logger.conf in the group3 directory

[djm@hadoop102 group3]$ vim flume3-flume-logger.conf

6. Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

7禽捆、分發(fā)配置文件

[djm@hadoop102 group3]$ xsync /opt/module/flume/job

8. Start the jobs

[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf

4 Deploying Ganglia

1. Install the httpd service and PHP

yum -y install httpd php

2. Install the other dependencies

yum -y install rrdtool perl-rrdtool rrdtool-devel

3. Install Ganglia

rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web

4. Edit the Ganglia web configuration file

vim /etc/httpd/conf.d/ganglia.conf

#
# Ganglia monitoring system php web frontend
#

Alias /ganglia /usr/share/ganglia

<Location /ganglia>
  # Require local
  Require all granted
  # Require ip 10.1.2.3
  # Require host example.org
</Location> 

Note in particular: the following configuration does not take effect:

<Location /ganglia>
  Order deny,allow
  Allow from all
</Location> 

5浊服、修改gmetad配置文件

vim /etc/ganglia/gmetad.conf 

data_source "hadoop102" 192.168.1.102

6统屈、修改gmond配置文件

vim /etc/ganglia/gmond.conf 

cluster {
  #name = "unspecified"
  name = "hadoop102"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}

udp_send_channel { 
#bind_hostname = yes # Highly recommended, soon to be default. 
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
  #mcast_join = 239.2.11.71
  host = 192.168.10.102
  port = 8649
  ttl = 1
}

/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
  #mcast_join = 239.2.11.71
  port = 8649
  #bind = 239.2.11.71
  bind = 192.168.10.102
  retry_bind = true 

# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
} 

7. Check the SELinux status

sestatus

If it is not disabled, edit the following configuration file:

vim /etc/selinux/config

Or disable SELinux temporarily:

setenforce 0

7愁憔、啟動(dòng)ganglia

systemctl start httpd
systemctl start gmetad 
systemctl start gmond

9. Open it in a browser

http://hadoop102/ganglia/

If you still see a permission error after completing the steps above, try changing the permissions on the /var/lib/ganglia directory:

chmod -R 777 /var/lib/ganglia

5 Custom Source

Requirements:

[Figure: requirement diagram for the custom Source]

Implementation:

1. Add the dependency

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

2. Write the code

package com.djm.flume;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // Fields read from the agent configuration file
    private Long delay;

    private String field;

    /**
     * Receive data, wrap it into events, and write them to the channel.
     *
     * @return READY if events were delivered, BACKOFF if interrupted
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        HashMap<String, String> headerMap = new HashMap<>();
        SimpleEvent event = new SimpleEvent();
        try {
            for (int i = 0; i < 5; i++) {
                event.setHeaders(headerMap);
                event.setBody((field + i).getBytes());
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Read this source's parameters from the configuration file.
     *
     * @param context the configuration context
     */
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "hello");
    }
}

3. Package and test

Package the project with Maven and copy the jar to the /opt/module/flume/lib directory; a sketch of this step follows
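
A sketch of the packaging step, run from the project root (the jar file name depends on your project's artifactId and version, so it is an illustrative assumption):

$ mvn clean package
$ cp target/flume-source-1.0-SNAPSHOT.jar /opt/module/flume/lib/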

job文件夾下創(chuàng)建Agent配置文件mysource.conf

[djm@hadoop102 job]$ vim mysource.conf

Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the job

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

6 Custom Sink

Requirements:

[Figure: requirement diagram for the custom Sink]

Implementation:

1. Add the dependency

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

2激蹲、代碼編寫

package com.djm.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    // Prefix and suffix wrapped around each logged event body
    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            Event event;
            transaction.begin();
            // Poll until the channel hands us an event
            while ((event = channel.take()) == null) {
                Thread.sleep(200);
            }
            // "Write out" the event by logging its body with the configured prefix and suffix
            LOG.info(prefix + new String(event.getBody()) + suffix);
            transaction.commit();
            status = Status.READY;
        } catch (Throwable e) {
            // On failure, roll back so the event stays in the channel
            transaction.rollback();
            status = Status.BACKOFF;
            if (e instanceof Error)
                throw (Error) e;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix");
    }
}

3. Package and test

Package the project with Maven and copy the jar to the /opt/module/flume/lib directory, as in section 5

Create the Agent configuration file mysink.conf in the job directory

[djm@hadoop102 job]$ vim mysink.conf

Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the job

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console

7 Flume Parameter Tuning

7.1 Source

Increasing the number of Sources increases the capacity to read data. For example, when a single directory produces too many files, split it into several directories and configure a Source for each, so that the Sources have enough capacity to pick up newly generated data.

The batchSize parameter determines how many Events a Source ships to the Channel in one batch; raising it appropriately improves the throughput of moving Events into the Channel.

7.2 Channel

Memory Channel gives the best performance, but data may be lost if the Flume process dies unexpectedly.

File Channel has better fault tolerance but worse performance than Memory Channel; when using File Channel, setting dataDirs to directories on several different disks improves performance, as sketched below.
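
A sketch of spreading a File Channel across disks; the /data1 and /data2 mount points are illustrative assumptions, and dataDirs takes a comma-separated list:

a2.channels.c2.type = file
a2.channels.c2.checkpointDir = /data1/flume/checkpoint
a2.channels.c2.dataDirs = /data1/flume/data,/data2/flume/data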

The capacity parameter determines the maximum number of Events the Channel can hold, and transactionCapacity determines the maximum number of Events a Source can write to the Channel, or a Sink can read from it, in a single transaction; transactionCapacity must be larger than the batchSize of both the Source and the Sink.

7.3 Sink

Increasing the number of Sinks increases the capacity to consume Events, but more Sinks are not always better; enough is enough, since surplus Sinks occupy system resources and cause unnecessary waste.

The batchSize parameter determines how many Events a Sink reads from the Channel in one batch; raising it appropriately improves the throughput of moving Events out of the Channel.
