Flume簡介
Flume 是一個分布式逛裤、可靠指孤、高可用的海量日志聚合系統(tǒng)启涯,支持在系統(tǒng)中定制各類數(shù)據(jù)發(fā)送 方, 用于收集數(shù)據(jù)恃轩,同時结洼,F(xiàn)lume 提供對數(shù)據(jù)的簡單處理,并寫到各種數(shù)據(jù)接收方的能力叉跛。
1补君、 Apache Flume 是一個分布式、可靠昧互、和高可用的海量日志采集挽铁、聚合和傳輸?shù)南到y(tǒng)伟桅,和 Sqoop 同 屬于數(shù)據(jù)采集系統(tǒng)組件,但是 Sqoop 用來采集關(guān)系型數(shù)據(jù)庫數(shù)據(jù)叽掘,而 Flume 用 來采集流動型數(shù)據(jù)楣铁。
2、 Flume 名字來源于原始的近乎實時的日志數(shù)據(jù)采集工具更扁,現(xiàn)在被廣泛用于任何流事件數(shù) 據(jù)的采集盖腕, 它支持從很多數(shù)據(jù)源聚合數(shù)據(jù)到 HDFS。
3浓镜、 一般的采集需求溃列,通過對 ?ume 的簡單配置即可實現(xiàn)。Flume 針對特殊場景也具備良好 的自定義 擴展能力膛薛,因此听隐,?ume 可以適用于大部分的日常數(shù)據(jù)采集場景 。
4哄啄、 Flume 最初由 Cloudera 開發(fā)雅任,在 2011 年貢獻給了 Apache 基金會,2012 年變成了 Apache 的頂 級項目咨跌。Flume OG(Original Generation)是 Flume 最初版本沪么,后升級換代成 Flume NG(Next/New Generation)。
5锌半、 Flume 的優(yōu)勢:可橫向擴展禽车、延展性、可靠性刊殉。
Flume體系結(jié)構(gòu)/核心組件
介紹
Flume 的數(shù)據(jù)流由事件(Event)貫穿始終哭当。事件是 Flume 的基本數(shù)據(jù)單位,它攜帶日志數(shù)據(jù)(字 節(jié) 數(shù)組形式)并且攜帶有頭信息冗澈,這些 Event 由 Agent 外部的 Source 生成钦勘,當(dāng) Source 捕獲事 件后會進 行特定的格式化,然后 Source 會把事件推入(單個或多個)Channel 中亚亲。你可以把 Channel 看作是一個 緩沖區(qū)彻采,它將保存事件直到 Sink 處理完該事件。Sink 負(fù)責(zé)持久化日志或 者把事件推向另一個 Source捌归。
核心組件
Agent:能夠獨立執(zhí)行一個數(shù)據(jù)收集任務(wù)的JVM進程肛响,F(xiàn)lume 以 Agent 為最小的獨立運行單位,一個 Agent 就是一個 JVM惜索,每臺機器運行一個Agent特笋。單 Agent 由 Source、Sink 和 Channel 三大組件構(gòu)巾兆,可以包含多個猎物。
Client:生產(chǎn)數(shù)據(jù)運行在一個獨立的線程
Source:一個Agent中的用來跟數(shù)據(jù)源對接的服務(wù)虎囚,從Client收集數(shù)據(jù),傳遞給Channel
Channel:Agent內(nèi)部的一個中轉(zhuǎn)組件
Sink:一個Agent中的用來跟目的地進行對接的服務(wù)蔫磨,從Channel收集數(shù)據(jù)
Event:在Source淘讥、Channel、Sink中間進行流轉(zhuǎn)的消息的封裝對象
常見的source分類:
avro source:接收網(wǎng)絡(luò)端口中的數(shù)據(jù)
exec source:檢測新增內(nèi)容堤如。tail -f 監(jiān)聽某個文件新增加的內(nèi)容蒲列。
spooldir source:監(jiān)控文件夾的,如果這個文件夾中的數(shù)據(jù)變化搀罢,就可以采集
customer source:自定義
常見的channel分類:
memory:內(nèi)存蝗岖,快,但是不安全
file:安全榔至,但是效率低
jdbc:使用數(shù)據(jù)庫進行保存
常見的sink分類:
loggerSink:做測試用
HDFSSink:離線數(shù)據(jù)的sink
KafkaSink:流式數(shù)據(jù)使用
Flume三大核心組件
Event
Event 是 Flume 數(shù)據(jù)傳輸?shù)幕締卧?
Flume 以事件的形式將數(shù)據(jù)從源頭傳送到最終的目的地抵赢。
Event 由可選的 header 和載有數(shù)據(jù)的一個 byte array 構(gòu)成。
載有的數(shù)據(jù)度 ?ume 是不透明的洛退。
Header 是容納了 key-value 字符串對的無序集合瓣俯,key 在集合內(nèi)是唯一的杰标。
Header 可以在上下文路由中使用擴展兵怯。
Client
Client 是一個將原始 log 包裝成 events 并且發(fā)送他們到一個或多個 agent 的實體
目的是從數(shù)據(jù)源系統(tǒng)中解耦 Flume
在 Flume 的拓?fù)浣Y(jié)構(gòu)中不是必須的
Agent
一個 Agent 包含 source,channel腔剂,sink 和其他組件媒区。
它利用這些組件將 events 從一個節(jié)點傳輸?shù)搅硪粋€節(jié)點或最終目的地。
Agent 是 ?ume 流的基礎(chǔ)部分掸犬。
Flume為這些組件提供了配置袜漩,聲明周期管理,監(jiān)控支持湾碎。
Agent 之 Source
Source 負(fù)責(zé)接收 event 或通過特殊機制產(chǎn)生 event宙攻,并將 events 批量的放到一個或多個
包含 event 驅(qū)動和輪詢兩種類型
不同類型的 Source
與系統(tǒng)集成的 Source:Syslog,Netcat介褥,監(jiān)測目錄池
自動生成事件的 Source:Exec 用于 Agent 和 Agent
之間通信的 IPC source:avro座掘,thrift
Source 必須至少和一個 channel 關(guān)聯(lián)
Agent 之 Channel
Channel 位于 Source 和 Sink 之間,用于緩存進來的 event
當(dāng) sink 成功的將 event 發(fā)送到下一個的 channel 或最終目的柔滔,event 從 channel 刪除
不同的 channel 提供的持久化水平也是不一樣的
Memory Channel:volatile(不穩(wěn)定的)
File Channel:基于 WAL(預(yù)寫式日志 Write-Ahead Logging)實現(xiàn)
JDBC Channel:基于嵌入式 database 實現(xiàn)
Channel 支持事務(wù)溢陪,提供較弱的順序保證
可以和任何數(shù)量的 source 和 sink 工作
Agent 之 Sink
Sink 負(fù)責(zé)將 event 傳輸?shù)较乱患壔蜃罱K目的地,成功后將 event 從 channel 移除
不同類型的 sink 睛廊,比如 HDFS形真,HBase
Flume三大案例:
一、官方案例監(jiān)控端口數(shù)據(jù)案例
1超全、在flume的目錄下面創(chuàng)建文件夾
cd /home/bigdata/apps/apache-flume-1.7.0-bin/
mkdir job
cd job
2咆霜、定義配置文件telnet-logger.conf
vim telnet-logger.conf
添加內(nèi)容如下:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
解釋
# example.conf: A single-node Flume configuration
# a1:表示agent的名稱
# Name the components on this agent
a1.sources = r1 #r1:表示a1的輸入源
a1.sinks = k1 #k1:表示a1輸出目的地
a1.channels = c1 #c1:表示a1的緩存區(qū)
# Describe/configure the source
a1.sources.r1.type = netcat #表示a的輸入的數(shù)據(jù)源的類型是netcat端口類型
a1.sources.r1.bind = localhost #表示a1監(jiān)聽的主機
a1.sources.r1.port = 44444 #表示a1監(jiān)聽的端口號
# Describe the sink
a1.sinks.k1.type = logger #a1的輸出目的地是控制臺的logger類型
# Use a channel which buffers events in memory
a1.channels.c1.type = memory #a1的channel類型的memory
a1.channels.c1.capacity = 1000 #channel的總?cè)萘渴?000個event
a1.channels.c1.transactionCapacity = 100 #傳輸?shù)臅r候收集夠了100個event在提交
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 #將r1和c1連接起來
a1.sinks.k1.channel = c1 #將k1和c1連接起來
3邓馒、先開啟flume監(jiān)聽端口
退到flume目錄
cd /home/bigdata/apps/apache-flume-1.7.0-bin/
官方樣例:
bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
實際操作:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console
參數(shù)說明:
--conf conf: 表示配置文件存儲在conf這個目錄
--name a1: 表示給agent起名為a1
--conf-file job/telnet-logger.conf: flume本次啟動讀取的配置文件是在job文件夾下面的telnet-logger.conf文件
-Dflume.root.logger=INFO,console 打印日志
4、使用telnet測試端口
yum -y install telnet
telnet localhost 44444
5裕便、發(fā)送命令測試即可
二绒净、監(jiān)控目錄中的文件到HDFS
1、創(chuàng)建配置文件dir-hdfs.conf
在job目錄下面
vim dir-hdfs.conf
添加下面的內(nèi)容:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /software/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop0:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 1
a3.sinks.k3.hdfs.roundUnit = hour
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 100
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
解釋
a3.sources = r3 #定義source
a3.sinks = k3 #定義sink
a3.channels = c3 #定義channel
# Describe/configure the source
a3.sources.r3.type = spooldir #定義source的類型是目錄
a3.sources.r3.spoolDir =/home/bigdata/data/flumedata #監(jiān)控的本地目錄
a3.sources.r3.fileSuffix = .COMPLETED #上傳完了的后綴
a3.sources.r3.fileHeader = true #是否有文件頭
a3.sources.r3.ignorePattern = ([^ ]*\.tmp) #忽略以tmp結(jié)尾的
# Describe the sink
a3.sinks.k3.type = hdfs #sink的類型hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata02:9000/flume/upload/%Y%m%d/%H #本地文件上傳到hdfs上面的路徑
a3.sinks.k3.hdfs.filePrefix = upload- #上傳到hdfs上面的文件的前綴
a3.sinks.k3.hdfs.round = true #是否按照時間滾動生成文件
a3.sinks.k3.hdfs.roundValue = 1 #多長時間生成文件
a3.sinks.k3.hdfs.roundUnit = hour #單位
a3.sinks.k3.hdfs.useLocalTimeStamp = true #是否使用本地時間戳
a3.sinks.k3.hdfs.batchSize = 100 #到100個event刷寫到hdfs
a3.sinks.k3.hdfs.fileType = DataStream #文件類型
a3.sinks.k3.hdfs.rollInterval = 600 #多久生成新文件
a3.sinks.k3.hdfs.rollSize = 134217700 #多大的時候生成新文件
a3.sinks.k3.hdfs.rollCount = 0 #多少個event生成新文件
a3.sinks.k3.hdfs.minBlockReplicas = 1 #最小副本數(shù)
# Use a channel which buffers events in memory
a3.channels.c3.type = memory #第一個案例中有
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3 #第一個案例中有
a3.sinks.k3.channel = c3
2偿衰、啟動監(jiān)控目錄命令
cd ..
mkdir /home/bigdata/data/flumedata/
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf
cp text3.txt /home/bigdata/data/flumedata/
等600秒查看
新傳一個文件立即查看挂疆,文件結(jié)尾
.tmp
三、監(jiān)控文件到HDFS
1下翎、創(chuàng)建一個自動化文件
vim mydateauto.sh
寫入:
#!/bin/bash
while true
do
echo `date`
sleep 1
done
然后運行測試:
sh mydateauto.sh
然后修改配置缤言,將輸出的日志追加到某個文件中
#!/bin/bash
while true
do
echo `date` >> /home/bigdata/data/flumedata/mydate.txt
sleep 1
done
再次執(zhí)行
sh mydateauto.sh
就會在/home/bigdata/data/flumedata/
的文件夾下面生成了mydate.txt文件
查看
tail -f /home/bigdata/data/flumedata/mydate.txt
2、創(chuàng)建配置file-hdfs.conf
vim file-hdfs.conf
添加下面的內(nèi)容:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /home/bigdata/data/flumedata/mydate.txt
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata02:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs-
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
解釋
# Name the components on this agent
a2.sources = r2 #定義source
a2.sinks = k2 #定義sink
a2.channels = c2 #定義channel
# Describe/configure the source
a2.sources.r2.type = exec #source的類型
a2.sources.r2.command = tail -F /home/bigdata/data/flumedata/mydate.txt #監(jiān)控的本地文件
a2.sources.r2.shell = /bin/bash -c #執(zhí)行腳本的絕對路徑
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata02:9000/flume/%Y%m%d/%H
a2.sinks.k2.hdfs.filePrefix = logs- #前綴
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = hour
a2.sinks.k2.hdfs.useLocalTimeStamp = true
a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.rollInterval = 600
a2.sinks.k2.hdfs.rollSize = 134217700
a2.sinks.k2.hdfs.rollCount = 0
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3视事、啟動
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf