深入解析分布式事務中間件seata-AT模式 (友好版)

寫在前面

seata是阿里巴巴開源侦讨,用于解決分布式事務的中間件,目前在github上已經(jīng)擁有18k+的star紊服,是分布式事務中間件的翹楚伦泥,它擁有四種解決分布式事務的模式:AT、TCC芝雪、XA减余、SAGA。
下面給出四種模式的簡要說明:

AT:一種通過動態(tài)代理實現(xiàn)無侵入的分布式事務解決方案惩系,是2pc的一種實現(xiàn)佳励,一階段:業(yè)務數(shù)據(jù)和回滾日志記錄在同一個本地事務中提交,釋放本地鎖和連接資源蛆挫。二階段:提交異步化赃承,非常快速地完成悴侵,而回滾則通過一階段的回滾日志進行反向補償瞧剖。
TCC:一種侵入式解決方案,每個分支事務都需要自己實現(xiàn)TCC的行為可免,支持把 “自定義” 的分支事務納入到全局事務的管理中抓于。
XA:利用事務資源(數(shù)據(jù)庫、消息服務等)對 XA 協(xié)議的支持浇借,以 XA 協(xié)議的機制來管理分支事務的一種解決方案捉撮。
SAGA:把事務看成多個階段,每個階段都可以向前補償與向后回滾妇垢,需要自己實現(xiàn)一階段正向服務和二階段補償服務巾遭,以狀態(tài)機引擎來驅(qū)動全局事務的一種解決方案。

本文會按照AT模式的執(zhí)行全局事務的主流程順序闯估,對三端(TM灼舍、RM、TC)如何協(xié)同處理分布式事務進行解析涨薪,解析順序是TM->RM->TC骑素,會對源碼中無關的部分做適當?shù)膭h減,注明代碼省略刚夺,以便閱讀献丑。

下面是AT模式三端的簡要說明:
TM (Transaction Manager) - 事務管理器:定義全局事務的范圍:開始全局事務末捣、提交或回滾全局事務。
RM (Resource Manager) - 資源管理器:管理分支事務處理的資源创橄,與TC交談以注冊分支事務和報告分支事務的狀態(tài)塔粒,并驅(qū)動分支事務提交或回滾。
TC (Transaction Coordinator) - 事務協(xié)調(diào)者:維護全局和分支事務的狀態(tài)筐摘,驅(qū)動全局事務提交或回滾卒茬。

三端協(xié)同處理圖

AT-TM端分析

AT-TM端是基于spring的ioc與aop能力對原方法進行動態(tài)代理,處理全局事務的發(fā)起與提交或回滾咖熟。

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        ...//代碼省略
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }

SeataAutoConfiguration是AT-TM端的源頭圃酵,利用了spring自動注入的特點,為某個方法打上@Bean注解馍管,生成GlobalTransactionScanner郭赐。
接下來我們進入GlobalTransactionScanner,它實現(xiàn)了spring的InitializingBean接口确沸,因此利用spring的特性捌锭,當bean實現(xiàn)時會調(diào)用afterPropertiesSet方法進行初始化。

    @Override
    public void afterPropertiesSet() {
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)this);
        ...//代碼省略
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
        ...//代碼省略
    }

GlobalTransactionScanner的afterPropertiesSet方法中罗捎,initClient會對TM观谦、RM的client進行初始化,對關閉鉤子的注冊桨菜。
同時豁状,GlobalTransactionScanner也實現(xiàn)了AbstractAutoProxyCreator抽象類,會對所有由spring生成的bean進行wrapIfNecessary的二次處理倒得,正是由此方法泻红,為打上@GlobalTransaction注解的bean進行動態(tài)代理,由GlobalTransactionalInterceptor進行代理霞掺。

 @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
                    ...//代碼省略
                    if (interceptor == null) {
                        if (globalTransactionalInterceptor == null) {
                            globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                            ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                        }
                        interceptor = globalTransactionalInterceptor;
                    }
                }
                if (!AopUtils.isAopProxy(bean)) {
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
            ...//代碼省略
    }

