目錄
kafka consumer
- 消費(fèi)方式
- 消費(fèi)分區(qū)分配策略
- 消費(fèi)過(guò)程中offset的維護(hù) - 老版本zk節(jié)點(diǎn)維護(hù)
1. 消費(fèi)方式
1.1 broker push
這種消費(fèi)方式由broker主動(dòng)推送消息給消費(fèi)者,消費(fèi)者被動(dòng)接收消息懂拾。
缺點(diǎn): consumer 消費(fèi)能力不強(qiáng)的情況下可能出現(xiàn)拒絕服務(wù)胚股、以及因網(wǎng)絡(luò)問(wèn)問(wèn)題產(chǎn)生的網(wǎng)絡(luò)擁塞的情況;
1.2 consumer pull
消費(fèi)者主動(dòng)輪詢broker是否有數(shù)據(jù)可以消費(fèi)额获,拉取消息的速率完全由consumer自己掌握,但是可能會(huì)出現(xiàn)broker沒(méi)有數(shù)據(jù)揣云,消費(fèi)者陷入無(wú)限循環(huán)當(dāng)中;
解決的辦法是,在kafka consumer消費(fèi)數(shù)據(jù)時(shí)傳入一個(gè)時(shí)長(zhǎng)參數(shù) timeout裆熙,防止cpu空轉(zhuǎn)
2. 消費(fèi)者組 consumer group 分區(qū)分配策略
一個(gè)consumer group 中有多個(gè)consumer, 一個(gè)topic 中會(huì)有多個(gè)partition;所以會(huì)出現(xiàn)消費(fèi)者消費(fèi)分區(qū)數(shù)據(jù)時(shí)禽笑,partition分配的問(wèn)題入录,即確定哪個(gè)partition 由哪個(gè)consumer來(lái)消費(fèi)?
2.1 round robin輪詢策略
如下圖所示:
假如同一個(gè)主題:
另外 consumer group 可以對(duì)多個(gè)主題進(jìn)行消費(fèi):
看如下場(chǎng)景:
2個(gè)主題 T1、T2
在這種場(chǎng)景下分區(qū)策略偽代碼如下所示:
def get_partition_index():
return map(TopicAndPartition:List, hash()) mod num(consumers)
來(lái)確定分配給消費(fèi)者組中消費(fèi)者的partition index
這種方式消費(fèi)者組會(huì)將所有topic 中的 partition 當(dāng)作一個(gè)整體來(lái)輪詢分配佳镜。
分配主體是消費(fèi)者組;
適用于 消費(fèi)者組中所有 消費(fèi)者 訂閱的都是同一個(gè)主題 的場(chǎng)景僚稿。
2.2 Range 策略
2.2.1 Range 分配策略詳解
range 分配主體是被消費(fèi)的broker的單個(gè)主題:
consumer group 中的單個(gè)consumer 被分配的 可消費(fèi) partition 個(gè)數(shù)差距越來(lái)越大。
要點(diǎn): 按主題來(lái)區(qū)分的蟀伸。
range策略詳述蚀同,我們根據(jù)一個(gè)場(chǎng)景來(lái)深入理解一下:
如下圖所示:
當(dāng)前某主題有8個(gè)partition缅刽,某消費(fèi)者組中消費(fèi)者的個(gè)數(shù)是3個(gè),那么最終的分配結(jié)果如下圖所示:
具體算法過(guò)程也很簡(jiǎn)單蠢络,簡(jiǎn)述一下:
1. 計(jì)算n = num(topic partitions)/num(consumers of consumer group) = 8/3 = 2
2. 計(jì)算m = num(topic partitions)%num(consumers of consumer group) = 2
3. 分配規(guī)則為消費(fèi)組中的前m個(gè)消費(fèi)者衰猛,每個(gè)消費(fèi)者可以分配到的分區(qū)數(shù)為n+1 = 2+1=3, 剩余的消費(fèi)者可消費(fèi)的分區(qū)數(shù)為n
range 分配策略是有一些問(wèn)題的:
加入新的主題,但是消費(fèi)者組中的消費(fèi)者數(shù)量不變刹孔,那么頭部的消費(fèi)者就會(huì)被分配更多的partition,造成分配不均的問(wèn)題啡省。
2.2.2 調(diào)用Range的時(shí)機(jī)
當(dāng)消費(fèi)者組中的消費(fèi)者數(shù)量發(fā)生變化的時(shí)候,就會(huì)調(diào)用Range策略芦疏。
3. 消費(fèi)過(guò)程中offset的維護(hù)
3.1 為什么要維護(hù)offset
- case_1 - consumer宕機(jī) consumer 在消費(fèi)的過(guò)程中可能出現(xiàn)斷電宕機(jī)的問(wèn)題冕杠,consumer恢復(fù)后需要從消費(fèi)前的位置(offset)繼續(xù)消費(fèi)消息,所以消費(fèi)者在消費(fèi)過(guò)程中需要實(shí)時(shí)記錄消費(fèi)到了哪個(gè)位置酸茴,
以便消費(fèi)者恢復(fù)之后繼續(xù)消費(fèi)分预。 - case_2 - 添加一個(gè)consumer,根據(jù)消費(fèi)者的分區(qū)分配策略薪捍,新加入的消費(fèi)者很可能獲取到之前消費(fèi)者已經(jīng)消費(fèi)過(guò)的分區(qū)笼痹,那么這個(gè)消費(fèi)者應(yīng)該繼續(xù)消費(fèi)后續(xù)的消息。從哪里開(kāi)始繼續(xù)消費(fèi)消息酪穿,就是offset的
作用所在凳干。
3.2 offset 新老版本維護(hù)策略
3.2.1 老版本 - zookeeper 上保存消費(fèi)者消費(fèi)過(guò)的 消息 的offset
如下圖所示,開(kāi)啟了4個(gè)窗口: (順時(shí)針描述)
左上: 1個(gè)producer console - 生產(chǎn)消息至first topic(first topic 有3個(gè)partition,每個(gè)partition有2個(gè)replication)分區(qū)被济,具體消息進(jìn)入哪個(gè)分區(qū)
救赐,屬于無(wú)指定partition,無(wú)key只磷,有value的情況经磅,可以參考 producer 這一節(jié)來(lái)理解。
右上: 1個(gè)consumer console(其實(shí)是一個(gè)消費(fèi)者組钮追,只不過(guò)只有1個(gè)消費(fèi)者) - 消費(fèi) first topic 分區(qū)(leader) 中的消息
右下: 1個(gè)consumer console (其實(shí)是一個(gè)消費(fèi)者組预厌,只不過(guò)只有1個(gè)消費(fèi)者) - 消費(fèi) first topic 分區(qū)(leader) 中的消息
左下: 1個(gè)zkcli console, 從中我們可以查看到具體的contoller元媚、brokers轧叽、consumers、config 等相關(guān)信息
通過(guò)producer 發(fā)送消息 + consumer 消費(fèi)消息炭晒,
在zkCli 中查看具體的消費(fèi)者消費(fèi)消息的offset變化
命令為:
./zkCli.sh -server localhost:2181
get /consumers/$consumer_group/offsets/$topi/$partition
所以老版本消費(fèi)者消費(fèi)消息的offset 記錄方式為
[consumer group] + [topic] + [partition index]
采用這樣的方式記錄offset,當(dāng)consumer group 中下線甥角、上線新的consumer時(shí)腰埂,消費(fèi)過(guò)的消息就不會(huì)被重新消費(fèi)。
下一節(jié)我們繼續(xù)學(xué)習(xí)新版本 bootstrap-server上保存消費(fèi)者消費(fèi)過(guò)的消息的offset機(jī)制