spring Transactional 深入分析
sschrodinger
2020年8月6日
引用
JDBC 官方文檔 - Using Transactions
spring AOP源碼深度解析 - 掘金 - 智能后端小頭
SpringBoot中@Transactional事務(wù)控制實(shí)現(xiàn)原理及事務(wù)無效問題排查 - CSDN - hanchao5272
Spring AOP: Spring之面向方面編程 攔截器 MethodInterceptor - CSDN - 洪文識(shí)途
spring boot 版本 - 2.3.1.RELEASE
連接池版本 - c3p0 - 0.9.5.5
mysql 版本 - mysql communication - 8.0.21
開始
對(duì)于 JDBC 來說昵观,事務(wù)的創(chuàng)建是通過設(shè)置 sql 的自動(dòng)提交為 false
實(shí)現(xiàn)的。
以一個(gè)最小化的 Spring boot 文件驗(yàn)證。
驗(yàn)證代碼如下:
/*
* 需要 junit 與 spring-jdbc 組件
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class JdbcSimpleDatasourceApplicationTests {
@Test
public void springDataSourceTest(){
//輸出為true
System.out.println(dataSource instanceof HikariDataSource);
try{
Connection connection = dataSource.getConnection();
System.out.println(connection.getTransactionIsolation());
// 設(shè)置自動(dòng)提交為 false
// 當(dāng)進(jìn)行一個(gè)新 sql 時(shí)谴咸,會(huì)自動(dòng)開啟一個(gè)事務(wù)
connection.setAutoCommit(false);
System.out.println(connection.getTransactionIsolation());
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from student");
while (resultSet.next()) {
System.out.println(resultSet.getString("name"));
}
statement.execute("insert student values (\"name\", 90, 25)");
Thread.sleep(1000000);
statement.close();
connection.close();
}catch (Exception exception){
exception.printStackTrace();
}
}
}
在測(cè)試代碼進(jìn)入 sleep 后亭病,查看 mysql 的 information_schema.innodb_trx
表(可以查看正在運(yùn)行的事務(wù))七问,如下:
mysql> select * from information_schema.innodb_trx\G;
*************************** 1. row ***************************
trx_id: 2568
trx_state: RUNNING
trx_started: 2020-08-06 14:23:41
trx_requested_lock_id: NULL
trx_wait_started: NULL
trx_weight: 2
trx_mysql_thread_id: 43
trx_query: NULL
trx_operation_state: NULL
trx_tables_in_use: 0
trx_tables_locked: 1
trx_lock_structs: 1
trx_lock_memory_bytes: 1136
trx_rows_locked: 0
trx_rows_modified: 1
trx_concurrency_tickets: 0
trx_isolation_level: REPEATABLE READ
trx_unique_checks: 1
trx_foreign_key_checks: 1
trx_last_foreign_key_error: NULL
trx_adaptive_hash_latched: 0
trx_adaptive_hash_timeout: 0
trx_is_read_only: 0
trx_autocommit_non_locking: 0
trx_schedule_weight: NULL
1 row in set (0.00 sec)
ERROR:
No query specified
可以看到旁舰,這時(shí)已經(jīng)產(chǎn)生了一個(gè)事務(wù)沟娱。
接下來,驗(yàn)證事務(wù)只與連接有關(guān)葛作,即只要一個(gè)連接設(shè)置了自動(dòng)提交為 false
的標(biāo)志位寿羞,則不管連接在什么線程中,都只會(huì)有一個(gè)事務(wù)进鸠。
測(cè)試代碼如下:
/*
* 注意測(cè)試時(shí)需要注意 datasource 的選擇稠曼,最好選擇原生 database形病,以防止兩個(gè)線程拿到同一個(gè) connection
*
*/
@SpringBootTest
@RunWith(SpringRunner.class)
public class JdbcSimpleDatasourceApplicationTests {
@Test
public void springDataThreadSourceTest() throws InterruptedException, SQLException {
/*
* 兩個(gè)線程共用一個(gè)連接
*/
Connection connection = dataSource.getConnection();
Thread thread_one = new Thread(()->{
try {
System.out.println(connection.getTransactionIsolation());
connection.setAutoCommit(false);
System.out.println(connection.getTransactionIsolation());
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from student");
//...
Thread.sleep(1000000);
} catch (Exception exception){
exception.printStackTrace();
}
});
Thread thread_two = new Thread(()->{
try {
System.out.println(connection.getTransactionIsolation());
connection.setAutoCommit(false);
System.out.println(connection.getTransactionIsolation());
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from student");
//...
Thread.sleep(1000000);
} catch (Exception exception){
exception.printStackTrace();
}
});
thread_one.start();
thread_two.start();
thread_one.join();
thread_two.join();
}
@Test
public void springDataCOnnectSourceTest() throws InterruptedException {
/*
* 兩個(gè)線程分別用兩個(gè)連接
*/
Thread thread_one = new Thread(()->{
try (Connection connection = dataSource.getConnection();){
System.out.println(connection.getTransactionIsolation());
connection.setAutoCommit(false);
System.out.println(connection.getTransactionIsolation());
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from student");
//...
Thread.sleep(1000000);
connection.commit();
} catch (Exception exception){
exception.printStackTrace();
}
});
Thread thread_two = new Thread(()->{
try (Connection connection = dataSource.getConnection();){
System.out.println(connection.getTransactionIsolation());
connection.setAutoCommit(false);
System.out.println(connection.getTransactionIsolation());
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from student");
//...
Thread.sleep(1000000);
connection.commit();
} catch (Exception exception){
exception.printStackTrace();
}
});
thread_one.start();
thread_two.start();
thread_one.join();
thread_two.join();
}
}
查看 mysql 的 information_schema.innodb_trx
表客年,可以看到共用同一個(gè)連接的只有一個(gè)事務(wù),兩個(gè)連接的有兩個(gè)事務(wù)漠吻。
有兩個(gè)結(jié)論:
結(jié)論
- 設(shè)置自動(dòng)提交為
false
之后量瓜,會(huì)開啟一個(gè)事務(wù)(隱式),同時(shí)途乃,一個(gè)連接只允許存在一個(gè)事務(wù)绍傲,多個(gè)連接的事務(wù)不共享。- 除了設(shè)置自動(dòng)提交外耍共,JDBC 沒有其他的手段去創(chuàng)建事務(wù)烫饼。
根據(jù)這兩個(gè)結(jié)論,提出一些猜測(cè):
推測(cè)
- spring 在實(shí)現(xiàn)動(dòng)態(tài)代理時(shí)试读,會(huì)在目標(biāo)函數(shù)前加上取消自動(dòng)提交的語句杠纵,并在目標(biāo)函數(shù)完成之后,進(jìn)行提交或者處理
代碼形式如下:
// 目標(biāo)方法
public void targetMethod() {
// do some db op
}
// 代理方法
public void proxyMethod() {
Connection connection = getConnection();
// 首先設(shè)置自動(dòng)提交為 false
connection.setAutoCommit(false);
try {
targetMethod();
// 提交
connection.commit();
} catch(Exception e) {
// 回滾
connection.rollback();
} finally {
// 最后需要設(shè)置自動(dòng)提交為 true
connection.setAutoCommit(true);
}
}
那接下來看 spring 如何包裝這些階段钩骇。
TransactionManager
TransactionManager
是一個(gè)空接口比藻,只標(biāo)志其是一個(gè)事務(wù)管理器,我們直接看他的子接口 PlatformTransactionManager
倘屹,形式如下:
public interface PlatformTransactionManager extends TransactionManager {
// 用于開啟一個(gè)事務(wù)
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
// 用于事務(wù)提交
void commit(TransactionStatus status) throws TransactionException;
// 用于事務(wù)回滾
void rollback(TransactionStatus status) throws TransactionException;
以最常見的 DataSourceTransactionManager
舉例银亲,我們分析他的實(shí)現(xiàn)。
如下是他的一個(gè) UML 類圖纽匙,我們分析他的主要流程 PlatformTransactionManager -> AbstractPlatformTransactionManager -> DataSourceTransactionManager
AbstractPlatformTransactionManager
實(shí)現(xiàn)了事務(wù)的基本模板务蝠,首先看 getTransaction()
函數(shù):
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// 1. 設(shè)置傳播級(jí)別等事務(wù)信息定義
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 2. 獲得當(dāng)前事務(wù)
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
if (isExistingTransaction(transaction)) {
// 存在事務(wù)時(shí)按照傳播級(jí)別處理
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 3. 如果不存在事務(wù),TransactionDefinition.PROPAGATION_MANDATORY 需要拋出異常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 4. 創(chuàng)建事務(wù)掛起記錄
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 5. 掛起事務(wù)并執(zhí)行
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
// 6. 掛起事務(wù)繼續(xù)執(zhí)行
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
Note
- step 1:獲得事務(wù)的定義
def
烛缔,即定義的@Transactional
注解中的信息馏段,包括傳播級(jí)別等- step 2:獲得當(dāng)前事務(wù)
transaction
- step 3:如果不存在當(dāng)前事務(wù)赠尾,那么遇到傳播級(jí)別
PROPAGATION_MANDATORY
直接拋出異常- step 4:創(chuàng)建事務(wù)掛起記錄
- step 5:根據(jù)
def
的傳播級(jí)別掛起事務(wù),并開始執(zhí)行- step 6:如果異常毅弧,則取消掛起
看如何獲得當(dāng)前事務(wù)气嫁,doGetTransaction()
在 DataSourceTransactionManager
實(shí)現(xiàn),如下:
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
DataSourceTransactionObject
持有了一個(gè) ConnectionHolder
够坐,ConnectionHolder
又持有了一個(gè) connect
寸宵,簡(jiǎn)單的說, DataSourceTransactionObject
持有了一個(gè)唯一的連接元咙,因?yàn)橐粋€(gè)連接可以代表一個(gè)事務(wù)梯影,因此 doGetTransaction()
就是得到一個(gè) connection
,用 connection
代表事務(wù)庶香。
重點(diǎn)看一下 TransactionSynchronizationManager
甲棍。
TransactionSynchronizationManager
管理著多個(gè)ThreadLocal,用于存儲(chǔ) ConnectionHolder
等對(duì)象赶掖,因此感猛,Spring 中每個(gè)線程所獲得的 Connection
是獨(dú)立的。并且每個(gè)線程只持有一個(gè) Connection
奢赂。
通過 ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
函數(shù)可以返回一個(gè)該線程所對(duì)應(yīng)的唯一一個(gè) connection
(ThreadLocal
保存的 Map
鍵為 datasource
)陪白。
然后看創(chuàng)建事務(wù)掛起記錄,主要兩個(gè)步驟膳灶,第一個(gè)步驟是移除 ThreadLocal
中的記錄咱士,第二個(gè)是將 ThreadLocal
中的記錄包裝成 suspendedResources
,記錄 Connection
等信息轧钓。
最后看開始事務(wù)的過程序厉,startTransaction()
開始新事務(wù),并返回被掛起的事務(wù)信息和當(dāng)前事務(wù)信息毕箍。
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
//
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
// in DataSourceTransactionManager
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 新建一個(gè)連接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 設(shè)置自動(dòng)提交為 false弛房,開啟一個(gè)事務(wù)
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
commit()
與 rollback()
過程與其相似,略霉晕。
總結(jié)
TransactionManager
使用ThreadLocal
存儲(chǔ)唯一對(duì)應(yīng)的連接- 事務(wù)被掛起庭再,等效于使用新的連接創(chuàng)建事務(wù),并將舊事務(wù)數(shù)據(jù)保存在返回的新事務(wù)數(shù)據(jù)中
- 事務(wù)執(zhí)行結(jié)束牺堰,查看有無舊事務(wù)拄轻,有則將舊事務(wù)的信息存儲(chǔ)回
ThreadLocal
中
TransactionInterceptor
spring 動(dòng)態(tài)代理時(shí),會(huì)順序執(zhí)行攔截器(攔截器實(shí)現(xiàn)了around通知)伟葫,只需要調(diào)用 invoke
函數(shù)就可以完成所有的代理工作恨搓,代理函數(shù)偽代碼如下:
public XXInterceptor implements MethodInterceptor {
// invocation 提供一個(gè)方法 getMethod 獲得被代理方法
Object invoke(MethodInvocation invocation) throws Throwable {
try {
doSomethingBefore();
invocation.getMethod().invoke();
doSomethingAfter();
} catch (Exception e) {
doSomethingError();
} finally {
doSomethingFinal();
}
}
public void doSomethingBefore(){};
public void doSomethingAfter(){};
public void doSomethingError(){};
public void doSomethingFinal(){};
}
事務(wù)使用攔截器 TransactionInterceptor
,看部分代碼,如下:
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
//...
// ptm is instance of TransactionManager
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 新建事務(wù)
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 執(zhí)行其他代理函數(shù)與被代理函數(shù)
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 執(zhí)行錯(cuò)誤回滾(定義的 exception )或提交(沒有在該 exception 定義 rollback)
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 調(diào)用 commit 提交
commitTransactionAfterReturning(txInfo);
return retVal;
}
//...
}
因此斧抱,spring 事務(wù)是擴(kuò)展了代理類的功能常拓,并且使用 TransactionManager
提供了事務(wù)的功能。
事務(wù)注解的使用場(chǎng)景
由以上的分析辉浦,可得結(jié)論弄抬。
結(jié)論
- jdbc 的事務(wù)又由
setAutoCommmit
控制,數(shù)據(jù)庫(kù)連接設(shè)置值為false
之后宪郊,隱式的變成一個(gè)事務(wù)- spring 使用
TransctionManager
封裝事務(wù)- spring 每個(gè)線程持有一個(gè)唯一的數(shù)據(jù)庫(kù)連接掂恕,調(diào)用
getTransaction
方法后,會(huì)生成一個(gè)事務(wù)(置 AutoCommmit 為false
)- 只要在同一個(gè)線程中弛槐,連接就相同懊亡,就是一個(gè)事務(wù)
因此,注解寫法總結(jié)如下:
1. 同一個(gè)類函數(shù)相互調(diào)用乎串,被調(diào)用的函數(shù)注解無效店枣。
舉例如下:
public class TestService {
@Transactional
public void a() {
// do some database op
}
@Transactional
public void b() {
a();
// do some database op
}
}
b()
調(diào)用 a()
,a()
的注解會(huì)失效(不能在同一個(gè)類叹誉,原因是Spring aop 原理)
2. 不同類函數(shù)相互調(diào)用鸯两,如果都是 Required
注解,沒有必要在被調(diào)用函數(shù)增加注解桂对。
舉例如下:
@Component
public class TestService_1 {
@Transactional(propagation = Propagation.REQUIRED)
public void a() {
// do some database op
}
}
public class TestService_2 {
@Autowired private TestService_1 service;
@Transactional(propagation = Propagation.REQUIRED)
public void b() {
// do some database op
service.a();
}
}
b()
調(diào)用 a()
甩卓,a()
處于同一個(gè)線程鸠匀,使用相同連接蕉斜,則本來就存在事務(wù),如果 a()
的傳播級(jí)別為 Propagation.REQUIRED
缀棍,則不需要在 a()
增加注解(提高效率)