Kafka設(shè)計(jì)

全局架構(gòu)圖

全局架構(gòu)圖

磁盤結(jié)構(gòu)

記錄格式

type VarInt int // 變長(zhǎng)整型钉蒲,使用Varints和ZigZag編碼的整型

type RecordBatch struct {
    FirstOffset          int64 // 起始偏移
    Length               int32 // 從PartitionLeaderEpoch開始的長(zhǎng)度
    PartitionLeaderEpoch int32 // 分區(qū)Leader紀(jì)元
    Magic                int8  // 消息版本號(hào)弟头,當(dāng)前為2,表示V2
    Crc32                int32 // crc校驗(yàn)和
    Attributes           int16 // [0-2]壓縮格式, 4時(shí)間戳類型, 5是否出于事務(wù)中, 6控制消息
    LastOffsetDelta      int32 // 最后一個(gè)Record的offset與FirstOffset的差值朵纷,用于保證消息組裝的正確性
    FirstTimestamp       int64 // 第一個(gè)Record的時(shí)間戳
    MaxTimestamp         int64 // 最后一個(gè)Record的時(shí)間戳锌俱,用于保證消息組裝的正確性
    ProducerID           int64 // 用于支持冪等性和事務(wù)
    ProducerEpoch        int32 // 用于支持冪等性和事務(wù)
    FirstSequence        int32 // 用于支持冪等性和事務(wù)
    RecordsCount         int32 // RecordsCount數(shù)組元素個(gè)數(shù)
    Records              []Record
}

type Record struct {
    Length         VarInt   // Record長(zhǎng)度
    Attributions   int8     // 屬性晤郑,暫時(shí)沒(méi)用
    TimestampDelta VarInt   // 相對(duì)于RecordBatch的FirstTimestamp的偏移量
    OffsetDelta    VarInt   // 相對(duì)于RecordBatch的FirstOffset的偏移量
    KeyLength      VarInt   // key長(zhǎng)度
    Key            []byte   // key內(nèi)容
    ValueLength    VarInt   // value長(zhǎng)度
    Value          []byte   // value內(nèi)容
    HeadersCount   VarInt   // Headers數(shù)組元素個(gè)數(shù)
    Headers        []Header // Headers數(shù)組,用于支持應(yīng)用級(jí)別擴(kuò)展
}

type Header struct {
    HeaderKeyLength   VarInt
    HeaderKey         string
    HeaderValueLength VarInt
    HeaderValue       string
}

日志文件存儲(chǔ)

disk

使用時(shí)間戳查找消息

  1. 通過(guò)時(shí)間戳日志分段索引文件名查找對(duì)應(yīng)的日志分段文件
  2. 在該日志分段中通過(guò)二分法查找到最近的偏移量
  3. 通過(guò)該偏移量在偏移量日志分段索引文件中查找對(duì)應(yīng)的消息位置
  4. 從該位置開始,向后查找造寝,直到找到不小于指定時(shí)間戳的消息

日志清理

日志刪除

  • 基于時(shí)間:rog.retention.hours/minutes/ms
  • 基于日志大小: log.retention.bytes
  • 基于起始偏移量: DelectRecordsRequest.logStartOffset

日志壓縮/合并

對(duì)于相同的key的不同value值磕洪,只保留最后一個(gè)版本。當(dāng)應(yīng)用僅關(guān)心消息的最新value時(shí)匹舞,可以開啟日志合并功能褐鸥。

  • 線程會(huì)選擇污濁率最高的日志文件來(lái)進(jìn)行清理,污濁率=dirtyBytes/(cleanBytes+dirtyBytes)赐稽。
  • log.cleaner.min.compaction.lag.ms,消息在被清理前的最小保留時(shí)間叫榕。
  • 實(shí)現(xiàn)方式:第一次遍歷日志來(lái)構(gòu)建key和最后的offset的映射關(guān)系,第二次遍歷判斷是否需要保留姊舵,如果不需要晰绎,就刪除。SkimpyOffsetMap使用md5來(lái)進(jìn)計(jì)算key的哈希值括丁,在映射時(shí)僅考慮md5荞下,如果不同的key哈希到了同一個(gè)md5,會(huì)導(dǎo)致某個(gè)key對(duì)應(yīng)的消息丟失史飞,丟失率取決于md5的沖突率尖昏,沖突時(shí)用線性探測(cè)法來(lái)處理。
  • 合并時(shí)构资,是對(duì)整個(gè)日志進(jìn)行合并抽诉,所以清理之后,可能會(huì)將多個(gè)日志分段合并為一個(gè)段吐绵。

