Kafka設(shè)計解析(三)恰好一次和事務(wù)消息

1.冪等消息

為了解決重試導(dǎo)致的消息重復(fù)、亂序問題柳沙,kafka引入了冪等消息跃赚。冪等消息保證producer在一次會話內(nèi)寫入一個partition內(nèi)的消息具有冪等性囱晴,可以通過重試來確保消息發(fā)布的Exactly Once語義澳泵。

實現(xiàn)邏輯很簡單:

  • 區(qū)分producer會話

producer每次啟動后实愚,首先向broker申請一個全局唯一的pid,用來標(biāo)識本次會話兔辅。

  • 消息檢測

message_v2 增加了sequence number字段腊敲,producer每發(fā)一批消息,seq就加1维苔。

broker在內(nèi)存維護(pid,seq)映射碰辅,收到消息后檢查seq,如果介时,

new_seq=old_seq+1: 正常消息没宾;

new_seq<=old_seq : 重復(fù)消息;

new_seq>old_seq+1: 消息丟失沸柔;
  • producer重試

producer在收到明確的的消息丟失ack榕吼,或者超時后未收到ack,要進行重試勉失。

2.事務(wù)消息

考慮在stream處理的場景中,需要多個消息的原子寫入語義原探,要么全部寫入成功乱凿,要么全部失敗,這就是kafka事務(wù)消息要解決的問題咽弦。

事務(wù)消息是由producer徒蟆、事務(wù)協(xié)調(diào)器、broker型型、組協(xié)調(diào)器段审、consumer共同參與實現(xiàn)的,

1)producer

為producer指定固定的TransactionalId闹蒜,可以穿越producer的多次會話(producer重啟/斷線重連)中寺枉,持續(xù)標(biāo)識producer的身份抑淫。

使用epoch標(biāo)識producer的每一次"重生",防止同一producer存在多個會話姥闪。

producer遵從冪等消息的行為始苇,并在發(fā)送的recordbatch中增加事務(wù)id和epoch。

2)事務(wù)協(xié)調(diào)器(Transaction Coordinator)

引入事務(wù)協(xié)調(diào)器筐喳,以兩階段提交的方式催式,實現(xiàn)消息的事務(wù)提交。

事務(wù)協(xié)調(diào)器使用一個特殊的topic:transaction避归,來做事務(wù)提交日志荣月。

事務(wù)控制器通過RPC調(diào)研,協(xié)調(diào) broker 和 consumer coordinator 實現(xiàn)事務(wù)的兩階段提交梳毙。

每一個broker都會啟動一個事務(wù)協(xié)調(diào)器哺窄,使用hash(TransactionalId)確定producer對應(yīng)的事務(wù)協(xié)調(diào)器,使得整個集群的負載均衡顿天。

3) broker

broker處理在事務(wù)協(xié)調(diào)器的commit/abort控制消息堂氯,把控制消息向正常消息一樣寫入topic(和正常消息交織在一起,用來確認事務(wù)提交的日志偏移)牌废,并向前推進消息提交偏移hw咽白。

4) 組協(xié)調(diào)器

如果在事務(wù)過程中,提交了消費偏移鸟缕,組協(xié)調(diào)器在offset log中寫入事務(wù)消費偏移晶框。當(dāng)事務(wù)提交時,在offset log中寫入事務(wù)offset確認消息懂从。

5)consumer

consumer過濾未提交消息和事務(wù)控制消息授段,使這些消息對用戶不可見。

有兩種實現(xiàn)方式番甩,

  • consumer緩存方式

設(shè)置isolation.level=read_uncommitted侵贵,此時topic的所有消息對consumer都可見。

consumer緩存這些消息缘薛,直到收到事務(wù)控制消息窍育。若事務(wù)commit,則對外發(fā)布這些消息宴胧;若事務(wù)abort漱抓,則丟棄這些消息。

  • broker過濾方式

設(shè)置isolation.level=read_committed恕齐,此時topic中未提交的消息對consumer不可見乞娄,只有在事務(wù)結(jié)束后,消息才對consumer可見。

broker給consumer的BatchRecord消息中仪或,會包含以列表确镊,指明哪些是"abort"事務(wù),consumer丟棄abort事務(wù)的消息即可溶其。

事務(wù)消息處理流程如圖1所示骚腥,

圖1 事務(wù)消息業(yè)務(wù)流程

圖1 事務(wù)消息業(yè)務(wù)流程

流程說明:

1. 查找事務(wù)協(xié)調(diào)器 -- FindCoordinatorRequest