由GlobalTransactionalInterceptor接管后谊路,調(diào)用將經(jīng)過它的invoke方法,再經(jīng)過handleGlobalTransaction或者handleGlobalLock進行處理菩彬,這是看方法上的注解是@GlobalTransactional或者@GlobalLock決定的缠劝。

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
            ...//代碼省略
            if (!localDisable) {
                if (globalTransactionalAnnotation != null) {
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        return methodInvocation.proceed();
    }

handleGlobalLock其實是handleGlobalTransaction的子集,接下來只分析handleGlobalTransaction方法挤巡,handleGlobalTransaction比較簡單剩彬,直接調(diào)用事務模板類TransactionalTemplate的execute方法繼續(xù)進行,報錯則調(diào)用錯誤處理器進行處理酷麦,最后發(fā)送消息到事務總線矿卑。execute入?yún)⑹荰ransactionalExecutor,封裝了對原方法的調(diào)用沃饶、名稱母廷、事務配置的獲取轻黑。

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final GlobalTransactional globalTrxAnno) throws Throwable {
        boolean succeed = true;
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }
                ...//代碼省略
                @Override
                public TransactionInfo getTransactionInfo() {
                    // reset the value of timeout
                    int timeout = globalTrxAnno.timeoutMills();
                    if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                        timeout = defaultGlobalTransactionTimeout;
                    }

                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(timeout);
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
                    transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    ...//代碼省略
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                ...//代碼省略
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {
            if (degradeCheck) {
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }

接下來到seata事務的核心椭岩,事務模板TransactionalTemplate移迫,里面封裝了spring的事務傳播模式:
NOT_SUPPORTED(如果有事務則掛起,不在事務中執(zhí)行原方法)
REQUIRES_NEW(如果有事務則掛起嗡贺,新建事務中執(zhí)行原方法)
SUPPORTS(如果不存在事務則直接執(zhí)行原方法业舍,若存在事務則在原事務執(zhí)行原方法)
REQUIRED(不進行任何處理抖拦,若存在事務則在事務中執(zhí)行,否則相反)
NEVER(若存在事務直接報錯舷暮,沒有事務則執(zhí)行)
MANDATORY(不存在事務則報錯态罪,必須在原事務中執(zhí)行)
,還有類似spring的事務執(zhí)行順序下面,beginTransaction->business.execute->completeTransactionAfterThrowing->commitTransaction->cleanUp复颈。似曾相識,在seata還能復習下spring事務沥割,哈哈耗啦。這里的英文注釋非常詳細,很容易看明白seata的封裝方式机杜。

   public Object execute(TransactionalExecutor business) throws Throwable {
        ...//代碼省略
        TransactionInfo txInfo = business.getTransactionInfo();
        Propagation propagation = txInfo.getPropagation();
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    // If transaction is existing, suspend it.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                    }
                    // Execute without transaction and return.
                    return business.execute();
                case REQUIRES_NEW:
                    // If transaction is existing, suspend it, and then begin new transaction.
                    if (existingTransaction(tx)) {
                        suspendedResourcesHolder = tx.suspend();
                        tx = GlobalTransactionContext.createNew();
                    }
                    // Continue and execute with new transaction
                    break;
                case SUPPORTS:
                    // If transaction is not existing, execute without transaction.
                    if (notExistingTransaction(tx)) {
                        return business.execute();
                    }
                    // Continue and execute with new transaction
                    break;
                case REQUIRED:
                    // If current transaction is existing, execute with current transaction,
                    // else continue and execute with new transaction.
                    break;
                case NEVER:
                    // If transaction is existing, throw exception.
                    if (existingTransaction(tx)) {
                        throw new TransactionException(
                            String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                    , tx.getXid()));
                    } else {
                        // Execute without transaction and return.
                        return business.execute();
                    }
                case MANDATORY:
                    // If transaction is not existing, throw exception.
                    if (notExistingTransaction(tx)) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    // Continue and execute with current transaction.
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }
            ...//代碼省略
            try {
                beginTransaction(txInfo, tx);
                Object rs;
                try {
                    // Do Your Business
                    rs = business.execute();
                } catch (Throwable ex) {
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
                commitTransaction(tx);
                return rs;
            } finally {
                ...//代碼省略
                cleanUp();
            }
        }...//代碼省略
    }

1.來到beginTransaction帜讲,如果看過鎮(zhèn)樓圖,就是TM向TC注冊事務的過程椒拗,調(diào)用鏈路是TransactionalTemplate->DefaultGlobalTransaction->DefaultTransactionManager的begin方法舒帮,值得一提的是,TM端(DefaultTransactionManager)和TC端(DefaultCore)處理事務的類陡叠,都是實現(xiàn)了TransactionManager接口玩郊,的確TM和TC端的處理方法是一一對應的。在DefaultTransactionManager使用TmNettyRemotingClient發(fā)送事務注冊請求枉阵,還記得剛開始初始化的TMclient嗎译红?養(yǎng)兵千日,用在一時兴溜。

@Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

2.business.execute是原方法的執(zhí)行侦厚,會在AT-RM端進行分析,此處省略拙徽。

3.completeTransactionAfterThrowing是在原方法調(diào)用后報錯進行rollback刨沦,跟begin注冊事務的方式幾乎一模一樣,都是使用TMclient向TC發(fā)出rollback請求膘怕,此處省略想诅。

4.commitTransaction是在原方法調(diào)用成功后進行commit,跟begin注冊事務的方式幾乎一模一樣,都是使用TMclient向TC發(fā)出commit請求来破,此處省略篮灼。

5.cleanUp是在事務模板執(zhí)行后進行清掃現(xiàn)場的方法,目的是對保存在ThreadLocal中的事務鉤子進行清除徘禁。

AT-TM端總結(jié)
AT-TM端對打上事務注解的方法使用了動態(tài)代理诅诱,將原方法封裝成事務模板進行執(zhí)行,是事務的注冊到事務的提交或回滾的發(fā)起方送朱∧锏矗可以說AT-TM端生于spring,也對spring的事務處理進行增強驶沼。

AT-RM端分析

AT-RM端的實現(xiàn)思想比較巧妙它改,是通過自上而下的動態(tài)代理數(shù)據(jù)庫相關類(Database、Connection商乎、Statement)央拖,在原方法執(zhí)行過程中對sql的執(zhí)行進行攔截處理。
首先AT-RM端會將原DataSource進行代理形成DataSourceProxy鹉戚,這樣可以通過DataSourceProxy從原DataSource獲取Connection進行代理形成ConnectionProxy鲜戒,最后可以通過ConnectionProxy獲取Statement進行代理形成StatementProxy。由ConnectionProxy與StatementProxy接管sql的執(zhí)行與全局事務的處理抹凳。

  @Bean
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

 @Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }

 @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        Statement statement = targetConnection.createStatement(resultSetType, resultSetConcurrency);
        return new StatementProxy<Statement>(this, statement);
    }

直接進入StatementProxy遏餐,可以看到里面的方法都是使用了ExecuteTemplate執(zhí)行模板進行處理,比如executeUpdate方法赢底。

 @Override
    public int executeUpdate(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);
    }

可以看到ExecuteTemplate.execute會區(qū)別對待不同的sql類型失都,生成不同的Executor,最后調(diào)用Executor的execute方法執(zhí)行sql幸冻。

   public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
        ...//代碼省略
        Executor<T> executor;
        ...//代碼省略
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                new Object[]{statementProxy, statementCallback, sqlRecognizer});
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            rs = executor.execute(args);
        } catch (Throwable ex) {
            ...//代碼省略
        }
        return rs;
    }

選取一個比較簡單又能體現(xiàn)完整AT-RM端流程的UpdateExecutor進行分析粹庞,如果設置了自動提交,會進入executeAutoCommitTrue流程洽损,否則會進入executeAutoCommitFalse流程庞溜。
繼續(xù)選取executeAutoCommitTrue流程進行分析,首先設置自動提交為false碑定,然后執(zhí)行executeAutoCommitFalse后會調(diào)用ConnectionProxy的commit方法流码,最后重置上下文與自動提交狀態(tài)位,這正是很多中間件處理本地事務的步驟延刘。

 protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            connectionProxy.setAutoCommit(false);
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                T result = executeAutoCommitFalse(args);
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            ...//代碼省略
            throw e;
        } finally {
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }

executeAutoCommitFalse流程會執(zhí)行beforeImage保存前置鏡像漫试,調(diào)用原sql執(zhí)行回調(diào),afterImage保存后置鏡像碘赖,合并鏡像形成undolog與lockkey驾荣。
形成鏡像的原理是根據(jù)原sql生成等價的查詢sql外构,執(zhí)行查詢sql形成鏡像。
形成lockkey的原理是根據(jù)鏡像獲取主鍵列表(primary key 簡稱pk)進行拼接形成lockkey秘车。

protected T executeAutoCommitFalse(Object[] args) throws Exception {
        ...//代碼省略
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        ...//代碼省略
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        String lockKeys = buildLockKey(lockKeyRecords);
        connectionProxy.appendLockKey(lockKeys);
        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }

connectionProxy.commit就是最終的收尾流程典勇,向TC進行注冊的同時加上全局鎖劫哼,本地sql與undolog一起提交到本地數(shù)據(jù)庫叮趴,最后向TC報告本身事務狀態(tài),當然权烧,RM與TC的交互還是用最開始初始化的RmClient眯亦,就不再細說了。

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            ...//代碼省略
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            targetConnection.commit();
        } catch (Throwable ex) {
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }
AT-RM端總結(jié)
AT-RM端同樣使用了動態(tài)代理般码,將原方法封裝成執(zhí)行模板進行執(zhí)行妻率,保證了全局事務的加鎖、向TC報告本身事務狀態(tài)板祝、本地sql與undolog“同生共死”宫静,一起提交到數(shù)據(jù)庫,為全局提交或回滾做好準備券时。

AT-TC端分析

AT-TC端是負責整個全局事務的注冊孤里、提交或回滾的總控制節(jié)點。
由上文可知橘洞,AT-TC端接收并處理了TM端的全局事務的注冊捌袜、提交或回滾請求,RM端的分支事務的報告本身狀態(tài)請求炸枣。
當TM發(fā)起全局提交或回滾時虏等,會回調(diào)全局事務底下所有RM的最終提交或回滾接口,刪除undolog或者根據(jù)undolog進行重放恢復數(shù)據(jù)适肠。

AT-TC端是以NettyRemotingServer啟動并且接收處理來自TM與RM的請求霍衫,請求會流經(jīng)DefaultCoordinator->DefaultCore到達默認的核心處理類。

首先看TM發(fā)起的全局事務的注冊侯养,會調(diào)用DefaultCore的begin方法慕淡,創(chuàng)建GlobalSession通過生命周期監(jiān)聽服務保存到數(shù)據(jù)庫,向事務總線發(fā)送消息沸毁,調(diào)用完成后返回GlobalSession的xid給到TM峰髓,xid將由TM保存在執(zhí)行鏈路的上下文中,保證RM分支事務提交時關聯(lián)到TM全局事務息尺。

    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        session.begin();
        eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
            session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));

        return session.getXid();
    }

RM的分支注冊携兵,會調(diào)用DefaultCore的branchRegister方法,創(chuàng)建BatchSession搂誉,檢查全局鎖徐紧,同樣會通過生命周期監(jiān)聽服務保存到數(shù)據(jù)庫,調(diào)用完成后返回BatchSession的btachId給到RM,btachId對RM的主要作用是與xid一起定位唯一一條undolog并级。

  @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
                               String applicationData, String lockKeys) throws TransactionException {
        GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
        return SessionHolder.lockAndExecute(globalSession, () -> {
            globalSessionStatusCheck(globalSession);
            globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
                    applicationData, lockKeys, clientId);
            branchSessionLock(globalSession, branchSession);
            try {
                globalSession.addBranch(branchSession);
            } catch (RuntimeException ex) {
                branchSessionUnlock(branchSession);
                 ...//代碼省略
            }
            ...//代碼省略
            return branchSession.getBranchId();
        });
    }

RM的報告本身事務狀態(tài)拂檩,會調(diào)用DefaultCore的branchReport方法,會根據(jù)xid獲取到關聯(lián)的GlobalSession嘲碧,再根據(jù)batchId獲取到BranchSession稻励,調(diào)用GlobalSession的changeBranchStatus改變分支狀態(tài),同樣會通過生命周期監(jiān)聽服務保存到數(shù)據(jù)庫愈涩。

 @Override
    public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status,
                             String applicationData) throws TransactionException {
        GlobalSession globalSession = assertGlobalSessionNotNull(xid, true);
        BranchSession branchSession = globalSession.getBranch(branchId);
        ...//代碼省略
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        globalSession.changeBranchStatus(branchSession, status);
        ...//代碼省略
    }

TM的全局提交望抽,會調(diào)用DefaultCore的commit方法,會通過加鎖與判斷全局事務模式與分支節(jié)點是否進行異步提交履婉,如果是異步提交則將globalSession的status置為AsyncCommitting煤篙,等待定時線程池撈取此狀態(tài)的GlobalSession進行提交,否則直接調(diào)用doGlobalCommit進行全局提交毁腿,若全局事務提交成功辑奈,但是還擁有分支節(jié)點,則繼續(xù)走異步提交流程已烤,清除分支節(jié)點鸠窗。

 @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
         ...//代碼省略
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
            globalSession.closeAndClean();
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                if (globalSession.canBeCommittedAsync()) {
                    globalSession.asyncCommit();
                    return false;
                } else {
                    globalSession.changeStatus(GlobalStatus.Committing);
                    return true;
                }
            }
            return false;
        });
        if (shouldCommit) {
            boolean success = doGlobalCommit(globalSession, false);
            if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
                globalSession.asyncCommit();
                return GlobalStatus.Committed;
            } else {
                return globalSession.getStatus();
            }
        } else {
            return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
        }
    }

doGlobalCommit執(zhí)行全局事務下的所有分支提交回調(diào),若分支提交狀態(tài)不是PhaseTwo_Committed會進行重試草戈,將globalSession的狀態(tài)置為CommitRetrying塌鸯,等待定時線程池撈取此狀態(tài)的GlobalSession進行提交。

    @Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
          ...//代碼省略
         ...//if是SAGA模式處理唐片,代碼省略
         else {
            for (BranchSession branchSession : globalSession.getSortedBranches()) {
                if (!retrying && branchSession.canBeCommittedAsync()) {
                    continue;
                }

                BranchStatus currentStatus = branchSession.getStatus();
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    globalSession.removeBranch(branchSession);
                    continue;
                }
                try {
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);

                    switch (branchStatus) {
                        case PhaseTwo_Committed:
                            globalSession.removeBranch(branchSession);
                            continue;
                        case PhaseTwo_CommitFailed_Unretryable:
                            if (globalSession.canBeCommittedAsync()) {
                                continue;
                            } else {
                                SessionHelper.endCommitFailed(globalSession);
                                return false;
                            }
                        default:
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            if (globalSession.canBeCommittedAsync()) {
                                continue;
                            } else {
                                return false;
                            }
                    }
                } catch (Exception ex) {
                    if (!retrying) {
                        globalSession.queueToRetryCommit();
                        throw new TransactionException(ex);
                    }
                }
            }
            if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
                return false;
            }
        }
        if (success && globalSession.getBranchSessions().isEmpty()) {
            SessionHelper.endCommitted(globalSession);
            ...//代碼省略
        }
        return success;
    }

全局提交對每個branchSession都進行分支提交的回調(diào)丙猬,在RM端由DataSourceManager.branchCommit方法提交到待提交隊列ASYNC_COMMIT_BUFFER,異步等待timerExecutor調(diào)用UndoLogManager的batchDeleteUndoLog方法進行刪除undolog费韭。

    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData)));
         ...//代碼省略
        return BranchStatus.PhaseTwo_Committed;
    }

    private void doBranchCommits() {
         ...//代碼省略
        for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
            Connection conn = null;
            DataSourceProxy dataSourceProxy;
            try {
                try {
                   ...//代碼省略
                for (Phase2Context commitContext : contextsGroupedByResourceId) {
                     ...//代碼省略
                    if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
                          ...//代碼省略  
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
                                xids, branchIds, conn);
                         ...//代碼省略
                        xids.clear();
                        branchIds.clear();
                    }
                }
  ...//代碼省略
    }

TM的全局回滾茧球,會調(diào)用DefaultCore的rollback方法,會通過加鎖與判斷全局事務模式是否進行回滾星持,如果是進行回滾則將globalSession的status置為Rollbacking抢埋,直接調(diào)用doGlobalRollback進行全局回滾。

 @Override
    public GlobalStatus rollback(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        ...//代碼省略
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
            globalSession.close(); 
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                globalSession.changeStatus(GlobalStatus.Rollbacking);
                return true;
            }
            return false;
        });
       ...//代碼省略
        doGlobalRollback(globalSession, false);
        return globalSession.getStatus();
    }

