kafka0.11Exactly Once語(yǔ)義與事務(wù)機(jī)制原理介紹

http://www.aboutyun.com/thread-24367-1-1.html

問(wèn)題導(dǎo)讀

1.為什么要提供事務(wù)機(jī)制?

2.Exactly Once出現(xiàn)在哪個(gè)版本沐扳?

3.Exactly Once實(shí)現(xiàn)的原理是什么禁荒?

為什么要提供事務(wù)機(jī)制

Kafka事務(wù)機(jī)制的實(shí)現(xiàn)主要是為了支持

Exactly Once即正好一次語(yǔ)義

操作的原子性

有狀態(tài)操作的可恢復(fù)性

Exactly Once

《Kafka背景及架構(gòu)介紹》一文中有說(shuō)明Kafka在0.11.0.0之前的版本中只支持At Least Once和At Most Once語(yǔ)義,尚不支持Exactly Once語(yǔ)義。

但是在很多要求嚴(yán)格的場(chǎng)景下,如使用Kafka處理交易數(shù)據(jù)赋元,Exactly Once語(yǔ)義是必須的。我們可以通過(guò)讓下游系統(tǒng)具有冪等性來(lái)配合Kafka的At Least Once語(yǔ)義來(lái)間接實(shí)現(xiàn)Exactly Once。但是:

該方案要求下游系統(tǒng)支持冪等操作搁凸,限制了Kafka的適用場(chǎng)景 實(shí)現(xiàn)門(mén)檻相對(duì)較高媚值,需要用戶對(duì)Kafka的工作機(jī)制非常了解 對(duì)于Kafka Stream而言,Kafka本身即是自己的下游系統(tǒng)护糖,但Kafka在0.11.0.0版本之前不具有冪等發(fā)送能力

因此褥芒,Kafka本身對(duì)Exactly Once語(yǔ)義的支持就非常必要。

操作原子性

操作的原子性是指嫡良,多個(gè)操作要么全部成功要么全部失敗锰扶,不存在部分成功部分失敗的可能。 實(shí)現(xiàn)原子性操作的意義在于: 操作結(jié)果更可控寝受,有助于提升數(shù)據(jù)一致性 便于故障恢復(fù)坷牛。因?yàn)椴僮魇窃拥模瑥墓收现谢謴?fù)時(shí)只需要重試該操作(如果原操作失敽艹巍)或者直接跳過(guò)該操作(如果原操作成功)京闰,而不需要記錄中間狀態(tài),更不需要針對(duì)中間狀態(tài)作特殊處理

實(shí)現(xiàn)事務(wù)機(jī)制的幾個(gè)階段

冪等性發(fā)送

上文提到甩苛,實(shí)現(xiàn)Exactly Once的一種方法是讓下游系統(tǒng)具有冪等處理特性蹂楣,而在Kafka Stream中,Kafka Producer本身就是“下游”系統(tǒng)讯蒲,因此如果能讓Producer具有冪等處理特性捐迫,那就可以讓Kafka Stream在一定程度上支持Exactly once語(yǔ)義。

為了實(shí)現(xiàn)Producer的冪等語(yǔ)義爱葵,Kafka引入了Producer ID(即PID)和Sequence Number。每個(gè)新的Producer在初始化的時(shí)候會(huì)被分配一個(gè)唯一的PID反浓,該P(yáng)ID對(duì)用戶完全透明而不會(huì)暴露給用戶萌丈。

對(duì)于每個(gè)PID,該P(yáng)roducer發(fā)送數(shù)據(jù)的每個(gè)都對(duì)應(yīng)一個(gè)從0開(kāi)始單調(diào)遞增的Sequence Number雷则。 類(lèi)似地辆雾,Broker端也會(huì)為每個(gè)維護(hù)一個(gè)序號(hào),并且每次Commit一條消息時(shí)將其對(duì)應(yīng)序號(hào)遞增月劈。對(duì)于接收的每條消息度迂,如果其序號(hào)比Broker維護(hù)的序號(hào)(即最后一次Commit的消息的序號(hào))大一,則Broker會(huì)接受它猜揪,否則將其丟棄:

如果消息序號(hào)比Broker維護(hù)的序號(hào)大一以上惭墓,說(shuō)明中間有數(shù)據(jù)尚未寫(xiě)入,也即亂序而姐,此時(shí)Broker拒絕該消息腊凶,Producer拋出InvalidSequenceNumber 如果消息序號(hào)小于等于Broker維護(hù)的序號(hào),說(shuō)明該消息已被保存,即為重復(fù)消息钧萍,Broker直接丟棄該消息褐缠,Producer拋出DuplicateSequenceNumber 上述設(shè)計(jì)解決了0.11.0.0之前版本中的兩個(gè)問(wèn)題: Broker保存消息后,發(fā)送ACK前宕機(jī)风瘦,Producer認(rèn)為消息未發(fā)送成功并重試队魏,造成數(shù)據(jù)重復(fù) 前一條消息發(fā)送失敗,后一條消息發(fā)送成功万搔,前一條消息重試后成功胡桨,造成數(shù)據(jù)亂序

事務(wù)性保證

