分布式事務(wù)
我們知道在單數(shù)據(jù)庫系統(tǒng)中,實(shí)現(xiàn)數(shù)據(jù)的一致性,通過數(shù)據(jù)庫的事務(wù)來處理比較簡單屎暇。在微服務(wù)或分布式系統(tǒng)中,各個(gè)獨(dú)立的服務(wù)都會(huì)有自己的數(shù)據(jù)庫驻粟,而不是在同一個(gè)數(shù)據(jù)庫中根悼,所以當(dāng)一組事務(wù)(如商品交易中,商品的庫存蜀撑、用戶的賬戶資金和交易記錄等)的處理是分布在不同數(shù)據(jù)庫中的番挺,分布式事務(wù)就是為了解決在多個(gè)數(shù)據(jù)庫節(jié)點(diǎn)中保證這些數(shù)據(jù)的一致性。
分布式事務(wù)里有個(gè)BASE理論屯掖,在分布式數(shù)據(jù)庫中玄柏,存在強(qiáng)一致性和弱一致性。
強(qiáng)一致性的好處是贴铜,對(duì)于開發(fā)者來說比較友好粪摘,數(shù)據(jù)始終可以讀取到最新值,但這種方式需要復(fù)雜的協(xié)議绍坝,并且需要犧牲很多的性能徘意。
弱一致性,對(duì)于開發(fā)者來說相對(duì)沒有那么友好轩褐,無法保證讀取的值是最新的椎咧,但是不需要引入復(fù)雜的協(xié)議,也不需要犧牲很多的性能把介。
弱一致性是當(dāng)今企業(yè)采用的主流方案勤讽,它并不能保證所有數(shù)據(jù)的實(shí)時(shí)一致性,所以有時(shí)候?qū)崟r(shí)讀取數(shù)據(jù)是不可信的拗踢。它只是在正常的流程中脚牍,加入了提供修復(fù)數(shù)據(jù)的可能性,從而減少數(shù)據(jù)不一致的可能性巢墅,大大降低數(shù)據(jù)不一致的可能性诸狭。
什么時(shí)候使用分布式事務(wù)
對(duì)于像電商中用戶隱私信息券膀、商品信息、交易記錄以及資金等數(shù)據(jù)驯遇,這些具備價(jià)值的核心數(shù)據(jù)芹彬,關(guān)系到用戶隱私和財(cái)產(chǎn)的內(nèi)容,應(yīng)該考慮使用分布式事務(wù)來保證一致性叉庐。
但對(duì)于用戶評(píng)價(jià)雀监、自身裝飾和其他一些非重要的個(gè)性化信息,可以采用非事務(wù)的處理眨唬。因?yàn)橐粋€(gè)正常的系統(tǒng)出現(xiàn)不一致的情況是小概率事件,而非大概率事件好乐,對(duì)于一些小概率的數(shù)據(jù)丟失匾竿,一般來說是允許的。之所以這樣選擇蔚万,主要基于兩點(diǎn)岭妖,一個(gè)是開發(fā)者的開發(fā)難度;另一個(gè)是用戶的體驗(yàn)反璃,過多的分布式事務(wù)會(huì)造成性能的不斷丟失
弱一致性分布式事務(wù)解決方案有如下幾種:
- 狀態(tài)表
- RabbitMQ可靠事件
- 最大嘗試
- TCC模式
冪等性
在分布式事務(wù)中昵慌,各個(gè)訪問操作的接口,都需要保證冪等性淮蜈。
所謂冪等性斋攀,是指在HTTP協(xié)議中,一次和多次請(qǐng)求某一個(gè)資源梧田,對(duì)于資源本身應(yīng)該具有同樣的結(jié)果淳蔼,也就是其執(zhí)行任意多次時(shí),對(duì)資源本身所產(chǎn)生的影響裁眯,與執(zhí)行一次時(shí)的相同鹉梨。
實(shí)現(xiàn)方式有以下幾種:
- 唯一索引 -- 防止新增臟數(shù)據(jù)
- token機(jī)制 -- 防止頁面重復(fù)提交
- 悲觀鎖 -- 獲取數(shù)據(jù)的時(shí)候加鎖(鎖表或鎖行)
- 樂觀鎖 -- 基于版本號(hào)version實(shí)現(xiàn), 在更新數(shù)據(jù)那一刻校驗(yàn)數(shù)據(jù)
- 分布式鎖 -- redis(jedis、redisson)或zookeeper實(shí)現(xiàn)
- 狀態(tài)機(jī) -- 狀態(tài)變更, 更新數(shù)據(jù)時(shí)判斷狀態(tài)
※說明:如何實(shí)現(xiàn)接口的冪等性,可以分篇在接口的冪等性文章里解說穿稳。
狀態(tài)表實(shí)現(xiàn)分布式事務(wù)
這里拿電商的商品交易為例存皂,講述下思路:
- 需要商品數(shù)據(jù)庫:商品表、商品交易明細(xì)表逢艘;資金數(shù)據(jù)庫:用戶賬戶表旦袋、賬戶交易明細(xì)表
- 主要流程包括:
商品表減商品庫存、
商品交易明細(xì)表中添加新的交易記錄它改、
用戶賬戶表中扣減用戶賬戶表的資金猜憎、
資金交易明細(xì)表中記錄賬戶交易明細(xì)表 - 需要準(zhǔn)備一個(gè)狀態(tài)表,用redis的Hset數(shù)據(jù)類型比較合適
- 這里假設(shè)相關(guān)的明細(xì)記錄表中,有4個(gè)狀態(tài):
1--準(zhǔn)備交易,
2--交易成功,
3--被沖正,
4--沖正記錄
流程說明
在商品服務(wù)中,商品減庫存后搔课,記錄商品交易明細(xì)胰柑,如果沒有異常截亦,就將商品交易記錄的狀態(tài)位設(shè)置為“1—準(zhǔn)備提交”,并且記錄在Redis的狀態(tài)表中柬讨。
商品服務(wù)通過RESTFUL調(diào)用資金服務(wù)崩瓤,如果成功,就將賬戶交易明細(xì)表的記錄的狀態(tài)位設(shè)置為“1—準(zhǔn)備提交”踩官,并且記錄在Redis的狀態(tài)表中却桶。
最后,讀取Redis相關(guān)的所有狀態(tài)位蔗牡,確定是否所有的操作都為“1—準(zhǔn)備提交”狀態(tài)颖系,如果是,則更新產(chǎn)品服務(wù)的記錄狀態(tài)為“2—提交成功”辩越,然后發(fā)起資金服務(wù)調(diào)用嘁扼,將對(duì)應(yīng)的記錄(可通過業(yè)務(wù)流水號(hào)關(guān)聯(lián))的狀態(tài)也更新為“2—提交成功”,這樣就完成了整個(gè)交易黔攒。
如果不全部為“1—準(zhǔn)備提交”狀態(tài)趁啸,則發(fā)起各庫的沖正交易,沖掉原有的記錄督惰,并且歸還商品庫存和賬戶金額不傅。發(fā)起沖正交易,把原明細(xì)記錄狀態(tài)更新為3--被沖正,并往明細(xì)表中添加對(duì)應(yīng)的新記錄,狀態(tài)為4--沖正記錄
RabbitMQ可靠事件
使用RabbitMQ等消息隊(duì)列中間件的可靠事件,來實(shí)現(xiàn)分布式事務(wù),這里結(jié)合SpringBoot
前面有介紹過SpringBoot整合多數(shù)據(jù)庫的文章,這里可以用到赏胚,具體參考《Spring Boot學(xué)習(xí):MyBatis配置Druid多數(shù)據(jù)源》,切換數(shù)據(jù)源使用@DataSource注解,如下
@DataSource(value = DataSourceType.MASTER) //切換到商品數(shù)據(jù)庫
@DataSource(value = DataSourceType.SLAVE) //切換到賬戶數(shù)據(jù)庫
在此基礎(chǔ)上我們加入RabbitMQ實(shí)現(xiàn)分布式事務(wù)功能
- 在pom.xml文件中加入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- yml配置文件中,關(guān)于RabbitMQ的配置如下:
# Spring 配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
#使用發(fā)布者確認(rèn)模式,發(fā)布消息者會(huì)得到一個(gè)“消息是否被服務(wù)提供者接收”的確認(rèn)消息
publisher-confirms: true
#RabbitMQ 隊(duì)列名稱配置
rabbitmq:
queue:
fund: fund
3.創(chuàng)建RabbitMQ配置文件RabbitConfig.java
package com.zhlab.demo.config;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName RabbitConfig
* @Description //RabbitMQ消息隊(duì)列配置
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/11 0011 上午 11:10
**/
@Configuration
public class RabbitConfig {
// 讀取配置屬性
@Value("${rabbitmq.queue.fund}")
private String fundQueueName = null;
// 創(chuàng)建RabbitMQ消息隊(duì)列
@Bean(name="fundQueue")
public Queue createFundQueue() {
return new Queue(fundQueueName);
}
}
- 創(chuàng)建數(shù)據(jù)傳輸對(duì)象FundParams.java
package com.zhlab.demo.model;
import java.io.Serializable;
/**
* @ClassName FundParams
* @Description //FundParams
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/11 0011 上午 11:30
**/
public class FundParams implements Serializable {
// 序列化版本號(hào)
public static final long serialVersionUID = 989878441231256478L;
private Long xid; // 業(yè)務(wù)流水號(hào)
private Long userId; // 用戶編號(hào)
private Double amount; // 交易金額
public FundParams() {
}
public FundParams(Long xid, Long userId, Double amount) {
this.xid = xid;
this.userId = userId;
this.amount = amount;
}
public Long getXid() {
return xid;
}
public void setXid(Long xid) {
this.xid = xid;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
}
- 創(chuàng)建商品服務(wù) 業(yè)務(wù)邏輯PurchaseService.java
package com.zhlab.demo.service.goods;
import com.zhlab.demo.db.DataSourceType;
import com.zhlab.demo.db.annotation.DataSource;
import com.zhlab.demo.model.FundParams;
import com.zhlab.demo.utils.SnowFlakeUtil;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
* @ClassName PurchaseService
* @Description //商品 業(yè)務(wù)邏輯
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/11 0011 上午 11:24
**/
@Service
public class PurchaseService implements RabbitTemplate.ConfirmCallback {
//實(shí)現(xiàn)RabbitTemplate.ConfirmCallback接口
//需要實(shí)現(xiàn)它定義的confirm方法访娶,這樣它便可以作為一個(gè)發(fā)布者檢測(cè)消息是否被消費(fèi)者所接收的確認(rèn)類
// SnowFlake算法生成ID
SnowFlakeUtil worker = new SnowFlakeUtil(003);
// RabbitMQ模板
@Autowired
private RabbitTemplate rabbitTemplate;
// 讀取配置屬性
@Value("${rabbitmq.queue.fund}")
private String fundQueueName;
// 購買業(yè)務(wù)方法
@DataSource(value = DataSourceType.MASTER) //切換到商品數(shù)據(jù)庫
public Long purchase(Long productId, Long userId, Double amount) {
rabbitTemplate.setConfirmCallback(this);//設(shè)置了回調(diào)類為當(dāng)前類
// SnowFlake算法生成序列號(hào),用戶跨服務(wù)的關(guān)聯(lián),這里用本地自定義方法觉阅,可以借助Leaf TinyID等分布式ID生成服務(wù)中間件
Long xid = worker.nextId();
// 傳遞給消費(fèi)者的參數(shù)
FundParams params = new FundParams(xid, userId, amount);
// 發(fā)送消息給資金服務(wù)做扣款
this.rabbitTemplate.convertAndSend(fundQueueName, params); // ④
System.out.println("執(zhí)行產(chǎn)品服務(wù)邏輯");
return xid;
}
/**
* 確認(rèn)回調(diào)震肮,會(huì)異步執(zhí)行
* @param correlationData --相關(guān)數(shù)據(jù)
* @param ack -- 是否被消費(fèi)
* @param cause -- 失敗原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
/*
* ack代表是否成功。
* 如果投遞消息失敗留拾,就會(huì)先停滯1秒戳晌,然后嘗試進(jìn)行沖正交易,沖掉原有交易痴柔,這樣就可以使得數(shù)據(jù)平整
*/
if (ack){ // 消息投遞成功
System.out.println("執(zhí)行交易成功");
} else { // 消息投遞失敗
try {
// 停滯1秒(稍微等待可能沒有完成的正常流程)沦偎,然后發(fā)起沖正交易
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println("嘗試產(chǎn)品減庫存沖正交易。");
System.out.println("嘗試賬戶扣減沖正交易咳蔚。");
//在confirm方法中豪嚎,如果參數(shù)ack為false,則說明消息傳遞失敗谈火,就要嘗試執(zhí)行沖正交易侈询,把數(shù)據(jù)還原回來
System.out.println(cause); // 打印消息投遞失敗的原因
}
}
}
- 創(chuàng)建賬戶服務(wù)業(yè)務(wù)邏輯AccountService.java
package com.zhlab.demo.service.fund;
import com.zhlab.demo.db.DataSourceType;
import com.zhlab.demo.db.annotation.DataSource;
import com.zhlab.demo.model.FundParams;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
/**
* @ClassName AccountService
* @Description //賬戶 業(yè)務(wù)邏輯
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/11 0011 上午 11:25
**/
@Service
public class AccountService {
/* 消息監(jiān)聽,取YAML文件配置的隊(duì)列名
*因?yàn)橄⒈幌M(fèi)糯耍,所以觸發(fā)PurchaseService類的confirm方法
*spring.rabbitmq.listener.simple.acknowledge-mode = manual
*如果配置為手動(dòng),這里就需要手動(dòng)確認(rèn)消息扔字,默認(rèn)為自動(dòng)的
*自動(dòng)確認(rèn):這種模式下囊嘉,當(dāng)發(fā)送者發(fā)送完消息之后,它會(huì)自動(dòng)認(rèn)為消費(fèi)者已經(jīng)成功接收到該條消息革为。
*這種方式效率較高扭粱,當(dāng)時(shí)如果在發(fā)送過程中,如果網(wǎng)絡(luò)中斷或者連接斷開震檩,將會(huì)導(dǎo)致消息丟失
*手動(dòng)確認(rèn):消費(fèi)者成功消費(fèi)完消息之后琢蛤,會(huì)顯式發(fā)回一個(gè)應(yīng)答(ack信號(hào)),
*RabbitMQ只有成功接收到這個(gè)應(yīng)答消息抛虏,才將消息從內(nèi)存或磁盤中移除消息博其。
*這種方式效率較低點(diǎn),但是能保證絕大部分的消息不會(huì)丟失迂猴,當(dāng)然肯定還有一些小概率會(huì)發(fā)生消息丟失的情況
*主要方法:basicAck慕淡、basicNack、basicReject根據(jù)具體業(yè)務(wù)情況使用,配合redis做冪等檢驗(yàn)
*/
@RabbitListener(queues = "${rabbitmq.queue.fund}")
@DataSource(value = DataSourceType.SLAVE) //切換到賬戶數(shù)據(jù)庫
public void dealAccount(FundParams params) {
//TODO具體業(yè)務(wù)邏輯需自己實(shí)現(xiàn)
System.out.println("扣減賬戶金額邏輯......");
}
}
7.寫個(gè)測(cè)試接口來測(cè)試一下错忱,創(chuàng)建MqController.java
package com.zhlab.demo.controller;
import com.zhlab.demo.service.goods.PurchaseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName MqController
* @Description //RabbitMQ可靠消息 接口測(cè)試
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/11 0011 下午 2:25
**/
@RestController
@RequestMapping("/mq")
public class MqController {
@Autowired
private PurchaseService purchaseService;
@GetMapping("/test")
public String testMq() {
return purchaseService.purchase(1L, 1L, 200.0) + "";
}
}
以上就是基于RabbitMQ可靠消息 實(shí)現(xiàn)的分布式事務(wù)處理,邏輯和說明都在注釋里了。
※說明:這樣的確認(rèn)方式挂据,只是保證了事件的有效傳遞以清,但是不能保證消費(fèi)類能夠沒有異常或者錯(cuò)誤發(fā)生崎逃,當(dāng)消費(fèi)類有異持谰螅或錯(cuò)誤發(fā)生時(shí),數(shù)據(jù)依舊會(huì)存在不一致的情況个绍。這樣的方式勒葱,只是保證了消息傳遞的有效性,降低了不一致的可能性巴柿,從而大大降低了后續(xù)需要運(yùn)維和業(yè)務(wù)人員處理的不一致數(shù)據(jù)的數(shù)量
TCC補(bǔ)償事務(wù)
TCC代表的是
- try(嘗試)
- confirm(確認(rèn))
- cancel(取消)
在TCC事務(wù)中凛虽,要求任何一個(gè)服務(wù)邏輯都有3個(gè)接口,它們對(duì)應(yīng)的就是嘗試(try)方法广恢、確認(rèn)(confirm)方法和取消(cancel)方法凯旋。
TCC事務(wù)的一致性可達(dá)99.99%,是一種較為成熟的方案钉迷,因此在目前有著較為廣泛的應(yīng)用至非。
繼續(xù)通過上面的商品交易流程來解析這個(gè)模型:
- 一階段
商品表減庫存,商品交易明細(xì)表記錄商品交易明細(xì)糠聪,并且將對(duì)應(yīng)記錄狀態(tài)設(shè)置為“1—準(zhǔn)備提交”荒椭。
調(diào)用賬戶服務(wù),用戶賬戶表扣減賬戶資金舰蟆,賬戶交易明細(xì)表記錄交易明細(xì)趣惠,并且將對(duì)應(yīng)記錄狀態(tài)設(shè)置為“1—準(zhǔn)備提交”
在一階段的調(diào)用中狸棍,如果沒有發(fā)生異常,就可以執(zhí)行正常二階段進(jìn)行提交了 - 正常二階段
商品服務(wù) 更新對(duì)應(yīng)記錄的狀態(tài)為“2—提交成功”信卡,使得數(shù)據(jù)生效
調(diào)用賬戶服務(wù)隔缀,使得對(duì)應(yīng)的記錄狀態(tài)也為“2—提交成功”,這樣正常的提交就完成了
如果在一階段發(fā)生異常傍菇,需要取消操作猾瘸,可以執(zhí)行異常二階段 - 異常二階段
商品服務(wù)執(zhí)行沖正交易,沖掉原有的產(chǎn)品交易丢习,將庫存歸還給商品表
調(diào)用賬戶服務(wù)牵触,發(fā)起沖正交易,沖掉原有的資金交易咐低,將資金歸還到賬戶里
注意揽思,這些提交和退出機(jī)制在TCC中,都需要開發(fā)者對(duì)接口作冪等性處理
TCC事務(wù)機(jī)制见擦,也并不能保證所有的數(shù)據(jù)都是完全一致的钉汗,它只是提供了一個(gè)可以修復(fù)的機(jī)制,來降低不一致的情況鲤屡,從而大大降低后續(xù)維護(hù)數(shù)據(jù)的代價(jià)损痰。TCC事務(wù)也會(huì)帶來兩個(gè)較大的麻煩:第一個(gè)是,原本的一個(gè)方法實(shí)現(xiàn)酒来,現(xiàn)在需要拆分為3個(gè)方法卢未,代價(jià)較大;第二個(gè)是堰汉,需要開發(fā)者自已實(shí)現(xiàn)提交和取消方法的冪等性
總結(jié)
使用分布式事務(wù)辽社,并不是很容易的事情,甚至有些方法還相當(dāng)復(fù)雜翘鸭。
在互聯(lián)網(wǎng)中滴铅,并不是所有的數(shù)據(jù)都需要使用分布式事務(wù),所以首先要考慮的是:在什么時(shí)候使用分布式事務(wù)就乓。即使需要使用分布式事務(wù)失息,有時(shí)候也并非需要實(shí)時(shí)實(shí)現(xiàn)數(shù)據(jù)的一致性,因?yàn)榭梢栽诤罄m(xù)通過一定的手段來完成档址。例如電商網(wǎng)站盹兢,對(duì)買家來說,需要的是快速響應(yīng)守伸,但對(duì)商家來說绎秒,就未必需要得到實(shí)時(shí)數(shù)據(jù)了,過段時(shí)間得到數(shù)據(jù)也是可以的尼摹,而這段時(shí)間就可以考慮進(jìn)行數(shù)據(jù)補(bǔ)償了见芹。無論我們?nèi)绾问褂梅植际绞聞?wù)剂娄,也無法使數(shù)據(jù)完全達(dá)到百分之百的一致性,因此一般金融和電商企業(yè)會(huì)通過對(duì)賬等形式來完成最終一致性的操作玄呛。
在分布式事務(wù)的選擇中阅懦,都會(huì)采用弱一致性代替強(qiáng)一致性,相對(duì)來說徘铝,弱一致性更加靈活耳胎,更方便我們開發(fā)。從網(wǎng)站的角度來說惕它,弱一致性可以獲得更佳的性能怕午,提升用戶的體驗(yàn),這是互聯(lián)網(wǎng)應(yīng)用需要首先考慮的要素淹魄。
拓展---電商中的高并發(fā)和分布式事務(wù)
電商網(wǎng)站中高并發(fā)是常見的郁惜,高并發(fā)是針對(duì)用戶而言的,比如搶購中甲锡,用戶只希望短時(shí)間內(nèi)快速搶到商品兆蕉,而商家對(duì)于交易信息可以延遲處理得到。
這就是意味著缤沦,對(duì)于用戶交易部分虎韵,要盡可能通過分布式事務(wù)進(jìn)行保證,但而對(duì)于商戶數(shù)據(jù)部分疚俱,實(shí)時(shí)性要求相對(duì)不是那么高劝术,可以過段時(shí)間通過后續(xù)手段來補(bǔ)償修復(fù)缩多,從而縮小分布式事務(wù)的范圍呆奕。
這里可以看出使用分布式事務(wù)的主要是請(qǐng)求數(shù)據(jù),保證這個(gè)過程可以提高數(shù)據(jù)可靠性衬吆。對(duì)于商戶數(shù)據(jù)梁钾,不需要使用分布式事務(wù),這樣可以提升性能逊抡,使搶購進(jìn)行得更快姆泻,滿足買家的需求,但是這也會(huì)引發(fā)數(shù)據(jù)的丟失冒嫡。為了解決這個(gè)問題拇勃,后續(xù)可以通過和請(qǐng)求數(shù)據(jù)進(jìn)行對(duì)比來修復(fù)數(shù)據(jù),使數(shù)據(jù)達(dá)到一致孝凌,這個(gè)過程可以在高并發(fā)過后(一般高并發(fā)都是時(shí)間段性的方咆,如性價(jià)比高的產(chǎn)品發(fā)布點(diǎn)、購物節(jié)開始時(shí)間段)進(jìn)行蟀架,這樣商戶最終也可以得到可靠的數(shù)據(jù)瓣赂,只是不是實(shí)時(shí)的榆骚,但是這并不影響商戶和用戶的業(yè)務(wù)。