7.1 分區(qū)分配策略
在 3.1 節(jié)中講述了消費(fèi)者與消費(fèi)組的模型,并且在默認(rèn)分區(qū)分配策略的背景下通過案例進(jìn) 行了具體的分析。 Kafka 提供了消費(fèi)者客戶端參數(shù) partition . assignrr陽1t . strategy 來設(shè) 置消費(fèi)者與訂閱主題之間的分區(qū)分配策略好渠。默認(rèn)情況下,此參數(shù)的值為 org.apache.kafka. clients.consumer.RangeAssignor彤恶,即采用 RangeAssignor分配策略袋倔。除此之外, Kafka還提供了另 外兩種分配策略: RoundRobinAssignor 和 StickyAssignor诅岩。 消費(fèi)者客戶端參數(shù) partitio口- assignment.strategy 可以配置多個(gè)分配策略讳苦,彼此之間以逗號分隔。
7.1 .1 RangeAssignor 分配策略
RangeAssignor 分配策略的原理是按照消費(fèi)者總數(shù)和分區(qū)總數(shù)進(jìn)行整除運(yùn)算來獲得一個(gè)跨度吩谦,然后將分區(qū)按照跨度進(jìn)行平均分配鸳谜, 以保證 分區(qū)盡可 能均勻地分配 給所 有的消費(fèi)者 。 對于每一個(gè)主題 式廷, RangeAssignor 策略會將消費(fèi)組內(nèi)所有訂閱這個(gè)主題的消費(fèi)者按照名稱的字典序排 序 咐扭, 然后為每個(gè)消費(fèi)者劃分固定的分區(qū)范圍,如果不夠平均分配滑废,那么字典序靠前的消費(fèi)者會 被多分配一個(gè)分區(qū)蝗肪。假設(shè) n=分區(qū)數(shù)/消費(fèi)者數(shù)量, m=分區(qū)數(shù)%消費(fèi)者數(shù)量蠕趁,那么前 m 個(gè)消費(fèi)者每個(gè)分配 n+l 個(gè) 分區(qū)薛闪,后面的(消費(fèi)者數(shù)量-m)個(gè)消費(fèi)者每個(gè)分配 n個(gè)分區(qū)。
除了第 3.1 節(jié)的示例俺陋,為了更加通俗地講解 RangeAssignor 策略 豁延, 我們不妨再舉一些示例昙篙。 假設(shè)消費(fèi)組內(nèi)有 2個(gè)消費(fèi)者 co和 Cl,都訂閱了主題 tO和 tl诱咏,并且每個(gè)主題都有 4個(gè)分區(qū)苔可, 那 么訂閱的所有分區(qū)可以標(biāo)識為 : tOpO、 tOpl胰苏、 t0p2硕蛹、 t0p3、 tlpO硕并、 tlp1法焰、 tlp2、 tlp3倔毙。最終的分配 結(jié)果為 :
消費(fèi)者 CO: tOpO埃仪、 tOpl, t lpO、 tlpl
消貨者 Cl : t0p2、 t0p3, tlp2怜校、 tlp3
這樣分配得很均勻机蔗,那么這個(gè)分配策略能夠一直保持這種良好的特性嗎?我們不妨再來看 另一種情況。假設(shè)上面例子中 2個(gè)主題都只有 3個(gè)分區(qū)径筏,那么訂閱的所有分區(qū)可以標(biāo)識為: tOpO指黎、 tOpl叭首、 t0p2、 tlpO见坑、 tlp 1嚷掠、 tlp2。最終的分配結(jié)果為 :
消費(fèi)者 CO: tOpO荞驴、 tOpL tlpO不皆、 tlpl
消費(fèi)者 Cl: t0p2, tlp2
可以明顯地看到這樣的分配并不均勻,如果將類似的情形擴(kuò)大熊楼, 則有可能出現(xiàn)部分消費(fèi)者 過載的情況霹娄。對此我們再來看另一種 RoundRobinAssignor策略的分配效果如何。
7.1.2 RoundRobinAssignor 分配策略
RoundRobinAssignor 分配策略的原理是將消費(fèi)組內(nèi)所有消費(fèi)者及消費(fèi)者訂閱的所有主題的分 區(qū)按照字典序 排序鲫骗,然后通過輪詢方式逐 個(gè)將分區(qū)依次分配給每個(gè)消費(fèi)者犬耻。 RoundRobinAssignor 分配策略對應(yīng)的 partition.assignment.strategy 參數(shù)值為 org.apache.kafka.clients.consumer.RoundRobinAssignor。
如果同一個(gè)消費(fèi)組內(nèi)所有的消費(fèi)者的訂閱信息都是相同的执泰,那么 RoundRobinAssignor分配 策略 的分區(qū)分配會是均勻的枕磁。舉個(gè)例子术吝,假設(shè)消費(fèi)組中有 2 個(gè)消費(fèi)者 co 和 Cl计济,都訂閱了主題 tO 和 tl,并且每個(gè)主題都有 3 個(gè)分區(qū) 排苍, 那么訂閱的所有分區(qū)可以標(biāo)識為: tOpO沦寂、 tOpl、 t0p2淘衙、 tlpO传藏、 tlpl、 tlp2。最終的分配結(jié)果為 :
消費(fèi)者 CO: tOpO毯侦、 t0p2, tlpl
消費(fèi)者 Cl: tOpl, tlpO西壮、 tlp2
如果同一個(gè)消費(fèi)組內(nèi)的消費(fèi)者訂閱的信息是不相同的,那么在執(zhí)行分區(qū)分配的時(shí)候就不是 完全的輪詢分配叫惊,有可能導(dǎo)致分區(qū)分配得不均勻。如果某個(gè)消費(fèi)者沒有訂閱消費(fèi)組內(nèi)的某個(gè)主 題做修,那么在分配分區(qū)的時(shí)候此消費(fèi)者將分配不到這個(gè)主題的任何分區(qū)霍狰。
舉個(gè)例子,假設(shè)消費(fèi)組內(nèi)有 3個(gè)消費(fèi)者 CCO饰及、 Cl 和 C刀蔗坯,它們共訂閱了 3個(gè)主題(tO、 tl燎含、 t刀宾濒,這 3個(gè)主題分別有 l、 2屏箍、 3個(gè)分區(qū)绘梦,即整個(gè)消費(fèi)組訂閱了 tOpO、 tlpO赴魁、 tlpl卸奉、 t2p0、 t2pl颖御、 t2p2這6個(gè)分區(qū)榄棵。 具體而言,消費(fèi)者co訂閱的是主題tO潘拱,消費(fèi)者Cl訂閱的是主題tO和tl. 消費(fèi)者 C2 訂閱的是主題 tO疹鳄、 ti 和 t2, 那么最終的分配結(jié)果為 :
消費(fèi)者CO: 消費(fèi)者Cl: 消費(fèi)者C2:
tOpO
tlpO
tlpL t2p0芦岂、 t2pL t2p2
可以看到 RoundRobinAssignor策略也不是十分完美瘪弓,這樣分配其實(shí)并不是最優(yōu)解,因?yàn)橥?全可以將分區(qū) tlpl 分配給消費(fèi)者 Cl盔腔。
7.1.3 StickyAssignor分配策略
我們再來看一下 StickyAssignor分配策略杠茬,“sticky”這個(gè)單詞可以翻譯為“黠性的”, Kafka從 0.11.x 版本開始引入這種分配策略弛随,它主要有兩個(gè)目的 :
Cl )分區(qū)的分配要盡可能均勻 瓢喉。
(2)分區(qū)的分配盡可能與上次分配的保持相同。
當(dāng)兩者發(fā)生沖突時(shí)舀透,第 一個(gè)目標(biāo)優(yōu)先于第二個(gè)目標(biāo) 栓票。 鑒于這兩個(gè)目標(biāo) , StickyAssignor 分配 策略的具體實(shí)現(xiàn)要比 RangeAssignor 和 RoundRobinAssignor 這兩種分配策略要復(fù)雜得多 。 我們 舉例來看一下 StickyAssignor 分配策略的實(shí)際效果 走贪。
假設(shè)消費(fèi)組內(nèi)有3個(gè)消費(fèi)者 (CO佛猛、 Cl和C2),它們都訂閱了4個(gè)主題 (tO坠狡、 tl继找、 t2、 t3) ' 并且每個(gè)主題有 2 個(gè) 分區(qū) 逃沿。 也就是說婴渡,整個(gè)消費(fèi)組訂閱了 tOpO、 tOpl凯亮、 tlpO边臼、 tlpl、 t2p0假消、 t2pl柠并、 t3p0、 t3pl這8個(gè)分區(qū)富拗。 最終的分配結(jié)果如下:
這樣初看上去似乎與采用 RoundRobinAssignor分配策略所分配的結(jié)果相同臼予,但事實(shí)是否真 的如此呢?再假設(shè)此時(shí)消費(fèi)者 Cl 脫離了消費(fèi)組,那么消費(fèi)組就會執(zhí)行再均衡操作媒峡,進(jìn)而消費(fèi) 分區(qū)會重新分配瘟栖。如果采用 RoundRobinAssignor分配策略,那么此時(shí)的分配結(jié)果如下:
可以看到分配結(jié)果中保留了上一次分配中對消費(fèi)者 co和 C2 的所有分配結(jié)果谅阿,并將原來消 費(fèi)者 Cl 的“ 負(fù)擔(dān)”分配給了剩余的兩個(gè)消費(fèi)者 co 和 C2半哟, 最終 co 和 C2 的分配還保持了均衡 。
如果發(fā)生分區(qū)重分配签餐,那么對于同一個(gè)分區(qū)而言寓涨,有可能之前的消費(fèi)者和新指派的消費(fèi)者 不是同一個(gè),之前消費(fèi)者進(jìn)行到 一半的處理還要在新指派的消費(fèi)者中再次復(fù)現(xiàn)一遍氯檐,這顯然很 瑯費(fèi)系統(tǒng)資源戒良。 StickyAssignor 分配策略如同其名稱中的“ sticky” 一樣,讓分配策略具備一定 的“勤性”冠摄,盡可能地讓前后兩次分配相同糯崎,進(jìn)而減少系統(tǒng)資源的損耗及其他異常情況的發(fā)生 。
到目前為止河泳,我們分析的都是消 費(fèi)者的訂閱信息都是相同的情況沃呢,我們來看一下訂閱信息 不同的情況下的處理。
舉個(gè)例子拆挥,同樣消費(fèi)組內(nèi)有3個(gè)消費(fèi)者 CCO薄霜、 Cl和C2), 集群中有3個(gè)主題 CtO、 tl和 t2)惰瓜,這 3 個(gè)主題分別有 1否副、 2、 3 個(gè)分區(qū) 崎坊。 也就是說备禀, 集群中有 tOpO、 tlpO奈揍、 tlpl痹届、 t2p0、 t2pl打月、 t2p2這6個(gè)分區(qū)。 消費(fèi)者co訂閱了主題tO蚕捉,消費(fèi)者Cl訂閱了主題tO和tl奏篙,消費(fèi)者C2訂閱了 主題tO、 ti和t2迫淹。
如果此時(shí)采用 RoundRobinAssignor分配策略秘通,那么最終的分配結(jié)果如分配清單 7-1 所示(和 講述 RoundRobinAssignor分配策略時(shí)的一樣,這樣不妨贅述一下):
可以看到這才是一個(gè)最優(yōu)解(消費(fèi)者 co 沒有訂閱主題 tl 和 t2敛熬,所以不能分配主題 tl 和 t2 中的任何分區(qū)給它肺稀,對于消費(fèi)者 Cl 也可同理推斷〉 。
StickyAssignor分配策略比另外兩者分配策略而言顯得更加優(yōu)異嘉抓,這個(gè)策略的 代碼實(shí)現(xiàn)也異常復(fù)雜 臂聋,
7.1.4 自定義分區(qū)分配策略
讀者不僅可以任意選用 Kafka提供的 3種分配策略螃诅, 還可以自定義分配策略來實(shí)現(xiàn)更多可 選的功能 。自定義 的 分配策略 必 須要實(shí) 現(xiàn) org.apache.kafka.clients.consumer.intemals. PartitionAssignor接口繁仁。 PartitionAssignor接口的定義如下:
Subscription subscription(Set<String> top工cs); String name() ;
PartitionAssignor接口中定義了兩個(gè)內(nèi)部類: Subscription和 Assignment。
Subscription 類用來表示消費(fèi)者 的訂閱 信息归园,類中有兩 個(gè)屬性: topics 和 userData黄虱,分 別表示消費(fèi)者 的訂閱主題列表和用戶自 定義信息。 PartitionAssignor 接口通過 subscription()方法 來設(shè)置消費(fèi)者自身相關(guān)的 Subscription 信 息庸诱,注意到此方法中只有 一 個(gè)參數(shù) topics 捻浦, 與 Subscription 類中的 topics 的相呼應(yīng),但并沒有體現(xiàn)有關(guān) userData 的參數(shù) 桥爽。 為了增強(qiáng)用戶 對分配結(jié)果的控制朱灿,可以在 subscription()方法內(nèi)部添加 一 些影 響 分配的用戶自定義信息賦予 userData,比如權(quán)重聚谁、 IP 地址 母剥、 host 或機(jī)架 Crack)等 。
舉個(gè)例子,在 subscription()方法 中提供機(jī)架 信息 环疼,標(biāo)識此消費(fèi)者所部署的機(jī)架位置习霹,在分 區(qū)分配時(shí)可以根據(jù)分區(qū)的 leader 副本所在的機(jī)架位置來實(shí)施具體的分配,這樣可以讓消費(fèi)者與 所需拉取消息的 broker 節(jié)點(diǎn)處于同 一機(jī)架 炫隶。參考圖 7-1淋叶, 消費(fèi)者 consumer!和 brokerl 都部署在 機(jī)架 rackl 上,消 費(fèi)者 consumer2 和 broker2 都部署在機(jī)架 rack2 上 伪阶。 如果分區(qū)的分配不是機(jī)架 感知的煞檩,那么有可能與圖 7”1 (上半部分)中的分配結(jié)果一樣, consumerl 消費(fèi) broker2 中的分 區(qū)栅贴,而 consumer2 消費(fèi) brokerl 中的分區(qū) ; 如果分區(qū)的分配是機(jī)架感知的 斟湃, 那么就會出現(xiàn)圖 7-1(下半部分〉的分配結(jié)果, consumer!消 費(fèi) broker! 中的分區(qū)檐薯,而 consumer2 消費(fèi) broker2 中的 分區(qū)凝赛,這樣相 比前一種情形,既可以減少消費(fèi)延時(shí)坛缕,又可以減少跨機(jī)架帶寬的占用 墓猎。
再來說一下 Assignment類,它用來表示分配結(jié)果信息赚楚, 類中也有兩個(gè)屬性: partitions 和 userData毙沾, 分別表示所分配到的分區(qū)集合和用戶自定義的數(shù)據(jù) 。 PartitionAssignor 接口中的 onAssignment()方法是在每個(gè)消費(fèi)者收到消費(fèi)組 leader 分配結(jié)果時(shí)的回調(diào)函數(shù)宠页,例如在 StickyAssignor 分配策略中就是通過這個(gè)方法保存當(dāng)前的分配方案左胞,以備在下次消費(fèi)組再均衡
(rebalance)時(shí)可以提供分配參考依據(jù) 。
接口中的 name()方法用來提供分配策略的名稱举户,對 Kafka 提供的 3 種分配策略而言罩句, RangeAssignor 對應(yīng)的 protocol_name 為“ range”, RoundRobinAssignor 對應(yīng)的 protocol name 為“ roundrobin”敛摘, StickyAssignor 對應(yīng)的 protocol_name 為“ sticky”门烂,所以自定義的分配策略 中 要注意命名的時(shí)候不要與己存在的分配策略發(fā)生沖突。這個(gè)命名用來標(biāo)識分配策略的名稱兄淫, 在后面所描述的加入消費(fèi)組及選舉消費(fèi)組 leader 的時(shí)候會有涉及 屯远。
真正的分區(qū)分配方案的實(shí)現(xiàn)是在 assign()方法中,方法中的參數(shù) metadata表示集群的元數(shù)據(jù) 信息捕虽,而 subscriptions 表示消費(fèi)組內(nèi)各個(gè)消費(fèi)者成員的訂閱信息慨丐,最終方法返 回各個(gè)消費(fèi)者的 分配信息。
Kafka 還提供了一個(gè)抽象類 org.apache.kafka.clients.consumer.intemals.AbstractPartitionAssignor, 它可以簡化實(shí)現(xiàn) PartitionAssignor 接口的工作泄私,井對 assign()方法進(jìn)行了詳細(xì)實(shí)現(xiàn)房揭, 其中會將 Subscription中的 us巳rData信息去掉后再進(jìn)行分配备闲。 Kafka提供的 3種分配策略都繼承自這個(gè)抽 象類 。 如果開發(fā)人員在自定義分區(qū)分配策略時(shí)需要使用 userData 信息來控制分區(qū)分配的結(jié)果捅暴, 那么就不能直接繼承 AbstractPartitionAssignor 這個(gè)抽象類恬砂,而需 要直接實(shí)現(xiàn) PartitionAssignor 接口 。
下面筆者參考 Kafka 的 RangeAssignor 分配策略來自定義 一個(gè)隨機(jī) 的分配策略蓬痒,這里筆者稱 之為 RandomAssignor泻骤,具體代碼實(shí)現(xiàn)如下:
package chapter7;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
/**
*/
public class RandomAssignor extends AbstractPartitionAssignor {
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
Map<String, List<String>> consumersPerTopic =
consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet()) {
assignment.put(memberId, new ArrayList<>());
}
//針對每一個(gè)主題進(jìn)行分區(qū)分配
for (Map.Entry<String, List<String>> topicEntry :
consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List<String> consumersForTopic = topicEntry.getValue();
int consumerSize = consumersForTopic.size();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null) {
continue;
}
//當(dāng)前主題下的所有分區(qū)
List<TopicPartition> partitions =
AbstractPartitionAssignor.partitions(topic,
numPartitionsForTopic);
//將每個(gè)分區(qū)隨機(jī)分配給一個(gè)消費(fèi)者
for (TopicPartition partition : partitions) {
int rand = new Random().nextInt(consumerSize);
String randomConsumer = consumersForTopic.get(rand);
assignment.get(randomConsumer).add(partition);
}
}
return assignment;
}
@Override
public String name() {
return "name";
}
private Map<String,List<String>> consumersPerTopic(Map<String,Subscription> consumerMetadata){
Map<String, List<String>> res = new HashMap<>();
for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
String consumerId = subscriptionEntry.getKey();
for (String topic : subscriptionEntry.getValue().topics()) {
put(res, topic, consumerId);
}
}
return res;
}
}
在使用時(shí), 消費(fèi)者客戶端需要添加相 應(yīng) 的 Properties 參數(shù) 梧奢, 示例如 下 :
properties .put(ConsumerConfig . PARTITION ASSIGNMENT STRATEGY CONFIG,
RandomAssignor .cl ass.getName ());
這里只是演示如何自定義實(shí)現(xiàn)一個(gè)分區(qū)分配策略狱掂, RandomAssignor 的實(shí)現(xiàn)并不是特別理 想, 并不見得會比 Kafka 自身提供的 RangeAssignor之類的策略要好亲轨。
在第3章中陳述了一個(gè)事實(shí): 按照Kafka默認(rèn)的消費(fèi)邏輯設(shè)定趋惨, 一個(gè)分區(qū)只能被同一個(gè)消 費(fèi)組(ConsumerGroup) 內(nèi)的一個(gè)消費(fèi)者消費(fèi)。 但這一設(shè)定不是絕對的惦蚊,我們可以通過自定義 分區(qū)分配策略使一個(gè)分區(qū)可以分配給多個(gè)消費(fèi)者消費(fèi)希柿。
考慮一種極端情況, 同一消費(fèi)組 內(nèi)的任意消費(fèi)者都可以消費(fèi)訂閱主題的所有分區(qū)养筒, 從而實(shí) 現(xiàn)了一種“組內(nèi)廣播(消費(fèi))”的功能。 針對第3章中圖3-4的7個(gè)分區(qū)和3個(gè)消費(fèi)者的情形端姚,
如果采用組內(nèi)廣播的分配策略 晕粪, 那么就會變成圖 7-2 中的這種分配結(jié)果。
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class BroadcastAssignor extends AbstractPartitionAssignor {
@Override
public String name() {
return "broadcast";
}
private Map<String, List<String>> consumersPerTopic(
Map<String, Subscription> consumerMetadata) {
//(具體實(shí)現(xiàn)請參考 RandomAssignor 中的 consumersPerTopic()方法)
return null;
}
@Override
public Map<String, List<TopicPartition>> assign(
Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<String>> consumersPerTopic =
consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
//Java8
subscriptions.keySet().forEach(memberId ->
assignment.put(memberId, new ArrayList<>()));
//針對每一個(gè)主題渐裸,為每一個(gè)訂閱的消費(fèi)者分配所有的分區(qū)
consumersPerTopic.entrySet().forEach(topicEntry->{
String topic = topicEntry.getKey();
List<String> members = topicEntry.getValue();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null || members.isEmpty())
return;
List<TopicPartition> partitions = AbstractPartitionAssignor
.partitions(topic, numPartitionsForTopic);
if (!partitions.isEmpty()) {
members.forEach(memberId ->
assignment.get(memberId).addAll(partitions));
}
});
return assignment;
}
}
注意組內(nèi)廣播的這種實(shí)現(xiàn)方式會有一個(gè)嚴(yán)重的問題一一默認(rèn)的消費(fèi)位移的提交會失效巫湘。所 有的消費(fèi)者都會提交它自身的消費(fèi)位移到 consumer_offsets 中 , 后提交的消費(fèi)位移會覆蓋前面 提交的消費(fèi)位移昏鹃。
假設(shè)消費(fèi)者 consumerl 提交了分區(qū) tpO 的消 費(fèi)位移為 10尚氛, 這時(shí)消費(fèi)者 consumer2 緊接著提 交了同一分區(qū) tpO 的消費(fèi)位移為 12,如果此時(shí)消費(fèi)者 consumer!由于某些原因重啟了 洞渤,那么 consum巳rl 就會從位移 12 之后重新開始消費(fèi)阅嘶,這樣 consumer! 就丟失了部分消息。
再考慮另一種情況载迄,同樣消費(fèi)者 consumerl 提交了分區(qū) tpO 的消費(fèi)位移為 10讯柔, 這時(shí)消費(fèi)者 consumer2 緊接著提交了同 一分區(qū)的消費(fèi)位移為 8,如果此時(shí)消費(fèi)者 consumerl 由于某些原因重 啟了护昧,那么 consumerl 就會從位移 8 之后重新開始消費(fèi)魂迄,這樣 consumerl 就重復(fù)消費(fèi)了消息。 很多情形下 惋耙, 重復(fù)消費(fèi)少量消息對于上層業(yè)務(wù)應(yīng)用來說可以忍受捣炬。但是設(shè)想這樣一種情況 熊昌, 消 費(fèi)組 內(nèi)的消 費(fèi)者對于分區(qū) tpO 的 消費(fèi)位移都在 100000 之后了, 此時(shí) 又有一 個(gè)新的消 費(fèi)者 consumer3 加入進(jìn)來湿酸,消費(fèi)了部分消息之后提交了 tpO 的消費(fèi)位移為 9婿屹, 那么此時(shí)原消費(fèi)組內(nèi)的 任何消 費(fèi)者重啟都會從這個(gè)消 費(fèi)位移 9 之后再開始重新消費(fèi) ,這樣大量的重復(fù)消息會讓上層業(yè) 務(wù)應(yīng)用猝不及防稿械,同樣會造成計(jì)算資源的浪費(fèi) 选泻。
針對上述這種情況,如果要真正實(shí)現(xiàn)組內(nèi)廣播美莫,則需要自己保存每個(gè)消費(fèi)者的消費(fèi)位移 页眯。 筆者的實(shí)踐經(jīng)驗(yàn)是,可以通過將消費(fèi)位移保存到本地文件或數(shù)據(jù)庫中等方法來實(shí)現(xiàn)組內(nèi)廣播的 位移提交厢呵。
7.2 消費(fèi)者協(xié)調(diào)器和組協(xié)調(diào)器
了解了 Kafka 中消費(fèi)者的分區(qū)分配策略之后是否會有這樣的疑問:如果消費(fèi)者客戶端中配 置了兩個(gè)分配策略窝撵,那么以哪個(gè)為準(zhǔn)呢?如果有 多個(gè)消費(fèi)者,彼此所配置的分配策略并不完全 相同襟铭,那么以哪個(gè)為準(zhǔn)?多個(gè)消費(fèi)者之間的分區(qū)分配是需要協(xié)同的碌奉,那么這個(gè)協(xié)同的過程又是 怎樣的呢?這一切都是交由消費(fèi)者協(xié)調(diào)器( ConsumerCoordinator )和組協(xié)調(diào)器(GroupCoordinator)來完成的,它們之間使用 一套組協(xié)調(diào)協(xié)議進(jìn)行交互 寒砖。
7.2.1 舊版消費(fèi)者客戶端的問題
消費(fèi)者協(xié)調(diào)器和組協(xié)調(diào)器的概念是針對新版的消費(fèi)者客戶端而言的赐劣, Kafka 建立之初并沒 有它們。舊版的消費(fèi)者客戶端是使用 ZooKeeper 的監(jiān)聽器( Watcher〕來實(shí)現(xiàn)這些功能的哩都。
每個(gè)消費(fèi)組(<group>)在 ZooKeeper 中都維護(hù)了 一個(gè)/ consumers/<group>/ids 路徑魁兼, 在此路徑下使用臨時(shí)節(jié)點(diǎn)記錄隸屬于此消費(fèi)組的消費(fèi)者的唯 一標(biāo)識( consumerldString) , consumerldString由消費(fèi)者啟動時(shí)創(chuàng)建。消費(fèi)者的唯一標(biāo)識由 consumer.id+主機(jī)名+時(shí)間戳+UUID 的部分信息構(gòu)成漠嵌,其中 consumer.id 是舊版消 費(fèi) 者客戶端中的配置咐汞,相當(dāng)于新版客戶端中的 client.id。比如某個(gè)消費(fèi)者的唯一標(biāo)識為 consumerld localhost-1510734527562-64b377f5儒鹿, 那么其中 consumerId 為指定的 consumer.id, localhost 為計(jì)算機(jī)的主機(jī)名化撕, 1510734527562 代表時(shí)間戳,而 64b377f5 表示 UUID 的部分信息 约炎。
參考圖 7-4植阴,與/consumers/<group>/工ds 同級的還有兩個(gè)節(jié)點(diǎn): owners 和 offsets, /consumers/<group>/owner 路徑下記錄了分區(qū)和消費(fèi)者的對應(yīng)關(guān)系,/ consumers/ <group>/offsets 路徑下記錄了 此消 費(fèi)組在分區(qū)中對應(yīng)的消費(fèi)位移圾浅。
每個(gè) broker墙贱、 主題和分區(qū)在 ZooKeeper 中也都對應(yīng)一個(gè)路徑 : /brokers/ids/<id>記錄 了 host、 port及分配在此 broker上的主題分區(qū)列表; /brokers/topics/<topic>記錄了每 個(gè)分區(qū)的 leader 副本贱傀、 ISR 集合等信息 惨撇。/brokers/topics/<topic>/partitions/ <partition>/state記錄了當(dāng)前 leader副本、 leader epoch等信息府寒。
每個(gè)消費(fèi)者在啟動時(shí)都會在/ consumers/<group>/ids 和/brokers/ids 路徑上注冊 一個(gè)監(jiān)聽器魁衙。當(dāng)/consumers/<group>/ids 路徑下的子節(jié)點(diǎn)發(fā)生變化時(shí)报腔,表示消費(fèi)組中的消 費(fèi)者發(fā)生了變化;當(dāng)/ brokers/ids 路徑下的子節(jié)點(diǎn)發(fā)生變化時(shí),表示 broker 出現(xiàn)了增減 剖淀。 這 樣通過 ZooKeeper所提供的 Watcher纯蛾, 每個(gè)消費(fèi)者就可以監(jiān)昕消費(fèi)組和 Kafka集群的狀態(tài)了 。
這種方式下每個(gè)消費(fèi)者對 ZooKeeper 的相關(guān)路徑分別進(jìn)行監(jiān)聽纵隔, 當(dāng)觸發(fā)再均衡操作時(shí)翻诉, 一 個(gè)消費(fèi)組下的所有消費(fèi)者會同時(shí)進(jìn)行再均衡操作,而消費(fèi)者之間并不知道彼此操作的結(jié)果捌刮,這 樣可能導(dǎo)致 Kafka工作在一個(gè)不正確的狀態(tài)碰煌。 與此同時(shí),這種嚴(yán)重依賴于 ZooKeeper集群的做 法還有兩個(gè) 比較嚴(yán)重的問題 绅作。
(1)羊群效應(yīng)(HerdEffect) : 所謂的羊群效應(yīng)是指 ZooK巳eper中一個(gè)被監(jiān)聽的節(jié)點(diǎn)變化芦圾, 大量的 Watcher 通知被發(fā)送到客戶端 , 導(dǎo)致在通知期間的其他操作延遲俄认,也有可能發(fā)生類似死 鎖的情況 个少。
(2)腦裂問題( Split Brain) : 消費(fèi)者進(jìn)行再均衡操作時(shí)每個(gè)消費(fèi)者都與 ZooKeeper 進(jìn)行 通信以判斷消費(fèi)者或 broker變化的情況,由于 ZooKeeper本身的特性眯杏,可能導(dǎo)致在同一時(shí)刻各 個(gè)消 費(fèi)者獲取的狀態(tài)不一致 夜焦, 這樣會導(dǎo)致異常 問題 發(fā)生。
7.2.2 再均衡的原理
新版的消費(fèi)者客戶端對此進(jìn)行了重新設(shè)計(jì) 岂贩, 將全部消費(fèi)組分成多個(gè)子集 茫经, 每個(gè)消費(fèi)組的子 集在服務(wù)端對應(yīng)一個(gè) GroupCoordinator 對其進(jìn)行管理, GroupCoordinator 是 Kafka 服務(wù)端 中用于 管理消費(fèi)組的組件河闰。而消費(fèi)者客戶端中的 ConsumerCoordinator組件負(fù)責(zé)與 GroupCoordinator進(jìn) 行交互 。
Consum巳rCoordinator 與 GroupCoordinator 之間最重要 的職 責(zé)就是負(fù)責(zé)執(zhí)行消費(fèi)者再 均衡的 操作褥紫,包括前面提及的分區(qū)分配的工作也是在再均衡期間完成的姜性。就目前而言 , 一共有 如下幾 種情形會觸發(fā)再均衡的操作 :
- 有新的消 費(fèi)者 加入 消費(fèi)組髓考。
- 有消費(fèi)者若機(jī)下線 部念。 消費(fèi)者并不一定需要真正下線,例如遇到長時(shí)間的 GC氨菇、網(wǎng)絡(luò)延 遲導(dǎo)致消費(fèi)者長時(shí)間未向 GroupCoordinator 發(fā)送心跳等情況時(shí)儡炼, GroupCoordinator 會認(rèn) 為消費(fèi)者己經(jīng)下線。
- 有消費(fèi)者主動退出消費(fèi)組(發(fā)送 LeaveGroupRequest 請求) 查蓉。 比如客戶端調(diào)用了 unsubscrible()方法取消對某些主題 的訂閱 乌询。
- 消費(fèi)組所對應(yīng)的 GroupCoorinator節(jié)點(diǎn)發(fā)生了變更。
- 消費(fèi)組內(nèi)所訂閱的任一主題或者主題的分區(qū)數(shù)量發(fā)生變化豌研。
下面就以一個(gè)簡單的例子來講解一下再均衡操作的具體內(nèi)容 妹田。當(dāng)有消費(fèi)者加入消費(fèi)組時(shí)唬党,消費(fèi)者、消費(fèi)組及組協(xié)調(diào)器之間會經(jīng)歷 一 下 幾個(gè) 階段鬼佣。
第一階段( FIND_COORDINATOR)
消費(fèi)者需要確定它所屬的消費(fèi)組對應(yīng)的 GroupCoordinator所在的 brok巳r驶拱,并創(chuàng)建與該 broker 相互通信的網(wǎng)絡(luò)連接 。 如果消 費(fèi)者 己經(jīng)保存了與消費(fèi)組對應(yīng)的 GroupCoordinator 節(jié)點(diǎn)的信息晶衷, 并且與它之間的網(wǎng)絡(luò)連接是正常的蓝纲,那么就可以進(jìn)入第二階段。否則晌纫, 就需要向集群中 的某個(gè) 節(jié)點(diǎn)發(fā)送 FindCoordinatorRequest請求來查找對應(yīng)的 GroupCoordinator税迷, 這里的“某個(gè)節(jié)點(diǎn)” 并 非是集群中的任意節(jié)點(diǎn),而是負(fù)載最小的節(jié)點(diǎn)缸匪,即 2.2.2 節(jié)中的leastLoadedNode翁狐。
如圖 7-5 所示, FindCoordinatorRequest請求體中只有兩個(gè)域( Field) : coordinator key 和 coord工nator_typeo coordinator_key 在這里就是消費(fèi)組的名稱 凌蔬, 即 groupid, coordinator type 置為 0露懒。
Kafka 在收到 FindCoordinatorRequest 請求之后,會根據(jù) coordinator_key (也就是 groupld)查找對應(yīng)的 GroupCoordinator 節(jié)點(diǎn)砂心,如果找到對應(yīng)的 GroupCoordinator 則會返回其相 對應(yīng)的 node_id懈词、 host 和 port 信息。
具體查找 GroupCoordinator的方式是先根據(jù)消費(fèi)組 groupid的晗希值計(jì)算_consumer_offsets 中的分區(qū)編號辩诞,具體算法如代碼清單 7-1 所示坎弯。 :
代碼清單 7-1 消費(fèi)組所對應(yīng)的分區(qū)號的計(jì)算方式 Utils.abs(groupid.hashCode ) % groupMetadataTopicPartit 工 onCount
其中 groupid.hashCode 就是使用 Java 中 String 類的 hashCode()方法獲得的, groupMetadataTopicPartitio口Cou口t 為主題 consumer一offsets 的分區(qū)個(gè)數(shù)译暂,這個(gè)可以 通過 broker端參數(shù) offsets.topic.num.partitions 來配置抠忘,默認(rèn)值為 50。
找到對應(yīng)的 consumer offsets 中的分區(qū)之后外永,再尋找此分區(qū) leader副本所在的 broker節(jié)點(diǎn)崎脉, 該 broker節(jié)點(diǎn)即為這個(gè) groupld所對應(yīng)的 GroupCoordinator節(jié)點(diǎn)。消費(fèi)者 groupId最終的分區(qū)分 配方案及組內(nèi)消費(fèi)者所提交的消費(fèi)位移信息都會發(fā)送給此分區(qū) leader副本所在的 broker節(jié)點(diǎn)伯顶, 讓此 broker節(jié)點(diǎn)既扮演 GroupCoordinator 的角色囚灼,又扮演保存分區(qū)分配方案和組內(nèi)消費(fèi)者位移 的角色祭衩,這樣可以省去很多不必要的中間輪轉(zhuǎn)所帶來的開銷 。
第二階段(JOIN GROUP)
在成功找到消費(fèi)組所對應(yīng)的 GroupCoordinator 之后就進(jìn)入加入消費(fèi)組的階段,在此階段的 消費(fèi)者會向 GroupCoordinator 發(fā)送 JoinGroupRequest請求层坠,并處理響應(yīng)。
如圖 7-6所示刁笙, JoinGroupR巳quest的結(jié)構(gòu)包含多個(gè)域:
- group_id 就是消費(fèi)組的 id破花,通常也表示為 groupld。
- sessioηtimout 對應(yīng)消費(fèi)端參數(shù) sess工on.timeout.ms疲吸,默認(rèn)值為 10000座每,即10 秒 。 GroupCoordinator 超過 session_timeout 指 定 的時(shí)間內(nèi)沒有收到心跳報(bào)文則 認(rèn)為此消 費(fèi)者已經(jīng)下線摘悴。
- rebalance timeout 對應(yīng)消費(fèi)端參數(shù) max .poll . interval.ms 峭梳, 默認(rèn)值 為 300000,即 5 分鐘 蹂喻。表示當(dāng)消費(fèi)組再平衡的時(shí)候葱椭, GroupCoordinator 等待各個(gè)消費(fèi)者 重新加入的最長等待時(shí)間 。
- member id 表示 GroupCoordinator 分配給消費(fèi)者的 id 標(biāo)識口四。 消費(fèi)者第一次發(fā)送 JoinGroupRequest請求的時(shí)候此字段設(shè)置為 nulla
- protocol_type 表示消費(fèi)組實(shí)現(xiàn)的協(xié)議孵运,對于消費(fèi)者而言此字段值為“consumer”。
JoinGroupRequest 中的 group protocols 域?yàn)閿?shù)組類型蔓彩,其中可以囊括多個(gè)分區(qū)分配策 略治笨,這個(gè)主要取決于消 費(fèi)者客戶端參數(shù) pa:::-titio口 .assignment. strategy 的配置。 如果 配置了多種策略赤嚼,那么 JoinGroupRequest 中就會包含多個(gè) protocol name 和 protocol metadata旷赖。其中 protocol name 對應(yīng)于 PartitionAssignor 接口中的 name()方法,我們在講 述消費(fèi)者分區(qū)分配策略的時(shí)候提及過相關(guān)內(nèi)容(參考 7.1.4 節(jié)) 更卒。 而 protocol metadata 和 PartitionAssignor接口中的 subscription()方法有直接關(guān)系等孵, protocol_metadata 是一個(gè) bytes 類型,其實(shí)質(zhì)上還可以更細(xì)粒度地劃分為 version逞壁、 topics 和 user data流济,如圖 7-7 所示锐锣。
version 占 2個(gè)字節(jié)腌闯,目前其值固定為 0; topics 對應(yīng) PartitionAssignor接口的 subscription() 方法返回值類型 Subscription 中的 topics,代表一個(gè)主題列表; user_data 對應(yīng) Subscription 中 的 userData雕憔,可以為空 姿骏。
如果是原有的消費(fèi)者重新加入消費(fèi)組,那么在真正發(fā)送 JoinGroupRequest 請求之前還要執(zhí) 行一些準(zhǔn)備工作:
(1)如果消費(fèi)端參數(shù)enable.auto.commit設(shè)置為true(默認(rèn)值也為true)斤彼, 即開啟自 動提交位移功能分瘦,那么在請求加入消費(fèi)組之前需要向 GroupCoordinator 提交消費(fèi)位移蘸泻。這個(gè)過 程是阻塞執(zhí)行的,要么成功提交消費(fèi)位移嘲玫,要么超時(shí)悦施。
(2)如果消 費(fèi)者添加了自定義的再均衡監(jiān)聽器( ConsumerRebalanceListener),那么此時(shí) 會調(diào)用 onPartitionsRevoked()方法在重新加入消費(fèi)組之前實(shí)施自定義的規(guī)則邏輯去团,比如清除一些 狀態(tài)抡诞,或者提交消費(fèi)位移 等。
(3)因?yàn)槭侵匦录尤胂M(fèi)組土陪,之前與 GroupCoordinator節(jié)點(diǎn)之間的心跳檢測也就不需要了昼汗, 所以在成功地重新加入消費(fèi)組之前 需要禁止 心跳檢測的 運(yùn)作。
消費(fèi)者在發(fā)送 JoinGroupRequest 請求之后會阻塞等待 Kafka 服務(wù)端的響應(yīng)鬼雀。服務(wù)端在收到JainGroupRequest 請求 后會交由 GroupCoordinator 來進(jìn)行處理 顷窒。 GroupCoordinator 首先 會對 JoinGroupRequest 請求做合法性校驗(yàn),比如 group 工d 是否為空源哩、當(dāng)前 broker 節(jié)點(diǎn)是否是請求 的消費(fèi)者組所對應(yīng)的組協(xié)調(diào)器鞋吉、 rebalance timeout 的值是否在合理的范圍之內(nèi)。如果消費(fèi) 者是第一次請求加入消費(fèi)組璧疗,那么 JoinGroupRequest 請求中的 member_id 值為 null坯辩,即沒有 它自身的唯一標(biāo)志,此時(shí)組協(xié)調(diào)器負(fù)責(zé)為此消費(fèi)者生成一個(gè) member id崩侠。這個(gè)生成的算法很 簡單漆魔,具體如以下偽代碼所示。
Stringmemberid= clientid+ ”+ UUID.randomUUID().toStr工ng();
其中 clientld 為消費(fèi)者客戶端的 clientld却音,對應(yīng)請求頭中的 client id改抡。由此可見消費(fèi)者的member 工d 由 clientld 和 UUID 用“-” 字符拼接而成。
選舉消費(fèi)紐的 leader
GroupCoordinator 需要為消費(fèi)組內(nèi)的消費(fèi)者選舉出一個(gè)消費(fèi)組的 leader系瓢,這個(gè)選舉的算法也 很簡單阿纤,分兩種情況分析。如果消費(fèi)組內(nèi)還沒有 leader夷陋,那么第一個(gè)加入消費(fèi)組的消費(fèi)者即為 消費(fèi)組的 leader欠拾。如果某一時(shí)刻 leader 消費(fèi)者由于某些原因退出了消費(fèi)組,1 那么會重新選舉一 個(gè)新的 leader骗绕,這個(gè)重新選舉 leader 的過程又更“隨 意”了藐窄,相關(guān)代碼如下 :
//scala code.
pr工vate val members = new mutable.HashMap[String, MemberMetadata] var leaderid = members.keys.head
解釋一下這 2 行代碼:在 GroupCoordinator 中消費(fèi)者的信息是以 HashMap 的形式存儲的,其中 key 為消 費(fèi)者的 member id酬土,而 value 是消 費(fèi)者相關(guān)的元數(shù)據(jù)信息荆忍。 leaderld 表示 leader 消費(fèi)者的 member id,它的取值為 HashMap 中的第一個(gè)鍵值對的 key,這種選舉的方式基本 上和隨機(jī)無異刹枉。 總體上來說叽唱,消費(fèi)組的 l巳ader選舉過程是很隨意的。
選舉分區(qū)分配某咯
每個(gè)消費(fèi)者都可以設(shè)置自己的分區(qū)分配策略微宝,對消費(fèi)組而言需要從各個(gè)消費(fèi)者呈報(bào)上來的 各個(gè)分配策略中選舉一個(gè)彼此都“信服”的策略來進(jìn)行整體上的分區(qū)分配 棺亭。 這個(gè)分區(qū)分配的選 舉并非由 leader消費(fèi)者決定,而是根據(jù)消費(fèi)組內(nèi)的各個(gè)消費(fèi)者投票來決定的蟋软。這里所說的 “根據(jù)組內(nèi)的各個(gè)消費(fèi)者投票來決定”不是指 GroupCoordinator 還要再與各個(gè)消費(fèi)者進(jìn)行進(jìn)一步交 互侦铜,而是根據(jù)各個(gè)消費(fèi)者呈報(bào)的分配策略來實(shí)施。最終選舉的分配策略基本上可以看作被各個(gè) 消費(fèi)者支持的最多的策略钟鸵,具體的選舉過程如下:
(1)收集各個(gè)消費(fèi)者支持的所有分配策略钉稍,組成候選集 candidates。 (2)每個(gè)消費(fèi)者從候選集 candidates 中找出第一個(gè)自身支持的策略棺耍,為這個(gè)策略投上一票贡未。
(3)計(jì)算候選集中各個(gè)策略的選票數(shù),選票數(shù)最多的策略即為當(dāng)前消費(fèi)組的分配策略蒙袍。
如果有消費(fèi)者并不支持選出的分配策略俊卤,那么就會報(bào)出異常 IllegalArgumentException: Member does not supp。此 protocol害幅。 需要注意的是消恍,這里所說的“消費(fèi)者所支持的分配策略”是 指 partition.assignment.strategy 參數(shù)配置的策略,如果這個(gè)參數(shù)值只配置了 RangeAssignor以现, 那么這個(gè)消費(fèi)者客戶端只支持 RangeAssignor 分配策略狠怨,而不是消費(fèi)者客戶端 代碼中實(shí)現(xiàn)的 3 種分配策略及可能的自定義分配策略 。
在此之后邑遏, Kafka 服務(wù)端就要發(fā)送 JoinGroupResponse 響應(yīng)給各個(gè)消費(fèi)者佣赖, leader 消費(fèi)者和 其他普通消費(fèi)者收到的響應(yīng)內(nèi)容并不相同,首先我們看一下 JoinGroupResponse 的具體結(jié)構(gòu)记盒,如 圖 7-8 所 示憎蛤。
JoinGroupRespons巳包含了多個(gè)域,其中 ge口eratio口一工d 用來標(biāo)識當(dāng)前消費(fèi)組的年代信息纪吮,避免受到過期請求的影響俩檬。 leader 工d 表示消費(fèi)組 leader 消費(fèi)者的 member id。
Kafka 發(fā)送給普通消費(fèi)者的 JoinGroupResponse 中的 members 內(nèi)容為空碾盟,而只有 leader 消 費(fèi)者的 JoinGroupResponse 中的 members 包含有效數(shù)據(jù)或辖。members 為數(shù)組類型往弓,其中包含各 個(gè)成員信息 跟压。 member_metadata 為消費(fèi)者的訂閱信息亥揖,與 JoinGroupRequest 中的 protocol metadata 內(nèi)容相同兄墅,不同的是 JoinGroupR巳quest可以包含多個(gè)<protocol 口ame, protocol metadata>的鍵值對,在收到 JoinGroupRequest 之后 哨颂, GroupCoordinator 已經(jīng)選舉 出唯一的分配策略车柠。也就是說, protocol name 己經(jīng)確定( group protocol 〉 骡尽, 那么對應(yīng) 的 protocol metadata 也就確定了遣妥,最終各個(gè)消費(fèi)者收到的 JoinGroupResponse 響應(yīng)中的 member_metadata 就是這個(gè)確定了的 protocol_metadata。 由此可見攀细, Kafka 把分區(qū)分配 的具體分配交還給客戶端箫踩,自身并不參與具體的分配細(xì)節(jié),這樣即使以后分區(qū)分配的策略發(fā)生 了變更谭贪,也只需要重啟消費(fèi)端的應(yīng)用即可境钟,而不需要重啟服務(wù)端。
第三階段( SYNC GROUP)
leader 消費(fèi)者根據(jù)在第二階段中選舉出來的分區(qū)分配策略來實(shí)施具體的分區(qū)分配俭识,在此之 后需要將分配的方案同步給各個(gè)消費(fèi)者慨削,此時(shí) leader 消費(fèi)者并不是直接和其余的普通消費(fèi)者同 步分配方案,而是通過 GroupCoordinator 這個(gè)“中間人”來負(fù)責(zé)轉(zhuǎn)發(fā)同步分配方案的套媚。 在第三 階段缚态,也就是同步階段, 各個(gè)消 費(fèi)者會 向 GroupCoordinator 發(fā)送 SyncGroupRequest 請求來同步 分配方案堤瘤,如圖 7-11 所示玫芦。
我們再來看一下SyncGroupRequest 請求的具體結(jié)構(gòu) ,如 圖 7-12 所示 本辐。 SyncGroupRequest 中的 group id桥帆、 generation id 和 member id 前面都有涉及,這里不再贅述 慎皱。 只有 leader 消費(fèi)者發(fā)送的 SyncGroupRequest 請求中才包含具體的分區(qū)分配方案环葵,這個(gè)分配方案保存在 group ass工gnment 中,而其余消費(fèi)者發(fā)送的 SyncGroupRequest請求中的 group assignment 為空宝冕。
group assignment是一個(gè)數(shù)組類型张遭,其中包含了各個(gè)消費(fèi)者對應(yīng)的具體分配方案 :
member id 表示消費(fèi)者的唯一標(biāo)識,而 member assignment 是與消費(fèi)者對應(yīng)的分配方案地梨,它還可以做更具體的劃分菊卷,
服務(wù)端在收到消費(fèi)者發(fā)送的 SyncGroupRequest 請求之后 會交 由 GroupCoordinator 來負(fù)責(zé)具 體的邏輯處理。 GroupCoordinator 同樣會先對 SyncGroupRequest 請求做合法性校驗(yàn)宝剖,在此之后 會將從 leader 消費(fèi)者發(fā)送過來的分配方案提取出來洁闰,連同整個(gè)消費(fèi)組的元數(shù)據(jù)信息一起存入 Kafka 的 consumer offsets 主題中 , 最后發(fā)送響應(yīng)給各個(gè)消費(fèi)者 以提供給各個(gè)消費(fèi)者各自所屬 的分配方案万细。
這里所說的響應(yīng)就是指 SyncGroupRequest 請求對應(yīng) 的 SyncGroupResponse, SyncGroupResponse
的內(nèi)容很簡單扑眉,里面包含的就是消費(fèi)者對應(yīng)的所屬分配方案, SyncGroupResponse 的結(jié)構(gòu)如圖
7-14 所示,具體字段的釋義可以從前面的內(nèi)容中推測出來腰素,這里就不贅述了聘裁。
當(dāng)消費(fèi)者收到所屬的分配方案之后會調(diào)用 PartitionAssignor 中的 onAssignment()方法。隨后再調(diào)用 ConsumerRebalanceListener 中的 OnPartitionAssigned()方法 弓千。 之后開啟 心跳任務(wù) 衡便, 消費(fèi)者定期 向服 務(wù)端的 GroupCoordinator 發(fā)送 HeartbeatRequest 來確定彼此在線。
消費(fèi)組元數(shù)據(jù)信息
我們知道消費(fèi)者客戶端提交的消費(fèi)位移會保存在Kafka的_consumer_offsets主題中洋访,這里也一樣镣陕,只不過保存的是消費(fèi)組的元數(shù)據(jù)信息(GroupMetadata)。
圖 7-15 中對應(yīng)的就是消費(fèi)組元數(shù)據(jù)信息的具體內(nèi)容格式姻政,上面是消息的 key呆抑,下面是消息 的value≈梗可以看到 key和 value中都包含 version字段理肺,用來標(biāo)識具體的 key和 value的版本信 息 , 不同的版本對應(yīng)的內(nèi)容格式可能并不相同善镰,就目前版本而言妹萨, key 的 version 為 2,而 value 的 version 為 1炫欺, 讀者在理解時(shí)其實(shí)可以忽略這個(gè)字段而探究其他具備特定含義的 內(nèi)容乎完。 key 中除了 versio口就是 group 宇段,它表示消費(fèi)組的名稱品洛,和 JoinGroupRequest 或 SyncGroupRequest 請求中的 group_id 是同 一個(gè)東西 树姨。 雖然 key 中包含了 version 字段, 但確定這條信息所要存儲的分區(qū)還是根據(jù)單獨(dú)的 group 字段來計(jì)算的桥状,這樣就可以保證消費(fèi)組 的元數(shù)據(jù)信息與消費(fèi)組對應(yīng)的 GroupCoordinator 處于同 一個(gè) broker 節(jié)點(diǎn)上帽揪,省去了中間輪轉(zhuǎn)的 開銷。
value 中包含的內(nèi)容有很多辅斟,可以參照和 JoinGroupRequest 或 SyncGroupRequest 請求中的內(nèi)容來理解转晰,具體各個(gè)字段的釋義如下 。
- protocol type:消費(fèi)組實(shí)現(xiàn)的協(xié)議士飒,這里的值為“ consumer”查邢。 * generation:標(biāo)識當(dāng)前消費(fèi)組的年代信息,避免收到過期請求的影響酵幕。 protocol : 消費(fèi)組選取的分區(qū)分配策略扰藕。
- leader: 消費(fèi)組的 lead巳r消費(fèi)者的名稱。
- members: 數(shù)組類型芳撒,其中包含了消費(fèi)組的各個(gè)消費(fèi)者成員信息邓深,圖 7-15 中右邊部分 就是消費(fèi)者成員的具體信息未桥,每個(gè)具體字段都 比較容易辨 別, 需要著重說明的是 subscription 和 assignment 這兩個(gè)字段芥备, 分別代碼消費(fèi)者的訂閱信息和分配信息
第四階段( HEARTBEAT)
進(jìn)入這個(gè)階段之后冬耿,消 費(fèi)組中的所有消費(fèi)者就會處于正常工作狀態(tài)。在正式消費(fèi)之前 门躯,消 費(fèi)者還需要確定拉取消息的起始位置。假設(shè)之前已經(jīng)將最后的消費(fèi)位移提交到了 GroupCoordinator酷师,并且GroupCoordinator將其保存到了Kafka內(nèi)部的一consumer_offsets主題中讶凉, 此時(shí)消費(fèi)者可以通過 OffsetFetchRequest 請求獲取上次提交的消 費(fèi)位移并 從此處繼續(xù)消費(fèi) 。
消費(fèi)者通過向 GroupCoordinator 發(fā)送心跳來維持它們與消費(fèi)組的從屬關(guān)系山孔,以及它們對分區(qū)的所有權(quán)關(guān)系懂讯。只要消費(fèi)者以正常的時(shí)間間隔發(fā)送 心跳 , 就被認(rèn)為是活躍的 台颠,說明它還在讀 取分區(qū)中的消息褐望。 心跳線程是一個(gè)獨(dú)立的線程,可以在輪詢消息的空檔發(fā)送心跳串前。如果消費(fèi)者停 止發(fā)送心跳的時(shí)間足夠長瘫里,則整個(gè)會話就被判定為過期, GroupCoordinator 也會認(rèn)為這個(gè)消費(fèi)者 己經(jīng)死亡荡碾,就會觸發(fā)一次再均衡行為谨读。消費(fèi)者的心跳間隔時(shí)間由參數(shù) heartbeat.interval.ms 指定,默認(rèn)值為 3000坛吁,即 3 秒 劳殖, 這個(gè)參數(shù)必須比 session . timeout.ms 參數(shù)設(shè)定的值要小, 一般情況下 heartbeat. interval.ms 的配置值不能超過 session.timeout.ms配置值的 1/3拨脉。這個(gè)參數(shù)可以調(diào)整得更低哆姻,以控制正常 重新平衡 的預(yù)期時(shí)間 。
消費(fèi)者通過向 GroupCoordinator 發(fā)送心跳來維持它們與消費(fèi)組的從屬關(guān)系玫膀,以及它們對分區(qū)的所有權(quán)關(guān)系矛缨。只要消費(fèi)者以正常的時(shí)間間隔發(fā)送 心跳 , 就被認(rèn)為是活躍的 帖旨,說明它還在讀 取分區(qū)中的消息劳景。 心跳線程是一個(gè)獨(dú)立的線程,可以在輪詢消息的空檔發(fā)送心跳碉就。如果消費(fèi)者停 止發(fā)送心跳的時(shí)間足夠長盟广,則整個(gè)會話就被判定為過期, GroupCoordinator 也會認(rèn)為這個(gè)消費(fèi)者 己經(jīng)死亡瓮钥,就會觸發(fā)一次再均衡行為筋量。消費(fèi)者的心跳間隔時(shí)間由參數(shù) heartbeat.interval.ms 指定烹吵,默認(rèn)值為 3000,即 3 秒 桨武, 這個(gè)參數(shù)必須比 session . timeout.ms 參數(shù)設(shè)定的值要小肋拔, 一般情況下 heartbeat. interval.ms 的配置值不能超過 session.timeout.ms配置值的 1/3。這個(gè)參數(shù)可以調(diào)整得更低呀酸,以控制正常 重新平衡 的預(yù)期時(shí)間 凉蜂。
如果一個(gè)消費(fèi)者發(fā)生崩潰,并停止讀取消息 性誉, 那么 GroupCoordinator 會等待一小段時(shí)間 窿吩, 確認(rèn)這個(gè)消費(fèi)者死亡之后才會觸發(fā)再均衡。在這一小段 時(shí)間內(nèi)错览, 死掉的消費(fèi)者井不會讀取分區(qū) 里的消息纫雁。這個(gè)一小段時(shí)間由 session . timeout.ms 參數(shù)控制,該參數(shù)的配置值必須在 broker 端參數(shù) group.m工n.sessio口.timeout.ms (默認(rèn)值為 6000倾哺,即 6 秒)和 group.max. session. timeout. ms (默認(rèn)值為 300000轧邪,即 5 分鐘)允許的范圍內(nèi)。
還有一個(gè)參數(shù) max.poll.interval.ms羞海,它用來指定使用消費(fèi)者組管理時(shí) poll()方法調(diào) 用之 間的 最大延遲 忌愚,也就是消費(fèi)者在獲取更多消息之前可以空閑的時(shí)間量的上限。如果 此超時(shí) 時(shí)間期滿之前 poll()沒有調(diào)用却邓, 則消費(fèi)者被視為失敗菜循,并且分組將重新平衡, 以便將分區(qū)重新分 配給別的成員申尤。
除了被動退 出消費(fèi)組癌幕,還可 以使用 Leav巳GroupRequest 請求主動退出消費(fèi)組,比如客戶端調(diào)用了 unsubscrible()方法取消對某些主題的訂閱昧穿,這個(gè)比較簡單勺远,這里就不再贅述了 。
7.3 _consumer_offsets 剖析
位移提交是使用消費(fèi)者客戶端過程中一個(gè)比較“講究”的操作时鸵, 3.2.5 節(jié)也 使用了較大的篇 幅來介紹它胶逢。位移提交的內(nèi)容最終會保存到 Kafka 的內(nèi)部主題 consumer offsets 中,對于主題
consumer offsets 的深度掌握也可以讓我們更好地理解和使用好位移提交饰潜。
一般情況下初坠,當(dāng)集群中第一次有消費(fèi)者消費(fèi)消息時(shí)會自動創(chuàng)建主題 consumer offsets,不 過它的副本因子還受 offsets.topic.replication.factor參數(shù)的約束彭雾,這個(gè)參數(shù)的默認(rèn)值為 3 (下載安 裝的包中此值可能為 1)碟刺,分區(qū)數(shù)可以通過 offsets.topic.num.partitions參數(shù)設(shè)置,默認(rèn)為 50薯酝“牍粒客 戶端提交消費(fèi)位移是使用爽柒。他etCommitRequest 請求實(shí)現(xiàn)的, OffsetCommitRequest 的結(jié)構(gòu)如圖 7-16 所示 者填。
如果已經(jīng)掌握了 6.1 節(jié)和] 7.2 節(jié)的內(nèi)容浩村,那么就很容易理解 OffsetCommitRequest 的結(jié)構(gòu) 。 請求體第一層中的 group id占哟、 generation_id 和 member_id 在前面的內(nèi)容中已經(jīng)介紹過 多次了心墅, retention time 表示當(dāng)前提交的消費(fèi)位移所能保留的時(shí)長,不過對于消費(fèi)者而言 這個(gè)值保持為 I榨乎。也就是說怎燥,按照 broker 端的配置 offsets . retention . minutes 來確定 保留時(shí)長 。 offsets . retention .minutes 的默認(rèn)值為 10080谬哀,即 7 天刺覆,超過這個(gè)時(shí)間后消 費(fèi)位移的信息就會被刪除(使用墓碑消息和日志壓縮策略) 严肪。 注意這個(gè)參數(shù)在 2.0.0 版本之前的 默認(rèn)值為 1440史煎,即 1 天,很多關(guān)于消費(fèi)位移的異常也是由這個(gè)參數(shù)的值配置不當(dāng)造成的 驳糯。 有些 定時(shí)消費(fèi)的任務(wù)在執(zhí)行完某次消費(fèi)任務(wù)之后保存了消費(fèi)位移篇梭,之后隔了一段時(shí)間再次執(zhí)行消費(fèi)任務(wù),如果這個(gè)問隔時(shí)間超過 offsets.retent工on.minutes 的配置值酝枢,那么原先的位移信 息就會丟失恬偷, 最后只能根據(jù)客戶端參數(shù) auto . offset.reset 來決定開始消費(fèi)的位置,遇到 這種情況時(shí)就需要根據(jù)實(shí)際情況來調(diào)配 offsets.retention.minutes 參數(shù) 的值 帘睦。
OffsetCommitRequest 中的其余字段大抵也是按照分區(qū)的粒度來劃分消費(fèi)位移的 : topic 表 示主題名稱袍患, partition 表示分區(qū)編號等。注意這里還有一個(gè) metadata 字段竣付。在 3.2.5 節(jié)中 講到手動位移提交時(shí)提到了可以通過 Map<TopicPartition, OffsetAndMetadata> offsets 參數(shù)來指 定要提交的分區(qū)位移
同消費(fèi)組的元數(shù)據(jù)信息 一樣诡延,最終提交的消費(fèi)位移也會以消息的形式發(fā)送至主題 _consumer_offsets,與消費(fèi)位移對應(yīng)的消息也只定義了 key 和 value 字段的具體內(nèi)容古胆,它不依 賴于具體版本的消息格式肆良,以此做到與具體的消息格式無關(guān) 。
圖 7”17 中展示了消費(fèi)位移對應(yīng)的消息內(nèi)容格式逸绎,上面是消息的 key惹恃,下面是消息的 value。
可以看到 key 和 value 中都包含了 version 宇段 棺牧,這個(gè)用來標(biāo)識具體的 key 和 value 的版本信 息巫糙,不同的版本對應(yīng)的內(nèi)容格式可能并不相同 。就目前版本 而 言 颊乘, key 和 value 的 version 值 都為 l曲秉。 key 中除了 version 字段還有 group采蚀、 topic、 partition 字段承二,分別表示消費(fèi)組 的 groupId榆鼠、 主題名稱和分區(qū)編號。雖然 key 中包含了 4 個(gè)字段亥鸠,但最終確定這條消息所要存儲 的分區(qū)還是根據(jù)單獨(dú)的 group 字段來計(jì)算的妆够,這樣就可以保證消費(fèi)位移信息與消費(fèi)組對應(yīng)的 GroupCoordinator 處于同 一個(gè) broker 節(jié)點(diǎn)上,省去了中間輪轉(zhuǎn)的開銷负蚊,這 一點(diǎn) 與消費(fèi)組的元數(shù) 據(jù)信息的存儲是一樣的 神妹。
value 中包含了 5 個(gè)字段,除 version 宇段外家妆,其余的 offset鸵荠、 metadata、 commit
timestamp伤极、 expire timestamp 宇段分別表示消費(fèi)位移蛹找、自定義的元數(shù)據(jù)信息、位移提交 到 Kafka 的時(shí)間戳哨坪、消費(fèi)位移被判定為超時(shí)的時(shí)間戳 庸疾。其 中 offset 和 metadata 與 OffsetCommitRequest 請求體中的 offset 和 metadata 對應(yīng),而 expire timestamp 和 OffsetCommitRequest 請求體中的 retention time 也有關(guān)聯(lián)当编, commit timestamp 值與 offsets . retention .minutes 參數(shù)值之和即為 expire_timestamp (默認(rèn)情況下)届慈。
在處理完消費(fèi)位移之后, Kafka返回 OffsetCommitResponse給客戶端 忿偷,OffsetCommitResponse 的結(jié)構(gòu)如圖 7-18 所示 金顿。 OffsetCornmitResponse 中各個(gè)域的具體含義可以通過前面內(nèi)容中推斷出 來,這里就不再贅述了 鲤桥。
我們可以通過kafka-console-consumer.sh腳本來查看 consumeroffsets中的內(nèi)容揍拆,不過要設(shè) 定 formatter 參數(shù)為 kafka.coordinator.group.GroupMetadataManager$0ffsetsMessageForrnatter。 假設(shè)我們要查看消費(fèi)組“consumerGroupid” 的位移提交信息 芜壁, 首先可 以根據(jù)代碼清單 7-1 中的 計(jì)算方式得出分區(qū)編號為 20礁凡, 然后查看這個(gè)分區(qū)中的消息,相關(guān)示例如下:
有時(shí)候在查看主題 consumer offsets 中的內(nèi)容時(shí)有可能出現(xiàn)下面這種情況:
[consumerGroupid, topic-offsets,21]: :null
這說明對應(yīng)的消費(fèi)位移己經(jīng)過期了 慧妄。在 Kafka 中有一個(gè)名為“ delete-expired-group-metadata” 的定時(shí)任務(wù)來負(fù)責(zé)清理過期的消費(fèi)位移顷牌,這個(gè)定時(shí)任務(wù)的執(zhí)行周期由參數(shù) offsets . retention.check.interval . ms 控制,默認(rèn)值為 600000塞淹,即 10 分鐘窟蓝。
事務(wù)
7.4.1 消息傳輸保障
一般而言,消息中間件的消息傳輸保障有 3個(gè)層級饱普,分別如下运挫。
(1)at most once:至多 一次状共。消息可能會丟失,但絕對不會重復(fù)傳輸谁帕。
(2)at least once: 最少一次峡继。消息絕不會丟失,但可能會重復(fù)傳輸匈挖。
(3) exactly once:恰好一次碾牌。每條消息肯定會被傳輸一次且僅傳輸一次。
Kafka 的消息傳輸保障機(jī)制非常直觀 儡循。 當(dāng)生產(chǎn)者向 Kafka 發(fā)送消息時(shí)舶吗,一旦消息被成功提 交到日志文件,由于多副本機(jī)制的存在择膝,這條消息就不會丟失誓琼。如果生產(chǎn)者發(fā)送消息到 Kafka 之后,遇到了網(wǎng)絡(luò)問題而造成通信中斷肴捉,那么生產(chǎn)者就無法判斷該消息是否己經(jīng)提交腹侣。雖然 Kafka 無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但生產(chǎn)者可以進(jìn)行多次重試來確保消息已經(jīng)寫入 Kafka, 這個(gè)重試的過程中有可能會造成消息的重復(fù)寫入每庆,所以這里 Kafka 提供的消息傳輸保障為 at least once筐带。
對消費(fèi)者而言今穿,消費(fèi)者處理消息和提 交消費(fèi)位移 的順序在很大程度上決定了消費(fèi)者提供哪 一種消息傳輸保障 缤灵。 如果消費(fèi)者在拉取完消息之后 ,應(yīng)用邏輯先處理消息后提交消費(fèi)位移 蓝晒,那 么在消息處理之后且在位移提交之前消費(fèi)者看機(jī)了腮出,待它重新上線之后,會從上一次位移提交 的位置拉取芝薇,這樣就出現(xiàn)了重復(fù)消費(fèi)胚嘲,因?yàn)橛胁?分消息已經(jīng)處理過了只是還沒來得及提交消費(fèi) 位移,此時(shí)就對應(yīng) at least once洛二。如果消費(fèi)者在拉完消息之后馋劈,應(yīng)用邏輯先提交消費(fèi)位移后進(jìn)行消息處理,那么在位移提交之后且在消息處理完成之前消費(fèi)者巖機(jī)了晾嘶,待它重新上線之后妓雾,會 從己經(jīng)提交的位移處開始重新消費(fèi),但之前尚有部分消息未進(jìn)行消 費(fèi)垒迂,如此就會發(fā)生消 息丟失械姻, 此時(shí)就對應(yīng) atmost once。
Kafka 從 0.11.0.0 版本開始引 入了軍等和事務(wù)這兩個(gè)特性机断,以此來實(shí)現(xiàn) EOS ( exactly once semantics楷拳,精確 一次 處理語義) 绣夺。
7.4.2 幕等
所謂的幕等,簡單地說就是對接口的多次調(diào)用所產(chǎn)生的結(jié)果和調(diào)用 一次是一致 的 欢揖。生產(chǎn)者 在進(jìn)行重試的時(shí)候有可能會重復(fù)寫入消息陶耍,而使用 Kafka 的幕等性功能之后就可以避免這種情況。
開啟幕等性功能的方式很簡單她混,只需要顯式地將生產(chǎn)者客戶端參數(shù) enable.idempotence 設(shè)置為 true 即可(這個(gè)參數(shù)的默認(rèn)值為 false)物臂,參考如下:
properties .put(ProducerConfig .ENABLE_IDEMPOTENCE CONFIG, true);
或者
properties.put ("enable . idempotence ” , true);
不過如果要確保軍等性功能正常产上,還需要確保生產(chǎn)者客戶端的 retries 棵磷、 acks 、 max.in. flight.requests.per. connect工on 這幾個(gè)參數(shù)不被配置錯(cuò)晋涣。實(shí)際上在使用幕等 性功能的時(shí)候仪媒,用戶完全可以不用配置(也不建議配置)這幾個(gè) 參數(shù)。
如果用戶顯式地指定了 retries 參數(shù)谢鹊,那么這個(gè)參數(shù)的值必須大于 0算吩, 否則會報(bào)出 ConfigException:
如果用戶沒有顯式地指定 retries 參數(shù),那么 KafkaProducer 會將它置為 Integer.MAX_ VALUE佃扼。 同時(shí)還需要保證 max.in.flight.requests.per . connection 參數(shù)的值不能大 于 5 (這個(gè)參數(shù)的值默認(rèn)為 5偎巢, 在 2.2.1 節(jié)中有相關(guān)的介紹),否則也會報(bào)出 ConfigException:
如果用戶還顯式地指定了 acks 參數(shù)兼耀,那么還需要保證這個(gè)參數(shù) 的值為一1 (all)压昼,如果不 為 1 (這個(gè)參數(shù)的值默認(rèn)為 1' 2.3 節(jié)中有相關(guān)的介紹),那么 也會報(bào)出 ConfigException:
org.apache.kafka.common.config.ConfigException: Must set acks to all in order
to use the jdempotent producer . Otherwise we cannot guarantee idempotence .
如果用戶沒有顯式地指定這個(gè) 參數(shù) 瘤运,那么 KafkaProducer 會將它置為 1窍霞。 開啟幕等性功能之 后 ,生 產(chǎn)者就可以如同未開啟幕等 時(shí) 一樣發(fā)送消息了拯坟。
為了實(shí)現(xiàn)生產(chǎn)者的幕等性但金,Kafka為此引入了 producerid(以下簡稱 PID)和序列號(sequence number)這兩個(gè)概念,這兩個(gè)概念其實(shí)在 5.2.5 節(jié)中就講過郁季,分別對應(yīng) v2 版的日志格式中 RecordBatch 的 producer id 和 first seqe口ce 這兩個(gè)宇段(參考圖 5-7)冷溃。每個(gè)新的生產(chǎn) 者實(shí)例在初始化的時(shí)候都會被分配一個(gè) PID,這個(gè) PID 對用戶而言是完全透明的 梦裂。對于每個(gè) PID, 消息發(fā)送到的每一個(gè)分區(qū)都有對應(yīng)的序列號似枕,這些序列號從 0 開始單調(diào)遞增。生產(chǎn)者每發(fā)送一 條消息就會將<PID塞琼, 分區(qū)>對應(yīng)的序列號的值加 l菠净。
broker 端會在內(nèi)存中為 每一對<PID,分區(qū)>維護(hù)一個(gè)序列號。對于收到的每一條消息毅往,只有 當(dāng)它的序列號的值(SN new)比broker端中維護(hù)的對應(yīng)的序列號的值(SN old)大 1(即 SN new =SN old+1)時(shí)牵咙, broker才會接收它。 如果SN new<SN old+I攀唯, 那么說明消息被重復(fù)寫入洁桌, broker 可以直接將其丟棄 。 如果 SN new> SN old + l侯嘀,那么說明中間有數(shù)據(jù)尚未寫入另凌, 出現(xiàn)了 亂序,暗示可能有消息丟失戒幔,對應(yīng)的生產(chǎn)者會拋出 OutOfOrderSequenceException吠谢,這個(gè)異常是 一個(gè)嚴(yán)重的異常,后續(xù)的諸如 send()诗茎、 beginTransaction()工坊、 commitTransaction()等方法的調(diào)用都 會拋出 Illega!StateException 的異常 。
引入序列號來實(shí)現(xiàn)幕等也只 是針對每一對<PID敢订, 分區(qū)>而言的王污,也就是說, Kafka 的霖等只 能保證單個(gè)生產(chǎn)者會話( session)中單分區(qū)的事等 楚午。
ProducerRecord<String , String> record
=new ProducerRecord<>(topic, "key", ”msg” ) ;
producer . send(record) ;
producer . send (record ) ;
注意昭齐,上面示例中發(fā)送了兩條相同的消息,不過這僅僅是指消息 內(nèi) 容相同矾柜,但對 Kafka 而 言是兩條不同 的消息阱驾,因?yàn)闀檫@兩條消息分配不同的序列號 。 Kafka 并不會保證消息 內(nèi)容的 罪等把沼。
7.4.3 事務(wù)
軍等性并不能跨多個(gè)分區(qū)運(yùn)作啊易,而事務(wù)可以彌補(bǔ)這個(gè)缺陷吁伺。事務(wù)可以保證對多個(gè)分區(qū)寫操作的原子性饮睬。操作的原子性是指多個(gè)操作要么全部成功,要么全部失敗篮奄,不存在部分成功捆愁、部分失敗的可能。
對流式應(yīng)用( Stream Processing Applications )而 言 窟却, 一 個(gè)典型的 應(yīng)用模 式為“ consume- transform-produce” 昼丑。在這種模式下消費(fèi)和生產(chǎn) 并存: 應(yīng)用程序從某個(gè)主題中消費(fèi)消息 , 然后經(jīng) 過一系列轉(zhuǎn)換后寫入另一個(gè)主題 夸赫,消費(fèi)者可能在提交消費(fèi)位移的過程中出現(xiàn)問題而導(dǎo)致重復(fù)消 費(fèi)菩帝, 也有可能生產(chǎn)者重復(fù)生產(chǎn)消息 。 Kafka 中的事務(wù)可以使應(yīng)用程序?qū)⑾M(fèi)消息、生產(chǎn)消息 呼奢、 提交消費(fèi)位移當(dāng)作原子操作來處理宜雀,同時(shí)成功或失敗,即使該生產(chǎn)或消費(fèi)會跨多個(gè)分區(qū) 握础。
為了實(shí)現(xiàn)事務(wù)辐董,應(yīng)用程序必須提供唯一的 transactionalld,這個(gè) transactionalld 通過客戶端 參數(shù) transact工onal.id 來顯式設(shè)置禀综,參考如下 :
properties.put(ProducerConfig.TRANSACTIONAL ID CONFIG,”transactionId”);
或者:
properties .put (”transactional .id”,”transactionid”),
事務(wù)要求生產(chǎn)者開啟幕等特性简烘,因 此通過將 transactional . id 參數(shù)設(shè)置為非空從而開 啟事務(wù)特性的同時(shí) 需要將 enable.idempotence 設(shè)置為 true ( 如果未顯式設(shè)置 , 則 KafkaProducer 默認(rèn)會將它 的值設(shè) 置 為 true) ,如果用 戶顯式地將 enable . idempotence 設(shè)置 為 false云稚,則會報(bào)出 ConfigException:
transactionalld 與 PID 一一對應(yīng)纯露,兩者之間所不同的是 transactionalld 由用戶顯式設(shè)置, 而 PID是由 Kafka內(nèi)部分配的亥至。另外,為了保證新的生產(chǎn)者啟動后具有相同 transactionalld的舊生 產(chǎn)者能夠立即失效 贱迟,每個(gè)生產(chǎn)者通過 transactionalld 獲取 PID 的 同時(shí)姐扮,還會獲取一個(gè)單調(diào)遞增的 producer epoch (對應(yīng)下面 要講述 的 KafkaProducer.initTransactions()方法〉。如果使用 同 一 個(gè) transactionalld 開啟兩個(gè)生產(chǎn)者衣吠,那么前 一個(gè) 開啟的生產(chǎn)者會報(bào)出如下的錯(cuò)誤:
org . apache . kafka . common . errors . ProducerFencedExcept工on : Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalid, or the producer’s transaction has been expired by the broker.
從生產(chǎn)者 的角度分析茶敏,通過事務(wù) , Kafka 可以保證跨生產(chǎn)者會話的消息幕等發(fā)送缚俏,以及跨 生產(chǎn)者會話的事務(wù)恢復(fù) 惊搏。 前者表示具有相同 transactionalld 的新生產(chǎn)者實(shí)例被創(chuàng)建且工作的時(shí)候,舊的且擁有相同 transactionalld 的生產(chǎn)者實(shí)例將不再工作忧换。后者指當(dāng)某個(gè)生產(chǎn)者實(shí)例君機(jī)后恬惯, 新的生產(chǎn)者實(shí)例可以保證任何未完成的舊事務(wù)要么被提交( Commit),要么被中止( Abo時(shí))亚茬, 如此可以使新的生產(chǎn)者實(shí)例從一個(gè)正常的狀態(tài)開始工作酪耳。
而從消費(fèi)者的角度分析, 事務(wù)能保證的語義相對偏弱刹缝。出于以下原因碗暗, Kafka 并不能保證 己提交的事務(wù)中的所有消息都能夠被消 費(fèi) :
- 對采用日志壓縮策略的主題而言,事務(wù)中的某些消息有可能被清理(相同 key 的消息梢夯, 后寫入的消息會覆蓋前面寫入的消息)言疗。
- 事務(wù)中消息可能分布在同一個(gè)分區(qū)的多個(gè)日志分段( LogSegment)中,當(dāng)老的日志分 段被刪除時(shí)颂砸,對應(yīng)的消息可能會丟失噪奄。
- 消費(fèi)者可以通過 seekO方法訪問任意 offset 的消息死姚,從而可能遺漏事務(wù)中的部分消息。
- 消費(fèi)者在消費(fèi)時(shí)可能沒有分配到事務(wù)內(nèi)的所有分區(qū)勤篮,如 此它也就不能讀取事務(wù)中的所 有消息知允。
initTransactions()方法用來初始化事務(wù),這個(gè)方法能夠執(zhí)行的前提是配置了 transactionalld, 如果沒有則會報(bào)出 IllegalStateException:
beginTransaction()方法用來開啟 事務(wù): sendOffsetsToTransaction()方法為消費(fèi)者提供在事務(wù) 內(nèi)的位移提交 的操作; commitTransaction()方法用來提交事務(wù) : abortTransaction()方法用來中止 事務(wù) 叙谨,類似于事務(wù)回滾 温鸽。
package chapter7;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* 代碼清單7-2
*/
public class TransactionOnlySend {
public static final String topic = "topic-transaction";
public static final String brokerList = "localhost:9092";
public static final String transactionId = "transactionId";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.initTransactions();
producer.beginTransaction();
try {
//處理業(yè)務(wù)邏輯并創(chuàng)建ProducerRecord
ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");
producer.send(record1);
ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");
producer.send(record2);
ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");
producer.send(record3);
//處理一些其它邏輯
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.abortTransaction();
}
producer.close();
}
}
在消費(fèi)端有一個(gè)參數(shù) isolation.level,與事務(wù)有著莫大的關(guān)聯(lián)手负,這個(gè)參數(shù)的默認(rèn)值為“ read uncommitted”涤垫, 意思是說消費(fèi)端應(yīng)用可 以看到(消費(fèi)到)未提交的事務(wù), 當(dāng)然對于己提 交的事務(wù)也是可見的竟终。這個(gè)參數(shù)還可以設(shè)置為“ read committed”蝠猬,表示消費(fèi)端應(yīng)用不可以看到 尚未提交的事務(wù)內(nèi)的消息。舉個(gè)例子统捶,如果生產(chǎn)者開啟事務(wù)并向某個(gè)分區(qū)值發(fā)送 3 條消息 msgl 榆芦、 msg2 和 msg3,在執(zhí)行 commitTransaction()或 abortTransaction()方法前喘鸟,設(shè)置為“read_committed” 的消費(fèi)端應(yīng)用是消費(fèi)不到這些消息的匆绣,不過在 KafkaConsumer 內(nèi)部會緩存這些消息,直到生產(chǎn) 者執(zhí)行 commitTransaction()方法之后它才能將這些消息推送給消費(fèi)端應(yīng)用什黑。反之崎淳,如果生產(chǎn)者 執(zhí)行了 abortTransaction()方法,那么 KafkaConsumer 會將這些緩存的消息丟棄而不推送給消費(fèi) 端應(yīng)用愕把。
日志文件中除了普通的消息拣凹,還有 一種消息專門用來標(biāo)志 一個(gè)事務(wù) 的結(jié)束,它就是控制消息( Contro!Batch)恨豁∠担控制消息一共有兩種類型 : COMMIT 和 ABORT,分別用來表征事務(wù)己經(jīng) 成功提交或己經(jīng)被成功中止橘蜜。 KafkaConsumer 可以通過這個(gè)控制消息來判斷對應(yīng)的事務(wù)是被提 交了還是被中止了菊匿,然后結(jié)合參數(shù) isolation.level 配置的隔離級別來決定是否將相應(yīng)的消 息返回給消費(fèi)端應(yīng)用,如圖 7-19所示扮匠。注意 Contro!Batch對消費(fèi)端應(yīng)用不可見捧请,后面還會對它 有更加詳細(xì)的介紹。
本節(jié)開頭就提及了 consume-transform-produce 這種應(yīng)用模式 棒搜,這里還涉及在代碼清單 7-2 中尚未使用的 s巳ndOffsetsToTransaction()方法。該模式的具體結(jié)構(gòu)如圖 7-20 所示活箕。與此對應(yīng)的 應(yīng)用示例如代碼清單 7-3 所示力麸。
package chapter7;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.*;
/**
* 代碼清單7-3
*/
public class TransactionConsumeTransformProduce {
public static final String brokerList = "10.198.197.73:9092";
public static Properties getConsumerProperties(){
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
return props;
}
public static Properties getProducerProperties(){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
return props;
}
public static void main(String[] args) {
//初始化生產(chǎn)者和消費(fèi)者
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(getConsumerProperties());
consumer.subscribe(Collections.singletonList("topic-source"));
KafkaProducer<String, String> producer =
new KafkaProducer<>(getProducerProperties());
//初始化事務(wù)
producer.initTransactions();
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty()) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
//開啟事務(wù)
producer.beginTransaction();
try {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords
= records.records(partition);
for (ConsumerRecord<String, String> record :
partitionRecords) {
//do some logical processing.
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("topic-sink", record.key(),
record.value());
//消費(fèi)-生產(chǎn)模型
producer.send(producerRecord);
}
long lastConsumedOffset = partitionRecords.
get(partitionRecords.size() - 1).offset();
offsets.put(partition,
new OffsetAndMetadata(lastConsumedOffset + 1));
}
//提交消費(fèi)位移
producer.sendOffsetsToTransaction(offsets,"groupId");
//提交事務(wù)
producer.commitTransaction();
} catch (ProducerFencedException e) {
//log the exception
//中止事務(wù)
producer.abortTransaction();
}
}
}
}
}
注意 : 在使用 KafkaConsumer 的時(shí) 候要將 enable .auto .commit 參數(shù)設(shè)置為 false,代碼里也不能手動提交消費(fèi)位移 。
為了實(shí)現(xiàn)事務(wù)的功能克蚂, Kafka還引入了事務(wù)協(xié)調(diào)器 CTransactionCoordinator)來負(fù)責(zé)處理事 務(wù)闺鲸,這一點(diǎn)可以類比一下組協(xié)調(diào)器( GroupCoordinator) 。每一個(gè)生產(chǎn)者都會被指派一個(gè)特定的 TransactionCoordinator埃叭,所有的事務(wù)邏輯包括分派 PID 等都是由 TransactionCoordinator 來負(fù)責(zé) 實(shí)施的摸恍。 TransactionCoordinator 會將事務(wù)狀態(tài)持久化到內(nèi)部主題 位ansaction state 中 。下面就以最復(fù)雜 的 consume-transform-produce 的流程 (參考圖 7-21 )為例來分析 Kafka 事務(wù)的 實(shí)現(xiàn)原 理赤屋。
1 查找 TransactionCoordinator
TransactionCoordinator 負(fù)責(zé)分配 PID 和管理事務(wù)立镶,因此生產(chǎn)者要做的第一件事情就是找出 對應(yīng)的 TransactionCoordinator 所在 的 broker 節(jié)點(diǎn) 。與查找 GroupCoordinator 節(jié)點(diǎn) 一樣 类早,也是通 過 FindCoordinatorRequest請求來實(shí)現(xiàn)的媚媒,只不過 FindCoordinatorRequest 中的 coordinator_ type 就由原來的 0變成了 1,由此來表示與事務(wù)相關(guān)聯(lián)(FindCoordinatorRequest請求的具體結(jié) 構(gòu)參考圖 7-5)涩僻。
Kafka 在收到 FindCoorinatorRequest 請求之后 缭召, 會根據(jù) coord工nator_key (也就是 transactionalld)查找對應(yīng)的TransactionCoordinator節(jié)點(diǎn)。如果找到逆日,則會返回其相對應(yīng)的node id嵌巷、 host 和 port 信息。具體查找 TransactionCoordinator 的方式是根據(jù) transactionalld 的 哈希值計(jì)算主 題 transaction state中的分區(qū)編號室抽, 具體算法如代碼清單 7-4所示晴竞。
代碼:青單 7-4 計(jì)算分區(qū)編號
Utils.abs(transactionalid.hashCode) % transactionTopicPartit工onCount
其中 transactionTopicPartitionCount 為主題一transaction_state 中的分區(qū)個(gè)數(shù) , 這 個(gè)可以通過 brok巳r端參數(shù) transaction.state.log.num.partitions 來配置狠半,默認(rèn)值為 50 噩死。
找到對應(yīng)的分區(qū)之后,再尋找此分區(qū) leader 副本所在 的 broker 節(jié)點(diǎn)神年,該 broker 節(jié)點(diǎn)即為這 個(gè) transactionalld對應(yīng)的 TransactionCoordinator節(jié)點(diǎn)已维。細(xì)心的讀者可以發(fā)現(xiàn),這一整套的邏輯和 查找 GroupCoordinator 的邏輯如出 一轍(參考 7.2.2 節(jié)) 已日。
- 獲取 PID
在找到 TransactionCoordinator 節(jié)點(diǎn)之后垛耳,就需要為當(dāng)前生產(chǎn)者分配一個(gè) PID 了 。凡是開啟 了罪等性功能的生產(chǎn)者都必須執(zhí)行這個(gè)操作飘千,不需要考慮該生 產(chǎn)者是否還開啟了事務(wù)堂鲜。生產(chǎn)者 獲取 PID 的操作是通過 InitProducerldRequest 請求來實(shí)現(xiàn)的, InitProducerldRequest 請求體結(jié)構(gòu) 如圖 7-22 所示护奈,其中 transactional id 表示 事務(wù) 的 transactiona!Id, transaction timeout ms 表示 TransactionCoordinaor等待事務(wù)狀態(tài)更新的超時(shí)時(shí)間缔莲,通過生產(chǎn)者客戶端參 數(shù) transact工on . timeout .ms 配置,默認(rèn)值為 60000霉旗。
保存 PID
生產(chǎn)者的 InitProducerldRequest請求會被發(fā)送給 TransactionCoordinator痴奏。 注意蛀骇,如果未開啟 事務(wù)特性 而 只開啟幕等特性 , 那么 InitProducerldRequest 請求可以發(fā)送給任意的 broker读拆。當(dāng) TransactionCoordinator 第一次收到包含該 transactiona!Id 的 InitProduc巳rldRequest 請求時(shí)擅憔,它會 把 transactiona!Id 和對應(yīng)的 PID 以消息(我們習(xí) 慣性地把這類消息稱為“事務(wù)日志消息”〉的形 式保存到主題 transaction state 中,如圖 7-21 步驟 2.1 所示 檐晕。這樣可以保證<transaction Id, PID> 的對應(yīng)關(guān)系被持久化暑诸,從而保證即使 TransactionCoordinator 右機(jī)該對應(yīng)關(guān)系也不會丟失 。 存儲 到主題 transaction state 中的具體內(nèi)容格式如圖 7-23 所示 辟灰。
其中 transaction status 包含 Empty(O)个榕、 Ongoing(l)、 PrepareCommit(2) 伞矩、 PrepareAbort(3)笛洛、 CompleteCommit(4)、 CompleteAbort(S)乃坤、 Dead(6)這 幾種狀態(tài) 苛让。在存入主題
transaction state 之前,事務(wù)日志消息同樣會根據(jù)單獨(dú)的 transactiona!Id 來計(jì)算要發(fā)送的分區(qū)湿诊, 算法同代碼清單 7-4 一樣狱杰。
與InitProducerldRequest 對應(yīng)的 InitProducerldResponse 響應(yīng)體結(jié)構(gòu)如圖 7-24 所示,
3. 開啟事務(wù)
通過 KafkaProduc町的 beginTransactionO方法可以開啟一個(gè)事務(wù)厅须, 調(diào)用該方法后仿畸,生產(chǎn)者本 地會標(biāo)記己經(jīng)開啟了 一個(gè)新的事務(wù) ,只有在生產(chǎn)者發(fā)送第一條消息之后 TransactionCoordinator 才會認(rèn)為該事務(wù) 己經(jīng)開啟 朗和。
4 . Consume-Transform-Produce
這個(gè)階段囊括了整個(gè)事務(wù)的數(shù)據(jù)處理過程错沽,其中還涉及多種請求
5. 提交或者中止事務(wù)
一旦數(shù)據(jù)被寫入成功,我 們 就可以調(diào)用 KafkaProducer 的 commitTransaction()方法或 abortTransaction()方法來結(jié)束當(dāng)前 的 事務(wù) 眶拉。