上述冪等設(shè)計(jì)只能保證單個(gè)Producer對(duì)于同一個(gè)的Exactly Once語(yǔ)義。 另外蟹略,它并不能保證寫(xiě)操作的原子性——即多個(gè)寫(xiě)操作登失,要么全部被Commit要么全部不被Commit。 更不能保證多個(gè)讀寫(xiě)操作的的原子性挖炬。尤其對(duì)于Kafka Stream應(yīng)用而言揽浙,典型的操作即是從某個(gè)Topic消費(fèi)數(shù)據(jù),經(jīng)過(guò)一系列轉(zhuǎn)換后寫(xiě)回另一個(gè)Topic意敛,保證從源Topic的讀取與向目標(biāo)Topic的寫(xiě)入的原子性有助于從故障中恢復(fù)馅巷。

事務(wù)保證可使得應(yīng)用程序?qū)⑸a(chǎn)數(shù)據(jù)和消費(fèi)數(shù)據(jù)當(dāng)作一個(gè)原子單元來(lái)處理,要么全部成功草姻,要么全部失敗钓猬,即使該生產(chǎn)或消費(fèi)跨多個(gè)。

另外撩独,有狀態(tài)的應(yīng)用也可以保證重啟后從斷點(diǎn)處繼續(xù)處理敞曹,也即事務(wù)恢復(fù)。 為了實(shí)現(xiàn)這種效果综膀,應(yīng)用程序必須提供一個(gè)穩(wěn)定的(重啟后不變)唯一的ID澳迫,也即Transaction ID。Transactin ID與PID可能一一對(duì)應(yīng)剧劝。區(qū)別在于Transaction ID由用戶提供橄登,而PID是內(nèi)部的實(shí)現(xiàn)對(duì)用戶透明。

另外讥此,為了保證新的Producer啟動(dòng)后拢锹,舊的具有相同Transaction ID的Producer即失效,每次Producer通過(guò)Transaction ID拿到PID的同時(shí)萄喳,還會(huì)獲取一個(gè)單調(diào)遞增的epoch卒稳。由于舊的Producer的epoch比新Producer的epoch小,Kafka可以很容易識(shí)別出該P(yáng)roducer是老的Producer并拒絕其請(qǐng)求他巨。 有了Transaction ID后展哭,Kafka可保證: 跨Session的數(shù)據(jù)冪等發(fā)送湃窍。當(dāng)具有相同Transaction ID的新的Producer實(shí)例被創(chuàng)建且工作時(shí),舊的且擁有相同Transaction ID的Producer將不再工作匪傍。 跨Session的事務(wù)恢復(fù)您市。如果某個(gè)應(yīng)用實(shí)例宕機(jī),新的實(shí)例可以保證任何未完成的舊的事務(wù)要么Commit要么Abort役衡,使得新實(shí)例從一個(gè)正常狀態(tài)開(kāi)始工作茵休。 需要注意的是,上述的事務(wù)保證是從Producer的角度去考慮的手蝎。從Consumer的角度來(lái)看榕莺,該保證會(huì)相對(duì)弱一些。尤其是不能保證所有被某事務(wù)Commit過(guò)的所有消息都被一起消費(fèi)棵介,因?yàn)椋?對(duì)于壓縮的Topic而言钉鸯,同一事務(wù)的某些消息可能被其它版本覆蓋 事務(wù)包含的消息可能分布在多個(gè)Segment中(即使在同一個(gè)Partition內(nèi)),當(dāng)老的Segment被刪除時(shí)邮辽,該事務(wù)的部分?jǐn)?shù)據(jù)可能會(huì)丟失 Consumer在一個(gè)事務(wù)內(nèi)可能通過(guò)seek方法訪問(wèn)任意Offset的消息唠雕,從而可能丟失部分消息 Consumer可能并不需要消費(fèi)某一事務(wù)內(nèi)的所有Partition,因此它將永遠(yuǎn)不會(huì)讀取組成該事務(wù)的所有消息

事務(wù)機(jī)制原理

事務(wù)性消息傳遞

這一節(jié)所說(shuō)的事務(wù)主要指原子性吨述,也即Producer將多條消息作為一個(gè)事務(wù)批量發(fā)送岩睁,要么全部成功要么全部失敗。 為了實(shí)現(xiàn)這一點(diǎn)揣云,Kafka 0.11.0.0引入了一個(gè)服務(wù)器端的模塊捕儒,名為T(mén)ransaction Coordinator,用于管理Producer發(fā)送的消息的事務(wù)性邓夕。 該Transaction Coordinator維護(hù)Transaction Log刘莹,該log存于一個(gè)內(nèi)部的Topic內(nèi)。由于Topic數(shù)據(jù)具有持久性焚刚,因此事務(wù)的狀態(tài)也具有持久性栋猖。 Producer并不直接讀寫(xiě)Transaction Log,它與Transaction Coordinator通信汪榔,然后由Transaction Coordinator將該事務(wù)的狀態(tài)插入相應(yīng)的Transaction Log。 Transaction Log的設(shè)計(jì)與Offset Log用于保存Consumer的Offset類(lèi)似肃拜。

事務(wù)中Offset的提交

