作者簡(jiǎn)介:
陶陶老師
10年后端工作經(jīng)驗(yàn)跑慕,
專注Java、SpringBoot摧找、SpringCloud核行、分布式系統(tǒng)/微服務(wù)、中間件等領(lǐng)域蹬耘。
公眾號(hào):陶陶技術(shù)筆記
一芝雪、背景
在微服務(wù)架構(gòu)中,我們常常使用異步化的手段來提升系統(tǒng)的 吞吐量 和 解耦 上下游综苔,而構(gòu)建異步架構(gòu)最常用的手段就是使用 消息隊(duì)列(MQ)
惩系,那異步架構(gòu)怎樣才能實(shí)現(xiàn)數(shù)據(jù)一致性呢?本文主要介紹如何使用RocketMQ
的事務(wù)消息
來解決一致性問題如筛。
RocketMQ 是阿里巴巴開源的分布式消息中間件堡牡,目前已成為 Apache 的頂級(jí)項(xiàng)目。歷經(jīng)多次天貓雙十一海量消息考驗(yàn)杨刨,具有高性能晤柄、低延時(shí)和高可靠等特性
PS:同步場(chǎng)景怎樣保證一致性?請(qǐng)看文章《Spring Cloud同步場(chǎng)景分布式事務(wù)怎樣做妖胀?試試Seata》
二芥颈、MQ選型
可以看到在 業(yè)務(wù)處理 方面來說 RocketMQ
優(yōu)于其他對(duì)手惠勒,而且原生支持 事務(wù)消息
PS:業(yè)務(wù)系統(tǒng)用的是其他 MQ
產(chǎn)品但是又需要 事務(wù)消息 怎么辦?學(xué)習(xí)原理自己開發(fā)實(shí)現(xiàn)浇借!
三捉撮、什么是事務(wù)消息
例如下圖的場(chǎng)景:生成訂單記錄 -> MQ -> 增加積分
我們是應(yīng)該先 創(chuàng)建訂單記錄,還是先 發(fā)送MQ消息 呢妇垢?
先發(fā)送MQ消息:這個(gè)明顯是不行的巾遭,因?yàn)槿绻l(fā)送成功,而訂單創(chuàng)建失敗的話是沒辦法把消息收回來的
先創(chuàng)建訂單記錄:如果訂單創(chuàng)建成功后MQ消息發(fā)送失敗 拋出異常闯估,因?yàn)閮蓚€(gè)操作都在本地事務(wù)中所以訂單數(shù)據(jù)是可以 回滾 的
上面的 方式二 看似沒問題灼舍,但是 網(wǎng)絡(luò)是不可靠的!如果 MQ
的響應(yīng)因?yàn)榫W(wǎng)絡(luò)原因沒有收到涨薪,所以在面對(duì)不確定的結(jié)果只好進(jìn)行回滾骑素;但是 MQ
端又確實(shí)是收到了這條消息的,只是回給客戶端的 響應(yīng)丟失 了刚夺!
所以 事務(wù)消息
就是用來保證 本地事務(wù) 與 MQ消息發(fā)送 的原子性献丑!
四、RocketMQ事務(wù)消息原理
主要的邏輯分為兩個(gè)流程:
- 事務(wù)消息發(fā)送及提交:
發(fā)送
half消息
MQ服務(wù)端
響應(yīng)消息寫入結(jié)果根據(jù)發(fā)送結(jié)果執(zhí)行
本地事務(wù)
(如果寫入失敗侠姑,此時(shí)half消息對(duì)業(yè)務(wù) 不可見创橄,本地邏輯不執(zhí)行)根據(jù)本地事務(wù)狀態(tài)執(zhí)行
Commit
或者Rollback
(Commit操作生成消息索引,消息對(duì)消費(fèi)者 可見)
- 回查流程:
對(duì)于長(zhǎng)時(shí)間沒有
Commit/Rollback
的事務(wù)消息(pending
狀態(tài)的消息)莽红,從服務(wù)端發(fā)起一次 回查Producer
收到回查消息妥畏,檢查回查消息對(duì)應(yīng)的本地事務(wù)狀態(tài)
根據(jù)本地事務(wù)狀態(tài),重新
Commit
或者Rollback
邏輯時(shí)序圖
五安吁、異步架構(gòu)一致性實(shí)現(xiàn)思路
從上面的原理可以發(fā)現(xiàn) 事務(wù)消息
僅僅只是保證本地事務(wù)和MQ消息發(fā)送形成整體的 原子性
醉蚁,而投遞到MQ服務(wù)器后,并無法保證消費(fèi)者一定能消費(fèi)成功鬼店!
如果 消費(fèi)端消費(fèi)失敗 后的處理方式网棍,建議是記錄異常信息然后 人工處理,并不建議回滾上游服務(wù)的數(shù)據(jù)(因?yàn)閮烧呤?解耦 的妇智,而且 復(fù)雜度 太高)
我們可以利用 MQ
的兩個(gè)特性 重試
和 死信隊(duì)列
來協(xié)助消費(fèi)端處理:
消費(fèi)失敗后進(jìn)行一定次數(shù)的
重試
重試后也失敗的話該消息丟進(jìn)
死信隊(duì)列
里另外起一個(gè)線程監(jiān)聽消費(fèi)
死信隊(duì)列
里的消息确沸,記錄日志并且預(yù)警!
因?yàn)橛?重試
所以消費(fèi)者需要實(shí)現(xiàn)冪等性
六俘陷、分布式事務(wù)場(chǎng)景樣例
下面就用剛剛提到的場(chǎng)景:生成訂單記錄 -> MQ -> 增加積分罗捎;來簡(jiǎn)單講一下 Spring Cloud
中應(yīng)該怎么做,詳細(xì)代碼請(qǐng) 下載demo 查看拉盾。
PS:怎樣安裝部署RocketMQ可以參考《Apache RocketMQ 消息隊(duì)列部署與可視化界面安裝》
6.1. 引入依賴
使用 spring-cloud-stream
框架來訪問 RocketMQ
Spring Cloud Stream 是一個(gè)構(gòu)建消息驅(qū)動(dòng)的框架桨菜,通過抽象的定義實(shí)現(xiàn)應(yīng)用與MQ消息隊(duì)列之間的解耦,目前支持
RabbitMQ
、kafka
和RocketMQ
6.2. 開啟事務(wù)消息
消息生產(chǎn)者需要添加 transactional: true
開啟 事務(wù)消息
6.3. 訂單服務(wù)發(fā)送half消息
因?yàn)殚_啟了
事務(wù)消息
所以這里發(fā)送的是half消息
對(duì)于消費(fèi)端是不可見
的
6.4. 訂單服務(wù)監(jiān)聽half消息
使用 @RocketMQTransactionListener
注解監(jiān)聽 半消息倒得,并實(shí)現(xiàn) RocketMQLocalTransactionListener
接口泻红,該接口有兩個(gè)方法
executeLocalTransaction:用于提交本地事務(wù)
checkLocalTransaction:用于事務(wù)回查
如果提交事務(wù)消息失敗,需等待約1分鐘左右 事務(wù)回查 方法才會(huì)被調(diào)用
6.5. 積分服務(wù)消費(fèi)消息
注意:因?yàn)橛?重試 這里如果是真實(shí)的業(yè)務(wù)需要自行實(shí)現(xiàn)
冪等性
6.6. 消費(fèi)死信隊(duì)列預(yù)警
監(jiān)聽并消費(fèi)死信隊(duì)列中的消息霞掺,用于記錄錯(cuò)誤日志谊路,并且預(yù)警通知運(yùn)維人員等
6.7. 測(cè)試用例
demo中提供了3個(gè)接口分別測(cè)試不同的場(chǎng)景:
- 事務(wù)成功
http://localhost:11002/success
流程如下:
訂單創(chuàng)建 成功
提交事務(wù)消息 成功
消費(fèi)消息增加積分 成功
- 訂單創(chuàng)建成功但提交事務(wù)消息失敗
http://localhost:11002/produceError
流程如下:
訂單創(chuàng)建 成功
提交事務(wù)消息 失敗
事務(wù)回查(等待1分鐘左右) 成功
提交事務(wù)消息 成功
消費(fèi)消息增加積分 成功
- 消費(fèi)消息失敗
http://localhost:11002/consumeError
流程如下:
訂單創(chuàng)建 成功
提交事務(wù)消息 成功
消費(fèi)消息增加積分 失敗
重試消費(fèi)消息 失敗
進(jìn)入死信隊(duì)列 成功
消費(fèi)死信隊(duì)列的消息 成功
記錄日志并發(fā)出預(yù)警 成功
七、demo下載地址
作者簡(jiǎn)介:
陶陶老師
10年后端工作經(jīng)驗(yàn)菩彬,
專注Java缠劝、SpringBoot、SpringCloud骗灶、分布式系統(tǒng)/微服務(wù)惨恭、中間件等領(lǐng)域。
公眾號(hào):陶陶技術(shù)筆記
本文已經(jīng)獲得陶陶老師授權(quán)轉(zhuǎn)發(fā)耙旦,其他人若有興趣轉(zhuǎn)載脱羡,請(qǐng)直接聯(lián)系作者授權(quán)。