消費(fèi)位移

  1. 保存在_comsumer_offset主題中
  2. 可以通過(guò)offset或者時(shí)間戳進(jìn)行定位
  3. 利用seek功能迹淌,我們可以將消費(fèi)位移保存在外部存儲(chǔ)中

消費(fèi)者重均衡

消費(fèi)組分區(qū)分配策略

RangeAssignor

  • 原理:對(duì)于每一個(gè)訂閱的主題,按照消費(fèi)者總數(shù)和主題分區(qū)數(shù)進(jìn)行整除運(yùn)算來(lái)獲得一個(gè)跨度己单,然后將分區(qū)按照跨度進(jìn)行平均分配
  • 例子:C0[T0P0,T0P1;T1P1,T1P2],C1[T0P2;T1P2]
  • 缺點(diǎn):在一個(gè)消費(fèi)者訂閱多個(gè)主題的情況下且主題分區(qū)無(wú)法整除消費(fèi)者數(shù)時(shí)唉窃,會(huì)導(dǎo)致不均衡
  • 評(píng)價(jià):適合消費(fèi)者和主題分區(qū)數(shù)能夠確定且不變時(shí),不實(shí)用纹笼,對(duì)擴(kuò)容不友好纹份,建議不要用

RoundRobinAssignor

  • 原理:將消費(fèi)組內(nèi)所有消費(fèi)者及消費(fèi)者訂閱的所有主題的分區(qū)按照字典序排序,然后通過(guò)輪詢方式逐個(gè)將分區(qū)依次分配給每個(gè)消費(fèi)者
  • 例子:C0[T0P0;T0P2;T1P1],C2[T0P1;T1P0;T1P2]
  • 缺點(diǎn):在消費(fèi)者訂閱的主題不一樣時(shí)廷痘,會(huì)導(dǎo)致不均衡
  • 評(píng)價(jià):一般情況下同一個(gè)消費(fèi)組會(huì)訂閱相同的主題信息矮嫉,可以使用

StickyAssignor

  • 原理:很復(fù)雜
  • 目的:要分區(qū)的分配要盡可能均勻;分區(qū)的分配盡可能與上次分配的保持相同牍疏。
  • 評(píng)價(jià):比上面兩種都好,建議使用

自定義Assignor

  • 原理:實(shí)現(xiàn)PartitionAssignor接口
  • 評(píng)價(jià):不建議

發(fā)生時(shí)機(jī)

  • 組成員數(shù)發(fā)生變更:加入組或者離開組或者被剔出組拨齐。
  • 訂閱主題數(shù)發(fā)生變更:正則訂閱或者手動(dòng)更改訂閱主題數(shù)鳞陨。
  • 訂閱主題的分區(qū)數(shù)發(fā)生變更:分區(qū)重分配。

流程

kafka重均衡

分區(qū)重分配

基本原理:先通過(guò)控制器為每個(gè)分區(qū)添加新副本(增加副本因子),待復(fù)制完成后厦滤,將舊的副本從副本清單中刪除(恢復(fù)為原先的副本因子)

事務(wù)

冪等性