許多基于Kafka的應(yīng)用痴腌,尤其是Kafka Stream應(yīng)用中同時(shí)包含Consumer和Producer,前者負(fù)責(zé)從Kafka中獲取消息燃领,后者負(fù)責(zé)將處理完的數(shù)據(jù)寫(xiě)回Kafka的其它Topic中士聪。 為了實(shí)現(xiàn)該場(chǎng)景下的事務(wù)的原子性,Kafka需要保證對(duì)Consumer Offset的Commit與Producer對(duì)發(fā)送消息的Commit包含在同一個(gè)事務(wù)中猛蔽。否則剥悟,如果在二者Commit中間發(fā)生異常灵寺,根據(jù)二者Commit的順序可能會(huì)造成數(shù)據(jù)丟失和數(shù)據(jù)重復(fù): 如果先Commit Producer發(fā)送數(shù)據(jù)的事務(wù)再Commit Consumer的Offset,即At Least Once語(yǔ)義区岗,可能造成數(shù)據(jù)重復(fù)略板。 如果先Commit Consumer的Offset,再Commit Producer數(shù)據(jù)發(fā)送事務(wù)慈缔,即At Most Once語(yǔ)義叮称,可能造成數(shù)據(jù)丟失。

用于事務(wù)特性的控制型消息 為了區(qū)分寫(xiě)入Partition的消息被Commit還是Abort藐鹤,Kafka引入了一種特殊類(lèi)型的消息瓤檐,即Control Message。該類(lèi)消息的Value內(nèi)不包含任何應(yīng)用相關(guān)的數(shù)據(jù)娱节,并且不會(huì)暴露給應(yīng)用程序挠蛉。它只用于Broker與Client間的內(nèi)部通信。 對(duì)于Producer端事務(wù)肄满,Kafka以Control Message的形式引入一系列的Transaction Marker谴古。Consumer即可通過(guò)該標(biāo)記判定對(duì)應(yīng)的消息被Commit了還是Abort了,然后結(jié)合該Consumer配置的隔離級(jí)別決定是否應(yīng)該將該消息返回給應(yīng)用程序悄窃。

事務(wù)處理樣例代碼

[Bash shell] 純文本查看 復(fù)制代碼 ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 Producer producer = new KafkaProducer(props);

// 初始化事務(wù)贼陶,包括結(jié)束該Transaction ID對(duì)應(yīng)的未完成的事務(wù)(如果有) // 保證新的事務(wù)在一個(gè)正確的狀態(tài)下啟動(dòng) producer.initTransactions(); // 開(kāi)始事務(wù) producer.beginTransaction(); // 消費(fèi)數(shù)據(jù) ConsumerRecords records = consumer.poll(100); try{ // 發(fā)送數(shù)據(jù) producer.send(new ProducerRecord("Topic", "Key", "Value"));

// 發(fā)送消費(fèi)數(shù)據(jù)的Offset,將上述數(shù)據(jù)消費(fèi)與數(shù)據(jù)發(fā)送納入同一個(gè)Transaction內(nèi)producer.sendOffsetsToTransaction(offsets,"group1");// 數(shù)據(jù)發(fā)送及Offset發(fā)送均成功的情況下根欧,提交事務(wù)producer.commitTransaction();

} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // 數(shù)據(jù)發(fā)送或者Offset發(fā)送出現(xiàn)異常時(shí)疾忍,終止事務(wù) producer.abortTransaction(); } finally { // 關(guān)閉Producer和Consumer producer.close(); consumer.close(); }

完整事務(wù)過(guò)程

找到Transaction Coordinator 由于Transaction Coordinator是分配PID和管理事務(wù)的核心,因此Producer要做的第一件事情就是通過(guò)向任意一個(gè)Broker發(fā)送FindCoordinator請(qǐng)求找到Transaction Coordinator的位置横媚。

注意:只有應(yīng)用程序?yàn)镻roducer配置了Transaction ID時(shí)才可使用事務(wù)特性纠炮,也才需要這一步。另外灯蝴,由于事務(wù)性要求Producer開(kāi)啟冪等特性恢口,因此通過(guò)將transactional.id設(shè)置為非空從而開(kāi)啟事務(wù)特性的同時(shí)也需要通過(guò)將enable.idempotence設(shè)置為true來(lái)開(kāi)啟冪等特性。

獲取PID 找到Transaction Coordinator后穷躁,具有冪等特性的Producer必須發(fā)起InitPidRequest請(qǐng)求以獲取PID耕肩。 注意:只要開(kāi)啟了冪等特性即必須執(zhí)行該操作,而無(wú)須考慮該P(yáng)roducer是否開(kāi)啟了事務(wù)特性问潭。 如果事務(wù)特性被開(kāi)啟 InitPidRequest會(huì)發(fā)送給Transaction Coordinator猿诸。如果Transaction Coordinator是第一次收到包含有該Transaction ID的InitPidRequest請(qǐng)求,它將會(huì)把該存入Transaction Log狡忙,如上圖中步驟2.1所示梳虽。這樣可保證該對(duì)應(yīng)關(guān)系被持久化,從而保證即使Transaction Coordinator宕機(jī)該對(duì)應(yīng)關(guān)系也不會(huì)丟失灾茁。 除了返回PID外窜觉,InitPidRequest還會(huì)執(zhí)行如下任務(wù): 增加該P(yáng)ID對(duì)應(yīng)的epoch谷炸。具有相同PID但epoch小于該epoch的其它Producer(如果有)新開(kāi)啟的事務(wù)將被拒絕。 恢復(fù)(Commit或Abort)之前的Producer未完成的事務(wù)(如果有)禀挫。 注意:InitPidRequest的處理過(guò)程是同步阻塞的旬陡。一旦該調(diào)用正確返回,Producer即可開(kāi)始新的事務(wù)特咆。 另外季惩,如果事務(wù)特性未開(kāi)啟,InitPidRequest可發(fā)送至任意Broker腻格,并且會(huì)得到一個(gè)全新的唯一的PID画拾。該P(yáng)roducer將只能使用冪等特性以及單一Session內(nèi)的事務(wù)特性,而不能使用跨Session的事務(wù)特性菜职。

