一判耕、消費(fèi)方式
只要提到消息隊(duì)列透绩,必然會(huì)涉及到其推模式(push)和拉模式(pull),在kafka中壁熄,同樣會(huì)涉及到帚豪。
consumer采用的時(shí)pull模式,從broker中去拉取消息草丧。
為什么采用pull模式狸臣?
如果采用push模式,很容易造成消息的堆積方仿,因?yàn)閎roker控制消息的推送速率固棚,消息數(shù)量大的話,很難使每個(gè)消費(fèi)者很難適應(yīng)消息推送速率仙蚜。
采用pull模式則可以使每個(gè)消費(fèi)者以自身的消費(fèi)能力去消費(fèi)。
pull模式有什么不足厂汗?kafka如何解決這一問(wèn)題委粉?
如果kafka沒(méi)有消息,則每次消費(fèi)者拉去的都是空的數(shù)據(jù)娶桦,會(huì)使得其陷入循環(huán)贾节,一直返回空數(shù)據(jù)。針對(duì)這一情況衷畦,每次消費(fèi)數(shù)據(jù)的時(shí)候栗涂,消費(fèi)者會(huì)帶有一個(gè)時(shí)長(zhǎng)參數(shù)timeout,當(dāng)返回空數(shù)據(jù)時(shí)祈争,消費(fèi)者會(huì)等待timeout的時(shí)間斤程,再去消費(fèi)。
二菩混、分區(qū)分配策略
一個(gè)topic有多個(gè)partition忿墅,所以必然會(huì)涉及到partition的分配問(wèn)題扁藕,即確定那個(gè) partition 由哪個(gè) consumer 來(lái)消費(fèi)。
Kafka 有兩種分配策略疚脐,一是 Range亿柑,一是 RoundRobin。
有以下情況會(huì)導(dǎo)致消費(fèi)者分區(qū)的重新分配:
1) 當(dāng)consumer group的成員數(shù)量增加或者減少棍弄。
2) 當(dāng)消費(fèi)者訂閱的主題的partition數(shù)量變更望薄。
2.1 Range
Range策略是對(duì)每個(gè)主題而言的。它會(huì)將每個(gè)topic的分區(qū)從0往后一次排列呼畸。
其分配算法是用topic分區(qū)的總個(gè)數(shù)除以消費(fèi)者個(gè)數(shù)(這里指的消費(fèi)者是同一個(gè)消費(fèi)者組內(nèi)的式矫,因?yàn)椴煌M會(huì)得到topic的全量消息),除盡的話則消費(fèi)者均勻分配役耕,除不盡的話采转,在前面的消費(fèi)者會(huì)多消費(fèi)一個(gè)分區(qū)。
我們列舉三種情況來(lái)說(shuō)明它:
1)有一個(gè)topic瞬痘,有6個(gè)partition故慈,有兩個(gè)消費(fèi)者,則經(jīng)過(guò)首次分區(qū)分配后框全,會(huì)形成如下的形式:
如上所示察绷,將6個(gè)分區(qū)均勻的分給了兩個(gè)消費(fèi)者,6除以三除盡了津辩,所以前三個(gè)屬于consumer1拆撼,后三個(gè)屬于consumer2。
2)有一個(gè)topic喘沿,有7個(gè)partition闸度,有兩個(gè)消費(fèi)者,則經(jīng)過(guò)首次分區(qū)分配后蚜印,會(huì)形成如下的形式:
如上所示莺禁,consumer1比consumer2多消費(fèi)了一個(gè)分區(qū)的數(shù)據(jù)。
3)有兩個(gè)topic窄赋,每個(gè)有5個(gè)partition哟冬,有兩個(gè)消費(fèi)者,則經(jīng)過(guò)首次分區(qū)分配后忆绰,會(huì)形成如下的形式:
如上所示浩峡,發(fā)現(xiàn)consumer1總共消費(fèi)6分區(qū),而consumer2只消費(fèi)4個(gè)错敢。
綜上所述翰灾,能夠看出Range一個(gè)較為明顯的弊端。
2.2 RoundRobin策略
將所有topic的partition組成TopicAndPartition列表,然后對(duì)TopicAndPartition列表按照hashCode進(jìn)行排序预侯。
使用RoundRobin策略必須滿足以下條件:
1.同一個(gè)Consumer Group里面的所有consumer的num.streams必須相等(關(guān)于這個(gè)num.streams我還不知道是什么東西致开,知道的可以幫忙解答下)。
2.每個(gè)consumer訂閱的topic必須相同萎馅。
分以下三種情況展示:
1)假設(shè)有個(gè)topic双戳,有六個(gè)分區(qū),假設(shè)6個(gè)分區(qū)經(jīng)過(guò)hashcode的排序后順序是5糜芳,0飒货,4,3,2,1盖溺, 兩個(gè)消費(fèi)者:consumer1聘萨,consumer2华弓,分配結(jié)果如下所示:
2)假設(shè)有個(gè)topic,有7個(gè)分區(qū),假設(shè)6個(gè)分區(qū)經(jīng)過(guò)hashcode的排序后順序是6,5呻惕,0,4滥比,3亚脆,2,1盲泛, 兩個(gè)消費(fèi)者:consumer1濒持,consumer2,分配結(jié)果如下所示:
如上所示寺滚,consumer1會(huì)比consumer2多消費(fèi)一個(gè)分區(qū)柑营,但是也僅會(huì)多一個(gè)。沒(méi)有range策略的問(wèn)題玛迄。
2)假設(shè)有兩個(gè)topic由境,每個(gè)有5個(gè)分區(qū),假設(shè)6個(gè)分區(qū)經(jīng)過(guò)hashcode的排序后順序是topic1-0蓖议,topic1-4,topic1-3讥蟆,topic1-2勒虾,topic1-1,topic2-0瘸彤,topic2-4修然,topic2-3,topic2-2,topic2-1 愕宋,兩個(gè)消費(fèi)者:consumer1玻靡,consumer2,分配結(jié)果如下所示:
綜上所述中贝,RoundRobin與Range最大的區(qū)別:
Range是為某個(gè)topic完成了分區(qū)分派以后囤捻,再進(jìn)行下一個(gè)topic的分區(qū)分派;
RoundRobin是首先將這個(gè)group中的所有consumer訂閱的所有的topic-partition按順序展開(kāi)邻寿,依次對(duì)于每一個(gè)topic-partition蝎土,在consumer進(jìn)行round robin,為這個(gè)topic-partition選擇一個(gè)consumer绣否。
三誊涯、offset
由于 consumer 在消費(fèi)過(guò)程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障,consumer 恢復(fù)后蒜撮,需要從故障前的位置的繼續(xù)消費(fèi)暴构,所以 consumer 需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) offset,以便故障恢復(fù)后繼續(xù)消費(fèi)段磨。
Kafka 0.9 版本之前取逾,consumer 默認(rèn)將 offset 保存在 Zookeeper 中,從 0.9 版本開(kāi)始薇溃,consumer 默認(rèn)將 offset 保存在 Kafka 一個(gè)內(nèi)置的 topic 中菌赖,該 topic 為__consumer_offsets。
當(dāng)有消費(fèi)者第一次消費(fèi)kafka數(shù)據(jù)時(shí)就會(huì)自動(dòng)創(chuàng)建沐序,它的副本數(shù)不受集群配置的topic副本數(shù)限制琉用,分區(qū)數(shù)默認(rèn)50(可以配置),默認(rèn)壓縮策略為compact策幼。
使用以下的命令可以查看__consumer_offsets:
[root@localhost kafka_2.12-2.7.0]# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.184.134:9092 --describe --group test
查看結(jié)果如下所示:
[root@localhost kafka_2.12-2.7.0]# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.184.134:9092 --describe --group test
Consumer group 'test' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test test-kafka1 9 0 0 0 - - -
test test-kafka2 9 0 0 0 - - -
test test-kafka2 2 0 0 0 - - -
test test-kafka1 0 0 0 0 - - -
test test-kafka2 4 0 0 0 - - -
test test-kafka1 2 0 0 0 - - -
test test-kafka2 6 0 0 0 - - -
test test-kafka1 4 0 0 0 - - -
test test-kafka 3 202 203 1 - - -
test test-kafka2 8 0 0 0 - - -
test test-kafka1 6 0 0 0 - - -
test test-kafka 1 74 75 1 - - -
test test-kafka 7 73 74 1 - - -
test test-kafka 5 74 75 1 - - -
test test-kafka2 0 0 0 0 - - -
test test-kafka 9 75 76 1 - - -
test test-kafka1 8 0 0 0 - - -
test test-kafka1 1 0 0 0 - - -
test test-kafka 0 75 76 1 - - -
test test-kafka2 1 1 1 0 - - -
test test-kafka1 3 0 0 0 - - -
test test-kafka2 3 0 0 0 - - -
test test-kafka1 5 0 0 0 - - -
test test-kafka 4 75 76 1 - - -
test test-kafka2 5 0 0 0 - - -
test test-kafka1 7 0 0 0 - - -
test test-kafka 2 75 76 1 - - -
test test-kafka2 7 0 0 0 - - -
test test-kafka 8 74 75 1 - - -
test test-kafka 6 73 74 1 - - -
GROUP:消費(fèi)者組的id
TOPIC:主題
PARTITION:分區(qū)
CURRENT-OFFSET:當(dāng)前消費(fèi)的offset
LOG-END-OFFSET:最后一個(gè)offset
當(dāng)CURRENT-OFFSET與LOG-END-OFFSET相等時(shí)邑时,表示當(dāng)前分區(qū)的所有消息都被消費(fèi)了。
四特姐、消費(fèi)者組
kafka區(qū)別于其他的消息隊(duì)列晶丘,有著自己的特性,由于消費(fèi)者組的引入唐含,使得一條消息不但能夠一對(duì)一的被消費(fèi)者組內(nèi)的唯一消費(fèi)者消費(fèi)浅浮,也可以被不通消費(fèi)者組的消費(fèi)者同時(shí)消費(fèi)。
總結(jié)起來(lái)這個(gè)消費(fèi)者組和消費(fèi)者的關(guān)系:如果應(yīng)用需要讀取全量消息捷枯,那么請(qǐng)為該應(yīng)用設(shè)置一個(gè)消費(fèi)組滚秩;如果該應(yīng)用消費(fèi)能力不足,那么可以考慮在這個(gè)消費(fèi)組里增加消費(fèi)者淮捆。
通過(guò)下圖舉例:在springboot中使用以下方式配置:
spring:
kafka:
consumer:
group-id: test