1. Spring Tx 概述
首先我們常說的事務(wù)其實是數(shù)據(jù)庫(比如: Mysql) 的事務(wù), 而 Spring 做的事情其實是針對不同 orm (Mybatis | Hibernate)進(jìn)行封裝, 抽成統(tǒng)一的方法 獲取數(shù)據(jù)庫連接, 進(jìn)行數(shù)據(jù)庫操作, 進(jìn)行事務(wù)的提交|回滾 <- 對其實就是這么簡單.
2. Spring Tx 主要組件 TransactionDefinition
TransactionDefinition 是一個接口, 其主要包含了 Spring 中對事務(wù)傳播類別的定義, 隔離級別的定義, 代碼如下:
public interface TransactionDefinition {
// 若當(dāng)前線程不存在事務(wù)中, 則開啟一個事務(wù), 若當(dāng)前存在事務(wù), 則加入其中
int PROPAGATION_REQUIRED = 0;
// 若當(dāng)前存在事務(wù), 則加入到事務(wù)中, 若不存在, 則以非事務(wù)的方式運行
int PROPAGATION_SUPPORTS = 1;
// 若有事務(wù), 則用當(dāng)前的事務(wù), 若沒有, 則直接拋出異常
int PROPAGATION_MANDATORY = 2;
// 若當(dāng)前存在事務(wù), 則掛起事務(wù), 若當(dāng)前不存在事務(wù), 則開啟一個新事務(wù)運行
int PROPAGATION_REQUIRES_NEW = 3;
// 不支持以事務(wù)的方式運行, 若當(dāng)前存在事務(wù), 則將當(dāng)前的事務(wù)掛起
int PROPAGATION_NOT_SUPPORTED = 4;
// 不支持事務(wù), 若當(dāng)前線程含有事務(wù), 則直接拋出異常
int PROPAGATION_NEVER = 5;
// 這個時在原來的事務(wù)中通過 savepoint 的方式 開啟一個局部事務(wù)
int PROPAGATION_NESTED = 6;
// 默認(rèn)隔離級別
int ISOLATION_DEFAULT = -1;
// read_uncommitted 級別
int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;
// READ_COMMITTED 級別
int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;
// REPEATABLE_READ 級別
int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;
// SERIALIZABLE 級別
int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;
// 默認(rèn)超時時間
int TIMEOUT_DEFAULT = -1;
// 獲取傳播行為
int getPropagationBehavior();
// 獲取隔離級別
int getIsolationLevel();
// 獲取超時時間
int getTimeout();
// 事務(wù)是否是只讀模式
boolean isReadOnly();
// 返回事務(wù)的名字
String getName();
}
其默認(rèn)的實現(xiàn)是 DefaultTransactionDefinition, 在這個類中主要對一些值增加了默認(rèn)值的賦值, 并增加一些對屬性賦值的設(shè)定, 其中有個賦值工具類 Constants, 可以通過這個類將 字符串轉(zhuǎn)為數(shù)字.
3. Spring Tx 主要組件 TransactionStatus
TransactionStatus 這個接口中定義了事務(wù)執(zhí)行過程中的一些屬性, 是否有savePoint, 是否 rollBackOnly, 事務(wù)是否已經(jīng)完成; 而其抽象類中主要是完成 savePoint 以及 rollBackOnly 的具體實現(xiàn); 其默認(rèn)的實現(xiàn)類是 DefaultTransactionStatus, 在這個類中有:
// 事務(wù)連接器, 比如 DataSourceTransactionManager 中的 DataSourceTransactionObject
private final Object transaction;
// 是否是新事務(wù)
private final boolean newTransaction;
// 是否開啟 事務(wù)同步器 <- 其實就是在 TransactionSynchronousManager 中注冊屬性信息
private final boolean newSynchronization;
// 這個事務(wù)是否是 readOnly
private final boolean readOnly;
// debug模式
private final boolean debug;
// suspend 的上個事務(wù)的信息, suspendedResources 可能是 null
private final Object suspendedResources;
4. Spring Tx 主要組件 PlatformTransactionManager
PlatformTransactionManager 這個接口中定義了 Spring 執(zhí)行事務(wù)的主方法:
public interface PlatformTransactionManager {
// 開始事務(wù)
TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
// 提交事務(wù)
void commit(TransactionStatus status) throws TransactionException;
// 回滾事務(wù)
void rollback(TransactionStatus status) throws TransactionException;
}
對于上面的接口, 在抽象類 AbstractPlatformTransactionManager 中進(jìn)行了相應(yīng)的實現(xiàn)(PS: 其實這里主要運用了 策略模式 + 模版模式),
其主要屬性有:
// 永遠(yuǎn)激活 事務(wù)同步器
public static final int SYNCHRONIZATION_ALWAYS = 0;
// 只有在有事務(wù)時才激活事務(wù)同步器
public static final int SYNCHRONIZATION_ON_ACTUAL_TRANSACTION = 1;
// Never active transaction synchronization, not even for actual transactions.
// 從不激活事務(wù)同步器
public static final int SYNCHRONIZATION_NEVER = 2;
/** Constants instance for AbstractPlatformTransactionManager */
// 可通過 Constants 設(shè)置AbstractPlatformTransactionManager中的屬性 --> 類似于 BeanWrapper
private static final Constants constants = new Constants(AbstractPlatformTransactionManager.class);
// 是否 開啟事務(wù)同步器支持
private int transactionSynchronization = SYNCHRONIZATION_ALWAYS;
// 事務(wù)默認(rèn)的超時時間
private int defaultTimeout = TransactionDefinition.TIMEOUT_DEFAULT;
// 嵌套式事務(wù)是否允許
private boolean nestedTransactionAllowed = false;
// 是否校驗 是否存在事務(wù)
private boolean validateExistingTransaction = false;
// 分布式事務(wù)中的 rollback 屬性
private boolean globalRollbackOnParticipationFailure = true;
// 分布式事務(wù)中的 rollback 屬性
private boolean failEarlyOnGlobalRollbackOnly = false;
// 在 commit 過程中若出現(xiàn) 異常是否會 rollback
private boolean rollbackOnCommitFailure = false;
在這個類中實現(xiàn)了事務(wù)操作的主要方法;
5. AbstractPlatformTransactionManager 開啟事務(wù)方法
其主邏輯如下:
1. 獲取事務(wù)連接器, 比如 DataSourceTransactionManager 中就是 DataSourceTransactionObject 對象(存放的是 connect, savePoint, 是否是新事務(wù))
2. 若不存在 TransactionDefinition, 則創(chuàng)建一個默認(rèn)的 TransactionDefinition
3. 判斷當(dāng)前是否存在事務(wù)中
3.1 如果當(dāng)前已經(jīng)存在事務(wù), 且當(dāng)前事務(wù)的傳播屬性設(shè)置為 PROPAGATION_NEVER, 那么拋出異常
3.2 如果當(dāng)前事務(wù)的配置屬性是 PROPAGATION_NOT_SUPPORTED, 同時當(dāng)前線程已經(jīng)存在事務(wù)了, 那么將事務(wù)掛起, 并且封裝 TransactionStatus
3.3 如果當(dāng)前事務(wù)的配置屬性是 PROPAGATION_REQUIRES_NEW, 創(chuàng)建新事務(wù), 同時將當(dāng)前線程存在的事務(wù)掛起, 與創(chuàng)建全新事務(wù)的過程類是, 區(qū)別在于在創(chuàng)建全新事務(wù)時不用考慮已有事務(wù)的掛起, 但在這里, 需要考慮已有事務(wù)的掛起
3.4 嵌套事務(wù)的處理, 創(chuàng)建 TransactionStatus, 創(chuàng)建保存點
3.5 對于那些沒有匹配的傳播級別, 默認(rèn)的封裝以下 TransactionStatus
4. 檢查事務(wù)屬性中 timeout 的設(shè)置是否合理 <-- 這里的 timeout 只在 DataSourceUtils.applyTransactionTimeout 中看到有對應(yīng)的檢查工作
5. 如果當(dāng)前線程不存在事務(wù), 但是 propagationBehavior 被設(shè)置為 PROPAGATION_MANDATORY 拋棄異常
6. PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NEXTED 都需要新建事務(wù)
7. 創(chuàng)建 TransactionStatus, 開始事務(wù), 準(zhǔn)備 TransactionSynchronous
與之對應(yīng)的代碼如下:
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
// 這個 doGetTransaction() 抽象函數(shù), transaction對象的獲取 由具體的事務(wù)處理器實現(xiàn), 比如 DataSourceTransactionManager
// 這里是 DataSourceTransactionObject 對象(存放的是 connect, savePoint, 是否是新事務(wù))
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
// 關(guān)于這個 DefaultTransactionDefinition, 在前面編程式使用事務(wù)處理的時候遇到過, 這個 DefaultTransactionDefinition 的默認(rèn)事務(wù)處理屬性是
// propagationBehavior = PROPAGATION_REQUIRED; isolationLevel = ISOLATION_DEFAULT; timeout=TIMEOUT_DEFAULT; readlyOnly = false
if (definition == null) { // 如果參數(shù) definition == null -> 則創(chuàng)建一個默認(rèn)的 TransactionDefinition <- 這其實就是一個事務(wù)屬性配置的對象
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
// 判斷當(dāng)前被線程是否存在事務(wù), 判斷依據(jù)為當(dāng)前線程記錄的連接不為空且連接中(connectionHolder) 中的 tranactionActive 屬性 = true
// 如果已經(jīng)存在事務(wù), 那么需要要根據(jù)在事務(wù)屬性中定義的事務(wù)傳播屬性配置來處理事務(wù)
if (isExistingTransaction(transaction)) {
// 這里對當(dāng)前線程中已經(jīng)由事務(wù)存在的情況進(jìn)行處理, 所有的處理結(jié)果都封裝在 TransactionStatus 中
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// 檢查事務(wù)屬性中 timeout 的設(shè)置是否合理 <-- 這里的 timeout 只在 DataSourceUtils.applyTransactionTimeout 中看到有對應(yīng)的檢查工作
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// 如果當(dāng)前線程不存在事務(wù), 但是 propagationBehavior 被設(shè)置為 PROPAGATION_MANDATORY 拋棄異常
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NEXTED 都需要新建事務(wù)
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 空掛起 <- 這里掛起 (PS: 有同學(xué)可能疑問, 上面明明判斷了是否在事務(wù)中, 這里沒有必要掛起, 但有種情況 txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive() == false)
SuspendedResourcesHolder suspendedResources = suspend(null); // <- 這里掛起其實主要還是 TransactionSynchronizationManager 中配置的資源
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
// 是否開啟一個新的 事務(wù)同步器
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 創(chuàng)建事務(wù)狀態(tài) DefaultTransactionStatus
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 構(gòu)造 transaction, 包括設(shè)置 ConnectionHolder, 隔離級別, timeout, 如果是新連接, 綁定到當(dāng)前線程
doBegin(transaction, definition);
// 新同步事務(wù)的設(shè)置, 針對當(dāng)前線程的設(shè)置
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources); // resume 掛起來的 資源
throw ex;
}
catch (Error err) {
resume(null, suspendedResources); // resume 掛起來的 資源
throw err;
}
}
else {
// 這里其實就是沒有開啟事務(wù), 相比上面的 if 中, 就少了 doBegin 函數(shù)的調(diào)用
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); // 關(guān)鍵是這里第二個參數(shù)是 nul
}
}
下面是處理已經(jīng)存在事務(wù)那部分:
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 如果當(dāng)前已經(jīng)存在事務(wù), 且當(dāng)前事務(wù)的傳播屬性設(shè)置為 PROPAGATION_NEVER, 那么拋出異常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// 如果當(dāng)前事務(wù)的配置屬性是 PROPAGATION_NOT_SUPPORTED, 同時當(dāng)前線程已經(jīng)存在事務(wù)了, 那么將事務(wù)掛起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 將事務(wù)的掛起 <- 其實就是 TransactionSynchronizationManager 中的屬性
Object suspendedResources = suspend(transaction);
// 是否開啟一個新的事務(wù)同步器
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 意味著事務(wù)方法不需要放在事務(wù)環(huán)境中執(zhí)行, 同時掛起事務(wù)的信息保存在 TransactionStatus 中, 這里包括了, 進(jìn)程 ThreadLocal 對事務(wù)信息的記錄
return prepareTransactionStatus( // 注意這里第二個參數(shù)是 null
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 如果當(dāng)前事務(wù)的配置屬性是 PROPAGATION_REQUIRES_NEW, 創(chuàng)建新事務(wù), 同時將當(dāng)前線程存在的事務(wù)掛起, 與創(chuàng)建全新事務(wù)的過程類是, 區(qū)別在于在創(chuàng)建全新事務(wù)時不用考慮已有事務(wù)的掛起, 但在這里, 需要考慮已有事務(wù)的掛起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 將事務(wù)的掛起 <- 其實就是 TransactionSynchronizationManager 中的屬性
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 是否開啟一個新的事務(wù)同步器
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 掛起事務(wù)的信息記錄保存在 TransactionStatus 中, 這里包括ThreadLocal 對事務(wù)信息的記錄
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 構(gòu)造 transaction, 包括設(shè)置 ConnectionHolder, 隔離級別, timeout, 如果是新連接, 綁定到當(dāng)前線程
doBegin(transaction, definition);
// 新同步事務(wù)的設(shè)置, 針對當(dāng)前線程的設(shè)置
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) { // 拋出異常的話, 直接恢復(fù) 剛才掛起的事務(wù)
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
// 嵌套事務(wù)的處理, 創(chuàng)建 TransactionStatus, 創(chuàng)建保存點
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) { // 檢查是否允許嵌套事務(wù)
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// 如果沒有可以使用保存點的方式控制事務(wù)回滾, 那么在嵌套式事務(wù)的建立初始建立保存點
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
// 在 Spring 管理的事務(wù)中, 創(chuàng)建事務(wù)保存點
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); // <-- 這里的兩個 false 分別表示 是否是新事務(wù), 新的事務(wù)同步器
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
// 有些情況是不能使用保存點操作, 比如 JTA, 那么就建立新事務(wù)
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 掛起事務(wù)的信息記錄保存在 TransactionStatus 中, 這里包括ThreadLocal 對事務(wù)信息的記錄
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
// 構(gòu)造 transaction, 包括設(shè)置 ConnectionHolder, 隔離級別, timeout, 如果是新連接, 綁定到當(dāng)前線程
doBegin(transaction, definition);
// 新同步事務(wù)的設(shè)置, 針對當(dāng)前線程的設(shè)置
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// 對已經(jīng)存在的事務(wù)的屬性進(jìn)行校驗
if (isValidateExistingTransaction()) {
// 隔離級別的校驗 <- 是否一致性 TransactionDefinition 與 TransactionSynchronizationManager 中的值
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
// readOnly 的校驗 <- 是否一致性 TransactionDefinition 與 TransactionSynchronizationManager 中的值
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 返回 TransactionStatus 注意第三個參數(shù) false 表示 當(dāng)前事務(wù)沒有使用新事務(wù)
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
6. AbstractPlatformTransactionManager 提交事務(wù)方法
在提交事務(wù)時, 主要是放在 TransactionSynchronousManager 中回調(diào)函數(shù)的調(diào)用, 事務(wù)的提交, savePoint 的處理
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
// 事務(wù)提交的準(zhǔn)備工作由具體的事務(wù)處理器完成
prepareForCommit(status); // 預(yù)留方法, 留給子類區(qū)擴充
triggerBeforeCommit(status); // 添加的 TransactionSynchronization 中的對應(yīng)方法 beforeCommit 的調(diào)用
triggerBeforeCompletion(status); // 添加的 TransactionSynchronization 中的對應(yīng)方法 beforeCompletion 的調(diào)用
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false; // 手動設(shè)置回回滾 <- 這里是 針對分布式事務(wù)
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) { // 若是嵌套事務(wù), 則直接釋放保存點
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint(); // 如果存在保存點則清除保存點信息
}
/**
* 下面對當(dāng)前線程中保存的事務(wù)狀態(tài)進(jìn)行處理, 如果當(dāng)前的事務(wù)是一個新事務(wù), 調(diào)用具體的事務(wù)處理器完成提交
* 如果當(dāng)前所持有的事務(wù)不是新事務(wù)碉考, 則不提交, 由已有的事務(wù)來完成提交
*/
else if (status.isNewTransaction()) { // 若是新事務(wù), 則直接提交
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
doCommit(status); // 進(jìn)行事務(wù)的提交 <- 這是個抽象方法, 交由子類實現(xiàn)
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (globalRollbackOnly) { // 分布式事務(wù)里面手動設(shè)置了 rollbackOnly 則直接拋出異常 <-- 這個現(xiàn)在很少使用了
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) { // 在 commit 提交異常時, 是否需要進(jìn)行回滾操作
doRollbackOnCommitException(status, ex);
}
else { // 添加的 TransactionSynchronization 中的對應(yīng)方法 afterCompletion 的調(diào)用
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException ex) { // 提交的過程中, 若出現(xiàn) RuntimeException 則直接回滾
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
catch (Error err) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status); // 添加的 TransactionSynchronization 中的對應(yīng)方法 beforeCompletion 的調(diào)用
}
doRollbackOnCommitException(status, err); // 提交過程中出現(xiàn)異常則回滾
throw err;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
// 觸發(fā) AfterCommit 回滾
try {
triggerAfterCommit(status); // 添加的 TransactionSynchronization 中的對應(yīng)方法 afterCommit 的調(diào)用
}
finally { // 添加的 TransactionSynchronization 中的對應(yīng)方法 afterCompletion 的調(diào)用
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally { // commit 完成, 這里做一些清理數(shù)據(jù)的工作
cleanupAfterCompletion(status);
}
}
上面是事務(wù)提交的主流程, 流程總體清晰, 與之而來的出現(xiàn)了一個新的角色 TransactionSynchronizationManager
7. Spring Tx 組件 TransactionSynchronizationManager
在事務(wù)執(zhí)行的過程中, 需要保存很多變量值, 包括一些回調(diào)函數(shù) TransactionSynchronization
private static final ThreadLocal<Map<Object, Object>> resources = // key 是 dataSource, value 是 ConnectionHolder, 這里的 Map 是為了解決, 同一個線程操作多個 DataSource 而準(zhǔn)備de
new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = // 存儲 TransactionSynchronization <- 這里面存儲的都是回調(diào)函數(shù)
new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName = // 當(dāng)前事務(wù)的名稱
new NamedThreadLocal<String>("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly = // 當(dāng)前事務(wù)是否是 readOnly
new NamedThreadLocal<Boolean>("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel = // 當(dāng)前事務(wù)的隔離級別
new NamedThreadLocal<Integer>("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive = // 當(dāng)前線程是否已經(jīng)在事務(wù)中
new NamedThreadLocal<Boolean>("Actual transaction active");
每次在事務(wù) suspend 或 resume 時, 其實操作的就是通過TransactionSynchronizationManager 將屬性放在 ThreadLocal 中
8. Spring Tx 組件 TransactionAttributeSource
TransactionAttributeSource 它是事務(wù)屬性獲取器(PS: 這里出現(xiàn)了 TransactionAttribute這個對象, 其實就是 TransactionDefinition 加上 一些其他屬性), 主要的 TransactionAttributeSource 有如下:
1. NameMatchTransactionAttributeSource: 通過將 Properties 里面的屬性轉(zhuǎn)化成 methodName <--> TransactionAttribute 的TransactionAttributeSource
2. MethodMapTransactionAttributeSource: 通過配置文件配置 className.methodName <--> TransactionAttribute 形式注入的 MethodMapTransactionAttributeSource
3. MatchAlwaysTransactionAttributeSource: 只要是用戶定義的方法就返回 true 的 TransactionAttributeSource
4. CompositeTransactionAttributeSource: 組合多個 TransactionAttributeSource, 只要其中有一個獲取 TransactionAttribute, 就 OK
5. AnnotationTransactionAttributeSource: 通過獲取方法上的注解信息來獲知 事務(wù)的屬性, 解析主要由 SpringTransactionAnnotationParser 來進(jìn)行
在上面幾個類中, AnnotationTransactionAttributeSource 是我們最長使用的 TransactionDefinition 的解析器, 它內(nèi)部其實蠻簡單的, 主要還是通過 SpringTransactionAnnotationParser 解析方法上注解 @Transactional 中的信息來獲得事務(wù)屬性
9. Spring Tx 組件 TransactionAspectSupport
這是事務(wù)支持的一個工具類, 其也是 TransactionInterceptor 的父類, 在這個類中定義了執(zhí)行事務(wù)的主邏輯 -> 方法 invokeWithinTransaction (PS: 其實就是aop 中的 aroundAdvice)
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
// 這里讀取事務(wù)的屬性和設(shè)置, 通過 TransactionAttributeSource 對象取得
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
// 獲取 beanFactory 中的 transactionManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 構(gòu)造方法唯一標(biāo)識(類, 方法, 如 service.UserServiceImpl.save)
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
/**
* 這里區(qū)分不同類型的 PlatformTransactionManager 因為它們的調(diào)用方式不同
* 對 CallbackPreferringPlatformTransactionManager 來說, 需要回調(diào)函數(shù)來
* 實現(xiàn)事務(wù)的創(chuàng)建和提交
* 對于非 CallbackPreferringPlatformTransactionManager 來說, 不需要通過
* 回調(diào)函數(shù)來實現(xiàn)事務(wù)的創(chuàng)建和提交
* 像 DataSourceTransactionManager 就不是 CallbackPreferringPlatformTransactionManager
* 不需要通過回調(diào)的方式來使用
*/
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 這里創(chuàng)建事務(wù), 同時把創(chuàng)建事務(wù)過程中得到的信息放到 TransactionInfo 中去 (創(chuàng)建事務(wù)的起點)
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 這里的調(diào)用使用處理沿著攔截器鏈進(jìn)行, 使最后目標(biāo)對象的方法得到調(diào)用
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 如果在事務(wù)處理方法調(diào)用中出現(xiàn)異常, 事務(wù)處理如何進(jìn)行需要根據(jù)具體的情況考慮回滾或者提交
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 這里把與線程綁定的 TransactionInfo 設(shè)置為 oldTransactionInfo
cleanupTransactionInfo(txInfo);
}
// 這里通過事務(wù)處理器來對事務(wù)進(jìn)行提交
commitTransactionAfterReturning(txInfo);
return retVal;
}
}
其實上面的主邏輯還是: 創(chuàng)建事務(wù), 執(zhí)行SQL, 提交/回滾事務(wù)
10. Spring Tx 組件 TransactionTemplate
TransactionTemplate 這個類是編程式事務(wù)常用的類, 當(dāng)使用 JdbcTemplate 時其實也是在使用這個 template 類 (PS: TransactionTemplate 繼承 DefaultTransactionDefinition, 其本身就是個TransactionDefinition), 其執(zhí)行邏輯也是獲取 事務(wù), 開始事務(wù), 執(zhí)行 SQL, 提交/回滾事務(wù) <-- 其內(nèi)部還是使用 PlatformTransactionManager 來完成的
11. 總結(jié)
Spring 事務(wù)其實只是做了一些上層的封裝, 其有以下幾個設(shè)計特點:
1. 通過 ThreadLocal 將事務(wù)執(zhí)行過程中的很多屬性(包括回調(diào)函數(shù))存儲其中
2. 針對 整個事務(wù)的執(zhí)行流程, Spring都封裝到 AbstractPlatformTransactionManager 中, 在其里面定義了事務(wù)獲取, 執(zhí)行, 提交的整個邏輯, 并且遺留下一些模版方法給子類實現(xiàn) <- 模版方法, 而其下的各個子類, 又是策略模式
3. 通過 Spring AOP 來實現(xiàn)通過在方法上增加注解實現(xiàn)事務(wù)的封裝
4. Spring 定義了程序中事務(wù)的傳播行為
12. 參考:
Spring Aop核心源碼分析
Spring技術(shù)內(nèi)幕
Spring 揭秘
Spring 源碼深度分析
開濤 Spring 雜談
傷神 Spring 源碼分析
Spring源碼情操陶冶