背景
項(xiàng)目中一個(gè)service中需要更新兩個(gè)數(shù)據(jù)源中的數(shù)據(jù),并且業(yè)務(wù)邏輯比較復(fù)雜庐船。如果不加事務(wù)的話银酬,一旦程序報(bào)錯(cuò)容易產(chǎn)生臟數(shù)據(jù),處理起來比較麻煩筐钟。
考慮到此項(xiàng)目為單體應(yīng)用揩瞪,不涉及到分布式。所以沒有采用分布式事務(wù)來解決此次問題篓冲。此次采用的方案是使用AOP對(duì)需要開啟多數(shù)據(jù)源的方法前后進(jìn)行加強(qiáng)李破。
多數(shù)據(jù)源開啟事務(wù)參考了 https://www.cnblogs.com/shuaiandjun/p/8667815.html
spring + mybatis開啟事務(wù)的方法
單個(gè)數(shù)據(jù)庫開啟事務(wù),我們只需要在方法上方加@Transactional
注解即可壹将,但是多數(shù)據(jù)源這個(gè)注解是不好使的嗤攻,只能管理一個(gè)數(shù)據(jù)庫的事務(wù)。下面來分析一下spring 加 mybatis 是如何管理事務(wù)的
根據(jù)配置文件中配置的
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
我們從org.springframework.jdbc.datasource.DataSourceTransactionManager
這個(gè)類入手诽俯,首先看下這個(gè)類中有哪些方法
根據(jù)以往看源碼的經(jīng)驗(yàn)妇菱,一般do...的方法是真正處理邏輯的。直接上源碼吧暴区,
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
我們看到這個(gè)方法中只是設(shè)置了一些屬性闯团,并沒有開啟事務(wù),
通過idea我們找到調(diào)用此方法的地方颜启,是DataSourceTransactionManager的父類方法
org.springframework.transaction.support.AbstractPlatformTransactionManager
而這個(gè)方法是spring容器提供的抽象方法偷俭,里面的
getTransaction(TransactionDefinition definition)
調(diào)用了上述方法
@Override
// TransactionDefinition 這個(gè)參數(shù)為事務(wù)的屬性信息 其中包括 PROPAGATION:事務(wù)傳播等級(jí) ISOLATION 事務(wù)隔離級(jí)別
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
// 這里是調(diào)用了我們配置文件中配置的DataSourceTransactionManager 中的doGetTransaction();
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
// 這里創(chuàng)建了默認(rèn)的事務(wù)屬性信息
definition = new DefaultTransactionDefinition();
}
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 校驗(yàn)如果事務(wù)傳播級(jí)別如果是
//REQUIRED:支持當(dāng)前事務(wù),如果當(dāng)前沒有事務(wù)缰盏,就新建一個(gè)事務(wù)涌萤。這是最常見的選擇。
//REQUIRES_NEW:新建事務(wù)口猜,如果當(dāng)前存在事務(wù)负溪,把當(dāng)前事務(wù)掛起。
//NESTED:支持當(dāng)前事務(wù)济炎,如果當(dāng)前事務(wù)存在川抡,則執(zhí)行一個(gè)嵌套事務(wù),如果當(dāng)前沒有事務(wù),就新建一個(gè)事務(wù)崖堤。
// 這三種的話就需要開啟事務(wù)了
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
這個(gè)方法中調(diào)用了 doBegin(transaction, definition)方法侍咱,而此方法在DataSourceTransactionManager
這個(gè)方法中重寫了,也就是剛剛我們分析到的方法密幔。根據(jù)我們的經(jīng)驗(yàn)應(yīng)該是在此方法中打開事務(wù)的楔脯,那就直接上源碼吧
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
// 我們終于看到了在這里將 autocommit設(shè)置為了 false 也就是開啟了事務(wù)
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 將連接綁定到當(dāng)前線程
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
通過對(duì)源碼的分析,得出想要手動(dòng)設(shè)置開啟事務(wù)胯甩,需要獲取到spring容器中 DataSourceTransactionManager對(duì)象昧廷,調(diào)用它的getTransaction方法即可。
mybatis會(huì)將事務(wù)綁定到當(dāng)前線程偎箫,也就是開啟事務(wù)后當(dāng)前線程會(huì)一直持有數(shù)據(jù)庫連接木柬,當(dāng)前線程對(duì)數(shù)據(jù)庫的操作都是在一個(gè)事務(wù)中。直到事務(wù)回滾或者提交淹办。
編寫切面方法
了解到如何開啟事務(wù)眉枕,就可以自己動(dòng)手編寫多數(shù)據(jù)源開啟事務(wù)了。
- 首先我們需要獲取spring容器中的DataSourceTransactionManager對(duì)象娇唯。
- 創(chuàng)建注解類
- 編寫切面方法
// 注解類
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransactionAnno {
}
/**
* @author :wang.j.f
* @description:TODO
* @date :Created in 2021/4/28 15:53
* @modified By:
* @version: 1.0$
*/
@Aspect
@Component
public class MultiTransactionalAspect {
private Logger logger = LoggerFactory.getLogger(getClass());
private static final String[] transactionManagerNames = {"transactionManager", "transactionManager2"};
@Pointcut("@annotation(com.tianma.service.aop.MultiTransactionAnno)")
public void transactionAnno(){}
@Around(value = "transactionAnno()")
public Object menageTransaction(ProceedingJoinPoint joinPoint){
Object result = null;
Stack<DataSourceTransactionManager> dataSourceTransactionManagers = new Stack<>();
Stack<TransactionStatus> transactionStatuses = new Stack<>();
openTransaction(dataSourceTransactionManagers, transactionStatuses);
try {
result = joinPoint.proceed();
commit(dataSourceTransactionManagers, transactionStatuses);
}catch (Throwable e){
logger.error("異常", e);
rollback(dataSourceTransactionManagers, transactionStatuses);
}
return result;
}
/**
* 開啟事務(wù)
* @param dataSourceTransactionManagers
* @param transactionStatuses
* @return
*/
private void openTransaction(Stack<DataSourceTransactionManager> dataSourceTransactionManagers,
Stack<TransactionStatus> transactionStatuses){
// 開啟事務(wù)并將transactionManager 和 transactionStatus 入棧
for (String transactionManagerName : transactionManagerNames) {
DataSourceTransactionManager dataSourceTransactionManager = (DataSourceTransactionManager) SpringContextUtil.getBean(transactionManagerName);
TransactionStatus status = dataSourceTransactionManager.getTransaction(new DefaultTransactionDefinition());
transactionStatuses.push(status);
dataSourceTransactionManagers.push(dataSourceTransactionManager);
logger.info("事務(wù)開啟成功:{}", transactionManagerName);
}
}
/**
* 提交棧中事務(wù)
* @param dataSourceTransactionManagerStack
* @param transactionStatuStack
*/
private void commit(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
Stack<TransactionStatus> transactionStatuStack){
while (!dataSourceTransactionManagerStack.isEmpty()){
dataSourceTransactionManagerStack.pop().commit(transactionStatuStack.pop());
logger.info("事務(wù)提交成功齐遵!");
}
}
/**
* 回滾棧中事務(wù)
* @param dataSourceTransactionManagerStack
* @param transactionStatuStack
*/
private void rollback(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
Stack<TransactionStatus> transactionStatuStack){
while (!dataSourceTransactionManagerStack.isEmpty()){
dataSourceTransactionManagerStack.pop().rollback(transactionStatuStack.pop());
logger.error("事務(wù)回滾惕艳!");
}
}
}
遺留問題
利用此方法是可以解決步淹,兩個(gè)事務(wù)一同提交和一同回滾的問題棠隐。但是由于多個(gè)事務(wù)提交和回滾是有先后順序的幽邓,當(dāng)首先執(zhí)行的事務(wù)提交或者回滾成功驶俊,而后面事務(wù)失敗時(shí)泼差,同樣會(huì)產(chǎn)生兩個(gè)數(shù)據(jù)庫數(shù)據(jù)不一致的情況躯概。
所以年栓,當(dāng)對(duì)數(shù)據(jù)一致性要求很高的業(yè)務(wù)場(chǎng)景流纹,還是盡量使用現(xiàn)有的分布式事務(wù)解決方案來管理事務(wù)比較穩(wěn)妥糜烹。