基礎(chǔ)概念
? Spring中事務(wù)支持編程式事務(wù)和聲明式事務(wù)叮称。編程式事務(wù)由使用者自行編碼控制事務(wù)预烙;聲明式事務(wù)則是使用者在方法上加@Transactional注解酬屉,Spring根據(jù)選擇的事務(wù)傳播模式完成事務(wù)的執(zhí)行负懦。兩者比較而言,聲明式事務(wù)在使用時(shí)較為便利缩滨,但是顆粒度較大势就,@Transactional注解加載方法上,整個(gè)方法在一個(gè)事務(wù)中執(zhí)行脉漏,顆粒度較大蛋勺;編程式事務(wù)則相反,事務(wù)流程由使用者自行控制鸠删,編碼上較為繁瑣抱完,但可以自行控制事務(wù)的提交。在方法流程較長刃泡,中間只有部分位置需要使用事務(wù)巧娱,其他流程和事務(wù)無關(guān)且耗時(shí)較長時(shí),可以使用編程式事務(wù)烘贴,提升吞吐量禁添。若方法中事務(wù)耗時(shí)占比很大則可以直接使用聲明式事務(wù)。
Spring中的7種事務(wù)傳播機(jī)制
事務(wù)傳播行為類型 | 說明 |
---|---|
PROPAGATION_REQUIRED | 如果當(dāng)前沒有事務(wù)桨踪,就新建一個(gè)事務(wù)老翘,如果已經(jīng)存在一個(gè)事務(wù)中,加入到這個(gè)事務(wù)中。是Spring默認(rèn)的傳播機(jī)制铺峭。 |
PROPAGATION_REQUIRES_NEW | 新建事務(wù)墓怀,如果當(dāng)前存在事務(wù),把當(dāng)前事務(wù)掛起卫键。 |
PROPAGATION_NESTED | 如果當(dāng)前存在事務(wù)傀履,則在嵌套事務(wù)內(nèi)執(zhí)行。如果當(dāng)前沒有事務(wù)莉炉,則執(zhí)行與PROPAGATION_REQUIRED類似的操作钓账。 |
PROPAGATION_NOT_SUPPORTED | 以非事務(wù)方式執(zhí)行操作,如果當(dāng)前存在事務(wù)絮宁,就把當(dāng)前事務(wù)掛起梆暮。 |
PROPAGATION_SUPPORTS | 支持當(dāng)前事務(wù),如果當(dāng)前沒有事務(wù)绍昂,就以非事務(wù)方式執(zhí)行惕蹄。 |
PROPAGATION_MANDATORY | 使用當(dāng)前的事務(wù),如果當(dāng)前沒有事務(wù)治专,就拋出異常。 |
PROPAGATION_NEVER | 以非事務(wù)方式執(zhí)行遭顶,如果當(dāng)前存在事務(wù)张峰,則拋出異常。 |
? 自己用的主要是PROPAGATION_REQUIRED棒旗、PROPAGATION_REQUIRES_NEW喘批、PROPAGATION_NESTED,所以這邊主要是看這三種的處理過程(其實(shí) PROPAGATION_NESTED 這個(gè)也沒咋用哈哈哈)
源碼學(xué)習(xí)
? 首先明確一點(diǎn)铣揉,Transaction 管理 connection饶深,connection 和 事務(wù)是綁定的,一個(gè)連接可以有多個(gè)事務(wù)逛拱,一個(gè)事務(wù)只能在一個(gè)連接中執(zhí)行敌厘。
開啟事務(wù)
? 通過注解 @EnableTransactionManagement
開啟Spring事務(wù),稍微看一下這個(gè)注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
boolean proxyTargetClass() default false;
/**
* mode 是 proxy
*/
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
}
? 顯然是通過 TransactionManagementConfigurationSelector.class
這個(gè)類朽合,在Spring容器初始化的時(shí)候進(jìn)行一個(gè)注入(主要是 IOC 方面的源碼)
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
/**
* Returns {@link ProxyTransactionManagementConfiguration} or
* {@code AspectJ(Jta)TransactionManagementConfiguration} for {@code PROXY}
* and {@code ASPECTJ} values of {@link EnableTransactionManagement#mode()},
* respectively.
*
* ConfigurationClassParser 569 收集的
*/
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}
private String determineTransactionAspectClass() {
return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
}
}
? @EnableTransactionManagement
中設(shè)置了默認(rèn)的 mode 為proxy俱两,所以通過 AutoProxyRegistrar
在 org.springframework.context.annotation.AutoProxyRegistrar#registerBeanDefinitions
會(huì)被注冊(cè)成 AOP 入口 (除了 InfrastructureAdvisorAutoProxyCreator
以外還有其他 AOP 入口,在 AopConfigUtils
這個(gè)類中有設(shè)定優(yōu)先級(jí)曹步,InfrastructureAdvisorAutoProxyCreator
是最弟弟的一個(gè))宪彩。
? 再看 ProxyTransactionManagementConfiguration
,其中 TransactionInterceptor
實(shí)現(xiàn)了 MethodInterceptor
接口讲婚,所以可以加入調(diào)用鏈尿孔。
@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
/*
* 創(chuàng)建事務(wù)切面實(shí)例
* BeanFactoryTransactionAttributeSourceAdvisor
* */
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
//里面會(huì)解析收集 @Transactional 注解中的屬性
advisor.setTransactionAttributeSource(transactionAttributeSource());
//設(shè)置 advice
advisor.setAdvice(transactionInterceptor());
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
/*
* 創(chuàng)建事務(wù)advice
* TransactionInterceptor
* */
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor() {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource());
//事務(wù)管理器和數(shù)據(jù)源有關(guān),所以需要自己定義
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
? 添加 @Transactional 的方法的話,在 Spring 切面匹配時(shí)會(huì)被切到活合。源碼位置在 org.springframework.aop.support.AopUtils#canApply(org.springframework.aop.Pointcut, java.lang.Class<?>, boolean)
? 切面判斷時(shí)有一個(gè)判斷是事務(wù)判斷雏婶,即 TransactionAttributeSourcePointcut
@Override
public boolean matches(Method method, Class<?> targetClass) {
if (TransactionalProxy.class.isAssignableFrom(targetClass) ||
PlatformTransactionManager.class.isAssignableFrom(targetClass) ||
PersistenceExceptionTranslator.class.isAssignableFrom(targetClass)) {
return false;
}
//如果method能找到@Transactional注解的事務(wù)屬性,則返回true
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
? matches 中 最終返回的是tas.getTransactionAttribute(method, targetClass)
的結(jié)果芜辕, 通過調(diào)用過程尚骄,我們可以定位到 tas 是從 BeanFactoryTransactionAttributeSourceAdvisor
中獲取的,而 BeanFactoryTransactionAttributeSourceAdvisor
是上述 enable 過程中在 ProxyTransactionManagementConfiguration
中設(shè)置而來的侵续。
? 所以 TransactionAttributeSourcePointcut#getTransactionAttributeSource
返回的是AnnotationTransactionAttributeSource
實(shí)例倔丈,AnnotationTransactionAttributeSource
繼承自AbstractFallbackTransactionAttributeSource
, 故TransactionAttributeSourcePointcut#matches
最終會(huì)調(diào)用到 AbstractFallbackTransactionAttributeSource#getTransactionAttribute
進(jìn)行解析判斷
事務(wù)執(zhí)行
? 事務(wù)切面加入到調(diào)用鏈中執(zhí)行后,具體看一下執(zhí)行時(shí)的流程状蜗。相關(guān)代碼在 org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction
中需五。
/**
* General delegate for around-advice-based subclasses, delegating to several other template
* methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager}
* as well as regular {@link PlatformTransactionManager} implementations.
* @param method the Method being invoked
* @param targetClass the target class that we're invoking the method on
* @param invocation the callback to use for proceeding with the target invocation
* @return the return value of the method, if any
* @throws Throwable propagated from the target invocation
*/
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
//獲取事務(wù)屬性類 AnnotationTransactionAttributeSource,就是前面解析收集@Transactional獲取的
TransactionAttributeSource tas = getTransactionAttributeSource();
//獲取方法上面有@Transactional注解的屬性(具體從 getTransactionAttribute 一層一層找就好了)
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
//獲取事務(wù)管理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 創(chuàng)建事務(wù)
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 調(diào)用鏈的傳遞過程
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
//回滾
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
//提交
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
//聲明式事務(wù)不走這邊轧坎,略
...
}
}
? 創(chuàng)建事務(wù)顯然在 createTransactionIfNecessary(tm, txAttr, joinpointIdentification)
/**
* Create a transaction if necessary based on the given TransactionAttribute.
* <p>Allows callers to perform custom TransactionAttribute lookups through
* the TransactionAttributeSource.
* @param txAttr the TransactionAttribute (may be {@code null})
* @param joinpointIdentification the fully qualified method name
* (used for monitoring and logging purposes)
* @return a TransactionInfo object, whether or not a transaction was created.
* The {@code hasTransaction()} method on TransactionInfo can be used to
* tell if there was a transaction created.
* @see #getTransactionAttributeSource()
*/
@SuppressWarnings("serial")
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//開啟事務(wù)!!!
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
//創(chuàng)建事務(wù)信息對(duì)象宏邮,記錄新老事務(wù)信息對(duì)象
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
? 進(jìn)入開啟事務(wù)的方法 tm.getTransaction(txAttr) ,具體位置在 org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
/**
* This implementation handles propagation behavior. Delegates to
* {@code doGetTransaction}, {@code isExistingTransaction}
* and {@code doBegin}.
* @see #doGetTransaction
* @see #isExistingTransaction
* @see #doBegin
*/
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// DataSourceTransactionObject拿到對(duì)象
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
// 第一次進(jìn)來connectionHolder為空的缸血,所以不存在事務(wù)
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 第一次進(jìn)來大部分會(huì)走這里
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 先掛起
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 創(chuàng)建事務(wù)狀態(tài)對(duì)象蜜氨,其實(shí)就是封裝了事務(wù)對(duì)象的一些信息,記錄事務(wù)狀態(tài)的
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 開啟事務(wù),重點(diǎn)看看 DataSourceTransactionObject
doBegin(transaction, definition);
// 開啟事務(wù)后捎泻,改變事務(wù)狀態(tài)
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
? doGetTransaction()飒炎,源碼位置為 org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction
。
? ConnectionHolder 類中就封裝了數(shù)據(jù)庫的 connection 對(duì)象笆豁。
@Override
protected Object doGetTransaction() {
//管理connection對(duì)象郎汪,創(chuàng)建回滾點(diǎn),按照回滾點(diǎn)回滾闯狱,釋放回滾點(diǎn)
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
//DataSourceTransactionManager默認(rèn)是允許嵌套事務(wù)的(回滾點(diǎn)那種)
txObject.setSavepointAllowed(isNestedTransactionAllowed());
//obtainDataSource() 獲取數(shù)據(jù)源對(duì)象煞赢,其實(shí)就是數(shù)據(jù)庫連接對(duì)象
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
? 繼續(xù)跟著代碼走,事務(wù)是和數(shù)據(jù)庫連接綁定的哄孤,持有連接的對(duì)象是 ConnectionHolder照筑,所以看一下獲取ConnectionHolder 的過程 org.springframework.transaction.support.TransactionSynchronizationManager#getResource
。
private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
/**
* Retrieve a resource for the given key that is bound to the current thread.
* @param key the key to check (usually the resource factory)
* @return a value bound to the current thread (usually the active
* resource object), or {@code null} if none
* @see ResourceTransactionManager#getResourceFactory()
*/
@Nullable
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
// 這邊
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
/**
* Actually check the value of the resource that is bound for the given key.
*/
@Nullable
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
? 可以看到獲取連接的第一步是從 resources(resources 保存了連接池對(duì)象和連接對(duì)象的映射) 中拿的瘦陈,resources 是一個(gè) ThreadLocal 保證在連接不變的情況下(比如 PROPAGATION_REQUIRED )同一個(gè)線程執(zhí)行時(shí)得到的是同一個(gè)連接朦肘,事務(wù)不會(huì)亂。
? 顯然双饥,第一次事務(wù)執(zhí)行時(shí)這邊是獲取不到連接的媒抠。第一次的連接獲取在上述 getTransaction 方法的下半部分,當(dāng)?shù)谝粋€(gè)事務(wù)進(jìn)入時(shí)咏花,獲取不到連接對(duì)象趴生,isExistingTransaction(transaction)
校驗(yàn)不通過阀趴,執(zhí)行后續(xù)流程。
...
// 第一次進(jìn)來connectionHolder為空的苍匆,所以不存在事務(wù)
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// PROPAGATION_REQUIRED刘急、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED
// 主要是看這三種浸踩,所以看這邊
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 先掛起
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 創(chuàng)建事務(wù)狀態(tài)對(duì)象叔汁,其實(shí)就是封裝了事務(wù)對(duì)象的一些信息,記錄事務(wù)狀態(tài)的
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 開啟事務(wù)
doBegin(transaction, definition);
// 開啟事務(wù)后检碗,改變事務(wù)狀態(tài)(這塊沒什么用)
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
...
? 看一下開啟新事務(wù)的流程
/**
* This implementation sets the isolation level but ignores the timeout.
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
//如果沒有數(shù)據(jù)庫連接
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
//從連接池里面獲取連接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
//把連接包裝成ConnectionHolder据块,然后設(shè)置到事務(wù)對(duì)象中
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
//從數(shù)據(jù)庫連接中獲取隔離級(jí)別
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
//關(guān)閉連接的自動(dòng)提交,和手動(dòng)使用事務(wù)時(shí)一樣
con.setAutoCommit(false);
}
//設(shè)置只讀事務(wù) 從這一點(diǎn)設(shè)置的時(shí)間點(diǎn)開始(時(shí)間點(diǎn)a)到這個(gè)事務(wù)結(jié)束的過程中折剃,其他事務(wù)所提交的數(shù)據(jù)另假,該事務(wù)將看不見!
//只讀事務(wù)內(nèi)沒有新增怕犁,修改边篮,刪除操作只有查詢操作,不需要數(shù)據(jù)庫鎖等操作奏甫,減少數(shù)據(jù)庫壓力
prepareTransactionalConnection(con, definition);
//自己提交關(guān)閉了戈轿,就說明已經(jīng)開啟事務(wù)了,事務(wù)是活的
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
//如果是新創(chuàng)建的事務(wù)阵子,則建立當(dāng)前線程和數(shù)據(jù)庫連接的關(guān)系
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
? 可以看到 doBegin 做的就是從連接池中獲取連接封裝成ConnectionHolder設(shè)置到事務(wù)對(duì)象中思杯,關(guān)閉連接的自動(dòng)提交,封裝連接池和連接的關(guān)系款筑。
? 如果當(dāng)前線程之前已經(jīng)執(zhí)行過事務(wù),ThreadLocal 中有當(dāng)前線程的 connection 對(duì)象腾么,那就會(huì)進(jìn)入handleExistingTransaction(definition, transaction, debugEnabled)
方法中奈梳。
/**
* Create a TransactionStatus for an existing transaction.
*/
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
//不允許有事務(wù),直接異常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
//以非事務(wù)方式執(zhí)行操作解虱,如果當(dāng)前存在事務(wù)攘须,就把當(dāng)前事務(wù)掛起
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
//掛起當(dāng)前事務(wù)
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
//修改事務(wù)狀態(tài)信息,把事務(wù)的一些信息存儲(chǔ)到當(dāng)前線程中殴泰,ThreadLocal中
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
//掛起
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
//默認(rèn)是可以嵌套事務(wù)的
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
//創(chuàng)建回滾點(diǎn)
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
/**
* Suspend the given transaction. Suspends transaction synchronization first,
* then delegates to the {@code doSuspend} template method.
* @param transaction the current transaction object
* (or {@code null} to just suspend active synchronizations, if any)
* @return an object that holds suspended resources
* (or {@code null} if neither transaction nor synchronization active)
* @see #doSuspend
* @see #resume
*/
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
//第一次進(jìn)來于宙,肯定為null的
if (transaction != null) {
//吧connectionHolder設(shè)置為空
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// Neither transaction nor synchronization active.
return null;
}
}
? 幾種不同的傳播屬性區(qū)別主要在這邊了。
? 可以看到悍汛,如果是默認(rèn)的 PROPAGATION_REQUIRED 捞魁,走不進(jìn)任何一個(gè)判斷,直接執(zhí)行最后的少量代碼离咐,把事務(wù)狀態(tài)標(biāo)記改了一下谱俭,從true 改為 false 標(biāo)記不是最新事務(wù)了奉件。這個(gè)標(biāo)記位的作用是,在事務(wù)提交的時(shí)候昆著,只有標(biāo)記 newTransaction 為 true 的事務(wù)才會(huì)執(zhí)行 commit 县貌。
org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction
中commitTransactionAfterReturning()
部分
/**
* General delegate for around-advice-based subclasses, delegating to several other template
* methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager}
* as well as regular {@link PlatformTransactionManager} implementations.
* @param method the Method being invoked
* @param targetClass the target class that we're invoking the method on
* @param invocation the callback to use for proceeding with the target invocation
* @return the return value of the method, if any
* @throws Throwable propagated from the target invocation
*/
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
...略
//事務(wù)提交
commitTransactionAfterReturning(txInfo);
return retVal;
}
/**
* Execute after successful completion of call, but not after an exception was handled.
* Do nothing if we didn't create a transaction.
* @param txInfo information about the current transaction
*/
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
//進(jìn)入commit這里
//org.springframework.transaction.support.AbstractPlatformTransactionManager#commit
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
/**
* This implementation of commit handles participating in existing
* transactions and programmatic rollback requests.
* Delegates to {@code isRollbackOnly}, {@code doCommit}
* and {@code rollback}.
* @see org.springframework.transaction.TransactionStatus#isRollbackOnly()
* @see #doCommit
* @see #rollback
*/
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
// 這邊
processCommit(defStatus);
}
/**
* Process an actual commit.
* Rollback-only flags have already been checked and applied.
* @param status object representing the transaction
* @throws TransactionException in case of commit failure
*/
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
//如果都是PROPAGATION_REQUIRED,最外層的才會(huì)走進(jìn)來統(tǒng)一提交凑懂,如果是PROPAGATION_REQUIRES_NEW煤痕,每一個(gè)事務(wù)都會(huì)進(jìn)來
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
? 舉個(gè)栗子,一個(gè)加了@Transactional 方法A中又執(zhí)行了一個(gè)加了@Transactional 的方法B接谨,方法的事務(wù)傳播屬性均為 PROPAGATION_REQUIRED摆碉。則這些方法應(yīng)該都在同一個(gè)事務(wù)中,任意一個(gè)失敗整個(gè)事務(wù)都應(yīng)該回滾疤坝。顯然兆解,在執(zhí)行完B后是不能直接 commit的,需要等A的所有邏輯執(zhí)行完后跑揉,整個(gè)事務(wù)才算執(zhí)行完畢锅睛,這時(shí)事務(wù)才可以提交,中間任何部分產(chǎn)生異常历谍,全部回滾逾冬。
@Transactional
public void doA(){
doSomething();
doB();
doSomething();
}
@Transactional
public void doB(){
...
}
? 如果設(shè)置的傳播屬性為 PROPAGATION_REQUIRES_NEW,那么Spring 會(huì)掛起當(dāng)前事務(wù)刁品,即解除當(dāng)前連接的綁定關(guān)系傀蚌,重新執(zhí)行doBegin() 創(chuàng)建一個(gè)全新的事務(wù)對(duì)象(這個(gè)事務(wù)對(duì)象的newTransaction標(biāo)記為true,可以單獨(dú)提交)脱衙。在掛起舊事務(wù)的時(shí)候侥猬,會(huì)保存舊事物的連接對(duì)象,在PROPAGATION_REQUIRES_NEW的事務(wù)執(zhí)行完畢后重新把連接關(guān)系綁定回去捐韩。
? PROPAGATION_REQUIRES_NEW 的情況下退唠,執(zhí)行完doB()后,若在doSomethingAfter()時(shí)產(chǎn)生異常荤胁,則B正常提交瞧预,doA()回滾。
@Transactional
public void doA(){
doSomethingBefore();
doB();
doSomethingAfter();
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void doB(){
...
}
? 如果傳播屬性為PROPAGATION_NESTED仅政,那么每執(zhí)行一個(gè)方法垢油,會(huì)存一個(gè)savepoint,回滾的時(shí)候按回滾點(diǎn)回滾
/**
* Create a TransactionStatus for an existing transaction.
*/
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
...
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
//默認(rèn)是可以嵌套事務(wù)的
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
// 還是復(fù)用之前的事務(wù)對(duì)象
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
//創(chuàng)建回滾點(diǎn)
status.createAndHoldSavepoint();
return status;
}
...
}
? 這類方法在執(zhí)行時(shí)圆丹,如果成功走到commit步驟滩愁,則認(rèn)為已部分成功,清除回滾點(diǎn)辫封。走到后面的新方法時(shí)再次創(chuàng)建對(duì)應(yīng)的回滾點(diǎn)惊楼。
? 回滾的時(shí)候
org.springframework.transaction.interceptor.TransactionAspectSupport#completeTransactionAfterThrowing
/**
* General delegate for around-advice-based subclasses, delegating to several other template
* methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager}
* as well as regular {@link PlatformTransactionManager} implementations.
* @param method the Method being invoked
* @param targetClass the target class that we're invoking the method on
* @param invocation the callback to use for proceeding with the target invocation
* @return the return value of the method, if any
* @throws Throwable propagated from the target invocation
*/
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
//獲取事務(wù)屬性類 AnnotationTransactionAttributeSource
TransactionAttributeSource tas = getTransactionAttributeSource();
//獲取方法上面有@Transactional注解的屬性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
//獲取事務(wù)管理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 調(diào)用鏈的傳遞過程
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
//事務(wù)回滾
completeTransactionAfterThrowing(txInfo, ex);
//注意這邊會(huì)拋出異常
throw ex;
}
...略
}
}
/**
* Handle a throwable, completing the transaction.
* We may commit or roll back, depending on the configuration.
* @param txInfo information about the current transaction
* @param ex throwable encountered
*/
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 回滾
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
//org.springframework.transaction.support.AbstractPlatformTransactionManager#rollback
/**
* This implementation of rollback handles participating in existing
* transactions. Delegates to {@code doRollback} and
* {@code doSetRollbackOnly}.
* @see #doRollback
* @see #doSetRollbackOnly
*/
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
/**
* Process an actual rollback.
* The completed flag has already been checked.
* @param status object representing the transaction
* @throws TransactionException in case of rollback failure
*/
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
triggerBeforeCompletion(status);
//按照嵌套事務(wù)按照回滾點(diǎn)回滾
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
...略
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}
? <font color = red>可以看到玖瘸,在內(nèi)層事務(wù)會(huì)拋出異常,當(dāng)內(nèi)層事務(wù)失敗回滾后檀咙,內(nèi)層事務(wù)還會(huì)把異常拋出雅倒,會(huì)被外層事務(wù)捕獲到,所以外層事務(wù)也會(huì)回滾弧可。若外層事務(wù)不想回滾蔑匣,則需自行編碼吞掉內(nèi)層事務(wù)拋出的異常!W厮小裁良!</font>
over
? 因?yàn)榇碇惖脑颍聞?wù)注解會(huì)有失效的情況校套,這邊不展開价脾。
? 想起來大二剛跟著網(wǎng)上的視頻學(xué)Spring入門的時(shí)候,一套事務(wù)直接給我整懵逼了哈哈哈〉殉祝現(xiàn)在跟著源碼走了一遍清楚多了侨把,也算多少有些長進(jìn)了。