Flume-1 monitors a file for changes and forwards the new content to Flume-2, which stores it in HDFS. Flume-1 also forwards the same content to Flume-3, which writes it to the local filesystem.
1. flume1
Create flume-1.conf, which monitors changes to hive.log and fans the events out over two channels to two sinks, feeding Flume-2 and Flume-3 respectively.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to multiple channels
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/apache-hive-1.2.1-bin/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
2. flume2
Create flume-2.conf, which receives events from Flume-1 and uses one channel and one sink to deliver the data to HDFS.
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
#Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
#Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
#Number of time units per new folder
a2.sinks.k1.hdfs.roundValue = 1
#Time unit used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
#Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#Number of events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
#File type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
#Seconds before rolling to a new file
a2.sinks.k1.hdfs.rollInterval = 600
#Roll size per file, just under 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
#Rolling is independent of the event count
a2.sinks.k1.hdfs.rollCount = 0
#Minimum number of block replicas
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
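For reference, the rollSize of 134217700 above sits just below one default 128 MB HDFS block, so a rolled file never spills into a second block:

```shell
# 128 MiB in bytes; rollSize = 134217700 is set 28 bytes below this
echo $((128 * 1024 * 1024))   # prints 134217728
```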
3忍啤、flume3
創(chuàng)建Flume-3.conf,用于接收Flume-1的event仙辟,同時產(chǎn)生1個channel和1個sink同波,將數(shù)據(jù)輸送給本地目錄
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume/data/flume3
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Note: the local output directory must already exist; if it does not, the file_roll sink will not create it.
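A minimal way to satisfy this, using the directory path from the flume-3 config above:

```shell
# Create the file_roll output directory ahead of time;
# mkdir -p also creates missing parents and is a no-op if the directory exists
mkdir -p /opt/module/flume/data/flume3
```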
4. Run the test
Start the Flume jobs in order (flume-3 first, then flume-2, then flume-1), then modify the monitored file and observe the results:
[victor@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group-job1/flume-3.conf
[victor@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group-job1/flume-2.conf
[victor@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group-job1/flume-1.conf
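Once all three agents are running, one way to trigger and inspect the flow — the hive.log path comes from the flume-1 exec source, and the HDFS layout follows the flume-2 sink's hdfs.path pattern (this sketch assumes a Hadoop client is on the PATH):

```shell
# Append a line to the monitored log to generate an event
echo "flume replication test" >> /opt/module/apache-hive-1.2.1-bin/logs/hive.log

# Check the HDFS sink output (layout follows hdfs.path = /flume2/%Y%m%d/%H)
hdfs dfs -ls -R /flume2

# Check the local file_roll sink output
ls -l /opt/module/flume/data/flume3
```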