紹圣--kafka之消費者(五)

消費者能發(fā)送拉取請求的前提條件是:1愉适,消費者已經(jīng)連接上了服務(wù)端協(xié)調(diào)者所在的節(jié)點犯助;2,消費者必須獲取到服務(wù)端協(xié)調(diào)者分配給此消費者的分區(qū)维咸。

消費者協(xié)調(diào)者和服務(wù)端的協(xié)調(diào)者之間是通過心跳來維持關(guān)系的:讓消費者能聯(lián)系上協(xié)調(diào)者剂买,讓協(xié)調(diào)者知道消費者的存在。但是當兩邊某一方出現(xiàn)問題的時候癌蓖,會發(fā)生什么瞬哼?

1,消費者沒有發(fā)送心跳(消費者發(fā)生故障)租副,協(xié)調(diào)者應(yīng)該知道有消費者離開了消費組坐慰,需要對消費組內(nèi)所有的消費者重新分配分區(qū)。

2附井,服務(wù)端協(xié)調(diào)者發(fā)生故障讨越,服務(wù)端會自己容錯選出一個新的協(xié)調(diào)者節(jié)點來管理消費組。消費者必須等待一定的時間重新詢問服務(wù)端是否選擇出新的協(xié)調(diào)者永毅,如果還沒有選出把跨,就再等一段時間再詢問。如果已經(jīng)選出新的協(xié)調(diào)者節(jié)點沼死,消費者必須重新與其建立連接着逐,并向協(xié)調(diào)者發(fā)送獲取分配的分區(qū)信息請求。

消費者為了獲取協(xié)調(diào)者分配的分區(qū)意蛀,每個消費者都要發(fā)送加入組請求給協(xié)調(diào)者耸别。

消費者加入消費組

消費者發(fā)送加入消費組請求的方法在:AbstractCoordinator.ensureActiveGroup(),消費者每次輪詢操作都會調(diào)用該方法县钥,但是并不意味著每次輪詢都會發(fā)送加入組請求秀姐。因為后續(xù)發(fā)送拉取請求必須有分區(qū),所以加入消費組請求必須采用阻塞的輪詢等待異步請求完成若贮。異步請求完成后將分配的分區(qū)結(jié)果設(shè)置到訂閱狀態(tài)中(SubscriptionState)省有。

AbstractCoordinator.ensureActiveGroup()

public void ensureActiveGroup() {

ensureCoordinatorReady(); // 確保連接上服務(wù)端協(xié)調(diào)者

startHeartbeatThreadIfNeeded(); // 啟動心跳發(fā)送線程(啟動并不一定立即發(fā)送心跳,滿足一定條件后才會發(fā)送心跳)

joinGroupIfNeeded(); // 發(fā)送加入組請求

}

void joinGroupIfNeeded() {

// 首先判斷是否需要重新加入消費組

// 再看上一次的加入請求完成否:異步請求對象是否為空痒留。不等于NULL表示異步請求未完成(異步請求完成后會對異步請求對象設(shè)置為空表示這次發(fā)送請求完成)

// while循環(huán)是為了確保一定要消費者加入消費組中:發(fā)送加入組請求是阻塞的,拿到異步請求的結(jié)果蠢沿,如果不成功伸头,就會進行循環(huán),加入組成功后設(shè)置needRejoin()為false

while (needRejoin() || rejoinIncomplete()) {

ensureCoordinatorReady();

// 初始為true舷蟀,執(zhí)行一次后更新為false恤磷,完成后又設(shè)置為true

if (needsJoinPrepare) {

// 如果是定義自動提交偏移量,那么在發(fā)送加入組請求之前必須提交本地保存的最新的偏移量 onJoinPrepare(generation.generationId, generation.memberId);

needsJoinPrepare = false;

}

// 初始化JoinGroup請求,并發(fā)送該請求野宜,future此異步請求是加入消費組在組合模式中新創(chuàng)建的異步請求

RequestFuture<ByteBuffer> future = initiateJoinGroup();

// 阻塞式的客戶端輪詢確保異步請求完成后才會返回

// 完成的概念是等待異步請求結(jié)果回來并調(diào)用callback中的方法扫步,才算是完成 client.poll(future);

// 重置“重新加入消費組是否完成”對象為空

resetJoinGroupFuture();

if (future.succeeded()) { // 加入組請求完成,這一步時,實際上同步組也已經(jīng)成功了 needsJoinPrepare = true;

// 完成加入

onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());

} else {

// 異步請求完成,但是有異常速缨,重新發(fā)送加入組請求

// 有異常時:while循環(huán)中needRejoin()返回true锌妻,rejoinIncomplete()返回false代乃,繼續(xù)執(zhí)行旬牲。

// 在while循環(huán)中initiateJoinGroup()在rejoinIncomplete()返回false的情況下,會重新發(fā)送加入組請求

RuntimeException exception = future.exception();

if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException) continue;

else if (!future.isRetriable())

throw exception; time.sleep(retryBackoffMs);

}

}

}

