Flume in Practice
Example 1:
Single machine: monitor a given port and print the received data to the console.
I. Steps:
- 1. Edit the configuration file
- 2. Start Flume
- 3. Connect to the given host and port and send data
- 4. Check the console output
II. Walkthrough
1. Contents of example.conf
# Name the agent's three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Configure the sink
a1.sinks.k1.type = logger
# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Start Flume
Command: flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
3. Connect to the host and send data
4. Check the console output
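Step 3 can be done interactively with `telnet localhost 44444`, or scripted. A minimal Python sketch (host and port taken from example.conf above; `send_lines` is a name chosen here, and it assumes the agent from step 2 is already running):

```python
# Minimal sketch of step 3: send newline-terminated lines to the
# netcat source configured above (localhost:44444) and read the
# "OK" acknowledgements the source sends back.
import socket

def send_lines(lines, host="localhost", port=44444):
    """Send each line to the netcat source; return the raw reply."""
    with socket.create_connection((host, port)) as sock:
        for line in lines:
            sock.sendall(line.encode("utf-8") + b"\n")
        sock.shutdown(socket.SHUT_WR)  # signal end of input
        return sock.recv(1024)

# With the agent from step 2 running:
# send_lines(["hello flume"])
```

Each line sent this way should show up as one logged event in the agent's console output.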
Example 2:
Topology:
h1 and h2 listen for HTTP requests on a given port and forward the data to h3; h3 writes the data to HDFS.
I. Steps:
- 1. Edit the configuration files on h1, h2, and h3
- 2. Start Flume on all three machines
- 3. Send HTTP requests to h1 and h2
- 4. Check the HDFS directory
II. Walkthrough
1. Configuration file for h1 and h2
# Name the agent's three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.r1.type = http
a1.sources.r1.port = 8888
# Configure the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h3
a1.sinks.k1.port = 4141
# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Configuration file for h3
# Name the agent's three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://h1:9000/flumeData
# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
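One detail worth knowing before checking the results: with only `hdfs.path` set, the HDFS sink writes SequenceFiles, so the files under /flumeData are not plain text. To land readable text instead, the sink supports these optional properties (shown here as a suggestion, not part of the original setup):

```properties
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
```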
3. Start Flume on each machine (h3 first, so its avro source is listening before the h1/h2 sinks connect)
Command on h3: flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
Command on h1 and h2: flume-ng agent --conf conf --conf-file conf/example.conf --name a1 -Dflume.root.logger=INFO,console
4. Send HTTP requests to h1 and h2
5. Check the file contents in the HDFS directory
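For step 4, the HTTP source's default handler (JSONHandler) expects a JSON array of events, each with a `headers` map and a `body` string. A sketch of building and posting such a payload (`make_events`/`post_events` are names chosen here; host and port come from the config above):

```python
# Sketch of step 4: build the JSON event array that Flume's HTTP
# source (default JSONHandler) accepts, and POST it to h1:8888.
import json
import urllib.request

def make_events(*bodies):
    """Build the JSON payload the JSONHandler expects."""
    return json.dumps([{"headers": {}, "body": b} for b in bodies])

def post_events(payload, host="h1", port=8888):
    req = urllib.request.Request(
        f"http://{host}:{port}",
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status  # 200 on success

# With the agents running:
# post_events(make_events("hello", "world"))
```

The same request from the shell: `curl -X POST -H 'Content-Type: application/json' -d '[{"headers":{},"body":"hello"}]' http://h1:8888`.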
Example 3:
Topology:
h1 source: watch a file for appended content.
h1 outputs: h2 and HDFS.
h2 output: roll to the local file system.
I. Steps:
- 1. Edit the configuration files on h1 and h2
- 2. Start Flume on both machines
- 3. Append content to the watched file
- 4. Check the HDFS directory and the output directory on h2
II. Walkthrough
1. Configuration file for h1
# Name the agent's components
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/vagrant/testdir/flumeTestData
a1.sources.r1.channels = c1 c2
# Channel and sink for flow 1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = h2
a1.sinks.k1.port = 4141
a1.sinks.k1.channel = c1
# Channel and sink for flow 2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://h1:9000/flumeData
a1.sinks.k2.channel = c2
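Nothing in this file says how events are split between c1 and c2: Flume's default channel selector is `replicating`, so every event is copied to both channels (one copy toward h2, one toward HDFS). Making that default explicit is optional:

```properties
a1.sources.r1.selector.type = replicating
```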
2. Configuration file for h2
# Name the agent's three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Configure the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /home/vagrant/testdir/flumelog
# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start Flume
Command on h1 and h2: flume-ng agent --conf conf --conf-file conf/example3.conf --name a1 -Dflume.root.logger=INFO,console
4. Append content to the watched file
[root@h1 testdir]# echo "123" >> flumeTestData
[root@h1 testdir]# echo "123456" >> flumeTestData
5. Check the changes in HDFS and in the output directory on h2
Example 4:
Flume collects data and sends it to a Kafka cluster.
I. Steps:
- 1. Edit the configuration file on h1
- 2. Start Flume on h1; start the Kafka cluster formed by h1, h2, and h3; start a consumer (e.g. kafka-console-consumer.sh --bootstrap-server h1:9092 --topic first-topic)
- 3. Append content to the watched file
- 4. Watch the data received by the consumer
II. Walkthrough
1. Configuration file for h1
# Name the agent's three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/vagrant/testdir/flumeTestData
a1.sources.r1.channels = c1
# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Configure the Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = first-topic
a1.sinks.k1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
a1.sinks.k1.channel = c1
2. Start Flume on h1
flume-ng agent --conf conf --conf-file conf/kafka.conf --name a1 -Dflume.root.logger=INFO,console
3. Append data to the watched file and check the consumer output
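Step 3 is the same echo-append as in example 3; a scripted equivalent (`append_lines` is a name chosen here, and the path comes from the exec source config above):

```python
# Append lines to the file watched by tail -F; each appended line
# becomes one Flume event and, via the Kafka sink, one record on
# the first-topic topic.
def append_lines(path, lines):
    with open(path, "a", encoding="utf-8") as f:
        for line in lines:
            f.write(line + "\n")

# With the agent and the Kafka cluster running:
# append_lines("/home/vagrant/testdir/flumeTestData", ["hello kafka"])
```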