一檬果、基本概念
1. 發(fā)布與訂閱消息系統(tǒng)
數(shù)據(jù)(消息)的發(fā)送者(發(fā)布者)不會(huì)直接把消息發(fā)送給接收者。發(fā)布者以某種方式對(duì)消息進(jìn)行分類(lèi),接收者(訂閱者)訂閱它們膘盖,以便接收特定類(lèi)型的消息转锈。發(fā)布與訂閱系統(tǒng)一般會(huì)有一個(gè)broker盘寡,即發(fā)布消息的中心點(diǎn)。
2. 消息和批次
Kafka的數(shù)據(jù)單元被稱(chēng)為消息撮慨,由字節(jié)數(shù)組組成竿痰,消息有一個(gè)可選的元數(shù)據(jù),也就是鍵砌溺。當(dāng)需要控制消息寫(xiě)入特定的分區(qū)時(shí)影涉,可以指定消息的鍵,最簡(jiǎn)單的例子是為鍵生成一個(gè)一致性散列值规伐,然后使用散列值對(duì)主題分區(qū)數(shù)進(jìn)行取模蟹倾,為消息選取分區(qū),這樣可以保證相同鍵的消息總是被寫(xiě)到相同的分區(qū)上猖闪。
為了提高效率鲜棠,消息被分批次寫(xiě)入到kafka。批次就是一組消息培慌,這些消息同屬于一個(gè)主題和分區(qū)豁陆。批次消息可以減少網(wǎng)絡(luò)開(kāi)銷(xiāo),也可以被壓縮检柬。
3. 主題和分區(qū)
kafka的消息通過(guò)主題進(jìn)行分類(lèi)献联。主題可以被分為若干個(gè)分區(qū),一個(gè)分區(qū)就是一個(gè)提交日志(Commit Log)何址。消息以追加的方式寫(xiě)入分區(qū)里逆,然后以先入先出(FIFO)的順序讀取。由于一個(gè)主題一般包含多個(gè)分區(qū)用爪,因此無(wú)法在整個(gè)主題范圍內(nèi)保證消息的順序原押,但可以保證消息在單個(gè)分區(qū)內(nèi)的順序。
Kafka通過(guò)分區(qū)來(lái)實(shí)現(xiàn)數(shù)據(jù)冗余和伸縮性偎血。分區(qū)可以分布在不同的物理服務(wù)器上诸衔。
4. 生產(chǎn)者和消費(fèi)者
生產(chǎn)者創(chuàng)建消息。一個(gè)消息會(huì)被發(fā)布到一個(gè)指定的主題上颇玷。生產(chǎn)者在默認(rèn)情況下(不指定消息的鍵)吧消息均衡地發(fā)布到主題的所有分區(qū)上笨农。也可以通過(guò)消息鍵和分區(qū)器來(lái)實(shí)現(xiàn)將消息直接寫(xiě)到指定的分區(qū),分區(qū)器為鍵生成一個(gè)散列值帖渠,并將其映射到指定的分區(qū)上谒亦。
消費(fèi)者讀取消息。消費(fèi)者訂閱一個(gè)或多個(gè)主題,并按照消息生成的順序讀取它們份招。消費(fèi)者通過(guò)檢查消息的偏移量來(lái)區(qū)分已經(jīng)讀過(guò)的消息切揭。
偏移量是消息的元數(shù)據(jù),是一個(gè)不斷遞增的整數(shù)值锁摔,在創(chuàng)建消息時(shí)廓旬,kafka會(huì)把它添加到消息里。在同一個(gè)分區(qū)里谐腰,每個(gè)消息的偏移量都是唯一的孕豹。消費(fèi)者把每個(gè)分區(qū)的消息偏移量保存在Zookeeper或kafka上。(節(jié)點(diǎn)路徑:/consumers/{group_id}/offsets/{topic}/{broker_id}-{partition_id}
)
消費(fèi)者是消費(fèi)者群組的一部分怔蚌,若干個(gè)消費(fèi)者共同讀取一個(gè)主題巩步。消費(fèi)者組保證每個(gè)分區(qū)只能被一個(gè)消費(fèi)者使用。如果一個(gè)消費(fèi)者失效桦踊,群組里的其他消費(fèi)者可以接管失效消費(fèi)者的工作椅野。消費(fèi)者組里的消費(fèi)者平均地讀取固定的分區(qū),多于分區(qū)數(shù)量的消費(fèi)者將會(huì)被閑置籍胯。
5. broker和集群
一個(gè)獨(dú)立的Kafka服務(wù)器被稱(chēng)為broker竟闪。broker接收來(lái)自生產(chǎn)者的消息,為消息設(shè)置偏移量杖狼,并提交消息到磁盤(pán)保存炼蛤。broker為消費(fèi)者提供服務(wù),對(duì)讀取分區(qū)的請(qǐng)求作出響應(yīng)蝶涩,返回已經(jīng)提交到磁盤(pán)上的消息理朋。
broker是集群的組成部分。每個(gè)集群都有一個(gè)broker同時(shí)充當(dāng)了集群控制器的角色(自動(dòng)從集群的活躍成員中選舉出來(lái))绿聘∷陨希控制器負(fù)責(zé)管理工作,包括將分區(qū)分配給broker和監(jiān)控broker熄攘。
在集群中兽愤,一個(gè)分區(qū)從屬與一個(gè)broker,該broker被稱(chēng)為分區(qū)的首領(lǐng)挪圾。一個(gè)分區(qū)可以分配給多個(gè)broker浅萧,這時(shí)候會(huì)發(fā)生分區(qū)復(fù)制。也就是說(shuō)哲思,一個(gè)主題的同一分區(qū)會(huì)存在于集群的所有broker上洼畅,其中一個(gè)活躍可用的分區(qū)作為首領(lǐng),其余的作為副本棚赔。如果有一個(gè)broker失效帝簇,其他broker可以接管領(lǐng)導(dǎo)權(quán)务热。
首領(lǐng)副本負(fù)責(zé)所有客戶(hù)端讀寫(xiě)操作(包括生產(chǎn)者和消費(fèi)者),跟隨者副本僅僅從首領(lǐng)副本同步數(shù)據(jù)己儒。當(dāng)首領(lǐng)副本出現(xiàn)故障是,跟隨者副本中的一個(gè)副本會(huì)被選擇為新的首領(lǐng)副本捆毫。
因?yàn)槊總€(gè)分區(qū)的副本中只有首領(lǐng)副本接收讀寫(xiě),所以每個(gè)服務(wù)端都會(huì)作為某些分區(qū)的首領(lǐng)副本绩卤,以及另外一些分區(qū)的跟隨者副本途样,這樣Kafka集群的所有服務(wù)端整體上對(duì)客戶(hù)端是負(fù)載均衡的
6. 消息模型
推送模型(Push)
基于推送模型的消息系統(tǒng),由消息代理(broker)記錄消費(fèi)者的消息狀態(tài)濒憋。消息代理在將消息推送到消費(fèi)者后何暇,將這條消息標(biāo)記為已消費(fèi),但這種方式無(wú)法很好地保證消息的處理語(yǔ)義
拉取模型(Pull)
拉取模型由消費(fèi)者自己記錄消費(fèi)狀態(tài)凛驮,每個(gè)消費(fèi)者互相獨(dú)立地順序讀取每個(gè)分區(qū)的消息裆站。消費(fèi)者能拉取的最大上限通過(guò)最高水位(watermark)控制,生產(chǎn)者最新寫(xiě)入的消息如果還沒(méi)有達(dá)到備份數(shù)量黔夭,對(duì)消費(fèi)者是不可見(jiàn)的
二宏胯、Kafka的設(shè)計(jì)與實(shí)現(xiàn)
1. 文件系統(tǒng)的持久化與數(shù)據(jù)傳輸效率
- 預(yù)讀
提前將一個(gè)比較大的磁盤(pán)塊讀入內(nèi)存 - 后寫(xiě)
將若干小的邏輯寫(xiě)操作合并成一個(gè)大的物理寫(xiě)操作 - 磁盤(pán)緩存
在內(nèi)存中盡量報(bào)錯(cuò)盡可能多的數(shù)據(jù),并在需要時(shí)將這些數(shù)據(jù)刷新到磁盤(pán) - 消息分組
用批量的方式一次發(fā)送一個(gè)消息組 - 壓縮消息集
-
零拷貝
使用零拷貝技術(shù)只需將磁盤(pán)文件的數(shù)據(jù)復(fù)制到頁(yè)面緩存中一次本姥,然后將數(shù)據(jù)從頁(yè)面緩存中直接發(fā)送到網(wǎng)絡(luò)(發(fā)送給不同的使用者時(shí)肩袍,都可以重復(fù)使用同一個(gè)頁(yè)面緩存),避免了重復(fù)的復(fù)制操作
傳統(tǒng)的數(shù)據(jù)復(fù)制方法和優(yōu)化的零拷貝
2. 生產(chǎn)者與消費(fèi)者
后面詳細(xì)敘述
3. 副本機(jī)制與容錯(cuò)處理
Kafka的副本機(jī)制會(huì)在多個(gè)broker上對(duì)每個(gè)主題分區(qū)的日志進(jìn)行復(fù)制婚惫。副本的單位是主題的分區(qū)氛赐,每個(gè)主題的每個(gè)分區(qū)都有一個(gè)首領(lǐng)副本以及任意個(gè)跟隨者副本。
- 首領(lǐng)副本
所有的讀寫(xiě)請(qǐng)求總是被路由到分區(qū)的首領(lǐng)副本上 - 跟隨者副本
跟隨者副本會(huì)和首領(lǐng)副本保持?jǐn)?shù)據(jù)同步先舷,在首領(lǐng)副本失效時(shí)替換為首領(lǐng)副本 - 節(jié)點(diǎn)存活
節(jié)點(diǎn)的存活定義的條件:1. 節(jié)點(diǎn)必須和Zookeeper保持會(huì)話(huà)艰管;2. 如果這個(gè)節(jié)點(diǎn)是某個(gè)分區(qū)的跟隨者副本,它必須對(duì)分區(qū)首領(lǐng)副本的寫(xiě)操作進(jìn)行復(fù)制密浑,并且復(fù)制的進(jìn)度不能落后太多 - ISR
滿(mǎn)足上述兩個(gè)條件被稱(chēng)為in-sync(正在同步中)蛙婴。每個(gè)分區(qū)的首領(lǐng)副本會(huì)跟蹤in-sync的跟隨者副本節(jié)點(diǎn)(In Sync Replicas,即ISR)尔破。如果一個(gè)跟隨者副本掛掉街图、沒(méi)有響應(yīng)或落后太多,首領(lǐng)副本就會(huì)將其從同步副本中移除懒构。反之餐济,如果跟隨者副本重新趕上首領(lǐng)副本,他就會(huì)加入到首領(lǐng)副本的同步集合中 - 確認(rèn)提交
一條消息只有被ISR集合中所有副本都保存到本地的日志文件中胆剧,才會(huì)被認(rèn)為是成功提交了絮姆。任何時(shí)刻醉冤,只要ISR至少有一個(gè)副本是存活的,Kafka就可以保證“一條消息一旦被提交篙悯,就不會(huì)丟失”蚁阳。只有已經(jīng)提交的消息才能被消費(fèi)者消費(fèi)。
三鸽照、Kafka生產(chǎn)者——向Kafka寫(xiě)入數(shù)據(jù)
1. kafka發(fā)送消息的主要步驟
ProducerRecord
需要包含目標(biāo)主題和發(fā)送的內(nèi)容螺捐,還可以指定鍵或分區(qū)。在發(fā)送ProducerRecord
對(duì)象時(shí)矮燎,生產(chǎn)者要先把鍵和值對(duì)象序列化成字節(jié)數(shù)組定血。
接下來(lái),數(shù)據(jù)被傳給分區(qū)器诞外。如果ProducerRecord
對(duì)象里指定了分區(qū)澜沟,直接返回指定的分區(qū),如果沒(méi)有則分區(qū)器會(huì)根據(jù)鍵和分區(qū)值來(lái)確定分區(qū)峡谊。然后這條記錄被添加到一個(gè)記錄批次里茫虽,這個(gè)批次里的所有消息會(huì)被發(fā)送到相同的主題和分區(qū)上。有一個(gè)獨(dú)立的線(xiàn)程負(fù)責(zé)發(fā)送批次消息到響應(yīng)的broker上既们。
broker收到消息后會(huì)返回一個(gè)響應(yīng)席噩。如果消息成功寫(xiě)入到Kafka,就返回一個(gè)RecordMetaData
對(duì)象贤壁,包含了主題悼枢、分區(qū)信息和分區(qū)中的偏移量。如果失敗則返回錯(cuò)誤脾拆,生產(chǎn)者收到錯(cuò)誤之后會(huì)嘗試重新發(fā)送消息馒索。
2. 發(fā)送消息的模式
- 發(fā)送并忘記
把消息發(fā)送給broker,但并不關(guān)心它是否被正常送達(dá)名船,不能保證消息發(fā)送的可靠性 - 同步發(fā)送
使用send()
方法發(fā)送消息绰上,它會(huì)返回一個(gè)Future
對(duì)象,調(diào)用 get() 方法進(jìn)行等待渠驼,來(lái)判斷消息是否成功發(fā)送 - 異步發(fā)送
使用send()
方法并制定回調(diào)函數(shù)蜈块,服務(wù)器在返回響應(yīng)時(shí)調(diào)用該函數(shù)
四、Kafka消費(fèi)者——從Kafka讀取數(shù)據(jù)
1. 消費(fèi)者和消費(fèi)者群組
Kafka消費(fèi)者從屬于消費(fèi)者群組迷扇,一個(gè)群組里的消費(fèi)者訂閱的是同一個(gè)主題百揭,每個(gè)消費(fèi)者接收主題一部分分區(qū)的消息。消費(fèi)者群組里的消費(fèi)者總是平均地讀取固定的分區(qū)蜓席。多余分區(qū)數(shù)量的消費(fèi)者將會(huì)閑置器一,不會(huì)收到任何消息。
一個(gè)主題可以被多個(gè)消費(fèi)者群組讀取厨内,這些消費(fèi)者群組之間不會(huì)相互影響祈秕;一個(gè)消費(fèi)者群組也可以訂閱多個(gè)主題渺贤。
2. 消費(fèi)者群組和分區(qū)再均衡
再均衡的含義:分區(qū)的所有權(quán)從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者
分區(qū)會(huì)再均衡的情況:
- 消費(fèi)者加入群組
- 消費(fèi)者離開(kāi)群組
- 主題分區(qū)發(fā)生變化
消費(fèi)者通過(guò)向北指派為群組協(xié)調(diào)器的broker(不同的群組可以有不同的協(xié)調(diào)器)發(fā)送心跳來(lái)維持它們和群組的從屬關(guān)系一級(jí)它們對(duì)分區(qū)的所有權(quán)關(guān)系。只要消費(fèi)者以正常的頻率發(fā)送心跳请毛,就被認(rèn)為是活躍的志鞍,說(shuō)明它還在讀取分區(qū)里的消息。
消費(fèi)者會(huì)在輪詢(xún)消息或提交已讀取偏移量時(shí)發(fā)送心跳方仿。如果消費(fèi)者停止發(fā)送心跳的時(shí)間足夠長(zhǎng)述雾,會(huì)話(huà)就會(huì)過(guò)期,群組協(xié)調(diào)器就會(huì)認(rèn)為它已經(jīng)死亡兼丰,就會(huì)觸發(fā)一次再均衡。
3. 消息輪詢(xún)
消息輪詢(xún)是消費(fèi)者API的核心唆缴,通過(guò)一個(gè)簡(jiǎn)單的輪詢(xún)向服務(wù)器請(qǐng)求數(shù)據(jù)鳍征。一旦消費(fèi)者訂閱了主題,輪詢(xún)就會(huì)處理所有的細(xì)節(jié)面徽,包括群組協(xié)調(diào)艳丛、分區(qū)再均衡、發(fā)送心跳和獲取數(shù)據(jù)趟紊。
4. 提交和偏移量
poll()
方法總是返回由生產(chǎn)者寫(xiě)入到Kafka但還沒(méi)有被消費(fèi)者讀取過(guò)的記錄(偏移量)氮双。更新分區(qū)當(dāng)前位置(偏移量)的操作被稱(chēng)為提交。
消費(fèi)者向_consumer_offset
的特殊主體發(fā)送消息霎匈,消息是包含每個(gè)分區(qū)的偏移量戴差。如果消費(fèi)者發(fā)生崩潰或者有新的消費(fèi)者加入群組,就會(huì)觸發(fā)再均衡铛嘱,完成再均衡之后暖释,每個(gè)消費(fèi)者可能分配到新的分區(qū)。此時(shí)消費(fèi)者需要讀取每個(gè)分區(qū)最后一次提交的偏移量墨吓,然后從偏移量指定的位置繼續(xù)處理球匕。
提交的方式:
- 自動(dòng)提交
如果enable.auto.commit
設(shè)置為true
,每過(guò)auto.commit.internal.ms
的時(shí)間帖烘,消費(fèi)者會(huì)自動(dòng)把從poll()
方法接收到的最大偏移量提交上去亮曹。消費(fèi)者每次在進(jìn)行輪詢(xún)時(shí)會(huì)檢查是否該提交偏移量了,如果是則提交上一次輪詢(xún)獲取到的偏移量 - 提交當(dāng)前偏移量
把enable.auto.commit
設(shè)為false
秘症,讓消費(fèi)者決定何時(shí)提交偏移量照卦。使用commitSync()
會(huì)提交由poll()
獲取的最新偏移量,提交成功后馬上返回乡摹,若失敗則拋出異常窄瘟。手動(dòng)提交再broker對(duì)提交請(qǐng)求作出回應(yīng)之前,應(yīng)用程序會(huì)一直阻塞 - 異步提交
在成功提交或碰到無(wú)法恢復(fù)的錯(cuò)誤之前趟卸,commitSync()
會(huì)一直重試蹄葱,但是commitAsync
不會(huì)氏义,之所以不會(huì)是因?yàn)樗盏椒?wù)器響應(yīng)的時(shí)候,可能有一個(gè)更大的偏移量已經(jīng)提交成功 - 同步和異步組合提交
- 提交指定的偏移量
每個(gè)分區(qū)都一個(gè)有序图云、不可變的記錄序列惯悠,新的消息會(huì)不斷追加到提交日志(commit log)。分區(qū)中的每條消息都會(huì)按照時(shí)間順序分配到一個(gè)單調(diào)遞增的順序編號(hào)竣况,叫做偏移量(offset)克婶,這個(gè)偏移量可以唯一確定當(dāng)前分區(qū)的任意一條消息
5. 再均衡監(jiān)聽(tīng)器
ConsumerRebalanceListener
,用于監(jiān)聽(tīng)再均衡事件并處理
6. 從指定偏移量處開(kāi)始處理記錄
- 從分區(qū)的起始位置開(kāi)始讀取消息
seekToBeginning(Collection<TopicPartition> tp)
- 從分區(qū)的末尾開(kāi)始讀取消息
seekToEnd(Collection<TopicPartition> tp)
7. 退出
如果確定要退出循環(huán)丹泉,需要通過(guò)另一個(gè)線(xiàn)程調(diào)用Consumer#wakeUp()
方法情萤;如果循環(huán)運(yùn)行在主線(xiàn)程里,可以在Runtime#addShutdownHook(Thread)
里調(diào)用該方法摹恨。Consumer#wakeUp()
是消費(fèi)者唯一一個(gè)可以從其他線(xiàn)程里安全調(diào)用的方法筋岛,該方法被調(diào)用可以退出poll()
,并拋出WakeupException
異常晒哄,如果線(xiàn)程沒(méi)有等待輪詢(xún)睁宰,那么異常將在下一次調(diào)用poll()
時(shí)拋出
在退出線(xiàn)程之前有必要調(diào)用Consumer#close()
,該方法會(huì)提交任何沒(méi)有提交的內(nèi)容寝凌,并向群組協(xié)調(diào)器發(fā)送消息告知其自己要離開(kāi)群組柒傻,接下來(lái)就會(huì)觸發(fā)再均衡,而不需要等待會(huì)話(huà)超時(shí)
8. 序列化與反序列化
生產(chǎn)者要用序列化器把對(duì)象轉(zhuǎn)換成字節(jié)數(shù)組再發(fā)送給Kafka较木,消費(fèi)者需要用反序列化器把從Kafka接收到的字節(jié)數(shù)組轉(zhuǎn)換成Java對(duì)象
四红符、深入Kafka
1. 集群成員關(guān)系
Kafka使用Zookeeper來(lái)維護(hù)集群成員的信息。每個(gè)broker都有一個(gè)唯一標(biāo)識(shí)符伐债,可以在配置文件指定违孝,也可以自動(dòng)生成。不能啟動(dòng)另一個(gè)存在相同ID的broker
在broker啟動(dòng)的時(shí)候泳赋,它通過(guò)創(chuàng)建臨時(shí)節(jié)點(diǎn)把自己的ID注冊(cè)到Zookeeper雌桑。Kafka組件訂閱Zookeeper的/broker/ids
路徑(broker在Zookeeper上的注冊(cè)路徑),當(dāng)有broker加入或退出集群時(shí)祖今,這些組件就會(huì)被通知校坑。
當(dāng)broker停機(jī)、出現(xiàn)網(wǎng)絡(luò)分區(qū)或長(zhǎng)時(shí)間垃圾回收停頓時(shí)千诬,broker會(huì)從Zookeeper上斷開(kāi)連接耍目,此時(shí)broker在啟動(dòng)時(shí)創(chuàng)建的臨時(shí)節(jié)點(diǎn)會(huì)自動(dòng)從Zookeeper上被移除。監(jiān)聽(tīng)borker列表的Kafka組件會(huì)被告知該broker已被移除
2. 控制器
控制器的產(chǎn)生
控制器是一個(gè)broker徐绑,除了具有普通broker的功能之外邪驮,還負(fù)責(zé)分區(qū)首領(lǐng)的選舉。集群里第一個(gè)啟動(dòng)的broker通過(guò)在Zookeeper里創(chuàng)建一個(gè)路徑為/controller
的臨時(shí)節(jié)點(diǎn)讓自己成為控制器傲茄。其他broker在啟動(dòng)的時(shí)候也會(huì)嘗試創(chuàng)建這個(gè)節(jié)點(diǎn)并失敗毅访,并在控制器節(jié)點(diǎn)上創(chuàng)建Zookeeper Watch對(duì)象用來(lái)接收控制器變更通知
如果控制器被關(guān)閉或者與Zookeeper斷開(kāi)連接沮榜,/controller
節(jié)點(diǎn)會(huì)被刪除。集群中的其他broker通過(guò)Watch對(duì)象得到控制器節(jié)點(diǎn)斷開(kāi)的通知喻粹,并嘗試讓自己成為新的控制器蟆融,非控制器broker重復(fù)上述過(guò)程
每個(gè)新選出的控制器通過(guò)Zookeeper的條件遞增操作獲得一個(gè)全新的值更大的controller epoch,其他broker在知道當(dāng)前controller epoch之后守呜,會(huì)忽略含有舊epoch的消息
分區(qū)首領(lǐng)的產(chǎn)生
當(dāng)控制器發(fā)現(xiàn)broker離開(kāi)集群(觀(guān)察相關(guān)Zookeeper路徑)型酥,它就知道,那些首領(lǐng)在這個(gè)broker上的分區(qū)需要一個(gè)新的首領(lǐng)查乒∶趾恚控制器遍歷這些分區(qū),并確定誰(shuí)應(yīng)該成為新首領(lǐng)(分區(qū)副本列表的下一個(gè)副本)玛迄,然后向所有包含新首領(lǐng)或現(xiàn)有跟隨者的broker發(fā)送請(qǐng)求由境,該請(qǐng)求消息包含了誰(shuí)是新首領(lǐng)以及誰(shuí)是分區(qū)跟隨者的信息。隨后憔晒,新首領(lǐng)愛(ài)是處理來(lái)自生產(chǎn)者和消費(fèi)者的請(qǐng)求,而跟隨者開(kāi)始從首領(lǐng)那里復(fù)制消息
3. 復(fù)制
復(fù)制功能是Kafka架構(gòu)的核心蔑舞,因?yàn)樗梢栽趥€(gè)別節(jié)點(diǎn)失效時(shí)仍能保證Kafka的可用性和持久性
Kafka使用主題來(lái)組織數(shù)據(jù)拒担,每個(gè)主題被分為若干個(gè)內(nèi)容不同的分區(qū),每個(gè)分區(qū)有多個(gè)內(nèi)容相同的副本
副本有兩種類(lèi)型:
- 首領(lǐng)副本
每個(gè)分區(qū)都有一個(gè)首領(lǐng)副本攻询,所有的生產(chǎn)者的寫(xiě)請(qǐng)求和消費(fèi)者的讀請(qǐng)求都會(huì)在首領(lǐng)副本上操作从撼。首領(lǐng)的另一個(gè)任務(wù)是了解哪個(gè)跟隨者副本是跟自己保持一致的。 - 跟隨者副本
首領(lǐng)以外的副本都是跟隨者副本钧栖。跟隨者副本不處理來(lái)自客戶(hù)端的請(qǐng)求低零,唯一的任務(wù)就是從首領(lǐng)復(fù)制消息,保持與首領(lǐng)一致的狀態(tài)拯杠。持續(xù)請(qǐng)求得到的最新消息副本被稱(chēng)為同步的副本(ISR)掏婶。在首領(lǐng)發(fā)生失效時(shí),只有同步副本才有可能成為新首領(lǐng)
如果跟隨者在指定時(shí)間內(nèi)沒(méi)有請(qǐng)求任何消息潭陪,或者雖然在請(qǐng)求消息雄妥,但是沒(méi)有請(qǐng)求最新的消息,那么它就不是同步的依溯。如果一個(gè)副本無(wú)法與首領(lǐng)保持一致老厌,在首領(lǐng)發(fā)生失效時(shí),它不能成為新首領(lǐng)
4. 處理請(qǐng)求
Kafka broker處理請(qǐng)求的過(guò)程如圖
- Acceptor線(xiàn)程
負(fù)責(zé)監(jiān)聽(tīng)端口并創(chuàng)建連接黎炉,并將連接交給Processor線(xiàn)程處理 - Processor線(xiàn)程
負(fù)責(zé)從客戶(hù)端獲取請(qǐng)求消息枝秤,并放入請(qǐng)求隊(duì)列,然后從響應(yīng)隊(duì)列獲取響應(yīng)消息慷嗜,把它們發(fā)送給客戶(hù)端淀弹。線(xiàn)程數(shù)量可以配置 - IO線(xiàn)程
負(fù)責(zé)處理請(qǐng)求丹壕,并產(chǎn)生響應(yīng)
主要請(qǐng)求類(lèi)型
- 生產(chǎn)請(qǐng)求
- 消費(fèi)請(qǐng)求
- 元數(shù)據(jù)請(qǐng)求
- 其他類(lèi)型
生產(chǎn)請(qǐng)求和獲取請(qǐng)求都必須發(fā)送給分區(qū)的首領(lǐng)副本。如果broker收到一個(gè)指定分區(qū)的請(qǐng)求垦页,而該分區(qū)的首領(lǐng)不在此broker雀费,那么broker會(huì)響應(yīng)“非分區(qū)首領(lǐng)”的錯(cuò)誤。Kafka客戶(hù)端負(fù)責(zé)把生產(chǎn)請(qǐng)求和獲取請(qǐng)求發(fā)送到正確的broker上
元數(shù)據(jù)請(qǐng)求包含了客戶(hù)端感興趣的主題列表痊焊,服務(wù)端的響應(yīng)信息里指明了這些主題包含的分區(qū)盏袄、每個(gè)分區(qū)都有哪些副本,以及哪個(gè)副本是首領(lǐng)
5. 物理存儲(chǔ)
Kafka的基本存儲(chǔ)單元是分區(qū)薄啥,在配置Kafka時(shí)辕羽,管理員指定了一個(gè)用于存儲(chǔ)分區(qū)的目錄清單log.dirs
。在創(chuàng)建主題時(shí)垄惧,Kafka首先會(huì)決定如何在broker間分配分區(qū)刁愿,分區(qū)要達(dá)到以下目標(biāo):
- 在broker間平均地分布分區(qū)副本
- 確保每個(gè)分區(qū)的每個(gè)副本分布在不同的broker上
- 如果為broker指定了機(jī)架信息,那么盡可能地把每個(gè)分區(qū)的副本分配到不同機(jī)架的broker上
五到逊、可靠數(shù)據(jù)傳遞
1. Kafka的可靠性保證
- 順序保證:
保證分區(qū)消息的順序铣口。如果使用同一個(gè)生產(chǎn)者往同一個(gè)分區(qū)寫(xiě)入消息,而且消息B在消息A之后寫(xiě)入觉壶,那么Kafka可以保證消息B的偏移量比消息A的偏移量大脑题,而且消費(fèi)者會(huì)先讀取消息A后讀取消息B - 提交確認(rèn):
只有當(dāng)消息被寫(xiě)入到分區(qū)的所有同步副本時(shí)(但不一定要寫(xiě)入磁盤(pán)),它才被認(rèn)為是“提交”的铜靶。生產(chǎn)者可以選擇接受不同類(lèi)型的確認(rèn):消息被完全提交時(shí)的確認(rèn)叔遂、消息寫(xiě)入首領(lǐng)副本時(shí)的確認(rèn)、消息被發(fā)送到網(wǎng)絡(luò)時(shí)的確認(rèn) - 消息不丟:
只要還有一個(gè)副本是活躍的争剿,那么已經(jīng)提交的消息就不會(huì)丟失 - 提交可見(jiàn):
消費(fèi)者只能讀取已經(jīng)提交的消息
2. 復(fù)制
Kafka的主題被分為多個(gè)分區(qū)已艰,分區(qū)是基本的數(shù)據(jù)塊。分區(qū)存儲(chǔ)在單個(gè)磁盤(pán)上蚕苇,Kafka可以保證分區(qū)里的事件是有序的哩掺。分區(qū)可以在線(xiàn),也可以離線(xiàn)(不可用)涩笤。
每個(gè)分區(qū)可以有多個(gè)副本疮丛,其中一個(gè)是首領(lǐng)。所有的消息都是直接發(fā)送給首領(lǐng)副本辆它,或者直接從首領(lǐng)副本讀取消息誊薄。其他分區(qū)只需要與首領(lǐng)副本保持同步,并及時(shí)復(fù)制最新的消息锰茉。當(dāng)首領(lǐng)副本不可用時(shí)呢蔫,分區(qū)其他任一同步副本將成為新首領(lǐng)。
同步副本需要滿(mǎn)足以下條件:
- 與Zookeeper之間有一個(gè)活躍的會(huì)話(huà),在過(guò)去6s(可配置)內(nèi)向其發(fā)送過(guò)心跳
- 過(guò)去10s內(nèi)(可配置)從首領(lǐng)那里獲取過(guò)消息
- 在過(guò)去10s內(nèi)從首領(lǐng)那里獲取過(guò)最新的消息
3. broker配置
3.1 復(fù)制系數(shù)
如果復(fù)制系數(shù)為N片吊,那么在N - 1個(gè)broker失效的情況下绽昏,仍然能夠從主題讀取數(shù)據(jù)或向主題寫(xiě)入數(shù)據(jù)。所以俏脊,更高的復(fù)制洗漱會(huì)帶來(lái)更高的可用性全谤、可靠性和更少的故障
3.2 不完全的首領(lǐng)選舉
3.3 最少同步副本
4. 在可靠的系統(tǒng)里使用生產(chǎn)者
4.1 發(fā)送確認(rèn)
-
acks = 0
如果生產(chǎn)者能夠通過(guò)網(wǎng)絡(luò)把消息發(fā)送出去,就認(rèn)為消息已成功寫(xiě)入Kafka -
acks = 1
首領(lǐng)在收到消息并把它寫(xiě)入到分區(qū)數(shù)據(jù)文件(不一定同步到磁盤(pán)上)是會(huì)返回確認(rèn)或錯(cuò)誤響應(yīng) -
acks = all
首領(lǐng)在返回確認(rèn)或錯(cuò)誤響應(yīng)之前爷贫,會(huì)等待素有同步副本都收到消息
4.2 配置生產(chǎn)者的重試參數(shù)
4.3 額外的錯(cuò)誤處理
5. 在可靠的系統(tǒng)里使用消費(fèi)者
5.1 消費(fèi)者的可靠配置
-
group.id
如果連個(gè)消費(fèi)者具有相同的group.id
认然,并且訂閱了同一個(gè)主題,那么每個(gè)消費(fèi)者會(huì)分到主題分區(qū)的一個(gè)子集漫萄。如果你希望消費(fèi)者可以看到主題的所有消息卷员,那么需要它們?cè)O(shè)置唯一的group.id
-
auto.offset.reset
=ealiest | latest
指定了再?zèng)]有偏移量可提交時(shí),或者請(qǐng)求的偏移量在broker上不存在時(shí)腾务,消費(fèi)者的行為
earliest
:消費(fèi)者會(huì)動(dòng)分區(qū)的開(kāi)始位置讀取數(shù)據(jù)毕骡,不管偏移量是否有效,這樣會(huì)導(dǎo)致消費(fèi)者讀取大量的重復(fù)數(shù)據(jù)岩瘦,但可以保證最少的數(shù)據(jù)丟失
latest
:消費(fèi)者會(huì)從分區(qū)的末尾開(kāi)始讀取數(shù)據(jù)未巫,這樣可以減少重復(fù)處理消息,單很有可能丟失數(shù)據(jù) -
enable.auto.commit
自動(dòng)提交偏移量启昧,異步處理消息可能導(dǎo)致提交錯(cuò)誤的偏移量 -
auto.commit.interval.ms
自動(dòng)提交偏移量的頻率
5.2 顯式提交偏移量
- 總是在處理完事件后再提交偏移量
- 提交頻度是性能和重復(fù)處理消息數(shù)量之間的權(quán)衡
- 確保對(duì)提交的偏移量心里有數(shù)
- 再均衡
- 消費(fèi)者可能需要重試
- 消費(fèi)者可能需要維護(hù)狀態(tài)
- 長(zhǎng)時(shí)間處理
- 僅一次傳遞
Q&A
- 推送消息給消費(fèi)者和消費(fèi)者拉取消息各自?xún)?yōu)缺點(diǎn)
broker主動(dòng)地推送消息給下游的消費(fèi)者叙凡,由broker控制數(shù)據(jù)傳輸?shù)乃俾剩莃roker對(duì)下游消費(fèi)者能否及時(shí)處理消息不得而知箫津。如果數(shù)據(jù)的消費(fèi)速率低于生產(chǎn)速率狭姨,消費(fèi)者就會(huì)處于符合狀態(tài)宰啦,那么發(fā)送給消費(fèi)者的消息就會(huì)堆積得越來(lái)越多苏遥。而且,推送方式頁(yè)難以應(yīng)付不同類(lèi)型的消費(fèi)者赡模,因?yàn)椴煌M(fèi)者的消費(fèi)速率不一定都相同田炭,broker需要調(diào)整不同消費(fèi)者的傳輸速率,并讓每個(gè)消費(fèi)者充分利用系統(tǒng)的資源漓柑。這種方式實(shí)現(xiàn)起來(lái)比較困難教硫。
消費(fèi)者從broker主動(dòng)拉取數(shù)據(jù),broker是無(wú)狀態(tài)的辆布,它不需要標(biāo)記哪些消息時(shí)被消費(fèi)者處理過(guò)瞬矩,也不需要保證一條消息只會(huì)被一個(gè)消費(fèi)者處理。而且锋玲,不同的消費(fèi)者可以按照自己最大的處理能力來(lái)拉取數(shù)據(jù)景用,及時(shí)有時(shí)候某個(gè)消費(fèi)者的處理速度稍微落后,它也不會(huì)影響其他的消費(fèi)者惭蹂,并且在這個(gè)消費(fèi)者恢復(fù)處理速度后伞插,仍然可以追趕之前落后的數(shù)據(jù)割粮。
再有就是,推送方式比較難保證消費(fèi)者正常消費(fèi)消息狀態(tài)一致性媚污,需要保存每條消息的多種狀態(tài)舀瓢;而拉取方式只需要為每個(gè)有序分區(qū)記錄一個(gè)偏移量,定時(shí)將分區(qū)的消費(fèi)進(jìn)度保存成檢查點(diǎn)(checkpoint)文件耗美,不需要記錄消息的任何狀態(tài)京髓,而且有需要時(shí),消費(fèi)者可以回退到某個(gè)舊的偏移量位置幽歼,重新處理數(shù)據(jù)