注意:initiateJoinGroup()返回的加入組請求的異步請求(組合模式中新創(chuàng)建的異步請求對象)搁吓。

疑問:什么情況下會出現(xiàn)needRejoin()為false原茅,rejoinIncomplete()為true的情況?

回答:在我看的kafka-0.10.1.0版本中堕仔,從代碼邏輯來看是不會出現(xiàn)以上情況的擂橘,因為client.poll(future)會一直阻塞直到異步請求對象完成。所以都會執(zhí)行resetJoinGroupFuture()重置邏輯摩骨。

加入組請求對象:JoinGroupRequest

public class JoinGroupRequest {

private final String groupId; //?消費組名稱

private final String memberId; //?消費者成員編號通贞;消費者初次加入消費組時;此值為:UNKNON_MEMBER恼五,協(xié)調(diào)者在處理每個消費者的加入組請求的時候昌罩,會為每個消費者指定唯一的消費者成員編號,在加入組響應(yīng)中返回給消費者灾馒;后面消費者再次發(fā)送加入組請求的時候茎用,memberId就是前面協(xié)調(diào)者分配的編號

private final String protocolType; //?協(xié)議類型 (消費者:consumer,連接器:connect)

private final List<ProtocolMetadata> groupProtocols;

/** * 協(xié)議元數(shù)據(jù) * */

public static class ProtocolMetadata {

private final String name; //?分區(qū)分配器的類名(PartitionAssignor兩種分區(qū)方式:RoundRobinAssignor循環(huán)睬罗,RangeAssignor范圍)

private final ByteBuffer metadata;? //?元數(shù)據(jù)內(nèi)容轨功。協(xié)議類型為消費者:那么元數(shù)據(jù)內(nèi)容是訂閱狀態(tài)對象其中包含:消費訂閱的topic

}

}

分區(qū)分配算法有兩種:循環(huán)(RoundRobinAssignor)和范圍(RangeAssignor),在調(diào)用assign()方法執(zhí)行分配算法時容达,必須要兩個參數(shù):partitionsPerTopic:Map<String, Integer>:有哪些topic這些topic有多少分區(qū)(<topic,3>)古涧,subscriptions:Map<String, List<String>>:消費者成員編號,訂閱的主題信息花盐。

PartitionAssignor

public interface PartitionAssignor {

Subscription subscription(Set<String> topics); //?每個消費者訂閱的主題列表

Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions); //?只有主消費者會調(diào)用assign()羡滑,其中subscriptions是所有消費者的訂閱信息

void onAssignment(Assignment assignment); //?分配到結(jié)果后的回調(diào)處理

String name();

/** * 消費者的訂閱信息圆米,即訂閱了哪些主題 */

class Subscription { private final List<String> topics; private final ByteBuffer userData;}

/** * 消費者的分配結(jié)果,即分配了哪些分區(qū) */

class Assignment { private final List<TopicPartition> partitions; private final ByteBuffer userData;}

}

分配方法返回值包含每個消費者的分配結(jié)果啄栓,分配結(jié)果是一個主題分區(qū)集合娄帖,表示分配給消費者的所有主題分區(qū)。

主消費者

在協(xié)調(diào)者收集完所有的消費者及其訂閱信息后昙楚,協(xié)調(diào)者并不執(zhí)行分區(qū)分配算法近速,而是交給其中一個消費者來執(zhí)行分區(qū)分配,選出的這個消費者叫主消費者(通常協(xié)調(diào)者會把第一個發(fā)送加入組請求的消費者選為主消費者堪旧,當主消費者掛掉后削葱,協(xié)調(diào)者再選擇下一個消費者作為主消費者)

使用主消費者來執(zhí)行分區(qū)分配算法而不是協(xié)調(diào)者本身來執(zhí)行,這樣可以減少協(xié)調(diào)者的負擔淳梦,但是也會增加消費者和協(xié)調(diào)者之間的通信次數(shù):主消費者完成分配后需要把結(jié)果同步回協(xié)調(diào)者析砸,然后協(xié)調(diào)者再把分配的結(jié)果發(fā)送給消費者包括主消費者。這樣會出現(xiàn)以下問題:

1爆袍,協(xié)調(diào)者如何選擇主消費者首繁?

2,主消費者失敗陨囊,協(xié)調(diào)者怎么處理弦疮?

3,協(xié)調(diào)者將所有的消費者以及訂閱信息蜘醋,通過加入組請求的響應(yīng)結(jié)果給主消費者胁塞,那么其他的發(fā)送加入組請求的消費者也應(yīng)該得到響應(yīng)結(jié)果,此時推送給其他消費者的響應(yīng)結(jié)果是什么喃压语?