開(kāi)啟事務(wù) Kafka從0.11.0.0版本開(kāi)始青抛,提供beginTransaction()方法用于開(kāi)啟一個(gè)事務(wù)。調(diào)用該方法后酬核,Producer本地會(huì)記錄已經(jīng)開(kāi)啟了事務(wù)蜜另,但Transaction Coordinator只有在Producer發(fā)送第一條消息后才認(rèn)為事務(wù)已經(jīng)開(kāi)啟。

Consume-Transform-Produce 這一階段嫡意,包含了整個(gè)事務(wù)的數(shù)據(jù)處理過(guò)程举瑰,并且包含了多種請(qǐng)求。

AddPartitionsToTxnRequest 一個(gè)Producer可能會(huì)給多個(gè)發(fā)送數(shù)據(jù)蔬螟,給一個(gè)新的發(fā)送數(shù)據(jù)前此迅,它需要先向Transaction Coordinator發(fā)送AddPartitionsToTxnRequest。 Transaction Coordinator會(huì)將該存于Transaction Log內(nèi)旧巾,并將其狀態(tài)置為BEGIN耸序,如上圖中步驟4.1所示。有了該信息后鲁猩,我們才可以在后續(xù)步驟中為每個(gè)Topic, Partition>設(shè)置COMMIT或者ABORT標(biāo)記(如上圖中步驟5.2所示)坎怪。 另外,如果該為該事務(wù)中第一個(gè)廓握,Transaction Coordinator還會(huì)啟動(dòng)對(duì)該事務(wù)的計(jì)時(shí)(每個(gè)事務(wù)都有自己的超時(shí)時(shí)間)搅窿。 ProduceRequest Producer通過(guò)一個(gè)或多個(gè)ProduceRequest發(fā)送一系列消息。除了應(yīng)用數(shù)據(jù)外隙券,該請(qǐng)求還包含了PID男应,epoch,和Sequence Number是尔。該過(guò)程如上圖中步驟4.2所示。

AddOffsetsToTxnRequest 為了提供事務(wù)性开仰,Producer新增了sendOffsetsToTransaction方法拟枚,該方法將多組消息的發(fā)送和消費(fèi)放入同一批處理內(nèi)薪铜。 該方法先判斷在當(dāng)前事務(wù)中該方法是否已經(jīng)被調(diào)用并傳入了相同的Group ID。若是恩溅,直接跳到下一步隔箍;若不是,則向Transaction Coordinator發(fā)送AddOffsetsToTxnRequests請(qǐng)求脚乡,Transaction Coordinator將對(duì)應(yīng)的所有存于Transaction Log中蜒滩,并將其狀態(tài)記為BEGIN,如上圖中步驟4.3所示奶稠。該方法會(huì)阻塞直到收到響應(yīng)俯艰。

TxnOffsetCommitRequest 作為sendOffsetsToTransaction方法的一部分,在處理完AddOffsetsToTxnRequest后锌订,Producer也會(huì)發(fā)送TxnOffsetCommit請(qǐng)求給Consumer Coordinator從而將本事務(wù)包含的與讀操作相關(guān)的各的Offset持久化到內(nèi)部的__consumer_offsets中竹握,如上圖步驟4.4所示。 在此過(guò)程中辆飘,Consumer Coordinator會(huì)通過(guò)PID和對(duì)應(yīng)的epoch來(lái)驗(yàn)證是否應(yīng)該允許該P(yáng)roducer的該請(qǐng)求啦辐。 這里需要注意: 寫(xiě)入__consumer_offsets的Offset信息在當(dāng)前事務(wù)Commit前對(duì)外是不可見(jiàn)的。也即在當(dāng)前事務(wù)被Commit前蜈项,可認(rèn)為該Offset尚未Commit芹关,也即對(duì)應(yīng)的消息尚未被完成處理。 Consumer Coordinator并不會(huì)立即更新緩存中相應(yīng)的Offset紧卒,因?yàn)榇藭r(shí)這些更新操作尚未被COMMIT或ABORT侥衬。

Commit或Abort事務(wù) 一旦上述數(shù)據(jù)寫(xiě)入操作完成,應(yīng)用程序必須調(diào)用KafkaProducer的commitTransaction方法或者abortTransaction方法以結(jié)束當(dāng)前事務(wù)常侦。

EndTxnRequest

commitTransaction方法使得Producer寫(xiě)入的數(shù)據(jù)對(duì)下游Consumer可見(jiàn)浇冰。abortTransaction方法通過(guò)Transaction Marker將Producer寫(xiě)入的數(shù)據(jù)標(biāo)記為Aborted狀態(tài)。下游的Consumer如果將isolation.level設(shè)置為READ_COMMITTED聋亡,則它讀到被Abort的消息后直接將其丟棄而不會(huì)返回給客戶程序肘习,也即被Abort的消息對(duì)應(yīng)用程序不可見(jiàn)。

