服務(wù)冪等性架構(gòu)設(shè)計(jì)
- 作者: 博學(xué)谷狂野架構(gòu)師
- GitHub:GitHub地址 (有我精心準(zhǔn)備的130本電子書PDF)
只分享干貨绣否、不吹水誊涯,讓我們一起加油!??
防重表實(shí)現(xiàn)冪等
對(duì)于防止數(shù)據(jù)重復(fù)提交蒜撮,還有一種解決方案就是通過(guò)防重表實(shí)現(xiàn)暴构。
防重表的實(shí)現(xiàn)思路也非常簡(jiǎn)單,首先創(chuàng)建一張表作為防重表段磨,同時(shí)在該表中建立一個(gè)或多個(gè)字段的唯一索引作為防重字段取逾,用于保證并發(fā)情況下,數(shù)據(jù)只有一條苹支。在向業(yè)務(wù)表中插入數(shù)據(jù)之前先向防重表插入砾隅,如果插入失敗則表示是重復(fù)數(shù)據(jù)。
為什么不用悲觀鎖
對(duì)于防重表的解決方案沐序,可能有人會(huì)說(shuō)為什么不使用悲觀鎖琉用,悲觀鎖在使用的過(guò)程中也是會(huì)發(fā)生死鎖的。
悲觀鎖是通過(guò)鎖表的方式實(shí)現(xiàn)的策幼,假設(shè)現(xiàn)在一個(gè)用戶A訪問(wèn)表A(鎖住了表A)邑时,然后試圖訪問(wèn)表B;
另一個(gè)用戶B訪問(wèn)表B(鎖住了表B)特姐,然后試圖訪問(wèn)表A晶丘。 這時(shí)對(duì)于用戶A來(lái)說(shuō),由于表B已經(jīng)被用戶B鎖住了唐含,所以用戶A必須等到用戶B釋放表B才能訪問(wèn)浅浮。
同時(shí)對(duì)于用戶B來(lái)說(shuō),由于表A已經(jīng)被用戶A鎖住了捷枯,所以用戶B必須等到用戶A釋放表A才能訪問(wèn)滚秩。此時(shí)死鎖就已經(jīng)產(chǎn)生了。
閱讀專業(yè)類書籍是Java程序員必備的學(xué)習(xí)方式之一淮捆。通過(guò)不斷學(xué)習(xí)和積累郁油,可以不斷提高自己的技術(shù)能力和職業(yè)水平,實(shí)現(xiàn)職業(yè)發(fā)展的目標(biāo)攀痊。
非常建議大家注重閱讀桐腌,并選擇一些有深度、有價(jià)值的書籍不斷提升自己的技術(shù)水平和能力苟径。這些書籍包括Java編程語(yǔ)言案站、數(shù)據(jù)結(jié)構(gòu)和算法、面向?qū)ο笤O(shè)計(jì)棘街、設(shè)計(jì)模式蟆盐、框架原理與應(yīng)用等等承边。
對(duì)于一位2-3年的Java程序員來(lái)說(shuō),閱讀專業(yè)類書籍是更加的重要舱禽,因?yàn)樗鼈兛梢詭椭銛U(kuò)展技術(shù)廣度和深度炒刁,提高你的技術(shù)能力和職業(yè)水平。以下是我給這些程序員的一些建議:
學(xué)會(huì)尋找優(yōu)秀的書籍:在選擇書籍時(shí)誊稚,要選擇那些被廣泛認(rèn)可和推薦的經(jīng)典書籍翔始。可以通過(guò)搜索網(wǎng)上的書籍推薦列表里伯,向其他經(jīng)驗(yàn)豐富的程序員請(qǐng)教城瞎,或者參考公司內(nèi)部的學(xué)習(xí)計(jì)劃,來(lái)找到好的書籍疾瓮。
閱讀有關(guān)設(shè)計(jì)模式和架構(gòu)的書籍:對(duì)于Java程序員來(lái)說(shuō)脖镀,掌握設(shè)計(jì)模式和架構(gòu)原則是非常重要的±堑纾可以選擇閱讀《設(shè)計(jì)模式:可復(fù)用面向?qū)ο筌浖幕A(chǔ)》蜒灰、《大話設(shè)計(jì)模式》、《Java程序員修煉之道》等書籍來(lái)深入學(xué)習(xí)肩碟。
學(xué)習(xí)新的技術(shù)和框架:Java技術(shù)不斷發(fā)展强窖,新的技術(shù)和框架也不斷涌現(xiàn)。因此削祈,Java程序員應(yīng)該定期閱讀有關(guān)新技術(shù)和框架的書籍翅溺,比如Spring、Spring Boot髓抑、MyBatis咙崎、Netty等。
學(xué)習(xí)算法和數(shù)據(jù)結(jié)構(gòu):算法和數(shù)據(jù)結(jié)構(gòu)是編程基礎(chǔ)吨拍,掌握這些知識(shí)可以提高代碼的質(zhì)量和效率褪猛。可以選擇閱讀《算法》羹饰、《算法導(dǎo)論》等書籍來(lái)學(xué)習(xí)算法和數(shù)據(jù)結(jié)構(gòu)握爷。
參考開(kāi)源項(xiàng)目和源代碼:閱讀開(kāi)源項(xiàng)目和源代碼是非常有益的,可以學(xué)習(xí)到其他程序員的編碼技巧和設(shè)計(jì)思路严里。可以選擇一些知名的開(kāi)源項(xiàng)目追城,如Spring刹碾、MyBatis等來(lái)進(jìn)行學(xué)習(xí)。
當(dāng)然座柱,我也知道迷帜,光是建議是不足以激發(fā)大家學(xué)習(xí)的動(dòng)力的物舒,所以,書也我也幫大家整理好了戏锹,把飯喂到嘴里了冠胯,我只能幫你到這里了,剩下的就靠你自己了锦针。
以下這份包含46本Java程序員必備經(jīng)典的書單(豆瓣評(píng)分8分+)荠察,是我花費(fèi)一個(gè)月時(shí)間整理的:GitHub:GitHub地址
唯一主鍵實(shí)現(xiàn)冪等
數(shù)據(jù)庫(kù)唯一主鍵的實(shí)現(xiàn)主要是利用數(shù)據(jù)庫(kù)中主鍵唯一約束的特性,一般來(lái)說(shuō)唯一主鍵比較適用于“插入”時(shí)的冪等性奈搜,其能保證一張表中只能存在一條帶該唯一主鍵的記錄悉盆。
使用數(shù)據(jù)庫(kù)唯一主鍵完成冪等性時(shí)需要注意的是,該主鍵一般來(lái)說(shuō)并不是使用數(shù)據(jù)庫(kù)中自增主鍵馋吗,而是使用分布式 ID 充當(dāng)主鍵焕盟,這樣才能能保證在分布式環(huán)境下 ID 的全局唯一性。
對(duì)于一些后臺(tái)系統(tǒng)宏粤,并發(fā)量并不高的情況下脚翘,對(duì)于冪等的實(shí)現(xiàn)非常簡(jiǎn)單,通過(guò)這種思想即可完成冪等控制绍哎。
適用場(chǎng)景
- 插入操作
- 刪除操作
使用限制
- 需要生成全局唯一主鍵 ID来农;
主要流程
主要流程如下:
- 客戶端執(zhí)行創(chuàng)建請(qǐng)求,調(diào)用服務(wù)端接口蛇摸。
- 服務(wù)端執(zhí)行業(yè)務(wù)邏輯备图,生成一個(gè)分布式
ID
,將該 ID 充當(dāng)待插入數(shù)據(jù)的主鍵赶袄,然 后執(zhí)數(shù)據(jù)插入操作揽涮,運(yùn)行對(duì)應(yīng)的SQL
語(yǔ)句。 - 服務(wù)端將該條數(shù)據(jù)插入數(shù)據(jù)庫(kù)中饿肺,如果插入成功則表示沒(méi)有重復(fù)調(diào)用接口蒋困。如果拋出主鍵重復(fù)異常,則表示數(shù)據(jù)庫(kù)中已經(jīng)存在該條記錄敬辣,返回錯(cuò)誤信息到客戶端雪标。
在業(yè)務(wù)執(zhí)行前,先判斷是否已經(jīng)操作過(guò)溉跃,如果沒(méi)有則執(zhí)行村刨,否則判斷為重復(fù)操作。
效果演示
在并發(fā)下訪問(wèn)時(shí)撰茎,因?yàn)槭腔趇d進(jìn)行判斷嵌牺,那id值就必須要保證在多次提交時(shí),需要唯一。訪問(wèn)流程如下:
@Override
@Transactional(rollbackFor = Exception.class)
public String addOrder(Order order) {
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
//查詢
Order orderResult = orderMapper.selectByPrimaryKey(order.getId());
Optional<Order> orderOptional = Optional.ofNullable(orderResult);
if (orderOptional.isPresent()){
return "repeat request";
}
int result = orderMapper.insert(order);
if (result != 1){
return "fail";
}
return "success";
}
對(duì)于上述功能實(shí)現(xiàn)逆粹,在并發(fā)下募疮,并不能完成冪等性控制。通過(guò)jemeter測(cè)試僻弹,模擬50個(gè)并發(fā)阿浓,可以發(fā)現(xiàn),插入了重復(fù)數(shù)據(jù)蹋绽。產(chǎn)生了臟數(shù)據(jù)芭毙。
要解決這個(gè)問(wèn)題,非常簡(jiǎn)單蟋字,在數(shù)據(jù)庫(kù)層面添加唯一索引即可稿蹲,將id設(shè)置為唯一索引,也是最容易想到的方式鹊奖,一旦id出現(xiàn)重復(fù)苛聘,就會(huì)出現(xiàn)異常,避免了臟數(shù)據(jù)的發(fā)生也可以解決永久性冪等忠聚。但該方案無(wú)法用于分庫(kù)分表情況设哗,其只適用于單表情況。
樂(lè)觀鎖實(shí)現(xiàn)冪等性
數(shù)據(jù)庫(kù)樂(lè)觀鎖方案一般只能適用于執(zhí)行更新操作的過(guò)程两蟀,我們可以提前在對(duì)應(yīng)的數(shù)據(jù)表中多添加一個(gè)字段网梢,充當(dāng)當(dāng)前數(shù)據(jù)的版本標(biāo)識(shí)。
這樣每次對(duì)該數(shù)據(jù)庫(kù)該表的這條數(shù)據(jù)執(zhí)行更新時(shí)赂毯,都會(huì)將該版本標(biāo)識(shí)作為一個(gè)條件战虏,值為上次待更新數(shù)據(jù)中的版本標(biāo)識(shí)的值。
適用操作
- 更新操作
使用限制
- 需要數(shù)據(jù)庫(kù)對(duì)應(yīng)業(yè)務(wù)表中添加額外字段
問(wèn)題拋出
扣減庫(kù)存數(shù)據(jù)錯(cuò)誤
通過(guò)jemeter進(jìn)行測(cè)試党涕,可以發(fā)現(xiàn)烦感。當(dāng)模擬一萬(wàn)并發(fā)時(shí),最終的庫(kù)存數(shù)量是錯(cuò)誤的膛堤。這主要是因?yàn)楫?dāng)多線程訪問(wèn)時(shí)手趣,一個(gè)線程讀取到了另外線程未提交的數(shù)據(jù)造成。
synchronized失效問(wèn)題
對(duì)于現(xiàn)在的問(wèn)題肥荔,暫不考慮秒殺設(shè)計(jì)绿渣、隊(duì)列請(qǐng)求串行化等,只考慮如何通過(guò)鎖進(jìn)行解決燕耿,要通過(guò)鎖解決的話中符,那最先想到的可能是synchronized。
根據(jù)synchronized定義誉帅,當(dāng)多線程并發(fā)訪問(wèn)時(shí)淀散,會(huì)對(duì)當(dāng)前加鎖的方法產(chǎn)生阻塞谭期,從而保證線程安全,避免臟數(shù)據(jù)吧凉。但是,真的能如預(yù)期的一樣嗎踏志?
@Service
public class StockServiceImpl implements StockService {
@Autowired
private StockMapper stockMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public synchronized int lessInventory(String goodsId, int num) {
return stockMapper.lessInventory(goodsId, num);
}
}
當(dāng)前已經(jīng)在在方法上添加了synchronized阀捅,對(duì)當(dāng)前方法對(duì)象進(jìn)行了鎖定。 通過(guò)Jemeter针余,模擬一萬(wàn)并發(fā)對(duì)其進(jìn)行訪問(wèn)饲鄙。可以發(fā)現(xiàn)圆雁,仍然出現(xiàn)了臟數(shù)據(jù)忍级。
事務(wù)導(dǎo)致鎖失效
該問(wèn)題的產(chǎn)生原因,就在于在方法上synchronized搭配使用了@Transactional伪朽。
首先synchronized鎖定的是當(dāng)前方法對(duì)象轴咱,而@Transactional會(huì)對(duì)當(dāng)前方法進(jìn)行AOP增強(qiáng),動(dòng)態(tài)代理出一個(gè)代理對(duì)象烈涮,在方法執(zhí)行前開(kāi)啟事務(wù)朴肺,執(zhí)行后提交事務(wù)。
所以synchronized和@Transactional其實(shí)操作的是兩個(gè)不同的對(duì)象坚洽,換句話說(shuō)就是@Transactional的事務(wù)操作并不在synchronized鎖定范圍之內(nèi)戈稿。
假設(shè)A線程執(zhí)行完扣減庫(kù)存方法,會(huì)釋放鎖并提交事務(wù)讶舰。但A線程釋放鎖但還沒(méi)提交事務(wù)前鞍盗,B線程執(zhí)行扣減庫(kù)存方法,B線程執(zhí)行后跳昼,和A線程一起提交事務(wù)般甲,就出現(xiàn)了線程安全問(wèn)題,造成臟數(shù)據(jù)的出現(xiàn)庐舟。
樂(lè)觀鎖保證冪等
基于版本號(hào)實(shí)現(xiàn)
MySQL樂(lè)觀鎖是基于數(shù)據(jù)庫(kù)完成分布式鎖的一種實(shí)現(xiàn)欣除,實(shí)現(xiàn)的方式有兩種:
- 基于版本號(hào)
- 基于條件
但是實(shí)現(xiàn)思想都是基于MySQL的行鎖思想來(lái)實(shí)現(xiàn)的。
- 修改數(shù)據(jù)表挪略,添加version字段历帚,默認(rèn)值為0
- 修改StockMapper添加基于版本修改數(shù)據(jù)方法
@Update("update tb_stock set amount=amount-#{num},version=version+1 where goods_id=#{goodsId} and version=#{version}")
int lessInventoryByVersion(@Param("goodsId") String goodsId,@Param("num") int num,@Param("version") int version);
- 測(cè)試模擬一萬(wàn)并發(fā)進(jìn)行數(shù)據(jù)修改,此時(shí)可以發(fā)現(xiàn)當(dāng)前版本號(hào)從0變?yōu)?杠娱,且?guī)齑媪空_挽牢。
基于條件實(shí)現(xiàn)
通過(guò)版本號(hào)控制是一種非常常見(jiàn)的方式,適合于大多數(shù)場(chǎng)景摊求。
但現(xiàn)在庫(kù)存扣減的場(chǎng)景來(lái)說(shuō)禽拔,通過(guò)版本號(hào)控制就是多人并發(fā)訪問(wèn)購(gòu)買時(shí),查詢時(shí)顯示可以購(gòu)買,但最終只有一個(gè)人能成功睹栖,這也是不可以的硫惕。其實(shí)最終只要商品庫(kù)存不發(fā)生超賣就可以。那此時(shí)就可以通過(guò)條件來(lái)進(jìn)行控制野来。
- 修改StockMapper:
@Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId} and amount-#{num}>=0")
int lessInventoryByVersionOut(@Param("goodsId") String goodsId,@Param("num") int num);
- 修改StockController:
@PutMapping("/lessInventoryByVersionOut/{goodsId}/{num}")
public String lessInventoryByVersionOut(@PathVariable("goodsId") String goodsId,@PathVariable("num") int num){
int result = stockService.lessInventoryByVersionOut(goodsId, num);
if (result == 1){
System.out.println("購(gòu)買成功");
return "success";
}
System.out.println("購(gòu)買失敗");
return "fail";
}
- 通過(guò)jemeter進(jìn)行測(cè)試恼除,可以發(fā)現(xiàn)當(dāng)多人并發(fā)扣減庫(kù)存時(shí),控制住了商品超賣的問(wèn)題曼氛。
樂(lè)觀鎖實(shí)現(xiàn)冪等性
在系統(tǒng)中豁辉,不光要保證客戶端訪問(wèn)的冪等性,同時(shí)還要保證服務(wù)間冪等舀患。
比較常見(jiàn)的情況徽级,當(dāng)服務(wù)間進(jìn)行調(diào)用時(shí),因?yàn)榫W(wǎng)絡(luò)抖動(dòng)等原因出現(xiàn)超時(shí)聊浅,則很有可能出現(xiàn)數(shù)據(jù)錯(cuò)誤餐抢。此時(shí)在分布式環(huán)境下,就需要通過(guò)分布式事務(wù)或分布式鎖來(lái)保證數(shù)據(jù)的一致性狗超。分布式鎖的解決方案中MySQL樂(lè)觀鎖就是其中一種實(shí)現(xiàn)弹澎。
feign超時(shí)重試效果演示
以上圖為例,當(dāng)客戶端要生成訂單時(shí)努咐,可以基于token機(jī)制保證生成訂單的冪等性苦蒿,接著訂單生成成功后,還會(huì)基于feign調(diào)用庫(kù)存服務(wù)進(jìn)行庫(kù)存扣減渗稍,此時(shí)則很有可能出現(xiàn)佩迟,庫(kù)存服務(wù)執(zhí)行扣減庫(kù)存成功,但是當(dāng)結(jié)果返回時(shí)竿屹,出現(xiàn)網(wǎng)絡(luò)抖動(dòng)超時(shí)了报强,那么上游的訂單服務(wù)則很有可能會(huì)發(fā)起重試,此時(shí)如果不進(jìn)行扣減庫(kù)存的冪等性保證的話拱燃,則出現(xiàn)扣減庫(kù)存執(zhí)行多次秉溉。
那可以先來(lái)演示當(dāng)下游服務(wù)出現(xiàn)延遲,上游服務(wù)基于feign進(jìn)行重試的效果碗誉。
- 當(dāng)前是order調(diào)用feign召嘶,所以在order中會(huì)存在feignConfigure配置類,用于配置超時(shí)時(shí)間與重試次數(shù)哮缺。
/**
* 自定義feign超時(shí)時(shí)間弄跌、重試次數(shù)
* 默認(rèn)超時(shí)為10秒,不會(huì)進(jìn)行重試尝苇。
*/
@Configuration
public class FeignConfigure {
//超時(shí)時(shí)間铛只,時(shí)間單位毫秒
public static int connectTimeOutMillis = 5000;
public static int readTimeOutMillis = 5000;
@Bean
public Request.Options options() {
return new Request.Options(connectTimeOutMillis, readTimeOutMillis);
}
//自定義重試次數(shù)
@Bean
public Retryer feignRetryer(){
Retryer retryer = new Retryer.Default(100, 1000, 4);
return retryer;
}
}
-
stock服務(wù)的StockController中demo方法會(huì)延遲六秒埠胖。
通過(guò)這種方式模擬超時(shí)效果。此時(shí)在order中調(diào)用stock服務(wù)淳玩,可以發(fā)現(xiàn)直撤,order服務(wù)會(huì)對(duì)stock服務(wù)調(diào)用四次。
這里就演示了服務(wù)間調(diào)用超時(shí)的效果蜕着,當(dāng)下游服務(wù)超時(shí)谊惭,上游服務(wù)會(huì)進(jìn)行重試。
服務(wù)調(diào)用超時(shí)庫(kù)存多次扣減
根據(jù)上述演示侮东,當(dāng)下游服務(wù)超時(shí),上游服務(wù)就會(huì)進(jìn)行重試豹芯。
那么結(jié)合當(dāng)前的業(yè)務(wù)場(chǎng)景悄雅,當(dāng)用戶下單成功去調(diào)用庫(kù)存服務(wù)扣減庫(kù)存時(shí), 如果庫(kù)存服務(wù)執(zhí)行扣減庫(kù)存成功但返回結(jié)果超時(shí)铁蹈,則上游訂單服務(wù)就會(huì)重試宽闲,再次進(jìn)行扣減庫(kù)存,此時(shí)就會(huì)出現(xiàn)同一訂單商品庫(kù)存被多次扣減握牧。
- 在訂單服務(wù)中生成訂單容诬,并調(diào)用庫(kù)存服務(wù)扣減庫(kù)存
@Idemptent
@PostMapping("/genOrder")
public String genOrder(@RequestBody Order order){
String orderId = String.valueOf(idWorker.nextId());
order.setId(orderId);
order.setCreateTime(new Date());
order.setUpdateTime(new Date());
int result = orderService.addOrder(order);
if (result != 1){
System.out.println("fail");
return "fail";
}
//生成訂單詳情信息
List<String> goodsIdArray = JSON.parseArray(order.getGoodsIds(), String.class);
goodsIdArray.stream().forEach(goodsId->{
//插入訂單詳情
OrderDetail orderDetail = new OrderDetail();
orderDetail.setId(String.valueOf(idWorker.nextId()));
orderDetail.setGoodsId(goodsId);
orderDetail.setOrderId(orderId);
orderDetail.setGoodsName("heima");
orderDetail.setGoodsNum(1);
orderDetail.setGoodsPrice(1);
orderDetailService.addOrderDetail(orderDetail);
//扣減庫(kù)存(不考慮鎖)
stockFeign.reduceStockNoLock(goodsId, orderDetail.getGoodsNum());
});
return "success";
}
- 庫(kù)存服務(wù)直接基于商品信息進(jìn)行庫(kù)存扣減
@Update("update tb_stock set amount=amount-#{num} where goods_id=#{goodsId}")
int reduceStockNoLock(@Param("goodsId") String goodsId,@Param("num") Integer num);
@PutMapping("/reduceStockNoLock/{goodsId}/{num}")
public String reduceStockNoLock(@PathVariable("goodsId") String goodsId,
@PathVariable("num") Integer num) throws InterruptedException {
System.out.println("reduce stock");
int result = stockService.reduceStockNoLock(goodsId, num);
if (result != 1){
return "reduce stock fail";
}
//延遲
TimeUnit.SECONDS.sleep(6000);
return "reduce stock success";
}
- 執(zhí)行生成訂單扣減庫(kù)存,此時(shí)可以發(fā)現(xiàn)扣減庫(kù)存方法被執(zhí)行多次沿腰,并且?guī)齑鏀?shù)量也被扣減了多次
{"totalNum":1,"payMoney":1,"goodsIds":"['1271700536000909313']"}
樂(lè)觀鎖解決服務(wù)間重試保證冪等
- 修改StockMapper览徒,添加樂(lè)觀鎖控制控制庫(kù)存
@Update("update tb_stock set version=version+1,amount=amount-#{num} where goods_id=#{goodsId} and version=#{version} and amount-#{num}>=0")
int reduceStock(@Param("goodsId") String goodsId,@Param("num") Integer num,@Param("version") Integer version);
- 修改StockController,添加樂(lè)觀鎖扣減庫(kù)存方法
/**
* 樂(lè)觀鎖扣減庫(kù)存
* @param goodsId
* @param num
* @param version
* @return
*/
@PutMapping("/reduceStock/{goodsId}/{num}/{version}")
public int reduceStock(@PathVariable("goodsId") String goodsId,
@PathVariable("num") Integer num,
@PathVariable("version") Integer version) throws InterruptedException {
System.out.println("exec reduce stock");
int result = stockService.reduceStock(goodsId, num, version);
if (result != 1){
//扣減失敗
return result;
}
//延遲
TimeUnit.SECONDS.sleep(6000);
return result;
}
- 測(cè)試颂龙,可以發(fā)現(xiàn)雖然發(fā)生多次重試习蓬,但是庫(kù)存只會(huì)被扣減成功一次。保證了服務(wù)間的冪等性措嵌。
ps:order服務(wù)出現(xiàn)異常躲叼,是因?yàn)閛rder服務(wù)會(huì)超時(shí)重試四次,但stock服務(wù)的延遲每一次都是超過(guò)超時(shí)時(shí)間的企巢,所以最終在order服務(wù)才會(huì)出現(xiàn)read timeout異常提示枫慷。
消息冪等
在系統(tǒng)中當(dāng)使用消息隊(duì)列時(shí),無(wú)論做哪種技術(shù)選型浪规,有很多問(wèn)題是無(wú)論如何也不能忽視的或听,如:消息必達(dá)、消息冪等等罗丰。本章節(jié)以典型的RabbitMQ為例神帅,講解如何保證消息冪等的可實(shí)施解決方案,其他MQ選型均可參考萌抵。
消息重試演示
消息隊(duì)列的消息冪等性找御,主要是由MQ重試機(jī)制引起的元镀。
因?yàn)橄⑸a(chǎn)者將消息發(fā)送到MQ-Server后,MQ-Server會(huì)將消息推送到具體的消息消費(fèi)者霎桅。假設(shè)由于網(wǎng)絡(luò)抖動(dòng)或出現(xiàn)異常時(shí)栖疑,MQ-Server根據(jù)重試機(jī)制就會(huì)將消息重新向消息消費(fèi)者推送,造成消息消費(fèi)者多次收到相同消息滔驶,造成數(shù)據(jù)不一致仍劈。
在RabbitMQ中冀墨,消息重試機(jī)制是默認(rèn)開(kāi)啟的,但只會(huì)在consumer出現(xiàn)異常時(shí),才會(huì)重復(fù)推送臭胜。在使用中,異常的出現(xiàn)有可能是由于消費(fèi)方又去調(diào)用第三方接口长已,由于網(wǎng)絡(luò)抖動(dòng)而造成異常技俐,但是這個(gè)異常有可能是暫時(shí)的。所以當(dāng)消費(fèi)者出現(xiàn)異常吏口,可以讓其重試幾次奄容,如果重試幾次后,仍然有異常产徊,則需要進(jìn)行數(shù)據(jù)補(bǔ)償昂勒。
數(shù)據(jù)補(bǔ)償方案:當(dāng)重試多次后仍然出現(xiàn)異常,則讓此條消息進(jìn)入死信隊(duì)列舟铜,最終進(jìn)入到數(shù)據(jù)庫(kù)中戈盈,接著設(shè)置定時(shí)job查詢這些數(shù)據(jù),進(jìn)行手動(dòng)補(bǔ)償谆刨。
本節(jié)中以consumer消費(fèi)異常為演示主體奕谭,因此需要修改RabbitMQ配置文件。
修改配置文件
修改consumer一方的配置文件
# 消費(fèi)者監(jiān)聽(tīng)相關(guān)配置
listener:
simple:
retry:
# 開(kāi)啟消費(fèi)者(程序出現(xiàn)異常)重試機(jī)制痴荐,默認(rèn)開(kāi)啟并一直重試
enabled: true
# 最大重試次數(shù)
max-attempts: 5
# 重試間隔時(shí)間(毫秒)
initial-interval: 3000
設(shè)置消費(fèi)異常
當(dāng)consumer消息監(jiān)聽(tīng)類中添加異常血柳,最終接受消息時(shí),可以發(fā)現(xiàn)生兆,消息在接收五次后难捌,最終出現(xiàn)異常。
消息冪等解決
要保證消息冪等性的話鸦难,其實(shí)最終要解決的就是保證多次操作根吁,造成的影響是相同的。那么其解決方案的思路與服務(wù)間冪等的思路其實(shí)基本都是一致的合蔽。
- 消息防重表击敌,解決思路與服務(wù)間冪等的防重表一致。
- redis:利用redis防重拴事。
這兩種方案是最常見(jiàn)的解決方案沃斤。其實(shí)現(xiàn)思路其實(shí)都是一致的圣蝎。
代碼實(shí)現(xiàn)
修改OrderController
/**
* 此處為了方便演示,不做基礎(chǔ)添加數(shù)據(jù)庫(kù)操作
* @return
*/
@PostMapping("/addOrder")
public String addOrder(){
String uniqueKey = String.valueOf(idWorker.nextId());
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(uniqueKey);
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf-8");
Message message = new Message("1271700536000909313".getBytes(),messageProperties);
rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message);
return "success";
}
修改stockApplication
@Bean
public JedisPool jedisPool(){
return new JedisPool("192.168.200.150",6379);
}
新增消息監(jiān)聽(tīng)類
@Component
public class ReduceStockListener {
@Autowired
private StockService stockService;
@Autowired
private JedisPool jedisPool;
@Autowired
private StockFlowService stockFlowService;
@RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE)
@Transactional
public void receiveMessage(Message message){
//獲取消息id
String messageId = message.getMessageProperties().getMessageId();
Jedis jedis = jedisPool.getResource();
System.out.println(messageId);
try {
//redis鎖去重校驗(yàn)
if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))){
System.out.println("重復(fù)請(qǐng)求");
return;
}
//mysql狀態(tài)校驗(yàn)
if (!(stockFlowService.findByFlag(messageId).size() == 0)){
System.out.println("數(shù)據(jù)已處理");
return;
}
String goodsId = null;
try {
//獲取消息體中g(shù)oodsId
goodsId = new String(message.getBody(),"utf-8");
stockService.reduceStock(goodsId,messageId);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
int nextInt = new Random().nextInt(100);
System.out.println("隨機(jī)數(shù):"+nextInt);
if (nextInt%2 ==0){
int i= 1/0;
}
} catch (RuntimeException e) {
//解鎖
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId));
System.out.println("出現(xiàn)異常了");
System.out.println(messageId+":釋放鎖");
throw e;
}
}
}
消息緩沖區(qū)
對(duì)于RabbitMQ的使用衡瓶,默認(rèn)情況下徘公,每條消息都會(huì)進(jìn)行分別的ack通知,消費(fèi)完一條后哮针,再來(lái)消費(fèi)下一條关面。但是這樣就會(huì)造成大量消息的阻塞情況。所以為了提升消費(fèi)者對(duì)于消息的消費(fèi)速度十厢,可以增加consumer數(shù)據(jù)或者對(duì)消息進(jìn)行批量消費(fèi)等太。MQ接收到producer發(fā)送的消息后,不會(huì)直接推送給consumer蛮放。而是積攢到一定數(shù)量后澈驼,再進(jìn)行消息的發(fā)送。 這種方式的實(shí)現(xiàn)筛武,可以理解為是一種緩沖區(qū)的實(shí)現(xiàn),提升了消息的消費(fèi)速度挎塌,但是會(huì)在一定程度上舍棄結(jié)果返回的實(shí)時(shí)性徘六。
對(duì)于批量消費(fèi)來(lái)說(shuō),也是需要考慮冪等的榴都。對(duì)于冪等性的解決方案待锈,沿用剛才的思路即可解決。
本文由
傳智教育博學(xué)谷狂野架構(gòu)師
教研團(tuán)隊(duì)發(fā)布嘴高。如果本文對(duì)您有幫助竿音,歡迎
關(guān)注
和點(diǎn)贊
;如果您有任何建議也可留言評(píng)論
或私信
拴驮,您的支持是我堅(jiān)持創(chuàng)作的動(dòng)力春瞬。轉(zhuǎn)載請(qǐng)注明出處!