4啸罢,在收到響應(yīng)結(jié)果中其實并不包含分區(qū)分配的結(jié)果(因為這是主消費者還沒有計算完成或者說還沒有把結(jié)果發(fā)送給協(xié)調(diào)者),這是消費者怎么來獲取分區(qū)分配結(jié)果喃胎食?

應(yīng)對:

1扰才,協(xié)調(diào)者會選擇第一個發(fā)送加入組請求的消費者作為主消費者。

2斥季,主消費者實質(zhì)就是一個普通消費者训桶,所以主消費者和協(xié)調(diào)者之間還是以心跳的方式來監(jiān)聽對方是否宕機。

3酣倾,協(xié)調(diào)者在收集完成所有的消費者以及訂閱信息后舵揭,主消費者收到的加入組響應(yīng)結(jié)果中會包含所有的消費者和訂閱信息來執(zhí)行分區(qū)分配。而非主消費者里面不包含這些躁锡。

4午绳,每個消費者收到加入組響應(yīng)后,都會發(fā)送同步組請求給協(xié)調(diào)者來獲取分區(qū)映之。主消費者在執(zhí)行完分區(qū)分配任務(wù)后才會發(fā)送同步組請求拦焚,非主消費者會立即發(fā)送同步組請求蜡坊。但是這時主消費者還沒有將分配的結(jié)果發(fā)送給協(xié)調(diào)者,這時非主消費者的同步組請求在服務(wù)費會被延遲處理赎败。協(xié)調(diào)者收到主消費者的同步請求后會將分配結(jié)果放在同步組請求響應(yīng)中秕衙,返回給所有的消費者。

加入組請求和同步組請求

由于消費者接收到的加入組請求響應(yīng)中沒有分區(qū)信息僵刮,所以不能直接完成加入組異步請求据忘,這時要求客戶端要發(fā)送同步組請求,但是如果加入組請求有異常搞糕,就不需要繼續(xù)發(fā)送同步組請求了勇吊。消費者這時需要重新發(fā)送加入組請求。非主消費者在收到加入組請求的響應(yīng)后會立即發(fā)送同步組請求給協(xié)調(diào)者窍仰,主消費者會執(zhí)行完分區(qū)分配后再發(fā)送同步組請求汉规。

注意:收到加入組請求的響應(yīng),調(diào)用加入組響應(yīng)處理器的回調(diào)方法只是表示收到結(jié)果驹吮,并不代表加入組的異步請求完成(如果有異常的話针史,就完成加入組的異步請對象)。收到同步組請求響應(yīng)結(jié)果钥屈,調(diào)用同步組響應(yīng)處理器回調(diào)方法悟民,這時同步組請求響應(yīng)結(jié)果包含了分配給消費者的分區(qū)信息,這時就可以完成同步組請求的異步請求篷就,并一起完成加入組異步請求。這時消費者就可以從加入組的異步請求的結(jié)果中獲取分區(qū)分配結(jié)果(代碼中組合使用了組合模式和鏈式模式)近忙。

加入組請求 同步組請求分解流程

發(fā)送加入組請求
收到加入組響應(yīng)的處理邏輯
收到同步組響應(yīng)結(jié)果的處理邏輯

疑問:

1竭业,為什么說加入組異步請求成功完成時,同步組異步請求也成功完成了及舍?

2未辆,加入組異步請求是在什么時候完成的?

通過以上的流程的分解可以知道锯玛,通過加入組請求異步請求的鏈接模式咐柜,將同步組異步請求的結(jié)果設(shè)置為加入組異步請求的結(jié)果,從而完成加入組的異步請求攘残。即消費者收到同步組響應(yīng)后會完成同步組的異步請求拙友,再完成加入組的異步請求。

總結(jié)一下以上流程:

1歼郭,每個消費者都向協(xié)調(diào)者發(fā)送加入組請求遗契,申請加入消費組。

2病曾,協(xié)調(diào)者接收每個消費者的加入組請求牍蜂,收集消費組的消費者成員漾根。

3,協(xié)調(diào)者選擇一個消費者作為主消費者(一般是第一個發(fā)送加入組請求的消費者)鲫竞。

4辐怕,協(xié)調(diào)者向發(fā)送加入組請求的每個消費者返回響應(yīng)結(jié)果,包含消費者成員信息和訂閱信息从绘。

5秘蛇,步驟三選出的主消費者做分區(qū)分配算法,并在計算完成后發(fā)送同步組請求給協(xié)調(diào)者顶考。

6赁还,非主消費者收到包含消費者成員信息和訂閱信息的響應(yīng)結(jié)果后不做計算,立即發(fā)送同步組請求給協(xié)調(diào)者驹沿。

