kafka集群搭建與使用

安裝前的環(huán)境準(zhǔn)備

由于Kafka是用Scala語(yǔ)言開(kāi)發(fā)的,運(yùn)行在JVM上偏友,因此在安裝Kafka之前需要先安裝JDK。

yum install java-1.8.0-openjdk* -y

kafka依賴zookeeper对供,所以需要先安裝zookeeper

wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz

tar -zxvf zookeeper-3.4.12.tar.gz

cd zookeeper-3.4.12

cp conf/zoo_sample.cfg conf/zoo.cfg

啟動(dòng)zookeeper

bin/zkServer.sh start

bin/zkCli.sh

ls / #查看zk的根目錄相關(guān)節(jié)點(diǎn)

[zk: localhost:2181(CONNECTED) 8] ls /
[dubbo, zookeeper, locks]
[zk: localhost:2181(CONNECTED) 9] 

第一步:下載安裝包

下載1.1.0 release版本位他,并解壓:

wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -xzf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

第二步:?jiǎn)?dòng)服務(wù)

現(xiàn)在來(lái)啟動(dòng)kafka服務(wù):
啟動(dòng)腳本語(yǔ)法:kafka-server-start.sh [-daemon] server.properties
可以看到,server.properties的配置路徑是一個(gè)強(qiáng)制的參數(shù)产场,-daemon表示以后臺(tái)進(jìn)程運(yùn)行鹅髓,否則ssh客戶端退出后,就會(huì)停止服務(wù)京景。(注意窿冯,在啟動(dòng)kafka時(shí)會(huì)使用linux主機(jī)名關(guān)聯(lián)的ip地址,所以需要把主機(jī)名和linux的ip映射配置到本地host里确徙,用vim /etc/hosts)
bin/kafka-server-start.sh -daemon config/server.properties
我們進(jìn)入zookeeper目錄通過(guò)zookeeper客戶端查看下zookeeper的目錄樹(shù)
bin/zkCli.sh
ls / #查看zk的根目錄kafka相關(guān)節(jié)點(diǎn)
ls /brokers/ids #查看kafka節(jié)點(diǎn)

[zk: localhost:2181(CONNECTED) 3] ls /
[cluster, controller, brokers, zookeeper, dubbo, admin, isr_change_notification, log_dir_event_notification, locks, controller_epoch, consumers, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 4] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 5] 

第三步:創(chuàng)建主題

現(xiàn)在我們來(lái)創(chuàng)建一個(gè)名字為“test”的Topic醒串,這個(gè)topic只有一個(gè)partition执桌,并且備份因子也設(shè)置為1:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
現(xiàn)在我們可以通過(guò)以下命令來(lái)查看kafka中目前存在的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
除了我們通過(guò)手工的方式創(chuàng)建Topic,我們可以配置broker厦凤,當(dāng)producer發(fā)布一個(gè)消息某個(gè)指定的Topic鼻吮,但是這個(gè)Topic并不存在時(shí)育苟,就自動(dòng)創(chuàng)建较鼓。

