Spring 4.3 源碼分析之 事務(wù)組件概述

1. Spring Tx 概述

首先我們常說的事務(wù)其實是數(shù)據(jù)庫(比如: Mysql) 的事務(wù), 而 Spring 做的事情其實是針對不同 orm (Mybatis | Hibernate)進(jìn)行封裝, 抽成統(tǒng)一的方法 獲取數(shù)據(jù)庫連接, 進(jìn)行數(shù)據(jù)庫操作, 進(jìn)行事務(wù)的提交|回滾 <- 對其實就是這么簡單.

2. Spring Tx 主要組件 TransactionDefinition

TransactionDefinition 是一個接口, 其主要包含了 Spring 中對事務(wù)傳播類別的定義, 隔離級別的定義, 代碼如下:

public interface TransactionDefinition {

    // 若當(dāng)前線程不存在事務(wù)中, 則開啟一個事務(wù), 若當(dāng)前存在事務(wù), 則加入其中
    int PROPAGATION_REQUIRED = 0;

    // 若當(dāng)前存在事務(wù), 則加入到事務(wù)中, 若不存在, 則以非事務(wù)的方式運行
    int PROPAGATION_SUPPORTS = 1;

    // 若有事務(wù), 則用當(dāng)前的事務(wù), 若沒有, 則直接拋出異常
    int PROPAGATION_MANDATORY = 2;

    // 若當(dāng)前存在事務(wù), 則掛起事務(wù), 若當(dāng)前不存在事務(wù), 則開啟一個新事務(wù)運行
    int PROPAGATION_REQUIRES_NEW = 3;

    // 不支持以事務(wù)的方式運行, 若當(dāng)前存在事務(wù), 則將當(dāng)前的事務(wù)掛起
    int PROPAGATION_NOT_SUPPORTED = 4;

    // 不支持事務(wù), 若當(dāng)前線程含有事務(wù), 則直接拋出異常
    int PROPAGATION_NEVER = 5;

    // 這個時在原來的事務(wù)中通過 savepoint 的方式 開啟一個局部事務(wù)
    int PROPAGATION_NESTED = 6;

    // 默認(rèn)隔離級別
    int ISOLATION_DEFAULT = -1;

    // read_uncommitted 級別
    int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;

    // READ_COMMITTED 級別
    int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;

    // REPEATABLE_READ 級別
    int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;

    // SERIALIZABLE 級別
    int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;

    // 默認(rèn)超時時間
    int TIMEOUT_DEFAULT = -1;

    // 獲取傳播行為
    int getPropagationBehavior();

    //  獲取隔離級別
    int getIsolationLevel();

    // 獲取超時時間
    int getTimeout();

    // 事務(wù)是否是只讀模式
    boolean isReadOnly();

    // 返回事務(wù)的名字
    String getName();
}

其默認(rèn)的實現(xiàn)是 DefaultTransactionDefinition, 在這個類中主要對一些值增加了默認(rèn)值的賦值, 并增加一些對屬性賦值的設(shè)定, 其中有個賦值工具類 Constants, 可以通過這個類將 字符串轉(zhuǎn)為數(shù)字.

3. Spring Tx 主要組件 TransactionStatus

TransactionStatus 這個接口中定義了事務(wù)執(zhí)行過程中的一些屬性, 是否有savePoint, 是否 rollBackOnly, 事務(wù)是否已經(jīng)完成; 而其抽象類中主要是完成 savePoint 以及 rollBackOnly 的具體實現(xiàn); 其默認(rèn)的實現(xiàn)類是 DefaultTransactionStatus, 在這個類中有:

// 事務(wù)連接器, 比如 DataSourceTransactionManager 中的 DataSourceTransactionObject
private final Object transaction;
// 是否是新事務(wù)
private final boolean newTransaction;
// 是否開啟 事務(wù)同步器 <- 其實就是在 TransactionSynchronousManager 中注冊屬性信息
private final boolean newSynchronization;
// 這個事務(wù)是否是 readOnly
private final boolean readOnly;
// debug模式
private final boolean debug;
//  suspend 的上個事務(wù)的信息, suspendedResources 可能是 null
private final Object suspendedResources;
4. Spring Tx 主要組件 PlatformTransactionManager

PlatformTransactionManager 這個接口中定義了 Spring 執(zhí)行事務(wù)的主方法:

public interface PlatformTransactionManager {
    // 開始事務(wù)
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
    // 提交事務(wù)
    void commit(TransactionStatus status) throws TransactionException;
    // 回滾事務(wù)
    void rollback(TransactionStatus status) throws TransactionException;
}

對于上面的接口, 在抽象類 AbstractPlatformTransactionManager 中進(jìn)行了相應(yīng)的實現(xiàn)(PS: 其實這里主要運用了 策略模式 + 模版模式),
其主要屬性有:

// 永遠(yuǎn)激活 事務(wù)同步器
public static final int SYNCHRONIZATION_ALWAYS = 0;

// 只有在有事務(wù)時才激活事務(wù)同步器
public static final int SYNCHRONIZATION_ON_ACTUAL_TRANSACTION = 1;

// Never active transaction synchronization, not even for actual transactions.
// 從不激活事務(wù)同步器
public static final int SYNCHRONIZATION_NEVER = 2;

/** Constants instance for AbstractPlatformTransactionManager */
// 可通過 Constants 設(shè)置AbstractPlatformTransactionManager中的屬性 --> 類似于 BeanWrapper
private static final Constants constants = new Constants(AbstractPlatformTransactionManager.class);

// 是否 開啟事務(wù)同步器支持
private int transactionSynchronization = SYNCHRONIZATION_ALWAYS;

// 事務(wù)默認(rèn)的超時時間
private int defaultTimeout = TransactionDefinition.TIMEOUT_DEFAULT;

// 嵌套式事務(wù)是否允許
private boolean nestedTransactionAllowed = false;

// 是否校驗 是否存在事務(wù)
private boolean validateExistingTransaction = false;

// 分布式事務(wù)中的 rollback 屬性
private boolean globalRollbackOnParticipationFailure = true;

// 分布式事務(wù)中的 rollback 屬性
private boolean failEarlyOnGlobalRollbackOnly = false;

// 在 commit 過程中若出現(xiàn) 異常是否會 rollback
private boolean rollbackOnCommitFailure = false;

在這個類中實現(xiàn)了事務(wù)操作的主要方法;

5. AbstractPlatformTransactionManager 開啟事務(wù)方法

其主邏輯如下:

1. 獲取事務(wù)連接器, 比如 DataSourceTransactionManager 中就是 DataSourceTransactionObject 對象(存放的是 connect, savePoint, 是否是新事務(wù))
2. 若不存在 TransactionDefinition, 則創(chuàng)建一個默認(rèn)的 TransactionDefinition
3. 判斷當(dāng)前是否存在事務(wù)中
    3.1 如果當(dāng)前已經(jīng)存在事務(wù), 且當(dāng)前事務(wù)的傳播屬性設(shè)置為 PROPAGATION_NEVER, 那么拋出異常
    3.2 如果當(dāng)前事務(wù)的配置屬性是 PROPAGATION_NOT_SUPPORTED, 同時當(dāng)前線程已經(jīng)存在事務(wù)了, 那么將事務(wù)掛起, 并且封裝 TransactionStatus
    3.3 如果當(dāng)前事務(wù)的配置屬性是 PROPAGATION_REQUIRES_NEW, 創(chuàng)建新事務(wù), 同時將當(dāng)前線程存在的事務(wù)掛起, 與創(chuàng)建全新事務(wù)的過程類是, 區(qū)別在于在創(chuàng)建全新事務(wù)時不用考慮已有事務(wù)的掛起, 但在這里, 需要考慮已有事務(wù)的掛起
    3.4  嵌套事務(wù)的處理, 創(chuàng)建 TransactionStatus, 創(chuàng)建保存點
    3.5 對于那些沒有匹配的傳播級別, 默認(rèn)的封裝以下 TransactionStatus
