spring事務(wù)(一) 編程式事務(wù)

spring事務(wù)(一) 編程式事務(wù)

知識(shí)導(dǎo)讀

  • 本地事務(wù)(autocommit=false)-----編程式事務(wù)-----聲明式事務(wù)
  • spring對(duì)事務(wù)對(duì)象的封裝其實(shí)就是封裝了一個(gè)connection钝诚,開啟事務(wù)的本質(zhì)的將connection的autocommit=false,然后使用connection進(jìn)行數(shù)據(jù)庫操作寇僧,最后commit或者rollback
  • spring事務(wù)實(shí)現(xiàn)的關(guān)鍵是保證開啟事務(wù)的connection和執(zhí)行數(shù)據(jù)庫操作的connection是同一個(gè),所以兩者操作connection都要通過TransactionSynchronizationManager去獲取和存儲(chǔ)疑枯,TransactionSynchronizationManager會(huì)將connection保存到ThreadLocal中袍榆,保證同一線程中connection共享。
  • TransactionSynchronizationManager獲取connection時(shí)用的key是DataSource對(duì)象糊昙,所以要保證sql執(zhí)行和事務(wù)用的是同一個(gè)DataSource
  • spring事務(wù)的connection和配置信息都會(huì)存放到ThreadLocal中,在一個(gè)事務(wù)中開啟新的線程谢谦,會(huì)導(dǎo)致新線程中sql執(zhí)行喪失事務(wù)控制
  • spring中事務(wù)的隔離級(jí)別都是基于數(shù)據(jù)庫實(shí)現(xiàn)的释牺、timeout是在sql執(zhí)行的時(shí)候判斷萝衩,所以在事務(wù)方法中的timeout配置只會(huì)作用在最后一個(gè)sql語句處,余下的方法邏輯無法控制
  • 事務(wù)的掛起本質(zhì)就是將開啟事務(wù)的connection從ThreadLocal移除没咙,然后重新獲取connection
  • 事務(wù)的恢復(fù)本質(zhì)就是將掛起事務(wù)的connection重新放入ThreadLocal中
  • spring的傳播行為定義了在spring中多個(gè)事務(wù)如何共存猩谊,每次執(zhí)行無論傳播行為是什么都會(huì)返回一個(gè)新的事務(wù)狀態(tài)TransactionStatus對(duì)象,不同的傳播行為會(huì)導(dǎo)致當(dāng)前運(yùn)行在不同的事務(wù)上下文中祭刚,事務(wù)運(yùn)行在不同的上下文牌捷,在事務(wù)提交和回滾的時(shí)候操作不同,主要有以下4中情況
    • 掛起事務(wù)然后無事務(wù)運(yùn)行(newTransaction=false): 提交和回滾都無需操作
    • 新開啟事務(wù)(newTransaction=true):提交commit 回滾 rollback
    • 在當(dāng)前事務(wù)中運(yùn)行(newTransaction=false):提交不進(jìn)行任何操作涡驮,回滾標(biāo)記rollbacOnly=true然后不進(jìn)行操作
    • 嵌套事務(wù)暗甥,創(chuàng)建保存點(diǎn)(hasSavePoint&newTransaction=false):提交刪除保存點(diǎn),回滾則回滾到保存點(diǎn)
  • 可以通過TransactionSynchronizationManager的registerSynchronization方法添加監(jiān)聽事務(wù)提交前捉捅、提交后撤防、回滾前、回滾后棒口、完成前寄月、完成后的事件。

spring事務(wù)管理

事務(wù)管理器

spring中 PlatformTransactionManager 接口是spring事務(wù)管理器的根接口陌凳,定義了事務(wù)的操作行為:開啟事務(wù)、提交事務(wù)内舟、回滾事務(wù)

image
public interface PlatformTransactionManager {
   //開啟事務(wù)合敦,返回事務(wù)的封裝對(duì)象 TransactionStatus
   TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
         throws TransactionException;
   //提交該事務(wù)
      //如果事務(wù)標(biāo)記回滾rollbackOnly=true,則執(zhí)行回滾验游。
      //如果事務(wù)是一個(gè)新事務(wù)充岛,則調(diào)用connection.commit提交事務(wù)
      //如果事務(wù)包含 保存點(diǎn),則刪除保存點(diǎn)
      //如果事務(wù)不是一個(gè)新事務(wù)耕蝉,運(yùn)行在外層事務(wù)中則忽略提交崔梗,以便適當(dāng)?shù)貐⑴c周圍的事務(wù)。
      //如果前一個(gè)事務(wù)已經(jīng)掛起垒在,那么在提交新事務(wù)之后恢復(fù)前一個(gè)事務(wù)蒜魄。
   void commit(TransactionStatus status) throws TransactionException;
   //執(zhí)行給定事務(wù)的回滾。
       //如果事務(wù)包含 保存點(diǎn)场躯,則回滾到保存點(diǎn)
       //如果事務(wù)是新事務(wù)谈为,調(diào)用connection.rollback回滾事務(wù)
       //如果事務(wù)不是新事務(wù),運(yùn)行在外層事務(wù)則設(shè)置rollbackOnly=true
       //如果前一個(gè)事務(wù)已經(jīng)掛起踢关,那么在回滾新事務(wù)之后恢復(fù)前一個(gè)事務(wù)伞鲫。
   void rollback(TransactionStatus status) throws TransactionException;
}

spring事務(wù)管理的整體實(shí)現(xiàn)由抽象類 AbstractPlatformTransactionManager 實(shí)現(xiàn)。在下面的源碼分析中會(huì)看到签舞。

開啟事務(wù)的參數(shù)是一個(gè)TransactionDefinition對(duì)象秕脓,該類封裝了事務(wù)的配置屬性柒瓣,包括傳播行為、超時(shí)配置吠架、回滾異常芙贫、隔離級(jí)別等信息。事務(wù)開啟后返回TransactionStatus對(duì)象诵肛,提交和回滾都是操作該對(duì)象

子類如DataSourceTransactionManager主要負(fù)責(zé)實(shí)現(xiàn)具體事務(wù)對(duì)象(connection)的管理屹培,如獲取、保存同步怔檩、開啟事務(wù)褪秀、提交、回滾等操作薛训。

事務(wù)狀態(tài)

spring中TransactionStatus類用于封裝事務(wù)的狀態(tài)媒吗。spring中事務(wù)的開啟、提交乙埃、回滾都是通過操作該對(duì)象實(shí)現(xiàn)

  • completed:標(biāo)識(shí)事務(wù)是否已完成結(jié)束
  • rollbackOnly:標(biāo)識(shí)事務(wù)需要回滾闸英,在非獨(dú)立新開啟事務(wù)中用于回滾標(biāo)記,在事務(wù)提交時(shí)的時(shí)候判斷該值進(jìn)行回滾
  • newTransaction :標(biāo)識(shí)事務(wù)是否是一個(gè)獨(dú)立新開啟的事務(wù)或者運(yùn)行在外層事務(wù)或者是否是嵌套事務(wù)