無(wú)論是Commit還是Abort坡倔,Producer都會(huì)發(fā)送EndTxnRequest請(qǐng)求給Transaction Coordinator漂佩,并通過(guò)標(biāo)志位標(biāo)識(shí)是應(yīng)該Commit還是Abort。 收到該請(qǐng)求后罪塔,Transaction Coordinator會(huì)進(jìn)行如下操作 將PREPARE_COMMIT或PREPARE_ABORT消息寫(xiě)入Transaction Log投蝉,如上圖中步驟5.1所示 通過(guò)WriteTxnMarker請(qǐng)求以Transaction Marker的形式將COMMIT或ABORT信息寫(xiě)入用戶數(shù)據(jù)日志以及Offset Log中,如上圖中步驟5.2所示 最后將COMPLETE_COMMIT或COMPLETE_ABORT信息寫(xiě)入Transaction Log中征堪,如上圖中步驟5.3所示

補(bǔ)充說(shuō)明:對(duì)于commitTransaction方法瘩缆,它會(huì)在發(fā)送EndTxnRequest之前先調(diào)用flush方法以確保所有發(fā)送出去的數(shù)據(jù)都得到相應(yīng)的ACK。對(duì)于abortTransaction方法佃蚜,在發(fā)送EndTxnRequest之前直接將當(dāng)前Buffer中的事務(wù)性消息(如果有)全部丟棄庸娱,但必須等待所有被發(fā)送但尚未收到ACK的消息發(fā)送完成着绊。

上述第二步是實(shí)現(xiàn)將一組讀操作與寫(xiě)操作作為一個(gè)事務(wù)處理的關(guān)鍵。因?yàn)镻roducer寫(xiě)入的數(shù)據(jù)Topic以及記錄Comsumer Offset的Topic會(huì)被寫(xiě)入相同的Transactin Marker熟尉,所以這一組讀操作與寫(xiě)操作要么全部COMMIT要么全部ABORT归露。

WriteTxnMarkerRequest 上面提到的WriteTxnMarkerRequest由Transaction Coordinator發(fā)送給當(dāng)前事務(wù)涉及到的每個(gè)的Leader。收到該請(qǐng)求后斤儿,對(duì)應(yīng)的Leader會(huì)將對(duì)應(yīng)的COMMIT(PID)或者ABORT(PID)控制信息寫(xiě)入日志剧包,如上圖中步驟5.2所示。

該控制消息向Broker以及Consumer表明對(duì)應(yīng)PID的消息被Commit了還是被Abort了往果。

這里要注意疆液,如果事務(wù)也涉及到__consumer_offsets,即該事務(wù)中有消費(fèi)數(shù)據(jù)的操作且將該消費(fèi)的Offset存于__consumer_offsets中棚放,Transaction Coordinator也需要向該內(nèi)部Topic的各Partition的Leader發(fā)送WriteTxnMarkerRequest從而寫(xiě)入COMMIT(PID)或COMMIT(PID)控制信息枚粘。

寫(xiě)入最終的COMPLETE_COMMIT或COMPLETE_ABORT消息 寫(xiě)完所有的Transaction Marker后,Transaction Coordinator會(huì)將最終的COMPLETE_COMMIT或COMPLETE_ABORT消息寫(xiě)入Transaction Log中以標(biāo)明該事務(wù)結(jié)束飘蚯,如上圖中步驟5.3所示馍迄。

此時(shí),Transaction Log中所有關(guān)于該事務(wù)的消息全部可以移除局骤。當(dāng)然攀圈,由于Kafka內(nèi)數(shù)據(jù)是Append Only的,不可直接更新和刪除峦甩,這里說(shuō)的移除只是將其標(biāo)記為null從而在Log Compact時(shí)不再保留赘来。

另外,COMPLETE_COMMIT或COMPLETE_ABORT的寫(xiě)入并不需要得到所有Rreplica的ACK凯傲,因?yàn)槿绻撓G失犬辰,可以根據(jù)事務(wù)協(xié)議重發(fā)。 補(bǔ)充說(shuō)明冰单,如果參與該事務(wù)的某些在被寫(xiě)入Transaction Marker前不可用幌缝,它對(duì)READ_COMMITTED的Consumer不可見(jiàn),但不影響其它可用的COMMIT或ABORT诫欠。在該恢復(fù)可用后涵卵,Transaction Coordinator會(huì)重新根據(jù)PREPARE_COMMIT或PREPARE_ABORT向該發(fā)送Transaction Marker。

總結(jié) PID與Sequence Number的引入實(shí)現(xiàn)了寫(xiě)操作的冪等性 寫(xiě)操作的冪等性結(jié)合At Least Once語(yǔ)義實(shí)現(xiàn)了單一Session內(nèi)的Exactly Once語(yǔ)義 Transaction Marker與PID提供了識(shí)別消息是否應(yīng)該被讀取的能力荒叼,從而實(shí)現(xiàn)了事務(wù)的隔離性 Offset的更新標(biāo)記了消息是否被讀取轿偎,從而將對(duì)讀操作的事務(wù)處理轉(zhuǎn)換成了對(duì)寫(xiě)(Offset)操作的事務(wù)處理 Kafka事務(wù)的本質(zhì)是,將一組寫(xiě)操作(如果有)對(duì)應(yīng)的消息與一組讀操作(如果有)對(duì)應(yīng)的Offset的更新進(jìn)行同樣的標(biāo)記(即Transaction Marker)來(lái)實(shí)現(xiàn)事務(wù)中涉及的所有讀寫(xiě)操作同時(shí)對(duì)外可見(jiàn)或同時(shí)對(duì)外不可見(jiàn) Kafka只提供對(duì)Kafka本身的讀寫(xiě)操作的事務(wù)性被廓,不提供包含外部系統(tǒng)的事務(wù)性 異常處理

