Spring多數(shù)據(jù)源開啟事務(wù)

背景

項(xiàng)目中一個(gè)service中需要更新兩個(gè)數(shù)據(jù)源中的數(shù)據(jù),并且業(yè)務(wù)邏輯比較復(fù)雜庐船。如果不加事務(wù)的話银酬,一旦程序報(bào)錯(cuò)容易產(chǎn)生臟數(shù)據(jù),處理起來比較麻煩筐钟。
考慮到此項(xiàng)目為單體應(yīng)用揩瞪,不涉及到分布式。所以沒有采用分布式事務(wù)來解決此次問題篓冲。此次采用的方案是使用AOP對(duì)需要開啟多數(shù)據(jù)源的方法前后進(jìn)行加強(qiáng)李破。

多數(shù)據(jù)源開啟事務(wù)參考了 https://www.cnblogs.com/shuaiandjun/p/8667815.html

spring + mybatis開啟事務(wù)的方法

單個(gè)數(shù)據(jù)庫開啟事務(wù),我們只需要在方法上方加@Transactional注解即可壹将,但是多數(shù)據(jù)源這個(gè)注解是不好使的嗤攻,只能管理一個(gè)數(shù)據(jù)庫的事務(wù)。下面來分析一下spring 加 mybatis 是如何管理事務(wù)的
根據(jù)配置文件中配置的

<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource" />
</bean>

我們從org.springframework.jdbc.datasource.DataSourceTransactionManager 這個(gè)類入手诽俯,首先看下這個(gè)類中有哪些方法

圖片.png

根據(jù)以往看源碼的經(jīng)驗(yàn)妇菱,一般do...的方法是真正處理邏輯的。直接上源碼吧暴区,

@Override
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

我們看到這個(gè)方法中只是設(shè)置了一些屬性闯团,并沒有開啟事務(wù),

圖片.png

通過idea我們找到調(diào)用此方法的地方颜启,是DataSourceTransactionManager的父類方法org.springframework.transaction.support.AbstractPlatformTransactionManager
而這個(gè)方法是spring容器提供的抽象方法偷俭,里面的getTransaction(TransactionDefinition definition)調(diào)用了上述方法

@Override
// TransactionDefinition 這個(gè)參數(shù)為事務(wù)的屬性信息 其中包括 PROPAGATION:事務(wù)傳播等級(jí) ISOLATION 事務(wù)隔離級(jí)別
    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        // 這里是調(diào)用了我們配置文件中配置的DataSourceTransactionManager 中的doGetTransaction();
        Object transaction = doGetTransaction();

        // Cache debug flag to avoid repeated checks.
        boolean debugEnabled = logger.isDebugEnabled();

        if (definition == null) {
            // Use defaults if no transaction definition given.
     // 這里創(chuàng)建了默認(rèn)的事務(wù)屬性信息
            definition = new DefaultTransactionDefinition();
        }

        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
     // 校驗(yàn)如果事務(wù)傳播級(jí)別如果是 
//REQUIRED:支持當(dāng)前事務(wù),如果當(dāng)前沒有事務(wù)缰盏,就新建一個(gè)事務(wù)涌萤。這是最常見的選擇。 
//REQUIRES_NEW:新建事務(wù)口猜,如果當(dāng)前存在事務(wù)负溪,把當(dāng)前事務(wù)掛起。 
//NESTED:支持當(dāng)前事務(wù)济炎,如果當(dāng)前事務(wù)存在川抡,則執(zhí)行一個(gè)嵌套事務(wù),如果當(dāng)前沒有事務(wù),就新建一個(gè)事務(wù)崖堤。
// 這三種的話就需要開啟事務(wù)了
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }

這個(gè)方法中調(diào)用了 doBegin(transaction, definition)方法侍咱,而此方法在DataSourceTransactionManager這個(gè)方法中重寫了,也就是剛剛我們分析到的方法密幔。根據(jù)我們的經(jīng)驗(yàn)應(yīng)該是在此方法中打開事務(wù)的楔脯,那就直接上源碼吧

    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = this.dataSource.getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();

            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);

            // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
            // so we don't want to do it unnecessarily (for example if we've explicitly
            // configured the connection pool to set it already).
    // 我們終于看到了在這里將 autocommit設(shè)置為了 false 也就是開啟了事務(wù)
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                con.setAutoCommit(false);
            }

            prepareTransactionalConnection(con, definition);
            txObject.getConnectionHolder().setTransactionActive(true);

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
                        // 將連接綁定到當(dāng)前線程
            // Bind the connection holder to the thread.
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
            }
        }

        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.dataSource);
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

通過對(duì)源碼的分析,得出想要手動(dòng)設(shè)置開啟事務(wù)胯甩,需要獲取到spring容器中 DataSourceTransactionManager對(duì)象昧廷,調(diào)用它的getTransaction方法即可。
mybatis會(huì)將事務(wù)綁定到當(dāng)前線程偎箫,也就是開啟事務(wù)后當(dāng)前線程會(huì)一直持有數(shù)據(jù)庫連接木柬,當(dāng)前線程對(duì)數(shù)據(jù)庫的操作都是在一個(gè)事務(wù)中。直到事務(wù)回滾或者提交淹办。


編寫切面方法

了解到如何開啟事務(wù)眉枕,就可以自己動(dòng)手編寫多數(shù)據(jù)源開啟事務(wù)了。

  1. 首先我們需要獲取spring容器中的DataSourceTransactionManager對(duì)象娇唯。
  2. 創(chuàng)建注解類
  3. 編寫切面方法
// 注解類
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MultiTransactionAnno {
}
/**
 * @author :wang.j.f
 * @description:TODO
 * @date :Created in 2021/4/28 15:53
 * @modified By:
 * @version: 1.0$
 */
@Aspect
@Component
public class MultiTransactionalAspect  {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private static final String[] transactionManagerNames = {"transactionManager", "transactionManager2"};

    @Pointcut("@annotation(com.tianma.service.aop.MultiTransactionAnno)")
    public void transactionAnno(){}

    @Around(value = "transactionAnno()")
    public Object menageTransaction(ProceedingJoinPoint joinPoint){
        Object result = null;
        Stack<DataSourceTransactionManager> dataSourceTransactionManagers = new Stack<>();
        Stack<TransactionStatus> transactionStatuses = new Stack<>();
        openTransaction(dataSourceTransactionManagers, transactionStatuses);
        try {
            result = joinPoint.proceed();
            commit(dataSourceTransactionManagers, transactionStatuses);
        }catch (Throwable e){
            logger.error("異常", e);
            rollback(dataSourceTransactionManagers, transactionStatuses);
        }

        return result;
    }

    /**
     * 開啟事務(wù)
     * @param dataSourceTransactionManagers
     * @param transactionStatuses
     * @return
     */
    private void openTransaction(Stack<DataSourceTransactionManager> dataSourceTransactionManagers,
                                    Stack<TransactionStatus> transactionStatuses){

        // 開啟事務(wù)并將transactionManager 和 transactionStatus 入棧
        for (String transactionManagerName : transactionManagerNames) {
            DataSourceTransactionManager dataSourceTransactionManager = (DataSourceTransactionManager) SpringContextUtil.getBean(transactionManagerName);
            TransactionStatus status = dataSourceTransactionManager.getTransaction(new DefaultTransactionDefinition());
            transactionStatuses.push(status);
            dataSourceTransactionManagers.push(dataSourceTransactionManager);
            logger.info("事務(wù)開啟成功:{}", transactionManagerName);
        }
    }

    /**
     * 提交棧中事務(wù)
     * @param dataSourceTransactionManagerStack
     * @param transactionStatuStack
     */
    private void commit(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
                        Stack<TransactionStatus> transactionStatuStack){
        while (!dataSourceTransactionManagerStack.isEmpty()){
            dataSourceTransactionManagerStack.pop().commit(transactionStatuStack.pop());
            logger.info("事務(wù)提交成功齐遵!");
        }
    }

    /**
     * 回滾棧中事務(wù)
     * @param dataSourceTransactionManagerStack
     * @param transactionStatuStack
     */
    private void rollback(Stack<DataSourceTransactionManager> dataSourceTransactionManagerStack,
                          Stack<TransactionStatus> transactionStatuStack){
        while (!dataSourceTransactionManagerStack.isEmpty()){
            dataSourceTransactionManagerStack.pop().rollback(transactionStatuStack.pop());
            logger.error("事務(wù)回滾惕艳!");
        }
    }
}

遺留問題

利用此方法是可以解決步淹,兩個(gè)事務(wù)一同提交和一同回滾的問題棠隐。但是由于多個(gè)事務(wù)提交和回滾是有先后順序的幽邓,當(dāng)首先執(zhí)行的事務(wù)提交或者回滾成功驶俊,而后面事務(wù)失敗時(shí)泼差,同樣會(huì)產(chǎn)生兩個(gè)數(shù)據(jù)庫數(shù)據(jù)不一致的情況躯概。
所以年栓,當(dāng)對(duì)數(shù)據(jù)一致性要求很高的業(yè)務(wù)場(chǎng)景流纹,還是盡量使用現(xiàn)有的分布式事務(wù)解決方案來管理事務(wù)比較穩(wěn)妥糜烹。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市漱凝,隨后出現(xiàn)的幾起案子疮蹦,更是在濱河造成了極大的恐慌,老刑警劉巖茸炒,帶你破解...
    沈念sama閱讀 217,277評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件愕乎,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡壁公,警方通過查閱死者的電腦和手機(jī)感论,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來紊册,“玉大人比肄,你說我怎么就攤上這事。” “怎么了芳绩?”我有些...
    開封第一講書人閱讀 163,624評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵掀亥,是天一觀的道長。 經(jīng)常有香客問我示括,道長铺浇,這世上最難降的妖魔是什么痢畜? 我笑而不...
    開封第一講書人閱讀 58,356評(píng)論 1 293
  • 正文 為了忘掉前任垛膝,我火速辦了婚禮,結(jié)果婚禮上丁稀,老公的妹妹穿的比我還像新娘吼拥。我一直安慰自己,他們只是感情好线衫,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,402評(píng)論 6 392
  • 文/花漫 我一把揭開白布凿可。 她就那樣靜靜地躺著,像睡著了一般授账。 火紅的嫁衣襯著肌膚如雪枯跑。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評(píng)論 1 301
  • 那天白热,我揣著相機(jī)與錄音敛助,去河邊找鬼。 笑死屋确,一個(gè)胖子當(dāng)著我的面吹牛纳击,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播攻臀,決...
    沈念sama閱讀 40,135評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼焕数,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了刨啸?” 一聲冷哼從身側(cè)響起堡赔,我...
    開封第一講書人閱讀 38,992評(píng)論 0 275
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎设联,沒想到半個(gè)月后善已,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡仑荐,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,636評(píng)論 3 334
  • 正文 我和宋清朗相戀三年雕拼,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片粘招。...
    茶點(diǎn)故事閱讀 39,785評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡啥寇,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情辑甜,我是刑警寧澤衰絮,帶...
    沈念sama閱讀 35,492評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站磷醋,受9級(jí)特大地震影響猫牡,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜邓线,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,092評(píng)論 3 328
  • 文/蒙蒙 一淌友、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧骇陈,春花似錦震庭、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至婿崭,卻和暖如春拨拓,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背氓栈。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評(píng)論 1 269
  • 我被黑心中介騙來泰國打工渣磷, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人颤绕。 一個(gè)月前我還...
    沈念sama閱讀 47,891評(píng)論 2 370
  • 正文 我出身青樓幸海,卻偏偏與公主長得像,于是被迫代替她去往敵國和親奥务。 傳聞我的和親對(duì)象是個(gè)殘疾皇子物独,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,713評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容