一杨名、單broker
1.下載解壓并進(jìn)入目錄
????$ cd /Users/didi/Softwres/kafka_2.11-0.11.0.2
2.啟動(dòng)服務(wù)
? ? 啟動(dòng)zookeeper:$ bin/zookeeper-server-start.sh config/zookeeper.properties
? ? 啟動(dòng)kafka:$ bin/kafka-server-start.sh config/server.properties
3.創(chuàng)建一個(gè)主題(名稱為test布疼,只有一個(gè)分區(qū)贞瞒,一份復(fù)制)
????$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
4.檢查是否創(chuàng)建成功
????$ bin/kafka-topics.sh --list --zookeeper localhost:2181
5.向kafka集群發(fā)送一些信息
? ? 啟動(dòng)producer,并發(fā)送一些信息
????$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test? ??
>This is a message
>This is another message
>
6.啟動(dòng)一個(gè)consumer并消費(fèi)發(fā)送的信息
? ??$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
二、啟動(dòng)多個(gè)broker詳見(jiàn):https://kafka.apache.org/quickstart#quickstart_multibroker
?1.? broker.id屬性是集群中每個(gè)節(jié)點(diǎn)的唯一標(biāo)識(shí),由于集群?jiǎn)?dòng)在單一電腦上踊餐,cp配置文件并修改
$ cp server.properties server-1.properties
$ cp config/server.properties config/server-2.properties
修改如下:
? ??config/server-1.properties:
????broker.id=1
????listeners=PLAINTEXT://:9093
????log.dirs=/tmp/kafka-logs-1
-------------------------------------------------------------
????config/server-2.properties:
????broker.id=2
????listeners=PLAINTEXT://:9094
????log.dirs=/tmp/kafka-logs-2
2.啟動(dòng)這兩個(gè)server
? ? ? ? $?bin/kafka-server-start.sh config/server-1.properties
? ? ? ? $? bin/kafka-server-start.sh config/server-2.properties
目前有三個(gè)server啟動(dòng)
3.創(chuàng)建一個(gè)新的topic(my-replicated-topic)?
三分復(fù)制,三個(gè)分區(qū)
? ?$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic my-replicated-topic
查看現(xiàn)在有的主題:
? ? $ bin/kafka-topics.sh --list --zookeeper localhost:2181
? ??????__consumer_offsets
????????my-replicated-topic
????????test
4. 查看這個(gè)主題的描述信息:
? ? $ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
? ??????Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:3 Configs:(注:總的描述诺擅,下面三行為每一個(gè)分區(qū)的信息)
????????????Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
????????????Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
????????????Topic: my-replicated-topic Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
? ? Leader:負(fù)責(zé)該分區(qū)的數(shù)據(jù)讀寫(xiě)
????Replicas:分區(qū)的復(fù)制所在的三個(gè)節(jié)點(diǎn)
? ? 5.向kafka集群發(fā)送一些信息
? ??????$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
? ? ? ? ? ? >my test message 1
????????????>my test message 2
????????????>
6.消費(fèi)信息
? ??$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
????my test message 2
????my test message 1
????my test message 3
7.容錯(cuò)能力測(cè)試
? ? 查看server1的進(jìn)程號(hào)市袖,并把它殺死
? ??$ ps aux | grep server-1.properties
? ??????kill -9 9485
? ? 可以看到控制臺(tái)server1被殺死
重新查看描述,發(fā)現(xiàn)server1是leader的已經(jīng)不存在:
? ?$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
? ? ? Topic:my-replicated-topic PartitionCount:3 ReplicationFactor:3 Configs:
? ? ? Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0
? ? ? Topic: my-replicated-topic Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,2
? ? ? Topic: my-replicated-topic Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0
8.重新啟動(dòng),producer和consumer苍碟,繼續(xù)發(fā)送消息酒觅,沒(méi)有影響
三、使用kafka輸入輸出信息
1.生成一個(gè)文件
? ??$ echo -e "foo\nbar">test.txt
2.啟動(dòng)connector
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
以上配置文件(詳見(jiàn)配置文件信息):默認(rèn)topic:connect-test
? ? ? ? ? ? ? ? ? ? 默認(rèn)文件:安裝目錄下:test.txt
? ? ? ? ? ? ? ? ? ? 默認(rèn)輸出:安裝目錄下:test.sink.txt
3.?$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning