根據(jù)上一篇的內(nèi)容勉盅,可以知道connect的大致運(yùn)行邏輯洛二,但是在分布式模式下蜀踏,各種情況都變得復(fù)雜了摩渺。主要差異就是調(diào)度管理層由StandaloneHerder變成了DistributedHerder。DistributedHerder中有個(gè)線程错蝴,循環(huán)調(diào)用DistributedHerder#tick用于以下功能:
維持自身在分布式系統(tǒng)的連接
根據(jù)客戶端的各種restful請(qǐng)求執(zhí)行業(yè)務(wù)邏輯.
進(jìn)入一個(gè)漫長(zhǎng)的沉睡,期間洲愤,如果有下列兩種情形,將被喚醒:1.有客戶端請(qǐng)求進(jìn)入顷锰;2.在集群group中失聯(lián)柬赐。
下面對(duì)第一個(gè)功能的邏輯展開(kāi)來(lái):
1.找到本機(jī)和kafka集群中隨機(jī)最不活躍的節(jié)點(diǎn)發(fā)送獲取節(jié)點(diǎn)的請(qǐng)求,并嘗試連接這個(gè)獲取來(lái)的節(jié)點(diǎn)用作以后流程的通訊官紫。
2.將所有正在運(yùn)行的任務(wù)停止肛宋,因?yàn)檫M(jìn)入了當(dāng)前這個(gè)狀態(tài)州藕,一定不是平衡狀態(tài),所以需要把上一輪分配給自己的任務(wù)停止(優(yōu)化:這里可以將要停止的任務(wù)和以后分配到的任務(wù)做比較后再操作酝陈,這樣可以減少啟停任務(wù)的次數(shù),但是有一個(gè)難點(diǎn):需要處理好任務(wù)的時(shí)序慎框,比如這種場(chǎng)景:任務(wù)t在rebalance中由機(jī)器A轉(zhuǎn)移到了機(jī)器B。 在assign任務(wù)以后后添,各members認(rèn)為自己進(jìn)入了平衡狀態(tài),進(jìn)行任務(wù)的啟停薪丁,那么可能會(huì)有時(shí)刻存在任務(wù)t同時(shí)運(yùn)行于機(jī)器A和B遇西,造成數(shù)據(jù)的不一致,方法:借助zk的臨時(shí)節(jié)點(diǎn))严嗜。
3.根據(jù)自身所屬的group.id 向1中獲取到的節(jié)點(diǎn)發(fā)送加入集群信號(hào)粱檀。kafka返回的信息中包括:當(dāng)前紀(jì)元,leader的標(biāo)識(shí)漫玄,整個(gè)group的任務(wù)茄蚯。如果connect成為leader,那么會(huì)分配當(dāng)前的任務(wù)信息并序列化生成一個(gè)同步命令睦优,發(fā)給kafka渗常,follower也會(huì)發(fā),只不過(guò)沒(méi)有成員信息,kafka在返回的信息中會(huì)攜帶紀(jì)元汗盘、各自的被分配到的任務(wù)皱碘,這樣,最終所有成員的眼中的集群狀態(tài)就是一樣的,分配到任務(wù)以后隐孽,進(jìn)入進(jìn)入rebalencing狀態(tài)癌椿,各個(gè)member將分配到的connector實(shí)例和任務(wù)開(kāi)起來(lái),rebalance完成菱阵。
4.定期給kafka節(jié)點(diǎn)發(fā)送心跳踢俄,包含當(dāng)前紀(jì)元和groupId,memberId. 如果遇到異常,如超時(shí)晴及,紀(jì)元更新都办,集群狀態(tài)更新等,將會(huì)重新回到1開(kāi)始下一輪虑稼。
由上面的邏輯可知脆丁,kafka并不是一個(gè)存粹的消息服務(wù)了。還給kafka-connect做了一個(gè)類似zk的角色动雹。
下面對(duì)存疑的幾個(gè)point做的代碼走讀:
1.任務(wù)如何分配槽卫?
這是第三點(diǎn)的一個(gè)細(xì)節(jié),具體實(shí)現(xiàn)在WorkerCoordinator#performTaskAssignment
胰蝠,比較簡(jiǎn)單歼培,創(chuàng)建一個(gè)循環(huán)隊(duì)列裝members震蒋,循環(huán)給每個(gè)member分配connector實(shí)例和task。所以每個(gè)members都需要安裝集群中的所有plugins躲庄。
針對(duì)第二點(diǎn)查剖,為什么要在這個(gè)單線程中執(zhí)行業(yè)務(wù)邏輯呢?
為了保證snapshot的一致性噪窘,響應(yīng)客戶端的restful請(qǐng)求笋庄,post和put操作均會(huì)對(duì)snapshot造成影響,假設(shè)是由多個(gè)線程去執(zhí)行的話倔监,不好stop-the-world直砂。用單線程就很容易控制,抓取自身快照的時(shí)候浩习,由于沒(méi)有其他線程干擾静暂,必定是個(gè)安全點(diǎn)。
2.rebalance的觸發(fā)時(shí)機(jī)
節(jié)點(diǎn)加入/掉線/成員的狀態(tài)發(fā)生變化谱秽;增刪connector實(shí)例洽蛀;task的配置或者connector實(shí)例的配置發(fā)生變化。
3.我們知道任務(wù)是由集群中的leader上傳到kafka的疟赊。任務(wù)是從kafka中的configStorage中獲取郊供。具體是什么邏輯呢?
具體分為以下幾種topic來(lái)存儲(chǔ)kafka-connect的狀態(tài):
-
config.storage.topic:
status-connector-
開(kāi)頭的是connector實(shí)例的狀態(tài)
status-task-
開(kāi)頭的是未提交任務(wù)的狀態(tài)
commit-task-
開(kāi)頭的是已提交任務(wù)的狀態(tài),只有接收到已提交任務(wù)的記錄近哟,才會(huì)啟動(dòng)reConfigbalance. offset.storage.topic
存儲(chǔ)每個(gè)connector任務(wù)的offset配置颂碘。當(dāng)kafka-connect進(jìn)程剛起來(lái)時(shí),會(huì)去這個(gè)topic讀取全量數(shù)據(jù)椅挣。見(jiàn)KafkaBasedLog#readToLogEnd
(具體邏輯是獲取到topic的所有patition結(jié)束offset头岔,每次poll后,比對(duì)每個(gè)patition的實(shí)際消費(fèi)進(jìn)度和結(jié)束offset鼠证,如果相等峡竣,那么這個(gè)patition就消費(fèi)完了。循環(huán)處理至所有patition消費(fèi)完量九,那么說(shuō)明處理完畢).
把offset存儲(chǔ)在kafka有以下幾個(gè)缺點(diǎn):
1.不能刪除舊配置且管理人員不好觀測(cè)實(shí)際的offset(比如如果放在redis里适掰,就可以直接觀測(cè)了)。
2.慢荠列,笨重且有丟失分險(xiǎn)类浪,在程序運(yùn)行了一段時(shí)間以后,我們項(xiàng)目上的總的配置不到一千個(gè)肌似,但是topic實(shí)際的offset已經(jīng)突破三千萬(wàn)了费就。盡管該topic是采用的是compact方式來(lái)壓縮舊數(shù)據(jù),依舊poll了將近兩百萬(wàn)記錄川队,耗時(shí)兩分鐘力细。
- status.storage.topic
任務(wù)狀態(tài)的topic睬澡,忽略。
4.如何處理一個(gè)不屬于當(dāng)前節(jié)點(diǎn)的請(qǐng)求眠蚂?
kafka-connect中定義了一種異成反希基類RequestTargetException
,放了個(gè)重定向的url.倘若執(zhí)行指令的member發(fā)現(xiàn)這個(gè)請(qǐng)求不應(yīng)該由自己來(lái)執(zhí)行,會(huì)填充這個(gè)url,并拋出異常(這個(gè)步驟在tick
所在的線程執(zhí)行)逝慧。jetty的請(qǐng)求分發(fā)線程等待這個(gè)furture任務(wù)昔脯,遇到這些異常時(shí)會(huì)將請(qǐng)求轉(zhuǎn)發(fā)到對(duì)應(yīng)的集群節(jié)點(diǎn)。再將拿到的結(jié)果返回給客戶端笛臣。