seata AT模式

描述

seata是分布式事務(wù)解決方案。分布式事務(wù)是包含若干分支事務(wù)的全局事務(wù)。如果各分支事務(wù)提交成功矛市,則全局事務(wù)提交。如果各分支有一個(gè)執(zhí)行失敗诲祸,則全局事務(wù)回滾浊吏。
分布式事務(wù)處理過程抽象為事務(wù)管理器(Transaction Manager)、分支事務(wù)(Resource Manager )救氯、事務(wù)協(xié)調(diào)器(Transaction Coordinator)找田。
TC作為服務(wù)端部署,協(xié)調(diào)RM的提交或回滾着憨。RM和TM集成作為客戶端部署墩衙。如果有注冊中心,則TC甲抖、RM漆改、TM都需要向注冊中心注冊。

適合在MVC框架中准谚,在service層方法上添加@GlobalTransactional開啟全局事務(wù)挫剑,然后service中用到的dao層是分支事務(wù)

測試環(huán)境

下面使用seata-samples中springboot-mybatis項(xiàng)目做測試,seata-simple該項(xiàng)目依賴很多柱衔,可以將不需要的子項(xiàng)目刪除再測試樊破。還需要下載seata-server

初始化

初始化DataSourceProxy中秀存,緩存DataSourceProxy捶码,生成ResourceId并向TC注冊資源。

一階段

TM

在方法上添加注解@GlobalTransactional開啟分布式事務(wù)或链。該注解由GlobalTransactionalInterceptor攔截處理惫恼。由下圖可知TM處理流程:
1.開啟全局事務(wù),向TC注冊全局事務(wù)并返回XID
2.如果業(yè)務(wù)執(zhí)行成功澳盐,通知TC全局事務(wù)提交
3.如果業(yè)務(wù)執(zhí)行失敗祈纯,通知TC全局事務(wù)回滾
4.清除內(nèi)存中XID

public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {

            // 2. begin transaction
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {

                // Do Your Business
                rs = business.execute();

            } catch (Throwable ex) {

                // 3.the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo,tx,ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    }

DefaultGlobalTransaction定義了提交令宿、回滾方法。執(zhí)行操作的角色有發(fā)起者(Launcher)腕窥、參與者(Participant)粒没。只有發(fā)起者才可以執(zhí)行begin、commit簇爆、rollback癞松。

RM

1.向TC注冊分支事務(wù)
2.插入回滾日志
3.提交本地事務(wù)
4.通知TC本地事務(wù)執(zhí)行狀態(tài)
5.清空內(nèi)存中XID、branchId

AT模式主要封裝了數(shù)據(jù)庫操作入蛆。DataSource响蓉、Connection、Statement封裝為DataSourceProxy哨毁、ConnectionProxy枫甲、StatementProxy。

執(zhí)行sql時(shí)扼褪,ConnectionProxy創(chuàng)建Statement包裝類StatementProxy對象想幻。StatementProxy創(chuàng)建Executor。Executor中解析目標(biāo)sql话浇,再創(chuàng)建beforeImage脏毯,執(zhí)行sql語句、創(chuàng)建afterImage幔崖。最后ConnectionProxy提交本地事務(wù)抄沮。

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        // 設(shè)置手動(dòng)提交
        connectionProxy.setAutoCommit(false);
        return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> {
            // 創(chuàng)建beforeImage、執(zhí)行sql岖瑰、創(chuàng)建afterImage
            T result = executeAutoCommitFalse(args);
            // 提交本地事務(wù)
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        ((ConnectionProxy) connectionProxy).getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}


protected T executeAutoCommitFalse(Object[] args) throws Exception {
    TableRecords beforeImage = beforeImage();
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    TableRecords afterImage = afterImage(beforeImage);
    prepareUndoLog(beforeImage, afterImage);
    return result;
}
private void processGlobalTransactionCommit() throws SQLException {
    try {
        // RM向TC注冊分支事務(wù),獲取branchId
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }

    try {
        if (context.hasUndoLog()) {
            // 插入數(shù)據(jù)庫回滾日志
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        }
        // 提交本地事務(wù)
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        // RM向TC報(bào)告本地事務(wù)提交失敗
        report(false);
        throw new SQLException(ex);
    }
    // RM向TC報(bào)告本地事務(wù)提交成功
    report(true);
    // 清空XID砂代、branchId
    context.reset();
}

二階段

TM通知TC全局事務(wù)提交/回滾蹋订。TC通知各RM提交/回滾本地事務(wù)。
RM以RMHandlerAT來處理TC的事務(wù)提交通知刻伊。提交全局事務(wù)時(shí)露戒,RM將刪除undo log。先將刪除操作封裝為任務(wù)放入AsyncWorker中的阻塞隊(duì)列中捶箱,并返回TC成功消息智什。AsyncWorker中的定時(shí)器每隔1s執(zhí)行刪除任務(wù)。

回滾時(shí)丁屎,RM先獲取undo log回滾數(shù)據(jù)荠锭,然后刪除undo log。邏輯在AbstractUndoLogManager中undo()



public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;

    for (; ; ) {
        try {
            // 獲取源數(shù)據(jù)庫鏈接
            conn = dataSourceProxy.getPlainConnection();

            // The entire undo process should run in a local transaction.
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }

            // Find UNDO LOG
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) {
                exists = true;

                // It is possible that the server repeatedly sends a rollback request to roll back
                // the same branch transaction to multiple processes,
                // ensuring that only the undo_log in the normal state is processed.
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                if (!canUndo(state)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, ignore {} undo_log",
                                xid, branchId, state);
                    }
                    return;
                }

                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString);
                Blob b = rs.getBlob(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
                byte[] rollbackInfo = BlobUtils.blob2Bytes(b);

                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() :
                        UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                try {
                    // put serializer name to local
                    setCurrentSerializer(parser.getName());
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy).getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                dataSourceProxy.getDbType(),
                                sqlUndoLog);
                        // 回滾數(shù)據(jù)
                        undoExecutor.executeOn(conn);
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();
                }
            }

            // If undo_log exists, it means that the branch transaction has completed the first phase,
            // we can directly roll back and clean the undo_log
            // Otherwise, it indicates that there is an exception in the branch transaction,
            // causing undo_log not to be written to the database.
            // For example, the business processing timeout, the global transaction is the initiator rolls back.
            // To ensure data consistency, we can insert an undo_log with GlobalFinished state
            // to prevent the local transaction of the first phase of other programs from being correctly submitted.
            // See https://github.com/seata/seata/issues/489

            if (exists) {
                // 刪除undo log
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log deleted with {}",
                            xid, branchId, State.GlobalFinished.name());
                }
            } else {
                // 插入undo log晨川,防止一階段重復(fù)寫入
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log added with {}",
                            xid, branchId, State.GlobalFinished.name());
                }
            }

            return;
        }
    }
}

隔離界別

寫隔離

一階段本地事務(wù)提交前要先獲取全局鎖证九,獲取到才可提交删豺。獲取全局鎖被限定在一定時(shí)間范圍內(nèi)。
比如tx1和tx2兩個(gè)事務(wù)更新數(shù)據(jù)場景愧怜。tx1先獲取本地鎖執(zhí)行更新呀页,然后獲取全局鎖去提交本地事務(wù),本地事務(wù)提交后釋放本地鎖拥坛。此時(shí)全局鎖還是tx1蓬蝶。tx2更新時(shí),無法獲取全局鎖猜惋,也就無法提交本地事務(wù)丸氛。

如果tx1提交,則釋放全局鎖惨奕,tx2才可以執(zhí)行雪位。
如果tx1回滾,需要獲取本地鎖執(zhí)行補(bǔ)償操作梨撞,而tx2擁有本地鎖雹洗。所以tx1本地事務(wù)回滾失敗,并一直重試卧波。直到tx2獲取全局鎖超時(shí)时肿,釋放本地鎖為止。

讀隔離

默認(rèn)分支事務(wù)是讀已提交級別或之上港粱,全局事務(wù)是讀未提交級別螃成。如果應(yīng)用需要全局隔離級別到讀已提交,則是通過SELECT FOR UPDATE代理語句實(shí)現(xiàn)查坪。
SELECT FOR UPDATE執(zhí)行時(shí)會(huì)先獲取全局鎖寸宏,而也會(huì)獲取數(shù)據(jù)庫拍他鎖。如果獲取失敗偿曙,就釋放本地鎖氮凝,然后重試。這時(shí)查詢是阻塞的望忆。直到獲取全局鎖罩阵。

引用

https://my.oschina.net/u/1464083/blog/3040896#h2_8
https://www.sofastack.tech/blog/seata-distributed-transaction-deep-dive/
https://mp.weixin.qq.com/s/EzmZ-DAi-hxJhRkFvFhlJQ
http://seata.io/zh-cn/docs/overview/what-is-seata.html
https://github.com/javagrowing/JGrowing/tree/master/%E5%88%86%E5%B8%83%E5%BC%8F/%E5%88%86%E5%B8%83%E5%BC%8F%E4%BA%8B%E5%8A%A1

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市启摄,隨后出現(xiàn)的幾起案子稿壁,更是在濱河造成了極大的恐慌,老刑警劉巖歉备,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件傅是,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)落午,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進(jìn)店門谎懦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人溃斋,你說我怎么就攤上這事界拦。” “怎么了梗劫?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵享甸,是天一觀的道長。 經(jīng)常有香客問我梳侨,道長蛉威,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任走哺,我火速辦了婚禮蚯嫌,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘丙躏。我一直安慰自己择示,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布晒旅。 她就那樣靜靜地躺著栅盲,像睡著了一般。 火紅的嫁衣襯著肌膚如雪废恋。 梳的紋絲不亂的頭發(fā)上谈秫,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天,我揣著相機(jī)與錄音鱼鼓,去河邊找鬼拟烫。 笑死,一個(gè)胖子當(dāng)著我的面吹牛迄本,可吹牛的內(nèi)容都是我干的构灸。 我是一名探鬼主播,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼岸梨,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了稠氮?” 一聲冷哼從身側(cè)響起曹阔,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎隔披,沒想到半個(gè)月后赃份,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年抓韩,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了纠永。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,722評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡谒拴,死狀恐怖尝江,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情英上,我是刑警寧澤炭序,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站苍日,受9級特大地震影響惭聂,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜相恃,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一辜纲、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧拦耐,春花似錦耕腾、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至火脉,卻和暖如春牵舵,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背倦挂。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工畸颅, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人方援。 一個(gè)月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓没炒,卻偏偏與公主長得像,于是被迫代替她去往敵國和親犯戏。 傳聞我的和親對象是個(gè)殘疾皇子送火,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內(nèi)容