描述
seata是分布式事務(wù)解決方案。分布式事務(wù)是包含若干分支事務(wù)的全局事務(wù)。如果各分支事務(wù)提交成功矛市,則全局事務(wù)提交。如果各分支有一個(gè)執(zhí)行失敗诲祸,則全局事務(wù)回滾浊吏。
分布式事務(wù)處理過程抽象為事務(wù)管理器(Transaction Manager)、分支事務(wù)(Resource Manager )救氯、事務(wù)協(xié)調(diào)器(Transaction Coordinator)找田。
TC作為服務(wù)端部署,協(xié)調(diào)RM的提交或回滾着憨。RM和TM集成作為客戶端部署墩衙。如果有注冊中心,則TC甲抖、RM漆改、TM都需要向注冊中心注冊。
適合在MVC框架中准谚,在service層方法上添加@GlobalTransactional
開啟全局事務(wù)挫剑,然后service中用到的dao層是分支事務(wù)
測試環(huán)境
下面使用seata-samples中springboot-mybatis項(xiàng)目做測試,seata-simple該項(xiàng)目依賴很多柱衔,可以將不需要的子項(xiàng)目刪除再測試樊破。還需要下載seata-server。
初始化
初始化DataSourceProxy
中秀存,緩存DataSourceProxy捶码,生成ResourceId并向TC注冊資源。
一階段
TM
在方法上添加注解@GlobalTransactional
開啟分布式事務(wù)或链。該注解由GlobalTransactionalInterceptor
攔截處理惫恼。由下圖可知TM處理流程:
1.開啟全局事務(wù),向TC注冊全局事務(wù)并返回XID
2.如果業(yè)務(wù)執(zhí)行成功澳盐,通知TC全局事務(wù)提交
3.如果業(yè)務(wù)執(zhí)行失敗祈纯,通知TC全局事務(wù)回滾
4.清除內(nèi)存中XID
public Object execute(TransactionalExecutor business) throws Throwable {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
// 1.1 get transactionInfo
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
try {
// 2. begin transaction
beginTransaction(txInfo, tx);
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3.the needed business exception to rollback.
completeTransactionAfterThrowing(txInfo,tx,ex);
throw ex;
}
// 4. everything is fine, commit.
commitTransaction(tx);
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}
DefaultGlobalTransaction
定義了提交令宿、回滾方法。執(zhí)行操作的角色有發(fā)起者(Launcher)腕窥、參與者(Participant)粒没。只有發(fā)起者才可以執(zhí)行begin、commit簇爆、rollback癞松。
RM
1.向TC注冊分支事務(wù)
2.插入回滾日志
3.提交本地事務(wù)
4.通知TC本地事務(wù)執(zhí)行狀態(tài)
5.清空內(nèi)存中XID、branchId
AT模式主要封裝了數(shù)據(jù)庫操作入蛆。DataSource响蓉、Connection、Statement封裝為DataSourceProxy哨毁、ConnectionProxy枫甲、StatementProxy。
執(zhí)行sql時(shí)扼褪,ConnectionProxy創(chuàng)建Statement包裝類StatementProxy對象想幻。StatementProxy創(chuàng)建Executor。Executor中解析目標(biāo)sql话浇,再創(chuàng)建beforeImage脏毯,執(zhí)行sql語句、創(chuàng)建afterImage幔崖。最后ConnectionProxy提交本地事務(wù)抄沮。
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
try {
// 設(shè)置手動(dòng)提交
connectionProxy.setAutoCommit(false);
return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> {
// 創(chuàng)建beforeImage、執(zhí)行sql岖瑰、創(chuàng)建afterImage
T result = executeAutoCommitFalse(args);
// 提交本地事務(wù)
connectionProxy.commit();
return result;
});
} catch (Exception e) {
// when exception occur in finally,this exception will lost, so just print it here
LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
connectionProxy.getTargetConnection().rollback();
}
throw e;
} finally {
((ConnectionProxy) connectionProxy).getContext().reset();
connectionProxy.setAutoCommit(true);
}
}
protected T executeAutoCommitFalse(Object[] args) throws Exception {
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}
private void processGlobalTransactionCommit() throws SQLException {
try {
// RM向TC注冊分支事務(wù),獲取branchId
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
if (context.hasUndoLog()) {
// 插入數(shù)據(jù)庫回滾日志
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
}
// 提交本地事務(wù)
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
// RM向TC報(bào)告本地事務(wù)提交失敗
report(false);
throw new SQLException(ex);
}
// RM向TC報(bào)告本地事務(wù)提交成功
report(true);
// 清空XID砂代、branchId
context.reset();
}
二階段
TM通知TC全局事務(wù)提交/回滾蹋订。TC通知各RM提交/回滾本地事務(wù)。
RM以RMHandlerAT
來處理TC的事務(wù)提交通知刻伊。提交全局事務(wù)時(shí)露戒,RM將刪除undo log。先將刪除操作封裝為任務(wù)放入AsyncWorker
中的阻塞隊(duì)列中捶箱,并返回TC成功消息智什。AsyncWorker
中的定時(shí)器每隔1s執(zhí)行刪除任務(wù)。
回滾時(shí)丁屎,RM先獲取undo log回滾數(shù)據(jù)荠锭,然后刪除undo log。邏輯在AbstractUndoLogManager
中undo()
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
for (; ; ) {
try {
// 獲取源數(shù)據(jù)庫鏈接
conn = dataSourceProxy.getPlainConnection();
// The entire undo process should run in a local transaction.
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
// Find UNDO LOG
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
boolean exists = false;
while (rs.next()) {
exists = true;
// It is possible that the server repeatedly sends a rollback request to roll back
// the same branch transaction to multiple processes,
// ensuring that only the undo_log in the normal state is processed.
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
if (!canUndo(state)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, ignore {} undo_log",
xid, branchId, state);
}
return;
}
String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = parseContext(contextString);
Blob b = rs.getBlob(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
byte[] rollbackInfo = BlobUtils.blob2Bytes(b);
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() :
UndoLogParserFactory.getInstance(serializer);
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy).getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(),
sqlUndoLog);
// 回滾數(shù)據(jù)
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}
// If undo_log exists, it means that the branch transaction has completed the first phase,
// we can directly roll back and clean the undo_log
// Otherwise, it indicates that there is an exception in the branch transaction,
// causing undo_log not to be written to the database.
// For example, the business processing timeout, the global transaction is the initiator rolls back.
// To ensure data consistency, we can insert an undo_log with GlobalFinished state
// to prevent the local transaction of the first phase of other programs from being correctly submitted.
// See https://github.com/seata/seata/issues/489
if (exists) {
// 刪除undo log
deleteUndoLog(xid, branchId, conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log deleted with {}",
xid, branchId, State.GlobalFinished.name());
}
} else {
// 插入undo log晨川,防止一階段重復(fù)寫入
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log added with {}",
xid, branchId, State.GlobalFinished.name());
}
}
return;
}
}
}
隔離界別
寫隔離
一階段本地事務(wù)提交前要先獲取全局鎖证九,獲取到才可提交删豺。獲取全局鎖被限定在一定時(shí)間范圍內(nèi)。
比如tx1和tx2兩個(gè)事務(wù)更新數(shù)據(jù)場景愧怜。tx1先獲取本地鎖執(zhí)行更新呀页,然后獲取全局鎖去提交本地事務(wù),本地事務(wù)提交后釋放本地鎖拥坛。此時(shí)全局鎖還是tx1蓬蝶。tx2更新時(shí),無法獲取全局鎖猜惋,也就無法提交本地事務(wù)丸氛。
如果tx1提交,則釋放全局鎖惨奕,tx2才可以執(zhí)行雪位。
如果tx1回滾,需要獲取本地鎖執(zhí)行補(bǔ)償操作梨撞,而tx2擁有本地鎖雹洗。所以tx1本地事務(wù)回滾失敗,并一直重試卧波。直到tx2獲取全局鎖超時(shí)时肿,釋放本地鎖為止。
讀隔離
默認(rèn)分支事務(wù)是讀已提交級別或之上港粱,全局事務(wù)是讀未提交級別螃成。如果應(yīng)用需要全局隔離級別到讀已提交,則是通過SELECT FOR UPDATE代理語句實(shí)現(xiàn)查坪。
SELECT FOR UPDATE執(zhí)行時(shí)會(huì)先獲取全局鎖寸宏,而也會(huì)獲取數(shù)據(jù)庫拍他鎖。如果獲取失敗偿曙,就釋放本地鎖氮凝,然后重試。這時(shí)查詢是阻塞的望忆。直到獲取全局鎖罩阵。
引用
https://my.oschina.net/u/1464083/blog/3040896#h2_8
https://www.sofastack.tech/blog/seata-distributed-transaction-deep-dive/
https://mp.weixin.qq.com/s/EzmZ-DAi-hxJhRkFvFhlJQ
http://seata.io/zh-cn/docs/overview/what-is-seata.html
https://github.com/javagrowing/JGrowing/tree/master/%E5%88%86%E5%B8%83%E5%BC%8F/%E5%88%86%E5%B8%83%E5%BC%8F%E4%BA%8B%E5%8A%A1