Exception處理 InvalidProducerEpoch 這是一種Fatal Error坏晦,它說(shuō)明當(dāng)前Producer是一個(gè)過(guò)期的實(shí)例,有Transaction ID相同但epoch更新的Producer實(shí)例被創(chuàng)建并使用。此時(shí)Producer會(huì)停止并拋出Exception昆婿。

InvalidPidMapping Transaction Coordinator沒(méi)有與該Transaction ID對(duì)應(yīng)的PID间护。此時(shí)Producer會(huì)通過(guò)包含有Transaction ID的InitPidRequest請(qǐng)求創(chuàng)建一個(gè)新的PID。 NotCorrdinatorForGTransactionalId 該Transaction Coordinator不負(fù)責(zé)該當(dāng)前事務(wù)挖诸。Producer會(huì)通過(guò)FindCoordinatorRequest請(qǐng)求重新尋找對(duì)應(yīng)的Transaction Coordinator。

InvalidTxnRequest 違反了事務(wù)協(xié)議法精。正確的Client實(shí)現(xiàn)不應(yīng)該出現(xiàn)這種Exception多律。如果該異常發(fā)生了,用戶需要檢查自己的客戶端實(shí)現(xiàn)是否有問(wèn)題搂蜓。

CoordinatorNotAvailable Transaction Coordinator仍在初始化中狼荞。Producer只需要重試即可。

DuplicateSequenceNumber 發(fā)送的消息的序號(hào)低于Broker預(yù)期帮碰。該異常說(shuō)明該消息已經(jīng)被成功處理過(guò)相味,Producer可以直接忽略該異常并處理下一條消息

InvalidSequenceNumber 這是一個(gè)Fatal Error,它說(shuō)明發(fā)送的消息中的序號(hào)大于Broker預(yù)期殉挽。此時(shí)有兩種可能 數(shù)據(jù)亂序丰涉。比如前面的消息發(fā)送失敗后重試期間,新的消息被接收斯碌。正常情況下不應(yīng)該出現(xiàn)該問(wèn)題一死,因?yàn)楫?dāng)冪等發(fā)送啟用時(shí),max.inflight.requests.per.connection被強(qiáng)制設(shè)置為1傻唾,而acks被強(qiáng)制設(shè)置為all投慈。故前面消息重試期間,后續(xù)消息不會(huì)被發(fā)送冠骄,也即不會(huì)發(fā)生亂序伪煤。并且只有ISR中所有Replica都ACK,Producer才會(huì)認(rèn)為消息已經(jīng)被發(fā)送凛辣,也即不存在Broker端數(shù)據(jù)丟失問(wèn)題抱既。 服務(wù)器由于日志被Truncate而造成數(shù)據(jù)丟失。此時(shí)應(yīng)該停止Producer并將此Fatal Error報(bào)告給用戶蟀给。

InvalidTransactionTimeout InitPidRequest調(diào)用出現(xiàn)的Fatal Error蝙砌。它表明Producer傳入的timeout時(shí)間不在可接受范圍內(nèi),應(yīng)該停止Producer并報(bào)告給用戶跋理。

處理Transaction Coordinator失敗

寫(xiě)PREPARE_COMMIT/PREPARE_ABORT前失敗 Producer通過(guò)FindCoordinatorRequest找到新的Transaction Coordinator择克,并通過(guò)EndTxnRequest請(qǐng)求發(fā)起COMMIT或ABORT流程,新的Transaction Coordinator繼續(xù)處理EndTxnRequest請(qǐng)求——寫(xiě)PREPARE_COMMIT或PREPARE_ABORT前普,寫(xiě)Transaction Marker肚邢,寫(xiě)COMPLETE_COMMIT或COMPLETE_ABORT。

寫(xiě)完P(guān)REPARE_COMMIT/PREPARE_ABORT后失敗 此時(shí)舊的Transaction Coordinator可能已經(jīng)成功寫(xiě)入部分Transaction Marker。新的Transaction Coordinator會(huì)重復(fù)這些操作骡湖,所以部分Partition中可能會(huì)存在重復(fù)的COMMIT或ABORT贱纠,但只要該P(yáng)roducer在此期間沒(méi)有發(fā)起新的事務(wù),這些重復(fù)的Transaction Marker就不是問(wèn)題响蕴。

寫(xiě)完COMPLETE_COMMIT/ABORT后失敗 舊的Transaction Coordinator可能已經(jīng)寫(xiě)完了COMPLETE_COMMIT或COMPLETE_ABORT但在返回EndTxnRequest之前失敗谆焊。該場(chǎng)景下,新的Transaction Coordinator會(huì)直接給Producer返回成功浦夷。

事務(wù)過(guò)期機(jī)制

事務(wù)超時(shí)