image

spring中使用TransactionStatus的實(shí)現(xiàn)類DefaultTransactionStatus介袜,在這個(gè)類中封裝了兩個(gè)最重要的屬性值

  • transaction: 事務(wù)對(duì)象甫何,一般返回的是一個(gè)包裝了connection的對(duì)象
  • suspendedResources: 掛起對(duì)的事務(wù)對(duì)象,包含被掛起的connection對(duì)象和被掛起的事務(wù)配置
  • 聲明式事務(wù)的配置信息

編程式開啟事務(wù)

spring中定義了一個(gè)TransactionTemplate類遇伞,用于編程式事務(wù)執(zhí)行數(shù)據(jù)庫操作

TransactionTemplate的execute方法中辙喂,通過transactionManager開啟一個(gè)事務(wù)并返回TransactionStatus,然后通過TransactionCallback執(zhí)行實(shí)際的數(shù)據(jù)庫操作鸠珠,執(zhí)行完畢后調(diào)用transactionManager提交事務(wù)巍耗,如果發(fā)生異常則調(diào)用transactionManager回滾事務(wù)

public <T> T execute(TransactionCallback<T> action) throws TransactionException {
   Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
   if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
      return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
   }else {
     //開啟事務(wù),并返回封裝事務(wù)的TransactionStatus對(duì)象
      TransactionStatus status = this.transactionManager.getTransaction(this);
      T result;
      try {
        //執(zhí)行sql操作
         result = action.doInTransaction(status);
      }
      catch (RuntimeException | Error ex) {
         //sql操作異常進(jìn)行事務(wù)回滾
         rollbackOnException(status, ex);
         throw ex;
      }
      catch (Throwable ex) {
        //sql操作異常進(jìn)行事務(wù)回滾
         rollbackOnException(status, ex);
         throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
      }
     //sql執(zhí)行成功 提交事務(wù)
      this.transactionManager.commit(status);
      return result;
   }
}

開啟事務(wù)

AbstractPlatformTransactionManager實(shí)現(xiàn)了事務(wù)管理器的getTransaction渐排,在該方法中完成了事務(wù)的開啟和事務(wù)傳播行為的處理炬太。

  1. 通過子類實(shí)現(xiàn)的doGetTransaction獲取一個(gè)transaction事務(wù)對(duì)象,類型是Object的驯耻,可想而知兼容不好處理啊亲族,但是在這里也做了取巧動(dòng)作,雖然返回的是Object類型,但是在父類中不會(huì)對(duì)該事務(wù)對(duì)象進(jìn)行任何操作處理,所有關(guān)于事務(wù)對(duì)象的操作都交由子類覆寫實(shí)現(xiàn)舅逸。比如isExistingTransaction(判斷是否已存在事務(wù))悟泵、doBegin(開啟事務(wù))。我們先不考慮子類的具體實(shí)現(xiàn),首先看事務(wù)處理的整體邏輯

  2. 獲取到事務(wù)對(duì)象后,通過子類覆寫邏輯判斷如果已經(jīng)開啟事務(wù)(是否有個(gè)autoCommit=false的connection)誉结,則根據(jù)事務(wù)的傳播行為判斷多事務(wù)共存的情形炼鞠,然后返回TransactionStatus缘滥。

  3. 如果當(dāng)前還未開啟事務(wù),則根據(jù)事務(wù)傳播行為判斷是否需要開啟事務(wù)谒主,如果需要?jiǎng)t調(diào)用子類覆寫邏輯 doBegin方法朝扼,將事務(wù)對(duì)象transaction傳遞下去開啟事務(wù),然后封裝好TransactionStatus對(duì)象返回

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
  //獲取Transaction對(duì)象霎肯,由具體子類實(shí)現(xiàn)
   Object transaction = doGetTransaction();
  //如果事務(wù)配置為空擎颖,創(chuàng)建一個(gè)默認(rèn)實(shí)現(xiàn)
   if (definition == null) {
      definition = new DefaultTransactionDefinition();
   }
    //在這里要處理多事務(wù)共存的情況,根據(jù)不同的傳播行為來實(shí)現(xiàn)不同的業(yè)務(wù)處理
   if (isExistingTransaction(transaction)) {
      return handleExistingTransaction(definition, transaction, debugEnabled);
   }
   //確定不存在事務(wù)观游,要開啟一個(gè)新事務(wù)搂捧,根據(jù)不同的事務(wù)傳播行為進(jìn)行處理
   if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
            "No existing transaction found for transaction marked with propagation 'mandatory'");
   }//根據(jù)事務(wù)傳播行為配置,開啟一個(gè)新事務(wù)
  else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    //暫停一個(gè)空事務(wù)
      SuspendedResourcesHolder suspendedResources = suspend(null);
      try {
         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        //構(gòu)建一個(gè)DefaultTransactionStatus對(duì)象懂缕,用于封裝事務(wù)信息
        //新開啟的事務(wù) newtransaction = true
         DefaultTransactionStatus status = newTransactionStatus(
               definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        //調(diào)用子類 開啟事務(wù)允跑,其實(shí)就是創(chuàng)建一個(gè)connection,并設(shè)置autoCommit = false
         doBegin(transaction, definition);
         prepareSynchronization(status, definition);
         return status;
      } catch (RuntimeException | Error ex) {
        //恢復(fù)被掛起的事務(wù)
         resume(null, suspendedResources);
         throw ex;
      }
   } else {//無需事務(wù)運(yùn)行搪柑,在這里構(gòu)建一個(gè)沒有事務(wù)的上線文
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
   }
}

AbstractPlatformTransactionManager.handleExistingTransaction方法處理當(dāng)前已存在事務(wù)的情況聋丝。根據(jù)TransactionDefinition中配置的傳播行為來處理多事務(wù)共存情況。最終封裝返回事務(wù)狀態(tài)對(duì)象TransactionStatus工碾。具體處理看代碼注釋弱睦,

注意到 newTransaction 屬性的配置,該配置會(huì)直接影響事務(wù)的提交和回滾操作處理

private TransactionStatus handleExistingTransaction(
      TransactionDefinition definition, Object transaction, boolean debugEnabled)
      throws TransactionException {
   //以非事務(wù)方式執(zhí)行渊额,如果當(dāng)前存在事務(wù)况木,則拋出異常
   if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
      throw new IllegalTransactionStateException(
            "Existing transaction found for transaction marked with propagation 'never'");
   }
   //以非事務(wù)方式執(zhí)行操作,如果當(dāng)前存在事務(wù)端圈,就把當(dāng)前事務(wù)掛起焦读。 
   if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {     //掛起當(dāng)前事務(wù)
      Object suspendedResources = suspend(transaction);
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
     //構(gòu)建一個(gè)無事務(wù)的 TransactionStatus 返回 newtransaction = false
      return prepareTransactionStatus(
            definition, null, false, newSynchronization, debugEnabled, suspendedResources);
   }
   //新建事務(wù)子库,如果當(dāng)前存在事務(wù)舱权,把當(dāng)前事務(wù)掛起,然后新開啟一個(gè)事務(wù)返回事務(wù)狀態(tài) 
   if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
     //掛起當(dāng)前事務(wù)仑嗅,并獲取被掛起事務(wù)的信息宴倍,保存在DefaultTransactionStatus中,用于恢復(fù)時(shí)用
     //掛起的時(shí)候會(huì)清除掉transaction中的connection仓技,在doBegin的時(shí)候會(huì)重新獲取一個(gè)connection
      SuspendedResourcesHolder suspendedResources = suspend(transaction);
      try {
         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        //新開啟的事務(wù) newtransaction = true
         DefaultTransactionStatus status = newTransactionStatus(
               definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
        //重新開啟一個(gè)事務(wù)鸵贬,在doBegin里面會(huì)重新獲取一個(gè)新的connection,開啟事務(wù)
         doBegin(transaction, definition);
         prepareSynchronization(status, definition);
         return status;
      } catch (RuntimeException | Error beginEx) {
        //開啟新事務(wù)異常脖捻,重新恢復(fù)掛起的事務(wù)
         resumeAfterBeginException(transaction, suspendedResources, beginEx);
         throw beginEx;
      }
   }
   //如果當(dāng)前存在事務(wù)阔逼,則在嵌套事務(wù)內(nèi)執(zhí)行。如果當(dāng)前沒有事務(wù)地沮,則進(jìn)行與PROPAGATION_REQUIRED類似的操作嗜浮。 
   if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      if (!isNestedTransactionAllowed()) {
         throw new NestedTransactionNotSupportedException(
               "Transaction manager does not allow nested transactions by default - " +
               "specify 'nestedTransactionAllowed' property with value 'true'");
      }
      //使用保存點(diǎn)實(shí)現(xiàn)嵌套事務(wù)
      if (useSavepointForNestedTransaction()) {
        //嵌套事務(wù)羡亩,不是一個(gè)新開啟的事務(wù), newtransaction = false
         DefaultTransactionStatus status =
           prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
        //當(dāng)前事務(wù)創(chuàng)建一個(gè) 保存點(diǎn) 
         status.createAndHoldSavepoint();
         return status;
      } else {
         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
         DefaultTransactionStatus status = newTransactionStatus(
               definition, transaction, true, newSynchronization, debugEnabled, null);
         doBegin(transaction, definition);
         prepareSynchronization(status, definition);
         return status;
      }
   }
  //PROPAGATION_REQUIRED -- 支持當(dāng)前事務(wù)危融,如果當(dāng)前沒有事務(wù)畏铆,就新建一個(gè)事務(wù)。這是最常見的選擇吉殃。 當(dāng)前已有事務(wù)
  //PROPAGATION_SUPPORTS -- 支持當(dāng)前事務(wù)辞居,如果當(dāng)前沒有事務(wù),就以非事務(wù)方式執(zhí)行蛋勺。當(dāng)前已有事務(wù)
  //這里先判斷下方法配置的和當(dāng)前事務(wù)配置的事務(wù)隔離級(jí)別瓦灶、只讀屬性是否一致,如果不一致拋異常
   if (isValidateExistingTransaction()) {
      if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
         Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
         if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
            Constants isoConstants = DefaultTransactionDefinition.constants;
            throw new IllegalTransactionStateException("Participating transaction with definition [" +
                  definition + "] specifies isolation level which is incompatible with existing transaction: " +
                  (currentIsolationLevel != null ?
                        isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                        "(unknown)"));
         }
      }
      if (!definition.isReadOnly()) {
         if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
            throw new IllegalTransactionStateException("Participating transaction with definition [" +
                  definition + "] is not marked as read-only but existing transaction is");
         }
      }
   }
  //在當(dāng)前事務(wù)中運(yùn)行
   boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  //當(dāng)前事務(wù)中運(yùn)行迫卢,不是一個(gè)新開啟的事務(wù)倚搬, newtransaction = false
   return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

prepareTransactionStatus方法中返回了 DefaultTransactionStatus 對(duì)象。同時(shí)通過TransactionSynchronizationManager類將當(dāng)前事務(wù)狀態(tài)配置到ThreadLocal中乾蛤。

protected final DefaultTransactionStatus prepareTransactionStatus(
            TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
            boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
        prepareSynchronization(status, definition);
        return status;
    }
protected DefaultTransactionStatus newTransactionStatus(
            TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
            boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
        boolean actualNewSynchronization = newSynchronization &&
                !TransactionSynchronizationManager.isSynchronizationActive();
        return new DefaultTransactionStatus(
                transaction, newTransaction, actualNewSynchronization,
                definition.isReadOnly(), debug, suspendedResources);
    }
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                            definition.getIsolationLevel() : null);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
            TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
            TransactionSynchronizationManager.initSynchronization();
        }
    }

分析完開啟事務(wù)的返回結(jié)果每界,接下來看下當(dāng)需要掛起當(dāng)前事務(wù)的時(shí)候,是如何掛起的家卖。AbstractPlatformTransactionManager.suspend方法用于掛起事務(wù)眨层,在該方法中會(huì)調(diào)用子類掛起事務(wù),同時(shí)返回被掛起事務(wù)對(duì)象及其配置信息返回上荡,當(dāng)事務(wù)恢復(fù)的時(shí)候需要依賴這些配置信息用于恢復(fù)趴樱。

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
   if (TransactionSynchronizationManager.isSynchronizationActive()) {
      List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
      try {
         Object suspendedResources = null;
         if (transaction != null) {
           //子類覆寫實(shí)現(xiàn) 事務(wù)掛起,同時(shí)返回被掛起事務(wù)的信息
           //其實(shí)就是將transaction中的connection移除并封裝到suspendedResources中返回
            suspendedResources = doSuspend(transaction);
         }
        //獲取被掛起事務(wù)存儲(chǔ)在ThreadLocal中的配置酪捡,封裝被掛起事務(wù)的配置信息返回
         String name = TransactionSynchronizationManager.getCurrentTransactionName();
         TransactionSynchronizationManager.setCurrentTransactionName(null);
         boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
         TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
         Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
         TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
         boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
         TransactionSynchronizationManager.setActualTransactionActive(false);
         return new SuspendedResourcesHolder(
               suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
      } catch (RuntimeException | Error ex) {
         // doSuspend failed - original transaction is still active...
         doResumeSynchronization(suspendedSynchronizations);
         throw ex;
      }
   } else if (transaction != null) {
      // 掛起事務(wù)
      Object suspendedResources = doSuspend(transaction);
      return new SuspendedResourcesHolder(suspendedResources);
   } else {
      //當(dāng)前不存在事務(wù)叁征,所以無需掛起
      return null;
   }
}