[root@iZ2ze8dv3a3mevar9w80f1Z kafka_2.12-2.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
[root@iZ2ze8dv3a3mevar9w80f1Z kafka_2.12-2.1.0]# 
[root@iZ2ze8dv3a3mevar9w80f1Z kafka_2.12-2.1.0]# 

[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics
[test]
[zk: localhost:2181(CONNECTED) 7] 

第四步:發(fā)送消息

kafka自帶了一個(gè)producer命令客戶端,可以從本地文件中讀取內(nèi)容违柏,或者我們也可以以命令行中直接輸入內(nèi)容博烂,并將這些內(nèi)容以消息的形式發(fā)送到kafka集群中。在默認(rèn)情況下漱竖,每一個(gè)行會(huì)被當(dāng)做成一個(gè)獨(dú)立的消息禽篱。
首先我們要運(yùn)行發(fā)布消息的腳本,然后在命令中輸入要發(fā)送的消息的內(nèi)容:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
>this is a msg
>this is a another msg 

第五步:消費(fèi)消息

對(duì)于consumer馍惹,kafka同樣也攜帶了一個(gè)命令行客戶端躺率,會(huì)將獲取到內(nèi)容在命令中進(jìn)行輸出:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning #老版本
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --consumer-property client.id=consumer-1 --topic test #新版本
如果你是通過(guò)不同的終端窗口來(lái)運(yùn)行以上的命令,你將會(huì)看到在producer終端輸入的內(nèi)容万矾,很快就會(huì)在consumer的終端窗口上顯示出來(lái)悼吱。
以上所有的命令都有一些附加的選項(xiàng);當(dāng)我們不攜帶任何參數(shù)運(yùn)行命令的時(shí)候良狈,將會(huì)顯示出這個(gè)命令的詳細(xì)用法后添。
還有一些其他命令如下:

查看組名
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer
查看消費(fèi)者的消費(fèi)偏移量
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
消費(fèi)多主題
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist "test|test-2"

單播消費(fèi)

一條消息只能被某一個(gè)消費(fèi)者消費(fèi)的模式,類似queue模式薪丁,只需讓所有消費(fèi)者在同一個(gè)消費(fèi)組里即可
分別在兩個(gè)客戶端執(zhí)行如下消費(fèi)命令遇西,然后往主題里發(fā)送消息,結(jié)果只有一個(gè)客戶端能收到消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test

多播消費(fèi)
一條消息能被多個(gè)消費(fèi)者消費(fèi)的模式严嗜,類似publish-subscribe模式費(fèi)粱檀,針對(duì)Kafka同一條消息只能被同一個(gè)消費(fèi)組下的某一個(gè)消費(fèi)者消費(fèi)的特性,要實(shí)現(xiàn)多播只要保證這些消費(fèi)者屬于不同的消費(fèi)組即可漫玄。我們?cè)僭黾右粋€(gè)消費(fèi)者梧税,該消費(fèi)者屬于testGroup-2消費(fèi)組,結(jié)果兩個(gè)客戶端都能收到消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup-2 --topic tes

第六步:kafka集群配置

到目前為止称近,我們都是在一個(gè)單節(jié)點(diǎn)上運(yùn)行broker第队,這并沒(méi)有什么意思。對(duì)于kafka來(lái)說(shuō)刨秆,一個(gè)單獨(dú)的broker意味著kafka集群中只有一個(gè)接點(diǎn)凳谦。要想增加kafka集群中的節(jié)點(diǎn)數(shù)量,只需要多啟動(dòng)幾個(gè)broker實(shí)例即可衡未。為了有更好的理解尸执,現(xiàn)在我們?cè)谝慌_(tái)機(jī)器上同時(shí)啟動(dòng)三個(gè)broker實(shí)例家凯。
首先,我們需要建立好其他2個(gè)broker的配置文件:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1

config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

broker.id屬性在kafka集群中必須要是唯一的如失。我們需要重新指定port和log目錄绊诲,因?yàn)槲覀兪窃谕慌_(tái)機(jī)器上運(yùn)行多個(gè)實(shí)例。如果不進(jìn)行修改的話褪贵,consumer只能獲取到一個(gè)instance實(shí)例的信息掂之,或者是相互之間的數(shù)據(jù)會(huì)被影響。
目前我們已經(jīng)有一個(gè)zookeeper實(shí)例和一個(gè)broker實(shí)例在運(yùn)行了脆丁,現(xiàn)在我們只需要在啟動(dòng)2個(gè)broker實(shí)例即可:

bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties

現(xiàn)在我們創(chuàng)建一個(gè)新的topic世舰,備份因子設(shè)置為3:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

現(xiàn)在我們已經(jīng)有了集群,并且創(chuàng)建了一個(gè)3個(gè)備份因子的topic槽卫,但是到底是哪一個(gè)broker在為這個(gè)topic提供服務(wù)呢(因?yàn)槲覀冎挥幸粋€(gè)分區(qū)跟压,所以肯定同時(shí)只有一個(gè)broker在處理這個(gè)topic)?

 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
image.png