在doGlobalRollback中督暂,會執(zhí)行全局事務下的所有分支回滾回調(diào)揪垄,若分支提交狀態(tài)不是PhaseTwo_Rollbacked會進行重試,將globalSession的狀態(tài)置為TimeoutRollbackRetrying或者RollbackRetrying逻翁,等待定時線程池撈取此狀態(tài)的GlobalSession進行回滾饥努。

@Override
    public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
         ...//if是SAGA模式,代碼省略
         else {
            for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
                BranchStatus currentBranchStatus = branchSession.getStatus();
                if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                    globalSession.removeBranch(branchSession);
                    continue;
                }
                try {
                    BranchStatus branchStatus = branchRollback(globalSession, branchSession);
                    switch (branchStatus) {
                        case PhaseTwo_Rollbacked:
                            globalSession.removeBranch(branchSession);
                            continue;
                        case PhaseTwo_RollbackFailed_Unretryable:
                            SessionHelper.endRollbackFailed(globalSession);
                            return false;
                        default:
                            if (!retrying) {
                                globalSession.queueToRetryRollback();
                            }
                            return false;
                    }
                } catch (Exception ex) {
            
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    throw new TransactionException(ex);
                }
            }
        ...//代碼省略
        if (success) {
            SessionHelper.endRollbacked(globalSession);
        ...//代碼省略
        }
        return success;
    }

全局回滾對每個branchSession都進行分支回滾的回調(diào)八回,在RM端由DataSourceManager.branchRollback方法調(diào)用UndoLogManager.undo邏輯酷愧,將undolog進行重放恢復數(shù)據(jù)驾诈。

  @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
         ...//代碼省略
        try {
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
          ...//代碼省略
          return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
        return BranchStatus.PhaseTwo_Rollbacked;
    }
AT-TC端總結(jié)
新版的AT-TM端將數(shù)據(jù)存放在數(shù)據(jù)庫而不是本地文件,進而可以基于zk溶浴、etcd等分布式協(xié)調(diào)服務進行高可用部署乍迄,它的主要功能是將全局事務的相關數(shù)據(jù)進行保存,掌控全局事務的提交與回滾回調(diào)士败,并且可以對全局事務進行補償重試闯两。

最后總結(jié)

AT模式通過本地事務先提交,全局事務提交時異步刪除分支undolog拱烁,提高了全局事務的性能生蚁,但是相對于樸素的正向補償噩翠,AT模式帶來了形成鏡像的查詢戏自、與TC通信加大耗時等負面效果,只能說AT模式是把雙刃劍伤锚,并不是銀彈擅笔。軟件的藝術之美源于trade-off,是否使用還是要看業(yè)務形態(tài)的適用情況與布道者在關鍵時機的推廣落地屯援。
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末猛们,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子狞洋,更是在濱河造成了極大的恐慌弯淘,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件吉懊,死亡現(xiàn)場離奇詭異庐橙,居然都是意外死亡,警方通過查閱死者的電腦和手機借嗽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門态鳖,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人恶导,你說我怎么就攤上這事浆竭。” “怎么了惨寿?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵邦泄,是天一觀的道長。 經(jīng)常有香客問我裂垦,道長顺囊,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任缸废,我火速辦了婚禮包蓝,結(jié)果婚禮上驶社,老公的妹妹穿的比我還像新娘。我一直安慰自己测萎,他們只是感情好亡电,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著硅瞧,像睡著了一般份乒。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上腕唧,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天或辖,我揣著相機與錄音,去河邊找鬼枣接。 笑死颂暇,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的但惶。 我是一名探鬼主播耳鸯,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼膀曾!你這毒婦竟也來了县爬?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤添谊,失蹤者是張志新(化名)和其女友劉穎财喳,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體斩狱,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡耳高,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了喊废。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片祝高。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖污筷,靈堂內(nèi)的尸體忽然破棺而出工闺,到底是詐尸還是另有隱情,我是刑警寧澤瓣蛀,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布陆蟆,位于F島的核電站,受9級特大地震影響惋增,放射性物質(zhì)發(fā)生泄漏叠殷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一诈皿、第九天 我趴在偏房一處隱蔽的房頂上張望林束。 院中可真熱鬧像棘,春花似錦、人聲如沸壶冒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽胖腾。三九已至烟零,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間咸作,已是汗流浹背锨阿。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留记罚,地道東北人墅诡。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像毫胜,于是被迫代替她去往敵國和親书斜。 傳聞我的和親對象是個殘疾皇子诬辈,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容