1. Flume Overview
Flume is a distributed, reliable service for efficiently collecting, aggregating and moving large volumes of log data. Flume is designed to run on Unix-like systems.
Flume has a streaming architecture. It is highly fault tolerant, flexible and simple.
Flume and Kafka handle real-time data collection, Spark and Flink handle real-time data processing, and Impala handles real-time queries.
2. Flume Roles
2.1 Source
Collects data. The Source is where the data stream originates, and it passes that stream on to the Channel (the Flume Channel is somewhat like a Channel in Java NIO).
2.2 Channel
Bridges Sources and Sinks, much like a queue.
2.3 Sink
Takes data from the Channel and writes it to the destination, which can be the Source of another Flume agent, or a store such as HDFS or HBase.
2.4 Event
The unit of transfer: the basic unit in which Flume carries data from its origin to its destination. An event consists of optional headers (string key/value pairs) and a body (a byte array).
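To make the Event structure concrete: in Flume's Java API an event implements `org.apache.flume.Event`, which exposes a `Map<String, String>` of headers and a `byte[]` body. The sketch below mirrors that shape with a hypothetical `SimpleEvent` class so it compiles without the Flume jars; `withBody` loosely imitates `EventBuilder.withBody`. Interceptors (section 4.2.6) work on the headers, while sinks such as the HDFS sink normally write only the body.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.flume.Event: a header map plus a byte[] body.
class SimpleEvent {
    private Map<String, String> headers = new HashMap<>();
    private byte[] body = new byte[0];

    // Loosely modelled on EventBuilder.withBody(byte[], Map); copies the headers.
    static SimpleEvent withBody(String text, Map<String, String> headers) {
        SimpleEvent e = new SimpleEvent();
        e.setBody(text.getBytes(StandardCharsets.UTF_8));
        e.setHeaders(new HashMap<>(headers));
        return e;
    }

    Map<String, String> getHeaders() { return headers; }
    void setHeaders(Map<String, String> headers) { this.headers = headers; }
    byte[] getBody() { return body; }
    void setBody(byte[] body) { this.body = body; }

    @Override
    public String toString() {
        return "Event{headers=" + headers + ", body=" + new String(body, StandardCharsets.UTF_8) + "}";
    }
}
```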
3. Flume Data Flow
The Source watches a file or data stream. When new data appears, the Source wraps it in an Event, puts the Event into the Channel and commits the transaction. The Channel is a first-in, first-out queue. The Sink pulls events from the Channel and writes them to HDFS (or another destination).
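The put/commit and take cycle described above can be modelled with a toy in-memory channel. This is a simplified sketch, not Flume's `MemoryChannel`. The hypothetical `ToyMemoryChannel` commits a source batch all at once, hands events to the sink in FIFO order, and lets the sink roll back a batch it failed to write. `capacity` and `transactionCapacity` mirror the channel settings used in the configurations in this article.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model (not Flume's MemoryChannel) of the put/commit and take/rollback cycle.
// capacity: max events stored; transactionCapacity: max events per transaction.
class ToyMemoryChannel<E> {
    private final Deque<E> queue = new ArrayDeque<>();   // FIFO storage
    private final int capacity;
    private final int transactionCapacity;

    ToyMemoryChannel(int capacity, int transactionCapacity) {
        this.capacity = capacity;
        this.transactionCapacity = transactionCapacity;
    }

    // Source side: a batch is committed atomically (all or nothing).
    synchronized void putBatch(List<E> batch) {
        if (batch.size() > transactionCapacity) {
            throw new IllegalStateException("batch exceeds transactionCapacity");
        }
        if (queue.size() + batch.size() > capacity) {
            throw new IllegalStateException("channel full");   // the source would retry later
        }
        queue.addAll(batch);   // commit: events become visible to the sink
    }

    // Sink side: take up to max events (bounded by transactionCapacity), oldest first.
    synchronized List<E> take(int max) {
        int n = Math.min(Math.min(max, transactionCapacity), queue.size());
        List<E> taken = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            taken.add(queue.pollFirst());
        }
        return taken;
    }

    // If the sink cannot write the batch, put the events back at the head in their original order.
    synchronized void rollback(List<E> taken) {
        for (int i = taken.size() - 1; i >= 0; i--) {
            queue.addFirst(taken.get(i));
        }
    }

    synchronized int size() { return queue.size(); }
}
```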
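4. Deploying and Using Flume
4.1 Configuration
Check JAVA_HOME: echo $JAVA_HOME
/opt/module/jdk1.8.0_144
Install Flume:
[AncientMing@bigdata113 software]$ tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /opt/module/
The commands in this article assume the extracted directory has been renamed to /opt/module/flume-1.8.0:
[AncientMing@bigdata113 module]$ mv apache-flume-1.8.0-bin flume-1.8.0
Rename the environment template in the conf directory:
[AncientMing@bigdata113 conf]$ mv flume-env.sh.template flume-env.sh
Setting to change in flume-env.sh:
export JAVA_HOME=/opt/module/jdk1.8.0_144
On Linux, rz uploads files and sz downloads them. If these commands are missing, install them with yum: yum install -y lrzsz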
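4.2 Examples
4.2.1 Example 1: Monitoring port data
Goal: Flume listens on a port and prints what it receives in one console; messages sent from another console show up there in real time.
Steps:
- Install the telnet tool (for example: yum install -y telnet)
- Create the Flume agent configuration file flume-telnet.conf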
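# Define the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Define the netcat source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata111
a1.sources.r1.port = 44445
# Define the sink
a1.sinks.k1.type = logger
# Define the channel: capacity is the maximum number of events it holds,
# transactionCapacity the maximum number of events per transaction
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1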
- Check whether port 44445 is already in use
$ netstat -tunlp | grep 44445
- Start the agent with this configuration file
/opt/module/flume-1.8.0/bin/flume-ng agent \
--conf /opt/module/flume-1.8.0/conf/ \
--name a1 \
--conf-file /opt/module/flume-1.8.0/jobconf/flume-telnet.conf \
-Dflume.root.logger=INFO,console
Or, equivalently, from the Flume home directory:
bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume-telnet.conf -Dflume.root.logger=INFO,console
- Use telnet to send text to port 44445 on the host
$ telnet bigdata111 44445
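Instead of telnet, a small Java client can send a line to the netcat source. This sketch assumes the source from flume-telnet.conf (bigdata111:44445) and relies on the netcat source's documented default of replying "OK" for every event received (`ack-every-event = true`). The class name `NetcatClient` is hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Sends one newline-terminated line, like typing it into telnet, and returns the reply line.
class NetcatClient {
    static String sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            out.print(line + "\n");   // the netcat source turns each line into one event
            out.flush();
            return in.readLine();     // a Flume netcat source answers "OK" by default
        }
    }

    public static void main(String[] args) throws IOException {
        // Host and port taken from flume-telnet.conf
        System.out.println(sendLine("bigdata111", 44445, "hello flume"));
    }
}
```

The event then appears in the agent's console through the logger sink, just as with telnet.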
4.2.2 Example 2: Reading a local file into HDFS in real time
- Create the file flume-hdfs.conf
# 1 agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# 2 source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/plus
a2.sources.r2.shell = /bin/bash -c
# 3 sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata111:9000/flume/%Y%m%d/%H
# Prefix of the uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to round the timestamp down when creating time-based directories
a2.sinks.k2.hdfs.round = true
# How many time units per new directory
a2.sinks.k2.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Use the agent's local time instead of the event's timestamp header
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of events accumulated before each flush to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type: DataStream writes uncompressed text; CompressedStream (with hdfs.codeC) compresses
a2.sinks.k2.hdfs.fileType = DataStream
# Start a new file every N seconds
a2.sinks.k2.hdfs.rollInterval = 600
# Roll size per file in bytes (just under 128 MB)
a2.sinks.k2.hdfs.rollSize = 134217700
# Do not roll files based on event count
a2.sinks.k2.hdfs.rollCount = 0
# Minimum number of replicas per HDFS block
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
- Run the agent
/opt/module/flume-1.8.0/bin/flume-ng agent \
--conf /opt/module/flume-1.8.0/conf/ \
--name a2 \
--conf-file /opt/module/flume-1.8.0/jobconf/flume-hdfs.conf
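To see how `round`, `roundValue` and `roundUnit` decide the target directory: with `useLocalTimeStamp = true` the sink takes the agent's current time, rounds it down to a multiple of `roundValue` `roundUnit`s, and substitutes it into escapes such as `%Y%m%d/%H`. The sketch below models only `%Y %m %d %H %M` and only hour, minute and second rounding; the class `HdfsBucketPath` is hypothetical and not part of Flume.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Sketch of how the HDFS sink turns a timestamp into a bucket directory.
// Supports only %Y %m %d %H %M; Flume itself supports many more escapes.
class HdfsBucketPath {
    enum RoundUnit { SECOND, MINUTE, HOUR }

    // round=true: truncate the timestamp down to a multiple of roundValue in roundUnit
    static ZonedDateTime roundDown(ZonedDateTime ts, int roundValue, RoundUnit unit) {
        switch (unit) {
            case HOUR:
                return ts.withHour(ts.getHour() / roundValue * roundValue)
                         .withMinute(0).withSecond(0).withNano(0);
            case MINUTE:
                return ts.withMinute(ts.getMinute() / roundValue * roundValue)
                         .withSecond(0).withNano(0);
            default:
                return ts.withSecond(ts.getSecond() / roundValue * roundValue).withNano(0);
        }
    }

    // Replace the supported escapes with fields of the (already rounded) timestamp
    static String resolve(String pattern, ZonedDateTime ts) {
        return pattern
                .replace("%Y", ts.format(DateTimeFormatter.ofPattern("yyyy")))
                .replace("%m", ts.format(DateTimeFormatter.ofPattern("MM")))
                .replace("%d", ts.format(DateTimeFormatter.ofPattern("dd")))
                .replace("%H", ts.format(DateTimeFormatter.ofPattern("HH")))
                .replace("%M", ts.format(DateTimeFormatter.ofPattern("mm")));
    }
}
```

For example, an event at 11:54:34 with `roundValue = 10` and `roundUnit = minute` lands in a `%H%M` directory named `1150`.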
4.2.3 Example 3: Reading files from a directory into HDFS in real time
Goal: use Flume to watch every file in a directory
Steps:
- Create the configuration file flume-dir.conf
#1 Agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
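#2 source
# Source type that watches a directory
a3.sources.r3.type = spooldir
# Path of the watched directory
a3.sources.r3.spoolDir = /opt/module/flume-1.8.0/upload
# Suffix appended to a file once it has been fully ingested
a3.sources.r3.fileSuffix = .COMPLETED
# Add a header holding the file's absolute path
a3.sources.r3.fileHeader = true
# Ignore (do not upload) files ending in .tmp (optional)
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# 3 sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata111:9000/flume/%H
# Prefix of the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to round the timestamp down when creating time-based directories
a3.sinks.k3.hdfs.round = true
# How many time units per new directory
a3.sinks.k3.hdfs.roundValue = 1
# The time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Use the agent's local time instead of the event's timestamp header
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events accumulated before each flush to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type: DataStream writes uncompressed text; CompressedStream (with hdfs.codeC) compresses
a3.sinks.k3.hdfs.fileType = DataStream
# Start a new file every N seconds
a3.sinks.k3.hdfs.rollInterval = 600
# Roll size per file, about 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# Do not roll files based on event count
a3.sinks.k3.hdfs.rollCount = 0
# Minimum number of replicas per HDFS block
a3.sinks.k3.hdfs.minBlockReplicas = 1
# 4 channel
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# 5 Bind
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3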
- Test: create the upload directory first, run the command below, then copy some files into the upload directory
/opt/module/flume-1.8.0/bin/flume-ng agent \
--conf /opt/module/flume-1.8.0/conf/ \
--name a3 \
--conf-file /opt/module/flume-1.8.0/jobconf/flume-dir.conf
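Important notes when using the Spooling Directory Source:
- Do not create a file inside the watched directory and keep modifying it; only place finished files there
- Fully ingested files are renamed with the .COMPLETED suffix
- The watched directory is scanned for changes every 500 milliseconds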
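The Spooling Directory Source rules above (ignore `.tmp` files, rename finished files with `.COMPLETED`) can be sketched with `java.nio.file`. This is a simplified model of a single directory scan under the flume-dir.conf settings, not Flume's reader. As an assumption, it also skips names starting with a dot. It does not model `consumeOrder` (Flume's default is oldest file first) or the 500 ms polling loop. The class `SpoolDirSketch` is hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Simplified model of one Spooling Directory Source scan (not Flume's implementation).
class SpoolDirSketch {
    static final String FILE_SUFFIX = ".COMPLETED";                  // fileSuffix
    static final Pattern IGNORE = Pattern.compile("([^ ]*\\.tmp)");   // ignorePattern

    // Files that would be picked up in this scan (order unspecified).
    static List<Path> eligibleFiles(Path spoolDir) throws IOException {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(spoolDir)) {
            for (Path p : stream) {
                String name = p.getFileName().toString();
                boolean done = name.endsWith(FILE_SUFFIX);            // already ingested
                boolean ignored = IGNORE.matcher(name).matches();     // e.g. half-written .tmp files
                boolean hidden = name.startsWith(".");                 // assumption of this sketch
                if (Files.isRegularFile(p) && !done && !ignored && !hidden) {
                    result.add(p);
                }
            }
        }
        return result;
    }

    // Once every line of a file has been committed to the channel, the file is renamed.
    static Path markCompleted(Path file) throws IOException {
        Path target = file.resolveSibling(file.getFileName() + FILE_SUFFIX);
        return Files.move(file, target);
    }
}
```

The rename is why a file must not change after it is placed in the directory: the source treats it as finished input.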
4.2.4 Example 4: Passing data between Flume agents: one Flume, multiple Channels and Sinks
Goal: flume1 watches a file for changes and passes the new content to flume2, which stores it in HDFS. At the same time, flume1 passes the same content to flume3, which writes it to the local file system.
Steps:
- Create flume1.conf, which watches a file and uses two channels and two sinks to feed flume2 and flume3:
# 1.agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data stream to every channel
a1.sources.r1.selector.type = replicating
# 2.source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.shell = /bin/bash -c
# 3.sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata111
a1.sinks.k1.port = 4141
# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata111
a1.sinks.k2.port = 4142
# 4.channel-1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4.channel-2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
- Create flume2.conf, which receives flume1's events and uses one channel and one sink to deliver them to HDFS:
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# 2 source
a2.sources.r1.type = avro
a2.sources.r1.bind = bigdata111
a2.sources.r1.port = 4141
# 3 sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H
# Prefix of the uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to round the timestamp down when creating time-based directories
a2.sinks.k1.hdfs.round = true
# How many time units per new directory
a2.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
# Use the agent's local time instead of the event's timestamp header
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events accumulated before each flush to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type: DataStream writes uncompressed text; CompressedStream (with hdfs.codeC) compresses
a2.sinks.k1.hdfs.fileType = DataStream
# Start a new file every N seconds
a2.sinks.k1.hdfs.rollInterval = 600
# Roll size per file, about 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# Do not roll files based on event count
a2.sinks.k1.hdfs.rollCount = 0
# Minimum number of replicas per HDFS block
a2.sinks.k1.hdfs.minBlockReplicas = 1
# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# 5 Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
- Create flume3.conf, which receives flume1's events and uses one channel and one sink to write them to a local directory:
#1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata111
a3.sources.r1.port = 4142
#3 sink
a3.sinks.k1.type = file_roll
# Note: this directory must be created beforehand
a3.sinks.k1.sink.directory = /opt/flume3
# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Important: the local output directory must already exist. If it is missing, Flume will not create it.
- Test: start the agents downstream first (flume3, flume2, then flume1), so that the Avro sources on ports 4141 and 4142 are listening before flume1's Avro sinks connect. Then append to the watched file and observe the results:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf
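Each source lists its channels with `channels` (plural; several are allowed, which is how the replicating selector in flume1.conf copies every event to c1 and c2), while each sink names exactly one `channel`. A component that is never bound, such as a sink whose bind lines were forgotten, is not started by the agent. Because agent files are plain Java properties, a small checker can catch such wiring mistakes before starting the agents. `FlumeWiringCheck` is a hypothetical sketch, not a Flume tool.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Reads an agent config (plain Java properties) and reports broken source/sink-to-channel bindings.
class FlumeWiringCheck {
    static List<String> check(String agent, String configText) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(configText));
        Set<String> channels = names(p.getProperty(agent + ".channels"));
        List<String> problems = new ArrayList<>();
        // A source may write to several channels (space-separated list).
        for (String src : names(p.getProperty(agent + ".sources"))) {
            Set<String> bound = names(p.getProperty(agent + ".sources." + src + ".channels"));
            if (bound.isEmpty()) problems.add("source " + src + " has no channels");
            for (String c : bound) {
                if (!channels.contains(c)) problems.add("source " + src + " -> unknown channel " + c);
            }
        }
        // A sink reads from exactly one channel.
        for (String sink : names(p.getProperty(agent + ".sinks"))) {
            String c = p.getProperty(agent + ".sinks." + sink + ".channel");
            if (c == null || c.trim().isEmpty()) problems.add("sink " + sink + " has no channel");
            else if (!channels.contains(c.trim())) problems.add("sink " + sink + " -> unknown channel " + c.trim());
        }
        return problems;
    }

    // Split a space-separated component list, ignoring extra whitespace.
    private static Set<String> names(String value) {
        Set<String> s = new HashSet<>();
        if (value != null) {
            for (String n : value.trim().split("\\s+")) {
                if (!n.isEmpty()) s.add(n);
            }
        }
        return s;
    }
}
```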
4.2.5 Example 5: Passing data between Flume agents: several Flume agents aggregated into one
Goal: flume11 watches a log file such as hive.log (the configuration below tails /opt/Andy), flume22 watches the data stream on a port, both send their data to flume33, and flume33 writes the merged data to HDFS.
Steps:
- Create flume11.conf, which watches the log file and sends its data to flume33:
# 1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.shell = /bin/bash -c
# 3 sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata111
a1.sinks.k1.port = 4141
# 4 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 5. Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Create flume22.conf, which watches the data stream on port 44444 and sends it to flume33:
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# 2 source
a2.sources.r1.type = netcat
a2.sources.r1.bind = bigdata111
a2.sources.r1.port = 44444
#3 sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata111
a2.sinks.k1.port = 4141
# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# 5 Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
- Create flume33.conf, which receives the streams sent by flume11 and flume22 and writes the merged result to HDFS:
# 1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata111
a3.sources.r1.port = 4141
# 3 sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H
# Prefix of the uploaded files
a3.sinks.k1.hdfs.filePrefix = flume3-
# Whether to round the timestamp down when creating time-based directories
a3.sinks.k1.hdfs.round = true
# How many time units per new directory
a3.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a3.sinks.k1.hdfs.roundUnit = hour
# Use the agent's local time instead of the event's timestamp header
a3.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events accumulated before each flush to HDFS
a3.sinks.k1.hdfs.batchSize = 100
# File type: DataStream writes uncompressed text; CompressedStream (with hdfs.codeC) compresses
a3.sinks.k1.hdfs.fileType = DataStream
# Start a new file every N seconds
a3.sinks.k1.hdfs.rollInterval = 600
# Roll size per file, about 128 MB
a3.sinks.k1.hdfs.rollSize = 134217700
# Do not roll files based on event count
a3.sinks.k1.hdfs.rollCount = 0
# Minimum number of replicas per HDFS block
a3.sinks.k1.hdfs.minBlockReplicas = 1
# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
- Test: start the agents in order (flume33, flume22, then flume11), then generate data and observe the results:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume22.conf
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume11.conf
Sending data:
Run telnet bigdata111 44444 and send 5555555
Append 666666 to /opt/Andy (for example: echo 666666 >> /opt/Andy)
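Fan-in works because several upstream Avro sinks can connect to the same Avro source, whose events then share one channel. The toy below uses a `LinkedBlockingQueue` as a stand-in for flume33's channel to show the effect: every event from both producers arrives and each producer's own order is preserved, but how the two streams interleave is not defined. The class name and the `flume11:`/`flume22:` prefixes are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy fan-in: two upstream "agents" push into one shared queue, one consumer drains it.
class FanInSketch {
    static List<String> run(List<String> fromExec, List<String> fromNetcat) throws InterruptedException {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>(1000);   // like a3.channels.c1.capacity
        Thread flume11 = new Thread(() -> fromExec.forEach(e -> putQuietly(channel, "flume11:" + e)));
        Thread flume22 = new Thread(() -> fromNetcat.forEach(e -> putQuietly(channel, "flume22:" + e)));
        flume11.start();
        flume22.start();
        flume11.join();
        flume22.join();
        List<String> sunk = new ArrayList<>();
        channel.drainTo(sunk);   // stands in for flume33's single HDFS sink
        return sunk;
    }

    private static void putQuietly(BlockingQueue<String> q, String e) {
        try {
            q.put(e);   // blocks if the channel is full
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
```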
4.2.6 Example 6: Flume interceptors
Timestamp interceptor
flume-interceptors.conf
# 1. Define the agent and the names of its source, channel and sink
a4.sources = r1
a4.channels = c1
a4.sinks = k1
# 2. Define the source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /opt/module/flume-1.8.0/upload
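# Define the interceptor: it adds a timestamp header (epoch milliseconds) to every event; the HDFS sink below uses it to resolve %H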
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# Define the channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
# Define the sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume-interceptors/%H
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream
# Do not roll files based on event count
a4.sinks.k1.hdfs.rollCount = 0
# Roll to a new file when the current HDFS file reaches 128 MB
a4.sinks.k1.hdfs.rollSize = 134217728
# Roll to a new file every 60 seconds
a4.sinks.k1.hdfs.rollInterval = 60
# Wire the source, channel and sink together
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
Start command
/opt/module/flume-1.8.0/bin/flume-ng agent -n a4 \
-f /opt/module/flume-1.8.0/jobconf/flume-interceptors.conf \
-c /opt/module/flume-1.8.0/conf \
-Dflume.root.logger=INFO,console
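The timestamp interceptor does not change the file contents; it puts the event time, in epoch milliseconds, into a `timestamp` header. This configuration does not set `useLocalTimeStamp`, so the HDFS sink needs that header to resolve `%H`. The hypothetical `TimestampHeaderSketch` below models both steps with plain maps. The UTC zone in the test is an assumption; the real sink uses the agent's local time zone unless `hdfs.timeZone` is set.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

// Sketch of what the timestamp interceptor adds and how the HDFS sink uses it for %H.
class TimestampHeaderSketch {
    // Put epoch milliseconds into the "timestamp" header, overwriting any existing value
    // (the interceptor's preserveExisting option defaults to false).
    static Map<String, String> intercept(Map<String, String> headers, long nowMillis) {
        Map<String, String> out = new HashMap<>(headers);
        out.put("timestamp", Long.toString(nowMillis));
        return out;
    }

    // Resolve the hour escape from the header, as happens when useLocalTimeStamp is not set.
    static String hourBucket(Map<String, String> headers, ZoneId zone) {
        String ts = headers.get("timestamp");
        if (ts == null) {
            throw new IllegalStateException("no timestamp header: %H cannot be resolved");
        }
        return DateTimeFormatter.ofPattern("HH").withZone(zone)
                .format(Instant.ofEpochMilli(Long.parseLong(ts)));
    }
}
```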
Host interceptor
host.conf
# 1. Define the agent
a1.sources= r1
a1.sinks = k1
a1.channels = c1
# 2. Define the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
# true: use the IP address (e.g. 192.168.1.111); false: use the hostname. The default is true
a1.sources.r1.interceptors.i1.useIP = false
a1.sources.r1.interceptors.i1.hostHeader = agentHost
# 3. Define the sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flumehost/%H
a1.sinks.k1.hdfs.filePrefix = Andy_%{agentHost}
# Append the suffix .log to generated files
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start command:
bin/flume-ng agent -c conf/ -f jobconf/host.conf -n a1 -Dflume.root.logger=INFO,console
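The host interceptor stores the hostname (or IP) under the header named by `hostHeader`, here `agentHost`, and the HDFS sink replaces `%{agentHost}` in `filePrefix` with that value, so files come out as `Andy_bigdata111...` when `useIP = false` on host bigdata111. A minimal sketch of that `%{header}` substitution follows; `HeaderEscape` is hypothetical, and substituting an empty string for a missing header is an assumption of this sketch.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Replaces %{header} placeholders with event header values, as the HDFS sink does for
// hdfs.path and hdfs.filePrefix (only this one escape form is modelled here).
class HeaderEscape {
    private static final Pattern HEADER = Pattern.compile("%\\{([\\w.-]+)\\}");

    static String resolve(String template, Map<String, String> headers) {
        Matcher m = HEADER.matcher(template);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String value = headers.getOrDefault(m.group(1), "");   // missing header -> "" (assumption)
            m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
    }
}
```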
UUID interceptor
uuid.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
# type cannot be the alias uuid; it must be the fully qualified class name, otherwise the class is not found
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
# If a UUID header already exists, preserve it
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.interceptors.i1.prefix = UUID_
# With an HDFS sink instead, the header information would not appear in the text written to HDFS
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/uuid.conf -n a1 -Dflume.root.logger=INFO,console
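The UUID interceptor's options can be read as the following logic. This is a sketch, not the morphline module's class. `id` is the documented default header name (the configuration above does not set `headerName`), and `UuidHeaderSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Sketch of the UUID interceptor's header logic (headerName, preserveExisting, prefix).
class UuidHeaderSketch {
    static Map<String, String> intercept(Map<String, String> headers, String headerName,
                                         boolean preserveExisting, String prefix) {
        Map<String, String> out = new HashMap<>(headers);
        if (preserveExisting && out.containsKey(headerName)) {
            return out;   // keep the id assigned upstream
        }
        out.put(headerName, prefix + UUID.randomUUID());   // e.g. UUID_3f2b... (36-char UUID)
        return out;
    }
}
```

Preserving an existing id matters when events pass through several agents: the id assigned at the first hop survives, which helps with de-duplication downstream.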
Search-and-replace interceptor
search.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace
# Replace every run of digits with AncientMing; for example, A123 becomes AAncientMing
a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
a1.sources.r1.interceptors.i1.replaceString = AncientMing
a1.sources.r1.interceptors.i1.charset = UTF-8
#3 sink
a1.sinks.k1.type = logger
#4 Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#5 bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/search.conf -n a1 -Dflume.root.logger=INFO,console
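The search_replace interceptor decodes the body with `charset`, applies the regular expression to the whole text, and re-encodes it. The same replacement can be reproduced with `java.util.regex`. `SearchReplaceSketch` is a hypothetical stand-in that takes the values from search.conf.

```java
import java.nio.charset.Charset;
import java.util.regex.Pattern;

// Sketch of the search_replace interceptor: decode body with charset, replaceAll, re-encode.
class SearchReplaceSketch {
    private final Pattern searchPattern;
    private final String replaceString;
    private final Charset charset;

    SearchReplaceSketch(String searchPattern, String replaceString, String charset) {
        this.searchPattern = Pattern.compile(searchPattern);
        this.replaceString = replaceString;
        this.charset = Charset.forName(charset);
    }

    byte[] intercept(byte[] body) {
        String text = new String(body, charset);
        // Every match of the pattern is replaced, not just the first one
        return searchPattern.matcher(text).replaceAll(replaceString).getBytes(charset);
    }
}
```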
Regex filtering interceptor
filter.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^A.*
# excludeEvents=false: drop events that do NOT start with A (keep only matches). excludeEvents=true: drop events that start with A.
a1.sources.r1.interceptors.i1.excludeEvents = true
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/filter.conf -n a1 -Dflume.root.logger=INFO,console
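The `excludeEvents` flag is easy to invert by mistake, so here is the rule as code: an event survives when "matches the regex" differs from `excludeEvents`. `RegexFilterSketch` is a hypothetical helper that works on plain strings rather than Flume events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Sketch of regex_filter semantics: excludeEvents=false keeps matches, true drops matches.
class RegexFilterSketch {
    static List<String> filter(List<String> bodies, String regex, boolean excludeEvents) {
        Pattern p = Pattern.compile(regex);
        List<String> kept = new ArrayList<>();
        for (String body : bodies) {
            boolean matches = p.matcher(body).find();
            if (matches != excludeEvents) {   // keep iff (match && !exclude) || (!match && exclude)
                kept.add(body);
            }
        }
        return kept;
    }
}
```

With filter.conf's `^A.*` and `excludeEvents = true`, a line such as `Apple` is dropped and `banana` reaches the logger sink.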
Regex extractor interceptor
extractor.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/extractor.conf -n a1 -Dflume.root.logger=INFO,console
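Note: the headers added by the regex extractor interceptor do not appear in the file names or file contents written by a sink such as HDFS; the logger sink used here prints them alongside the body.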
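The regex extractor copies each capture group into a header named by the corresponding serializer (`s1` to `cookieid`, `s2` to `ip`) and leaves the body untouched. A sketch with `java.util.regex`, assuming a plain string body; `RegexExtractorSketch` is hypothetical and ignores the serializer types Flume supports.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of regex_extractor: capture group i+1 becomes the header headerNames.get(i).
class RegexExtractorSketch {
    static Map<String, String> extract(String body, String regex, List<String> headerNames) {
        Map<String, String> headers = new HashMap<>();
        Matcher m = Pattern.compile(regex).matcher(body);
        if (m.find()) {
            for (int i = 0; i < headerNames.size() && i < m.groupCount(); i++) {
                headers.put(headerNames.get(i), m.group(i + 1));
            }
        }
        return headers;   // the event body itself is left unchanged
    }
}
```

For the line `hostname is bigdata111 ip is 192.168.1.111`, the lazy `(.*?)` stops at ` ip is `, so `cookieid` gets `bigdata111` and `ip` gets `192.168.1.111`.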
4.2.7 Example 7: A custom Flume interceptor
Converting lowercase letters to uppercase
1. pom.xml
<dependencies>
<!-- Flume core dependency -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Jar packaging plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<!-- Compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>utf-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
2. Implement the custom interceptor
package ToUpCase;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public void close() {
    }

    /**
     * Intercepts a message sent by the source before it enters the channel.
     *
     * @param event the incoming event
     * @return the event after business processing
     */
    @Override
    public Event intercept(Event event) {
        // Get the event body as bytes
        byte[] arr = event.getBody();
        // Convert the text to uppercase, decoding and re-encoding explicitly as UTF-8
        event.setBody(new String(arr, StandardCharsets.UTF_8).toUpperCase().getBytes(StandardCharsets.UTF_8));
        // Return the modified event
        return event;
    }

    // Processes a batch of events
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> list = new ArrayList<>();
        for (Event event : events) {
            list.add(intercept(event));
        }
        return list;
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        // Reads properties from the configuration file (none are needed here)
        @Override
        public void configure(Context context) {
        }
    }
}
Build the project into a jar with Maven, create a jar directory under the Flume home (mkdir jar), and upload the jar into it.
Flume configuration file
ToUpCase.conf
#1.agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
# Fully qualified class name followed by $Builder
a1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /ToUpCase1
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Type of the generated files: the default is SequenceFile; DataStream produces plain text
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run command:
bin/flume-ng agent -c conf/ -n a1 -f jar/ToUpCase.conf -C jar/Flume-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console
4.2.8 Example 8: Connecting Flume to Kafka
Configure Flume (flume2kafka.conf)
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/jars/calllog.csv
a1.sources.r1.shell = /bin/bash -c
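# sink
# brokerList, topic, batchSize and requiredAcks are older property names; the Flume 1.8 user guide
# lists them as deprecated in favor of kafka.* properties such as kafka.bootstrap.servers and kafka.topic
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink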
a1.sinks.k1.brokerList = bigdata111:9092,bigdata112:9092,bigdata113:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
From the Flume home directory, start Flume:
/opt/module/flume-1.8.0/bin/flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a1 --conf-file /opt/jars/flume2kafka.conf
4.2.9 Example 9: Connecting Kafka to Flume
kafka2flume.conf
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink
# The channel can be defined as follows.
agent.sources.kafkaSource.channels = memoryChannel
agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource
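# zookeeperConnect and topic are older KafkaSource properties; newer Flume releases document kafka.bootstrap.servers and kafka.topics instead
agent.sources.kafkaSource.zookeeperConnect=bigdata111:2181,bigdata112:2181,bigdata113:2181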
agent.sources.kafkaSource.topic=calllog
#agent.sources.kafkaSource.groupId=flume
agent.sources.kafkaSource.kafka.consumer.timeout.ms=100
agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000
# the sink of hdfs
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path=hdfs://bigdata111:9000/kafka2flume
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.fileType=DataStream
# Without these two settings (defaults: rollSize=1024 bytes, rollCount=10 events) HDFS fills up with many small files
agent.sinks.hdfsSink.hdfs.rollSize=0
agent.sinks.hdfsSink.hdfs.rollCount=0
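Why these two lines matter: the HDFS sink rolls to a new file when any enabled trigger fires, and a value of 0 disables a trigger. With the defaults documented for Flume 1.8 (`rollInterval = 30` seconds, `rollSize = 1024` bytes, `rollCount = 10` events), a busy topic produces a new file every 10 events. The hypothetical `RollPolicy` below states the rule; in Flume itself the interval trigger is driven by a timer rather than checked per event. Note that this configuration leaves `rollInterval` at its default, so files still roll every 30 seconds.

```java
// Sketch of the HDFS sink's three roll triggers; a value of 0 disables that trigger.
// rollInterval is in seconds, rollSize in bytes, rollCount in events.
class RollPolicy {
    final long rollIntervalSec;
    final long rollSizeBytes;
    final long rollCount;

    RollPolicy(long rollIntervalSec, long rollSizeBytes, long rollCount) {
        this.rollIntervalSec = rollIntervalSec;
        this.rollSizeBytes = rollSizeBytes;
        this.rollCount = rollCount;
    }

    // Defaults documented for Flume 1.8: rollInterval=30, rollSize=1024, rollCount=10.
    static RollPolicy defaults() {
        return new RollPolicy(30, 1024, 10);
    }

    // True if the currently open file should be closed and a new one started.
    boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten) {
        boolean byTime = rollIntervalSec > 0 && secondsOpen >= rollIntervalSec;
        boolean bySize = rollSizeBytes > 0 && bytesWritten >= rollSizeBytes;
        boolean byCount = rollCount > 0 && eventsWritten >= rollCount;
        return byTime || bySize || byCount;
    }
}
```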
Start command
bin/flume-ng agent --conf conf --conf-file jobconf/kafka2flume.conf --name agent -Dflume.root.logger=INFO,console
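Note: this configuration consumes data from Kafka, but data only reaches HDFS after new messages are produced to the topic; messages already in the topic before the agent started are not picked up.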
This takes plenty of practice: get hands-on!