當(dāng)事務(wù)開啟異常時(shí),需要恢復(fù)被掛起的事務(wù)逛薇,AbstractPlatformTransactionManager.resume用于恢復(fù)事務(wù)捺疼,將TransactionStatus中的SuspendedResources(掛起事務(wù)中的資源)重新放回到ThreadLocal中,該邏輯通過子類doResume方法實(shí)現(xiàn)永罚,然后將其事務(wù)狀態(tài)配置重新設(shè)置到ThreadLocal中啤呼。

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)  throws TransactionException {

   if (resourcesHolder != null) {
      Object suspendedResources = resourcesHolder.suspendedResources;
      if (suspendedResources != null) {
        //子類覆寫實(shí)現(xiàn),其實(shí)就是將suspendedResources中的connection放到ThreadLocal中
         doResume(transaction, suspendedResources);
      }
      List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
     //恢復(fù)ThreadLocal中被掛起事務(wù)的配置
      if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);       TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
         TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
         doResumeSynchronization(suspendedSynchronizations);
      }
   }
}

分析完了AbstractPlatformTransactionManager中開啟事務(wù)的整體執(zhí)行流程呢袱,然后看下子類中 doGetTransaction(獲取事務(wù)對(duì)象)官扣、isExistingTransaction(是否已開啟事務(wù))、deBegin(開啟事務(wù))羞福、doSuspend(掛起事務(wù))惕蹄、doResume(恢復(fù)事務(wù))的具體實(shí)現(xiàn),接下來以最常用的DataSourceTransactionManager舉例

DataSourceTransactionManager覆寫了AbstractPlatformTransactionManager的方法,提供了事務(wù)對(duì)象的具體操作卖陵,同時(shí)封裝了一個(gè)DataSource恋昼,用于獲取connection。

獲取事務(wù)對(duì)象

DataSourceTransactionManager返回的事務(wù)對(duì)象是DataSourceTransactionObject赶促,里面主要就是封裝了一個(gè)ConnectionHolder對(duì)象液肌,ConnectionHolder主要用于存儲(chǔ)一個(gè)數(shù)據(jù)庫連接Connection對(duì)象。

注意這里不是直接通過DataSource去獲取Connection的鸥滨,而是通過TransactionSynchronizationManager去ThreadLocal獲取嗦哆。如果能獲取到證明可能已存在事務(wù),返回connection; 如果獲取不到Connection,就是返回一個(gè)null婿滓。

在spring中sql的執(zhí)行也是會(huì)先通過TransactionSynchronizationManager去ThreadLocal中獲取Connection老速,這樣就實(shí)現(xiàn)了開啟事務(wù)和sql執(zhí)行用的是一個(gè)Connection

@Override
protected Object doGetTransaction() {
   DataSourceTransactionObject txObject = new DataSourceTransactionObject();
   txObject.setSavepointAllowed(isNestedTransactionAllowed());
  
   ConnectionHolder conHolder =
         (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
  //這里有可能返回conHolder值,所以有可能不是新建的凸主,直接設(shè)置newConnectionHolder=false橘券,dobegin需要新獲取connection的時(shí)候會(huì)重新設(shè)置為true 
  txObject.setConnectionHolder(conHolder, false);
   return txObject;
}

TransactionSynchronizationManager用于獲取ThreadLocal中設(shè)置的數(shù)據(jù)庫連接信息,key是當(dāng)前數(shù)據(jù)源對(duì)象

@Nullable
public static Object getResource(Object key) {
   Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
   Object value = doGetResource(actualKey);
   return value;
}

從ThreadLocal獲取Connection信息

private static Object doGetResource(Object actualKey) {
   Map<Object, Object> map = resources.get();
   if (map == null) {
      return null;
   }
   Object value = map.get(actualKey);
   // Transparently remove ResourceHolder that was marked as void...
   if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
      map.remove(actualKey);
      // Remove entire ThreadLocal if empty...
      if (map.isEmpty()) {
         resources.remove();
      }
      value = null;
   }
   return value;
}

判斷是否已在一個(gè)事務(wù)中

TransactionSynchronizationManager實(shí)現(xiàn)了isExistingTransaction方法卿吐。接收Object類型的transaction對(duì)象旁舰。因?yàn)檫@個(gè)對(duì)象就是該實(shí)現(xiàn)類自己創(chuàng)建的,所以可以直接強(qiáng)轉(zhuǎn)為DataSourceTransactionObject類型嗡官,事務(wù)開啟的條件為txObject中存在一個(gè)TransactionActive = true 的ConnectionHolder

注意:當(dāng)事務(wù)開啟成功后(setAutoCommit=false)會(huì)設(shè)置 transactionActive = true

@Override
protected boolean isExistingTransaction(Object transaction) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
   return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

開啟事務(wù)

doBegin方法中會(huì)從數(shù)據(jù)源中獲取一個(gè)connection連接箭窜,然后設(shè)置autoCommit=false⊙苄龋可以看到事務(wù)開啟的本質(zhì)就是返回一個(gè)autoCommit=false的connection磺樱。

事務(wù)開啟成功之后,設(shè)置事務(wù)開啟標(biāo)識(shí) setTransactionActive = true

如果是新獲取的connection婆咸,證明是一個(gè)新事務(wù)竹捉,則調(diào)用TransactionSynchronizationManager.bindResource將connectionHolder設(shè)置到ThreadLocal當(dāng)中

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
   Connection con = null;
   try {
     //開啟事務(wù)前獲取事務(wù)連接 connection,如果事務(wù)對(duì)象中的有了connection就不需要獲取了尚骄,如果沒有去DataSource中獲取一個(gè)新的connection块差,構(gòu)建一個(gè)新的ConnectionHolder
      if (!txObject.hasConnectionHolder() ||
            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        //從數(shù)據(jù)源中獲取connection
         Connection newCon = obtainDataSource().getConnection();
        //放到事務(wù)對(duì)象的connectionHolder中,重新獲取的connection,設(shè)置newConnectionHolder=true
         txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      }
      txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
      con = txObject.getConnectionHolder().getConnection();
      //將事務(wù)配置設(shè)定到connection上乖仇,同時(shí)記錄下 connection 原始的配置憾儒,事務(wù)完成之后connection狀態(tài)重置回去
      Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
      txObject.setPreviousIsolationLevel(previousIsolationLevel);
      //關(guān)閉自動(dòng)提交询兴,開啟事務(wù)
      if (con.getAutoCommit()) {
         txObject.setMustRestoreAutoCommit(true);
         con.setAutoCommit(false);
      }
      prepareTransactionalConnection(con, definition);
     //事務(wù)開啟成功乃沙,設(shè)置事務(wù)對(duì)象中的TransactionActive=true,用于快速判斷是否開啟事務(wù)
      txObject.getConnectionHolder().setTransactionActive(true);

      int timeout = determineTimeout(definition);
     //配置事務(wù)超時(shí)時(shí)間
      if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
         txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
      }
      // 將連接信息綁定到ThreadLocal中诗舰,sql執(zhí)行的時(shí)候和事務(wù)傳播行為都會(huì)去ThreadLocal獲取連接
      if (txObject.isNewConnectionHolder()) {
         TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
      }
   } catch (Throwable ex) {
     //異常則關(guān)閉 connection 連接警儒,重置 txObject
      if (txObject.isNewConnectionHolder()) {
         DataSourceUtils.releaseConnection(con, obtainDataSource());
         txObject.setConnectionHolder(null, false);
      }
      throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
   }
}