7艘策,協(xié)調(diào)者收到主消費者發(fā)送的同步組請求后(其中包含分區(qū)分配結(jié)果),向每一個發(fā)送同步組請求的消費者組發(fā)生同步組響應(yīng)(包含每個消費者的分區(qū)結(jié)果)渊季。

加入組前的準備工作

消費者在加入組過程中會調(diào)用onJoinPrepare()朋蔫,表示在加入組之前要預(yù)處理一些事情:

1,執(zhí)行一次同步提交偏移量:把本地消費的最新偏移量提交到服務(wù)端却汉,這樣再平衡完成后驯妄,消費者從協(xié)調(diào)者得到的分區(qū)偏移量就是最新(該偏移量以前的消息都是已經(jīng)被消費過的)。

2合砂,觸發(fā)用戶自定義再平衡監(jiān)聽器:客戶端可以在再平衡發(fā)生時做一些額外的操作青扔,比如把偏移量保存到數(shù)據(jù)庫中等操作。

參考資料:

Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計與實現(xiàn)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末翩伪,一起剝皮案震驚了整個濱河市微猖,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌缘屹,老刑警劉巖凛剥,帶你破解...
    沈念sama閱讀 217,907評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異轻姿,居然都是意外死亡犁珠,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評論 3 395
  • 文/潘曉璐 我一進店門互亮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來犁享,“玉大人,你說我怎么就攤上這事胳挎”恚” “怎么了?”我有些...
    開封第一講書人閱讀 164,298評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長窑眯。 經(jīng)常有香客問我屏积,道長,這世上最難降的妖魔是什么磅甩? 我笑而不...
    開封第一講書人閱讀 58,586評論 1 293
  • 正文 為了忘掉前任炊林,我火速辦了婚禮,結(jié)果婚禮上卷要,老公的妹妹穿的比我還像新娘渣聚。我一直安慰自己,他們只是感情好僧叉,可當我...
    茶點故事閱讀 67,633評論 6 392
  • 文/花漫 我一把揭開白布奕枝。 她就那樣靜靜地躺著,像睡著了一般瓶堕。 火紅的嫁衣襯著肌膚如雪隘道。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,488評論 1 302
  • 那天郎笆,我揣著相機與錄音谭梗,去河邊找鬼。 笑死宛蚓,一個胖子當著我的面吹牛激捏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播凄吏,決...
    沈念sama閱讀 40,275評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼远舅,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了竞思?” 一聲冷哼從身側(cè)響起表谊,我...
    開封第一講書人閱讀 39,176評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎悔据,沒想到半個月后探越,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,619評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,819評論 3 336
  • 正文 我和宋清朗相戀三年撬呢,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鞠呈。...
    茶點故事閱讀 39,932評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡藐唠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出爆土,到底是詐尸還是另有隱情椭懊,我是刑警寧澤,帶...
    沈念sama閱讀 35,655評論 5 346
  • 正文 年R本政府宣布步势,位于F島的核電站氧猬,受9級特大地震影響背犯,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜盅抚,卻給世界環(huán)境...
    茶點故事閱讀 41,265評論 3 329
  • 文/蒙蒙 一漠魏、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧妄均,春花似錦柱锹、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至邑彪,卻和暖如春瞧毙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背锌蓄。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評論 1 269
  • 我被黑心中介騙來泰國打工升筏, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人瘸爽。 一個月前我還...
    沈念sama閱讀 48,095評論 3 370
  • 正文 我出身青樓您访,卻偏偏與公主長得像,于是被迫代替她去往敵國和親剪决。 傳聞我的和親對象是個殘疾皇子灵汪,可洞房花燭夜當晚...
    茶點故事閱讀 44,884評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 此篇開始進入kafka的另外一側(cè):消費者。kafka中的消費者比生產(chǎn)者要復(fù)雜的多柑潦,里面涉及到的消費組享言,偏移量等概念...
    紹圣閱讀 1,915評論 0 0
  • 消費者網(wǎng)絡(luò)客戶端輪詢:ConsumerNetworkClient。ConsumerNetworkClient是對N...
    紹圣閱讀 773評論 0 0
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理渗鬼,服務(wù)發(fā)現(xiàn)览露,斷路器,智...
    卡卡羅2017閱讀 134,656評論 18 139
  • 消費者輪詢通過拉取器(Fetcher)發(fā)送拉取請求譬胎,拉取器會調(diào)用消費者網(wǎng)絡(luò)客戶端的發(fā)送方法(send)和網(wǎng)絡(luò)輪詢方...
    紹圣閱讀 753評論 0 0
  • 原文地址 當kafka最初被創(chuàng)建的時候差牛,它附帶一個Scala的生產(chǎn)者和消費者客戶端。隨著時間的推移堰乔,我們逐漸意識到...
    明翼閱讀 6,619評論 0 9