4. 檢查事務(wù)屬性中 timeout 的設(shè)置是否合理  <-- 這里的 timeout 只在 DataSourceUtils.applyTransactionTimeout 中看到有對應(yīng)的檢查工作
5. 如果當(dāng)前線程不存在事務(wù), 但是 propagationBehavior 被設(shè)置為 PROPAGATION_MANDATORY 拋棄異常
6. PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NEXTED 都需要新建事務(wù)
7.  創(chuàng)建 TransactionStatus, 開始事務(wù), 準(zhǔn)備 TransactionSynchronous

與之對應(yīng)的代碼如下:

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    // 這個 doGetTransaction() 抽象函數(shù), transaction對象的獲取 由具體的事務(wù)處理器實現(xiàn), 比如 DataSourceTransactionManager
    // 這里是 DataSourceTransactionObject 對象(存放的是 connect, savePoint, 是否是新事務(wù))
    Object transaction = doGetTransaction();

    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();

    // 關(guān)于這個 DefaultTransactionDefinition, 在前面編程式使用事務(wù)處理的時候遇到過, 這個 DefaultTransactionDefinition 的默認(rèn)事務(wù)處理屬性是
    // propagationBehavior = PROPAGATION_REQUIRED; isolationLevel = ISOLATION_DEFAULT; timeout=TIMEOUT_DEFAULT; readlyOnly = false
    if (definition == null) {                       // 如果參數(shù) definition == null -> 則創(chuàng)建一個默認(rèn)的 TransactionDefinition <- 這其實就是一個事務(wù)屬性配置的對象
        // Use defaults if no transaction definition given.
        definition = new DefaultTransactionDefinition();
    }

    // 判斷當(dāng)前被線程是否存在事務(wù), 判斷依據(jù)為當(dāng)前線程記錄的連接不為空且連接中(connectionHolder) 中的 tranactionActive 屬性 = true
    // 如果已經(jīng)存在事務(wù), 那么需要要根據(jù)在事務(wù)屬性中定義的事務(wù)傳播屬性配置來處理事務(wù)
    if (isExistingTransaction(transaction)) {
        // 這里對當(dāng)前線程中已經(jīng)由事務(wù)存在的情況進(jìn)行處理, 所有的處理結(jié)果都封裝在 TransactionStatus 中
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }

    // 檢查事務(wù)屬性中 timeout 的設(shè)置是否合理  <-- 這里的 timeout 只在 DataSourceUtils.applyTransactionTimeout 中看到有對應(yīng)的檢查工作
    // Check definition settings for new transaction.
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }

    // 如果當(dāng)前線程不存在事務(wù), 但是 propagationBehavior 被設(shè)置為 PROPAGATION_MANDATORY 拋棄異常
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NEXTED 都需要新建事務(wù)
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

        // 空掛起 <- 這里掛起 (PS: 有同學(xué)可能疑問, 上面明明判斷了是否在事務(wù)中, 這里沒有必要掛起, 但有種情況 txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive() == false)
        SuspendedResourcesHolder suspendedResources = suspend(null); // <- 這里掛起其實主要還是 TransactionSynchronizationManager 中配置的資源
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            // 是否開啟一個新的 事務(wù)同步器
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 創(chuàng)建事務(wù)狀態(tài) DefaultTransactionStatus
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 構(gòu)造 transaction, 包括設(shè)置 ConnectionHolder, 隔離級別, timeout, 如果是新連接, 綁定到當(dāng)前線程
            doBegin(transaction, definition);
            // 新同步事務(wù)的設(shè)置, 針對當(dāng)前線程的設(shè)置
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException ex) {
            resume(null, suspendedResources);  // resume 掛起來的 資源
            throw ex;
        }
        catch (Error err) {
            resume(null, suspendedResources);  // resume 掛起來的 資源
            throw err;
        }
    }
    else {
        // 這里其實就是沒有開啟事務(wù), 相比上面的 if 中, 就少了 doBegin 函數(shù)的調(diào)用
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);    // 關(guān)鍵是這里第二個參數(shù)是 nul
    }
}

下面是處理已經(jīng)存在事務(wù)那部分:

private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    // 如果當(dāng)前已經(jīng)存在事務(wù), 且當(dāng)前事務(wù)的傳播屬性設(shè)置為 PROPAGATION_NEVER, 那么拋出異常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

    // 如果當(dāng)前事務(wù)的配置屬性是 PROPAGATION_NOT_SUPPORTED, 同時當(dāng)前線程已經(jīng)存在事務(wù)了, 那么將事務(wù)掛起
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        // 將事務(wù)的掛起 <- 其實就是 TransactionSynchronizationManager 中的屬性
        Object suspendedResources = suspend(transaction);
        // 是否開啟一個新的事務(wù)同步器
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);

        // 意味著事務(wù)方法不需要放在事務(wù)環(huán)境中執(zhí)行, 同時掛起事務(wù)的信息保存在 TransactionStatus 中, 這里包括了, 進(jìn)程 ThreadLocal 對事務(wù)信息的記錄
        return prepareTransactionStatus( // 注意這里第二個參數(shù)是 null
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // 如果當(dāng)前事務(wù)的配置屬性是 PROPAGATION_REQUIRES_NEW, 創(chuàng)建新事務(wù), 同時將當(dāng)前線程存在的事務(wù)掛起, 與創(chuàng)建全新事務(wù)的過程類是, 區(qū)別在于在創(chuàng)建全新事務(wù)時不用考慮已有事務(wù)的掛起, 但在這里, 需要考慮已有事務(wù)的掛起
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        // 將事務(wù)的掛起 <- 其實就是 TransactionSynchronizationManager 中的屬性
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 是否開啟一個新的事務(wù)同步器
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 掛起事務(wù)的信息記錄保存在 TransactionStatus 中,  這里包括ThreadLocal 對事務(wù)信息的記錄
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 構(gòu)造 transaction, 包括設(shè)置 ConnectionHolder, 隔離級別, timeout, 如果是新連接, 綁定到當(dāng)前線程
            doBegin(transaction, definition);
            // 新同步事務(wù)的設(shè)置, 針對當(dāng)前線程的設(shè)置
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException beginEx) { // 拋出異常的話, 直接恢復(fù) 剛才掛起的事務(wù)
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
        catch (Error beginErr) {
            resumeAfterBeginException(transaction, suspendedResources, beginErr);
            throw beginErr;
        }
    }

    // 嵌套事務(wù)的處理, 創(chuàng)建 TransactionStatus, 創(chuàng)建保存點
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {                        // 檢查是否允許嵌套事務(wù)
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // 如果沒有可以使用保存點的方式控制事務(wù)回滾, 那么在嵌套式事務(wù)的建立初始建立保存點
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            // 在 Spring 管理的事務(wù)中, 創(chuàng)建事務(wù)保存點
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);   //  <-- 這里的兩個 false 分別表示 是否是新事務(wù), 新的事務(wù)同步器
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            // 有些情況是不能使用保存點操作, 比如 JTA, 那么就建立新事務(wù)
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 掛起事務(wù)的信息記錄保存在 TransactionStatus 中,  這里包括ThreadLocal 對事務(wù)信息的記錄
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            // 構(gòu)造 transaction, 包括設(shè)置 ConnectionHolder, 隔離級別, timeout, 如果是新連接, 綁定到當(dāng)前線程
            doBegin(transaction, definition);
            // 新同步事務(wù)的設(shè)置, 針對當(dāng)前線程的設(shè)置
            prepareSynchronization(status, definition);
            return status;
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    // 對已經(jīng)存在的事務(wù)的屬性進(jìn)行校驗
    if (isValidateExistingTransaction()) {
        // 隔離級別的校驗 <- 是否一致性 TransactionDefinition 與 TransactionSynchronizationManager 中的值
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        // readOnly 的校驗 <- 是否一致性 TransactionDefinition 與 TransactionSynchronizationManager 中的值
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    //  返回 TransactionStatus 注意第三個參數(shù) false 表示 當(dāng)前事務(wù)沒有使用新事務(wù)
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
6. AbstractPlatformTransactionManager 提交事務(wù)方法

在提交事務(wù)時, 主要是放在 TransactionSynchronousManager 中回調(diào)函數(shù)的調(diào)用, 事務(wù)的提交, savePoint 的處理

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;
        try {
            // 事務(wù)提交的準(zhǔn)備工作由具體的事務(wù)處理器完成
            prepareForCommit(status);                   // 預(yù)留方法, 留給子類區(qū)擴充
            triggerBeforeCommit(status);                // 添加的 TransactionSynchronization 中的對應(yīng)方法 beforeCommit 的調(diào)用
            triggerBeforeCompletion(status);            // 添加的 TransactionSynchronization 中的對應(yīng)方法 beforeCompletion 的調(diào)用
            beforeCompletionInvoked = true;
            boolean globalRollbackOnly = false;         // 手動設(shè)置回回滾 <- 這里是 針對分布式事務(wù)
            if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                globalRollbackOnly = status.isGlobalRollbackOnly();
            }
            if (status.hasSavepoint()) {                // 若是嵌套事務(wù), 則直接釋放保存點
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                status.releaseHeldSavepoint();          // 如果存在保存點則清除保存點信息
            }
            /**
             * 下面對當(dāng)前線程中保存的事務(wù)狀態(tài)進(jìn)行處理, 如果當(dāng)前的事務(wù)是一個新事務(wù), 調(diào)用具體的事務(wù)處理器完成提交
             * 如果當(dāng)前所持有的事務(wù)不是新事務(wù)碉考, 則不提交, 由已有的事務(wù)來完成提交
             */
            else if (status.isNewTransaction()) {       // 若是新事務(wù), 則直接提交
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                doCommit(status);                       // 進(jìn)行事務(wù)的提交 <- 這是個抽象方法, 交由子類實現(xiàn)
            }
            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (globalRollbackOnly) {                   // 分布式事務(wù)里面手動設(shè)置了 rollbackOnly 則直接拋出異常 <-- 這個現(xiàn)在很少使用了
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {         // 在 commit 提交異常時, 是否需要進(jìn)行回滾操作
                doRollbackOnCommitException(status, ex);
            }
            else {                                     // 添加的 TransactionSynchronization 中的對應(yīng)方法 afterCompletion 的調(diào)用
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException ex) {                  // 提交的過程中, 若出現(xiàn) RuntimeException 則直接回滾
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }
        catch (Error err) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);      // 添加的 TransactionSynchronization 中的對應(yīng)方法 beforeCompletion 的調(diào)用
            }
            doRollbackOnCommitException(status, err); // 提交過程中出現(xiàn)異常則回滾
            throw err;
        }
        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        // 觸發(fā) AfterCommit 回滾
        try {
            triggerAfterCommit(status);               // 添加的 TransactionSynchronization 中的對應(yīng)方法 afterCommit 的調(diào)用
        }
        finally {                                     // 添加的 TransactionSynchronization 中的對應(yīng)方法 afterCompletion 的調(diào)用
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }
    }
    finally {                                          // commit 完成, 這里做一些清理數(shù)據(jù)的工作
        cleanupAfterCompletion(status);
    }
}

上面是事務(wù)提交的主流程, 流程總體清晰, 與之而來的出現(xiàn)了一個新的角色 TransactionSynchronizationManager

7. Spring Tx 組件 TransactionSynchronizationManager

在事務(wù)執(zhí)行的過程中, 需要保存很多變量值, 包括一些回調(diào)函數(shù) TransactionSynchronization

private static final ThreadLocal<Map<Object, Object>> resources =                    // key 是 dataSource, value 是 ConnectionHolder, 這里的 Map 是為了解決, 同一個線程操作多個 DataSource 而準(zhǔn)備de
        new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = // 存儲 TransactionSynchronization <- 這里面存儲的都是回調(diào)函數(shù)
        new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");

private static final ThreadLocal<String> currentTransactionName =                    // 當(dāng)前事務(wù)的名稱
        new NamedThreadLocal<String>("Current transaction name");

private static final ThreadLocal<Boolean> currentTransactionReadOnly =               // 當(dāng)前事務(wù)是否是 readOnly
        new NamedThreadLocal<Boolean>("Current transaction read-only status");

private static final ThreadLocal<Integer> currentTransactionIsolationLevel =         // 當(dāng)前事務(wù)的隔離級別
        new NamedThreadLocal<Integer>("Current transaction isolation level");

private static final ThreadLocal<Boolean> actualTransactionActive =                  // 當(dāng)前線程是否已經(jīng)在事務(wù)中
        new NamedThreadLocal<Boolean>("Actual transaction active");

