當(dāng)前基于kafaka最新版 kafka_2.12-2.2.1.tgz 進(jìn)行配置 。
官網(wǎng)地址:http://kafka.apache.org/intro
kafka的一些基礎(chǔ)知識 參考:http://www.hechunbo.com/index.php/archives/140.html
-
配置java環(huán)境安裝jdk
-
解壓kafaka
[root@localhost hcb]# tar -zxvf kafka_2.12-2.2.1.tgz -C /usr/local
-
啟動(dòng)zookeeper .因?yàn)樽钚掳?已經(jīng)包含有zookeeper 所以不用另外安裝了
[root@localhost kafka_2.12-2.2.1]# bin/zookeeper-server-start.sh config/zookeeper.properties [2019-06-22 17:47:49,667] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
-
重新開一個(gè)連接 忧饭。輸入jps 發(fā)現(xiàn)多了一個(gè)進(jìn)程
[root@localhost ~]# jps 3136 Jps 2842 QuorumPeerMain
-
啟動(dòng)kafka
[root@localhost kafka_2.12-2.2.1]# ./bin/kafka-server-start.sh config/server.properties [2019-06-22 17:51:18,786] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) [2019-06-22 17:51:20,624] INFO starting (kafka.server.KafkaServer)
-
再開一個(gè)連接 輸入jps查看當(dāng)前運(yùn)行的進(jìn)程
發(fā)現(xiàn)多了一個(gè)kafka[root@localhost ~]# jps 3504 Jps 2842 QuorumPeerMain 3147 Kafka
-
創(chuàng)建一個(gè)topic
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test [root@localhost kafka_2.12-2.2.1]#
-
查看topic消息
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --list --bootstrap-server localhost:9092 test
-
發(fā)送消息 到test
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test hi,>welcome to to kafka >hi ,how are you
-
消費(fèi)者取消息
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning hi,welcome to to kafka hi ,how are you
生產(chǎn)者發(fā)送消息以扣,消費(fèi)者有通知 的烁,
1561197972428 -
進(jìn)行多臺機(jī)子測試
因?yàn)槲覀兪菃闻_機(jī)子,所以把配置文件復(fù)制兩份,更改端口和id配置進(jìn)行第二臺,第三臺的模擬-
[root@localhost ~]# cd /usr/local/kafka_2.12-2.2.1/ [root@localhost kafka_2.12-2.2.1]# cp config/server.properties config/server-1.properties [root@localhost kafka_2.12-2.2.1]# cp config/server.properties config/server-2.properties
修改第二臺機(jī)子的配置
vi config/server-1.properties log.dirs=/tmp/kafka-logs-1 listeners=PLAINTEXT://:9093 broker.id=1
-
![1561198817482](https://upload-images.jianshu.io/upload_images/9129298-90f2d59fb6fee96d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
修改第三臺機(jī)子
```
vi config/server-2.properties
log.dirs=/tmp/kafka-logs-2
listeners=PLAINTEXT://:9094
broker.id=2
```
![1561198876385](https://upload-images.jianshu.io/upload_images/9129298-1ebc35f5697ff16b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
-
啟動(dòng)新模擬的兩臺服務(wù)器
[root@localhost kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server-1.properties [2019-06-22 18:23:56,237] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
新開連接 繼續(xù)啟動(dòng)第三臺造寝,順便查看下當(dāng)前的進(jìn)程 磕洪。發(fā)現(xiàn)有兩個(gè)kafka存在了
[root@localhost ~]# jps 4370 ConsoleProducer 2842 QuorumPeerMain 5642 Jps 3147 Kafka 4955 ConsoleConsumer 5278 Kafka [root@localhost ~]# cd /usr/local/kafka_2.12-2.2.1/ ^C[root@localhost kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server-2.properties [2019-06-22 18:27:31,947] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
新開一個(gè)連接 吭练,查看下當(dāng)前進(jìn)程 ,三個(gè)kafka正常啟動(dòng)了
[root@localhost ~]# jps 4370 ConsoleProducer 6307 Jps 2842 QuorumPeerMain 3147 Kafka 4955 ConsoleConsumer 5948 Kafka 5278 Kafka
-
創(chuàng)建一個(gè)帶有備份的topic
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replication-topic
-
查看哪個(gè)borke【kafka服務(wù)器】在工作
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic Topic:my-replication-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824 Topic: my-replication-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
leader:哪個(gè)broker在讀寫
replicas:當(dāng)前可以正常工作的kafka集群析显。當(dāng)leader掛掉時(shí)會(huì)自動(dòng)替補(bǔ)
isr:同步消息的列表集合
-
查看我們之前創(chuàng)建的topic消息
當(dāng)時(shí)我們只有一個(gè)kafka服務(wù)器鲫咽。可以看只leader是0谷异,替被和備份的都是0分尸,
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824 Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
-
在新的topic中發(fā)布新的消息
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replication-topic >message one >message two
-
消費(fèi)者去獲取消息
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replication-topic message one message two
-
檢查當(dāng)前的leader
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic Topic:my-replication-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824 Topic: my-replication-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
-
模擬leader1掛掉以后的狀態(tài)
把leader1關(guān)掉
檢查leader1的進(jìn)程
ps aux 顯示用戶當(dāng)前的所有進(jìn)程 。并根據(jù)grep后面的內(nèi)容進(jìn)行搜索
用kill殺死相關(guān)進(jìn)程
[root@localhost kafka_2.12-2.2.1]# ps aux | grep server-1.properties root 5278 3.5 20.5 3232460 205560 pts/5 Sl+ 18:23 1:06 /usr/local/jdk1.8.0_211/bin/java -Xmx1G [root@localhost kafka_2.12-2.2.1]# kill -9 5278
-
再次檢查當(dāng)前topic的消息
發(fā)現(xiàn)leader已經(jīng)從1變成了2.
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic Topic:my-replication-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824 Topic: my-replication-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
-
使用kafka connect 導(dǎo)入導(dǎo)出數(shù)據(jù)
souce connector 從text.txt讀取文件 歹嘹,把內(nèi)容發(fā)送到connect-test., sink connector 從conect-test讀寫消息
[root@localhost kafka_2.12-2.2.1]# bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties [2019-06-22 19:05:55,493] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
進(jìn)行jps分發(fā)現(xiàn)多了一個(gè)ConnectStandalone的進(jìn)程
[root@localhost ~]# jps 4370 ConsoleProducer 9478 Jps 9160 ConnectStandalone 2842 QuorumPeerMain 3147 Kafka 4955 ConsoleConsumer 5948 Kafka
顯示文件內(nèi)容
more 命令類似 cat 箩绍,不過會(huì)以一頁一頁的形式顯示,更方便使用者逐頁閱讀尺上,
[root@localhost kafka_2.12-2.2.1]# more test.sink.txt foo bar
使用消費(fèi)者控制 臺顯示
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}
繼續(xù)測試
生產(chǎn)者進(jìn)行消息追加
[root@localhost kafka_2.12-2.2.1]# echo -e "foo\nbarddddaaa\aaaaa\dddd\1\2\2\3" > test.txt [root@localhost kafka_2.12-2.2.1]# echo -e "foo\nbarddddaaa\aaaaa\dddd\1\2\2\3\new append" > test.txt
消費(fèi)者進(jìn)行實(shí)時(shí)顯示
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"} {"schema":{"type":"string","optional":false},"payload":"dddd"} {"schema":{"type":"string","optional":false},"payload":"aaaaaaad"} {"schema":{"type":"string","optional":false},"payload":"dd"} ^[[A^[[A^[[B{"schema":{"type":"string","optional":false},"payload":"1\\2\\2\\3"} {"schema":{"type":"string","optional":false},"payload":"ew append"}