新獲取的connection需要通過TransactionSynchronizationManager將開啟事務(wù)的connection放到ThreadLocal中。保證事務(wù)過程中用的connection能夠被執(zhí)行sql的代碼拿到

public static void bindResource(Object key, Object value) throws IllegalStateException {
   Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
   Assert.notNull(value, "Value must not be null");
   Map<Object, Object> map = resources.get();
   // set ThreadLocal Map if none found
   if (map == null) {
      map = new HashMap<>();
      resources.set(map);
   }
   Object oldValue = map.put(actualKey, value);
   // Transparently suppress a ResourceHolder that was marked as void...
   if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
      oldValue = null;
   }
   if (oldValue != null) {
      throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
            actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
   }
}

掛起事務(wù)

操作的還是DataSourceTransactionObject對(duì)象。事務(wù)掛起的本質(zhì)就是移除并返回事務(wù)對(duì)象transaction中原有的connection連接蜀铲。然后再調(diào)用TransactionSynchronizationManager移除ThreadLocal中的connection边琉。

感覺spring這種代碼還是帶點(diǎn)坑的,修改參數(shù)對(duì)象中的值记劝,出問題不好定位变姨,最好的處理是返回一個(gè)操作完的新的對(duì)象

@Override
protected Object doSuspend(Object transaction) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
   txObject.setConnectionHolder(null);//移除對(duì)象中的connection連接
  //移除ThreadLocal中的connection
   return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

通過TransactionSynchronizationManager移除ThreadLocal中的connection

public static Object unbindResource(Object key) throws IllegalStateException {
   Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
   Object value = doUnbindResource(actualKey);
   return value;
}
private static Object doUnbindResource(Object actualKey) {
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
  //移除ThreadLocal中connection 并返回
        Object value = map.remove(actualKey);
   //此處省略代碼。厌丑。定欧。
        return value;
}

恢復(fù)事務(wù)

恢復(fù)事務(wù)就是將掛起資源中的connection通過TransactionSynchronizationManager重新放到ThreadLocal中

感覺這里應(yīng)該同時(shí)將connection設(shè)置回transaction中啊,這樣才能和掛起事務(wù)對(duì)應(yīng)怒竿。

@Override
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
   TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

提交事務(wù)

AbstractPlatformTransactionManager.commit定義了提交事務(wù)的流程砍鸠,在事務(wù)提交的時(shí)候首先要判斷下TransactionStatus的rollbackOnly狀態(tài),如果是true耕驰,則回滾事務(wù)爷辱。否則調(diào)用processCommit方法提交

如果當(dāng)前是運(yùn)行在外層事務(wù)中,發(fā)生異常朦肘,不方便直接回滾饭弓,則標(biāo)記rollbackOnly=true,當(dāng)外層執(zhí)行提交的時(shí)候再進(jìn)行回滾

public final void commit(TransactionStatus status) throws TransactionException {
  if (status.isCompleted()) {
            throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
        }
   DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  //如果標(biāo)記了 localRollbackOnly=true 提交的時(shí)候直接回滾
  //TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
   if (defStatus.isLocalRollbackOnly()) {
      processRollback(defStatus, false);
      return;
   }
     //根據(jù)配置判斷提交的時(shí)候短路走回滾邏輯 
   if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
      processRollback(defStatus, true);
      return;
   }
   //提交事務(wù)
   processCommit(defStatus);
}

在processCommit方法會(huì)調(diào)用一些事務(wù)提交和完成前后的事件媒抠,根據(jù)TransactionStatus的狀態(tài)進(jìn)行不同的操作

  • 如果是嵌套事務(wù)示启,包含保存點(diǎn),則直接釋放保存點(diǎn)就可以领舰,提交由外層事務(wù)完成
  • 如果是一個(gè)新開啟的事務(wù)夫嗓,調(diào)用子類實(shí)現(xiàn)的 doCommit方法,其實(shí)就是執(zhí)行connection的commit方法
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
   try {
      boolean beforeCompletionInvoked = false;
      try {
         boolean unexpectedRollback = false;
         prepareForCommit(status);
         triggerBeforeCommit(status);// 觸發(fā)提交前事件
         triggerBeforeCompletion(status);//觸發(fā)完成前事件
         beforeCompletionInvoked = true;
        //事務(wù)提交的時(shí)候如果是嵌套事務(wù)冲秽,有保存點(diǎn)舍咖, 則釋放保存點(diǎn),無需提交事務(wù)锉桑,完成
         if (status.hasSavepoint()) {
            unexpectedRollback = status.isGlobalRollbackOnly();
            status.releaseHeldSavepoint();
         } else if (status.isNewTransaction()) {
            //調(diào)用子類實(shí)現(xiàn)的 doCommit 提交事務(wù)
            unexpectedRollback = status.isGlobalRollbackOnly();
            doCommit(status);
         } else if (isFailEarlyOnGlobalRollbackOnly()) {
            unexpectedRollback = status.isGlobalRollbackOnly();
         }
         //全局標(biāo)記了回滾排霉,回滾  RollbackOnly=true
         if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
             "Transaction silently rolled back because it has been marked as rollback-only");
         }
      } catch (UnexpectedRollbackException ex) {
         // 觸發(fā)事務(wù)完成 事件
         triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
         throw ex;
      } catch (TransactionException ex) {
         // 異常后回滾
         if (isRollbackOnCommitFailure()) {
            doRollbackOnCommitException(status, ex);
         } else {
           //觸發(fā)事務(wù)完成 事件
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
         }
         throw ex;
      } catch (RuntimeException | Error ex) {
         if (!beforeCompletionInvoked) {
            triggerBeforeCompletion(status);
         }
        //異常回滾
         doRollbackOnCommitException(status, ex);
         throw ex;
      }
      //觸發(fā)事務(wù)提交完成事件和事務(wù)完成事件
      try {
         triggerAfterCommit(status);
      } finally {
         triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
      }
   } finally {
     //完成后清除事務(wù)信息民轴,清除當(dāng)前ThreadLocal中的信息攻柠,釋放connection,恢復(fù)掛起的事務(wù)
      cleanupAfterCompletion(status);
   }
}

子類DataSourceTransactionManager中實(shí)現(xiàn)了提交的具體操作后裸,本質(zhì)就是獲取事務(wù)對(duì)象中的connection瑰钮,然后調(diào)用jdbc的api commit提交。

@Override
protected void doCommit(DefaultTransactionStatus status) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();//獲取事務(wù)對(duì)象中的connection
   Connection con = txObject.getConnectionHolder().getConnection();
   try {//調(diào)用connection的commit方法提交事務(wù)
      con.commit();
   } catch (SQLException ex) {
      throw new TransactionSystemException("Could not commit JDBC transaction", ex);
   }
}

事務(wù)完成之后微驶,只要進(jìn)行資源的釋放浪谴、事務(wù)配置信息的清除和掛起事務(wù)的恢復(fù)开睡,所以在finnaly里面調(diào)用了cleanupAfterCompletion方法

  1. 清除ThreadLocal中事務(wù)狀態(tài)信息
  2. 如果是新開啟的時(shí)候,調(diào)用子類釋放connection資源苟耻,如果是運(yùn)行在其他事務(wù)中篇恒,無需操作
  3. 如果存在被掛起的事務(wù),恢復(fù)
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
   status.setCompleted();
   //如果已經(jīng)將事務(wù)狀態(tài)信息保存到ThreadLocal中凶杖,清除 ThreadLocal中的事務(wù)狀態(tài)配置
   if (status.isNewSynchronization()) {
      TransactionSynchronizationManager.clear();
   }
  //如果是新開啟的事務(wù)胁艰,調(diào)用子類釋放連接信息
   if (status.isNewTransaction()) {
      doCleanupAfterCompletion(status.getTransaction());
   }
   if (status.getSuspendedResources() != null) {
      Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
     //恢復(fù) 掛起的事務(wù)
      resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
   }
}

TransactionSynchronizationManager.clear 用于清除ThreadLocal中的的事務(wù)狀態(tài)配置信息

public static void clear() {
   synchronizations.remove();
   currentTransactionName.remove();
   currentTransactionReadOnly.remove();
   currentTransactionIsolationLevel.remove();
   actualTransactionActive.remove();
}

新開啟的事務(wù)需要調(diào)用子類釋放事務(wù)資源

  1. 移除ThreadLocal中connection
  2. 將 connection 的autoCommit設(shè)置為true
  3. 調(diào)用DataSource方法釋放connection
protected void doCleanupAfterCompletion(Object transaction) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
   //清除ThreadLocal中的 connection信息
   if (txObject.isNewConnectionHolder()) {
      TransactionSynchronizationManager.unbindResource(obtainDataSource());
   }
   //重置 autoCommit = true
   Connection con = txObject.getConnectionHolder().getConnection();
   try {
      if (txObject.isMustRestoreAutoCommit()) {
         con.setAutoCommit(true);
      }
      DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
   } catch (Throwable ex) {
      logger.debug("Could not reset JDBC Connection after transaction", ex);
   }
   //調(diào)用 DataSource 關(guān)閉 connection
   if (txObject.isNewConnectionHolder()) {
      DataSourceUtils.releaseConnection(con, this.dataSource);
   }

   txObject.getConnectionHolder().clear();
}

回滾事務(wù)

AbstractPlatformTransactionManager.commit定義了回滾事務(wù)的流程,調(diào)用processRollback進(jìn)行回滾事務(wù)

public final void rollback(TransactionStatus status) throws TransactionException {
   if (status.isCompleted()) {
      throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
   }
   DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
   processRollback(defStatus, false);
}

在processRollback方法會(huì)調(diào)用一些事務(wù)回滾和完成前后的事件智蝠,根據(jù) TransactionStatus 的狀態(tài)進(jìn)行不同的操作蝗茁,回滾操作完畢后,調(diào)用cleanupAfterCompletion清理事務(wù)信息

  • 如果有 保存點(diǎn) 寻咒,直接回滾到 保存點(diǎn)
  • 如果是一個(gè)新啟的事務(wù)哮翘,調(diào)用子類實(shí)現(xiàn)doRollback進(jìn)行回滾,其實(shí)就是調(diào)用connection.rollback.
  • 如果運(yùn)行在外層事務(wù)毛秘,直接標(biāo)記當(dāng)前事務(wù) rollbackOnly =true饭寺,當(dāng)外層事務(wù)提交的時(shí)候會(huì)進(jìn)行回滾
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
   try {
      boolean unexpectedRollback = unexpected;
      try {
         triggerBeforeCompletion(status);
                    //如果有保存點(diǎn),回滾到保存點(diǎn)
         if (status.hasSavepoint()) {
            status.rollbackToHeldSavepoint();
         }//如果是一個(gè)新的事務(wù)叫挟,調(diào)用子類實(shí)現(xiàn) 回滾事務(wù)
         else if (status.isNewTransaction()) {
            doRollback(status);
         } else {
            // Participating in larger transaction
            if (status.hasTransaction()) {
               if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                  doSetRollbackOnly(status);
               } else {
                  if (status.isDebug()) {
                     logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                  }
               }
            } else {//不存在事務(wù)艰匙,無需回滾
               logger.debug("Should roll back transaction but cannot - no transaction available");
            }
            // Unexpected rollback only matters here if we're asked to fail early
            if (!isFailEarlyOnGlobalRollbackOnly()) {
               unexpectedRollback = false;
            }
         }
      } catch (RuntimeException | Error ex) {
        //回滾異常, 觸發(fā)事務(wù)完成事件
         triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
         throw ex;
      }
      //回滾成功后觸發(fā)事務(wù)完成事件
      triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
      // Raise UnexpectedRollbackException if we had a global rollback-only marker
      if (unexpectedRollback) {
         throw new UnexpectedRollbackException(
               "Transaction rolled back because it has been marked as rollback-only");
      }
   } finally {
     //完成后清除事務(wù)信息,清除當(dāng)前ThreadLocal中的信息抹恳,釋放connection员凝,恢復(fù)掛起的事務(wù)
      cleanupAfterCompletion(status);
   }
}

子類DataSourceTransactionManager中實(shí)現(xiàn)了回滾的具體操作,本質(zhì)就是獲取事務(wù)對(duì)象中的connection奋献,然后調(diào)用jdbc的api rollback回滾健霹。

protected void doRollback(DefaultTransactionStatus status) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
   Connection con = txObject.getConnectionHolder().getConnection();
   try {//調(diào)用connection的rollback
      con.rollback();
   } catch (SQLException ex) {
      throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
   }
}

sql執(zhí)行與事務(wù)結(jié)合

事務(wù)與sql執(zhí)行用統(tǒng)一connection對(duì)象

來看JdbcTemplate的execute方法,所有的sql執(zhí)行都會(huì)通過該方法執(zhí)行,在該方法中第一步會(huì)通過jdbc配置的DataSource獲取connection瓶蚂,DataSourceUtils要保證能獲取到開啟事務(wù)的connection糖埋,從而實(shí)現(xiàn)事務(wù)控制。在這里就用到了TransactionSynchronizationManager去ThreadLocal中獲取開啟事務(wù)的connection窃这。

