分布式事務(wù) golang saga 模式實現(xiàn)
分布式事務(wù)介紹
在分布式系統(tǒng)環(huán)境下由不同的節(jié)點之間通過網(wǎng)絡(luò)遠程協(xié)作完成的事務(wù)稱之為分布式事務(wù)兼贡。也就是事務(wù)的參與者、管理者吼具、資源及資源管理者分別位于分布式系統(tǒng)的不同節(jié)點之上赡勘。分布式事務(wù)的作用就是用于保證不同節(jié)點中的數(shù)據(jù)一致性。
例如典型的創(chuàng)建訂單業(yè)務(wù)邏輯由以下幾個部分構(gòu)成:
- 倉庫服務(wù):扣減特定商品庫存數(shù)量
- 訂單服務(wù):生成訂單
分布式事務(wù)的作用就是保證數(shù)據(jù)在訂單服務(wù)和倉庫服務(wù)這個整體上的一致性楼熄。不能說訂單生成了忆绰,消耗了10件商品,而倉庫服務(wù)中的庫存說了沒有一致性的改變可岂。
分布式事務(wù)相關(guān)協(xié)議:
- 2PC错敢,Two Phases Commit,二階段提交
- 3PC缕粹,Three Phases Commit稚茅,三階段提交
- 最終一致性
典型的分布式事務(wù)模式纸淮,依據(jù)一致性由強到弱排序如下:
- XA,eXtended Architecture亚享, 數(shù)據(jù)庫層面的分布式事務(wù)規(guī)范咽块,目前主流數(shù)據(jù)庫基本都支持 XA 事務(wù),已知方案中的最強的一致性方案
- TCC欺税,Try侈沪、Confirm、Cancel魄衅,應(yīng)用(服務(wù))層參與的 2PC 方案
- 事務(wù)消息峭竣,利用消息隊列異步確保事務(wù)的一致性
- Saga,將長事務(wù)拆分為多個本地短事務(wù)協(xié)調(diào)執(zhí)行晃虫,若某個短事務(wù)失敗皆撩,則反順序調(diào)用補償(undo)操作
- 最大努力通知,發(fā)起通知方通過一定的機制最大努力將事務(wù)結(jié)果通知到接收方哲银,來保證一致性
還有幾個事實模式扛吞,特定產(chǎn)品中實現(xiàn)的模式:
- Seata 實現(xiàn) AT 模式
- DTM 推出的二階段消息和 Workflow 模式
常見的實現(xiàn)分布式事務(wù)的產(chǎn)品(解決方案):
- Seata,支持 TCC荆责、Saga 和 AT(Seata(阿里團隊)實現(xiàn))模式滥比,提供 Seata-go 支持 Go 技術(shù)棧
- DTM,支持 XA做院,Saga盲泛、TCC、和 DTM 推出的 二階段消息和 Workflow 模式键耕,基于 Go 開發(fā)
分布式事務(wù)協(xié)議
2PC
二階段提交協(xié)議(Two-phase Commit寺滚,即 2PC)是常用的分布式事務(wù)協(xié)議,即將事務(wù)的提交過程分為準備階段和提交階段兩個階段來進行處理屈雄。通過引入?yún)f(xié)調(diào)者(Coordinator)來協(xié)調(diào)參與者的行為村视,并最終決定這些參與者是否要真正執(zhí)行事務(wù)。
2PC 中的兩種角色:
- 事務(wù)協(xié)調(diào)者(事務(wù)管理器)酒奶,Coordinator:事務(wù)的發(fā)起者
- 事務(wù)參與者(資源管理器), Resource Manager:事務(wù)的執(zhí)行者
2PC 中的兩個階段:
- 準備階段蚁孔,Prepare
- 協(xié)調(diào)者向各個參與者發(fā)出準備執(zhí)行事務(wù)指令
- 參與者執(zhí)行但不提交事務(wù),數(shù)據(jù)庫中的體現(xiàn)就是將操作記錄在 redo/undo 日志中
- 參與者將執(zhí)行結(jié)果反饋給協(xié)調(diào)者惋嚎,成功 yes杠氢,失敗 no
- 提交階段,Commit
- 若協(xié)調(diào)者收到全部的參與者的 yes 反饋另伍,則向參與者發(fā)送 commit 指令
- 若協(xié)調(diào)者未收到全部參與者的 yes 反饋(有的反饋no修然,有的超時未反饋),則向參與者發(fā)送 rollback 指令
- 參與者根據(jù)收到的 commit 或 rollback 指令质况,完成事務(wù)的提交或者回滾
- 協(xié)調(diào)者收到全部參與者的 commit 或 rollback 的 ack愕宋,完成事務(wù)
prepare 階段全部反饋為 yes 情況,如圖所示:
prepare 階段未全部反饋為 yes 情況结榄,如圖所示:
2PC 的中心思想就是將事務(wù)拆解為兩步中贝,先確認再提交,是當前實現(xiàn)分布式事務(wù)的核心思想臼朗。在數(shù)據(jù)庫層面實現(xiàn)就是 XA 模式邻寿,在業(yè)務(wù)層面實現(xiàn)就是 Saga、TCC 模式视哑,Seata 的 At 也是二階段的思路绣否。
3PC
三階段提交協(xié)議(Three-phase Commit,即 3PC)是基于 2PC 的改造版本挡毅,主要的改動是將 2PC 中的 Prepare 階段分為了 canCommit 和 PreCommit 兩個階段蒜撮,那三個階段就是:
- CanCommit,提交檢查階段跪呈,詢問參與者是否具備條件執(zhí)行事務(wù)
- 協(xié)調(diào)者向全部參與者詢問是否具備條件執(zhí)行事務(wù)
- 參與者根據(jù)自身狀態(tài)段磨,反饋結(jié)果,具備 yes耗绿,不具備 no苹支。通常是資源的檢測。
- PreCommit误阻,預(yù)提交階段债蜜,與 2PC 中的 Prepare 階段類型,執(zhí)行事務(wù)但不提交
- 若協(xié)調(diào)者收到全部的參與者的 yes 反饋究反,則向參與者發(fā)送 preCommit 指令
- 若協(xié)調(diào)者未收到全部參與者的 yes 反饋(有的反饋no寻定,有的超時未反饋),則向參與者發(fā)送 abort 中斷指令
- 參與者根據(jù)收到的 preCommit 或 abort 指令奴紧,選擇執(zhí)行但不提交事務(wù)特姐,或中斷
- 參與者將執(zhí)行結(jié)果反饋給協(xié)調(diào)者,成功的 ack黍氮,或失敗 no
- 若執(zhí)行的是 abort 指令唐含,那么不再繼續(xù),事務(wù)終止
- DoCommit沫浆,提交階段
- 若協(xié)調(diào)者收到全部的參與者關(guān)于 preCommit 指令的 yes 反饋捷枯,則向參與者發(fā)送 doCommit 指令
- 若協(xié)調(diào)者未收到全部參與者關(guān)于 preCommit 指令的 yes 反饋的 yes 反饋(有的反饋no,有的超時未反饋)专执,則向參與者發(fā)送 abort 指令
- 參與者根據(jù)收到的 commit 或 abort 指令淮捆,完成事務(wù)的提交或者中斷
- 協(xié)調(diào)者收到全部參與者的 commit 或 abort 的 ack,完成事務(wù)
canCommit、preCommit 階段全部反饋 yes 的情況攀痊,如圖:
canCommit 未全部反饋 yes 的情況桐腌,如圖:
canCommit 全部反饋 yes,但 preCommit 未全部 ACK 的情況苟径,如圖:
3PC 增加了 canCommit 階段案站,也就是在進入到事務(wù)執(zhí)行階段前,可以完成一些必要的檢查棘街,而不會像 2PC 那樣直接進入事務(wù)執(zhí)行而鎖定資源蟆盐。這樣在一定成都上減少了資源的阻塞范圍。但步驟多了遭殉,實現(xiàn)必然復(fù)雜了石挂。
分布式事務(wù)模式
XA
XA,eXtended Architecture险污,是由 X/Open 組織提出的分布式事務(wù)的規(guī)范痹愚,XA 規(guī)范主要定義了事務(wù)管理器(TM)和資源管理器(RM)之間的接口。目前主流的數(shù)據(jù)庫基本都支持XA事務(wù)罗心,包括 mysql里伯、oracle、sqlserver渤闷、postgre疾瓮。
本地的數(shù)據(jù)庫如 MySQL 在 XA 中扮演的是 RM 角色。而 XA 工具扮演的是 TM 的角色飒箭,例如 Seata 和 Dtm狼电。通過 TM 與數(shù)據(jù)庫RM的交互,完成分布式事務(wù)的調(diào)度控制弦蹂。
XA 是 2PC 在數(shù)據(jù)庫層面實現(xiàn)的一種規(guī)范肩碟,分為兩階段:
- 第一階段(prepare):所有的參與者 RM 準備執(zhí)行事務(wù)并鎖住需要的資源。參與者 ready 時凸椿,向 TM 報告已準備就緒
- 第二階段 (commit/rollback):當事務(wù)管理者(TM)確認所有參與者(RM)都 ready 后削祈,向所有參與者發(fā)送 commit 命令或 rollback 命令
XA 模式在數(shù)據(jù)庫層面實現(xiàn),因此一致性非常嚴格脑漫,但同時并發(fā)性能較差髓抑。因此適合做那種要求強一致性的業(yè)務(wù)邏輯,例如轉(zhuǎn)賬优幸、金融等吨拍。
MySQL 提供如下的語句實現(xiàn) XA 事務(wù):
# Begin
XA {START|BEGIN} xid [JOIN|RESUME]
# End
XA END xid [SUSPEND [FOR MIGRATE]]
# Prepare
XA PREPARE xid
# Commit
XA COMMIT xid [ONE PHASE]
# rollback
XA ROLLBACK xid
# recover
XA RECOVER [CONVERT XID]
MySQL 通過關(guān)聯(lián)特定的 xid 來控制本地事務(wù)(子事務(wù)),TM 負責控制全局事務(wù)网杆。
以確認訂單為例:
- 確認訂單業(yè)務(wù)邏輯
- 通知 TM 開啟全局事務(wù)
- 注冊訂單服務(wù)創(chuàng)建訂單本地事務(wù)接口
- 注冊庫存服務(wù)扣減庫存本地事務(wù)接口
- 訂單服務(wù)
- 實現(xiàn)創(chuàng)建訂單本地事務(wù)接口
- 庫存服務(wù)
- 實現(xiàn)扣減庫存本地事務(wù)接口
全部 prepare 成功羹饰,如圖所示:
Saga
Saga 最初出現(xiàn)在1987年 Hector Garcaa-Molrna & Kenneth Salem 發(fā)表的論文《SAGAS》里伊滋。其核心思想是將長事務(wù)拆分為多個短事務(wù),由 Saga 事務(wù)協(xié)調(diào)器協(xié)調(diào)队秩,如果每個短事務(wù)都成功提交完成笑旺,那么全局事務(wù)就正常完成,如果某個步驟失敗刹碾,則根據(jù)相反順序一次調(diào)用補償操作燥撞。
Saga 是業(yè)務(wù)邏輯層面實現(xiàn)的分布式事務(wù)管理。意味著任何一個事務(wù)內(nèi)的操作迷帜,都要定義正向和反向(補償)兩個操作。正向就是常規(guī)業(yè)務(wù)邏輯色洞,反向(補償)就是取消正向業(yè)務(wù)邏輯帶來的影響戏锹。總結(jié)就是火诸,先使用資源锦针,不行再退回去。
以確認訂單為例:
- 訂單服務(wù)
- 正向操作:創(chuàng)建訂單和訂單產(chǎn)品置蜀,同時訂單為已確認狀態(tài)奈搜。
- 反向(補償)操作:更新訂單狀態(tài)為未確認
- 庫存服務(wù)
- 正向操作:扣減庫存
- 反向補償操作:將扣減的庫存退回去
實操時,我們需要在訂單和庫存服務(wù)中盯荤,分別實現(xiàn)正向和反向兩個接口馋吗。而 Saga 事務(wù)管理器負責完成對接口的調(diào)用。
不需要補償操作的情況秋秤,如圖:
需要補償操作的情況宏粤,如圖:
真正實現(xiàn)時,確認訂單為主業(yè)務(wù)邏輯灼卢,負責與 Saga 事務(wù)協(xié)調(diào)器(Seata, Dtm) 通訊绍哎,將倉庫和訂單服務(wù)的正向和反向操作接口注冊到 Saga 事務(wù)協(xié)調(diào)器中。Saga 事務(wù)協(xié)調(diào)器負責根據(jù)正向操作的結(jié)果覺得是否調(diào)用補償操作鞋真。示例請參考 《DTM 的 Saga 示例》
TCC
TCC 是 Try崇堰、Confirm、Cancel 三個詞語的縮寫涩咖,最早是由 Pat Helland 于 2007 年發(fā)表的一篇名為《Life beyond Distributed Transactions:an Apostate’s Opinion》的論文提出海诲。
TCC 是 2PC 在業(yè)務(wù)邏輯層面的實現(xiàn),也就意味著脫離數(shù)據(jù)庫抠藕,完成準備和提交階段饿肺。在 TCC 中,采用的是先凍結(jié)資源盾似,若全部節(jié)點凍結(jié)成功敬辣,則提交占有資源雪标,若存在失敗階段,則釋放凍結(jié)的資源溉跃〈迮伲總結(jié)就是:先凍結(jié),成功則占有否則釋放撰茎。
TCC 分為3個操作嵌牺,2個階段
- Try 操作,1階段:嘗試執(zhí)行龄糊,完成所有業(yè)務(wù)檢查(一致性), 預(yù)留必須業(yè)務(wù)資源(準隔離性)逆粹,凍結(jié)資源
- Confirm 操作,2階段:如果所有分支的 Try 都成功了炫惩,則走到Confirm階段僻弹。Confirm真正執(zhí)行業(yè)務(wù),不作任何業(yè)務(wù)檢查他嚷,只使用 Try 階段預(yù)留的業(yè)務(wù)資源
- Cancel 操作蹋绽,2階段:如果所有分支的Try有一個失敗了,則走到 Cancel 階段筋蓖。Cancel釋放 Try 階段預(yù)留的業(yè)務(wù)資源卸耘。
以確認訂單為例:
- Try 操作,1階段
- 訂單服務(wù)會生成確認中的訂單
- 庫存服務(wù)會凍結(jié)需要的庫存
- Confirm 操作粘咖,2階段蚣抗,庫存服務(wù)都成功,則確認事務(wù)
- 訂單狀態(tài)更新為已確認
- 執(zhí)行扣減庫存
- Cancel 操作涂炎,2階段忠聚,若訂單、庫存服務(wù)存在失敗的情況唱捣,則取消事務(wù)
- 釋放凍結(jié)的庫存
- 將訂單狀態(tài)更新為未確認狀態(tài)
我們需要在訂單服務(wù)和庫存服務(wù)中两蟀,分別實現(xiàn) Try、Confirm和Cancel 接口震缭。而 分布式事務(wù)管理器(Seata赂毯、Dtm)服務(wù)完成調(diào)度,根據(jù)結(jié)果選則 Confirm 還是 Cancel拣宰。
執(zhí)行 Confirm 情況党涕,如圖所示:
執(zhí)行 Cancel 情況,如圖所示:
真正實現(xiàn)時巡社,確認訂單為主業(yè)務(wù)邏輯膛堤,負責與TCC事務(wù)協(xié)調(diào)器(Seata, Dtm)通訊,將倉庫和訂單服務(wù)的 Confirm 和 Cancel 接口注冊到 TCC 事務(wù)協(xié)調(diào)器中晌该。TCC 事務(wù)協(xié)調(diào)器負責根據(jù) Try 的結(jié)果調(diào)用 Confirm 或 Cancel肥荔。
TCC vs Saga 思想:
- Saga绿渣,先使用,不行再退回
- TCC燕耿,先鎖定(凍結(jié))中符,再選擇占用或釋放
事務(wù)消息
消息事務(wù)的原理是將兩個事務(wù)通過消息中間件進行異步解耦。RocketMQ 實現(xiàn)的該方案誉帅。若服務(wù) A淀散、B要實現(xiàn)分布式事務(wù),其思路是:
- A 服務(wù)發(fā)布事務(wù)成功的 half 消息到 MQ蚜锨。half 消息是消費者(服務(wù) B)不可見的消息档插。
- A 服務(wù)執(zhí)行本地事務(wù)
- 事務(wù)提交,則通知 MQ 將 half 消息對消費者可見
- 事務(wù)回滾踏志,則通知 MQ 將 half 消息刪除
- B 服務(wù)只有在 A 服務(wù)事務(wù)提交成功后阀捅,才會消費到消息,消費到消息后针余,B 服務(wù)完成自己的事務(wù)部分
- 若消費失敗,服務(wù) B 會不斷重試凄诞,直到消費成功
RocketMQ 還要求 A 服務(wù)提供一個接口圆雁,用于查詢關(guān)于這條 half 消息對應(yīng)的事務(wù)是否成功,來控制 half 消息的狀態(tài)帆谍。
該方案主要適用于不需要業(yè)務(wù)回滾的場景伪朽,如某些附加操作。例如注冊成功后汛蝙,領(lǐng)取禮品烈涮,發(fā)送郵件等。
最大努力通知
最大努力通知的方案適用于一些最終一致性要求較低的業(yè)務(wù)窖剑。
執(zhí)行流程:
- 服務(wù) A 本地事務(wù)執(zhí)行完之后坚洽,發(fā)送個消息到 MQ;
- 最大努力通知調(diào)度器西土,消費該消息讶舰,同時調(diào)用服務(wù) B 的接口完成事務(wù)
- 要是服務(wù) B 執(zhí)行失敗了,那么最大努力通知調(diào)度器就定時嘗試重新調(diào)用服務(wù) B的事務(wù)幾口, 反復(fù) N 次需了,最后還是不行就放棄跳昼。
該方案主要適用于不需要業(yè)務(wù)回滾、一致性較低的場景肋乍,如某些附加操作鹅颊。例如注冊成功后,領(lǐng)取禮品墓造,發(fā)送郵件等堪伍。
分布式事務(wù)異常
分布式系統(tǒng)最大的問題就是 NPC 锚烦,是 Network Delay, Process Pause, Clock Drift 的首字母縮寫。指的是:
- Network Delay杠娱,網(wǎng)絡(luò)延遲
- Process Pause挽牢,進程暫停。當基于某些需要摊求,例如內(nèi)存垃圾回收禽拔、CPU 排隊、服務(wù)遷移等室叉,某服務(wù)會暫時暫停睹栖。
- Clock Drift,時鐘漂移茧痕。分布式系統(tǒng)涉及大量的服務(wù)器野来,而不同服務(wù)器通常使用 NTP (Network Time Protocol)協(xié)議將本地設(shè)備的時間與時間服務(wù)器對齊對齊后,通常會導(dǎo)致本地時間跳躍踪旷。
由于分布式事務(wù)系統(tǒng)由于存在 NPC 問題曼氛,意味著分布式事務(wù)需要考慮:
- 空補償,也叫空回滾令野,補償操作在主動操作未執(zhí)行前執(zhí)行舀患,系統(tǒng)設(shè)計應(yīng)該允許空補償。因此本地事務(wù)的補償操作需要判定出來主動操作是否執(zhí)行气破。補償操作聊浅,可以理解成逆向操作或 rollback 操作,主動操作可以理解為正向操作或 prepare 操作现使,例如:
- TCC 中低匙,就是 cancel 操作的執(zhí)行需要判定 try 是否執(zhí)行
- Sage 中,就是逆向補償操作需要判定正向操作是否執(zhí)行
- 懸掛碳锈,正向操作在執(zhí)行時顽冶,補償操作已經(jīng)完成,因此本地事務(wù)的正向操作需要判定出補償操作是否執(zhí)行完成殴胧,例如:
- TCC 中渗稍,就是 Try 操作執(zhí)行時需要判定 cancel 操作是否執(zhí)行完畢
- Saga 中,就是正向操作執(zhí)行時需要判定逆向補償操作是否執(zhí)行
以 Saga 模式的網(wǎng)絡(luò)異常時序圖為例:
[站外圖片上傳中...(image-5ca2a5-1672155282813)]
倉儲服務(wù)存在 NPC 問題团滥,導(dǎo)致操作1 扣減庫存 操作遲遲到達不了倉儲服務(wù)竿屹,同時假設(shè) 2 操作的成功結(jié)果由于 NPC 問題到達不了調(diào)度器,這就導(dǎo)致:
- 空補償:2 操作灸姊,因為超時而導(dǎo)致分布式事務(wù)失敗拱燃,需要補償。因此向倉儲服務(wù)發(fā)出補償操作請求力惯。此時正向操作還未抵達倉儲服務(wù)碗誉,因此是個空補償操作召嘶。
- 懸掛:1 操作,當1 扣減庫存這個正向操作到達倉儲服務(wù)時哮缺,由于已經(jīng)執(zhí)行過補償操作弄跌,因此 1 操作出現(xiàn)懸掛。
不論空補償還是懸掛尝苇,都需要在業(yè)務(wù)邏輯層面做出判定铛只。通常的做法是通過分布式事務(wù)事件日志的方案來標識操作狀態(tài),進而決定是否需要處理空補償和防止懸掛糠溜。
上圖中淳玩,2 操作超時而執(zhí)行的補償,若在倉庫服務(wù)執(zhí)行成功非竿,但反饋的結(jié)果由于 NPC 問題不能到達事務(wù)調(diào)度器蜕着,那么事務(wù)調(diào)度器還有可能再次發(fā)送 3 超時而執(zhí)行補償操作。這就意味著倉庫服務(wù)的補償操作會被多次重復(fù)調(diào)用红柱。我們必須保證分布式事務(wù)的全部操作分支保證冪等性承匣。也就是重復(fù)調(diào)用操作分支,但不會產(chǎn)生疊加的影響锤悄。
在編程中一個冪等操作的特點是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同悄雅。
DTM 示例
DTM 是一款開源的分布式事務(wù)管理器,解決跨數(shù)據(jù)庫铁蹈、跨服務(wù)、跨語言棧更新數(shù)據(jù)的一致性問題众眨。
DTM 安裝
https://www.dtm.pub/guide/install.html
Docker
$ sudo docker run \
--net host \
--rm -it \
--name dtmDev \
-p 36789:36789 \
-p 36790:36790 \
yedf/dtm:latest
Unable to find image 'yedf/dtm:latest' locally
latest: Pulling from yedf/dtm
530afca65e2e: Already exists
cfc3c688efa0: Pull complete
4f4fb700ef54: Pull complete
Digest: sha256:1b7f6da9d959ba62ea4aa995f92696158ff82f8472064c081c262a58a73be9b4
二進制安裝包
地址:
https://github.com/dtm-labs/dtm/releases/latest
下載對應(yīng)平臺的版本握牧,運行即可。
源碼安裝
源碼為 Go娩梨,編譯需要 Go 1.16+
$ git clone https://github.com/dtm-labs/dtm && cd dtm
$ go build
$ ./dtm
DTM 的 Saga 示例
完整代碼位于:microService/dtm_saga
業(yè)務(wù)邏輯說明沿腰,共有三個服務(wù):
- bff/ensure_order.go,確認訂單的聚合服務(wù)狈定。負責開啟 saga 事務(wù)
- order/serviceOrder.go颂龙,訂單服務(wù),創(chuàng)建訂單纽什,當訂單庫存檢測失敗時措嵌,將訂單狀態(tài)更新為未確認
- storage/serviceStorage.go,倉庫服務(wù)芦缰,扣減庫存企巢,當庫存不足時,錯誤让蕾。補償接口完成庫存回退
編碼實現(xiàn):
一:準備訂單服務(wù)和倉庫服務(wù)相關(guān)表:
drop table if exists saga.orders;
create table if not exists saga.orders (
id int unsigned primary key auto_increment,
tx_id varchar(255) unique,
status enum('confirming', 'confirmed', 'not confirmed'),
total int
);
drop table if exists saga.order_products;
create table if not exists saga.order_products (
order_id int unsigned,
product_id int unsigned,
quantity int,
price int,
primary key (order_id, product_id)
);
drop table if exists saga.storages;
create table if not exists saga.storages (
id int unsigned primary key auto_increment,
inventory int
);
insert into saga.storages values
(3, 108),
(8, 10);
二:在各自服務(wù)的 MySQL 中執(zhí)行以上 SQL浪规,準備好基礎(chǔ)數(shù)據(jù):
# order
$ sudo docker run \
--rm \
--net host \
--name mysqlSagaOrder \
-e MYSQL_TCP_PORT=3306 \
-e MYSQL_ROOT_PASSWORD=mashibing \
-e MYSQL_DATABASE=saga \
-d mysql:latest
# login mysql && create table
$ sudo docker exec -it mysqlSagaOrder mysql -hlocalhost -pmashibing
# storage
$ sudo docker run \
--rm \
--net host \
--name mysqlSagaStorage \
-e MYSQL_TCP_PORT=3307 \
-e MYSQL_ROOT_PASSWORD=mashibing \
-e MYSQL_DATABASE=saga \
-d mysql:latest
# login mysql && create table && insert some rows.
$ sudo docker exec -it mysqlSagaStorage mysql -hlocalhost -pmashibing
三:編寫業(yè)務(wù)服務(wù)
聚合服務(wù) bff/ensure_order.go
package main
import (
"github.com/dtm-labs/client/dtmcli"
"log"
)
// 業(yè)務(wù)請求數(shù)據(jù)對象
type Req struct {
Quantity int `json:"quantity"`
Id int `json:"id"`
TxId string `json:"tx_id"`
}
// 模擬創(chuàng)建訂單的聚合業(yè)務(wù)邏輯
func main() {
// dtm 服務(wù)器地址
const dtmServer = "http://192.168.177.131:36789/api/dtmsvr"
// 關(guān)聯(lián)的兩個服務(wù)的地址
const orderServer = "http://192.168.177.1:8081"
const storageServer = "http://192.168.177.1:8082"
// dtm 生成 事務(wù)id
gid := dtmcli.MustGenGid(dtmServer)
// 偽造請求數(shù)據(jù)
req := Req{20, 3, gid}
// 啟動 Saga 事務(wù)
saga := dtmcli.NewSaga(dtmServer, gid).
Add(orderServer+"/order-create", orderServer+"/order-create-compensate", req).
Add(storageServer+"/deduct", storageServer+"/deduct-compensate", req)
// 事務(wù)提交
err := saga.Submit()
log.Fatalln(err)
}
訂單服務(wù):
order/serviceOrder.go
package main
import (
"database/sql"
"github.com/gin-gonic/gin"
_ "github.com/go-sql-driver/mysql"
"log"
"net/http"
)
var DB *sql.DB
func init() {
db, err := sql.Open("mysql", "root:mashibing@tcp(192.168.177.131:3306)/saga?charset=utf8mb4&parseTime=True&loc=Local")
if err != nil {
log.Fatalln(err)
}
DB = db
}
type Req struct {
Quantity int `json:"quantity"`
Id int `json:"id"`
TxId string `json:"tx_id"`
}
func main() {
r := gin.Default()
r.POST("/order-create", func(c *gin.Context) {
req := &Req{}
if err := c.BindJSON(req); err != nil {
log.Fatalln(err)
}
// 創(chuàng)建訂單和訂單商品記錄
query := "insert into orders values (null, ?, 'confirmed', 1024)"
result, err := DB.Exec(query, req.TxId)
if err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "order create failed"})
return
}
orderId, err := result.LastInsertId()
if err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "order create failed"})
return
}
query = "insert into order_products values (?, ?, ?, 99)"
if _, err := DB.Exec(query, orderId, req.Id, req.Quantity); err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "order products create failed"})
return
}
c.JSON(http.StatusOK, gin.H{"dtm_result": "SUCCESS"})
})
r.POST("/order-create-compensate", func(c *gin.Context) {
// 確認或听,生成訂單信息,插入 orders 和 order_products 表記錄笋婿,本例中誉裆,僅使用 order_products 表
req := &Req{}
if err := c.BindJSON(req); err != nil {
log.Fatalln(err)
}
// 將訂單狀態(tài)更新為未確認
query := "update orders set status='not confirmed' where tx_id=?"
if _, err := DB.Exec(query, req.TxId); err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "compensate order status failed"})
return
}
c.JSON(http.StatusOK, gin.H{"dtm_result": "SUCCESS"})
})
r.Run(":8081")
}
庫存服務(wù):
storage/serviceStorage.go
package main
import (
"database/sql"
"github.com/gin-gonic/gin"
_ "github.com/go-sql-driver/mysql"
"log"
"net/http"
)
var DB *sql.DB
func init() {
db, err := sql.Open("mysql", "root:mashibing@tcp(192.168.177.131:3307)/saga?charset=utf8mb4&parseTime=True&loc=Local")
if err != nil {
log.Fatalln(err)
}
DB = db
}
type Req struct {
Quantity int `json:"quantity"`
Id int `json:"id"`
TxId string `json:"tx_id"`
}
func main() {
r := gin.Default()
r.POST("/deduct", func(c *gin.Context) {
// 解析請求數(shù)據(jù)
req := &Req{}
if err := c.BindJSON(req); err != nil {
log.Fatalln(err)
}
// 執(zhí)行 扣減庫存
query := "update storages set inventory = inventory-? where id = ?"
if _, err := DB.Exec(query, req.Quantity, req.Id); err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "deduct inventory failed"})
return
}
// 判定庫存是否為負數(shù),負數(shù)失敗
inventory := 0
query = "select inventory from storages where id=?"
if err := DB.QueryRow(query, req.Id).Scan(&inventory); err != nil {
log.Fatalln(err)
}
if inventory < 0 {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "inventory is not enough"})
return
}
c.JSON(http.StatusOK, gin.H{"dtm_result": "SUCCESS"})
})
r.POST("/deduct-compensate", func(c *gin.Context) {
req := &Req{}
if err := c.BindJSON(req); err != nil {
log.Fatalln(err)
}
// 執(zhí)行 扣減庫存的補償操作
query := "update storages set inventory = inventory+? where id = ?"
if _, err := DB.Exec(query, req.Quantity, req.Id); err != nil {
c.JSON(http.StatusOK, gin.H{"dtm_result": "FAILURE", "message": "compensate inventory failed"})
return
}
c.JSON(http.StatusOK, gin.H{"dtm_result": "SUCCESS"})
})
r.Run(":8082")
}
啟動 DTM缸濒,完成測試足丢。通過數(shù)據(jù)庫中的數(shù)據(jù)變化,體現(xiàn)分布式事務(wù)的實現(xiàn)绍填。