at least once //重試
at most once //seq id (冥等性)
exactly once // 重試+冥等性
//atomic writes across partitions
1 從transaction coordinator 獲取事務(wù)ID(事務(wù)信息和狀態(tài)需要持久化到topic中 如果transaction coordinator掛了 用來做事務(wù)回復(fù))
2 寫message到leader partition中(如果是事務(wù)消息 通過消息頭部的messageType來判斷是事務(wù)消息還是普通消息)3
3 更新transaction coordinator中的事務(wù)狀態(tài)(commit or abort)
4 transaction coordinator寫Marker(事務(wù)的元數(shù)據(jù) commit or abort)到leader topic中 更新LSO(Last Stable Offset LSO之后的offset對consumer不可見)
5 如果事務(wù)abort了 leader partition 把失敗的元數(shù)據(jù)寫到abort transaction的信息文件中
6 consumer消費時 如果message是事務(wù)消息 而且在abort transaction中 丟棄這個消息
7 如果在write Marker過程中 某個broker掛了就重寫 write-Marker是at least once消息 有重復(fù)也無所謂
文件存儲
segment file 組成:有兩大部分組成 分別為index file 和data file, 此兩個文件一一對應(yīng),后綴".index"和".log"分別表示segment的索引文件和數(shù)據(jù)文件夺蛇。索引文件存儲消息的元數(shù)據(jù)卡乾,log文件存儲消息的內(nèi)容。
segment的命名規(guī)則:partion全局的第一個segment從0開始信卡,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset隔缀,數(shù)值最大為64位,19位數(shù)字字符長度傍菇,沒有用0填充
以100.indx和100.log的文件為例
100.index 100.log
{1, 0} {(message101, 0), (message102, 239)}
{3, 500} {(message103, 500), (message104, 589), (message105, 666), (message106, 700)}
{7, 739} {(message107, 739)}
... ...
{N, postion} message100+N postion
{消息在本文件的系列號猾瘸, 消息在log的物理偏移地址} {消息的全局系列號 物理偏移地址}
如何查找offset為107的message 根據(jù)文件名查找到對應(yīng)的100的segment file。依次定位到100.index的元數(shù)據(jù)物理位置和100.log的物理偏移地址
Isr
Kafka結(jié)合同步復(fù)制和異步復(fù)制丢习,使用ISR(與Partition Leader保持同步的Replica列表)的方式在確保數(shù)據(jù)不丟失和吞吐率之間做了平衡须妻。Producer只需把消息發(fā)送到Partition Leader,Leader將消息寫入本地Log泛领。Follower則從Leader pull數(shù)據(jù)荒吏。Follower在收到該消息向Leader發(fā)送ACK。一旦Leader收到了ISR中所有Replica的ACK渊鞋,該消息就被認為已經(jīng)commit了绰更,Leader將增加HW并且向Producer發(fā)送ACK。這樣如果leader掛了锡宋,只要Isr中有一個replica存活儡湾,就不會丟數(shù)據(jù)。
Isr動態(tài)更新
Leader會跟蹤ISR执俩,如果ISR中一個Follower宕機徐钠,或者落后太多,Leader將把它從ISR中移除役首。這里所描述的“落后太多”指Follower復(fù)制的消息落后于Leader后的條數(shù)超過預(yù)定值(replica.lag.max.messages)或者Follower超過一定時間(replica.lag.time.max.ms)未向Leader發(fā)送fetch請求尝丐。ISR是AR中的一個子集显拜,由leader維護ISR列表。Kafka的ISR的管理最終都會反饋到Zookeeper節(jié)點上爹袁。具體位置為:/brokers/topics/[topic]/partitions/[partition]/state远荠。目前有兩個地方會對這個Zookeeper的節(jié)點進行維護。
Controller來維護:Kafka集群中的其中一個Broker會被選舉為Controller失息,主要負責(zé)Partition管理和副本狀態(tài)管理譬淳,也會執(zhí)行類似于重分配partition之類的管理任務(wù)。在符合某些特定條件下盹兢,Controller下的LeaderSelector會選舉新的leader邻梆,ISR和新的leader_epoch及controller_epoch寫入Zookeeper的相關(guān)節(jié)點中。同時發(fā)起LeaderAndIsrRequest通知所有的replicas绎秒。
Leader來維護:leader有單獨的線程定期檢測ISR中follower是否脫離ISR, 如果發(fā)現(xiàn)ISR變化确虱,則會將新的ISR的信息返回到Zookeeper的相關(guān)節(jié)點中。
選舉機制
kafka中所有topic的leader選舉都有controller負責(zé)替裆。在所有的broker中選擇一個作為controller校辩,controller通過rpc的方式叫l(wèi)eader的變更通知broker。 controller的選舉則依賴zookeeper辆童,每個broker啟動的時候都嘗試去zookeeper上創(chuàng)建一個臨時節(jié)點宜咒,只有創(chuàng)建成功的broker才是controller,其他的broker則watch改節(jié)點把鉴,防止controller宕機后故黑,執(zhí)行重新選舉新的controller。
controller_epoch:此值為一個數(shù)字,kafka集群中第一個broker第一次啟動時為1庭砍,以后只要集群中center controller中央控制器所在broker變更或掛掉场晶,就會重新選舉新的center controller,每次center controller變更controller_epoch值就會 + 1