全局架構(gòu)圖
磁盤結(jié)構(gòu)
記錄格式
type VarInt int // 變長(zhǎng)整型钉蒲,使用Varints和ZigZag編碼的整型
type RecordBatch struct {
FirstOffset int64 // 起始偏移
Length int32 // 從PartitionLeaderEpoch開始的長(zhǎng)度
PartitionLeaderEpoch int32 // 分區(qū)Leader紀(jì)元
Magic int8 // 消息版本號(hào)弟头,當(dāng)前為2,表示V2
Crc32 int32 // crc校驗(yàn)和
Attributes int16 // [0-2]壓縮格式, 4時(shí)間戳類型, 5是否出于事務(wù)中, 6控制消息
LastOffsetDelta int32 // 最后一個(gè)Record的offset與FirstOffset的差值朵纷,用于保證消息組裝的正確性
FirstTimestamp int64 // 第一個(gè)Record的時(shí)間戳
MaxTimestamp int64 // 最后一個(gè)Record的時(shí)間戳锌俱,用于保證消息組裝的正確性
ProducerID int64 // 用于支持冪等性和事務(wù)
ProducerEpoch int32 // 用于支持冪等性和事務(wù)
FirstSequence int32 // 用于支持冪等性和事務(wù)
RecordsCount int32 // RecordsCount數(shù)組元素個(gè)數(shù)
Records []Record
}
type Record struct {
Length VarInt // Record長(zhǎng)度
Attributions int8 // 屬性晤郑,暫時(shí)沒(méi)用
TimestampDelta VarInt // 相對(duì)于RecordBatch的FirstTimestamp的偏移量
OffsetDelta VarInt // 相對(duì)于RecordBatch的FirstOffset的偏移量
KeyLength VarInt // key長(zhǎng)度
Key []byte // key內(nèi)容
ValueLength VarInt // value長(zhǎng)度
Value []byte // value內(nèi)容
HeadersCount VarInt // Headers數(shù)組元素個(gè)數(shù)
Headers []Header // Headers數(shù)組,用于支持應(yīng)用級(jí)別擴(kuò)展
}
type Header struct {
HeaderKeyLength VarInt
HeaderKey string
HeaderValueLength VarInt
HeaderValue string
}
日志文件存儲(chǔ)
使用時(shí)間戳查找消息
- 通過(guò)時(shí)間戳日志分段索引文件名查找對(duì)應(yīng)的日志分段文件
- 在該日志分段中通過(guò)二分法查找到最近的偏移量
- 通過(guò)該偏移量在偏移量日志分段索引文件中查找對(duì)應(yīng)的消息位置
- 從該位置開始,向后查找造寝,直到找到不小于指定時(shí)間戳的消息
日志清理
日志刪除
- 基于時(shí)間:rog.retention.hours/minutes/ms
- 基于日志大小: log.retention.bytes
- 基于起始偏移量: DelectRecordsRequest.logStartOffset
日志壓縮/合并
對(duì)于相同的key的不同value值磕洪,只保留最后一個(gè)版本。當(dāng)應(yīng)用僅關(guān)心消息的最新value時(shí)匹舞,可以開啟日志合并功能褐鸥。
- 線程會(huì)選擇污濁率最高的日志文件來(lái)進(jìn)行清理,污濁率=dirtyBytes/(cleanBytes+dirtyBytes)赐稽。
- log.cleaner.min.compaction.lag.ms,消息在被清理前的最小保留時(shí)間叫榕。
- 實(shí)現(xiàn)方式:第一次遍歷日志來(lái)構(gòu)建key和最后的offset的映射關(guān)系,第二次遍歷判斷是否需要保留姊舵,如果不需要晰绎,就刪除。SkimpyOffsetMap使用md5來(lái)進(jìn)計(jì)算key的哈希值括丁,在映射時(shí)僅考慮md5荞下,如果不同的key哈希到了同一個(gè)md5,會(huì)導(dǎo)致某個(gè)key對(duì)應(yīng)的消息丟失史飞,丟失率取決于md5的沖突率尖昏,沖突時(shí)用線性探測(cè)法來(lái)處理。
- 合并時(shí)构资,是對(duì)整個(gè)日志進(jìn)行合并抽诉,所以清理之后,可能會(huì)將多個(gè)日志分段合并為一個(gè)段吐绵。
消費(fèi)位移
- 保存在_comsumer_offset主題中
- 可以通過(guò)offset或者時(shí)間戳進(jìn)行定位
- 利用seek功能迹淌,我們可以將消費(fèi)位移保存在外部存儲(chǔ)中
消費(fèi)者重均衡
消費(fèi)組分區(qū)分配策略
RangeAssignor
- 原理:對(duì)于每一個(gè)訂閱的主題,按照消費(fèi)者總數(shù)和主題分區(qū)數(shù)進(jìn)行整除運(yùn)算來(lái)獲得一個(gè)跨度己单,然后將分區(qū)按照跨度進(jìn)行平均分配
- 例子:C0[T0P0,T0P1;T1P1,T1P2],C1[T0P2;T1P2]
- 缺點(diǎn):在一個(gè)消費(fèi)者訂閱多個(gè)主題的情況下且主題分區(qū)無(wú)法整除消費(fèi)者數(shù)時(shí)唉窃,會(huì)導(dǎo)致不均衡
- 評(píng)價(jià):適合消費(fèi)者和主題分區(qū)數(shù)能夠確定且不變時(shí),不實(shí)用纹笼,對(duì)擴(kuò)容不友好纹份,建議不要用
RoundRobinAssignor
- 原理:將消費(fèi)組內(nèi)所有消費(fèi)者及消費(fèi)者訂閱的所有主題的分區(qū)按照字典序排序,然后通過(guò)輪詢方式逐個(gè)將分區(qū)依次分配給每個(gè)消費(fèi)者
- 例子:C0[T0P0;T0P2;T1P1],C2[T0P1;T1P0;T1P2]
- 缺點(diǎn):在消費(fèi)者訂閱的主題不一樣時(shí)廷痘,會(huì)導(dǎo)致不均衡
- 評(píng)價(jià):一般情況下同一個(gè)消費(fèi)組會(huì)訂閱相同的主題信息矮嫉,可以使用
StickyAssignor
- 原理:很復(fù)雜
- 目的:要分區(qū)的分配要盡可能均勻;分區(qū)的分配盡可能與上次分配的保持相同牍疏。
- 評(píng)價(jià):比上面兩種都好,建議使用
自定義Assignor
- 原理:實(shí)現(xiàn)PartitionAssignor接口
- 評(píng)價(jià):不建議
發(fā)生時(shí)機(jī)
- 組成員數(shù)發(fā)生變更:加入組或者離開組或者被剔出組拨齐。
- 訂閱主題數(shù)發(fā)生變更:正則訂閱或者手動(dòng)更改訂閱主題數(shù)鳞陨。
- 訂閱主題的分區(qū)數(shù)發(fā)生變更:分區(qū)重分配。
流程
分區(qū)重分配
基本原理:先通過(guò)控制器為每個(gè)分區(qū)添加新副本(增加副本因子),待復(fù)制完成后厦滤,將舊的副本從副本清單中刪除(恢復(fù)為原先的副本因子)
事務(wù)
冪等性
實(shí)現(xiàn)原理:
Kafka 的冪等只能保證單個(gè)生產(chǎn)者會(huì)話(session)中單分區(qū)的冪等援岩。對(duì)于每一個(gè)生產(chǎn)者,kafka會(huì)為其分配一個(gè)pid掏导,每一對(duì)<pid,partiton>都對(duì)應(yīng)一個(gè)序列號(hào)享怀,在生產(chǎn)者發(fā)送消息的時(shí)候,序列號(hào)遞增趟咆。當(dāng)kafka收到新消息時(shí)添瓷,如果序列號(hào)sn<so+1,則說(shuō)明發(fā)生了重復(fù)寫入值纱,則丟棄鳞贷;如果序列號(hào)sn>so+1,說(shuō)明出現(xiàn)了消息亂序,拋出異常OutOfOrderSequenceException虐唠。
事務(wù)
概念:kafka的事務(wù)可以保證應(yīng)用程序?qū)⒍鄠€(gè)的消費(fèi)消息搀愧、生產(chǎn)消息、提交消費(fèi)位移當(dāng)作原子操作來(lái)處理疆偿,同時(shí)成功或失敗咱筛,即使該生產(chǎn)或消費(fèi)會(huì)跨多個(gè)分區(qū)。
應(yīng)用場(chǎng)景:Consume-Transform-Produce杆故,以支持流失計(jì)算
- 使用transactionID獲取計(jì)算獲取TransactionCoordinator的broker地址迅箩。
- 使用transactionID請(qǐng)求得到PID信息,TC在收到該請(qǐng)求后會(huì)將transaction和pid保存到__transaction_state中反番,以進(jìn)行持久化沙热。
- 生產(chǎn)者使用beginTransaction()開啟一個(gè)事務(wù)。
- 消費(fèi)-轉(zhuǎn)換-生產(chǎn)
- 應(yīng)用程序通過(guò)消費(fèi)者消費(fèi)到消息罢缸,轉(zhuǎn)換完成后篙贸,在生產(chǎn)者向新的分區(qū)寫入消息之前,先通過(guò)AddPartitionsToTxnRequest將新的分區(qū)記錄到__transaction_state中枫疆,包括<transactionID,pid,topic-partitions>爵川。
- 生產(chǎn)者向?qū)?yīng)的分區(qū)所在的broker發(fā)送消息,消息中會(huì)包含<pid,seq_num>息楔,注意由于寫入的消息的事務(wù)控制字段都是1寝贡,所以在read_commited級(jí)別下對(duì)應(yīng)用程序是不可見的。
- 通過(guò)AddOffsetsToTxnRequest將所有要提交的分區(qū)的offset的信息和group_id寫入__transaction_state中值依,TC可以通過(guò)對(duì)應(yīng)的group_id來(lái)計(jì)算出GC圃泡,GC也會(huì)保存在__transaction_state中,從而在生產(chǎn)者宕機(jī)后愿险,支持后續(xù)TC的崩潰恢復(fù)颇蜡。
- 生產(chǎn)者通過(guò)TxnOffsetCommitRequest將所有分區(qū)的偏移量條,寫入到__consumer_offsets中,注意由于寫入的消息的事務(wù)控制字段都是1风秤,所以在read_commited級(jí)別下對(duì)應(yīng)用程序是不可見的鳖目。
- 生產(chǎn)者通過(guò)EndTxnRequest向TC提交或者中止事務(wù),TC會(huì)將PREPARE_COMMIT或PREPARE_ABORT信息寫入到__transaction_state中缤弦,然后在通過(guò)WriteTxnMarkersRequest請(qǐng)求向分區(qū)(GC和生產(chǎn)者寫入的分區(qū))寫入COMMIT或ABORT消息领迈,再之后將COMPLETE_COMMIT或COMPLETE_ABORT寫入到__transaction_state中。
復(fù)制
如上一主三從
- 其中2個(gè)follower在ISR集合中碍沐,1個(gè)失效follower在OSR集合中狸捅,其中min.insync.replicas=2,當(dāng)ISR集合中的broker數(shù)少于2個(gè)時(shí)抢韭,該分區(qū)將禁止寫入薪贫。
- ISR集合中的所有follower中最小的LEO為HW,每次follower向leader進(jìn)行fetch時(shí)刻恭,會(huì)帶上自身的leo瞧省,leader會(huì)計(jì)算出hw進(jìn)行更新,并返回給follower鳍贾。
- 每當(dāng)OSR中的一個(gè)follower追上最小的LEO即HW時(shí)鞍匾,該集合將會(huì)進(jìn)入ISR集合中;每次follower請(qǐng)求拉取到leader副本leo前最新的消息時(shí)骑科,則認(rèn)為是一次caughUp橡淑,leader副本將會(huì)更新對(duì)應(yīng)follower的lastCaughUpTime時(shí)間,在每replicaMaxLagTime/2一次的isr-expiration后臺(tái)周期任務(wù)中咆爽,如果檢查到某個(gè)follower滿足now-lastCaughUpTime>replicaMaxLagTime梁棠,則將該follower將到OSR集合中。
- 每次ISR集合的變更都會(huì)被集合到isrChangeSet中斗埂,2.5s一周期的isr-changge-propagation任務(wù)會(huì)將ISR變更信息寫入ZK中的/isr_change_notification/isr_change_*中符糊,controller會(huì)通過(guò)Watcher監(jiān)聽到該消息,進(jìn)而更新自身的元數(shù)據(jù)并向其它broker發(fā)送更新元數(shù)據(jù)的請(qǐng)求呛凶,然后刪除isr_change節(jié)點(diǎn)男娄。
- 當(dāng)producer發(fā)送消息時(shí),攜帶的acks參數(shù)會(huì)告訴leader需要幾個(gè)節(jié)點(diǎn)的確認(rèn)才能響應(yīng)成功漾稀,leader副本寫入數(shù)據(jù)到本地日志后會(huì)hold模闲,等待其它follower將這條消息復(fù)制走,當(dāng)acks-1個(gè)follower復(fù)制后崭捍,才會(huì)解除hold尸折,響應(yīng)成功。