安裝前的環(huán)境準(zhǔn)備
由于Kafka是用Scala語(yǔ)言開(kāi)發(fā)的,運(yùn)行在JVM上偏友,因此在安裝Kafka之前需要先安裝JDK。
yum install java-1.8.0-openjdk* -y
kafka依賴zookeeper对供,所以需要先安裝zookeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
tar -zxvf zookeeper-3.4.12.tar.gz
cd zookeeper-3.4.12
cp conf/zoo_sample.cfg conf/zoo.cfg
啟動(dòng)zookeeper
bin/zkServer.sh start
bin/zkCli.sh
ls / #查看zk的根目錄相關(guān)節(jié)點(diǎn)
[zk: localhost:2181(CONNECTED) 8] ls /
[dubbo, zookeeper, locks]
[zk: localhost:2181(CONNECTED) 9]
第一步:下載安裝包
下載1.1.0 release版本位他,并解壓:
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -xzf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0
第二步:?jiǎn)?dòng)服務(wù)
現(xiàn)在來(lái)啟動(dòng)kafka服務(wù):
啟動(dòng)腳本語(yǔ)法:kafka-server-start.sh [-daemon] server.properties
可以看到,server.properties的配置路徑是一個(gè)強(qiáng)制的參數(shù)产场,-daemon表示以后臺(tái)進(jìn)程運(yùn)行鹅髓,否則ssh客戶端退出后,就會(huì)停止服務(wù)京景。(注意窿冯,在啟動(dòng)kafka時(shí)會(huì)使用linux主機(jī)名關(guān)聯(lián)的ip地址,所以需要把主機(jī)名和linux的ip映射配置到本地host里确徙,用vim /etc/hosts)
bin/kafka-server-start.sh -daemon config/server.properties
我們進(jìn)入zookeeper目錄通過(guò)zookeeper客戶端查看下zookeeper的目錄樹(shù)
bin/zkCli.sh
ls / #查看zk的根目錄kafka相關(guān)節(jié)點(diǎn)
ls /brokers/ids #查看kafka節(jié)點(diǎn)
[zk: localhost:2181(CONNECTED) 3] ls /
[cluster, controller, brokers, zookeeper, dubbo, admin, isr_change_notification, log_dir_event_notification, locks, controller_epoch, consumers, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 4] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 5]
第三步:創(chuàng)建主題
現(xiàn)在我們來(lái)創(chuàng)建一個(gè)名字為“test”的Topic醒串,這個(gè)topic只有一個(gè)partition执桌,并且備份因子也設(shè)置為1:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
現(xiàn)在我們可以通過(guò)以下命令來(lái)查看kafka中目前存在的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
除了我們通過(guò)手工的方式創(chuàng)建Topic,我們可以配置broker厦凤,當(dāng)producer發(fā)布一個(gè)消息某個(gè)指定的Topic鼻吮,但是這個(gè)Topic并不存在時(shí)育苟,就自動(dòng)創(chuàng)建较鼓。
[root@iZ2ze8dv3a3mevar9w80f1Z kafka_2.12-2.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[root@iZ2ze8dv3a3mevar9w80f1Z kafka_2.12-2.1.0]#
[root@iZ2ze8dv3a3mevar9w80f1Z kafka_2.12-2.1.0]#
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics
[test]
[zk: localhost:2181(CONNECTED) 7]
第四步:發(fā)送消息
kafka自帶了一個(gè)producer命令客戶端,可以從本地文件中讀取內(nèi)容违柏,或者我們也可以以命令行中直接輸入內(nèi)容博烂,并將這些內(nèi)容以消息的形式發(fā)送到kafka集群中。在默認(rèn)情況下漱竖,每一個(gè)行會(huì)被當(dāng)做成一個(gè)獨(dú)立的消息禽篱。
首先我們要運(yùn)行發(fā)布消息的腳本,然后在命令中輸入要發(fā)送的消息的內(nèi)容:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>this is a msg
>this is a another msg
第五步:消費(fèi)消息
對(duì)于consumer馍惹,kafka同樣也攜帶了一個(gè)命令行客戶端躺率,會(huì)將獲取到內(nèi)容在命令中進(jìn)行輸出:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning #老版本
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --consumer-property client.id=consumer-1 --topic test #新版本
如果你是通過(guò)不同的終端窗口來(lái)運(yùn)行以上的命令,你將會(huì)看到在producer終端輸入的內(nèi)容万矾,很快就會(huì)在consumer的終端窗口上顯示出來(lái)悼吱。
以上所有的命令都有一些附加的選項(xiàng);當(dāng)我們不攜帶任何參數(shù)運(yùn)行命令的時(shí)候良狈,將會(huì)顯示出這個(gè)命令的詳細(xì)用法后添。
還有一些其他命令如下:
查看組名
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer
查看消費(fèi)者的消費(fèi)偏移量
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
消費(fèi)多主題
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "test|test-2"
單播消費(fèi)
一條消息只能被某一個(gè)消費(fèi)者消費(fèi)的模式,類似queue模式薪丁,只需讓所有消費(fèi)者在同一個(gè)消費(fèi)組里即可
分別在兩個(gè)客戶端執(zhí)行如下消費(fèi)命令遇西,然后往主題里發(fā)送消息,結(jié)果只有一個(gè)客戶端能收到消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test
多播消費(fèi)
一條消息能被多個(gè)消費(fèi)者消費(fèi)的模式严嗜,類似publish-subscribe模式費(fèi)粱檀,針對(duì)Kafka同一條消息只能被同一個(gè)消費(fèi)組下的某一個(gè)消費(fèi)者消費(fèi)的特性,要實(shí)現(xiàn)多播只要保證這些消費(fèi)者屬于不同的消費(fèi)組即可漫玄。我們?cè)僭黾右粋€(gè)消費(fèi)者梧税,該消費(fèi)者屬于testGroup-2消費(fèi)組,結(jié)果兩個(gè)客戶端都能收到消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup-2 --topic tes
第六步:kafka集群配置
到目前為止称近,我們都是在一個(gè)單節(jié)點(diǎn)上運(yùn)行broker第队,這并沒(méi)有什么意思。對(duì)于kafka來(lái)說(shuō)刨秆,一個(gè)單獨(dú)的broker意味著kafka集群中只有一個(gè)接點(diǎn)凳谦。要想增加kafka集群中的節(jié)點(diǎn)數(shù)量,只需要多啟動(dòng)幾個(gè)broker實(shí)例即可衡未。為了有更好的理解尸执,現(xiàn)在我們?cè)谝慌_(tái)機(jī)器上同時(shí)啟動(dòng)三個(gè)broker實(shí)例家凯。
首先,我們需要建立好其他2個(gè)broker的配置文件:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id屬性在kafka集群中必須要是唯一的如失。我們需要重新指定port和log目錄绊诲,因?yàn)槲覀兪窃谕慌_(tái)機(jī)器上運(yùn)行多個(gè)實(shí)例。如果不進(jìn)行修改的話褪贵,consumer只能獲取到一個(gè)instance實(shí)例的信息掂之,或者是相互之間的數(shù)據(jù)會(huì)被影響。
目前我們已經(jīng)有一個(gè)zookeeper實(shí)例和一個(gè)broker實(shí)例在運(yùn)行了脆丁,現(xiàn)在我們只需要在啟動(dòng)2個(gè)broker實(shí)例即可:
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties
現(xiàn)在我們創(chuàng)建一個(gè)新的topic世舰,備份因子設(shè)置為3:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
現(xiàn)在我們已經(jīng)有了集群,并且創(chuàng)建了一個(gè)3個(gè)備份因子的topic槽卫,但是到底是哪一個(gè)broker在為這個(gè)topic提供服務(wù)呢(因?yàn)槲覀冎挥幸粋€(gè)分區(qū)跟压,所以肯定同時(shí)只有一個(gè)broker在處理這個(gè)topic)?
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
以下是輸出內(nèi)容的解釋歼培,第一行是所有分區(qū)的概要信息震蒋,之后的每一行表示每一個(gè)partition的信息。因?yàn)槟壳拔覀冎挥幸粋€(gè)partition躲庄,因此關(guān)于partition的信息只有一行查剖。
leader節(jié)點(diǎn)負(fù)責(zé)給定partition的所有讀寫(xiě)請(qǐng)求。
replicas 表示某個(gè)partition在哪幾個(gè)broker上存在備份读跷。不管這個(gè)幾點(diǎn)是不是”leader“梗搅,甚至這個(gè)節(jié)點(diǎn)掛了,也會(huì)列出效览。
isr 是replicas的一個(gè)子集无切,它只列出當(dāng)前還存活著的,并且備份了該partition的節(jié)點(diǎn)丐枉。
現(xiàn)在我們的案例中哆键,0號(hào)節(jié)點(diǎn)是leader,即使用server.properties啟動(dòng)的那個(gè)進(jìn)程瘦锹。
我們可以運(yùn)行相同的命令查看之前創(chuàng)建的名稱為”test“的topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
之前設(shè)置了topic的partition數(shù)量為1籍嘹,備份因子為1,因此顯示就如上所示了弯院。
現(xiàn)在我們向新建的topic中發(fā)送一些message:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test msg 1
>my test msg 2
現(xiàn)在開(kāi)始消費(fèi):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test msg 1
my test msg 2
現(xiàn)在我們來(lái)測(cè)試我們?nèi)蒎e(cuò)性辱士,因?yàn)閎roker0目前是leader,所以我們要將其kill
ps -ef | grep server.properties
kill -9 1177
現(xiàn)在再執(zhí)行命令:
bin/kafka-topics.sh --describe --zookeeper localhost:9092 --topic my-replicated-topic
我們可以看到听绳,leader節(jié)點(diǎn)已經(jīng)變成了broker 2.要注意的是颂碘,在Isr中,已經(jīng)沒(méi)有了0號(hào)節(jié)點(diǎn)椅挣。leader的選舉也是從ISR(in-sync replica)中進(jìn)行的头岔。
此時(shí)塔拳,我們依然可以 消費(fèi)新消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test msg 1
my test msg 2
查看主題分區(qū)對(duì)應(yīng)的leader信息: