seata源碼解析系列-XA模式

寫在前面

seata-XA模式是利用事務(wù)資源(數(shù)據(jù)庫贱鼻、消息服務(wù)等)對 XA 協(xié)議的支持勒虾,以 XA 協(xié)議的機(jī)制來管理分支事務(wù)的一種解決方案县踢。

跟AT、 TCC模式對比悼瓮,XA擁有全局事務(wù)的隔離性戈毒,并且基于數(shù)據(jù)庫現(xiàn)有的XA能力,seata可以對全局事務(wù)的實(shí)現(xiàn)進(jìn)行“打薄”(比如undolog的保存重放刪除横堡、全局鎖等都可以不做)埋市,并且適配已經(jīng)在使用XA模式的業(yè)務(wù)形態(tài)。但缺點(diǎn)顯而易見命贴,XA是三種模式中性能最差的道宅。

在代碼實(shí)現(xiàn)中食听,AT模式與XA模式大同小異,都是一樣通過TM污茵、RM樱报、TC三種協(xié)同處理全局事務(wù)。只不過TCC模式下分支事務(wù)的注冊省咨、提交或回滾是調(diào)用自己實(shí)現(xiàn)的TCC行為類進(jìn)行處理肃弟。

XA模式三端協(xié)同處理圖

XA-TM端分析

首先說明,XA流程同樣需要打上@GolbalTransaction注解的TM范圍中完成分支注冊零蓉、提交或回滾笤受,不在細(xì)說。

XA-RM端分析

XA模式同樣是通過自上而下的動態(tài)代理數(shù)據(jù)庫相關(guān)類(Database敌蜂、Connection箩兽、Statement),在原方法執(zhí)行過程中對sql的執(zhí)行進(jìn)行攔截處理章喉。
首先AT-RM端會將原DataSource進(jìn)行代理形成DataSourceProxyXA汗贫,這樣可以通過DataSourceProxyXA從原DataSource獲取Connection進(jìn)行代理形成XAConnection封裝的ConnectionProxyXA,最后可以通過ConnectionProxyXA獲取Statement進(jìn)行代理形成StatementProxyXA秸脱。由ConnectionProxyXA與StatementProxyXA接管sql的執(zhí)行與全局事務(wù)的處理落包。


AT與XA模式的代理對比
    public DataSourceProxyXA(DataSource dataSource, String resourceGroupId) {
        if (dataSource instanceof SeataDataSourceProxy) {
            LOGGER.info("Unwrap the data source, because the type is: {}", dataSource.getClass().getName());
            dataSource = ((SeataDataSourceProxy) dataSource).getTargetDataSource();
        }
        this.dataSource = dataSource;
        this.branchType = BranchType.XA;
        JdbcUtils.initDataSourceResource(this, dataSource, resourceGroupId);

        //Set the default branch type to 'XA' in the RootContext.
        RootContext.setDefaultBranchType(this.getBranchType());
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        Connection connection = dataSource.getConnection(username, password);
        return getConnectionProxy(connection);
    }

    @Override
    public Statement createStatement() throws SQLException {
        Statement targetStatement = originalConnection.createStatement();
        return new StatementProxyXA(this, targetStatement);
    }

同樣的,看StatementProxyXA的方法都被ExecuteTemplateXA執(zhí)行模板封裝執(zhí)行摊唇。

    @Override
    public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
        return ExecuteTemplateXA.execute(connectionProxyXA, (statement, args) -> statement.executeUpdate((String)args[0], (int[])args[1]), targetStatement, sql, columnIndexes);
    }

來到ExecuteTemplateXA咐蝇,同樣會執(zhí)行先開門再關(guān)門操作,調(diào)用 connectionProxyXA.setAutoCommit(false)先開門巷查,執(zhí)行原sql有序,再根據(jù)執(zhí)行結(jié)果調(diào)用connectionProxyXA.commit或者connectionProxyXA.rollback,最后調(diào)用connectionProxyXA.setAutoCommit(true)進(jìn)行關(guān)門岛请。

    public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA,
                                                     StatementCallback<T, S> statementCallback,
                                                     S targetStatement,
                                                     Object... args) throws SQLException {
        boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
        if (autoCommitStatus) {
            // XA Start
            connectionProxyXA.setAutoCommit(false);
        }
        try {
            T res = null;
            try {
                // execute SQL
                res = statementCallback.execute(targetStatement, args);

            } catch (Throwable ex) {
                if (autoCommitStatus) {
                    // XA End & Rollback
                    try {
                        connectionProxyXA.rollback();
                    } catch (SQLException sqle) {
                        // log and ignore the rollback failure.
                        LOGGER.warn(
                            "Failed to rollback xa branch of " + connectionProxyXA.xid +
                                "(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(),
                            sqle);
                    }
                }

                if (ex instanceof SQLException) {
                    throw ex;
                } else {
                    throw new SQLException(ex);
                }

            }
            if (autoCommitStatus) {
                try {
                    // XA End & Prepare
                    connectionProxyXA.commit();
                } catch (Throwable ex) {
                    LOGGER.warn(
                        "Failed to commit xa branch of " + connectionProxyXA.xid + ") since " + ex.getMessage(),
                        ex);
                    // XA End & Rollback
                    if (!(ex instanceof SQLException) || AbstractConnectionProxyXA.SQLSTATE_XA_NOT_END != ((SQLException) ex).getSQLState()) {
                        try {
                            connectionProxyXA.rollback();
                        } catch (SQLException sqle) {
                            // log and ignore the rollback failure.
                            LOGGER.warn(
                                "Failed to rollback xa branch of " + connectionProxyXA.xid +
                                    "(caused by commit failure(" + ex.getMessage() + ") since " + sqle.getMessage(),
                                sqle);
                        }
                    }

                    if (ex instanceof SQLException) {
                        throw ex;
                    } else {
                        throw new SQLException(ex);
                    }

                }
            }
            return res;
        } finally {
            if (autoCommitStatus) {
                connectionProxyXA.setAutoCommit(true);
            }

        }
    }

1.來到connectionProxyXA.setAutoCommit(false)旭寿,內(nèi)部的操作其實(shí)并不只是設(shè)置自動提交狀態(tài)為否,還帶有向TC注冊分支的操作崇败,然后調(diào)用
xaResource.start盅称,啟動XA分支事務(wù)。

 @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        if (currentAutoCommitStatus == autoCommit) {
            return;
        }
        if (autoCommit) {
            // According to JDBC spec:
            // If this method is called during a transaction and the
            // auto-commit mode is changed, the transaction is committed.
            if (xaActive) {
                commit();
            }
        } else {
            if (xaActive) {
                throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
            }
            // Start a XA branch
            long branchId = 0L;
            try {
                // 1. register branch to TC then get the branchId
                branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,
                    null);
            } catch (TransactionException te) {
                cleanXABranchContext();
                throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
            }
            // 2. build XA-Xid with xid and branchId
            this.xaBranchXid = XAXidBuilder.build(xid, branchId);
            try {
                // 3. XA Start
                xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
            } catch (XAException e) {
                cleanXABranchContext();
                throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
            }
            // 4. XA is active
            this.xaActive = true;

        }

        currentAutoCommitStatus = autoCommit;
    }

2.statementCallback.execute后室,將在XA分支事務(wù)的范圍下執(zhí)行sql微渠,不再細(xì)說。

3調(diào)用 connectionProxyXA.commit或者connectionProxyXA.rollback咧擂。

先看connectionProxyXA.commit逞盆,由上文可知,到這個(gè)方法處理前松申,已經(jīng)完成了xaResource.start開始XA分支事務(wù)與執(zhí)行sql云芦,因此commit方法需要調(diào)用xaResource.end表明執(zhí)行成功結(jié)束XA分支事務(wù)俯逾,并且調(diào)用xaResource.prepare方法,保持xa連接舅逸,將等待TC全局提交的回調(diào)桌肴,最后清除上下文。

  @Override
    public void commit() throws SQLException {
        if (currentAutoCommitStatus) {
            // Ignore the committing on an autocommit session.
            return;
        }
        if (!xaActive || this.xaBranchXid == null) {
            throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);
        }
        try {
            // XA End: Success
            xaResource.end(xaBranchXid, XAResource.TMSUCCESS);
            // XA Prepare
            xaResource.prepare(xaBranchXid);
            // Keep the Connection if necessary
            keepIfNecessary();
        } catch (XAException xe) {
            try {
                // Branch Report to TC: Failed
                DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),
                    BranchStatus.PhaseOne_Failed, null);
            } catch (TransactionException te) {
                LOGGER.warn("Failed to report XA branch commit-failure on " + xid + "-" + xaBranchXid.getBranchId()
                    + " since " + te.getCode() + ":" + te.getMessage() + " and XAException:" + xe.getMessage());

            }
            throw new SQLException(
                "Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
                    .getMessage(), xe);
        } finally {
            cleanXABranchContext();
        }
    }

再看connectionProxyXA.rollback琉历,xa分支事務(wù)回滾需要向TC報(bào)告分支事務(wù)執(zhí)行失敗坠七,然后需要調(diào)用xaResource.end表明執(zhí)行失敗結(jié)束XA分支事務(wù),最后清除上下文旗笔。

  @Override
    public void rollback() throws SQLException {
        if (currentAutoCommitStatus) {
            // Ignore the committing on an autocommit session.
            return;
        }
        if (!xaActive || this.xaBranchXid == null) {
            throw new SQLException("should NOT rollback on an inactive session");
        }
        try {
            // Branch Report to TC
            DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null);

        } catch (TransactionException te) {
            // log and ignore the report failure
            LOGGER.warn("Failed to report XA branch rollback on " + xid + "-" + xaBranchXid.getBranchId()
                + " since " + te.getCode() + ":" + te.getMessage());
        }
        try {
            // XA End: Fail
            xaResource.end(xaBranchXid, XAResource.TMFAIL);
        } catch (XAException xe) {
            throw new SQLException(
                "Failed to end(TMFAIL) xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
                    .getMessage(), xe);
        } finally {
            cleanXABranchContext();
        }
    }

TC的全局提交或回滾回調(diào)會流經(jīng)ResourceManagerXA進(jìn)行處理彪置,都會進(jìn)入到finishBranch方法,如果是全局提交回調(diào)則調(diào)用connectionProxyXA.xaCommit蝇恶,否則調(diào)用connectionProxyXA.xaRollback拳魁,報(bào)錯(cuò)則會通知TC進(jìn)行補(bǔ)償重試。

private BranchStatus finishBranch(boolean committed, BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        XAXid xaBranchXid = XAXidBuilder.build(xid, branchId);
        Resource resource = dataSourceCache.get(resourceId);
        if (resource instanceof AbstractDataSourceProxyXA) {
            try (ConnectionProxyXA connectionProxyXA = ((AbstractDataSourceProxyXA)resource).getConnectionForXAFinish(xaBranchXid)) {
                if (committed) {
                    connectionProxyXA.xaCommit(xid, branchId, applicationData);
                    LOGGER.info(xaBranchXid + " was committed.");
                    return BranchStatus.PhaseTwo_Committed;
                } else {
                    connectionProxyXA.xaRollback(xid, branchId, applicationData);
                    LOGGER.info(xaBranchXid + " was rolled back.");
                    return BranchStatus.PhaseTwo_Rollbacked;
                }
            } catch (XAException | SQLException sqle) {
                if (sqle instanceof XAException) {
                    if (((XAException)sqle).errorCode == XAException.XAER_NOTA) {
                        if (committed) {
                            return BranchStatus.PhaseTwo_Committed;
                        } else {
                            return BranchStatus.PhaseTwo_Rollbacked;
                        }
                    }
                }
                if (committed) {
                    LOGGER.info(xaBranchXid + " commit failed since " + sqle.getMessage(), sqle);
                    // FIXME: case of PhaseTwo_CommitFailed_Unretryable
                    return BranchStatus.PhaseTwo_CommitFailed_Retryable;
                } else {
                    LOGGER.info(xaBranchXid + " rollback failed since " + sqle.getMessage(), sqle);
                    // FIXME: case of PhaseTwo_RollbackFailed_Unretryable
                    return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
                }
            }
        } else {
            LOGGER.error("Unknown Resource for XA resource " + resourceId + " " + resource);
            if (committed) {
                return BranchStatus.PhaseTwo_CommitFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            }
        }
    }

connectionProxyXA的xaCommit與xaRollback撮弧,會分別調(diào)用xaResource.commit與xaResource.rollback方法潘懊,完成二階段的提交或回滾,并且釋放緩存的XA連接贿衍。

 public void xaCommit(String xid, long branchId, String applicationData) throws XAException {
        XAXid xaXid = XAXidBuilder.build(xid, branchId);
        xaResource.commit(xaXid, false);
        releaseIfNecessary();

    }

 public void xaRollback(String xid, long branchId, String applicationData) throws XAException {
        XAXid xaXid = XAXidBuilder.build(xid, branchId);
        xaResource.rollback(xaXid);
        releaseIfNecessary();

    }

XA-TC端分析

同樣的授舟,RM端分支注冊會創(chuàng)建BatchSession保存在數(shù)據(jù)庫,TM執(zhí)行全局事務(wù)的提交或回滾時(shí)也會進(jìn)行回調(diào)與補(bǔ)償處理贸辈,TC端的處理不再細(xì)說释树。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者裙椭。
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市署浩,隨后出現(xiàn)的幾起案子揉燃,更是在濱河造成了極大的恐慌,老刑警劉巖筋栋,帶你破解...
    沈念sama閱讀 219,188評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件炊汤,死亡現(xiàn)場離奇詭異,居然都是意外死亡弊攘,警方通過查閱死者的電腦和手機(jī)抢腐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來襟交,“玉大人迈倍,你說我怎么就攤上這事〉酚颍” “怎么了啼染?”我有些...
    開封第一講書人閱讀 165,562評論 0 356
  • 文/不壞的土叔 我叫張陵宴合,是天一觀的道長。 經(jīng)常有香客問我迹鹅,道長卦洽,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,893評論 1 295
  • 正文 為了忘掉前任斜棚,我火速辦了婚禮阀蒂,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘弟蚀。我一直安慰自己蚤霞,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,917評論 6 392
  • 文/花漫 我一把揭開白布粗梭。 她就那樣靜靜地躺著争便,像睡著了一般。 火紅的嫁衣襯著肌膚如雪断医。 梳的紋絲不亂的頭發(fā)上滞乙,一...
    開封第一講書人閱讀 51,708評論 1 305
  • 那天,我揣著相機(jī)與錄音鉴嗤,去河邊找鬼斩启。 笑死,一個(gè)胖子當(dāng)著我的面吹牛醉锅,可吹牛的內(nèi)容都是我干的兔簇。 我是一名探鬼主播,決...
    沈念sama閱讀 40,430評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼硬耍,長吁一口氣:“原來是場噩夢啊……” “哼垄琐!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起经柴,我...
    開封第一講書人閱讀 39,342評論 0 276
  • 序言:老撾萬榮一對情侶失蹤狸窘,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后坯认,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體翻擒,經(jīng)...
    沈念sama閱讀 45,801評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,976評論 3 337
  • 正文 我和宋清朗相戀三年牛哺,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了陋气。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,115評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡引润,死狀恐怖巩趁,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情淳附,我是刑警寧澤晶渠,帶...
    沈念sama閱讀 35,804評論 5 346
  • 正文 年R本政府宣布凰荚,位于F島的核電站,受9級特大地震影響褒脯,放射性物質(zhì)發(fā)生泄漏便瑟。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,458評論 3 331
  • 文/蒙蒙 一番川、第九天 我趴在偏房一處隱蔽的房頂上張望到涂。 院中可真熱鬧,春花似錦颁督、人聲如沸践啄。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,008評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽屿讽。三九已至,卻和暖如春吠裆,著一層夾襖步出監(jiān)牢的瞬間伐谈,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,135評論 1 272
  • 我被黑心中介騙來泰國打工试疙, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留诵棵,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,365評論 3 373
  • 正文 我出身青樓祝旷,卻偏偏與公主長得像履澳,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子怀跛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,055評論 2 355

推薦閱讀更多精彩內(nèi)容