寫在前面
seata是阿里巴巴開源侦讨,用于解決分布式事務的中間件,目前在github上已經(jīng)擁有18k+的star紊服,是分布式事務中間件的翹楚伦泥,它擁有四種解決分布式事務的模式:AT、TCC芝雪、XA减余、SAGA。
下面給出四種模式的簡要說明:
AT:一種通過動態(tài)代理實現(xiàn)無侵入的分布式事務解決方案惩系,是2pc的一種實現(xiàn)佳励,一階段:業(yè)務數(shù)據(jù)和回滾日志記錄在同一個本地事務中提交,釋放本地鎖和連接資源蛆挫。二階段:提交異步化赃承,非常快速地完成悴侵,而回滾則通過一階段的回滾日志進行反向補償瞧剖。
TCC:一種侵入式解決方案,每個分支事務都需要自己實現(xiàn)TCC的行為可免,支持把 “自定義” 的分支事務納入到全局事務的管理中抓于。
XA:利用事務資源(數(shù)據(jù)庫、消息服務等)對 XA 協(xié)議的支持浇借,以 XA 協(xié)議的機制來管理分支事務的一種解決方案捉撮。
SAGA:把事務看成多個階段,每個階段都可以向前補償與向后回滾妇垢,需要自己實現(xiàn)一階段正向服務和二階段補償服務巾遭,以狀態(tài)機引擎來驅(qū)動全局事務的一種解決方案。
本文會按照AT模式的執(zhí)行全局事務的主流程順序闯估,對三端(TM灼舍、RM、TC)如何協(xié)同處理分布式事務進行解析涨薪,解析順序是TM->RM->TC骑素,會對源碼中無關的部分做適當?shù)膭h減,注明代碼省略刚夺,以便閱讀献丑。
下面是AT模式三端的簡要說明:
TM (Transaction Manager) - 事務管理器:定義全局事務的范圍:開始全局事務末捣、提交或回滾全局事務。
RM (Resource Manager) - 資源管理器:管理分支事務處理的資源创橄,與TC交談以注冊分支事務和報告分支事務的狀態(tài)塔粒,并驅(qū)動分支事務提交或回滾。
TC (Transaction Coordinator) - 事務協(xié)調(diào)者:維護全局和分支事務的狀態(tài)筐摘,驅(qū)動全局事務提交或回滾卒茬。
AT-TM端分析
AT-TM端是基于spring的ioc與aop能力對原方法進行動態(tài)代理,處理全局事務的發(fā)起與提交或回滾咖熟。
@Bean
@DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
@ConditionalOnMissingBean(GlobalTransactionScanner.class)
public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
...//代碼省略
return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
}
SeataAutoConfiguration是AT-TM端的源頭圃酵,利用了spring自動注入的特點,為某個方法打上@Bean注解馍管,生成GlobalTransactionScanner郭赐。
接下來我們進入GlobalTransactionScanner,它實現(xiàn)了spring的InitializingBean接口确沸,因此利用spring的特性捌锭,當bean實現(xiàn)時會調(diào)用afterPropertiesSet方法進行初始化。
@Override
public void afterPropertiesSet() {
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)this);
...//代碼省略
if (initialized.compareAndSet(false, true)) {
initClient();
}
...//代碼省略
}
GlobalTransactionScanner的afterPropertiesSet方法中罗捎,initClient會對TM观谦、RM的client進行初始化,對關閉鉤子的注冊桨菜。
同時豁状,GlobalTransactionScanner也實現(xiàn)了AbstractAutoProxyCreator抽象類,會對所有由spring生成的bean進行wrapIfNecessary的二次處理倒得,正是由此方法泻红,為打上@GlobalTransaction注解的bean進行動態(tài)代理,由GlobalTransactionalInterceptor進行代理霞掺。
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
try {
synchronized (PROXYED_SET) {
...//代碼省略
if (interceptor == null) {
if (globalTransactionalInterceptor == null) {
globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
ConfigurationCache.addConfigListener(
ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)globalTransactionalInterceptor);
}
interceptor = globalTransactionalInterceptor;
}
}
if (!AopUtils.isAopProxy(bean)) {
bean = super.wrapIfNecessary(bean, beanName, cacheKey);
} else {
AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
for (Advisor avr : advisor) {
advised.addAdvisor(0, avr);
}
}
PROXYED_SET.add(beanName);
return bean;
}
...//代碼省略
}
由GlobalTransactionalInterceptor接管后谊路,調(diào)用將經(jīng)過它的invoke方法,再經(jīng)過handleGlobalTransaction或者handleGlobalLock進行處理菩彬,這是看方法上的注解是@GlobalTransactional或者@GlobalLock決定的缠劝。
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
...//代碼省略
if (!localDisable) {
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation, globalLockAnnotation);
}
}
}
return methodInvocation.proceed();
}
handleGlobalLock其實是handleGlobalTransaction的子集,接下來只分析handleGlobalTransaction方法挤巡,handleGlobalTransaction比較簡單剩彬,直接調(diào)用事務模板類TransactionalTemplate的execute方法繼續(xù)進行,報錯則調(diào)用錯誤處理器進行處理酷麦,最后發(fā)送消息到事務總線矿卑。execute入?yún)⑹荰ransactionalExecutor,封裝了對原方法的調(diào)用沃饶、名稱母廷、事務配置的獲取轻黑。
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
boolean succeed = true;
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
...//代碼省略
@Override
public TransactionInfo getTransactionInfo() {
// reset the value of timeout
int timeout = globalTrxAnno.timeoutMills();
if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
timeout = defaultGlobalTransactionTimeout;
}
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTimeOut(timeout);
transactionInfo.setName(name());
transactionInfo.setPropagation(globalTrxAnno.propagation());
transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
...//代碼省略
transactionInfo.setRollbackRules(rollbackRules);
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
...//代碼省略
case RollbackRetrying:
failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
throw e.getOriginalException();
default:
throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
}
} finally {
if (degradeCheck) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
接下來到seata事務的核心椭岩,事務模板TransactionalTemplate移迫,里面封裝了spring的事務傳播模式:
NOT_SUPPORTED(如果有事務則掛起,不在事務中執(zhí)行原方法)
REQUIRES_NEW(如果有事務則掛起嗡贺,新建事務中執(zhí)行原方法)
SUPPORTS(如果不存在事務則直接執(zhí)行原方法业舍,若存在事務則在原事務執(zhí)行原方法)
REQUIRED(不進行任何處理抖拦,若存在事務則在事務中執(zhí)行,否則相反)
NEVER(若存在事務直接報錯舷暮,沒有事務則執(zhí)行)
MANDATORY(不存在事務則報錯态罪,必須在原事務中執(zhí)行)
,還有類似spring的事務執(zhí)行順序下面,beginTransaction->business.execute->completeTransactionAfterThrowing->commitTransaction->cleanUp复颈。似曾相識,在seata還能復習下spring事務沥割,哈哈耗啦。這里的英文注釋非常詳細,很容易看明白seata的封裝方式机杜。
public Object execute(TransactionalExecutor business) throws Throwable {
...//代碼省略
TransactionInfo txInfo = business.getTransactionInfo();
Propagation propagation = txInfo.getPropagation();
try {
switch (propagation) {
case NOT_SUPPORTED:
// If transaction is existing, suspend it.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
}
// Execute without transaction and return.
return business.execute();
case REQUIRES_NEW:
// If transaction is existing, suspend it, and then begin new transaction.
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend();
tx = GlobalTransactionContext.createNew();
}
// Continue and execute with new transaction
break;
case SUPPORTS:
// If transaction is not existing, execute without transaction.
if (notExistingTransaction(tx)) {
return business.execute();
}
// Continue and execute with new transaction
break;
case REQUIRED:
// If current transaction is existing, execute with current transaction,
// else continue and execute with new transaction.
break;
case NEVER:
// If transaction is existing, throw exception.
if (existingTransaction(tx)) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
, tx.getXid()));
} else {
// Execute without transaction and return.
return business.execute();
}
case MANDATORY:
// If transaction is not existing, throw exception.
if (notExistingTransaction(tx)) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
// Continue and execute with current transaction.
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
...//代碼省略
try {
beginTransaction(txInfo, tx);
Object rs;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
commitTransaction(tx);
return rs;
} finally {
...//代碼省略
cleanUp();
}
}...//代碼省略
}
1.來到beginTransaction帜讲,如果看過鎮(zhèn)樓圖,就是TM向TC注冊事務的過程椒拗,調(diào)用鏈路是TransactionalTemplate->DefaultGlobalTransaction->DefaultTransactionManager的begin方法舒帮,值得一提的是,TM端(DefaultTransactionManager)和TC端(DefaultCore)處理事務的類陡叠,都是實現(xiàn)了TransactionManager接口玩郊,的確TM和TC端的處理方法是一一對應的。在DefaultTransactionManager使用TmNettyRemotingClient發(fā)送事務注冊請求枉阵,還記得剛開始初始化的TMclient嗎译红?養(yǎng)兵千日,用在一時兴溜。
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid();
}
2.business.execute是原方法的執(zhí)行侦厚,會在AT-RM端進行分析,此處省略拙徽。
3.completeTransactionAfterThrowing是在原方法調(diào)用后報錯進行rollback刨沦,跟begin注冊事務的方式幾乎一模一樣,都是使用TMclient向TC發(fā)出rollback請求膘怕,此處省略想诅。
4.commitTransaction是在原方法調(diào)用成功后進行commit,跟begin注冊事務的方式幾乎一模一樣,都是使用TMclient向TC發(fā)出commit請求来破,此處省略篮灼。
5.cleanUp是在事務模板執(zhí)行后進行清掃現(xiàn)場的方法,目的是對保存在ThreadLocal中的事務鉤子進行清除徘禁。
AT-TM端總結(jié)
AT-TM端對打上事務注解的方法使用了動態(tài)代理诅诱,將原方法封裝成事務模板進行執(zhí)行,是事務的注冊到事務的提交或回滾的發(fā)起方送朱∧锏矗可以說AT-TM端生于spring,也對spring的事務處理進行增強驶沼。
AT-RM端分析
AT-RM端的實現(xiàn)思想比較巧妙它改,是通過自上而下的動態(tài)代理數(shù)據(jù)庫相關類(Database、Connection商乎、Statement)央拖,在原方法執(zhí)行過程中對sql的執(zhí)行進行攔截處理。
首先AT-RM端會將原DataSource進行代理形成DataSourceProxy鹉戚,這樣可以通過DataSourceProxy從原DataSource獲取Connection進行代理形成ConnectionProxy鲜戒,最后可以通過ConnectionProxy獲取Statement進行代理形成StatementProxy。由ConnectionProxy與StatementProxy接管sql的執(zhí)行與全局事務的處理抹凳。
@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
return new DataSourceProxy(dataSource);
}
@Override
public ConnectionProxy getConnection() throws SQLException {
Connection targetConnection = targetDataSource.getConnection();
return new ConnectionProxy(this, targetConnection);
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
Statement statement = targetConnection.createStatement(resultSetType, resultSetConcurrency);
return new StatementProxy<Statement>(this, statement);
}
直接進入StatementProxy遏餐,可以看到里面的方法都是使用了ExecuteTemplate執(zhí)行模板進行處理,比如executeUpdate方法赢底。
@Override
public int executeUpdate(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate((String) args[0]), sql);
}
可以看到ExecuteTemplate.execute會區(qū)別對待不同的sql類型失都,生成不同的Executor,最后調(diào)用Executor的execute方法執(zhí)行sql幸冻。
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
...//代碼省略
Executor<T> executor;
...//代碼省略
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
...//代碼省略
}
return rs;
}
選取一個比較簡單又能體現(xiàn)完整AT-RM端流程的UpdateExecutor進行分析粹庞,如果設置了自動提交,會進入executeAutoCommitTrue流程洽损,否則會進入executeAutoCommitFalse流程庞溜。
繼續(xù)選取executeAutoCommitTrue流程進行分析,首先設置自動提交為false碑定,然后執(zhí)行executeAutoCommitFalse后會調(diào)用ConnectionProxy的commit方法流码,最后重置上下文與自動提交狀態(tài)位,這正是很多中間件處理本地事務的步驟延刘。
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
connectionProxy.setAutoCommit(false);
return new LockRetryPolicy(connectionProxy).execute(() -> {
T result = executeAutoCommitFalse(args);
connectionProxy.commit();
return result;
});
} catch (Exception e) {
...//代碼省略
throw e;
} finally {
connectionProxy.getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
executeAutoCommitFalse流程會執(zhí)行beforeImage保存前置鏡像漫试,調(diào)用原sql執(zhí)行回調(diào),afterImage保存后置鏡像碘赖,合并鏡像形成undolog與lockkey驾荣。
形成鏡像的原理是根據(jù)原sql生成等價的查詢sql外构,執(zhí)行查詢sql形成鏡像。
形成lockkey的原理是根據(jù)鏡像獲取主鍵列表(primary key 簡稱pk)進行拼接形成lockkey秘车。
protected T executeAutoCommitFalse(Object[] args) throws Exception {
...//代碼省略
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
...//代碼省略
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
connectionProxy.appendLockKey(lockKeys);
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
connectionProxy.appendUndoLog(sqlUndoLog);
}
connectionProxy.commit就是最終的收尾流程典勇,向TC進行注冊的同時加上全局鎖劫哼,本地sql與undolog一起提交到本地數(shù)據(jù)庫叮趴,最后向TC報告本身事務狀態(tài),當然权烧,RM與TC的交互還是用最開始初始化的RmClient眯亦,就不再細說了。
private void processGlobalTransactionCommit() throws SQLException {
try {
register();
} catch (TransactionException e) {
...//代碼省略
}
try {
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
targetConnection.commit();
} catch (Throwable ex) {
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
AT-RM端總結(jié)
AT-RM端同樣使用了動態(tài)代理般码,將原方法封裝成執(zhí)行模板進行執(zhí)行妻率,保證了全局事務的加鎖、向TC報告本身事務狀態(tài)板祝、本地sql與undolog“同生共死”宫静,一起提交到數(shù)據(jù)庫,為全局提交或回滾做好準備券时。
AT-TC端分析
AT-TC端是負責整個全局事務的注冊孤里、提交或回滾的總控制節(jié)點。
由上文可知橘洞,AT-TC端接收并處理了TM端的全局事務的注冊捌袜、提交或回滾請求,RM端的分支事務的報告本身狀態(tài)請求炸枣。
當TM發(fā)起全局提交或回滾時虏等,會回調(diào)全局事務底下所有RM的最終提交或回滾接口,刪除undolog或者根據(jù)undolog進行重放恢復數(shù)據(jù)适肠。
AT-TC端是以NettyRemotingServer啟動并且接收處理來自TM與RM的請求霍衫,請求會流經(jīng)DefaultCoordinator->DefaultCore到達默認的核心處理類。
首先看TM發(fā)起的全局事務的注冊侯养,會調(diào)用DefaultCore的begin方法慕淡,創(chuàng)建GlobalSession通過生命周期監(jiān)聽服務保存到數(shù)據(jù)庫,向事務總線發(fā)送消息沸毁,調(diào)用完成后返回GlobalSession的xid給到TM峰髓,xid將由TM保存在執(zhí)行鏈路的上下文中,保證RM分支事務提交時關聯(lián)到TM全局事務息尺。
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
session.begin();
eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
return session.getXid();
}
RM的分支注冊携兵,會調(diào)用DefaultCore的branchRegister方法,創(chuàng)建BatchSession搂誉,檢查全局鎖徐紧,同樣會通過生命周期監(jiān)聽服務保存到數(shù)據(jù)庫,調(diào)用完成后返回BatchSession的btachId給到RM,btachId對RM的主要作用是與xid一起定位唯一一條undolog并级。
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
return SessionHolder.lockAndExecute(globalSession, () -> {
globalSessionStatusCheck(globalSession);
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
branchSessionLock(globalSession, branchSession);
try {
globalSession.addBranch(branchSession);
} catch (RuntimeException ex) {
branchSessionUnlock(branchSession);
...//代碼省略
}
...//代碼省略
return branchSession.getBranchId();
});
}
RM的報告本身事務狀態(tài)拂檩,會調(diào)用DefaultCore的branchReport方法,會根據(jù)xid獲取到關聯(lián)的GlobalSession嘲碧,再根據(jù)batchId獲取到BranchSession稻励,調(diào)用GlobalSession的changeBranchStatus改變分支狀態(tài),同樣會通過生命周期監(jiān)聽服務保存到數(shù)據(jù)庫愈涩。
@Override
public void branchReport(BranchType branchType, String xid, long branchId, BranchStatus status,
String applicationData) throws TransactionException {
GlobalSession globalSession = assertGlobalSessionNotNull(xid, true);
BranchSession branchSession = globalSession.getBranch(branchId);
...//代碼省略
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
globalSession.changeBranchStatus(branchSession, status);
...//代碼省略
}
TM的全局提交望抽,會調(diào)用DefaultCore的commit方法,會通過加鎖與判斷全局事務模式與分支節(jié)點是否進行異步提交履婉,如果是異步提交則將globalSession的status置為AsyncCommitting煤篙,等待定時線程池撈取此狀態(tài)的GlobalSession進行提交,否則直接調(diào)用doGlobalCommit進行全局提交毁腿,若全局事務提交成功辑奈,但是還擁有分支節(jié)點,則繼續(xù)走異步提交流程已烤,清除分支節(jié)點鸠窗。
@Override
public GlobalStatus commit(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
...//代碼省略
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
globalSession.closeAndClean();
if (globalSession.getStatus() == GlobalStatus.Begin) {
if (globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return false;
} else {
globalSession.changeStatus(GlobalStatus.Committing);
return true;
}
}
return false;
});
if (shouldCommit) {
boolean success = doGlobalCommit(globalSession, false);
if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
doGlobalCommit執(zhí)行全局事務下的所有分支提交回調(diào),若分支提交狀態(tài)不是PhaseTwo_Committed會進行重試草戈,將globalSession的狀態(tài)置為CommitRetrying塌鸯,等待定時線程池撈取此狀態(tài)的GlobalSession進行提交。
@Override
public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
...//代碼省略
...//if是SAGA模式處理唐片,代碼省略
else {
for (BranchSession branchSession : globalSession.getSortedBranches()) {
if (!retrying && branchSession.canBeCommittedAsync()) {
continue;
}
BranchStatus currentStatus = branchSession.getStatus();
if (currentStatus == BranchStatus.PhaseOne_Failed) {
globalSession.removeBranch(branchSession);
continue;
}
try {
BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
switch (branchStatus) {
case PhaseTwo_Committed:
globalSession.removeBranch(branchSession);
continue;
case PhaseTwo_CommitFailed_Unretryable:
if (globalSession.canBeCommittedAsync()) {
continue;
} else {
SessionHelper.endCommitFailed(globalSession);
return false;
}
default:
if (!retrying) {
globalSession.queueToRetryCommit();
return false;
}
if (globalSession.canBeCommittedAsync()) {
continue;
} else {
return false;
}
}
} catch (Exception ex) {
if (!retrying) {
globalSession.queueToRetryCommit();
throw new TransactionException(ex);
}
}
}
if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
return false;
}
}
if (success && globalSession.getBranchSessions().isEmpty()) {
SessionHelper.endCommitted(globalSession);
...//代碼省略
}
return success;
}
全局提交對每個branchSession都進行分支提交的回調(diào)丙猬,在RM端由DataSourceManager.branchCommit方法提交到待提交隊列ASYNC_COMMIT_BUFFER,異步等待timerExecutor調(diào)用UndoLogManager的batchDeleteUndoLog方法進行刪除undolog费韭。
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData)));
...//代碼省略
return BranchStatus.PhaseTwo_Committed;
}
private void doBranchCommits() {
...//代碼省略
for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
Connection conn = null;
DataSourceProxy dataSourceProxy;
try {
try {
...//代碼省略
for (Phase2Context commitContext : contextsGroupedByResourceId) {
...//代碼省略
if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
...//代碼省略
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
xids, branchIds, conn);
...//代碼省略
xids.clear();
branchIds.clear();
}
}
...//代碼省略
}
TM的全局回滾茧球,會調(diào)用DefaultCore的rollback方法,會通過加鎖與判斷全局事務模式是否進行回滾星持,如果是進行回滾則將globalSession的status置為Rollbacking抢埋,直接調(diào)用doGlobalRollback進行全局回滾。
@Override
public GlobalStatus rollback(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
...//代碼省略
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
globalSession.close();
if (globalSession.getStatus() == GlobalStatus.Begin) {
globalSession.changeStatus(GlobalStatus.Rollbacking);
return true;
}
return false;
});
...//代碼省略
doGlobalRollback(globalSession, false);
return globalSession.getStatus();
}
在doGlobalRollback中督暂,會執(zhí)行全局事務下的所有分支回滾回調(diào)揪垄,若分支提交狀態(tài)不是PhaseTwo_Rollbacked會進行重試,將globalSession的狀態(tài)置為TimeoutRollbackRetrying或者RollbackRetrying逻翁,等待定時線程池撈取此狀態(tài)的GlobalSession進行回滾饥努。
@Override
public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
boolean success = true;
...//if是SAGA模式,代碼省略
else {
for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
BranchStatus currentBranchStatus = branchSession.getStatus();
if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
globalSession.removeBranch(branchSession);
continue;
}
try {
BranchStatus branchStatus = branchRollback(globalSession, branchSession);
switch (branchStatus) {
case PhaseTwo_Rollbacked:
globalSession.removeBranch(branchSession);
continue;
case PhaseTwo_RollbackFailed_Unretryable:
SessionHelper.endRollbackFailed(globalSession);
return false;
default:
if (!retrying) {
globalSession.queueToRetryRollback();
}
return false;
}
} catch (Exception ex) {
if (!retrying) {
globalSession.queueToRetryRollback();
}
throw new TransactionException(ex);
}
}
...//代碼省略
if (success) {
SessionHelper.endRollbacked(globalSession);
...//代碼省略
}
return success;
}
全局回滾對每個branchSession都進行分支回滾的回調(diào)八回,在RM端由DataSourceManager.branchRollback方法調(diào)用UndoLogManager.undo邏輯酷愧,將undolog進行重放恢復數(shù)據(jù)驾诈。
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
...//代碼省略
try {
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
...//代碼省略
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
return BranchStatus.PhaseTwo_Rollbacked;
}
AT-TC端總結(jié)
新版的AT-TM端將數(shù)據(jù)存放在數(shù)據(jù)庫而不是本地文件,進而可以基于zk溶浴、etcd等分布式協(xié)調(diào)服務進行高可用部署乍迄,它的主要功能是將全局事務的相關數(shù)據(jù)進行保存,掌控全局事務的提交與回滾回調(diào)士败,并且可以對全局事務進行補償重試闯两。
最后總結(jié)
AT模式通過本地事務先提交,全局事務提交時異步刪除分支undolog拱烁,提高了全局事務的性能生蚁,但是相對于樸素的正向補償噩翠,AT模式帶來了形成鏡像的查詢戏自、與TC通信加大耗時等負面效果,只能說AT模式是把雙刃劍伤锚,并不是銀彈擅笔。軟件的藝術之美源于trade-off,是否使用還是要看業(yè)務形態(tài)的適用情況與布道者在關鍵時機的推廣落地屯援。