3. 分布式發(fā)布訂閱消息系統(tǒng)Kafka
3.1 Kafka概述
Kafka官網(wǎng)(http://kafka.apache.org)
-
## PUBLISH & SUBSCRIBE
Read and write streams of data like a message system. -
## PROCESS
Write scalable stream processing applications that react to events in real-time. -
## STORE
Store streams of data safely in a distributed, replicated, fault-tolerant cluster.
Kafka? is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.
Kakfa起初是由LinkedIn公司開發(fā)的一個(gè)分布式的消息系統(tǒng)楞泼,后成為Apache的一部分霸琴,它使用Scala編寫吧雹,以可水平擴(kuò)展和高吞吐率而被廣泛使用贱案。目前越來越多的開源分布式處理系統(tǒng)如Cloudera项戴、Apache Storm古戴、Spark等都支持與Kafka集成秩伞。
Kafka憑借著自身的優(yōu)勢涛酗,越來越受到互聯(lián)網(wǎng)企業(yè)的青睞。Kafka作為一個(gè)商業(yè)級消息中間件沈跨,消息可靠性的重要性可想而知由捎。如何確保消息的精確傳輸?如何確保消息的準(zhǔn)確存儲饿凛?如何確保消息的正確消費(fèi)狞玛?這些都是需要考慮的問題软驰。本文首先從Kafka的架構(gòu)著手,先了解下Kafka的基本原理心肪,然后通過對kakfa的存儲機(jī)制锭亏、復(fù)制原理、同步原理硬鞍、可靠性和持久性保證等等一步步對其可靠性進(jìn)行分析慧瘤,最后通過benchmark來增強(qiáng)對Kafka高可靠性的認(rèn)知。
3.2 Kafka架構(gòu)及核心概念
Kafka核心概念
- Broker:Kafka 集群包含一個(gè)或多個(gè)服務(wù)器固该,這種服務(wù)器被稱為 broker锅减。
- Topic:每條發(fā)布到 Kafka 集群的消息都有一個(gè)類別,這個(gè)類別被稱為 Topic伐坏。(物理上不同 Topic 的消息分開存儲怔匣,邏輯上一個(gè) Topic 的消息雖然保存于一個(gè)或多個(gè) broker 上,但用戶只需指定消息的 Topic 即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)桦沉。
- Partition:Partition 是物理上的概念每瞒,每個(gè) Topic 包含一個(gè)或多個(gè) Partition,每個(gè)Partition內(nèi)部是有序的纯露。
- Producer:消息生產(chǎn)者剿骨,負(fù)責(zé)發(fā)布消息到 Kafka broker。
- Consumer:消息消費(fèi)者苔埋,向 Kafka broker 讀取消息的客戶端懦砂。
- Consumer Group:每個(gè) Consumer 屬于一個(gè)特定的 Consumer Group(可為每個(gè) Consumer 指定 group name,若不指定 group name 則屬于默認(rèn)的 group)组橄。
Kafka工作流程
Kafka 是一個(gè)基于分布式的消息發(fā)布-訂閱系統(tǒng)荞膘,它被設(shè)計(jì)成快速、可擴(kuò)展的玉工、持久的羽资。與其他消息發(fā)布-訂閱系統(tǒng)類似指蚁,Kafka 在主題當(dāng)中保存消息的信息摹量。生產(chǎn)者向主題寫入數(shù)據(jù),消費(fèi)者從主題讀取數(shù)據(jù)誉尖。由于 Kafka 的特性是支持分布式狭郑,同時(shí)也是基于分布式的腹暖,所以主題也是可以在多個(gè)節(jié)點(diǎn)上被分區(qū)和覆蓋的。
信息是一個(gè)字節(jié)數(shù)組翰萨,程序員可以在這些字節(jié)數(shù)組中存儲任何對象脏答,支持的數(shù)據(jù)格式包括 String、JSON、Avro殖告。Kafka 通過給每一個(gè)消息綁定一個(gè)鍵值的方式來保證生產(chǎn)者可以把所有的消息發(fā)送到指定位置阿蝶。屬于某一個(gè)消費(fèi)者群組的消費(fèi)者訂閱了一個(gè)主題,通過該訂閱消費(fèi)者可以跨節(jié)點(diǎn)地接收所有與該主題相關(guān)的消息黄绩,每一個(gè)消息只會發(fā)送給群組中的一個(gè)消費(fèi)者羡洁,所有擁有相同鍵值的消息都會被確保發(fā)給這一個(gè)消費(fèi)者。
Kafka 設(shè)計(jì)中將每一個(gè)主題分區(qū)當(dāng)作一個(gè)具有順序排列的日志爽丹。同處于一個(gè)分區(qū)中的消息都被設(shè)置了一個(gè)唯一的偏移量筑煮。Kafka 只會保持跟蹤未讀消息,一旦消息被置為已讀狀態(tài)粤蝎,Kafka 就不會再去管理它了咆瘟。Kafka 的生產(chǎn)者負(fù)責(zé)在消息隊(duì)列中對生產(chǎn)出來的消息保證一定時(shí)間的占有,消費(fèi)者負(fù)責(zé)追蹤每一個(gè)主題 (可以理解為一個(gè)日志通道) 的消息并及時(shí)獲取它們诽里。基于這樣的設(shè)計(jì)飞蛹,Kafka 可以在消息隊(duì)列中保存大量的開銷很小的數(shù)據(jù)谤狡,并且支持大量的消費(fèi)者訂閱。
Kafka體系架構(gòu)
如上圖所示卧檐,一個(gè)典型的Kafka體系架構(gòu)包括若干Producer(可以是服務(wù)器日志墓懂,業(yè)務(wù)數(shù)據(jù),頁面前端產(chǎn)生的page view等等)霉囚,若干broker(Kafka支持水平擴(kuò)展捕仔,一般broker數(shù)量越多,集群吞吐率越高)盈罐,若干Consumer (Group)榜跌,以及一個(gè)Zookeeper集群。Kafka通過Zookeeper管理集群配置盅粪,選舉leader钓葫,以及在consumer group發(fā)生變化時(shí)進(jìn)行rebalance。Producer使用push(推)模式將消息發(fā)布到broker票顾,Consumer使用pull(拉)模式從broker訂閱并消費(fèi)消息础浮。
-
Topic & Partition
一個(gè)topic可以認(rèn)為一個(gè)一類消息,每個(gè)topic將被分成多個(gè)partition奠骄,每個(gè)partition在存儲層面是append log文件豆同。任何發(fā)布到此partition的消息都會被追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量)含鳞,offset為一個(gè)long型的數(shù)字影锈,它唯一標(biāo)記一條消息。每條消息都被append到partition中,是順序?qū)懘疟P精居,因此效率非常高(經(jīng)驗(yàn)證锄禽,順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存還要高,這是Kafka高吞吐率的一個(gè)很重要的保證)靴姿。
每一條消息被發(fā)送到broker中沃但,會根據(jù)partition規(guī)則選擇被存儲到哪一個(gè)partition。如果partition規(guī)則設(shè)置的合理佛吓,所有消息可以均勻分布到不同的partition里宵晚,這樣就實(shí)現(xiàn)了水平擴(kuò)展。(如果一個(gè)topic對應(yīng)一個(gè)文件维雇,那這個(gè)文件所在的機(jī)器I/O將會成為這個(gè)topic的性能瓶頸淤刃,而partition解決了這個(gè)問題)。在創(chuàng)建topic時(shí)可以在$KAFKA_HOME/config/server.properties中指定這個(gè)partition的數(shù)量(如下所示)吱型,當(dāng)然可以在topic創(chuàng)建之后去修改partition的數(shù)量逸贾。
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
在發(fā)送一條消息時(shí),可以指定這個(gè)消息的key津滞,producer根據(jù)這個(gè)key和partition機(jī)制來判斷這個(gè)消息發(fā)送到哪個(gè)partition铝侵。partition機(jī)制可以通過指定producer的partition.class這一參數(shù)來指定,該class必須實(shí)現(xiàn)kafka.producer.Partitioner接口触徐。
- Kafka文件存儲機(jī)制
Kafka的高可靠性的保障來源于其健壯的副本(replication)策略咪鲜。通過調(diào)節(jié)其副本相關(guān)參數(shù),可以使得Kafka在性能和可靠性之間運(yùn)轉(zhuǎn)的游刃有余撞鹉。Kafka從0.8.x版本開始提供partition級別的復(fù)制,replication的數(shù)量可以在$KAFKA_HOME/config/server.properties中配置(default.replication.refactor)疟丙。
先從Kafka文件存儲機(jī)制入手,從最底層了解Kafka的存儲細(xì)節(jié)鸟雏,進(jìn)而對其的存儲有個(gè)微觀的認(rèn)知享郊。
Kafka中消息是以topic進(jìn)行分類的,生產(chǎn)者通過topic向Kafka broker發(fā)送消息孝鹊,消費(fèi)者通過topic讀取數(shù)據(jù)拂蝎。然而topic在物理層面又能以partition為分組,一個(gè)topic可以分成若干個(gè)partition惶室,那么topic以及partition又是怎么存儲的呢温自?partition還可以細(xì)分為segment,一個(gè)partition物理上由多個(gè)segment組成皇钞,那么這些segment又是什么呢悼泌?下面我們來一一揭曉。
為了便于說明問題夹界,假設(shè)這里只有一個(gè)Kafka集群馆里,且這個(gè)集群只有一個(gè)Kafka broker,即只有一臺物理機(jī)。在這個(gè)Kafka broker中配置
# 配置文件$KAFKA_HOME/config/server.properties中鸠踪,設(shè)置Kafka消息文件存儲目錄
log.dirs=/tmp/kafka-logs
# 創(chuàng)建一個(gè)topic:topic_zzh_test丙者,設(shè)置partition的數(shù)量為4。
$KAFKA_HOME/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partitions 4 \
–topic topic_zzh_test –replication-factor 4
# 那么我們此時(shí)可以在/tmp/kafka-logs目錄中可以看到生成了4個(gè)目錄:
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-0
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-1
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-2
drwxr-xr-x 2 root root 4096 Apr 10 16:10 topic_zzh_test-3
在Kafka文件存儲中营密,同一個(gè)topic下有多個(gè)不同的partition械媒,每個(gè)partiton為一個(gè)目錄,partition的名稱規(guī)則為:topic名稱+有序序號评汰,第一個(gè)序號從0開始計(jì)纷捞,最大的序號為partition數(shù)量減1,partition是實(shí)際物理上的概念被去,而topic是邏輯上的概念主儡。
上面提到partition還可以細(xì)分為segment,這個(gè)segment又是什么惨缆?如果就以partition為最小存儲單位糜值,我們可以想象當(dāng)Kafka producer不斷發(fā)送消息,必然會引起partition文件的無限擴(kuò)張坯墨,這樣對于消息文件的維護(hù)以及已經(jīng)被消費(fèi)的消息的清理帶來嚴(yán)重的影響臀玄,所以這里以segment為單位又將partition細(xì)分。每個(gè)partition(目錄)相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等的segment(段)數(shù)據(jù)文件中(每個(gè)segment 文件中消息數(shù)量不一定相等)這種特性也方便old segment的刪除畅蹂,即方便已被消費(fèi)的消息的清理,提高磁盤的利用率荣恐。每個(gè)partition只需要支持順序讀寫就行液斜,segment的文件生命周期由服務(wù)端配置參數(shù)(log.segment.bytes,log.roll.{ms,hours}等若干參數(shù))決定叠穆。
segment文件由兩部分組成少漆,分別為“.index”文件和“.log”文件,分別表示為segment索引文件和數(shù)據(jù)文件硼被。這兩個(gè)文件的命令規(guī)則為:partition全局的第一個(gè)segment從0開始示损,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset值,數(shù)值大小為64位嚷硫,20位數(shù)字字符長度检访,沒有數(shù)字用0填充,如下:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
以上面的segment文件為例仔掸,展示出segment:00000000000000170410的“.index”文件和“.log”文件的對應(yīng)的關(guān)系脆贵,如下圖:
如上圖,“.index”索引文件存儲大量的元數(shù)據(jù)起暮,“.log”數(shù)據(jù)文件存儲大量的消息卖氨,索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址。其中以“.index”索引文件中的元數(shù)據(jù)[3, 348]為例,在“.log”數(shù)據(jù)文件表示第3個(gè)消息筒捺,即在全局partition中表示170410+3=170413個(gè)消息柏腻,該消息的物理偏移地址為348。
那么如何從partition中通過offset查找message呢系吭?
以上圖為例五嫂,讀取offset=170418的消息,首先查找segment文件村斟,其中00000000000000000000.index為最開始的文件贫导,第二個(gè)文件為00000000000000170410.index(起始偏移為170410+1=170411),而第三個(gè)文件為00000000000000239430.index(起始偏移為239430+1=239431)蟆盹,所以這個(gè)offset=170418就落到了第二個(gè)文件之中孩灯。其他后續(xù)文件可以依次類推,以其實(shí)偏移量命名并排列這些文件逾滥,然后根據(jù)二分查找法就可以快速定位到具體文件位置峰档。其次根據(jù)00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置進(jìn)行讀取。
要是讀取offset=170418的消息寨昙,從00000000000000170410.log文件中的1325的位置進(jìn)行讀取讥巡,那么怎么知道何時(shí)讀完本條消息,否則就讀到下一條消息的內(nèi)容了舔哪?
這個(gè)就需要聯(lián)系到消息的物理結(jié)構(gòu)了欢顷,消息都具有固定的物理結(jié)構(gòu),包括:offset(8 Bytes)捉蚤、消息體的大刑俊(4 Bytes)、crc32(4 Bytes)缆巧、magic(1 Byte)布持、attributes(1 Byte)、key length(4 Bytes)陕悬、key(K Bytes)题暖、payload(N Bytes)等等字段,可以確定一條消息的大小捉超,即讀取到哪里截止胧卤。
-
復(fù)制原理和同步方式
Kafka中topic的每個(gè)partition有一個(gè)預(yù)寫式的日志文件,雖然partition可以繼續(xù)細(xì)分為若干個(gè)segment文件拼岳,但是對于上層應(yīng)用來說可以將partition看成最小的存儲單元(一個(gè)有多個(gè)segment文件拼接的“巨型”文件)灌侣,每個(gè)partition都由一些列有序的、不可變的消息組成裂问,這些消息被連續(xù)的追加到partition中侧啼。
上圖中有兩個(gè)新名詞:HW和LEO牛柒。這里先介紹下LEO,LogEndOffset的縮寫痊乾,表示每個(gè)partition的log最后一條Message的位置皮壁。HW是HighWatermark的縮寫,是指consumer能夠看到的此partition的位置哪审,這個(gè)涉及到多副本的概念蛾魄,這里先提及一下,下節(jié)再詳表湿滓。
言歸正傳滴须,為了提高消息的可靠性,Kafka每個(gè)topic的partition有N個(gè)副本(replicas)叽奥,其中N(大于等于1)是topic的復(fù)制因子(replica fator)的個(gè)數(shù)扔水。Kafka通過多副本機(jī)制實(shí)現(xiàn)故障自動(dòng)轉(zhuǎn)移,當(dāng)Kafka集群中一個(gè)broker失效情況下仍然保證服務(wù)可用朝氓。在Kafka中發(fā)生復(fù)制時(shí)確保partition的日志能有序地寫到其他節(jié)點(diǎn)上魔市,N個(gè)replicas中,其中一個(gè)replica為leader赵哲,其他都為follower, leader處理partition的所有讀寫請求待德,與此同時(shí),follower會被動(dòng)定期地去復(fù)制leader上的數(shù)據(jù)枫夺。
如下圖所示将宪,Kafka集群中有4個(gè)broker, 某topic有3個(gè)partition,且復(fù)制因子即副本個(gè)數(shù)也為3:
Kafka提供了數(shù)據(jù)復(fù)制算法保證,如果leader發(fā)生故障或掛掉橡庞,一個(gè)新leader被選舉并被接受客戶端的消息成功寫入较坛。Kafka確保從同步副本列表中選舉一個(gè)副本為leader,或者說follower追趕leader數(shù)據(jù)毙死。leader負(fù)責(zé)維護(hù)和跟蹤ISR(In-Sync Replicas的縮寫,表示副本同步隊(duì)列喻鳄,具體可參考下節(jié))中所有follower滯后的狀態(tài)扼倘。當(dāng)producer發(fā)送一條消息到broker后,leader寫入消息并復(fù)制到所有follower除呵。消息提交之后才被成功復(fù)制到所有的同步副本再菊。消息復(fù)制延遲受最慢的follower限制,重要的是快速檢測慢副本颜曾,如果follower“落后”太多或者失效纠拔,leader將會把它從ISR中刪除。
- ISR
上節(jié)我們涉及到ISR (In-Sync Replicas)泛豪,這個(gè)是指副本同步隊(duì)列稠诲。副本數(shù)對Kafka的吞吐率是有一定的影響侦鹏,但極大的增強(qiáng)了可用性。默認(rèn)情況下Kafka的replica數(shù)量為1臀叙,即每個(gè)partition都有一個(gè)唯一的leader略水,為了確保消息的可靠性,通常應(yīng)用中將其值(由broker的參數(shù)offsets.topic.replication.factor指定)大小設(shè)置為大于1劝萤,比如3渊涝。 所有的副本(replicas)統(tǒng)稱為Assigned Replicas,即AR床嫌。ISR是AR中的一個(gè)子集跨释,由leader維護(hù)ISR列表,follower從leader同步數(shù)據(jù)有一些延遲(包括延遲時(shí)間replica.lag.time.max.ms和延遲條數(shù)replica.lag.max.messages兩個(gè)維度, 當(dāng)前最新的版本0.10.x中只支持replica.lag.time.max.ms這個(gè)維度)厌处,任意一個(gè)超過閾值都會把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表鳖谈,新加入的follower也會先存放在OSR中。AR=ISR+OSR嘱蛋。
Kafka 0.10.x版本后移除了replica.lag.max.messages參數(shù)蚯姆,只保留了replica.lag.time.max.ms作為ISR中副本管理的參數(shù)。為什么這樣做呢洒敏?replica.lag.max.messages表示當(dāng)前某個(gè)副本落后leaeder的消息數(shù)量超過了這個(gè)參數(shù)的值龄恋,那么leader就會把follower從ISR中刪除。假設(shè)設(shè)置replica.lag.max.messages=4凶伙,那么如果producer一次傳送至broker的消息數(shù)量都小于4條時(shí)郭毕,因?yàn)樵趌eader接受到producer發(fā)送的消息之后而follower副本開始拉取這些消息之前,follower落后leader的消息數(shù)不會超過4條消息函荣,故此沒有follower移出ISR显押,所以這時(shí)候replica.lag.max.message的設(shè)置似乎是合理的。但是producer發(fā)起瞬時(shí)高峰流量傻挂,producer一次發(fā)送的消息超過4條時(shí)乘碑,也就是超過replica.lag.max.messages,此時(shí)follower都會被認(rèn)為是與leader副本不同步了金拒,從而被踢出了ISR兽肤。但實(shí)際上這些follower都是存活狀態(tài)的且沒有性能問題。那么在之后追上leader,并被重新加入了ISR绪抛。于是就會出現(xiàn)它們不斷地剔出ISR然后重新回歸ISR资铡,這無疑增加了無謂的性能損耗。而且這個(gè)參數(shù)是broker全局的幢码。設(shè)置太大了笤休,影響真正“落后”follower的移除;設(shè)置的太小了症副,導(dǎo)致follower的頻繁進(jìn)出店雅。無法給定一個(gè)合適的replica.lag.max.messages的值政基,故此,新版本的Kafka移除了這個(gè)參數(shù)底洗。
注:ISR中包括:leader和follower腋么。
上面一節(jié)還涉及到一個(gè)概念,即HW亥揖。HW俗稱高水位珊擂,HighWatermark的縮寫,取一個(gè)partition對應(yīng)的ISR中最小的LEO作為HW费变,consumer最多只能消費(fèi)到HW所在的位置摧扇。另外每個(gè)replica都有HW,leader和follower各自負(fù)責(zé)更新自己的HW的狀態(tài)。對于leader新寫入的消息挚歧,consumer不能立刻消費(fèi)扛稽,leader會等待該消息被所有ISR中的replicas同步后更新HW,此時(shí)消息才能被consumer消費(fèi)滑负。這樣就保證了如果leader所在的broker失效在张,該消息仍然可以從新選舉的leader中獲取。對于來自內(nèi)部broKer的讀取請求矮慕,沒有HW的限制帮匾。
下圖詳細(xì)的說明了當(dāng)producer生產(chǎn)消息至broker后,ISR以及HW和LEO的流轉(zhuǎn)過程:
由此可見痴鳄,Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制瘟斜,也不是單純的異步復(fù)制。事實(shí)上痪寻,同步復(fù)制要求所有能工作的follower都復(fù)制完螺句,這條消息才會被commit,這種復(fù)制方式極大的影響了吞吐率橡类。而異步復(fù)制方式下蛇尚,follower異步的從leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被leader寫入log就被認(rèn)為已經(jīng)commit顾画,這種情況下如果follower都還沒有復(fù)制完取劫,落后于leader時(shí),突然leader宕機(jī)亲雪,則會丟失數(shù)據(jù)勇凭。而Kafka的這種使用ISR的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率疚膊。
Kafka的ISR的管理最終都會反饋到Zookeeper節(jié)點(diǎn)上义辕。具體位置為:/brokers/topics/[topic]/partitions/[partition]/state。目前有兩個(gè)地方會對這個(gè)Zookeeper的節(jié)點(diǎn)進(jìn)行維護(hù):
Controller來維護(hù):Kafka集群中的其中一個(gè)Broker會被選舉為Controller寓盗,主要負(fù)責(zé)Partition管理和副本狀態(tài)管理灌砖,也會執(zhí)行類似于重分配partition之類的管理任務(wù)璧函。在符合某些特定條件下,Controller下的LeaderSelector會選舉新的leader基显,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關(guān)節(jié)點(diǎn)中蘸吓。同時(shí)發(fā)起LeaderAndIsrRequest通知所有的replicas。
leader來維護(hù):leader有單獨(dú)的線程定期檢測ISR中follower是否脫離ISR, 如果發(fā)現(xiàn)ISR變化撩幽,則會將新的ISR的信息返回到Zookeeper的相關(guān)節(jié)點(diǎn)中库继。
3.3 Kafka單節(jié)點(diǎn)單broker部署及使用
前置條件
- Java Runtime Environment - Java 1.8 or later
- ZooKeeper
安裝JDK(略)
安裝ZooKeeper(略)
安裝Kafka
- 下載Kafka包
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.0.0/kafka_2.11-2.0.0.tgz
- 解壓Kafka包
tar -zxvf kafka_2.11-2.0.0.tgz -C [install dir]
- 配置Kafka環(huán)境變量
vim /etc/profile 或者
vim ~/.bash_profile
export KAFKA_HOME=[flume install dir]
export PATH = $KAFKA_HOME/bin:$PATH
執(zhí)行指令
source /etc/profile 或者
source ~/.bash_profile
使得配置生效。
- 啟動(dòng)服務(wù)
# 首先啟動(dòng)ZooKeeper服務(wù)
bin/zookeeper-server-start.sh config/zookeeper.propertis
# 然后啟動(dòng)Kafka服務(wù)
bin/kafka-server-start.sh config/server.propertis
- 創(chuàng)建Topic
# 創(chuàng)建一個(gè)只有單一partition和單一副本的topic test
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
--partitions 1 --topic test
# 驗(yàn)證topic創(chuàng)建是否成功
bin/kafka-topic.sh --list --zookeeper localhost:2181
test
# 控制臺反饋test窜醉,說明topic 創(chuàng)建成功宪萄,否則失敗。
- 發(fā)布messages
# Run the producer and then type a few messages into the console to send to the server.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
- 啟動(dòng)Consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
3.4 Kafka單節(jié)點(diǎn)多broker部署及使用
- 設(shè)置多broker集群
# 首先為每個(gè)broker定制配置文件
cp config/server.properies config/server-1.properties
cp config/server.properies config/server-2.properties
cp config/server.properies config/server-3.properties
# 修改每個(gè)配置文件中的重要配置項(xiàng)
# broker.id = (*)
# listeners = (*)
# log.dirs = (*)
#配置文件1: config/server-1.properties
broker.id=1
listeners=[PLAINTEXT://:9093](plaintext://:9093)
log.dirs=$KAFKA_HOME/tmp/logs/kafka-logs-1
#配置文件2: config/server-2.properties
broker.id=2
listeners=[PLAINTEXT://:9094](plaintext://:9094)
log.dirs=$KAFKA_HOME/tmp/logs/kafka-logs-2
#配置文件3: config/server-3.properties
broker.id=3
listeners=[PLAINTEXT://:9095](plaintext://:9095)
log.dirs=$KAFKA_HOME/tmp/logs/kafka-logs-3
- 啟動(dòng)Kafka后臺服務(wù)
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
bin/kafka-server-start.sh config/server-3.properties &
- 創(chuàng)建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 \
--partitions 1 --topic my-replicated-topic
- 查看topic詳細(xì)信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
`Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:`
`Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0`
- 發(fā)布messages
bin/kafka-console-producer.sh --broker-list localhost:9093, localhost:9094, localhost:9095 \
--topic my-replicated-topic
my test message 1
my test message 2
- 消費(fèi)messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9093, localhost:9094, localhost:9095 \
--from-beginning --topic my-replicated-topic`
my test message 1
my test message 2
3.5 Kafka容錯(cuò)性測試
測試方法榨惰,通過強(qiáng)制中斷topic副本中對應(yīng)的某些broke拜英,測試消息傳遞的正確性。
------------------- 未完琅催,待續(xù) --------------------