每次在事務(wù) suspend 或 resume 時, 其實操作的就是通過TransactionSynchronizationManager 將屬性放在 ThreadLocal 中

8. Spring Tx 組件 TransactionAttributeSource

TransactionAttributeSource 它是事務(wù)屬性獲取器(PS: 這里出現(xiàn)了 TransactionAttribute這個對象, 其實就是 TransactionDefinition 加上 一些其他屬性), 主要的 TransactionAttributeSource 有如下:

1. NameMatchTransactionAttributeSource:   通過將 Properties 里面的屬性轉(zhuǎn)化成 methodName <--> TransactionAttribute 的TransactionAttributeSource
2. MethodMapTransactionAttributeSource:   通過配置文件配置 className.methodName <--> TransactionAttribute 形式注入的 MethodMapTransactionAttributeSource
3. MatchAlwaysTransactionAttributeSource: 只要是用戶定義的方法就返回 true 的 TransactionAttributeSource
4. CompositeTransactionAttributeSource:   組合多個 TransactionAttributeSource, 只要其中有一個獲取 TransactionAttribute, 就 OK
5. AnnotationTransactionAttributeSource:  通過獲取方法上的注解信息來獲知 事務(wù)的屬性, 解析主要由 SpringTransactionAnnotationParser 來進(jìn)行

在上面幾個類中, AnnotationTransactionAttributeSource 是我們最長使用的 TransactionDefinition 的解析器, 它內(nèi)部其實蠻簡單的, 主要還是通過 SpringTransactionAnnotationParser 解析方法上注解 @Transactional 中的信息來獲得事務(wù)屬性

9. Spring Tx 組件 TransactionAspectSupport

這是事務(wù)支持的一個工具類, 其也是 TransactionInterceptor 的父類, 在這個類中定義了執(zhí)行事務(wù)的主邏輯 -> 方法 invokeWithinTransaction (PS: 其實就是aop 中的 aroundAdvice)

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
        throws Throwable {
    // If the transaction attribute is null, the method is non-transactional.
    // 這里讀取事務(wù)的屬性和設(shè)置, 通過 TransactionAttributeSource 對象取得
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    // 獲取 beanFactory 中的 transactionManager
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // 構(gòu)造方法唯一標(biāo)識(類, 方法, 如 service.UserServiceImpl.save)
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    /**
     * 這里區(qū)分不同類型的 PlatformTransactionManager 因為它們的調(diào)用方式不同
     * 對 CallbackPreferringPlatformTransactionManager 來說, 需要回調(diào)函數(shù)來
     * 實現(xiàn)事務(wù)的創(chuàng)建和提交
     * 對于非 CallbackPreferringPlatformTransactionManager 來說, 不需要通過
     * 回調(diào)函數(shù)來實現(xiàn)事務(wù)的創(chuàng)建和提交
     * 像 DataSourceTransactionManager 就不是 CallbackPreferringPlatformTransactionManager
     * 不需要通過回調(diào)的方式來使用
     */
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // 這里創(chuàng)建事務(wù), 同時把創(chuàng)建事務(wù)過程中得到的信息放到 TransactionInfo 中去 (創(chuàng)建事務(wù)的起點)
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            // 這里的調(diào)用使用處理沿著攔截器鏈進(jìn)行, 使最后目標(biāo)對象的方法得到調(diào)用
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            // 如果在事務(wù)處理方法調(diào)用中出現(xiàn)異常, 事務(wù)處理如何進(jìn)行需要根據(jù)具體的情況考慮回滾或者提交
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // 這里把與線程綁定的 TransactionInfo 設(shè)置為 oldTransactionInfo
            cleanupTransactionInfo(txInfo);
        }
        // 這里通過事務(wù)處理器來對事務(wù)進(jìn)行提交
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
}

其實上面的主邏輯還是: 創(chuàng)建事務(wù), 執(zhí)行SQL, 提交/回滾事務(wù)

10. Spring Tx 組件 TransactionTemplate