以下是輸出內(nèi)容的解釋歼培,第一行是所有分區(qū)的概要信息震蒋,之后的每一行表示每一個(gè)partition的信息。因?yàn)槟壳拔覀冎挥幸粋€(gè)partition躲庄,因此關(guān)于partition的信息只有一行查剖。
leader節(jié)點(diǎn)負(fù)責(zé)給定partition的所有讀寫(xiě)請(qǐng)求。
replicas 表示某個(gè)partition在哪幾個(gè)broker上存在備份读跷。不管這個(gè)幾點(diǎn)是不是”leader“梗搅,甚至這個(gè)節(jié)點(diǎn)掛了,也會(huì)列出效览。
isr 是replicas的一個(gè)子集无切,它只列出當(dāng)前還存活著的,并且備份了該partition的節(jié)點(diǎn)丐枉。
現(xiàn)在我們的案例中哆键,0號(hào)節(jié)點(diǎn)是leader,即使用server.properties啟動(dòng)的那個(gè)進(jìn)程瘦锹。
我們可以運(yùn)行相同的命令查看之前創(chuàng)建的名稱為”test“的topic

 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test 
image.png

之前設(shè)置了topic的partition數(shù)量為1籍嘹,備份因子為1,因此顯示就如上所示了弯院。
現(xiàn)在我們向新建的topic中發(fā)送一些message:

 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test msg 1
>my test msg 2

現(xiàn)在開(kāi)始消費(fèi):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test msg 1
my test msg 2

現(xiàn)在我們來(lái)測(cè)試我們?nèi)蒎e(cuò)性辱士,因?yàn)閎roker0目前是leader,所以我們要將其kill

ps -ef | grep server.properties
kill -9 1177
現(xiàn)在再執(zhí)行命令:
bin/kafka-topics.sh --describe --zookeeper localhost:9092 --topic my-replicated-topic
image.png

我們可以看到听绳,leader節(jié)點(diǎn)已經(jīng)變成了broker 2.要注意的是颂碘,在Isr中,已經(jīng)沒(méi)有了0號(hào)節(jié)點(diǎn)椅挣。leader的選舉也是從ISR(in-sync replica)中進(jìn)行的头岔。
此時(shí)塔拳,我們依然可以 消費(fèi)新消息:

 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test msg 1
my test msg 2

查看主題分區(qū)對(duì)應(yīng)的leader信息:


image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市峡竣,隨后出現(xiàn)的幾起案子靠抑,更是在濱河造成了極大的恐慌,老刑警劉巖适掰,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件颂碧,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡攻谁,警方通過(guò)查閱死者的電腦和手機(jī)稚伍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門(mén)弯予,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)戚宦,“玉大人,你說(shuō)我怎么就攤上這事锈嫩∈苈ィ” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵呼寸,是天一觀的道長(zhǎng)艳汽。 經(jīng)常有香客問(wèn)我,道長(zhǎng)对雪,這世上最難降的妖魔是什么河狐? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮瑟捣,結(jié)果婚禮上馋艺,老公的妹妹穿的比我還像新娘。我一直安慰自己迈套,他們只是感情好捐祠,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著桑李,像睡著了一般踱蛀。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上贵白,一...
    開(kāi)封第一講書(shū)人閱讀 49,144評(píng)論 1 285
  • 那天率拒,我揣著相機(jī)與錄音,去河邊找鬼禁荒。 笑死猬膨,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的圈浇。 我是一名探鬼主播寥掐,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼靴寂,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了召耘?” 一聲冷哼從身側(cè)響起百炬,我...
    開(kāi)封第一講書(shū)人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎污它,沒(méi)想到半個(gè)月后剖踊,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡衫贬,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年德澈,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片固惯。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡梆造,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出葬毫,到底是詐尸還是另有隱情镇辉,我是刑警寧澤,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布贴捡,位于F島的核電站忽肛,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏烂斋。R本人自食惡果不足惜屹逛,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望汛骂。 院中可真熱鬧罕模,春花似錦、人聲如沸香缺。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)图张。三九已至锋拖,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間祸轮,已是汗流浹背兽埃。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留适袜,地道東北人柄错。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親售貌。 傳聞我的和親對(duì)象是個(gè)殘疾皇子给猾,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容