消費者能發(fā)送拉取請求的前提條件是:1愉适,消費者已經(jīng)連接上了服務(wù)端協(xié)調(diào)者所在的節(jié)點犯助;2,消費者必須獲取到服務(wù)端協(xié)調(diào)者分配給此消費者的分區(qū)维咸。
消費者協(xié)調(diào)者和服務(wù)端的協(xié)調(diào)者之間是通過心跳來維持關(guān)系的:讓消費者能聯(lián)系上協(xié)調(diào)者剂买,讓協(xié)調(diào)者知道消費者的存在。但是當兩邊某一方出現(xiàn)問題的時候癌蓖,會發(fā)生什么瞬哼?
1,消費者沒有發(fā)送心跳(消費者發(fā)生故障)租副,協(xié)調(diào)者應(yīng)該知道有消費者離開了消費組坐慰,需要對消費組內(nèi)所有的消費者重新分配分區(qū)。
2附井,服務(wù)端協(xié)調(diào)者發(fā)生故障讨越,服務(wù)端會自己容錯選出一個新的協(xié)調(diào)者節(jié)點來管理消費組。消費者必須等待一定的時間重新詢問服務(wù)端是否選擇出新的協(xié)調(diào)者永毅,如果還沒有選出把跨,就再等一段時間再詢問。如果已經(jīng)選出新的協(xié)調(diào)者節(jié)點沼死,消費者必須重新與其建立連接着逐,并向協(xié)調(diào)者發(fā)送獲取分配的分區(qū)信息請求。
消費者為了獲取協(xié)調(diào)者分配的分區(qū)意蛀,每個消費者都要發(fā)送加入組請求給協(xié)調(diào)者耸别。
消費者加入消費組
消費者發(fā)送加入消費組請求的方法在:AbstractCoordinator.ensureActiveGroup(),消費者每次輪詢操作都會調(diào)用該方法县钥,但是并不意味著每次輪詢都會發(fā)送加入組請求秀姐。因為后續(xù)發(fā)送拉取請求必須有分區(qū),所以加入消費組請求必須采用阻塞的輪詢等待異步請求完成若贮。異步請求完成后將分配的分區(qū)結(jié)果設(shè)置到訂閱狀態(tài)中(SubscriptionState)省有。
AbstractCoordinator.ensureActiveGroup()
public void ensureActiveGroup() {
ensureCoordinatorReady(); // 確保連接上服務(wù)端協(xié)調(diào)者
startHeartbeatThreadIfNeeded(); // 啟動心跳發(fā)送線程(啟動并不一定立即發(fā)送心跳,滿足一定條件后才會發(fā)送心跳)
joinGroupIfNeeded(); // 發(fā)送加入組請求
}
void joinGroupIfNeeded() {
// 首先判斷是否需要重新加入消費組
// 再看上一次的加入請求完成否:異步請求對象是否為空痒留。不等于NULL表示異步請求未完成(異步請求完成后會對異步請求對象設(shè)置為空表示這次發(fā)送請求完成)
// while循環(huán)是為了確保一定要消費者加入消費組中:發(fā)送加入組請求是阻塞的,拿到異步請求的結(jié)果蠢沿,如果不成功伸头,就會進行循環(huán),加入組成功后設(shè)置needRejoin()為false
while (needRejoin() || rejoinIncomplete()) {
ensureCoordinatorReady();
// 初始為true舷蟀,執(zhí)行一次后更新為false恤磷,完成后又設(shè)置為true
if (needsJoinPrepare) {
// 如果是定義自動提交偏移量,那么在發(fā)送加入組請求之前必須提交本地保存的最新的偏移量 onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}
// 初始化JoinGroup請求,并發(fā)送該請求野宜,future此異步請求是加入消費組在組合模式中新創(chuàng)建的異步請求
RequestFuture<ByteBuffer> future = initiateJoinGroup();
// 阻塞式的客戶端輪詢確保異步請求完成后才會返回
// 完成的概念是等待異步請求結(jié)果回來并調(diào)用callback中的方法扫步,才算是完成 client.poll(future);
// 重置“重新加入消費組是否完成”對象為空
resetJoinGroupFuture();
if (future.succeeded()) { // 加入組請求完成,這一步時,實際上同步組也已經(jīng)成功了 needsJoinPrepare = true;
// 完成加入
onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
} else {
// 異步請求完成,但是有異常速缨,重新發(fā)送加入組請求
// 有異常時:while循環(huán)中needRejoin()返回true锌妻,rejoinIncomplete()返回false代乃,繼續(xù)執(zhí)行旬牲。
// 在while循環(huán)中initiateJoinGroup()在rejoinIncomplete()返回false的情況下,會重新發(fā)送加入組請求
RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException) continue;
else if (!future.isRetriable())
throw exception; time.sleep(retryBackoffMs);
}
}
}
注意:initiateJoinGroup()返回的加入組請求的異步請求(組合模式中新創(chuàng)建的異步請求對象)搁吓。
疑問:什么情況下會出現(xiàn)needRejoin()為false原茅,rejoinIncomplete()為true的情況?
回答:在我看的kafka-0.10.1.0版本中堕仔,從代碼邏輯來看是不會出現(xiàn)以上情況的擂橘,因為client.poll(future)會一直阻塞直到異步請求對象完成。所以都會執(zhí)行resetJoinGroupFuture()重置邏輯摩骨。
加入組請求對象:JoinGroupRequest
public class JoinGroupRequest {
private final String groupId; //?消費組名稱
private final String memberId; //?消費者成員編號通贞;消費者初次加入消費組時;此值為:UNKNON_MEMBER恼五,協(xié)調(diào)者在處理每個消費者的加入組請求的時候昌罩,會為每個消費者指定唯一的消費者成員編號,在加入組響應(yīng)中返回給消費者灾馒;后面消費者再次發(fā)送加入組請求的時候茎用,memberId就是前面協(xié)調(diào)者分配的編號
private final String protocolType; //?協(xié)議類型 (消費者:consumer,連接器:connect)
private final List<ProtocolMetadata> groupProtocols;
/** * 協(xié)議元數(shù)據(jù) * */
public static class ProtocolMetadata {
private final String name; //?分區(qū)分配器的類名(PartitionAssignor兩種分區(qū)方式:RoundRobinAssignor循環(huán)睬罗,RangeAssignor范圍)
private final ByteBuffer metadata;? //?元數(shù)據(jù)內(nèi)容轨功。協(xié)議類型為消費者:那么元數(shù)據(jù)內(nèi)容是訂閱狀態(tài)對象其中包含:消費訂閱的topic
}
}
分區(qū)分配算法有兩種:循環(huán)(RoundRobinAssignor)和范圍(RangeAssignor),在調(diào)用assign()方法執(zhí)行分配算法時容达,必須要兩個參數(shù):partitionsPerTopic:Map<String, Integer>:有哪些topic這些topic有多少分區(qū)(<topic,3>)古涧,subscriptions:Map<String, List<String>>:消費者成員編號,訂閱的主題信息花盐。
PartitionAssignor
public interface PartitionAssignor {
Subscription subscription(Set<String> topics); //?每個消費者訂閱的主題列表
Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions); //?只有主消費者會調(diào)用assign()羡滑,其中subscriptions是所有消費者的訂閱信息
void onAssignment(Assignment assignment); //?分配到結(jié)果后的回調(diào)處理
String name();
/** * 消費者的訂閱信息圆米,即訂閱了哪些主題 */
class Subscription { private final List<String> topics; private final ByteBuffer userData;}
/** * 消費者的分配結(jié)果,即分配了哪些分區(qū) */
class Assignment { private final List<TopicPartition> partitions; private final ByteBuffer userData;}
}
分配方法返回值包含每個消費者的分配結(jié)果啄栓,分配結(jié)果是一個主題分區(qū)集合娄帖,表示分配給消費者的所有主題分區(qū)。
主消費者
在協(xié)調(diào)者收集完所有的消費者及其訂閱信息后昙楚,協(xié)調(diào)者并不執(zhí)行分區(qū)分配算法近速,而是交給其中一個消費者來執(zhí)行分區(qū)分配,選出的這個消費者叫主消費者(通常協(xié)調(diào)者會把第一個發(fā)送加入組請求的消費者選為主消費者堪旧,當主消費者掛掉后削葱,協(xié)調(diào)者再選擇下一個消費者作為主消費者)
使用主消費者來執(zhí)行分區(qū)分配算法而不是協(xié)調(diào)者本身來執(zhí)行,這樣可以減少協(xié)調(diào)者的負擔淳梦,但是也會增加消費者和協(xié)調(diào)者之間的通信次數(shù):主消費者完成分配后需要把結(jié)果同步回協(xié)調(diào)者析砸,然后協(xié)調(diào)者再把分配的結(jié)果發(fā)送給消費者包括主消費者。這樣會出現(xiàn)以下問題:
1爆袍,協(xié)調(diào)者如何選擇主消費者首繁?
2,主消費者失敗陨囊,協(xié)調(diào)者怎么處理弦疮?
3,協(xié)調(diào)者將所有的消費者以及訂閱信息蜘醋,通過加入組請求的響應(yīng)結(jié)果給主消費者胁塞,那么其他的發(fā)送加入組請求的消費者也應(yīng)該得到響應(yīng)結(jié)果,此時推送給其他消費者的響應(yīng)結(jié)果是什么喃压语?
4啸罢,在收到響應(yīng)結(jié)果中其實并不包含分區(qū)分配的結(jié)果(因為這是主消費者還沒有計算完成或者說還沒有把結(jié)果發(fā)送給協(xié)調(diào)者),這是消費者怎么來獲取分區(qū)分配結(jié)果喃胎食?
應(yīng)對:
1扰才,協(xié)調(diào)者會選擇第一個發(fā)送加入組請求的消費者作為主消費者。
2斥季,主消費者實質(zhì)就是一個普通消費者训桶,所以主消費者和協(xié)調(diào)者之間還是以心跳的方式來監(jiān)聽對方是否宕機。
3酣倾,協(xié)調(diào)者在收集完成所有的消費者以及訂閱信息后舵揭,主消費者收到的加入組響應(yīng)結(jié)果中會包含所有的消費者和訂閱信息來執(zhí)行分區(qū)分配。而非主消費者里面不包含這些躁锡。
4午绳,每個消費者收到加入組響應(yīng)后,都會發(fā)送同步組請求給協(xié)調(diào)者來獲取分區(qū)映之。主消費者在執(zhí)行完分區(qū)分配任務(wù)后才會發(fā)送同步組請求拦焚,非主消費者會立即發(fā)送同步組請求蜡坊。但是這時主消費者還沒有將分配的結(jié)果發(fā)送給協(xié)調(diào)者,這時非主消費者的同步組請求在服務(wù)費會被延遲處理赎败。協(xié)調(diào)者收到主消費者的同步請求后會將分配結(jié)果放在同步組請求響應(yīng)中秕衙,返回給所有的消費者。
加入組請求和同步組請求
由于消費者接收到的加入組請求響應(yīng)中沒有分區(qū)信息僵刮,所以不能直接完成加入組異步請求据忘,這時要求客戶端要發(fā)送同步組請求,但是如果加入組請求有異常搞糕,就不需要繼續(xù)發(fā)送同步組請求了勇吊。消費者這時需要重新發(fā)送加入組請求。非主消費者在收到加入組請求的響應(yīng)后會立即發(fā)送同步組請求給協(xié)調(diào)者窍仰,主消費者會執(zhí)行完分區(qū)分配后再發(fā)送同步組請求汉规。
注意:收到加入組請求的響應(yīng),調(diào)用加入組響應(yīng)處理器的回調(diào)方法只是表示收到結(jié)果驹吮,并不代表加入組的異步請求完成(如果有異常的話针史,就完成加入組的異步請對象)。收到同步組請求響應(yīng)結(jié)果钥屈,調(diào)用同步組響應(yīng)處理器回調(diào)方法悟民,這時同步組請求響應(yīng)結(jié)果包含了分配給消費者的分區(qū)信息,這時就可以完成同步組請求的異步請求篷就,并一起完成加入組異步請求。這時消費者就可以從加入組的異步請求的結(jié)果中獲取分區(qū)分配結(jié)果(代碼中組合使用了組合模式和鏈式模式)近忙。
加入組請求 同步組請求分解流程
疑問:
1竭业,為什么說加入組異步請求成功完成時,同步組異步請求也成功完成了及舍?
2未辆,加入組異步請求是在什么時候完成的?
通過以上的流程的分解可以知道锯玛,通過加入組請求異步請求的鏈接模式咐柜,將同步組異步請求的結(jié)果設(shè)置為加入組異步請求的結(jié)果,從而完成加入組的異步請求攘残。即消費者收到同步組響應(yīng)后會完成同步組的異步請求拙友,再完成加入組的異步請求。
總結(jié)一下以上流程:
1歼郭,每個消費者都向協(xié)調(diào)者發(fā)送加入組請求遗契,申請加入消費組。
2病曾,協(xié)調(diào)者接收每個消費者的加入組請求牍蜂,收集消費組的消費者成員漾根。
3,協(xié)調(diào)者選擇一個消費者作為主消費者(一般是第一個發(fā)送加入組請求的消費者)鲫竞。
4辐怕,協(xié)調(diào)者向發(fā)送加入組請求的每個消費者返回響應(yīng)結(jié)果,包含消費者成員信息和訂閱信息从绘。
5秘蛇,步驟三選出的主消費者做分區(qū)分配算法,并在計算完成后發(fā)送同步組請求給協(xié)調(diào)者顶考。
6赁还,非主消費者收到包含消費者成員信息和訂閱信息的響應(yīng)結(jié)果后不做計算,立即發(fā)送同步組請求給協(xié)調(diào)者驹沿。
7艘策,協(xié)調(diào)者收到主消費者發(fā)送的同步組請求后(其中包含分區(qū)分配結(jié)果),向每一個發(fā)送同步組請求的消費者組發(fā)生同步組響應(yīng)(包含每個消費者的分區(qū)結(jié)果)渊季。
加入組前的準備工作
消費者在加入組過程中會調(diào)用onJoinPrepare()朋蔫,表示在加入組之前要預(yù)處理一些事情:
1,執(zhí)行一次同步提交偏移量:把本地消費的最新偏移量提交到服務(wù)端却汉,這樣再平衡完成后驯妄,消費者從協(xié)調(diào)者得到的分區(qū)偏移量就是最新(該偏移量以前的消息都是已經(jīng)被消費過的)。
2合砂,觸發(fā)用戶自定義再平衡監(jiān)聽器:客戶端可以在再平衡發(fā)生時做一些額外的操作青扔,比如把偏移量保存到數(shù)據(jù)庫中等操作。
參考資料:
Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計與實現(xiàn)