寫在前面
seata-XA模式是利用事務(wù)資源(數(shù)據(jù)庫贱鼻、消息服務(wù)等)對 XA 協(xié)議的支持勒虾,以 XA 協(xié)議的機(jī)制來管理分支事務(wù)的一種解決方案县踢。
跟AT、 TCC模式對比悼瓮,XA擁有全局事務(wù)的隔離性戈毒,并且基于數(shù)據(jù)庫現(xiàn)有的XA能力,seata可以對全局事務(wù)的實(shí)現(xiàn)進(jìn)行“打薄”(比如undolog的保存重放刪除横堡、全局鎖等都可以不做)埋市,并且適配已經(jīng)在使用XA模式的業(yè)務(wù)形態(tài)。但缺點(diǎn)顯而易見命贴,XA是三種模式中性能最差的道宅。
在代碼實(shí)現(xiàn)中食听,AT模式與XA模式大同小異,都是一樣通過TM污茵、RM樱报、TC三種協(xié)同處理全局事務(wù)。只不過TCC模式下分支事務(wù)的注冊省咨、提交或回滾是調(diào)用自己實(shí)現(xiàn)的TCC行為類進(jìn)行處理肃弟。
XA-TM端分析
首先說明,XA流程同樣需要打上@GolbalTransaction注解的TM范圍中完成分支注冊零蓉、提交或回滾笤受,不在細(xì)說。
XA-RM端分析
XA模式同樣是通過自上而下的動態(tài)代理數(shù)據(jù)庫相關(guān)類(Database敌蜂、Connection箩兽、Statement),在原方法執(zhí)行過程中對sql的執(zhí)行進(jìn)行攔截處理章喉。
首先AT-RM端會將原DataSource進(jìn)行代理形成DataSourceProxyXA汗贫,這樣可以通過DataSourceProxyXA從原DataSource獲取Connection進(jìn)行代理形成XAConnection封裝的ConnectionProxyXA,最后可以通過ConnectionProxyXA獲取Statement進(jìn)行代理形成StatementProxyXA秸脱。由ConnectionProxyXA與StatementProxyXA接管sql的執(zhí)行與全局事務(wù)的處理落包。
public DataSourceProxyXA(DataSource dataSource, String resourceGroupId) {
if (dataSource instanceof SeataDataSourceProxy) {
LOGGER.info("Unwrap the data source, because the type is: {}", dataSource.getClass().getName());
dataSource = ((SeataDataSourceProxy) dataSource).getTargetDataSource();
}
this.dataSource = dataSource;
this.branchType = BranchType.XA;
JdbcUtils.initDataSourceResource(this, dataSource, resourceGroupId);
//Set the default branch type to 'XA' in the RootContext.
RootContext.setDefaultBranchType(this.getBranchType());
}
@Override
public Connection getConnection(String username, String password) throws SQLException {
Connection connection = dataSource.getConnection(username, password);
return getConnectionProxy(connection);
}
@Override
public Statement createStatement() throws SQLException {
Statement targetStatement = originalConnection.createStatement();
return new StatementProxyXA(this, targetStatement);
}
同樣的,看StatementProxyXA的方法都被ExecuteTemplateXA執(zhí)行模板封裝執(zhí)行摊唇。
@Override
public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
return ExecuteTemplateXA.execute(connectionProxyXA, (statement, args) -> statement.executeUpdate((String)args[0], (int[])args[1]), targetStatement, sql, columnIndexes);
}
來到ExecuteTemplateXA咐蝇,同樣會執(zhí)行先開門再關(guān)門操作,調(diào)用 connectionProxyXA.setAutoCommit(false)先開門巷查,執(zhí)行原sql有序,再根據(jù)執(zhí)行結(jié)果調(diào)用connectionProxyXA.commit或者connectionProxyXA.rollback,最后調(diào)用connectionProxyXA.setAutoCommit(true)進(jìn)行關(guān)門岛请。
public static <T, S extends Statement> T execute(AbstractConnectionProxyXA connectionProxyXA,
StatementCallback<T, S> statementCallback,
S targetStatement,
Object... args) throws SQLException {
boolean autoCommitStatus = connectionProxyXA.getAutoCommit();
if (autoCommitStatus) {
// XA Start
connectionProxyXA.setAutoCommit(false);
}
try {
T res = null;
try {
// execute SQL
res = statementCallback.execute(targetStatement, args);
} catch (Throwable ex) {
if (autoCommitStatus) {
// XA End & Rollback
try {
connectionProxyXA.rollback();
} catch (SQLException sqle) {
// log and ignore the rollback failure.
LOGGER.warn(
"Failed to rollback xa branch of " + connectionProxyXA.xid +
"(caused by SQL execution failure(" + ex.getMessage() + ") since " + sqle.getMessage(),
sqle);
}
}
if (ex instanceof SQLException) {
throw ex;
} else {
throw new SQLException(ex);
}
}
if (autoCommitStatus) {
try {
// XA End & Prepare
connectionProxyXA.commit();
} catch (Throwable ex) {
LOGGER.warn(
"Failed to commit xa branch of " + connectionProxyXA.xid + ") since " + ex.getMessage(),
ex);
// XA End & Rollback
if (!(ex instanceof SQLException) || AbstractConnectionProxyXA.SQLSTATE_XA_NOT_END != ((SQLException) ex).getSQLState()) {
try {
connectionProxyXA.rollback();
} catch (SQLException sqle) {
// log and ignore the rollback failure.
LOGGER.warn(
"Failed to rollback xa branch of " + connectionProxyXA.xid +
"(caused by commit failure(" + ex.getMessage() + ") since " + sqle.getMessage(),
sqle);
}
}
if (ex instanceof SQLException) {
throw ex;
} else {
throw new SQLException(ex);
}
}
}
return res;
} finally {
if (autoCommitStatus) {
connectionProxyXA.setAutoCommit(true);
}
}
}
1.來到connectionProxyXA.setAutoCommit(false)旭寿,內(nèi)部的操作其實(shí)并不只是設(shè)置自動提交狀態(tài)為否,還帶有向TC注冊分支的操作崇败,然后調(diào)用
xaResource.start盅称,啟動XA分支事務(wù)。
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
if (currentAutoCommitStatus == autoCommit) {
return;
}
if (autoCommit) {
// According to JDBC spec:
// If this method is called during a transaction and the
// auto-commit mode is changed, the transaction is committed.
if (xaActive) {
commit();
}
} else {
if (xaActive) {
throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
}
// Start a XA branch
long branchId = 0L;
try {
// 1. register branch to TC then get the branchId
branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,
null);
} catch (TransactionException te) {
cleanXABranchContext();
throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
}
// 2. build XA-Xid with xid and branchId
this.xaBranchXid = XAXidBuilder.build(xid, branchId);
try {
// 3. XA Start
xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
} catch (XAException e) {
cleanXABranchContext();
throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
}
// 4. XA is active
this.xaActive = true;
}
currentAutoCommitStatus = autoCommit;
}
2.statementCallback.execute后室,將在XA分支事務(wù)的范圍下執(zhí)行sql微渠,不再細(xì)說。
3調(diào)用 connectionProxyXA.commit或者connectionProxyXA.rollback咧擂。
先看connectionProxyXA.commit逞盆,由上文可知,到這個(gè)方法處理前松申,已經(jīng)完成了xaResource.start開始XA分支事務(wù)與執(zhí)行sql云芦,因此commit方法需要調(diào)用xaResource.end表明執(zhí)行成功結(jié)束XA分支事務(wù)俯逾,并且調(diào)用xaResource.prepare方法,保持xa連接舅逸,將等待TC全局提交的回調(diào)桌肴,最后清除上下文。
@Override
public void commit() throws SQLException {
if (currentAutoCommitStatus) {
// Ignore the committing on an autocommit session.
return;
}
if (!xaActive || this.xaBranchXid == null) {
throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);
}
try {
// XA End: Success
xaResource.end(xaBranchXid, XAResource.TMSUCCESS);
// XA Prepare
xaResource.prepare(xaBranchXid);
// Keep the Connection if necessary
keepIfNecessary();
} catch (XAException xe) {
try {
// Branch Report to TC: Failed
DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),
BranchStatus.PhaseOne_Failed, null);
} catch (TransactionException te) {
LOGGER.warn("Failed to report XA branch commit-failure on " + xid + "-" + xaBranchXid.getBranchId()
+ " since " + te.getCode() + ":" + te.getMessage() + " and XAException:" + xe.getMessage());
}
throw new SQLException(
"Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
.getMessage(), xe);
} finally {
cleanXABranchContext();
}
}
再看connectionProxyXA.rollback琉历,xa分支事務(wù)回滾需要向TC報(bào)告分支事務(wù)執(zhí)行失敗坠七,然后需要調(diào)用xaResource.end表明執(zhí)行失敗結(jié)束XA分支事務(wù),最后清除上下文旗笔。
@Override
public void rollback() throws SQLException {
if (currentAutoCommitStatus) {
// Ignore the committing on an autocommit session.
return;
}
if (!xaActive || this.xaBranchXid == null) {
throw new SQLException("should NOT rollback on an inactive session");
}
try {
// Branch Report to TC
DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null);
} catch (TransactionException te) {
// log and ignore the report failure
LOGGER.warn("Failed to report XA branch rollback on " + xid + "-" + xaBranchXid.getBranchId()
+ " since " + te.getCode() + ":" + te.getMessage());
}
try {
// XA End: Fail
xaResource.end(xaBranchXid, XAResource.TMFAIL);
} catch (XAException xe) {
throw new SQLException(
"Failed to end(TMFAIL) xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
.getMessage(), xe);
} finally {
cleanXABranchContext();
}
}
TC的全局提交或回滾回調(diào)會流經(jīng)ResourceManagerXA進(jìn)行處理彪置,都會進(jìn)入到finishBranch方法,如果是全局提交回調(diào)則調(diào)用connectionProxyXA.xaCommit蝇恶,否則調(diào)用connectionProxyXA.xaRollback拳魁,報(bào)錯(cuò)則會通知TC進(jìn)行補(bǔ)償重試。
private BranchStatus finishBranch(boolean committed, BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
XAXid xaBranchXid = XAXidBuilder.build(xid, branchId);
Resource resource = dataSourceCache.get(resourceId);
if (resource instanceof AbstractDataSourceProxyXA) {
try (ConnectionProxyXA connectionProxyXA = ((AbstractDataSourceProxyXA)resource).getConnectionForXAFinish(xaBranchXid)) {
if (committed) {
connectionProxyXA.xaCommit(xid, branchId, applicationData);
LOGGER.info(xaBranchXid + " was committed.");
return BranchStatus.PhaseTwo_Committed;
} else {
connectionProxyXA.xaRollback(xid, branchId, applicationData);
LOGGER.info(xaBranchXid + " was rolled back.");
return BranchStatus.PhaseTwo_Rollbacked;
}
} catch (XAException | SQLException sqle) {
if (sqle instanceof XAException) {
if (((XAException)sqle).errorCode == XAException.XAER_NOTA) {
if (committed) {
return BranchStatus.PhaseTwo_Committed;
} else {
return BranchStatus.PhaseTwo_Rollbacked;
}
}
}
if (committed) {
LOGGER.info(xaBranchXid + " commit failed since " + sqle.getMessage(), sqle);
// FIXME: case of PhaseTwo_CommitFailed_Unretryable
return BranchStatus.PhaseTwo_CommitFailed_Retryable;
} else {
LOGGER.info(xaBranchXid + " rollback failed since " + sqle.getMessage(), sqle);
// FIXME: case of PhaseTwo_RollbackFailed_Unretryable
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
} else {
LOGGER.error("Unknown Resource for XA resource " + resourceId + " " + resource);
if (committed) {
return BranchStatus.PhaseTwo_CommitFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
}
}
}
connectionProxyXA的xaCommit與xaRollback撮弧,會分別調(diào)用xaResource.commit與xaResource.rollback方法潘懊,完成二階段的提交或回滾,并且釋放緩存的XA連接贿衍。
public void xaCommit(String xid, long branchId, String applicationData) throws XAException {
XAXid xaXid = XAXidBuilder.build(xid, branchId);
xaResource.commit(xaXid, false);
releaseIfNecessary();
}
public void xaRollback(String xid, long branchId, String applicationData) throws XAException {
XAXid xaXid = XAXidBuilder.build(xid, branchId);
xaResource.rollback(xaXid);
releaseIfNecessary();
}
XA-TC端分析
同樣的授舟,RM端分支注冊會創(chuàng)建BatchSession保存在數(shù)據(jù)庫,TM執(zhí)行全局事務(wù)的提交或回滾時(shí)也會進(jìn)行回調(diào)與補(bǔ)償處理贸辈,TC端的處理不再細(xì)說释树。