transaction.timeout.ms?終止過(guò)期事務(wù) 當(dāng)Producer失敗時(shí)辖试,Transaction Coordinator必須能夠主動(dòng)的讓某些進(jìn)行中的事務(wù)過(guò)期。否則沒(méi)有Producer的參與劈狐,Transaction Coordinator無(wú)法判斷這些事務(wù)應(yīng)該如何處理罐孝,這會(huì)造成: 如果這種進(jìn)行中事務(wù)太多,會(huì)造成Transaction Coordinator需要維護(hù)大量的事務(wù)狀態(tài)肥缔,大量占用內(nèi)存 Transaction Log內(nèi)也會(huì)存在大量數(shù)據(jù)莲兢,造成新的Transaction Coordinator啟動(dòng)緩慢 READ_COMMITTED的Consumer需要緩存大量的消息,造成不必要的內(nèi)存浪費(fèi)甚至是OOM 如果多個(gè)Transaction ID不同的Producer交叉寫(xiě)同一個(gè)Partition续膳,當(dāng)一個(gè)Producer的事務(wù)狀態(tài)不更新時(shí)改艇,READ_COMMITTED的Consumer為了保證順序消費(fèi)而被阻塞 為了避免上述問(wèn)題,Transaction Coordinator會(huì)周期性遍歷內(nèi)存中的事務(wù)狀態(tài)Map坟岔,并執(zhí)行如下操作?如果狀態(tài)是BEGIN并且其最后更新時(shí)間與當(dāng)前時(shí)間差大于transaction.remove.expired.transaction.cleanup.interval.ms(默認(rèn)值為1小時(shí))遣耍,則主動(dòng)將其終止:1)未避免原Producer臨時(shí)恢復(fù)與當(dāng)前終止流程沖突,增加該P(yáng)roducer對(duì)應(yīng)的PID的epoch炮车,并確保將該更新的信息寫(xiě)入Transaction Log舵变;2)以更新后的epoch回滾事務(wù),從而使得該事務(wù)相關(guān)的所有Broker都更新其緩存的該P(yáng)ID的epoch從而拒絕舊Producer的寫(xiě)操作 如果狀態(tài)是PREPARE_COMMIT瘦穆,完成后續(xù)的COMMIT流程————向各寫(xiě)入Transaction Marker纪隙,在Transaction Log內(nèi)寫(xiě)入COMPLETE_COMMIT 如果狀態(tài)是PREPARE_ABORT,完成后續(xù)ABORT流程

終止Transaction ID

某Transaction ID的Producer可能很長(zhǎng)時(shí)間不再發(fā)送數(shù)據(jù)扛或,Transaction Coordinator沒(méi)必要再保存該Transaction ID與PID等的映射绵咱,否則可能會(huì)造成大量的資源浪費(fèi)。因此需要有一個(gè)機(jī)制探測(cè)不再活躍的Transaction ID并將其信息刪除熙兔。 Transaction Coordinator會(huì)周期性遍歷內(nèi)存中的Transaction ID與PID映射悲伶,如果某Transaction?ID沒(méi)有對(duì)應(yīng)的正在進(jìn)行中的事務(wù)并且它對(duì)應(yīng)的最后一個(gè)事務(wù)的結(jié)束時(shí)間與當(dāng)前時(shí)間差大于transactional.id.expiration.ms(默認(rèn)值是7天),則將其從內(nèi)存中刪除并在Transaction Log中將其對(duì)應(yīng)的日志的值設(shè)置為null從而使得Log Compact可將其記錄刪除住涉。

與其它系統(tǒng)事務(wù)機(jī)制對(duì)比

PostgreSQL MVCC

Kafka的事務(wù)機(jī)制與《MVCC PostgreSQL實(shí)現(xiàn)事務(wù)和多版本并發(fā)控制的精華》一文中介紹的PostgreSQL通過(guò)MVCC實(shí)現(xiàn)事務(wù)的機(jī)制非常類(lèi)似麸锉,對(duì)于事務(wù)的回滾,并不需要?jiǎng)h除已寫(xiě)入的數(shù)據(jù)舆声,都是將寫(xiě)入數(shù)據(jù)的事務(wù)標(biāo)記為Rollback/Abort從而在讀數(shù)據(jù)時(shí)過(guò)濾該數(shù)據(jù)花沉。

兩階段提交

Kafka的事務(wù)機(jī)制與《分布式事務(wù)(一)兩階段提交及JTA》一文中所介紹的兩階段提交機(jī)制看似相似柳爽,都分PREPARE階段和最終COMMIT階段,但又有很大不同碱屁。 Kafka事務(wù)機(jī)制中磷脯,PREPARE時(shí)即要指明是PREPARE_COMMIT還是PREPARE_ABORT,并且只須在Transaction Log中標(biāo)記即可娩脾,無(wú)須其它組件參與赵誓。而兩階段提交的PREPARE需要發(fā)送給所有的分布式事務(wù)參與方,并且事務(wù)參與方需要盡可能準(zhǔn)備好柿赊,并根據(jù)準(zhǔn)備情況返回Prepared或Non-Prepared狀態(tài)給事務(wù)管理器架曹。 Kafka事務(wù)中,一但發(fā)起PREPARE_COMMIT或PREPARE_ABORT闹瞧,則確定該事務(wù)最終的結(jié)果應(yīng)該是被COMMIT或ABORT。而分布式事務(wù)中展辞,PREPARE后由各事務(wù)參與方返回狀態(tài)奥邮,只有所有參與方均返回Prepared狀態(tài)才會(huì)真正執(zhí)行COMMIT,否則執(zhí)行ROLLBACK Kafka事務(wù)機(jī)制中罗珍,某幾個(gè)Partition在COMMIT或ABORT過(guò)程中變?yōu)椴豢捎们⑾伲挥绊懺揚(yáng)artition不影響其它Partition。兩階段提交中覆旱,若唯一收到COMMIT命令參與者Crash蘸朋,其它事務(wù)參與方無(wú)法判斷事務(wù)狀態(tài)從而使得整個(gè)事務(wù)阻塞 Kafka事務(wù)機(jī)制引入事務(wù)超時(shí)機(jī)制,有效避免了掛起的事務(wù)影響其它事務(wù)的問(wèn)題 Kafka事務(wù)機(jī)制中存在多個(gè)Transaction Coordinator實(shí)例扣唱,而分布式事務(wù)中只有一個(gè)事務(wù)管理器