實(shí)現(xiàn)原理:
Kafka 的冪等只能保證單個(gè)生產(chǎn)者會(huì)話(session)中單分區(qū)的冪等援岩。對(duì)于每一個(gè)生產(chǎn)者,kafka會(huì)為其分配一個(gè)pid掏导,每一對(duì)<pid,partiton>都對(duì)應(yīng)一個(gè)序列號(hào)享怀,在生產(chǎn)者發(fā)送消息的時(shí)候,序列號(hào)遞增趟咆。當(dāng)kafka收到新消息時(shí)添瓷,如果序列號(hào)sn<so+1,則說(shuō)明發(fā)生了重復(fù)寫入值纱,則丟棄鳞贷;如果序列號(hào)sn>so+1,說(shuō)明出現(xiàn)了消息亂序,拋出異常OutOfOrderSequenceException虐唠。

事務(wù)

概念:kafka的事務(wù)可以保證應(yīng)用程序?qū)⒍鄠€(gè)的消費(fèi)消息搀愧、生產(chǎn)消息、提交消費(fèi)位移當(dāng)作原子操作來(lái)處理疆偿,同時(shí)成功或失敗咱筛,即使該生產(chǎn)或消費(fèi)會(huì)跨多個(gè)分區(qū)。

應(yīng)用場(chǎng)景:Consume-Transform-Produce杆故,以支持流失計(jì)算

事務(wù)流程
  1. 使用transactionID獲取計(jì)算獲取TransactionCoordinator的broker地址迅箩。
  2. 使用transactionID請(qǐng)求得到PID信息,TC在收到該請(qǐng)求后會(huì)將transaction和pid保存到__transaction_state中反番,以進(jìn)行持久化沙热。
  3. 生產(chǎn)者使用beginTransaction()開啟一個(gè)事務(wù)。
  4. 消費(fèi)-轉(zhuǎn)換-生產(chǎn)
    1. 應(yīng)用程序通過(guò)消費(fèi)者消費(fèi)到消息罢缸,轉(zhuǎn)換完成后篙贸,在生產(chǎn)者向新的分區(qū)寫入消息之前,先通過(guò)AddPartitionsToTxnRequest將新的分區(qū)記錄到__transaction_state中枫疆,包括<transactionID,pid,topic-partitions>爵川。
    2. 生產(chǎn)者向?qū)?yīng)的分區(qū)所在的broker發(fā)送消息,消息中會(huì)包含<pid,seq_num>息楔,注意由于寫入的消息的事務(wù)控制字段都是1寝贡,所以在read_commited級(jí)別下對(duì)應(yīng)用程序是不可見的。
    3. 通過(guò)AddOffsetsToTxnRequest將所有要提交的分區(qū)的offset的信息和group_id寫入__transaction_state中值依,TC可以通過(guò)對(duì)應(yīng)的group_id來(lái)計(jì)算出GC圃泡,GC也會(huì)保存在__transaction_state中,從而在生產(chǎn)者宕機(jī)后愿险,支持后續(xù)TC的崩潰恢復(fù)颇蜡。
    4. 生產(chǎn)者通過(guò)TxnOffsetCommitRequest將所有分區(qū)的偏移量條,寫入到__consumer_offsets中,注意由于寫入的消息的事務(wù)控制字段都是1风秤,所以在read_commited級(jí)別下對(duì)應(yīng)用程序是不可見的鳖目。
    5. 生產(chǎn)者通過(guò)EndTxnRequest向TC提交或者中止事務(wù),TC會(huì)將PREPARE_COMMIT或PREPARE_ABORT信息寫入到__transaction_state中缤弦,然后在通過(guò)WriteTxnMarkersRequest請(qǐng)求向分區(qū)(GC和生產(chǎn)者寫入的分區(qū))寫入COMMIT或ABORT消息领迈,再之后將COMPLETE_COMMIT或COMPLETE_ABORT寫入到__transaction_state中。

復(fù)制

復(fù)制

