Fescar - RM 全局事務(wù)提交回滾流程

開篇

?這篇文章的目的主要是講解RM在接收TC的請求后執(zhí)行全局分支事務(wù)提交(doBranchCommit)和全局分支事務(wù)回滾(doBranchRollback)的流程。

?全局的分支事務(wù)提交過程和回滾過程也算RM處理流程中核心的一環(huán)楞抡,了解以后并結(jié)合之前講解的本地事務(wù)提交流程就能夠較好的理解整個過程了稚晚。


全局事務(wù)操作流程

整體流程

public class RMHandlerAT extends AbstractRMHandlerAT implements 
                   RMInboundHandler, TransactionMessageHandler {

    private DataSourceManager dataSourceManager = DataSourceManager.get();

    @Override
    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)  
       throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        LOGGER.info("AT Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        BranchStatus status = dataSourceManager.branchCommit(xid, branchId, resourceId, applicationData);
        response.setBranchStatus(status);
        LOGGER.info("AT Branch commit result: " + status);

    }

    @Override
    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)  
       throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        LOGGER.info("AT Branch rolling back: " + xid + " " + branchId + " " + resourceId);
        BranchStatus status = dataSourceManager.branchRollback(xid, branchId, resourceId, applicationData);
        response.setBranchStatus(status);
        LOGGER.info("AT Branch rollback result: " + status);

    }
}

說明:

  • doBranchCommit()通過dataSourceManager.branchCommit()去執(zhí)行分支事務(wù)提交挚冤。
  • doBranchRollback()通過dataSourceManager.branchRollback()去執(zhí)行分支事務(wù)回滾。
  • dataSourceManager是DataSourceManager對象。


doBranchCommit流程

public class DataSourceManager implements ResourceManager {

    private ResourceManagerInbound asyncWorker;

    public void setAsyncWorker(ResourceManagerInbound asyncWorker) {
        this.asyncWorker = asyncWorker;
    }

    public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData)  
       throws TransactionException {
        return asyncWorker.branchCommit(xid, branchId, resourceId, applicationData);
    }
}


public class AsyncWorker implements ResourceManagerInbound {

    public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData) 
     throws TransactionException {
        if (ASYNC_COMMIT_BUFFER.size() < ASYNC_COMMIT_BUFFER_LIMIT) {
            ASYNC_COMMIT_BUFFER.add(new Phase2Context(xid, branchId, resourceId, applicationData));
        } else {
            LOGGER.warn("Async commit buffer is FULL. 
            Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");
        }
        return BranchStatus.PhaseTwo_Committed;
    }

    public synchronized void init() {
        LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);
        timerExecutor = new ScheduledThreadPoolExecutor(1,
            new NamedThreadFactory("AsyncWorker", 1, true));
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {

                    doBranchCommits();

                } catch (Throwable e) {
                    LOGGER.info("Failed at async committing ... " + e.getMessage());

                }
            }
        }, 10, 1000 * 1, TimeUnit.MILLISECONDS);
    }



    private void doBranchCommits() {
        if (ASYNC_COMMIT_BUFFER.size() == 0) {
            return;
        }
        Map<String, List<Phase2Context>> mappedContexts = new HashMap<>();
        Iterator<Phase2Context> iterator = ASYNC_COMMIT_BUFFER.iterator();
        while (iterator.hasNext()) {
            Phase2Context commitContext = iterator.next();
            List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId);
            if (contextsGroupedByResourceId == null) {
                contextsGroupedByResourceId = new ArrayList<>();
                mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId);
            }
            contextsGroupedByResourceId.add(commitContext);

            iterator.remove();

        }

        for (String resourceId : mappedContexts.keySet()) {
            Connection conn = null;
            try {
                try {
                    DataSourceProxy dataSourceProxy = DataSourceManager.get().get(resourceId);
                    conn = dataSourceProxy.getPlainConnection();
                } catch (SQLException sqle) {
                    LOGGER.warn("Failed to get connection for async committing on " + resourceId, sqle);
                    continue;
                }

                List<Phase2Context> contextsGroupedByResourceId = mappedContexts.get(resourceId);
                for (Phase2Context commitContext : contextsGroupedByResourceId) {
                    try {
                        UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn);
                    } catch (Exception ex) {
                        LOGGER.warn("Failed to delete undo log [" + 
                         commitContext.branchId + "/" + commitContext.xid + "]", ex);
                    }
                }

            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException closeEx) {
                        LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
                    }
                }
            }

        }

    }
}

說明:

  • doBranchCommit()操作的核心實現(xiàn)通過AsyncWorker完成碴开,AsyncWorker類其實是一個生成消費模型哼鬓。
  • doBranchCommit()把需要提交的任務(wù)添加到AsyncWorker的ASYNC_COMMIT_BUFFER隊列當(dāng)中监右。
  • AsyncWorker內(nèi)部timerExecutor負責(zé)啟動執(zhí)行commit動作線程執(zhí)行doBranchCommits()動作。
  • doBranchCommits動作內(nèi)部負責(zé)刪除多余的UndoLog魄宏, UndoLogManager.deleteUndoLog秸侣。
  • doBranchCommit()的本質(zhì)任務(wù)就是刪除備份的回滾日志而已。


