spring事務(wù)(一) 編程式事務(wù)
知識(shí)導(dǎo)讀
- 本地事務(wù)(autocommit=false)-----編程式事務(wù)-----聲明式事務(wù)
- spring對(duì)事務(wù)對(duì)象的封裝其實(shí)就是封裝了一個(gè)connection钝诚,開啟事務(wù)的本質(zhì)的將connection的autocommit=false,然后使用connection進(jìn)行數(shù)據(jù)庫操作寇僧,最后commit或者rollback
- spring事務(wù)實(shí)現(xiàn)的關(guān)鍵是保證開啟事務(wù)的connection和執(zhí)行數(shù)據(jù)庫操作的connection是同一個(gè),所以兩者操作connection都要通過TransactionSynchronizationManager去獲取和存儲(chǔ)疑枯,TransactionSynchronizationManager會(huì)將connection保存到ThreadLocal中袍榆,保證同一線程中connection共享。
- TransactionSynchronizationManager獲取connection時(shí)用的key是DataSource對(duì)象糊昙,所以要保證sql執(zhí)行和事務(wù)用的是同一個(gè)DataSource
- spring事務(wù)的connection和配置信息都會(huì)存放到ThreadLocal中,在一個(gè)事務(wù)中開啟新的線程谢谦,會(huì)導(dǎo)致新線程中sql執(zhí)行喪失事務(wù)控制
- spring中事務(wù)的隔離級(jí)別都是基于數(shù)據(jù)庫實(shí)現(xiàn)的释牺、timeout是在sql執(zhí)行的時(shí)候判斷萝衩,所以在事務(wù)方法中的timeout配置只會(huì)作用在最后一個(gè)sql語句處,余下的方法邏輯無法控制
- 事務(wù)的掛起本質(zhì)就是將開啟事務(wù)的connection從ThreadLocal移除没咙,然后重新獲取connection
- 事務(wù)的恢復(fù)本質(zhì)就是將掛起事務(wù)的connection重新放入ThreadLocal中
- spring的傳播行為定義了在spring中多個(gè)事務(wù)如何共存猩谊,每次執(zhí)行無論傳播行為是什么都會(huì)返回一個(gè)新的事務(wù)狀態(tài)TransactionStatus對(duì)象,不同的傳播行為會(huì)導(dǎo)致當(dāng)前運(yùn)行在不同的事務(wù)上下文中祭刚,事務(wù)運(yùn)行在不同的上下文牌捷,在事務(wù)提交和回滾的時(shí)候操作不同,主要有以下4中情況
- 掛起事務(wù)然后無事務(wù)運(yùn)行(newTransaction=false): 提交和回滾都無需操作
- 新開啟事務(wù)(newTransaction=true):提交commit 回滾 rollback
- 在當(dāng)前事務(wù)中運(yùn)行(newTransaction=false):提交不進(jìn)行任何操作涡驮,回滾標(biāo)記rollbacOnly=true然后不進(jìn)行操作
- 嵌套事務(wù)暗甥,創(chuàng)建保存點(diǎn)(hasSavePoint&newTransaction=false):提交刪除保存點(diǎn),回滾則回滾到保存點(diǎn)
- 可以通過TransactionSynchronizationManager的registerSynchronization方法添加監(jiān)聽事務(wù)提交前捉捅、提交后撤防、回滾前、回滾后棒口、完成前寄月、完成后的事件。
spring事務(wù)管理
事務(wù)管理器
spring中 PlatformTransactionManager 接口是spring事務(wù)管理器的根接口陌凳,定義了事務(wù)的操作行為:開啟事務(wù)、提交事務(wù)内舟、回滾事務(wù)
public interface PlatformTransactionManager {
//開啟事務(wù)合敦,返回事務(wù)的封裝對(duì)象 TransactionStatus
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
//提交該事務(wù)
//如果事務(wù)標(biāo)記回滾rollbackOnly=true,則執(zhí)行回滾验游。
//如果事務(wù)是一個(gè)新事務(wù)充岛,則調(diào)用connection.commit提交事務(wù)
//如果事務(wù)包含 保存點(diǎn),則刪除保存點(diǎn)
//如果事務(wù)不是一個(gè)新事務(wù)耕蝉,運(yùn)行在外層事務(wù)中則忽略提交崔梗,以便適當(dāng)?shù)貐⑴c周圍的事務(wù)。
//如果前一個(gè)事務(wù)已經(jīng)掛起垒在,那么在提交新事務(wù)之后恢復(fù)前一個(gè)事務(wù)蒜魄。
void commit(TransactionStatus status) throws TransactionException;
//執(zhí)行給定事務(wù)的回滾。
//如果事務(wù)包含 保存點(diǎn)场躯,則回滾到保存點(diǎn)
//如果事務(wù)是新事務(wù)谈为,調(diào)用connection.rollback回滾事務(wù)
//如果事務(wù)不是新事務(wù),運(yùn)行在外層事務(wù)則設(shè)置rollbackOnly=true
//如果前一個(gè)事務(wù)已經(jīng)掛起踢关,那么在回滾新事務(wù)之后恢復(fù)前一個(gè)事務(wù)伞鲫。
void rollback(TransactionStatus status) throws TransactionException;
}
spring事務(wù)管理的整體實(shí)現(xiàn)由抽象類 AbstractPlatformTransactionManager 實(shí)現(xiàn)。在下面的源碼分析中會(huì)看到签舞。
開啟事務(wù)的參數(shù)是一個(gè)TransactionDefinition對(duì)象秕脓,該類封裝了事務(wù)的配置屬性柒瓣,包括傳播行為、超時(shí)配置吠架、回滾異常芙贫、隔離級(jí)別等信息。事務(wù)開啟后返回TransactionStatus對(duì)象诵肛,提交和回滾都是操作該對(duì)象
子類如DataSourceTransactionManager主要負(fù)責(zé)實(shí)現(xiàn)具體事務(wù)對(duì)象(connection)的管理屹培,如獲取、保存同步怔檩、開啟事務(wù)褪秀、提交、回滾等操作薛训。
事務(wù)狀態(tài)
spring中TransactionStatus類用于封裝事務(wù)的狀態(tài)媒吗。spring中事務(wù)的開啟、提交乙埃、回滾都是通過操作該對(duì)象實(shí)現(xiàn)
- completed:標(biāo)識(shí)事務(wù)是否已完成結(jié)束
- rollbackOnly:標(biāo)識(shí)事務(wù)需要回滾闸英,在非獨(dú)立新開啟事務(wù)中用于回滾標(biāo)記,在事務(wù)提交時(shí)的時(shí)候判斷該值進(jìn)行回滾
- newTransaction :標(biāo)識(shí)事務(wù)是否是一個(gè)獨(dú)立新開啟的事務(wù)或者運(yùn)行在外層事務(wù)或者是否是嵌套事務(wù)
spring中使用TransactionStatus的實(shí)現(xiàn)類DefaultTransactionStatus介袜,在這個(gè)類中封裝了兩個(gè)最重要的屬性值
- transaction: 事務(wù)對(duì)象甫何,一般返回的是一個(gè)包裝了connection的對(duì)象
- suspendedResources: 掛起對(duì)的事務(wù)對(duì)象,包含被掛起的connection對(duì)象和被掛起的事務(wù)配置
- 聲明式事務(wù)的配置信息
編程式開啟事務(wù)
spring中定義了一個(gè)TransactionTemplate類遇伞,用于編程式事務(wù)執(zhí)行數(shù)據(jù)庫操作
TransactionTemplate的execute方法中辙喂,通過transactionManager開啟一個(gè)事務(wù)并返回TransactionStatus,然后通過TransactionCallback執(zhí)行實(shí)際的數(shù)據(jù)庫操作鸠珠,執(zhí)行完畢后調(diào)用transactionManager提交事務(wù)巍耗,如果發(fā)生異常則調(diào)用transactionManager回滾事務(wù)
public <T> T execute(TransactionCallback<T> action) throws TransactionException {
Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
}else {
//開啟事務(wù),并返回封裝事務(wù)的TransactionStatus對(duì)象
TransactionStatus status = this.transactionManager.getTransaction(this);
T result;
try {
//執(zhí)行sql操作
result = action.doInTransaction(status);
}
catch (RuntimeException | Error ex) {
//sql操作異常進(jìn)行事務(wù)回滾
rollbackOnException(status, ex);
throw ex;
}
catch (Throwable ex) {
//sql操作異常進(jìn)行事務(wù)回滾
rollbackOnException(status, ex);
throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
}
//sql執(zhí)行成功 提交事務(wù)
this.transactionManager.commit(status);
return result;
}
}
開啟事務(wù)
AbstractPlatformTransactionManager實(shí)現(xiàn)了事務(wù)管理器的getTransaction渐排,在該方法中完成了事務(wù)的開啟和事務(wù)傳播行為的處理炬太。
通過子類實(shí)現(xiàn)的doGetTransaction獲取一個(gè)transaction事務(wù)對(duì)象,類型是Object的驯耻,可想而知兼容不好處理啊亲族,但是在這里也做了取巧動(dòng)作,雖然返回的是Object類型,但是在父類中不會(huì)對(duì)該事務(wù)對(duì)象進(jìn)行任何操作處理,所有關(guān)于事務(wù)對(duì)象的操作都交由子類覆寫實(shí)現(xiàn)舅逸。比如isExistingTransaction(判斷是否已存在事務(wù))悟泵、doBegin(開啟事務(wù))。我們先不考慮子類的具體實(shí)現(xiàn),首先看事務(wù)處理的整體邏輯
獲取到事務(wù)對(duì)象后,通過子類覆寫邏輯判斷如果已經(jīng)開啟事務(wù)(是否有個(gè)autoCommit=false的connection)誉结,則根據(jù)事務(wù)的傳播行為判斷多事務(wù)共存的情形炼鞠,然后返回TransactionStatus缘滥。
如果當(dāng)前還未開啟事務(wù),則根據(jù)事務(wù)傳播行為判斷是否需要開啟事務(wù)谒主,如果需要?jiǎng)t調(diào)用子類覆寫邏輯 doBegin方法朝扼,將事務(wù)對(duì)象transaction傳遞下去開啟事務(wù),然后封裝好TransactionStatus對(duì)象返回
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
//獲取Transaction對(duì)象霎肯,由具體子類實(shí)現(xiàn)
Object transaction = doGetTransaction();
//如果事務(wù)配置為空擎颖,創(chuàng)建一個(gè)默認(rèn)實(shí)現(xiàn)
if (definition == null) {
definition = new DefaultTransactionDefinition();
}
//在這里要處理多事務(wù)共存的情況,根據(jù)不同的傳播行為來實(shí)現(xiàn)不同的業(yè)務(wù)處理
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(definition, transaction, debugEnabled);
}
//確定不存在事務(wù)观游,要開啟一個(gè)新事務(wù)搂捧,根據(jù)不同的事務(wù)傳播行為進(jìn)行處理
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}//根據(jù)事務(wù)傳播行為配置,開啟一個(gè)新事務(wù)
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
//暫停一個(gè)空事務(wù)
SuspendedResourcesHolder suspendedResources = suspend(null);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//構(gòu)建一個(gè)DefaultTransactionStatus對(duì)象懂缕,用于封裝事務(wù)信息
//新開啟的事務(wù) newtransaction = true
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//調(diào)用子類 開啟事務(wù)允跑,其實(shí)就是創(chuàng)建一個(gè)connection,并設(shè)置autoCommit = false
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error ex) {
//恢復(fù)被掛起的事務(wù)
resume(null, suspendedResources);
throw ex;
}
} else {//無需事務(wù)運(yùn)行搪柑,在這里構(gòu)建一個(gè)沒有事務(wù)的上線文
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
AbstractPlatformTransactionManager.handleExistingTransaction方法處理當(dāng)前已存在事務(wù)的情況聋丝。根據(jù)TransactionDefinition中配置的傳播行為來處理多事務(wù)共存情況。最終封裝返回事務(wù)狀態(tài)對(duì)象TransactionStatus工碾。具體處理看代碼注釋弱睦,
注意到 newTransaction 屬性的配置,該配置會(huì)直接影響事務(wù)的提交和回滾操作處理
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//以非事務(wù)方式執(zhí)行渊额,如果當(dāng)前存在事務(wù)况木,則拋出異常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//以非事務(wù)方式執(zhí)行操作,如果當(dāng)前存在事務(wù)端圈,就把當(dāng)前事務(wù)掛起焦读。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { //掛起當(dāng)前事務(wù)
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
//構(gòu)建一個(gè)無事務(wù)的 TransactionStatus 返回 newtransaction = false
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
//新建事務(wù)子库,如果當(dāng)前存在事務(wù)舱权,把當(dāng)前事務(wù)掛起,然后新開啟一個(gè)事務(wù)返回事務(wù)狀態(tài)
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
//掛起當(dāng)前事務(wù)仑嗅,并獲取被掛起事務(wù)的信息宴倍,保存在DefaultTransactionStatus中,用于恢復(fù)時(shí)用
//掛起的時(shí)候會(huì)清除掉transaction中的connection仓技,在doBegin的時(shí)候會(huì)重新獲取一個(gè)connection
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//新開啟的事務(wù) newtransaction = true
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//重新開啟一個(gè)事務(wù)鸵贬,在doBegin里面會(huì)重新獲取一個(gè)新的connection,開啟事務(wù)
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
} catch (RuntimeException | Error beginEx) {
//開啟新事務(wù)異常脖捻,重新恢復(fù)掛起的事務(wù)
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
//如果當(dāng)前存在事務(wù)阔逼,則在嵌套事務(wù)內(nèi)執(zhí)行。如果當(dāng)前沒有事務(wù)地沮,則進(jìn)行與PROPAGATION_REQUIRED類似的操作嗜浮。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
//使用保存點(diǎn)實(shí)現(xiàn)嵌套事務(wù)
if (useSavepointForNestedTransaction()) {
//嵌套事務(wù)羡亩,不是一個(gè)新開啟的事務(wù), newtransaction = false
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
//當(dāng)前事務(wù)創(chuàng)建一個(gè) 保存點(diǎn)
status.createAndHoldSavepoint();
return status;
} else {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
//PROPAGATION_REQUIRED -- 支持當(dāng)前事務(wù)危融,如果當(dāng)前沒有事務(wù)畏铆,就新建一個(gè)事務(wù)。這是最常見的選擇吉殃。 當(dāng)前已有事務(wù)
//PROPAGATION_SUPPORTS -- 支持當(dāng)前事務(wù)辞居,如果當(dāng)前沒有事務(wù),就以非事務(wù)方式執(zhí)行蛋勺。當(dāng)前已有事務(wù)
//這里先判斷下方法配置的和當(dāng)前事務(wù)配置的事務(wù)隔離級(jí)別瓦灶、只讀屬性是否一致,如果不一致拋異常
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
//在當(dāng)前事務(wù)中運(yùn)行
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//當(dāng)前事務(wù)中運(yùn)行迫卢,不是一個(gè)新開啟的事務(wù)倚搬, newtransaction = false
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
prepareTransactionStatus方法中返回了 DefaultTransactionStatus 對(duì)象。同時(shí)通過TransactionSynchronizationManager類將當(dāng)前事務(wù)狀態(tài)配置到ThreadLocal中乾蛤。
protected final DefaultTransactionStatus prepareTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
prepareSynchronization(status, definition);
return status;
}
protected DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
}
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
分析完開啟事務(wù)的返回結(jié)果每界,接下來看下當(dāng)需要掛起當(dāng)前事務(wù)的時(shí)候,是如何掛起的家卖。AbstractPlatformTransactionManager.suspend方法用于掛起事務(wù)眨层,在該方法中會(huì)調(diào)用子類掛起事務(wù),同時(shí)返回被掛起事務(wù)對(duì)象及其配置信息返回上荡,當(dāng)事務(wù)恢復(fù)的時(shí)候需要依賴這些配置信息用于恢復(fù)趴樱。
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
//子類覆寫實(shí)現(xiàn) 事務(wù)掛起,同時(shí)返回被掛起事務(wù)的信息
//其實(shí)就是將transaction中的connection移除并封裝到suspendedResources中返回
suspendedResources = doSuspend(transaction);
}
//獲取被掛起事務(wù)存儲(chǔ)在ThreadLocal中的配置酪捡,封裝被掛起事務(wù)的配置信息返回
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
} else if (transaction != null) {
// 掛起事務(wù)
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
} else {
//當(dāng)前不存在事務(wù)叁征,所以無需掛起
return null;
}
}
當(dāng)事務(wù)開啟異常時(shí),需要恢復(fù)被掛起的事務(wù)逛薇,AbstractPlatformTransactionManager.resume用于恢復(fù)事務(wù)捺疼,將TransactionStatus中的SuspendedResources(掛起事務(wù)中的資源)重新放回到ThreadLocal中,該邏輯通過子類doResume方法實(shí)現(xiàn)永罚,然后將其事務(wù)狀態(tài)配置重新設(shè)置到ThreadLocal中啤呼。
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException {
if (resourcesHolder != null) {
Object suspendedResources = resourcesHolder.suspendedResources;
if (suspendedResources != null) {
//子類覆寫實(shí)現(xiàn),其實(shí)就是將suspendedResources中的connection放到ThreadLocal中
doResume(transaction, suspendedResources);
}
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
//恢復(fù)ThreadLocal中被掛起事務(wù)的配置
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
doResumeSynchronization(suspendedSynchronizations);
}
}
}
分析完了AbstractPlatformTransactionManager中開啟事務(wù)的整體執(zhí)行流程呢袱,然后看下子類中 doGetTransaction(獲取事務(wù)對(duì)象)官扣、isExistingTransaction(是否已開啟事務(wù))、deBegin(開啟事務(wù))羞福、doSuspend(掛起事務(wù))惕蹄、doResume(恢復(fù)事務(wù))的具體實(shí)現(xiàn),接下來以最常用的DataSourceTransactionManager舉例
DataSourceTransactionManager覆寫了AbstractPlatformTransactionManager的方法,提供了事務(wù)對(duì)象的具體操作卖陵,同時(shí)封裝了一個(gè)DataSource恋昼,用于獲取connection。
獲取事務(wù)對(duì)象
DataSourceTransactionManager返回的事務(wù)對(duì)象是DataSourceTransactionObject赶促,里面主要就是封裝了一個(gè)ConnectionHolder對(duì)象液肌,ConnectionHolder主要用于存儲(chǔ)一個(gè)數(shù)據(jù)庫連接Connection對(duì)象。
注意這里不是直接通過DataSource去獲取Connection的鸥滨,而是通過TransactionSynchronizationManager去ThreadLocal獲取嗦哆。如果能獲取到證明可能已存在事務(wù),返回connection; 如果獲取不到Connection,就是返回一個(gè)null婿滓。
在spring中sql的執(zhí)行也是會(huì)先通過TransactionSynchronizationManager去ThreadLocal中獲取Connection老速,這樣就實(shí)現(xiàn)了開啟事務(wù)和sql執(zhí)行用的是一個(gè)Connection
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
//這里有可能返回conHolder值,所以有可能不是新建的凸主,直接設(shè)置newConnectionHolder=false橘券,dobegin需要新獲取connection的時(shí)候會(huì)重新設(shè)置為true
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
TransactionSynchronizationManager用于獲取ThreadLocal中設(shè)置的數(shù)據(jù)庫連接信息,key是當(dāng)前數(shù)據(jù)源對(duì)象
@Nullable
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
return value;
}
從ThreadLocal獲取Connection信息
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
判斷是否已在一個(gè)事務(wù)中
TransactionSynchronizationManager實(shí)現(xiàn)了isExistingTransaction方法卿吐。接收Object類型的transaction對(duì)象旁舰。因?yàn)檫@個(gè)對(duì)象就是該實(shí)現(xiàn)類自己創(chuàng)建的,所以可以直接強(qiáng)轉(zhuǎn)為DataSourceTransactionObject類型嗡官,事務(wù)開啟的條件為txObject中存在一個(gè)TransactionActive = true 的ConnectionHolder
注意:當(dāng)事務(wù)開啟成功后(setAutoCommit=false)會(huì)設(shè)置 transactionActive = true
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
開啟事務(wù)
doBegin方法中會(huì)從數(shù)據(jù)源中獲取一個(gè)connection連接箭窜,然后設(shè)置autoCommit=false⊙苄龋可以看到事務(wù)開啟的本質(zhì)就是返回一個(gè)autoCommit=false的connection磺樱。
事務(wù)開啟成功之后,設(shè)置事務(wù)開啟標(biāo)識(shí) setTransactionActive = true
如果是新獲取的connection婆咸,證明是一個(gè)新事務(wù)竹捉,則調(diào)用TransactionSynchronizationManager.bindResource將connectionHolder設(shè)置到ThreadLocal當(dāng)中
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
//開啟事務(wù)前獲取事務(wù)連接 connection,如果事務(wù)對(duì)象中的有了connection就不需要獲取了尚骄,如果沒有去DataSource中獲取一個(gè)新的connection块差,構(gòu)建一個(gè)新的ConnectionHolder
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//從數(shù)據(jù)源中獲取connection
Connection newCon = obtainDataSource().getConnection();
//放到事務(wù)對(duì)象的connectionHolder中,重新獲取的connection,設(shè)置newConnectionHolder=true
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
//將事務(wù)配置設(shè)定到connection上乖仇,同時(shí)記錄下 connection 原始的配置憾儒,事務(wù)完成之后connection狀態(tài)重置回去
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
//關(guān)閉自動(dòng)提交询兴,開啟事務(wù)
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
//事務(wù)開啟成功乃沙,設(shè)置事務(wù)對(duì)象中的TransactionActive=true,用于快速判斷是否開啟事務(wù)
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
//配置事務(wù)超時(shí)時(shí)間
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 將連接信息綁定到ThreadLocal中诗舰,sql執(zhí)行的時(shí)候和事務(wù)傳播行為都會(huì)去ThreadLocal獲取連接
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable ex) {
//異常則關(guān)閉 connection 連接警儒,重置 txObject
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
新獲取的connection需要通過TransactionSynchronizationManager將開啟事務(wù)的connection放到ThreadLocal中。保證事務(wù)過程中用的connection能夠被執(zhí)行sql的代碼拿到
public static void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
Map<Object, Object> map = resources.get();
// set ThreadLocal Map if none found
if (map == null) {
map = new HashMap<>();
resources.set(map);
}
Object oldValue = map.put(actualKey, value);
// Transparently suppress a ResourceHolder that was marked as void...
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}
if (oldValue != null) {
throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
}
掛起事務(wù)
操作的還是DataSourceTransactionObject對(duì)象。事務(wù)掛起的本質(zhì)就是移除并返回事務(wù)對(duì)象transaction中原有的connection連接蜀铲。然后再調(diào)用TransactionSynchronizationManager移除ThreadLocal中的connection边琉。
感覺spring這種代碼還是帶點(diǎn)坑的,修改參數(shù)對(duì)象中的值记劝,出問題不好定位变姨,最好的處理是返回一個(gè)操作完的新的對(duì)象
@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);//移除對(duì)象中的connection連接
//移除ThreadLocal中的connection
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
通過TransactionSynchronizationManager移除ThreadLocal中的connection
public static Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doUnbindResource(actualKey);
return value;
}
private static Object doUnbindResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
//移除ThreadLocal中connection 并返回
Object value = map.remove(actualKey);
//此處省略代碼。厌丑。定欧。
return value;
}
恢復(fù)事務(wù)
恢復(fù)事務(wù)就是將掛起資源中的connection通過TransactionSynchronizationManager重新放到ThreadLocal中
感覺這里應(yīng)該同時(shí)將connection設(shè)置回transaction中啊,這樣才能和掛起事務(wù)對(duì)應(yīng)怒竿。
@Override
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
提交事務(wù)
AbstractPlatformTransactionManager.commit定義了提交事務(wù)的流程砍鸠,在事務(wù)提交的時(shí)候首先要判斷下TransactionStatus的rollbackOnly狀態(tài),如果是true耕驰,則回滾事務(wù)爷辱。否則調(diào)用processCommit方法提交
如果當(dāng)前是運(yùn)行在外層事務(wù)中,發(fā)生異常朦肘,不方便直接回滾饭弓,則標(biāo)記rollbackOnly=true,當(dāng)外層執(zhí)行提交的時(shí)候再進(jìn)行回滾
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
//如果標(biāo)記了 localRollbackOnly=true 提交的時(shí)候直接回滾
//TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
if (defStatus.isLocalRollbackOnly()) {
processRollback(defStatus, false);
return;
}
//根據(jù)配置判斷提交的時(shí)候短路走回滾邏輯
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
processRollback(defStatus, true);
return;
}
//提交事務(wù)
processCommit(defStatus);
}
在processCommit方法會(huì)調(diào)用一些事務(wù)提交和完成前后的事件媒抠,根據(jù)TransactionStatus的狀態(tài)進(jìn)行不同的操作
- 如果是嵌套事務(wù)示启,包含保存點(diǎn),則直接釋放保存點(diǎn)就可以领舰,提交由外層事務(wù)完成
- 如果是一個(gè)新開啟的事務(wù)夫嗓,調(diào)用子類實(shí)現(xiàn)的 doCommit方法,其實(shí)就是執(zhí)行connection的commit方法
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);// 觸發(fā)提交前事件
triggerBeforeCompletion(status);//觸發(fā)完成前事件
beforeCompletionInvoked = true;
//事務(wù)提交的時(shí)候如果是嵌套事務(wù)冲秽,有保存點(diǎn)舍咖, 則釋放保存點(diǎn),無需提交事務(wù)锉桑,完成
if (status.hasSavepoint()) {
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
//調(diào)用子類實(shí)現(xiàn)的 doCommit 提交事務(wù)
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
//全局標(biāo)記了回滾排霉,回滾 RollbackOnly=true
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
// 觸發(fā)事務(wù)完成 事件
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
// 異常后回滾
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
} else {
//觸發(fā)事務(wù)完成 事件
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
//異常回滾
doRollbackOnCommitException(status, ex);
throw ex;
}
//觸發(fā)事務(wù)提交完成事件和事務(wù)完成事件
try {
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
//完成后清除事務(wù)信息民轴,清除當(dāng)前ThreadLocal中的信息攻柠,釋放connection,恢復(fù)掛起的事務(wù)
cleanupAfterCompletion(status);
}
}
子類DataSourceTransactionManager中實(shí)現(xiàn)了提交的具體操作后裸,本質(zhì)就是獲取事務(wù)對(duì)象中的connection瑰钮,然后調(diào)用jdbc的api commit提交。
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();//獲取事務(wù)對(duì)象中的connection
Connection con = txObject.getConnectionHolder().getConnection();
try {//調(diào)用connection的commit方法提交事務(wù)
con.commit();
} catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
事務(wù)完成之后微驶,只要進(jìn)行資源的釋放浪谴、事務(wù)配置信息的清除和掛起事務(wù)的恢復(fù)开睡,所以在finnaly里面調(diào)用了cleanupAfterCompletion方法
- 清除ThreadLocal中事務(wù)狀態(tài)信息
- 如果是新開啟的時(shí)候,調(diào)用子類釋放connection資源苟耻,如果是運(yùn)行在其他事務(wù)中篇恒,無需操作
- 如果存在被掛起的事務(wù),恢復(fù)
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();
//如果已經(jīng)將事務(wù)狀態(tài)信息保存到ThreadLocal中凶杖,清除 ThreadLocal中的事務(wù)狀態(tài)配置
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
//如果是新開啟的事務(wù)胁艰,調(diào)用子類釋放連接信息
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
//恢復(fù) 掛起的事務(wù)
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
TransactionSynchronizationManager.clear 用于清除ThreadLocal中的的事務(wù)狀態(tài)配置信息
public static void clear() {
synchronizations.remove();
currentTransactionName.remove();
currentTransactionReadOnly.remove();
currentTransactionIsolationLevel.remove();
actualTransactionActive.remove();
}
新開啟的事務(wù)需要調(diào)用子類釋放事務(wù)資源
- 移除ThreadLocal中connection
- 將 connection 的autoCommit設(shè)置為true
- 調(diào)用DataSource方法釋放connection
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
//清除ThreadLocal中的 connection信息
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
//重置 autoCommit = true
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
} catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
//調(diào)用 DataSource 關(guān)閉 connection
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
}
txObject.getConnectionHolder().clear();
}
回滾事務(wù)
AbstractPlatformTransactionManager.commit定義了回滾事務(wù)的流程,調(diào)用processRollback進(jìn)行回滾事務(wù)
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
在processRollback方法會(huì)調(diào)用一些事務(wù)回滾和完成前后的事件智蝠,根據(jù) TransactionStatus 的狀態(tài)進(jìn)行不同的操作蝗茁,回滾操作完畢后,調(diào)用cleanupAfterCompletion清理事務(wù)信息
- 如果有 保存點(diǎn) 寻咒,直接回滾到 保存點(diǎn)
- 如果是一個(gè)新啟的事務(wù)哮翘,調(diào)用子類實(shí)現(xiàn)doRollback進(jìn)行回滾,其實(shí)就是調(diào)用connection.rollback.
- 如果運(yùn)行在外層事務(wù)毛秘,直接標(biāo)記當(dāng)前事務(wù) rollbackOnly =true饭寺,當(dāng)外層事務(wù)提交的時(shí)候會(huì)進(jìn)行回滾
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
//如果有保存點(diǎn),回滾到保存點(diǎn)
if (status.hasSavepoint()) {
status.rollbackToHeldSavepoint();
}//如果是一個(gè)新的事務(wù)叫挟,調(diào)用子類實(shí)現(xiàn) 回滾事務(wù)
else if (status.isNewTransaction()) {
doRollback(status);
} else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
doSetRollbackOnly(status);
} else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
} else {//不存在事務(wù)艰匙,無需回滾
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (RuntimeException | Error ex) {
//回滾異常, 觸發(fā)事務(wù)完成事件
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
//回滾成功后觸發(fā)事務(wù)完成事件
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
} finally {
//完成后清除事務(wù)信息,清除當(dāng)前ThreadLocal中的信息抹恳,釋放connection员凝,恢復(fù)掛起的事務(wù)
cleanupAfterCompletion(status);
}
}
子類DataSourceTransactionManager中實(shí)現(xiàn)了回滾的具體操作,本質(zhì)就是獲取事務(wù)對(duì)象中的connection奋献,然后調(diào)用jdbc的api rollback回滾健霹。
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
try {//調(diào)用connection的rollback
con.rollback();
} catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}
sql執(zhí)行與事務(wù)結(jié)合
事務(wù)與sql執(zhí)行用統(tǒng)一connection對(duì)象
來看JdbcTemplate的execute方法,所有的sql執(zhí)行都會(huì)通過該方法執(zhí)行,在該方法中第一步會(huì)通過jdbc配置的DataSource獲取connection瓶蚂,DataSourceUtils要保證能獲取到開啟事務(wù)的connection糖埋,從而實(shí)現(xiàn)事務(wù)控制。在這里就用到了TransactionSynchronizationManager去ThreadLocal中獲取開啟事務(wù)的connection窃这。
public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
throws DataAccessException {
//獲取 connection
Connection con = DataSourceUtils.getConnection(obtainDataSource());
PreparedStatement ps = null;
try {
ps = psc.createPreparedStatement(con);
//這里會(huì)應(yīng)用到事務(wù)配置的超時(shí)時(shí)間 timeout
applyStatementSettings(ps);
T result = action.doInPreparedStatement(ps);
handleWarnings(ps);
return result;
}
}
DataSourceUtils.getConnection用于獲取connection
public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
return doGetConnection(dataSource);
}
在doGetConnection方法中會(huì)首先通過TransactionSynchronizationManager去ThreadLocal中獲取當(dāng)前數(shù)據(jù)源的connection瞳别,spring的事務(wù)也是將connection保存在TransactionSynchronizationManager的ThreadLocal中的,這樣就保證了事務(wù)管理和sql執(zhí)行用的是一個(gè)connection
要注意的是杭攻,獲取connection的key是DataSource對(duì)象祟敛,所以事務(wù)管理的DataSource和jdbc的DataSource一定要用一個(gè)對(duì)象,不然會(huì)導(dǎo)致事務(wù)失效
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
Assert.notNull(dataSource, "No DataSource specified");
//也通過TransactionSynchronizationManager去ThreadLocal中獲取當(dāng)前數(shù)據(jù)源的connection
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
logger.debug("Fetching resumed JDBC Connection from DataSource");
conHolder.setConnection(fetchConnection(dataSource));
}
return conHolder.getConnection();
}
//ThreadLocal中沒有connection兆解,重新從DataSource中獲取馆铁,然后通過TransactionSynchronizationManager綁定到ThreadLocal中,與事務(wù)關(guān)聯(lián)起來
Connection con = fetchConnection(dataSource);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
try {
ConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new ConnectionHolder(con);
}
else {
holderToUse.setConnection(con);
}
holderToUse.requested();
TransactionSynchronizationManager.registerSynchronization(
new ConnectionSynchronization(holderToUse, dataSource));
holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
}
}
}
return con;
}
事務(wù)超時(shí)時(shí)間(timeout)應(yīng)用
在事務(wù)中會(huì)配置 timeout痪宰,用于控制事務(wù)的超時(shí)實(shí)現(xiàn)叼架。在事務(wù)管理器DataSourceTransactionManager中會(huì)將事務(wù)配置的timeout配置到事務(wù)對(duì)象的connection中,而這個(gè)connection會(huì)通過TransactionSynchronizationManager放到ThreadLocal中的
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
在JdbcTemplate的execute方法中衣撬,有一步applyStatementSettings乖订,就會(huì)去校驗(yàn)事務(wù)的timeout,如果超時(shí)則拋出異尘吡罚回滾事務(wù)
public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
throws DataAccessException {
//獲取 connection
Connection con = DataSourceUtils.getConnection(obtainDataSource());
PreparedStatement ps = null;
try {
ps = psc.createPreparedStatement(con);
//這里會(huì)應(yīng)用到事務(wù)配置的超時(shí)時(shí)間 timeout
applyStatementSettings(ps);
T result = action.doInPreparedStatement(ps);
handleWarnings(ps);
return result;
}
}
applyStatementSettings最后一步判斷事務(wù)超時(shí)時(shí)間
protected void applyStatementSettings(Statement stmt) throws SQLException {
int fetchSize = getFetchSize();
if (fetchSize != -1) {
stmt.setFetchSize(fetchSize);
}
int maxRows = getMaxRows();
if (maxRows != -1) {
stmt.setMaxRows(maxRows);
}
DataSourceUtils.applyTimeout(stmt, getDataSource(), getQueryTimeout());
}
applyTimeout中會(huì)獲取ThreadLocal中的開啟事務(wù)的connection乍构,調(diào)用connectionHolder的getTimeToLiveInSeconds方法,如果已經(jīng)超時(shí)扛点,在該方法中會(huì)拋出異常哥遮。如果還未超時(shí),設(shè)置statement的超時(shí)時(shí)間為剩余時(shí)間陵究,從而實(shí)現(xiàn)了事務(wù)的超時(shí)時(shí)間配置
public static void applyTimeout(Statement stmt, @Nullable DataSource dataSource, int timeout) throws SQLException {
Assert.notNull(stmt, "No Statement specified");
ConnectionHolder holder = null;
//通過TransactionSynchronizationManager獲取ThreadLocal中的 connectionHolder
if (dataSource != null) {
holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
}
//獲取connectionHolder上配置的事務(wù)超時(shí)時(shí)間
if (holder != null && holder.hasTimeout()) {
stmt.setQueryTimeout(holder.getTimeToLiveInSeconds());
}
else if (timeout >= 0) {
stmt.setQueryTimeout(timeout);
}
}
ConnectionHolder.getTimeToLiveInSeconds方法眠饮,dealine是根據(jù)timeout計(jì)算出的事務(wù)到期時(shí)間,在getTimeToLiveInSeconds方法中如果已經(jīng)到達(dá)deadline铜邮,則調(diào)用checkTransactionTimeout進(jìn)行事務(wù)回滾或者標(biāo)記事務(wù)回滾仪召,從而實(shí)現(xiàn)事務(wù)的超時(shí)回滾
//設(shè)置timeout的時(shí)候 會(huì)計(jì)算出何時(shí)超時(shí) deadline
public void setTimeoutInMillis(long millis) {
this.deadline = new Date(System.currentTimeMillis() + millis);
}
public int getTimeToLiveInSeconds() {
double diff = ((double) getTimeToLiveInMillis()) / 1000;
int secs = (int) Math.ceil(diff);
checkTransactionTimeout(secs <= 0);
return secs;
}
public long getTimeToLiveInMillis() throws TransactionTimedOutException{
if (this.deadline == null) {
throw new IllegalStateException("No timeout specified for this resource holder");
}
long timeToLive = this.deadline.getTime() - System.currentTimeMillis();
//如果達(dá)到超時(shí)時(shí)間,調(diào)用checkTransactionTimeout拋出異常
checkTransactionTimeout(timeToLive <= 0);
return timeToLive;
}
private void checkTransactionTimeout(boolean deadlineReached) throws TransactionTimedOutException {
if (deadlineReached) {
setRollbackOnly();
throw new TransactionTimedOutException("Transaction timed out: deadline was " + this.deadline);
}
}
分析完timeout的實(shí)現(xiàn)松蒜,注意扔茅,事務(wù)的超時(shí)時(shí)通過sql的執(zhí)行去判斷從而進(jìn)行回滾的,但是一個(gè)spring事務(wù)中包含的是一個(gè)方法秸苗,在方法中不僅會(huì)執(zhí)行sql召娜,也會(huì)有其他語句
如果方法最后有個(gè)sql執(zhí)行,那么事務(wù)的超時(shí)時(shí)間配置能夠應(yīng)用
如果方法最后是個(gè)非sql執(zhí)行的語句惊楼,而且很耗時(shí)間玖瘸,會(huì)導(dǎo)致整個(gè)事務(wù)方法的執(zhí)行時(shí)間超過事務(wù)的timeout配置。
事務(wù)隔離級(jí)別
隔離級(jí)別 | 描述 | 場景 |
---|---|---|
REQUIRED | 必須運(yùn)行在事務(wù)中檀咙,當(dāng)前已存在事務(wù)時(shí)在事務(wù)中運(yùn)行;當(dāng)前不存在事務(wù)則開啟一個(gè)新事務(wù) | 運(yùn)行在一個(gè)單層事務(wù)中店读,避免嵌套事務(wù) |
SUPPORTS | 可以運(yùn)行在事務(wù)中,如果當(dāng)前已存在事務(wù)時(shí)在事務(wù)中運(yùn)行;當(dāng)前不存在事務(wù)則直接無事務(wù)運(yùn)行 | 當(dāng)一個(gè)查詢單獨(dú)存在時(shí)無需運(yùn)行在事務(wù)中攀芯;當(dāng)先執(zhí)行了帶事務(wù)的更新屯断,再調(diào)用查詢,則查詢就需要運(yùn)行在事務(wù)中侣诺,在不同的隔離級(jí)別下很有用 |
MANDATORY | 必須運(yùn)行在事務(wù)中殖演,如果當(dāng)前已存在事務(wù)時(shí)在事務(wù)中運(yùn)行;當(dāng)前不存在事務(wù)時(shí)拋出異常 | 當(dāng)一個(gè)方法需要事務(wù)環(huán)境,但不負(fù)責(zé)事務(wù)的開啟年鸳、提交趴久、回滾 |
REQUIRES_NEW | 必須運(yùn)行在事務(wù)中,如果當(dāng)前已存在事務(wù)時(shí)則掛起當(dāng)前事務(wù)搔确,開啟新事務(wù)運(yùn)行結(jié)束后恢復(fù)被掛起事務(wù);當(dāng)前不存在事務(wù)則開啟一個(gè)新事務(wù)運(yùn)行 | 內(nèi)層事務(wù)的回滾和提交和外層事務(wù)的回滾和提交互不影響彼棍,制造一個(gè)內(nèi)層獨(dú)立的小事務(wù) |
NOT_SUPPORTED | 必須不運(yùn)行在事務(wù)中灭忠,如果當(dāng)前已存在事務(wù)時(shí)則掛起當(dāng)前事務(wù),無事務(wù)運(yùn)行結(jié)束后恢復(fù)被掛起事務(wù);當(dāng)前不存在事務(wù)則直接無事務(wù)運(yùn)行 | 保證當(dāng)前執(zhí)行一定不運(yùn)行在事務(wù)當(dāng)中座硕,但是又允許已經(jīng)在事務(wù)中的邏輯調(diào)用 |
NEVER | 必須不運(yùn)行在事務(wù)中弛作,如果當(dāng)前已存在事務(wù)時(shí)則拋出異常;當(dāng)前不存在事務(wù)則直接無事務(wù)運(yùn)行 | 當(dāng)前一定不能運(yùn)行在事務(wù)中,一些涉及外部調(diào)用华匾,耗時(shí)長映琳,為了避免導(dǎo)致事務(wù)出問題,標(biāo)記NEVER蜘拉,在開發(fā)階段發(fā)現(xiàn)問題直接解決 |
NESTED | 如果當(dāng)前已存在事務(wù)時(shí)則創(chuàng)建savepoint萨西,然后在事務(wù)中運(yùn)行;當(dāng)前不存在事務(wù)開啟新事務(wù)運(yùn)行 | 基于保存點(diǎn),內(nèi)層事務(wù)的回滾和提交不會(huì)影響外層事務(wù)旭旭。 外層事務(wù)的回滾和提交會(huì)影響內(nèi)層事務(wù) |