public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
      throws DataAccessException {
   //獲取 connection
   Connection con = DataSourceUtils.getConnection(obtainDataSource());
   PreparedStatement ps = null;
   try {
      ps = psc.createPreparedStatement(con);
     //這里會(huì)應(yīng)用到事務(wù)配置的超時(shí)時(shí)間 timeout
      applyStatementSettings(ps);
      T result = action.doInPreparedStatement(ps);
      handleWarnings(ps);
      return result;
   }

}

DataSourceUtils.getConnection用于獲取connection

public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
  return doGetConnection(dataSource);
}

在doGetConnection方法中會(huì)首先通過TransactionSynchronizationManager去ThreadLocal中獲取當(dāng)前數(shù)據(jù)源的connection瞳别,spring的事務(wù)也是將connection保存在TransactionSynchronizationManager的ThreadLocal中的,這樣就保證了事務(wù)管理和sql執(zhí)行用的是一個(gè)connection

要注意的是杭攻,獲取connection的key是DataSource對(duì)象祟敛,所以事務(wù)管理的DataSource和jdbc的DataSource一定要用一個(gè)對(duì)象,不然會(huì)導(dǎo)致事務(wù)失效

public static Connection doGetConnection(DataSource dataSource) throws SQLException {
   Assert.notNull(dataSource, "No DataSource specified");
  //也通過TransactionSynchronizationManager去ThreadLocal中獲取當(dāng)前數(shù)據(jù)源的connection
   ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
   if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
      conHolder.requested();
      if (!conHolder.hasConnection()) {
         logger.debug("Fetching resumed JDBC Connection from DataSource");
         conHolder.setConnection(fetchConnection(dataSource));
      }
      return conHolder.getConnection();
   }
   //ThreadLocal中沒有connection兆解,重新從DataSource中獲取馆铁,然后通過TransactionSynchronizationManager綁定到ThreadLocal中,與事務(wù)關(guān)聯(lián)起來
   Connection con = fetchConnection(dataSource);

   if (TransactionSynchronizationManager.isSynchronizationActive()) {
      try {
         ConnectionHolder holderToUse = conHolder;
         if (holderToUse == null) {
            holderToUse = new ConnectionHolder(con);
         }
         else {
            holderToUse.setConnection(con);
         }
         holderToUse.requested();
         TransactionSynchronizationManager.registerSynchronization(
               new ConnectionSynchronization(holderToUse, dataSource));
         holderToUse.setSynchronizedWithTransaction(true);
         if (holderToUse != conHolder) {
            TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
         }
      }
   }

   return con;
}

事務(wù)超時(shí)時(shí)間(timeout)應(yīng)用

在事務(wù)中會(huì)配置 timeout痪宰,用于控制事務(wù)的超時(shí)實(shí)現(xiàn)叼架。在事務(wù)管理器DataSourceTransactionManager中會(huì)將事務(wù)配置的timeout配置到事務(wù)對(duì)象的connection中,而這個(gè)connection會(huì)通過TransactionSynchronizationManager放到ThreadLocal中的

int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
  txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}

在JdbcTemplate的execute方法中衣撬,有一步applyStatementSettings乖订,就會(huì)去校驗(yàn)事務(wù)的timeout,如果超時(shí)則拋出異尘吡罚回滾事務(wù)

public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
      throws DataAccessException {
   //獲取 connection
   Connection con = DataSourceUtils.getConnection(obtainDataSource());
   PreparedStatement ps = null;
   try {
      ps = psc.createPreparedStatement(con);
     //這里會(huì)應(yīng)用到事務(wù)配置的超時(shí)時(shí)間 timeout
      applyStatementSettings(ps);
      T result = action.doInPreparedStatement(ps);
      handleWarnings(ps);
      return result;
   }

}

applyStatementSettings最后一步判斷事務(wù)超時(shí)時(shí)間

protected void applyStatementSettings(Statement stmt) throws SQLException {
   int fetchSize = getFetchSize();
   if (fetchSize != -1) {
      stmt.setFetchSize(fetchSize);
   }
   int maxRows = getMaxRows();
   if (maxRows != -1) {
      stmt.setMaxRows(maxRows);
   }
   DataSourceUtils.applyTimeout(stmt, getDataSource(), getQueryTimeout());
}

applyTimeout中會(huì)獲取ThreadLocal中的開啟事務(wù)的connection乍构,調(diào)用connectionHolder的getTimeToLiveInSeconds方法,如果已經(jīng)超時(shí)扛点,在該方法中會(huì)拋出異常哥遮。如果還未超時(shí),設(shè)置statement的超時(shí)時(shí)間為剩余時(shí)間陵究,從而實(shí)現(xiàn)了事務(wù)的超時(shí)時(shí)間配置

public static void applyTimeout(Statement stmt, @Nullable DataSource dataSource, int timeout) throws SQLException {
   Assert.notNull(stmt, "No Statement specified");
   ConnectionHolder holder = null;
  //通過TransactionSynchronizationManager獲取ThreadLocal中的 connectionHolder
   if (dataSource != null) {
      holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
   }
  //獲取connectionHolder上配置的事務(wù)超時(shí)時(shí)間
   if (holder != null && holder.hasTimeout()) {
      stmt.setQueryTimeout(holder.getTimeToLiveInSeconds());
   }
   else if (timeout >= 0) {
      stmt.setQueryTimeout(timeout);
   }
}

ConnectionHolder.getTimeToLiveInSeconds方法眠饮,dealine是根據(jù)timeout計(jì)算出的事務(wù)到期時(shí)間,在getTimeToLiveInSeconds方法中如果已經(jīng)到達(dá)deadline铜邮,則調(diào)用checkTransactionTimeout進(jìn)行事務(wù)回滾或者標(biāo)記事務(wù)回滾仪召,從而實(shí)現(xiàn)事務(wù)的超時(shí)回滾

//設(shè)置timeout的時(shí)候 會(huì)計(jì)算出何時(shí)超時(shí) deadline
public void setTimeoutInMillis(long millis) {
  this.deadline = new Date(System.currentTimeMillis() + millis);
}

public int getTimeToLiveInSeconds() {
   double diff = ((double) getTimeToLiveInMillis()) / 1000;
   int secs = (int) Math.ceil(diff);
   checkTransactionTimeout(secs <= 0);
   return secs;
}

public long getTimeToLiveInMillis() throws TransactionTimedOutException{
        if (this.deadline == null) {
            throw new IllegalStateException("No timeout specified for this resource holder");
        }
        long timeToLive = this.deadline.getTime() - System.currentTimeMillis();
  //如果達(dá)到超時(shí)時(shí)間,調(diào)用checkTransactionTimeout拋出異常
        checkTransactionTimeout(timeToLive <= 0);
        return timeToLive;
    }

