前面重點說了消費組的再平衡洪规,等待同步,穩(wěn)定狀態(tài)。還有一種狀態(tài)是離開狀態(tài)蹄胰。
消費者離開消費組的情況:消費者應用程序關(guān)閉或者消費者不訂閱主題了。此時協(xié)調(diào)者就不需要再在消費組中管理此消費者了奕翔。
離開消費組的方法:1裕寨,消費者取消心跳任務。2,發(fā)送離開組請求宾袜。
協(xié)調(diào)者處理離開組請求
協(xié)調(diào)者在處理離開組請求時捻艳,首先移除心跳檢測,然后將消費者從消費組元數(shù)據(jù)中移除试和。因為要操作消費組元數(shù)據(jù)讯泣,所以需要對消費組元數(shù)據(jù)進行加鎖。
消費組狀態(tài)是準備再平衡
準備再平衡時阅悍,一定有延遲操作對象存在好渠,并且還不能完成。這時如果有消費者選擇離開节视,有可能會使延遲操作對象完成拳锚,所以每次處理離開請求都會去嘗試完成延遲操作對象。
消費組狀態(tài)是等待同步
等待同步說明延遲操作已經(jīng)完成寻行,消費者已經(jīng)存在于消費組中霍掺,并且已經(jīng)收到了加入組響應。這時主消費者正在分配分區(qū)拌蜘,這時如果有消費者要離開杆烁,那么原本分配給這個離開的消費者的分區(qū)就沒有意義了。所以在等待同步狀態(tài)下處理離開請求简卧,要改變消費組的狀態(tài)為準備再平衡兔魂。讓其他消費者重新發(fā)送加入組請求。
消費組狀態(tài)是穩(wěn)定
穩(wěn)定狀態(tài)下的操作和等待同步一樣举娩。同樣原本分配給這個離開的消費者的分區(qū)必須分給消費組中其他的消費者析校。所以要改變消費組的狀態(tài)為準備再平衡。讓其他消費者重新發(fā)送加入組請求铜涉。
再平衡超時與會話超時
在協(xié)調(diào)者把消費組的狀態(tài)變?yōu)闇蕚湓倨胶獾臅r候智玻,會創(chuàng)建一個延遲操作對象。這個延遲操作對象會等待消費組中所有的消費者都重新發(fā)送加入組請求后才能完成芙代。這里就存在一個問題吊奢,如果某一個消費者一直不重新發(fā)送加入組請求,那么導致延遲操作對象一直都不會完成纹烹,協(xié)調(diào)者就一直不會發(fā)送加入組響應給消費者事甜。所以必須設(shè)置一個超時時間,讓過了超時時間后滔韵,延遲操作對象還不能完成逻谦,就強制完成。
延遲的加入組操作對象陪蜻,會選擇消費組中所有消費者會話超時時間的最大值邦马,作為延遲操作的超時時間。在過了超時時間后延遲操作對象會被強制完成。在完成延遲操作時滋将,協(xié)調(diào)者會找出那些沒有在規(guī)定時間內(nèi)重新發(fā)送加入組請求的消費者邻悬,將它們從消費組中移除。因為在完成延遲加入組操作對象時随闽,會發(fā)送加入組響應給消費組中所有的消費者父丰,所以要在事先移除掉超時未發(fā)送請求的消費者。
延遲的心跳
協(xié)調(diào)者返回加入組響應給消費者后掘宪,都會立即完成本次的延遲心跳蛾扇,并創(chuàng)建下一次延遲心跳(針對消費組中所有消費者都會完成延遲心跳)。延遲心跳是用來對各個消費者的監(jiān)控魏滚,檢查消費者是否存活镀首,它的超時時間是消費者的會話超時時間。延遲的心跳操作對象什么時候完成鼠次?外部依賴條件是協(xié)調(diào)者和消費者之間網(wǎng)絡(luò)通信更哄,不管是協(xié)調(diào)者處理消費者的各種請求,還是協(xié)調(diào)者發(fā)送給消費者的響應腥寇,都會去完成延遲心跳成翩,并創(chuàng)建下一次的延遲心跳。消費組的一次再平衡操作過程中赦役,協(xié)調(diào)者只會創(chuàng)建一個延遲的加入操作對象麻敌,并且會為每一個消費者都保存一個延遲心跳對象。延遲心跳的創(chuàng)建是在協(xié)調(diào)者發(fā)送加入組響應給消費者后扩劝,就會為每個消費者創(chuàng)建一個延遲心跳。消費者收到加入組響應后职辅,應該在會話時間內(nèi)及時發(fā)送同步組請求給協(xié)調(diào)者棒呛,因為這個時候在協(xié)調(diào)者側(cè)已經(jīng)創(chuàng)建了延遲心跳用來監(jiān)控消費者,如果沒有及時發(fā)送域携,那么協(xié)調(diào)者就會認為消費者故障簇秒,從而讓消費者離開消費組(按照離開消費組請求的邏輯處理)。
在處理同步組請求時秀鞭,有多個地方的調(diào)用可以去本次完成延遲心跳和創(chuàng)建下一次的延遲心跳趋观。
1,狀態(tài)為等待同步锋边,設(shè)置消費者元數(shù)據(jù)的回調(diào)方法后調(diào)用皱坛。針對一個非主消費者。
2豆巨,狀態(tài)為穩(wěn)定剩辟,在發(fā)送同步組響應給消費者后調(diào)用。針對一個消費者(發(fā)送同步請求不及時,但是未超時的非主消費者)贩猎。
3熊户,狀態(tài)為等待同步,收到主消費者的同步組請求吭服,給每個消費者發(fā)送同步組響應后調(diào)用嚷堡。針對消費組里面的所有已經(jīng)發(fā)送了同步組請求的消費者。
延遲心跳的嘗試完成方法的判斷條件是:消費者是否存活艇棕。判斷消費者是否存活有三種條件蝌戒,只要滿足其中一種,就認為消費者是存活的欠肾。
1瓶颠,消費者元數(shù)據(jù)中的awaitingJoinCallback回調(diào)方法不為空。
2刺桃,消費者元數(shù)據(jù)中的awaitingSyncCallback回調(diào)方法不為空粹淋。
3,消費者最近的心跳時間加上會話超時時間大于下一次心跳截止時間瑟慈。
延遲心跳的截止時間是在創(chuàng)建延遲心跳時指定的桃移,延遲心跳的創(chuàng)建是在完成上一次的延遲心跳操作之后創(chuàng)建下一次的延遲心跳。在完成了上一次的延遲心跳后葛碧,協(xié)調(diào)者會計算出下一次延遲心跳的截止時間借杰,并創(chuàng)建新的延遲心跳,延遲心跳創(chuàng)建后进泼,和延遲的加入一樣蔗衡,都會馬上嘗試去完成這個延遲心跳,但是如果是剛剛創(chuàng)建的延遲心跳就嘗試去完成是不會完成的乳绕。因為剛剛創(chuàng)建的延遲心跳的截止時間等于最新的時間加上會話超時時間绞惦。所以不會完成。
有三個地方會去完成延遲心跳并創(chuàng)建下一次的延遲心跳:
1洋措,協(xié)調(diào)者返回加入組響應給每個消費者后济蝉。
2,協(xié)調(diào)者處理消費者的同步組請求設(shè)置回調(diào)方法時菠发。
3王滤,協(xié)調(diào)者返回同步組響應給每個消費者后。
每次創(chuàng)建新的延遲心跳都會計算最新的截止時間滓鸠,如果沒有在下一次心跳截止時間之前完成延遲心跳并創(chuàng)建下一次的延遲心跳雁乡,那么延遲心跳就會超時,對應的消費者就可能被協(xié)調(diào)者從消費組中移除(協(xié)調(diào)者創(chuàng)建的每一個延遲心跳都和消費者一一對應糜俗,只要消費者存活蔗怠,都對應延遲緩存中的一個延遲心跳)墩弯。
為什么說過了超時時間可能被協(xié)調(diào)者清除喃?因為還有其他的兩個條件awaitingJoinCallback和awaitingSyncCallback寞射,只要滿足這兩個條件其中的一個渔工,就算是超時了也會認為消費者存活。為什么需要這樣設(shè)計喃桥温?設(shè)想一下這個場景引矩,在等待同步狀態(tài)下,有三個消費者(C1,C2,C3侵浸;C3是主消費者)旺韭。C1的新延遲心跳的截止時間為10秒,C2的新延遲心跳的截止時間為20秒掏觉,C3的新延遲心跳的截止時間為60秒区端。C1和C2都發(fā)送同步組請求設(shè)置回調(diào)方法:awaitingSyncCallback,完成舊的延遲心跳并創(chuàng)建了新的延遲心跳:C1的新延遲心跳的截止時間為20秒澳腹,C2的新延遲心跳的截止時間為40秒织盼,這個時候只會完成C1和C2的延遲心跳,C3的舊延遲心跳還存在截止時間為60秒酱塔,C3由于自身原因在分區(qū)分配時花費了比較久的時候沥邻,在45秒的時候才發(fā)送同步組請求,在這個時間點上按理說C1和C2早就應該超時被移除消費組了羊娃,如果被移除就是純粹的誤殺唐全。C1和C2其實這個時候正在等到協(xié)調(diào)者的同步組響應。所以如果awaitingSyncCallback不為空的話蕊玷,就算是超時了邮利,要認為消費者存活,上面的場景在收到C3主消費者的同步組請求后垃帅,返回同步組響應給所有的消費者延届,這是完成C1,C2,C3的延遲心跳并計算出下一次延遲心跳的截止時間創(chuàng)建新的延遲心跳。
協(xié)調(diào)者在處理消費者的加入組請求時挺智,會設(shè)置awaitingJoinCallback回調(diào)方法祷愉。但是設(shè)置之后不會去調(diào)用完成延遲心跳和創(chuàng)建下一次的延遲心跳窗宦。這里假如:C1和C2已經(jīng)在消費組里面了赦颇,必定也會有與之對應的延遲心跳,下一次心跳的截止時間時間為:C1:10秒赴涵,C2:20秒媒怯。新的消費者C3發(fā)送了加入組請求,那么C1和C2必須在心跳的截止時間內(nèi)重新發(fā)送加入組請求髓窜,C1馬上就發(fā)送了加入組請求扇苞,但是延遲加入操作對象不能完成欺殿,所以不會發(fā)送加入組響應給客戶端就不能去完成延遲心跳,10秒后鳖敷,C1的延遲心跳就超時了脖苏,按理說C1會被協(xié)調(diào)者移除消費組,但是由于協(xié)調(diào)者在處理C1的加入組請求是設(shè)置了awaitingJoinCallback回調(diào)方法定踱,所以C1不會被移除棍潘,認為是存活的。C2在15秒的時候發(fā)送了加入組請求崖媚,延遲加入操作可以完成亦歉,返回加入組響應給三個消費者,并更新下次心跳的截止時間為:C1:25秒畅哑,C2:35秒肴楷,C1:75秒。
當消費組的狀態(tài)變?yōu)榉€(wěn)定后荠呐,每個消費者都需要重新發(fā)送心跳給協(xié)調(diào)者赛蔫。
為什么在處理加入組請求時,不去完成延遲心跳喃直秆?按理說消費者能發(fā)送加入組請求濒募,就代表消費者存活?
消費者能發(fā)送加入組請求是能代表消費者存活圾结,但是現(xiàn)在協(xié)調(diào)者的處理再平衡狀態(tài)的時候瑰剃,認為消費組是不穩(wěn)定的,在消費組不穩(wěn)定的時候去設(shè)置心跳沒有多大意義(通過前面的分析也知道筝野,消費組在再平衡狀態(tài)中晌姚,多個消費者會發(fā)送多次的加入組請求,消費組才會最終穩(wěn)定)歇竟。所以在發(fā)送加入組響應給消費者后去完成心跳這個時候消費組中的所有消費者都發(fā)送了加入組請求挥唠,這樣用心跳去管理消費者才有意義。所以在處理加入組請求時焕议,不去完成延遲心跳是個很不錯的設(shè)計宝磨。
參考資料:
Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計與實現(xiàn)