Zookeeper

Zookeeper的原子廣播協(xié)議與兩階段提交以及Kafka事務(wù)機(jī)制有相似之處藕坯,但又有各自的特點(diǎn) Kafka事務(wù)可COMMIT也可ABORT。而Zookeeper原子廣播協(xié)議只有COMMIT沒(méi)有ABORT噪沙。當(dāng)然炼彪,Zookeeper不COMMIT某消息也即等效于ABORT該消息的更新。 Kafka存在多個(gè)Transaction Coordinator實(shí)例正歼,擴(kuò)展性較好辐马。而Zookeeper寫(xiě)操作只能在Leader節(jié)點(diǎn)進(jìn)行,所以其寫(xiě)性能遠(yuǎn)低于讀性能局义。 Kafka事務(wù)是COMMIT還是ABORT完全取決于Producer即客戶端喜爷。而Zookeeper原子廣播協(xié)議中某條消息是否被COMMIT取決于是否有一大半FOLLOWER ACK該消息。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末萄唇,一起剝皮案震驚了整個(gè)濱河市檩帐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌另萤,老刑警劉巖轿塔,帶你破解...
    沈念sama閱讀 216,496評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡勾缭,警方通過(guò)查閱死者的電腦和手機(jī)揍障,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)俩由,“玉大人毒嫡,你說(shuō)我怎么就攤上這事』锰荩” “怎么了兜畸?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,632評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)碘梢。 經(jīng)常有香客問(wèn)我咬摇,道長(zhǎng),這世上最難降的妖魔是什么煞躬? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,180評(píng)論 1 292
  • 正文 為了忘掉前任肛鹏,我火速辦了婚禮,結(jié)果婚禮上恩沛,老公的妹妹穿的比我還像新娘在扰。我一直安慰自己,他們只是感情好雷客,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,198評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布芒珠。 她就那樣靜靜地躺著,像睡著了一般搅裙。 火紅的嫁衣襯著肌膚如雪皱卓。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,165評(píng)論 1 299
  • 那天部逮,我揣著相機(jī)與錄音好爬,去河邊找鬼。 笑死甥啄,一個(gè)胖子當(dāng)著我的面吹牛存炮,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蜈漓,決...
    沈念sama閱讀 40,052評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼穆桂,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了融虽?” 一聲冷哼從身側(cè)響起享完,我...
    開(kāi)封第一講書(shū)人閱讀 38,910評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎有额,沒(méi)想到半個(gè)月后般又,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體彼绷,經(jīng)...
    沈念sama閱讀 45,324評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,542評(píng)論 2 332
  • 正文 我和宋清朗相戀三年茴迁,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了寄悯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,711評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡堕义,死狀恐怖猜旬,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情倦卖,我是刑警寧澤洒擦,帶...
    沈念sama閱讀 35,424評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站怕膛,受9級(jí)特大地震影響熟嫩,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜褐捻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,017評(píng)論 3 326
  • 文/蒙蒙 一掸茅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧舍扰,春花似錦、人聲如沸希坚。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,668評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)裁僧。三九已至个束,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間聊疲,已是汗流浹背茬底。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,823評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留获洲,地道東北人阱表。 一個(gè)月前我還...
    沈念sama閱讀 47,722評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像贡珊,于是被迫代替她去往敵國(guó)和親最爬。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,611評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,721評(píng)論 13 425
  • [TOC] 簡(jiǎn)介 kafka是一個(gè)分布式消息隊(duì)列糠悯。具有高性能帮坚、持久化、多副本備份互艾、橫向擴(kuò)展能力试和。生產(chǎn)者往隊(duì)列里寫(xiě)消...
    123archu閱讀 290,117評(píng)論 55 355
  • Kafka簡(jiǎn)介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)忘朝。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O(1)的方...
    Alukar閱讀 3,079評(píng)論 0 43
  • 香草 年齡20的她灰署,心理停滯在7歲。傻傻的他陰差陽(yáng)錯(cuò)地要追她局嘁,發(fā)現(xiàn)她真實(shí)智商無(wú)法接受而離開(kāi)溉箕,卻又不忍心在馬路...
    靜待雪絨花綻放閱讀 396評(píng)論 0 0
  • 微信公眾號(hào)開(kāi)發(fā)中,微信平臺(tái)向第三方服務(wù)器轉(zhuǎn)發(fā)的消息悦昵,在處理之前肴茄,是應(yīng)該進(jìn)行校驗(yàn)的。這一點(diǎn)但指,稍有安全常識(shí)的開(kāi)發(fā)者都應(yīng)...
    ButteredCat閱讀 596評(píng)論 0 0