090-BigData-18Flume

Previous: 089-BigData-17 ZooKeeper flash sale (seckill) and Hadoop high availability

I. Introduction to Flume

  1. Flume is a distributed, reliable service for efficiently collecting, aggregating, and moving large volumes of log data. It is normally run on Unix-like systems.

  2. Flume is built on a streaming architecture; it is fault-tolerant, flexible, and simple.

  3. Flume and Kafka are used for real-time data collection, Spark and Flink for real-time data processing, and Impala for real-time queries.

II. Flume Roles

2.1. Source

Collects data. The Source is where the data stream originates, and it passes that stream on to the Channel. A Flume Channel is loosely comparable to a Channel in Java's I/O libraries.

2.2. Channel

Bridges Sources and Sinks; it behaves like a queue.

2.3. Sink

Takes data from the Channel and writes it to a destination, which can be the Source of the next agent, HDFS, or HBase.

2.4. Event

The basic unit of data transfer in Flume. Data travels from its origin to its destination in the form of events.

III. Flume Data Flow

The Source monitors a file or data stream. When the data source produces new data, the Source wraps it in an Event, puts the Event into the Channel, and commits the transaction. The Channel is a first-in, first-out queue. The Sink pulls events from the Channel and then writes them to HDFS.
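The flow above can be pictured with a small plain-Java model. This is only a sketch under assumptions: `AgentFlowSketch`, `put`, and `take` are hypothetical names, an event is reduced to a `String`, and a JDK `ArrayBlockingQueue` stands in for the channel. It does not use the Flume API and ignores transactions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy model of one agent: source -> channel (FIFO queue) -> sink. Not the Flume API. */
final class AgentFlowSketch {

    /** Source side: put each line into the channel as an "event"; returns how many were accepted. */
    static int put(BlockingQueue<String> channel, List<String> lines) {
        int accepted = 0;
        for (String line : lines) {
            // offer() returns false when the channel is full instead of blocking
            if (channel.offer(line)) {
                accepted++;
            }
        }
        return accepted;
    }

    /** Sink side: take up to batchSize events from the head of the channel. */
    static List<String> take(BlockingQueue<String> channel, int batchSize) {
        List<String> batch = new ArrayList<>();
        channel.drainTo(batch, batchSize);
        return batch;
    }

    public static void main(String[] args) {
        // Bounded queue, comparable in spirit to a memory channel with capacity = 1000
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(1000);
        put(channel, Arrays.asList("line-1", "line-2", "line-3"));
        System.out.println(take(channel, 2)); // the oldest events come out first
        System.out.println(take(channel, 2)); // whatever is left
    }
}
```

The bounded queue is the point of the sketch: when the sink falls behind, the source can no longer put events, which is why channel capacity matters in the configurations that follow.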

IV. Deploying and Using Flume

4.1. Configuration
Find JAVA_HOME: echo $JAVA_HOME

/opt/module/jdk1.8.0_144

Install Flume:

[AncientMing@bigdata113 software]$ tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /opt/module/

The paths used later assume the extracted directory is available as /opt/module/flume-1.8.0.

Rename the template:

[AncientMing@bigdata113 conf]$ mv flume-env.sh.template flume-env.sh

Setting to change in flume-env.sh:

export JAVA_HOME=/opt/module/jdk1.8.0_144

On Linux, rz uploads a file and sz downloads one. If these two commands are missing, install them with yum: yum install lrzsz

4.2. Examples
4.2.1. Example 1: Monitoring port data
Goal: Flume listens on a port in one console; messages sent from another console are displayed in real time on the listening side.
Steps:

  1. Install the telnet tool
  2. Create the Flume agent configuration file flume-telnet.conf
# Define the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Define the netcat source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata111
a1.sources.r1.port = 44445

# Define the sink
a1.sinks.k1.type = logger

# Define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  3. Check whether port 44445 is already in use
$ netstat -tunlp | grep 44445
  4. Start Flume with the configuration file
/opt/module/flume-1.8.0/bin/flume-ng agent \
--conf /opt/module/flume-1.8.0/conf/ \
--name a1 \
--conf-file /opt/module/flume-1.8.0/jobconf/flume-telnet.conf \
-Dflume.root.logger=INFO,console
Or, from the Flume home directory with relative paths:
bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume-telnet.conf -Dflume.root.logger=INFO,console
  5. Use telnet to send content to port 44445
$ telnet bigdata111 44445
4.2.2. Example 2: Streaming a local file to HDFS in real time

  1. Create the file flume-hdfs.conf
# 1 agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# 2 source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/plus
a2.sources.r2.shell = /bin/bash -c

# 3 sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata111:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll directories by time (rounds the timestamp down)
a2.sinks.k2.hdfs.round = true
# Number of time units after which a new directory is created
a2.sinks.k2.hdfs.roundValue = 1
# Time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
# (lowered from 1000 so it does not exceed the channel's transactionCapacity)
a2.sinks.k2.hdfs.batchSize = 100
# File type; compressed types are also supported
a2.sinks.k2.hdfs.fileType = DataStream
# Seconds before rolling to a new file
a2.sinks.k2.hdfs.rollInterval = 600
# File size in bytes that triggers a roll (roughly 128 MB)
a2.sinks.k2.hdfs.rollSize = 134217700
# 0 means rolling does not depend on the number of events
a2.sinks.k2.hdfs.rollCount = 0
# Minimum number of block replicas
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

  2. Run the agent
/opt/module/flume-1.8.0/bin/flume-ng agent \
--conf /opt/module/flume-1.8.0/conf/ \
--name a2 \
--conf-file /opt/module/flume-1.8.0/jobconf/flume-hdfs.conf
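
To make the `%Y%m%d/%H` path and the `round`, `roundValue`, and `roundUnit` settings more concrete, here is a rough plain-Java illustration of how a timestamp could map to a directory. It is a sketch, not Flume's implementation: `HdfsBucketSketch` and `bucketPath` are hypothetical names, only hour rounding is modelled, and the time zone is passed in explicitly.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

/** Illustration only: maps a timestamp to a directory such as <base>/%Y%m%d/%H with hour rounding. */
final class HdfsBucketSketch {

    // yyyyMMdd/HH corresponds to the %Y%m%d/%H escapes in hdfs.path
    private static final DateTimeFormatter DAY_HOUR = DateTimeFormatter.ofPattern("yyyyMMdd/HH");

    static String bucketPath(String base, Instant timestamp, ZoneId zone, int roundValueHours) {
        // Drop minutes and seconds first
        ZonedDateTime t = timestamp.atZone(zone).truncatedTo(ChronoUnit.HOURS);
        // Round the hour down to a multiple of roundValueHours (1 keeps every hour separate)
        int roundedHour = t.getHour() - (t.getHour() % roundValueHours);
        return base + "/" + t.withHour(roundedHour).format(DAY_HOUR);
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2024-03-05T14:37:12Z");
        System.out.println(bucketPath("/flume", ts, ZoneOffset.UTC, 1)); // /flume/20240305/14
        System.out.println(bucketPath("/flume", ts, ZoneOffset.UTC, 6)); // /flume/20240305/12
    }
}
```

With `roundValue = 1` and `roundUnit = hour`, as in the configuration above, every hour gets its own directory; a larger value groups several hours into one.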

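4.2.3. Example 3: Streaming the files of a directory to HDFS in real time
Goal: use Flume to watch all files in a directory
Steps:

  1. Create the configuration file flume-dir.conf
#1 Agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

#2 source
# Source type for watching a directory
a3.sources.r3.type = spooldir
# Path of the watched directory
a3.sources.r3.spoolDir = /opt/module/flume-1.8.0/upload
# Suffix appended to a file once it has been uploaded to HDFS
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore all files ending in .tmp and do not upload them (optional)
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# 3 sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata111:9000/flume/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll directories by time (rounds the timestamp down)
a3.sinks.k3.hdfs.round = true
# Number of time units after which a new directory is created
a3.sinks.k3.hdfs.roundValue = 1
# Time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compressed types are also supported
a3.sinks.k3.hdfs.fileType = DataStream
# Seconds before rolling to a new file
a3.sinks.k3.hdfs.rollInterval = 600
# File size in bytes that triggers a roll (roughly 128 MB)
a3.sinks.k3.hdfs.rollSize = 134217700
# 0 means rolling does not depend on the number of events
a3.sinks.k3.hdfs.rollCount = 0
# Minimum number of block replicas
a3.sinks.k3.hdfs.minBlockReplicas = 1

# 4 channel (missing in the original listing; same memory channel settings as the other examples)
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# 5 Bind
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

  2. Run the test: after starting the agent with the command below, try adding files to the upload directory
/opt/module/flume-1.8.0/bin/flume-ng agent \
--conf /opt/module/flume-1.8.0/conf/ \
--name a3 \
--conf-file /opt/module/flume-1.8.0/jobconf/flume-dir.conf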

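Important: when using the Spooling Directory Source

  1. Do not create a file in the watched directory and keep modifying it
  2. Files that have been fully uploaded are renamed with the .COMPLETED suffix
  3. The watched directory is scanned for file changes every 500 milliseconds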

4.2.4. Example 4: Passing data between Flume agents: one agent with multiple Channels and Sinks

Goal: flume1 monitors a file for changes and passes the new content to flume2, which stores it in HDFS. flume1 also passes the same content to flume3, which writes it to the local file system.
Steps:

  1. Create flume1.conf, which monitors a file for changes and uses two channels and two sinks to feed flume2 and flume3:
# 1.agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to every channel
a1.sources.r1.selector.type = replicating

# 2.source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.shell = /bin/bash -c

# 3.sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata111
a1.sinks.k1.port = 4141

# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata111
a1.sinks.k2.port = 4142

# 4.channel-1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4.channel-2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
  2. Create flume2.conf, which receives events from flume1 and uses one channel and one sink to deliver the data to HDFS:
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# 2 source
a2.sources.r1.type = avro
a2.sources.r1.bind = bigdata111
a2.sources.r1.port = 4141

# 3 sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H
# Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll directories by time (rounds the timestamp down)
a2.sinks.k1.hdfs.round = true
# Number of time units after which a new directory is created
a2.sinks.k1.hdfs.roundValue = 1
# Time unit used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; compressed types are also supported
a2.sinks.k1.hdfs.fileType = DataStream
# Seconds before rolling to a new file
a2.sinks.k1.hdfs.rollInterval = 600
# File size in bytes that triggers a roll (roughly 128 MB)
a2.sinks.k1.hdfs.rollSize = 134217700
# 0 means rolling does not depend on the number of events
a2.sinks.k1.hdfs.rollCount = 0
# Minimum number of block replicas
a2.sinks.k1.hdfs.minBlockReplicas = 1

# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 5 Bind (missing in the original listing)
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

  3. Create flume3.conf, which receives events from flume1 and uses one channel and one sink to deliver the data to a local directory:
#1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata111
a3.sources.r1.port = 4142

#3 sink
a3.sinks.k1.type = file_roll
# Note: this directory must be created beforehand
a3.sinks.k1.sink.directory = /opt/flume3

# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

Important: the local output directory must already exist. If it does not, Flume will not create it.

  4. Run the test: start the three agents (starting flume2 and flume3 first means their Avro sources are already listening when flume1's Avro sinks connect), then generate file changes and observe the result:
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf

$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf

$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf
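
The `replicating` selector used in flume1.conf can be pictured as a simple fan-out: every event goes into every channel, and each sink drains its own channel. The following is a toy model in plain Java under that assumption; `ReplicatingSketch` and `replicate` are hypothetical names and no Flume classes are involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model of a replicating channel selector. Not the Flume API. */
final class ReplicatingSketch {

    /** Copies every event into each channel, preserving arrival order. */
    static void replicate(List<String> events, List<Deque<String>> channels) {
        for (String event : events) {
            for (Deque<String> channel : channels) {
                channel.addLast(event);
            }
        }
    }

    public static void main(String[] args) {
        Deque<String> c1 = new ArrayDeque<>(); // drained by sink k1 (towards flume2 and HDFS)
        Deque<String> c2 = new ArrayDeque<>(); // drained by sink k2 (towards flume3 and local files)
        replicate(Arrays.asList("e1", "e2"), Arrays.asList(c1, c2));
        System.out.println(new ArrayList<>(c1)); // both channels hold the same events
        System.out.println(new ArrayList<>(c2));
    }
}
```

Because each channel receives a full copy, the HDFS path and the local-file path see identical data and can proceed at different speeds.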

4.2.5. Example 5: Passing data between Flume agents: several agents feeding one agent

Goal: flume11 monitors a log file (hive.log in the original scenario; the configuration below tails /opt/Andy), flume22 monitors the data stream on a port, both send their data to flume33, and flume33 writes the combined data to HDFS.
Steps:

  1. Create flume11.conf, which monitors the log file and sinks the data to flume33:
# 1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2 source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.shell = /bin/bash -c

# 3 sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata111
a1.sinks.k1.port = 4141

# 4 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 5. Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  2. Create flume22.conf, which monitors the data stream on port 44444 and sinks the data to flume33:
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# 2 source
a2.sources.r1.type = netcat
a2.sources.r1.bind = bigdata111
a2.sources.r1.port = 44444

#3 sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata111
a2.sinks.k1.port = 4141

# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 5 Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  3. Create flume33.conf, which receives the data streams sent by flume11 and flume22, merges them, and sinks the result to HDFS:
# 1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata111
a3.sources.r1.port = 4141

# 3 sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H
# Prefix for uploaded files
a3.sinks.k1.hdfs.filePrefix = flume3-
# Whether to roll directories by time (rounds the timestamp down)
a3.sinks.k1.hdfs.round = true
# Number of time units after which a new directory is created
a3.sinks.k1.hdfs.roundValue = 1
# Time unit used for rounding
a3.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a3.sinks.k1.hdfs.batchSize = 100
# File type; compressed types are also supported
a3.sinks.k1.hdfs.fileType = DataStream
# Seconds before rolling to a new file
a3.sinks.k1.hdfs.rollInterval = 600
# File size in bytes that triggers a roll (roughly 128 MB)
a3.sinks.k1.hdfs.rollSize = 134217700
# 0 means rolling does not depend on the number of events
a3.sinks.k1.hdfs.rollCount = 0
# Minimum number of block replicas
a3.sinks.k1.hdfs.minBlockReplicas = 1

# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

  4. Run the test: start the agents in the order flume33, flume22, flume11, then generate file changes and observe the result:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume22.conf
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume11.conf

Sending data

telnet bigdata111 44444    (once connected, send 5555555)

Append 666666 to /opt/Andy

4.2.6. Example 6: Flume interceptors
Timestamp interceptor
Timestamp.conf

# 1. Define the agent name and the names of its source, channel, and sink
a4.sources = r1
a4.channels = c1
a4.sinks = k1

# 2. Define the source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /opt/module/flume-1.8.0/upload

# Define the interceptor: it adds a timestamp header to each event (used to resolve %H in the HDFS path)
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Define the channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100


# Define the sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume-interceptors/%H
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream

# Do not roll files by event count
a4.sinks.k1.hdfs.rollCount = 0
# Roll to a new file once the file on HDFS reaches 128 MB
a4.sinks.k1.hdfs.rollSize = 134217728
# Roll to a new file after 60 seconds
a4.sinks.k1.hdfs.rollInterval = 60

# Wire the source, channel, and sink together
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

Start command (assumes the configuration above was saved as jobconf/flume-interceptors.conf):

/opt/module/flume-1.8.0/bin/flume-ng agent -n a4 \
-f /opt/module/flume-1.8.0/jobconf/flume-interceptors.conf \
-c /opt/module/flume-1.8.0/conf \
-Dflume.root.logger=INFO,console

Host interceptor

Host.conf

# 1. Define the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2. Define the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

# true uses the IP address (for example 192.168.1.111), false uses the host name; the default is true
a1.sources.r1.interceptors.i1.useIP = false
a1.sources.r1.interceptors.i1.hostHeader = agentHost

# 3. Define the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flumehost/%H
a1.sinks.k1.hdfs.filePrefix = Andy_%{agentHost}
# Append the .log suffix to generated files
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start command:

bin/flume-ng agent -c conf/ -f jobconf/host.conf -n a1 -Dflume.root.logger=INFO,console

UUID interceptor
uuid.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
# The type cannot be abbreviated to uuid; the full class name is required, otherwise the class is not found
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
# If a UUID header already exists, keep it
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.interceptors.i1.prefix = UUID_

# If the sink type is changed to HDFS, the text written to HDFS contains no header information
a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/uuid.conf -n a1 -Dflume.root.logger=INFO,console

Search-and-replace interceptor
search.conf

#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace

# Replace every run of digits with AncientMing; A123 becomes AAncientMing
a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
a1.sources.r1.interceptors.i1.replaceString = AncientMing
a1.sources.r1.interceptors.i1.charset = UTF-8

#3 sink
a1.sinks.k1.type = logger

#4 Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#5 bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/search.conf -n a1 -Dflume.root.logger=INFO,console
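
The effect of `searchPattern` and `replaceString` can be tried out with the JDK's regex classes alone. The sketch below is an approximation under assumptions: `SearchReplaceSketch` and `apply` are hypothetical names, and the body is simply decoded as UTF-8 (matching the `charset` setting above), rewritten, and encoded again.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

/** Illustration of a search-and-replace step on an event body. Not the Flume API. */
final class SearchReplaceSketch {

    /** Decodes the body as UTF-8, replaces every match of the pattern, and re-encodes it. */
    static byte[] apply(byte[] body, Pattern searchPattern, String replaceString) {
        String text = new String(body, StandardCharsets.UTF_8);
        String replaced = searchPattern.matcher(text).replaceAll(replaceString);
        return replaced.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Pattern digits = Pattern.compile("[0-9]+");
        byte[] out = apply("A123".getBytes(StandardCharsets.UTF_8), digits, "AncientMing");
        System.out.println(new String(out, StandardCharsets.UTF_8)); // AAncientMing
    }
}
```

Because the pattern is `[0-9]+`, a whole run of digits is replaced once, not once per digit.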

Regex filter interceptor
filter.conf

#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^A.*
# excludeEvents = false drops events that do not start with A; excludeEvents = true drops events that start with A
a1.sources.r1.interceptors.i1.excludeEvents = true

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/filter.conf -n a1 -Dflume.root.logger=INFO,console
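
The `excludeEvents` switch is easy to get backwards, so here is a small plain-Java model of the decision. It is a sketch under assumptions: `RegexFilterSketch` and `filter` are hypothetical names, events are plain strings, and a match is taken to mean that the pattern is found somewhere in the body.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Illustration of include/exclude filtering by regex. Not the Flume API. */
final class RegexFilterSketch {

    /** Returns the event bodies that survive the filter. */
    static List<String> filter(List<String> bodies, Pattern regex, boolean excludeEvents) {
        List<String> kept = new ArrayList<>();
        for (String body : bodies) {
            boolean matches = regex.matcher(body).find();
            // excludeEvents = false keeps matching events; excludeEvents = true keeps the others
            if (matches != excludeEvents) {
                kept.add(body);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Pattern startsWithA = Pattern.compile("^A.*");
        List<String> bodies = Arrays.asList("Apple", "banana", "Avro");
        System.out.println(filter(bodies, startsWithA, true));  // [banana]
        System.out.println(filter(bodies, startsWithA, false)); // [Apple, Avro]
    }
}
```

With the configuration above (`excludeEvents = true`), only lines that do not start with A reach the logger sink.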

Regex extractor interceptor
extractor.conf

#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = cookieid
a1.sources.r1.interceptors.i1.serializers.s2.name = ip

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/extractor.conf -n a1 -Dflume.root.logger=INFO,console

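Note: the regex extractor only adds headers to the event. They do not appear in the file contents, and they do not appear in file names unless a header is referenced explicitly, as with %{agentHost} in the host interceptor example.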
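
To see what the regex and the two serializer names in extractor.conf produce, the capture groups can be mapped to header names with the JDK regex classes. This is a sketch, not Flume's code: `RegexExtractorSketch` and `extract` are hypothetical names, and the sample input line is made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustration of turning regex capture groups into event headers. Not the Flume API. */
final class RegexExtractorSketch {

    /** Maps capture group 1 to names[0], group 2 to names[1], and so on. */
    static Map<String, String> extract(String body, Pattern regex, List<String> names) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = regex.matcher(body);
        if (m.find()) {
            for (int i = 0; i < names.size() && i < m.groupCount(); i++) {
                headers.put(names.get(i), m.group(i + 1));
            }
        }
        return headers; // empty when the body does not match
    }

    public static void main(String[] args) {
        Pattern regex = Pattern.compile("hostname is (.*?) ip is (.*)");
        // Hypothetical input line appended to /opt/Andy
        String body = "hostname is bigdata111 ip is 192.168.1.111";
        System.out.println(extract(body, regex, Arrays.asList("cookieid", "ip")));
        // {cookieid=bigdata111, ip=192.168.1.111}
    }
}
```

The body itself is left unchanged; only the header map grows, which is why the logger sink shows the extracted values next to the original line.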

4.2.7. Example 7: Custom Flume interceptor
Converting lowercase letters to uppercase

1. pom.xml

    <dependencies>
        <!-- Flume core dependency -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

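2. Implement the custom interceptor

// The package must match the class name used in the Flume config (ToUpCase.MyInterceptor$Builder)
package ToUpCase;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {
        // No resources to set up
    }

    @Override
    public void close() {
        // No resources to release
    }

    /**
     * Intercepts an event on its way from the source to the channel.
     *
     * @param event the incoming event
     * @return the event after processing
     */
    @Override
    public Event intercept(Event event) {
        // Read the event body as bytes
        byte[] arr = event.getBody();
        // Convert the body to upper case (UTF-8 is assumed here instead of the platform default charset)
        String upper = new String(arr, StandardCharsets.UTF_8).toUpperCase();
        event.setBody(upper.getBytes(StandardCharsets.UTF_8));
        // Hand the modified event back
        return event;
    }

    // Processes a batch of events by applying intercept(Event) to each one
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> list = new ArrayList<>();
        for (Event event : events) {
            list.add(intercept(event));
        }
        return list;
    }

    public static class Builder implements Interceptor.Builder {
        // Creates the interceptor instance
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        // Reads properties from the configuration file (none are needed here)
        @Override
        public void configure(Context context) {

        }
    }
}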

Build a jar with Maven, create a jar directory under the Flume home directory (mkdir jar), and upload the jar into it.

Flume configuration file

ToUpCase.conf

#1.agent 
a1.sources = r1
a1.sinks =k1
a1.channels = c1
 
 
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
# Fully qualified class name followed by $Builder
a1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder
 
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /ToUpCase1
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Type of the generated files; the default is SequenceFile, DataStream produces plain text
a1.sinks.k1.hdfs.fileType = DataStream
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Run command:

bin/flume-ng agent -c conf/ -n a1 -f jar/ToUpCase.conf -C jar/Flume-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console

4.2.8. Example 8: Connecting Flume to Kafka

Flume configuration (flume2kafka.conf)

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/jars/calllog.csv
a1.sources.r1.shell = /bin/bash -c

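# sink (brokerList, topic, batchSize and requiredAcks are legacy property names; Flume 1.7+ prefers
# kafka.bootstrap.servers, kafka.topic, flumeBatchSize and kafka.producer.acks)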
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = bigdata111:9092,bigdata112:9092,bigdata113:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start Flume:

/opt/module/flume-1.8.0/bin/flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a1 --conf-file /opt/jars/flume2kafka.conf

4.2.9. Example 9: Connecting Kafka to Flume
kafka2flume.conf

agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink

# The channel can be defined as follows.
agent.sources.kafkaSource.channels = memoryChannel
agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect=bigdata111:2181,bigdata112:2181,bigdata113:2181
agent.sources.kafkaSource.topic=calllog
#agent.sources.kafkaSource.groupId=flume
agent.sources.kafkaSource.kafka.consumer.timeout.ms=100


agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000


# the sink of hdfs
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path=hdfs://bigdata111:9000/kafka2flume
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.fileType=DataStream
# Without these two settings, a large number of small files is produced
agent.sinks.hdfsSink.hdfs.rollSize=0
agent.sinks.hdfsSink.hdfs.rollCount=0

Start command

bin/flume-ng agent --conf conf --conf-file jobconf/kafka2flume.conf --name agent -Dflume.root.logger=INFO,console

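Note: this configuration consumes data from Kafka, but new data has to be written to the Kafka topic again before anything is delivered to HDFS.

This takes plenty of practice and hands-on work!

rz and sz are the upload and download tools: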

yum install -y lrzsz

Next: 091-BigData-19 Passing data between Flume agents