TransactionTemplate 這個類是編程式事務(wù)常用的類, 當(dāng)使用 JdbcTemplate 時其實也是在使用這個 template 類 (PS: TransactionTemplate 繼承 DefaultTransactionDefinition, 其本身就是個TransactionDefinition), 其執(zhí)行邏輯也是獲取 事務(wù), 開始事務(wù), 執(zhí)行 SQL, 提交/回滾事務(wù) <-- 其內(nèi)部還是使用 PlatformTransactionManager 來完成的

11. 總結(jié)

Spring 事務(wù)其實只是做了一些上層的封裝, 其有以下幾個設(shè)計特點:

1. 通過 ThreadLocal 將事務(wù)執(zhí)行過程中的很多屬性(包括回調(diào)函數(shù))存儲其中
2. 針對 整個事務(wù)的執(zhí)行流程, Spring都封裝到 AbstractPlatformTransactionManager 中, 在其里面定義了事務(wù)獲取, 執(zhí)行, 提交的整個邏輯, 并且遺留下一些模版方法給子類實現(xiàn) <- 模版方法, 而其下的各個子類, 又是策略模式
3. 通過 Spring AOP 來實現(xiàn)通過在方法上增加注解實現(xiàn)事務(wù)的封裝
4. Spring 定義了程序中事務(wù)的傳播行為
12. 參考:

Spring Aop核心源碼分析
Spring技術(shù)內(nèi)幕
Spring 揭秘
Spring 源碼深度分析
開濤 Spring 雜談
傷神 Spring 源碼分析
Spring源碼情操陶冶

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末筏勒,一起剝皮案震驚了整個濱河市臼勉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌附井,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件炊昆,死亡現(xiàn)場離奇詭異芒篷,居然都是意外死亡,警方通過查閱死者的電腦和手機孟岛,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進(jìn)店門瓶竭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人渠羞,你說我怎么就攤上這事斤贰。” “怎么了次询?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵荧恍,是天一觀的道長。 經(jīng)常有香客問我屯吊,道長送巡,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任盒卸,我火速辦了婚禮骗爆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蔽介。我一直安慰自己摘投,他們只是感情好煮寡,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著犀呼,像睡著了一般幸撕。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上圆凰,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天杈帐,我揣著相機與錄音,去河邊找鬼专钉。 笑死挑童,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的跃须。 我是一名探鬼主播站叼,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼菇民!你這毒婦竟也來了尽楔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤第练,失蹤者是張志新(化名)和其女友劉穎阔馋,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體娇掏,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡呕寝,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了婴梧。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片下梢。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖塞蹭,靈堂內(nèi)的尸體忽然破棺而出孽江,到底是詐尸還是另有隱情,我是刑警寧澤番电,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布岗屏,位于F島的核電站,受9級特大地震影響漱办,放射性物質(zhì)發(fā)生泄漏担汤。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一洼冻、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧隅很,春花似錦撞牢、人聲如沸率碾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽所宰。三九已至,卻和暖如春畜挥,著一層夾襖步出監(jiān)牢的瞬間仔粥,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工蟹但, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留躯泰,地道東北人。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓华糖,卻偏偏與公主長得像麦向,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子客叉,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 46,811評論 6 342
  • Spring 事務(wù)屬性分析 事務(wù)管理對于企業(yè)應(yīng)用而言至關(guān)重要诵竭。它保證了用戶的每一次操作都是可靠的,即便出現(xiàn)了異常的...
    壹點零閱讀 1,305評論 0 2
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理兼搏,服務(wù)發(fā)現(xiàn)卵慰,斷路器,智...
    卡卡羅2017閱讀 134,657評論 18 139
  • 很多人喜歡這篇文章佛呻,特此同步過來 由淺入深談?wù)搒pring事務(wù) 前言 這篇其實也要歸納到《常識》系列中裳朋,但這重點又...
    碼農(nóng)戲碼閱讀 4,736評論 2 59
  • 暫且喊我小妮吧,我有個毛病件相,說大不大再扭,你也可以說是怪癖。我每次寫文章夜矗,除了工作外的文稿泛范,我都喜歡先放出一張圖。這是...
    劉小妮同學(xué)閱讀 297評論 1 1