Official Example: Monitoring Port Data
Requirement
Flume monitors port 55566 on the local machine; we then use the telnet tool to send messages to that port, and Flume prints the received data to the console in real time.
Analysis
- Use the telnet tool to send data to port 55566 on the local machine
- Flume monitors port 55566 on the local machine and reads the data through its source
- Flume writes the received data to the console through its sink
Implementation Steps
- Install telnet
Create a flume-telnet folder under /opt/module
mkdir flume-telnet
Copy telnet-0.17-59.el7.x86_64.rpm and telnet-server-0.17-59.el7.x86_64.rpm into the /opt/module/flume-telnet folder
cp -r CentOS7.2\ telnet/* flume-telnet/
Run the following commands
rpm -ivh telnet-0.17-59.el7.x86_64.rpm
rpm -ivh telnet-server-0.17-59.el7.x86_64.rpm
- Check whether port 55566 is already in use
sudo netstat -tunlp | grep 55566
Description: netstat is a very useful tool for monitoring TCP/IP networks; it can display the routing table, active network connections, and status information for each network interface.
Basic syntax: netstat [options]
Options:
-t or --tcp: show TCP connections
-u or --udp: show UDP connections
-n or --numeric: show numeric IP addresses instead of resolving hostnames
-l or --listening: show only listening sockets
-p or --programs: show the PID and name of the program that owns each socket
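If the grep prints nothing, the port is free. A minimal sketch of scripting this check (55566 matches this example; swap in any port you plan to use):
if sudo netstat -tunlp | grep -q ':55566\b'; then
  echo "port 55566 is in use; pick another port"
else
  echo "port 55566 is free"
fi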
- Create the Flume Agent configuration file flume-telnet-logger.conf
Create a job folder under the flume directory and cd into it
cd /opt/module/flume
mkdir job
cd job
Create the Flume Agent configuration file flume-telnet-logger.conf in the job folder
vim flume-telnet-logger.conf
Add the following content to flume-telnet-logger.conf
# Name the components on this agent (a1 is the agent name)
# r1 is a1's source
a1.sources = r1
# k1 is a1's sink
a1.sinks = k1
# c1 is a1's channel (buffer)
a1.channels = c1
# Describe/configure the source
# a1's source type is netcat
a1.sources.r1.type = netcat
# The host a1 listens on
a1.sources.r1.bind = localhost
# The port a1 listens on
a1.sources.r1.port = 55566
# Describe the sink
# a1's sink is the logger type, which prints to the console
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
# a1's channel type is memory
a1.channels.c1.type = memory
# Total channel capacity is 1000 events
a1.channels.c1.capacity = 1000
# The channel commits a transaction after collecting at most 100 events
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# Bind r1 to c1
a1.sources.r1.channels = c1
# Bind k1 to c1
a1.sinks.k1.channel = c1
- First start Flume listening on the port
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet-logger.conf -Dflume.root.logger=INFO,console
Parameter notes:
--conf conf/ : the configuration files are stored in the conf/ directory
--name a1 : names the agent a1
--conf-file job/flume-telnet-logger.conf : the configuration file Flume reads for this run is flume-telnet-logger.conf in the job folder
-Dflume.root.logger=INFO,console : -D overrides the flume.root.logger property at runtime, setting the console log level to INFO. Log levels include debug, info, warn, and error.
- Use the telnet tool to send content to port 55566 on the local machine
telnet localhost 55566
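If telnet is not installed, nc works just as well as the client here; a minimal sketch (on CentOS 7, nc typically comes from the nmap-ncat package, an assumption about this environment):
nc localhost 55566
Type a line and press Enter; it should show up as an INFO logger event in the Flume console.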
Example: Streaming a Local File to HDFS in Real Time
Requirement: monitor the Hive log in real time and upload it to HDFS
- To write data to HDFS, Flume must have the relevant Hadoop jars on its classpath
Copy commons-configuration-1.6.jar, hadoop-auth-2.8.3.jar, hadoop-common-2.8.3.jar, hadoop-hdfs-2.8.3.jar, commons-io-2.4.jar, and htrace-core4-4.0.1-incubating.jar into the /opt/module/flume/lib folder. The last two jars are required by version 1.99; other versions can do without them.
commons-configuration-1.6.jar, hadoop-auth-2.8.3.jar, commons-io-2.4.jar, and htrace-core4-4.0.1-incubating.jar can be found in /opt/module/hadoop-2.8.3/share/hadoop/common/lib; hadoop-common-2.8.3.jar is in /opt/module/hadoop-2.8.3/share/hadoop/common; hadoop-hdfs-2.8.3.jar is in /opt/module/hadoop-2.8.3/share/hadoop/hdfs
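A minimal sketch of copying them all in one pass, assuming the Hadoop layout described above:
HADOOP_HOME=/opt/module/hadoop-2.8.3
cp $HADOOP_HOME/share/hadoop/common/lib/{commons-configuration-1.6.jar,hadoop-auth-2.8.3.jar,commons-io-2.4.jar,htrace-core4-4.0.1-incubating.jar} /opt/module/flume/lib/
cp $HADOOP_HOME/share/hadoop/common/hadoop-common-2.8.3.jar /opt/module/flume/lib/
cp $HADOOP_HOME/share/hadoop/hdfs/hadoop-hdfs-2.8.3.jar /opt/module/flume/lib/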
- Create the flume-file-hdfs.conf file
vim flume-file-hdfs.conf
- Add the following content
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
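# exec source: run a shell command and ingest its stdout as events;
# tail -F keeps following hive.log across log rotations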
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop-100:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# How many time units before creating a new folder
a2.sinks.k2.hdfs.roundValue = 1
# The time unit for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type (compression is supported via CompressedStream)
a2.sinks.k2.hdfs.fileType = DataStream
# How long before rolling to a new file (seconds)
a2.sinks.k2.hdfs.rollInterval = 600
# Roll file size in bytes (just under the 128 MB HDFS block size)
a2.sinks.k2.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a2.sinks.k2.hdfs.rollCount = 0
# Minimum block replication
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
- Run the monitoring configuration
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
- View the files on HDFS
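A quick check from the command line, assuming the path layout configured above (files still being written carry a .tmp suffix until they roll):
hdfs dfs -ls /flume/$(date +%Y%m%d)
hdfs dfs -cat /flume/$(date +%Y%m%d)/$(date +%H)/logs-*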
Example: Streaming Directory Files to HDFS in Real Time
Requirement: use Flume to watch an entire directory for new files
- Create the configuration file flume-dir-hdfs.conf
vim flume-dir-hdfs.conf
- Add the following content
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
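# spooldir source: watch a directory and ingest files dropped into it;
# fully ingested files are renamed with the fileSuffix configured below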
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop-100:9000/flume/upload/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# How many time units before creating a new folder
a3.sinks.k3.hdfs.roundValue = 1
# The time unit for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type (compression is supported via CompressedStream)
a3.sinks.k3.hdfs.fileType = DataStream
# How long before rolling to a new file (seconds)
a3.sinks.k3.hdfs.rollInterval = 600
# Roll file size in bytes (just under the 128 MB HDFS block size)
a3.sinks.k3.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0
# Minimum block replication
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
- Start the directory-monitoring agent
bin/flume-ng agent --name a3 --conf conf/ --conf-file job/flume-dir-hdfs.conf
- Create the folder and add new files, as shown in the sketch below
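A minimal sketch of exercising the source (the copied file is arbitrary; any file not ending in .tmp works):
mkdir -p /opt/module/flume/upload
cp /etc/hosts /opt/module/flume/upload/hosts.txt
After ingestion the file is renamed hosts.txt.COMPLETED; files matching the .tmp ignore pattern are skipped.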
- Check the results
Single Source, Multiple Sinks: Example 1
Requirement: flume-1 monitors file changes and passes the new content to flume-2, which stores it on HDFS; at the same time flume-1 passes the content to flume-3, which writes it to the local filesystem
- Preparation
Create a group1 folder under /opt/module/flume/job
Create a flume3 folder under /opt/module/datas/
A note on Avro and RPC:
Avro is a language-neutral data serialization and RPC framework created by Doug Cutting, the founder of Hadoop
RPC (Remote Procedure Call) is a protocol for requesting a service from a program on a remote computer over the network, without needing to understand the underlying network technology
- Create flume-file-flume.conf
Configure one source that reads the log file, plus two channels and two sinks feeding flume-flume-hdfs and flume-flume-dir respectively
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to multiple channels
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop-100
a1.sinks.k1.port = 14141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop-100
a1.sinks.k2.port = 14142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
- Create flume-flume-hdfs.conf
Configure a source that receives the upstream Flume's output and a sink that writes to HDFS
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop-100
a2.sources.r1.port = 14141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop-100:9000/flume2/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# How many time units before creating a new folder
a2.sinks.k1.hdfs.roundValue = 1
# The time unit for rounding
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type (compression is supported via CompressedStream)
a2.sinks.k1.hdfs.fileType = DataStream
# How long before rolling to a new file (seconds)
a2.sinks.k1.hdfs.rollInterval = 600
# Roll file size in bytes (just under the 128 MB HDFS block size)
a2.sinks.k1.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a2.sinks.k1.hdfs.rollCount = 0
# Minimum block replication
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
- Create flume-flume-dir.conf
Configure a source that receives the upstream Flume's output and a sink that writes to a local directory
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop-100
a3.sources.r1.port = 14142
# Describe the sink
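# file_roll sink: write events to rolling files in a local directory;
# the directory (/opt/module/datas/flume3, created above) must already exist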
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
- Run the configuration files
Start the downstream agents first: a1's Avro sinks need the Avro sources of a3 and a2 to be listening before a1 comes up
bin/flume-ng agent --name a3 --conf conf/ --conf-file job/group1/flume-flume-dir.conf
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf
- Check the files on HDFS and on the local filesystem
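A quick verification sketch, assuming the paths configured above:
hdfs dfs -ls /flume2/$(date +%Y%m%d)
ls -l /opt/module/datas/flume3
Note that file_roll rolls to a new local file every 30 seconds by default, even when no events arrive, so empty files are normal.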
Single Source, Multiple Sinks: Example 2
Requirement: flume-1 listens on a local netcat port; incoming events are distributed between flume-2 and flume-3, each of which prints its share to the console, demonstrating load balancing
- Preparation
Create a group2 folder under /opt/module/flume/job
- Create flume-netcat-flume.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g2
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
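# Load-balancing sink group: events are distributed between k1 and k2;
# a failing sink backs off (blacklisted for up to maxTimeOut ms) before being retried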
a1.sinkgroups.g2.processor.type = load_balance
a1.sinkgroups.g2.processor.backoff = true
a1.sinkgroups.g2.processor.selector = round_robin
a1.sinkgroups.g2.processor.selector.maxTimeOut = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop-100
a1.sinks.k1.port = 14141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop-100
a1.sinks.k2.port = 14142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g2.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
- Create flume-flume1.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop-100
a2.sources.r1.port = 14141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
- Create flume-flume2.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop-100
a3.sources.r1.port = 14142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
- Start the agents (a2 and a3 first, so their Avro sources are listening when a1 comes up)
bin/flume-ng agent --name a2 --conf conf/ --conf-file job/group2/flume-flume1.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --name a3 --conf conf/ --conf-file job/group2/flume-flume2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --name a1 --conf conf/ --conf-file job/group2/flume-netcat-flume.conf
- Check the results
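A minimal sketch of exercising the balancer (the message text is arbitrary):
telnet localhost 44444
Type several lines; they should be split between the consoles of a2 and a3 in round-robin fashion.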
Multi-Source Aggregation Example
Requirement: flume-1 on hadoop-100 monitors the file hive.log, and flume-2 on hadoop-101 monitors the data stream on a port; flume-1 and flume-2 both send their data to flume-3 on hadoop-102, which uploads the aggregated data to HDFS
- Preparation
Distribute flume and create the group3 folders
xsync flume
Create a group3 folder under /opt/module/flume/job/ on hadoop-100, hadoop-101, and hadoop-102
mkdir group3
- Create flume1.conf on hadoop-100
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop-102
a1.sinks.k1.port = 14141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- Create flume2.conf on hadoop-101
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop-101
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop-102
a2.sinks.k1.port = 14141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
- Create flume3.conf on hadoop-102
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop-102
a3.sources.r1.port = 14141
# Describe the sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://hadoop-100:9000/flume3/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k1.hdfs.filePrefix = flume3-
# Whether to roll folders based on time
a3.sinks.k1.hdfs.round = true
# How many time units before creating a new folder
a3.sinks.k1.hdfs.roundValue = 1
# The time unit for rounding
a3.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k1.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a3.sinks.k1.hdfs.batchSize = 100
# File type (compression is supported via CompressedStream)
a3.sinks.k1.hdfs.fileType = DataStream
# How long before rolling to a new file (seconds)
a3.sinks.k1.hdfs.rollInterval = 600
# Roll file size in bytes (just under the 128 MB HDFS block size)
a3.sinks.k1.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a3.sinks.k1.hdfs.rollCount = 0
# Minimum block replication
a3.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
- Start the agents
Start a3 on hadoop-102 first, so its Avro source is listening before the two Avro sinks connect
On hadoop-102 run
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3.conf
On hadoop-100 run
bin/flume-ng agent --name a1 --conf conf/ --conf-file job/group3/flume1.conf
On hadoop-101 run
bin/flume-ng agent --name a2 --conf conf/ --conf-file job/group3/flume2.conf
- Check the results
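A quick end-to-end check, assuming the hosts and paths above: send a line through the netcat source (telnet hadoop-101 44444), append a line to the monitored log on hadoop-100 (echo "test" >> /opt/module/hive/logs/hive.log), then confirm both events land under the same HDFS path:
hdfs dfs -ls /flume3/$(date +%Y%m%d)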