事務(wù)有原子性枫笛、一致性、隔離性刚照、持久性的特點刑巧,在開發(fā)中我們將一組不可分割的操作放在事務(wù)中,要么一起成功无畔,要么一起失敗啊楚,例如最簡單的轉(zhuǎn)賬,我們看一下spring是如何實現(xiàn)事務(wù)管理的檩互。
spring事務(wù)的關(guān)鍵對象
spring的事務(wù)實現(xiàn)中有幾個關(guān)鍵對象特幔,我們先來認識一下:
PlatformTransactionManager
事務(wù)管理器接口只有三個方法:
getTransaction:獲取當(dāng)前激活的事務(wù)或者創(chuàng)建一個事務(wù)
commit:提交當(dāng)前事務(wù)
rollback:回滾當(dāng)前事務(wù)
PlatformTransactionManager有很多具體的實現(xiàn):
- DataSourceTransactionManager,使用JDBC與ibatis進行持久化
- JtaTransactionManager:使用分布式事務(wù)
- JpaTransactionManager:使用jpa
- HibernateTransactionManager:使用hibernate
TransactionDefiition
TransactionDefiition是事務(wù)屬性的定義闸昨,其中包含事務(wù)的隔離級別蚯斯、事務(wù)傳播類型薄风、事務(wù)過期時間、事務(wù)是否可讀等事務(wù)信息拍嵌。
spring的隔離級別和數(shù)據(jù)庫的是對應(yīng)的遭赂,只是多了一個ISOLATION_DEFAULT
類型,表示使用mysql默認隔離級別横辆。
spring的事務(wù)傳播類型如下:
常量名稱 | 常量解釋 |
---|---|
PROPAGATION_REQUIRED | 支持當(dāng)前事務(wù)撇他,如果當(dāng)前沒有事務(wù),就新建一個事務(wù) |
PROPAGATION_REQUIRES_NEW | 新建事務(wù)狈蚤,如果當(dāng)前存在事務(wù)困肩,把當(dāng)前事務(wù)掛起。新建的事務(wù)將和被掛起的事務(wù)沒有任何關(guān)系脆侮,是兩個獨立的事務(wù)锌畸,外層事務(wù)失敗回滾之后,不能回滾內(nèi)層事務(wù)執(zhí)行的結(jié)果靖避,內(nèi)層事務(wù)失敗拋出異常潭枣,外層事務(wù)捕獲,也可以不處理回滾操作 |
PROPAGATION_SUPPORTS | 支持當(dāng)前事務(wù)幻捏,如果當(dāng)前沒有事務(wù)盆犁,就以非事務(wù)方式執(zhí)行 |
PROPAGATION_MANDATORY | 使用當(dāng)前事務(wù),如果當(dāng)前沒有事務(wù)篡九,就拋出異常 |
PROPAGATION_NOT_SUPPORTED | 以非事務(wù)方式執(zhí)行操作谐岁,如果當(dāng)前存在事務(wù),就把當(dāng)前事務(wù)掛起 |
PROPAGATION_NEVER | 以非事務(wù)方式執(zhí)行榛臼,如果當(dāng)前存在事務(wù)翰铡,則拋出異常。 |
PROPAGATION_NESTED | 如果當(dāng)前存在事務(wù)讽坏,則運行在一個嵌套的事務(wù)中锭魔。如果沒有事務(wù),則按REQUIRED屬性執(zhí)行路呜。它使用了一個單獨的事務(wù)迷捧,這個事務(wù)擁有多個可以回滾的保存點。內(nèi)部事務(wù)的回滾不會對外部事務(wù)造成影響胀葱。它只對DataSourceTransactionManager事務(wù)管理器起效 |
事務(wù)的具體實現(xiàn)
我們用一個簡單的例子看一下spring的事務(wù)是如何實現(xiàn)的
新建一個賬戶表漠秋,包含幾個基本字段,金額抵屿、姓名等
CREATE TABLE `account` (
`ID` int(11) NOT NULL AUTO_INCREMENT,
`count` int(11) NOT NULL DEFAULT '0',
`name` varchar(400) DEFAULT NULL COMMENT '姓名',
`update` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
新建一個springboot工程庆锦,引入jdbc與mysql的依賴
新建一個service,在用一個controller調(diào)用此service的轉(zhuǎn)賬方法
@Service
public class AccountService {
private JdbcTemplate jdbctemplate;
public AccountService(DataSource dataSource){
jdbctemplate = new JdbcTemplate(dataSource);
}
@Transactional
public int update(int count,int fromId,int toId){
int update = jdbctemplate.update("update account set count = count-? where id=?", count, fromId);
// int i = 0 / 0;
jdbctemplate.update("update account set count = count+? where id=?", count, toId);
return update;
}
}
為了探究spring是如何實現(xiàn)事務(wù)的轧葛,我們通過tcpdump抓mysql的包來看執(zhí)行的語句
//樓主是mac環(huán)境
sudo tcpdump -i lo0 -s 0 -l -w - port 3306 | strings
我們執(zhí)行用戶1給用戶2轉(zhuǎn)賬5元搂抒,看到抓包結(jié)果為
可以看到艇搀,在第一個語句執(zhí)行前設(shè)置為手動提交,在所有語句執(zhí)行完后執(zhí)行commit求晶,最后再設(shè)置成自動提交焰雕。
如果中間執(zhí)行失敗呢,我們將 int i = 0 / 0;這行的注釋打開芳杏,制造一個RuntimeException矩屁,看看執(zhí)行結(jié)果
事務(wù)被執(zhí)行回滾了。
分析事務(wù)代碼
spring中可以用aop注入TransactionInterceptor爵赵,或者像前面的例子一樣吝秕,在需要事務(wù)管理的方法上加上注解@Transactional,spring在此方法執(zhí)行時會執(zhí)行個方法的interceptor集合空幻,事務(wù)的interceptor是org.springframework.transaction.interceptor.TransactionInterceptor
郭膛,它繼承自TransactionAspectSupport,事務(wù)的具體實現(xiàn)在TransactionAspectSupport中氛悬。
TransactionInterceptor在執(zhí)行時會調(diào)用方法invoke
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
在invokeWithinTransaction中會調(diào)用createTransactionIfNecessary方法
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 設(shè)置事務(wù)名稱
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//獲取事務(wù)狀態(tài)
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
//填充事務(wù)信息
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
tm.getTransaction(txAttr);這里會使用具體持久化方式對應(yīng)的那個TransactionManager,樓主使用的Jdbc耘柱,實現(xiàn)是DataSourceTransactionManager如捅,getTransaction方法調(diào)用的是父類中的方法
org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
//如果TransactionDefinition為空則使用默認配置
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
if (isExistingTransaction(transaction)) {
//如果有存在的事務(wù),則處理存在的事物
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 判斷事務(wù)是否超時
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 使用PROPAGATION_MANDATORY這個級別调煎,沒有可用的事務(wù)則拋異常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
//根據(jù)隔離級別開始事務(wù)
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
startTransaction會調(diào)用DataSourceTransactionManager的doBegin方法設(shè)置當(dāng)前連接為手動提交事務(wù)
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
//獲取數(shù)據(jù)庫連接
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
//如果連接是自動提交镜遣,則設(shè)置成手動
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
//這里會和數(shù)據(jù)庫通信
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
再看回org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction方法,這里顯示了aop如何處理事務(wù)
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
//獲取事務(wù)
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
//觸發(fā)后面的攔截器和方法本身
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 捕獲異常處理回滾
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//重置threadLocal中事務(wù)狀態(tài)
cleanupTransactionInfo(txInfo);
}
//省略
...
return retVal;
}
再看一下completeTransactionAfterThrowing方法士袄,如果是需要回滾的異常則執(zhí)行回滾悲关,否則執(zhí)行提交
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
//需要回滾的異常
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
//執(zhí)行回滾
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
try {
//提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
總結(jié)
spring中的事務(wù)實現(xiàn)從原理上說比較簡單,通過aop在方法執(zhí)行前后增加數(shù)據(jù)庫事務(wù)的操作娄柳。
1寓辱、在方法開始時判斷是否開啟新事務(wù),需要開啟事務(wù)則設(shè)置事務(wù)手動提交 set autocommit=0;
2赤拒、在方法執(zhí)行完成后手動提交事務(wù) commit;
3秫筏、在方法拋出指定異常后調(diào)用rollback回滾事務(wù)
spring事務(wù)管理有很多細節(jié)的處理,如傳播方式的實現(xiàn)挎挖,感興趣的同學(xué)自己去看看源碼嘍~