一、安裝kafka
請(qǐng)參考:kafka安裝
二涌哲、安裝logstash
請(qǐng)參考:logstash安裝
三哄孤、kafka寫(xiě)入logstash
3.1 注意
- 請(qǐng)注意kafka版本必須為 kafka_2.10-0.10.0.1
- kafka争拐、kafka-client猾昆、logstash具體對(duì)應(yīng)關(guān)系可以參考:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
3.2 相關(guān)主機(jī)配置上hosts文件
192.168.200.21 elk-node1
192.168.200.81 flume
192.168.200.91 kafka
3.3 配置啟動(dòng)kafka
mkdir -p /data/kafka/kafka-logs
grep '^[a-z]' /usr/local/kafka/config/server.properties
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=kafka:2181
zookeeper.connection.timeout.ms=6000
- 啟動(dòng)zookeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
- 啟動(dòng)kafka
bin/kafka-server-start.sh -daemon config/server.properties &
- 創(chuàng)建topic
bin/kafka-topics.sh --create --zookeeper kafka:2181 --replication-factor 1 --partitions 1 --topic zsdaitest
- 生產(chǎn)數(shù)據(jù)
bin/kafka-console-producer.sh --broker-list kafka:9092 --topic zsdaitest
- 消費(fèi)數(shù)據(jù)
bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --zookeeper kafka:2181 --topic zsdaitest --from-beginning
3.4 配置啟動(dòng)logstash
- vim 1-inputKafka.conf
input{
kafka {
topics => ["zsdaitest"]
bootstrap_servers => "kafka:9092"
}
}
output{
stdout{
codec => rubydebug
}
}
- 啟動(dòng)
./bin/logstash -f 1-inputKafka.conf
- 啟動(dòng)后玉凯,在kafka生產(chǎn)端輸入任意字符進(jìn)行測(cè)試仲智,如:kafka test1沉帮,然后觀察logstash控制臺(tái)屡萤,如果有接受到對(duì)應(yīng)字符信息珍剑,則成功。