1.分布式事務的問題
在微服務的架構下仪糖,隨著業(yè)務服務的拆分和數(shù)據(jù)庫的拆分,會存在多個業(yè)務對應多個數(shù)據(jù)庫的情況调俘,如下圖所示判族,訂單和庫存分別拆分成兩個獨立的數(shù)據(jù)庫躺盛,當客戶端發(fā)送一個下單操作時,需要在訂單服務的數(shù)據(jù)庫中創(chuàng)建訂單形帮,同時庫存服務完成商品庫存的扣減槽惫。由于每個數(shù)據(jù)庫的事務執(zhí)行情況只有自己知道,比如訂單數(shù)據(jù)庫并不知道庫存數(shù)據(jù)庫的執(zhí)行情況辩撑,就會導致訂單數(shù)據(jù)庫和庫存數(shù)據(jù)庫數(shù)據(jù)不一致的問題界斜。
2.seata
Seata一款開源的分布式事務解決方案,致力于在微服務架構下提高性能和簡單易用的分布式事務服務合冀。
state術語
TC:事務協(xié)調(diào)者
維護全局和分支事務的狀態(tài)各薇,驅動全局事務提交或回滾。
TM:事務管理者
定義全局事務的范圍:開始全局范圍君躺,提交或回滾全局事務
RM:資源管理器
管理分支事務處理的資源峭判,與TC交談以注冊分支事務和報告分支事務的狀態(tài),并驅動分支事務提交或回滾棕叫。
具體執(zhí)行流程:
- TM向TC注冊全局事務林螃,并生成全局唯一的XID
- RM向TC注冊分支事務,并將其納入該XID對應的全局事務范圍
- RM向TC匯報資源的準備狀態(tài)
- TC匯總所有事務參與者的執(zhí)行狀態(tài)俺泣,決定該分布式事務是否全部回滾或提交
- TC通知所有RM提交或回滾事務疗认。
下面結合例子來解釋seata的操作過程完残,具體了解分布式事務的操作過程
1.seata-server的安裝
官網(wǎng)下載解壓
2.修改conf下的file.conf文件,修改里面的service和store横漏,并添加數(shù)據(jù)庫seata
service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "fsp_tx_group"http://起一個名稱
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
//----------------------------------------------------//
store {
## store mode: file谨设、db
mode = "db" //改為數(shù)據(jù)庫存儲
## file store
file {
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}
## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
##添加你的數(shù)據(jù)庫的相關配置
db-type = "mysql"
driver-class-name = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC"
user = "root"
password = "123"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}
3.修改register.conf下的配置文件
registry {
# file 、nacos 绊茧、eureka铝宵、redis打掘、zk华畏、consul、etcd3尊蚁、sofa
type = "nacos" //指明注冊中心是nacos
nacos {
serverAddr = "localhost:8848" //修改服務地址
namespace = ""
cluster = "default"
}
4.啟動nacos和seata
注冊成功
5.開啟測試
首先創(chuàng)建3個微服務亡笑,一個訂單服務,一個庫存服務横朋,一個賬戶服務
當用戶下單時仑乌,訂單服務中生成一個訂單,然后通過遠程調(diào)用庫存服務扣減庫存琴锭,再通過遠程調(diào)用扣減余額晰甚,最后在訂單服務中修改訂單的狀態(tài)為已完成。
1.創(chuàng)建數(shù)據(jù)庫和模塊
模塊的創(chuàng)建
導入依賴
<!-- seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<!-- 因為兼容版本問題,所以需要剔除它自己的seata的包 -->
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 使用自己引入的seata版本-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.1.0</version>
</dependency>
這里主要使用order模塊對storage和account模塊進行操作
storage層業(yè)務代碼
@RestController
public class StorageController {
@Autowired
private StorageService storageService;
@RequestMapping("/storage/decrease")
public CommonResult decrease(@RequestParam("productId")Long productId, @RequestParam("count")Integer count){
storageService.decrease(productId,count);
return new CommonResult(200,"扣減庫存成功决帖!");
}
}
service層
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
@Resource
private StorageDao storageDao;
@Override
public void decrease(Long productId, Integer count) {
log.info("------->扣減庫存開始");
storageDao.decrease(productId,count);
log.info("------->扣減庫存完成");
}
}
account層業(yè)務代碼類似厕九,并且需要配置file.conf和register.conf,因為seata默認不支持yml配置方式,可以使用文件的方式進行配置地回。
file.conf文件下的修改
service {
#vgroup->rgroup
vgroup_mapping.fsp_tx_group = "default" //*******這里注意修改
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
register.conf文件不變
registry {
# file 扁远、nacos 、eureka刻像、redis畅买、zk、consul细睡、etcd3谷羞、sofa
type = "nacos"
nacos {
serverAddr = "localhost:8848"
namespace = ""
cluster = "default"
}
我們知道,對于事務的處理溜徙,最重要的是要拿到數(shù)據(jù)源洒宝,因為通過數(shù)據(jù)源我們可以控制事務什么時候回滾或提交,所以數(shù)據(jù)源我們需要讓seata來代理萌京,在我們的啟動注解上排除自動加載的數(shù)據(jù)源@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@Configuration
public class DataSourceProxyConfig {
@Value("${mybatis.mapper-locations}")
private String mapperLocations;
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource() {
return new DruidDataSource();
}
@Bean
public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSourceProxy);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
bean.setMapperLocations(resolver.getResources(mapperLocations));
bean.setTransactionFactory(new SpringManagedTransactionFactory());
return bean.getObject();
}
}
下面主要看order層的業(yè)務代碼雁歌。
service層的業(yè)務結構
AccountService和StorageService的業(yè)務代碼
@FeignClient(value = "seata-account-service")
public interface AccountService {
@PostMapping("/account/decrease")
CommonResult decrease(@RequestParam("userId")Long userId, @RequestParam("money")BigDecimal money);
}
@FeignClient(value = "seata-storage-service")
public interface StorageService {
@PostMapping("/storage/decrease")
CommonResult decrease(@RequestParam("productId")Long productId,@RequestParam("count")Integer count);
}
orderService的實現(xiàn)類
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Resource
private OrderDao orderDao;
@Resource
private StorageService storageService;
@Resource
private AccountService accountService;
@Override
@GlobalTransactional(name="fsp-create-order",rollbackFor = Exception.class) //這里名字不唯一,處理異持校回滾
public void create(Order order) {
log.info("------>開始新建訂單");
orderDao.create(order);
log.info("------>訂單微服務開始調(diào)用庫存");
storageService.decrease(order.getProductId(),order.getCount());
log.info("------->訂單微服務開始調(diào)用庫存靠瞎,做扣減end");
accountService.decrease(order.getUserId(),order.getMoney());
//修改訂單的狀態(tài)
log.info("------->修改訂單");
orderDao.update(order.getUserId(),0);
log.info("------->訂單完成");
}
}
最后執(zhí)行操作比庄,完成。
總結
TC:事務協(xié)調(diào)者
維護全局和分支事務的狀態(tài)乏盐,驅動全局事務提交或回滾佳窑。
TM:事務管理者
定義全局事務的范圍:開始全局范圍,提交或回滾全局事務
RM:資源管理器
管理分支事務處理的資源父能,與TC交談以注冊分支事務和報告分支事務的狀態(tài)神凑,并驅動分支事務提交或回滾。
這里TC就等于seata服務器何吝,TM就是添加@GlobalTransactional注解的事務發(fā)起方溉委,RM就是每一個數(shù)據(jù)庫。