Flume概述
Flume在大數(shù)據(jù)中扮演著數(shù)據(jù)收集的角色你踩,收集到數(shù)據(jù)以后在通過計(jì)算框架進(jìn)行處理熄浓。Flume是Cloudera提供的一個(gè)高可用的,高可靠的,分布式的海量日志采集辛臊、聚合和傳輸?shù)南到y(tǒng),F(xiàn)lume支持在日志系統(tǒng)中定制各類數(shù)據(jù)發(fā)送方房交,用于收集數(shù)據(jù)彻舰;同時(shí),F(xiàn)lume提供對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理候味,并寫到各種數(shù)據(jù)接受方(可定制)的能力刃唤。
Flume架構(gòu)和核心組件
Event的概念
在這里有必要先介紹一下flume中event的相關(guān)概念:flume的核心是把數(shù)據(jù)從數(shù)據(jù)源(source)收集過來,在將收集到的數(shù)據(jù)送到指定的目的地(sink)白群。為了保證輸送的過程一定成功尚胞,在送到目的地(sink)之前,會(huì)先緩存數(shù)據(jù)(channel),待數(shù)據(jù)真正到達(dá)目的地(sink)后帜慢,flume在刪除自己緩存的數(shù)據(jù)笼裳。在整個(gè)數(shù)據(jù)的傳輸?shù)倪^程中,流動(dòng)的是event粱玲,即事務(wù)保證是在event級(jí)別進(jìn)行的躬柬。那么什么是event呢?—–event將傳輸?shù)臄?shù)據(jù)進(jìn)行封裝抽减,是flume傳輸數(shù)據(jù)的基本單位允青,如果是文本文件,通常是一行記錄卵沉,event也是事務(wù)的基本單位颠锉。event從source,流向channel史汗,再到sink琼掠,本身為一個(gè)字節(jié)數(shù)組,并可攜帶headers(頭信息)信息停撞。event代表著一個(gè)數(shù)據(jù)的最小完整單元瓷蛙,從外部數(shù)據(jù)源來,向外部的目的地去。
Flume架構(gòu)
flume之所以這么神奇速挑,是源于它自身的一個(gè)設(shè)計(jì),這個(gè)設(shè)計(jì)就是agent副硅,agent本身是一個(gè)java進(jìn)程姥宝,運(yùn)行在日志收集節(jié)點(diǎn)—所謂日志收集節(jié)點(diǎn)就是服務(wù)器節(jié)點(diǎn)。
agent里面包含3個(gè)核心的組件:source—->channel—–>sink,類似生產(chǎn)者恐疲、倉(cāng)庫(kù)腊满、消費(fèi)者的架構(gòu)。
- source:source組件是專門用來收集數(shù)據(jù)的培己,可以處理各種類型碳蛋、各種格式的日志數(shù)據(jù),包括avro、thrift省咨、exec肃弟、jms、spooling directory零蓉、netcat笤受、sequence generator、syslog敌蜂、http箩兽、legacy、自定義章喉。
- channel:source組件把數(shù)據(jù)收集來以后汗贫,臨時(shí)存放在channel中,即channel組件在agent中是專門用來存放臨時(shí)數(shù)據(jù)的——對(duì)采集到的數(shù)據(jù)進(jìn)行簡(jiǎn)單的緩存秸脱,可以存放在memory落包、jdbc、file等等撞反。
- sink:sink組件是用于把數(shù)據(jù)發(fā)送到目的地的組件妥色,目的地包括hdfs、logger遏片、avro嘹害、thrift、ipc吮便、file笔呀、null、hbase髓需、solr许师、自定義。
flume的運(yùn)行機(jī)制
flume的核心就是一個(gè)agent,這個(gè)agent對(duì)外有兩個(gè)進(jìn)行交互的地方微渠,一個(gè)是接受數(shù)據(jù)的輸入——source搭幻,一個(gè)是數(shù)據(jù)的輸出sink,sink負(fù)責(zé)將數(shù)據(jù)發(fā)送到外部指定的目的地逞盆。source接收到數(shù)據(jù)之后檀蹋,將數(shù)據(jù)發(fā)送給channel,chanel作為一個(gè)數(shù)據(jù)緩沖區(qū)會(huì)臨時(shí)存放這些數(shù)據(jù)云芦,隨后sink會(huì)將channel中的數(shù)據(jù)發(fā)送到指定的地方—-例如HDFS等俯逾,注意:只有在sink將channel中的數(shù)據(jù)成功發(fā)送出去之后,channel才會(huì)將臨時(shí)數(shù)據(jù)進(jìn)行刪除舅逸,這種機(jī)制保證了數(shù)據(jù)傳輸?shù)目煽啃耘c安全性桌肴。
flume的廣義用法
flume可以支持多級(jí)flume的agent,即flume可以前后相繼琉历,例如sink可以將數(shù)據(jù)寫到下一個(gè)agent的source中坠七,這樣的話就可以連成串了,可以整體處理了善已。flume還支持扇入(fan-in)灼捂、扇出(fan-out)。所謂扇入就是source可以接受多個(gè)輸入换团,所謂扇出就是sink可以將數(shù)據(jù)輸出多個(gè)目的地destination中悉稠。
值得注意的是,F(xiàn)lume提供了大量?jī)?nèi)置的Source艘包、Channel和Sink類型的猛。不同類型的Source,Channel和Sink可以自由組合。組合方式基于用戶設(shè)置的配置文件想虎,非常靈活卦尊。比如:Channel可以把事件暫存在內(nèi)存里,也可以持久化到本地硬盤上舌厨。Sink可以把日志寫入HDFS, HBase岂却,甚至是另外一個(gè)Source等等。Flume支持用戶建立多級(jí)流裙椭,也就是說躏哩,多個(gè)agent可以協(xié)同工作,并且支持Fan-in揉燃、Fan-out扫尺、Contextual Routing、Backup Routes炊汤。如下圖所示:
Flume環(huán)境搭建
前置條件
- Flume需要Java 1.7及以上(推薦1.8)正驻。
- 足夠的內(nèi)存和磁盤空間弊攘。
- 對(duì)Agent監(jiān)控目錄的讀寫權(quán)限。
搭建
- 下載flume-ng-1.6.0-cdh5.7.0.tar.gz(下載地址http://archive.cloudera.com/cdh5/cdh/5/
)姑曙,因?yàn)樵谇懊娲髷?shù)據(jù)入門章節(jié)中(http://www.reibang.com/p/10700514e3e0
)我們選擇的是cdh5.7.0版本襟交,所以現(xiàn)在也選擇這個(gè)版本。 - 上傳到服務(wù)器伤靠,并解壓婿着。
tar -zxvf flume-ng-1.6.0-cdh5.7.0.tar.gz -C ../apps/
- 配置環(huán)境變量。
vi ~/.bash_profile
//在文件中配置Flume的路徑醋界,根據(jù)自己安裝的路徑進(jìn)行修改
export FLUME_HOME=/root/apps/apache-flume-1.6.0-cdh5.7.0-bin
export PATH=$FLUME_HOME/bin:$PATH
//使配置文件生效
source ~/.bash_profile
- 在flume-env.sh中配置Java JDK的路徑。
cd $FLUME_HOME/conf
// 復(fù)制模板
cp flume-env.sh.template flume-env.sh
vi flume-env.sh
// 配置為安裝的Java目錄
export JAVA_HOME=/usr/jdk1.8.0_181
- 檢測(cè),在flume的bin目錄下執(zhí)行flume-ng version可查看版本提完。
cd $FLUME_HOME/bin
flume-ng version
出現(xiàn)以下內(nèi)容形纺,說明安裝成功
Flume實(shí)戰(zhàn)
使用flume的關(guān)鍵就是寫配置文件。主要是以下四步:
- 配置Source
- 配置Channel
- 配置Sink
- 把以上三個(gè)組件串起來徒欣。
需求1:從指定網(wǎng)絡(luò)端口采集數(shù)據(jù)輸出到控制臺(tái)
- 配置agent
#example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# a1: agent名稱 r1: source的名稱 k1:sink的名稱 c1:channel的名稱
# Describe/configure the source
a1.sources.r1.type = netcat #source的類型
a1.sources.r1.bind = localhost #source綁定的主機(jī)
a1.sources.r1.port = 44444 #source綁定的主機(jī)端口
# Describe the sink
a1.sinks.k1.type = logger #sink的類型
# Use a channel which buffers events in memory
a1.channels.c1.type = memory #channel的類型
a1.channels.c1.capacity = 1000 #通道中存儲(chǔ)的最大event數(shù)
a1.channels.c1.transactionCapacity = 100 # 通道從源或提供給接收器的最大event數(shù)
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 #把source和channel做關(guān)聯(lián)逐样,其中屬性是channels,說明sources可以和多個(gè)channel做關(guān)聯(lián)打肝。
a1.sinks.k1.channel = c1 #sink和channel做關(guān)聯(lián)脂新,只能輸出到一個(gè)channel
- 在flume的conf目錄下新建example.conf文件(目錄和文件名可自定義,在后續(xù)啟動(dòng)agent時(shí)需要用到)粗梭。
vi example.conf
把配置好的agent配置復(fù)制到文件中
- 啟動(dòng)agent
flume-ng agent \
-- name a1 \ #指定agent的名稱争便,在上面配置中我們配置的是a1
-- conf $FLUME_HOME/conf \ # flume的配置目錄
-- conf-file $FLUME_HOME/conf/example.conf \ # agent配置的文件全路徑
-- Dflume.root.logger=INFO,console #日志級(jí)別和輸出形式
- 測(cè)試,可使用telnet到source關(guān)聯(lián)的主機(jī)断医,在對(duì)應(yīng)端口下輸入字符滞乙,在控制臺(tái)可以看到輸入的字符。
案例2:監(jiān)控一個(gè)文件實(shí)時(shí)采集新增的數(shù)據(jù)輸出到控制臺(tái)
- agent的選型:
exec source+memory channel +logger sink
agent配置:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec #source的類型
a1.sources.r1.command= tail -F /root/data/data.log #執(zhí)行的命令
a1.sources.r1.shell = /bin/sh -c
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 啟動(dòng)agent
flume-ng agent --name a1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-logger.conf -Dflume.root.logger=INFO,console
- 測(cè)試:往監(jiān)聽的文件中輸入數(shù)據(jù)鉴嗤,在控制臺(tái)會(huì)打印event數(shù)據(jù)
echo hello >> data.log
如圖可見Event = 可選的header + byte array
這里把數(shù)據(jù)輸出到控制臺(tái)沒有任何意義斩启,實(shí)際需求可能需要輸出到hdfs之上,只需要改agent配置醉锅,把sink的類型改為hdfs兔簇,然后指定hdfs的url和寫入的路徑。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec #source的類型
a1.sources.r1.command= tail -F /root/data/data.log #執(zhí)行的命令
a1.sources.r1.shell = /bin/sh -c
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.30.130:8020/root/flume/hive-logs/
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.batchSize = 10
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
需求三:從服務(wù)器A收集數(shù)據(jù)到服務(wù)器B
重點(diǎn):服務(wù)器A的Sink 類型是AVRO硬耍, 而 服務(wù)器 B的Source 是AVRO
流程:
- 機(jī)器A監(jiān)控一個(gè)文件垄琐,把日志記錄到data.log中
- avro sink把新產(chǎn)生的日志輸出到指定的hostname和port上
- 通過avro source對(duì)應(yīng)的agent將日志輸出到控制臺(tái)|Kafka|hdfs等
機(jī)器A配置
agentA.sources = r1
agentA.sinks = k1
agentA.channels = c1
# Describe/configure the source
agentA.sources.r1.type = exec
agentA.sources.r1.command= tail -F /root/data/data.log
agentA.sources.r1.shell = /bin/sh -c
# Describe the sink
agentA.sinks.k1.type = avro
agentA.sinks.k1.hostname = localhost
agentA.sinks.k1.port = 44444
# Use a channel which buffers events in memory
agentA.channels.c1.type = memory
# Bind the source and sink to the channel
agentA.sources.r1.channels = c1
agentA.sinks.k1.channel = c1
機(jī)器B配置
# Name the components on this agent
agentB.sources = avro-source
agentB.sinks = logger-sink
agentB.channels = memory-channel
# Describe/configure the source
agentB.sources.avro-source.type = avro
agentB.sources.avro-source.bind = localhost
agentB.sources.avro-source.port = 44444
# Describe the sink
agentB.sinks.logger-sink.type = logger
# Use a channel which buffers events in memory
agentB.channels.memory-channel.type = memory
# Bind the source and sink to the channel
agentB.sources.avro-source.channels = memory-channel
agentB.sinks.logger-sink.channel = memory-channel