doBranchRollback流程

public class DataSourceManager implements ResourceManager {

    public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) 
       throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            // 執(zhí)行回滾操作
            UndoLogManager.undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;

    }
}


public final class UndoLogManager {
    private static String SELECT_UNDO_LOG_SQL = 
    "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE log_status = 0 AND branch_id = ? AND xid = ? FOR UPDATE";

    public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) 
       throws TransactionException {

        assertDbSupport(dataSourceProxy.getTargetDataSource().getDbType());

        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        try {
            conn = dataSourceProxy.getPlainConnection();

            // The entire undo process should run in a local transaction.
            conn.setAutoCommit(false);

            // Find UNDO LOG
            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            // 遍歷所有回滾日志
            while (rs.next()) {
                Blob b = rs.getBlob("rollback_info");
                String rollbackInfo = StringUtils.blob2string(b);
                BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);

                for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                    TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                    sqlUndoLog.setTableMeta(tableMeta);
                    AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                                      dataSourceProxy.getDbType(), sqlUndoLog);
                    undoExecutor.executeOn(conn);
                }

            }
            deleteUndoLog(xid, branchId, conn);

            conn.commit();

        } catch (Throwable e) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                }
            }
            throw new TransactionException(BranchRollbackFailed_Retriable, String.format("%s/%s", branchId, xid), e);

        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
                if (selectPST != null) {
                    selectPST.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException closeEx) {
                LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
            }
        }
    }
}

說明:

  • doBranchRollback操作的核心實現(xiàn)通過UndoLogManager完成宠互,UndoLogManager.undo()負責(zé)執(zhí)行回滾味榛。
  • undo()操作的核心是通過SELECT_UNDO_LOG_SQL日志去獲取回滾日志內(nèi)容。
  • 根據(jù)undoLog對象通過UndoExecutorFactory.getUndoExecutor獲取回滾的執(zhí)行者Executor對象予跌。
  • undoExecutor.executeOn(conn)執(zhí)行回滾操作搏色,不同的回滾操作對象不同的undoExecutor
  • deleteUndoLog(xid, branchId, conn)執(zhí)行日志刪除操作券册。


期待

?下篇文章會針對undoExecutor作具體的介紹频轿。


Fescar源碼分析連載

Fescar 源碼解析系列

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市烁焙,隨后出現(xiàn)的幾起案子航邢,更是在濱河造成了極大的恐慌,老刑警劉巖骄蝇,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件膳殷,死亡現(xiàn)場離奇詭異,居然都是意外死亡九火,警方通過查閱死者的電腦和手機赚窃,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進店門册招,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人勒极,你說我怎么就攤上這事是掰。” “怎么了辱匿?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵键痛,是天一觀的道長。 經(jīng)常有香客問我掀鹅,道長散休,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任乐尊,我火速辦了婚禮戚丸,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘扔嵌。我一直安慰自己限府,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布痢缎。 她就那樣靜靜地躺著胁勺,像睡著了一般。 火紅的嫁衣襯著肌膚如雪独旷。 梳的紋絲不亂的頭發(fā)上署穗,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天,我揣著相機與錄音嵌洼,去河邊找鬼案疲。 笑死,一個胖子當(dāng)著我的面吹牛麻养,可吹牛的內(nèi)容都是我干的褐啡。 我是一名探鬼主播,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼鳖昌,長吁一口氣:“原來是場噩夢啊……” “哼备畦!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起许昨,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤懂盐,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后糕档,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體允粤,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了类垫。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡琅坡,死狀恐怖悉患,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情榆俺,我是刑警寧澤售躁,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站茴晋,受9級特大地震影響陪捷,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜诺擅,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一市袖、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧烁涌,春花似錦苍碟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至抒钱,卻和暖如春蜓肆,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谋币。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工仗扬, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人瑞信。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓厉颤,卻偏偏與公主長得像,于是被迫代替她去往敵國和親凡简。 傳聞我的和親對象是個殘疾皇子逼友,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 事務(wù)(Transaction)是數(shù)據(jù)庫區(qū)別于文件系統(tǒng)的重要特性之一。 在文件系統(tǒng)中秤涩, 如果正在寫文件帜乞, 但是操作系...
    好好學(xué)習(xí)Sun閱讀 1,018評論 0 5
  • 萍聚 萍散 我是天涯走散的游子 回不了故鄉(xiāng) 你是漸行漸遠的蘭舟 河流把我們隔開 是因為我的丑陋 我無心出水 也無力...
    五十畝老傅閱讀 781評論 0 6
  • 好不容易把校本課程編寫完畢上傳給了廣告公司,美美地出門跑步回家做營養(yǎng)早餐筐眷。 “婉葉可以請你上堂課么黎烈?”同事電話給我...
    婉葉老師閱讀 690評論 3 13
  • 我是南飛雁,你可以叫我飛雁,我是一名奮斗者照棋,在實現(xiàn)財富自由的路上…… 我喜歡分享资溃,也喜歡思考;我有自己的人生規(guī)劃和...
    李貴功閱讀 408評論 0 2