前言:測試環(huán)境
LINUX:centos6.5
FLUME:1.6.0-cdh5.7.0
KAFKA:2.12-0.11.0
一熄诡、定義Kafka所使用的Topic
啟動Kafka:
$ kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
創(chuàng)建Topic:
$ kafka-topics.sh --create --zookeeper hadoopmaster:2181 --replication-factor 1 --partitions 1 --topic test_topic
查看Topic是否創(chuàng)建成功:
$ kafka-topics.sh --list --zookeeper hadoopmaster:2181
二、定義Flume Agent的類型
1、配置Agent1,類型為exec-memory-avro(監(jiān)聽某個文檔是否有文檔生成)
#定義agent name,source,sinks,channels的對應(yīng)名稱
exec-memory-avro.sources=exec-source
exec-memory-avro.sinks=avro-sink
exec-memory-avro.channels=memory-channel
#定義source為exec澎粟,監(jiān)聽目錄及方式
exec-memory-avro.sources.exec-source.type=exec
exec-memory-avro.sources.exec-source.command=tail -F /home/hadoop/data/test.log
exec-memory-avro.sources.exec-source.shell=/bin/sh -c
#定義sinks為avro,對應(yīng)的主機名及端口
exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoopmaster
exec-memory-avro.sinks.avro-sink.port = 22222
#定義channel為memory方式欢瞪,統(tǒng)一source和sink的channel
exec-memory-avro.channels.memory-channel.type = memory
exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
2活烙、配置Agent2,類型為avro-memory-kafka(監(jiān)聽某個文檔是否有文檔生成)
#同上遣鼓,接收agent1 avro sink的消息
avro-memory-kafka.sources=avro-source
avro-memory-kafka.sinks=kafka-sink
avro-memory-kafka.channels=memory-channel
avro-memory-kafka.sources.avro-source.type=avro
avro-memory-kafka.sources.avro-source.bind=hadoopmaster
avro-memory-kafka.sources.avro-source.port=22222
#Kafka sinks配置啸盏,詳情請參考flume官方文檔“Kafka Sink”
avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
avro-memory-kafka.sinks.kafka-sink.brokerList = hadoopmaster:9092
avro-memory-kafka.sinks.kafka-sink.topic = test_topic
avro-memory-kafka.sinks.kafka-sink.batchSize = 5? ##當(dāng)消息量多少條才處理
avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1
avro-memory-kafka.channels.memory-channel.type = memory
avro-memory-kafka.sources.avro-source.channels = memory-channel
avro-memory-kafka.sinks.kafka-sink.channel = memory-channel
三、Flume與Kafka啟動
1骑祟、先啟動agent2(avro-memory-kafka)
$ flume-ng agent --name avro-memory-kafka --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro-memory-kafka -Dflume.root.logger = INFO,console
2回懦、啟動agent1(exec-memory-avro--flume)
$ flume-ng agent --name exec-memory-avro --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/exec-memory-avro.conf -Dflume.root.logger = INFO,console
3、啟動Kafka Consumer
?$ kafka-console-consumer.sh --zookeeper hadoopmaster:2181 --topic test_topic
四次企、在監(jiān)聽目錄上進行測試
$ echo test1 >> test.log
$ echo test2 >> test.log
$ echo test3 >> test.log
$ echo test4 >> test.log
$ echo test5 >> test.log
$ echo test6 >> test.log
若啟動的Kafka Consumer控制臺上怯晕,有對應(yīng)消息輸出,則測試成功缸棵。