如上一主三從

  • 其中2個(gè)follower在ISR集合中碍沐,1個(gè)失效follower在OSR集合中狸捅,其中min.insync.replicas=2,當(dāng)ISR集合中的broker數(shù)少于2個(gè)時(shí)抢韭,該分區(qū)將禁止寫入薪贫。
  • ISR集合中的所有follower中最小的LEO為HW,每次follower向leader進(jìn)行fetch時(shí)刻恭,會(huì)帶上自身的leo瞧省,leader會(huì)計(jì)算出hw進(jìn)行更新,并返回給follower鳍贾。
  • 每當(dāng)OSR中的一個(gè)follower追上最小的LEO即HW時(shí)鞍匾,該集合將會(huì)進(jìn)入ISR集合中;每次follower請(qǐng)求拉取到leader副本leo前最新的消息時(shí)骑科,則認(rèn)為是一次caughUp橡淑,leader副本將會(huì)更新對(duì)應(yīng)follower的lastCaughUpTime時(shí)間,在每replicaMaxLagTime/2一次的isr-expiration后臺(tái)周期任務(wù)中咆爽,如果檢查到某個(gè)follower滿足now-lastCaughUpTime>replicaMaxLagTime梁棠,則將該follower將到OSR集合中。
  • 每次ISR集合的變更都會(huì)被集合到isrChangeSet中斗埂,2.5s一周期的isr-changge-propagation任務(wù)會(huì)將ISR變更信息寫入ZK中的/isr_change_notification/isr_change_*中符糊,controller會(huì)通過(guò)Watcher監(jiān)聽到該消息,進(jìn)而更新自身的元數(shù)據(jù)并向其它broker發(fā)送更新元數(shù)據(jù)的請(qǐng)求呛凶,然后刪除isr_change節(jié)點(diǎn)男娄。
  • 當(dāng)producer發(fā)送消息時(shí),攜帶的acks參數(shù)會(huì)告訴leader需要幾個(gè)節(jié)點(diǎn)的確認(rèn)才能響應(yīng)成功漾稀,leader副本寫入數(shù)據(jù)到本地日志后會(huì)hold模闲,等待其它follower將這條消息復(fù)制走,當(dāng)acks-1個(gè)follower復(fù)制后崭捍,才會(huì)解除hold尸折,響應(yīng)成功。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末殷蛇,一起剝皮案震驚了整個(gè)濱河市翁授,隨后出現(xiàn)的幾起案子拣播,更是在濱河造成了極大的恐慌,老刑警劉巖收擦,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異谍倦,居然都是意外死亡塞赂,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門昼蛀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)宴猾,“玉大人,你說(shuō)我怎么就攤上這事叼旋〕鸲撸” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵夫植,是天一觀的道長(zhǎng)讹剔。 經(jīng)常有香客問(wèn)我,道長(zhǎng)详民,這世上最難降的妖魔是什么延欠? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮沈跨,結(jié)果婚禮上由捎,老公的妹妹穿的比我還像新娘。我一直安慰自己饿凛,他們只是感情好狞玛,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著涧窒,像睡著了一般心肪。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上杀狡,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天蒙畴,我揣著相機(jī)與錄音,去河邊找鬼呜象。 笑死膳凝,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的恭陡。 我是一名探鬼主播蹬音,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼休玩!你這毒婦竟也來(lái)了著淆?” 一聲冷哼從身側(cè)響起劫狠,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎永部,沒(méi)想到半個(gè)月后独泞,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡苔埋,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年懦砂,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片组橄。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡荞膘,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出玉工,到底是詐尸還是另有隱情羽资,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布遵班,位于F島的核電站屠升,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏费奸。R本人自食惡果不足惜弥激,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望愿阐。 院中可真熱鬧微服,春花似錦、人聲如沸缨历。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)辛孵。三九已至丛肮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間魄缚,已是汗流浹背宝与。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留冶匹,地道東北人习劫。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像嚼隘,于是被迫代替她去往敵國(guó)和親诽里。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容