事務(wù)協(xié)調(diào)器是分配pid和管理事務(wù)的核心,produer首先對任何一個broker發(fā)送FindCoordinatorRequest瓶逃,發(fā)現(xiàn)自己的事務(wù)協(xié)調(diào)器束铭。

2. 申請pid -- InitPidRequest

緊接著,producer向事務(wù)協(xié)調(diào)器發(fā)送InitPidRequest厢绝,申請生成pid契沫。

2a.當(dāng)指定了transactional.id時,事務(wù)協(xié)調(diào)器為producer分區(qū)pid昔汉,并更新epoch懈万,把(tid,pid)的映射關(guān)系寫入事務(wù)日志。 同時清理tid任何未完成的事務(wù)靶病,丟棄未提交的消息会通。

3. 啟動事務(wù)

啟動事務(wù)是producer的本地操作,促使producer更新內(nèi)部狀態(tài)娄周,不會和事務(wù)協(xié)調(diào)器發(fā)生關(guān)系涕侈。

事務(wù)協(xié)調(diào)器自動啟動事務(wù),始終處在一個接一個的事務(wù)處理狀態(tài)機中煤辨。

4. consume-transform-produce 事務(wù)循環(huán)
4.1. 注冊partition -- AddPartitionsToTxnRequest

對于每一個要在事務(wù)中寫消息的topic分區(qū)裳涛,producer應(yīng)當(dāng)在第一次發(fā)消息前,向事務(wù)處理器注冊分區(qū)众辨。

4.1a.事務(wù)處理器把事務(wù)關(guān)聯(lián)的分區(qū)寫入事務(wù)日志端三。

在提交或終止事務(wù)時,事務(wù)協(xié)調(diào)器需要這些信息鹃彻,控制事務(wù)涉及的所有分區(qū)leader完成事務(wù)提交或終止郊闯。

4.2. 寫消息 -- ProduceRequest

4.2a. producer向分區(qū)leader寫消息,消息中包含tid,pid,epoch和seq蛛株。

4.3. 提交消費偏移 -- AddOffsetCommitsToTxnRequest

4.3a. producer向事務(wù)協(xié)調(diào)器發(fā)送消費偏移虚婿,事務(wù)協(xié)調(diào)器在事務(wù)日志中記錄偏移信息,并把組協(xié)調(diào)器返回給producer泳挥。

4.4. 提交消費偏移 -- TxnOffsetCommitRequest

4.4a. producer向組協(xié)調(diào)器發(fā)送TxnOffsetCommitRequest,組協(xié)調(diào)器把偏移信息寫入偏移日志至朗。但是屉符,要一直等到事務(wù)提交后,這個偏移才生效,對外部可見矗钟。

5. 提交或終止事務(wù)
5.1. EndTxnRequest

收到提交或終止事務(wù)的請求時唆香,事務(wù)處理器執(zhí)行下面的操作:

1. 在事務(wù)日志中寫入PREPARE_COMMIT或PREPARE_ABORT消息(5.1a)。

2. 通過WriteTxnMarkerRequest向事務(wù)中的所有broker發(fā)事務(wù)控制消息(5.2)吨艇。

3. 在事務(wù)之日中寫入COMMITTED或ABORTED消息(5.3)躬它。

5.2. WriteTxnMarkerRequest

這個消息由事務(wù)處理器發(fā)給事務(wù)中所涉及分區(qū)的leader。

當(dāng)收到這個消息后东涡,broker會在分區(qū)log中寫入一個COMMIT或ABORT控制消息冯吓。同時,也會更新該分區(qū)的事務(wù)提交偏移hw疮跑。

如果事務(wù)中有提交消費偏移组贺, broker也會把控制消息寫入 __consumer-offsets log,并通知組協(xié)調(diào)器使事務(wù)中提交的消費偏移生效祖娘。

5.3. 寫最終的commit或abort消息

當(dāng)所有的commit或abort消息寫入數(shù)據(jù)日志失尖,事務(wù)協(xié)調(diào)器在事務(wù)日志中寫入事務(wù)日志,標(biāo)志這事務(wù)結(jié)束渐苏。

至此掀潮,本事務(wù)的所有狀態(tài)信息都可以被刪除,可以開始一個新的事務(wù)琼富。

在實現(xiàn)上仪吧,還有很多細節(jié),比如公黑,事務(wù)協(xié)調(diào)器會啟動定時器邑商,用來檢測并終止開始后長時間不活動的事務(wù),具體請參考下面列出的kafka社區(qū)技術(shù)文檔凡蚜。

【總結(jié)】:

我們要認識到人断,雖然kafka事務(wù)消息提供了多個消息原子寫的保證,但它不保證原子讀朝蜘。