private void checkTransactionTimeout(boolean deadlineReached) throws TransactionTimedOutException {
        if (deadlineReached) {
            setRollbackOnly();
            throw new TransactionTimedOutException("Transaction timed out: deadline was " + this.deadline);
        }
    }

分析完timeout的實(shí)現(xiàn)松蒜,注意扔茅,事務(wù)的超時(shí)時(shí)通過sql的執(zhí)行去判斷從而進(jìn)行回滾的,但是一個(gè)spring事務(wù)中包含的是一個(gè)方法秸苗,在方法中不僅會(huì)執(zhí)行sql召娜,也會(huì)有其他語句

如果方法最后有個(gè)sql執(zhí)行,那么事務(wù)的超時(shí)時(shí)間配置能夠應(yīng)用

如果方法最后是個(gè)非sql執(zhí)行的語句惊楼,而且很耗時(shí)間玖瘸,會(huì)導(dǎo)致整個(gè)事務(wù)方法的執(zhí)行時(shí)間超過事務(wù)的timeout配置。

事務(wù)隔離級(jí)別

隔離級(jí)別 描述 場景
REQUIRED 必須運(yùn)行在事務(wù)中檀咙,當(dāng)前已存在事務(wù)時(shí)在事務(wù)中運(yùn)行;當(dāng)前不存在事務(wù)則開啟一個(gè)新事務(wù) 運(yùn)行在一個(gè)單層事務(wù)中店读,避免嵌套事務(wù)
SUPPORTS 可以運(yùn)行在事務(wù)中,如果當(dāng)前已存在事務(wù)時(shí)在事務(wù)中運(yùn)行;當(dāng)前不存在事務(wù)則直接無事務(wù)運(yùn)行 當(dāng)一個(gè)查詢單獨(dú)存在時(shí)無需運(yùn)行在事務(wù)中攀芯;當(dāng)先執(zhí)行了帶事務(wù)的更新屯断,再調(diào)用查詢,則查詢就需要運(yùn)行在事務(wù)中侣诺,在不同的隔離級(jí)別下很有用
MANDATORY 必須運(yùn)行在事務(wù)中殖演,如果當(dāng)前已存在事務(wù)時(shí)在事務(wù)中運(yùn)行;當(dāng)前不存在事務(wù)時(shí)拋出異常 當(dāng)一個(gè)方法需要事務(wù)環(huán)境,但不負(fù)責(zé)事務(wù)的開啟年鸳、提交趴久、回滾
REQUIRES_NEW 必須運(yùn)行在事務(wù)中,如果當(dāng)前已存在事務(wù)時(shí)則掛起當(dāng)前事務(wù)搔确,開啟新事務(wù)運(yùn)行結(jié)束后恢復(fù)被掛起事務(wù);當(dāng)前不存在事務(wù)則開啟一個(gè)新事務(wù)運(yùn)行 內(nèi)層事務(wù)的回滾和提交和外層事務(wù)的回滾和提交互不影響彼棍,制造一個(gè)內(nèi)層獨(dú)立的小事務(wù)
NOT_SUPPORTED 必須不運(yùn)行在事務(wù)中灭忠,如果當(dāng)前已存在事務(wù)時(shí)則掛起當(dāng)前事務(wù),無事務(wù)運(yùn)行結(jié)束后恢復(fù)被掛起事務(wù);當(dāng)前不存在事務(wù)則直接無事務(wù)運(yùn)行 保證當(dāng)前執(zhí)行一定不運(yùn)行在事務(wù)當(dāng)中座硕,但是又允許已經(jīng)在事務(wù)中的邏輯調(diào)用
NEVER 必須不運(yùn)行在事務(wù)中弛作,如果當(dāng)前已存在事務(wù)時(shí)則拋出異常;當(dāng)前不存在事務(wù)則直接無事務(wù)運(yùn)行 當(dāng)前一定不能運(yùn)行在事務(wù)中,一些涉及外部調(diào)用华匾,耗時(shí)長映琳,為了避免導(dǎo)致事務(wù)出問題,標(biāo)記NEVER蜘拉,在開發(fā)階段發(fā)現(xiàn)問題直接解決
NESTED 如果當(dāng)前已存在事務(wù)時(shí)則創(chuàng)建savepoint萨西,然后在事務(wù)中運(yùn)行;當(dāng)前不存在事務(wù)開啟新事務(wù)運(yùn)行 基于保存點(diǎn),內(nèi)層事務(wù)的回滾和提交不會(huì)影響外層事務(wù)旭旭。 外層事務(wù)的回滾和提交會(huì)影響內(nèi)層事務(wù)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末谎脯,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子持寄,更是在濱河造成了極大的恐慌穿肄,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,110評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件际看,死亡現(xiàn)場離奇詭異咸产,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)仲闽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門脑溢,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人赖欣,你說我怎么就攤上這事屑彻。” “怎么了顶吮?”我有些...
    開封第一講書人閱讀 165,474評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵社牲,是天一觀的道長。 經(jīng)常有香客問我悴了,道長搏恤,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,881評(píng)論 1 295
  • 正文 為了忘掉前任湃交,我火速辦了婚禮熟空,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘搞莺。我一直安慰自己息罗,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評(píng)論 6 392
  • 文/花漫 我一把揭開白布才沧。 她就那樣靜靜地躺著迈喉,像睡著了一般绍刮。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上挨摸,一...
    開封第一講書人閱讀 51,698評(píng)論 1 305
  • 那天孩革,我揣著相機(jī)與錄音,去河邊找鬼油坝。 笑死嫉戚,一個(gè)胖子當(dāng)著我的面吹牛刨裆,可吹牛的內(nèi)容都是我干的澈圈。 我是一名探鬼主播,決...
    沈念sama閱讀 40,418評(píng)論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼帆啃,長吁一口氣:“原來是場噩夢啊……” “哼瞬女!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起努潘,我...
    開封第一講書人閱讀 39,332評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤诽偷,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后疯坤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體报慕,經(jīng)...
    沈念sama閱讀 45,796評(píng)論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評(píng)論 3 337
  • 正文 我和宋清朗相戀三年压怠,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了眠冈。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,110評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡菌瘫,死狀恐怖蜗顽,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情雨让,我是刑警寧澤雇盖,帶...
    沈念sama閱讀 35,792評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站栖忠,受9級(jí)特大地震影響崔挖,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜庵寞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評(píng)論 3 331
  • 文/蒙蒙 一虚汛、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧皇帮,春花似錦卷哩、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽冷溶。三九已至,卻和暖如春尊浓,著一層夾襖步出監(jiān)牢的瞬間逞频,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評(píng)論 1 272
  • 我被黑心中介騙來泰國打工栋齿, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留苗胀,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,348評(píng)論 3 373
  • 正文 我出身青樓瓦堵,卻偏偏與公主長得像基协,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子菇用,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評(píng)論 2 355