例如恶迈,

1)事務(wù)向topic_a和topic_b兩個分區(qū)寫入消息,在事務(wù)提交后的某個時刻谱醇,topic_a的全部副本失效暇仲。這時topic_b中的消息可以正常消費,但topic_a中的消息就丟失了副渴。

2)假如consumer只消費了topic_a奈附,沒有消費topic_b,這樣也不能讀到完整的事務(wù)消息煮剧。

3)典型的kafka stream應(yīng)用從多個topic消費斥滤,然后向一個或多個topic寫将鸵。在一次故障后,kafka stream應(yīng)用重新開始處理流數(shù)據(jù)佑颇,由于從多個topic讀到的數(shù)據(jù)之間不存在穩(wěn)定的順序(即便只有一個topic顶掉,從多個分區(qū)讀到的數(shù)據(jù)之間也沒有穩(wěn)定的順序),那么兩次處理輸出的結(jié)果就可能會不一樣挑胸。

也就是說痒筒,雖然kafka log持久化了數(shù)據(jù),也可以通過指定offset多次消費數(shù)據(jù)茬贵,但由于分區(qū)數(shù)據(jù)之間的無序性簿透,導(dǎo)致每次處理輸出的結(jié)果都是不同的。這使得kafka stream不能像hadoop批處理任務(wù)一樣闷沥,可以隨時重新執(zhí)行萎战,保證每次執(zhí)行的結(jié)果相同。除非我們只從一個topic分區(qū)讀數(shù)據(jù)舆逃。

【參考】:

[0] https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
[1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-4.1AddPartitionsToTxnRequest
[2]https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#
[3]https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
[4]https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
[5]https://www.confluent.io/blog/transactions-apache-kafka/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蚂维,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子路狮,更是在濱河造成了極大的恐慌虫啥,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,589評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件奄妨,死亡現(xiàn)場離奇詭異涂籽,居然都是意外死亡,警方通過查閱死者的電腦和手機砸抛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,615評論 3 396
  • 文/潘曉璐 我一進店門评雌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人直焙,你說我怎么就攤上這事景东。” “怎么了奔誓?”我有些...
    開封第一講書人閱讀 165,933評論 0 356
  • 文/不壞的土叔 我叫張陵斤吐,是天一觀的道長。 經(jīng)常有香客問我厨喂,道長和措,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,976評論 1 295
  • 正文 為了忘掉前任蜕煌,我火速辦了婚禮派阱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘斜纪。我一直安慰自己颁褂,他們只是感情好故响,可當(dāng)我...
    茶點故事閱讀 67,999評論 6 393
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著颁独,像睡著了一般。 火紅的嫁衣襯著肌膚如雪伪冰。 梳的紋絲不亂的頭發(fā)上誓酒,一...
    開封第一講書人閱讀 51,775評論 1 307
  • 那天,我揣著相機與錄音贮聂,去河邊找鬼靠柑。 笑死,一個胖子當(dāng)著我的面吹牛吓懈,可吹牛的內(nèi)容都是我干的歼冰。 我是一名探鬼主播,決...
    沈念sama閱讀 40,474評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼耻警,長吁一口氣:“原來是場噩夢啊……” “哼隔嫡!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起甘穿,我...
    開封第一講書人閱讀 39,359評論 0 276
  • 序言:老撾萬榮一對情侶失蹤腮恩,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后温兼,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體秸滴,經(jīng)...
    沈念sama閱讀 45,854評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,007評論 3 338
  • 正文 我和宋清朗相戀三年募判,在試婚紗的時候發(fā)現(xiàn)自己被綠了荡含。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,146評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡届垫,死狀恐怖释液,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情敦腔,我是刑警寧澤均澳,帶...
    沈念sama閱讀 35,826評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站符衔,受9級特大地震影響找前,放射性物質(zhì)發(fā)生泄漏错沃。R本人自食惡果不足惜崭倘,卻給世界環(huán)境...
    茶點故事閱讀 41,484評論 3 331
  • 文/蒙蒙 一脸哀、第九天 我趴在偏房一處隱蔽的房頂上張望橱赠。 院中可真熱鬧里覆,春花似錦吧寺、人聲如沸论泛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,029評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽界斜。三九已至仿耽,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間各薇,已是汗流浹背项贺。 一陣腳步聲響...
    開封第一講書人閱讀 33,153評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留峭判,地道東北人开缎。 一個月前我還...
    沈念sama閱讀 48,420評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像林螃,于是被迫代替她去往敵國和親奕删。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,107評論 2 356