看源碼大部分都要從開啟某種服務(wù)的注解上看嗅剖,找到源頭才能繼續(xù)。
所以看事務(wù)的源碼就在Application啟動類上找到注解@EnableTransactionManagement
本次整理是出于在實際應(yīng)用中出現(xiàn)的問題尔当,進行排查問題時順便總結(jié)的。有錯誤的地方請指正揍愁。
遇到的具體問題是這樣的:
有幾個要素:
1.系統(tǒng)中有全局的線程势就,使用切面匹配的所有service。
2.數(shù)據(jù)庫連接池最大活動數(shù)量是50系吭,最大超時時間是一分鐘五嫂。
3.系統(tǒng)中有一個全局連接池,最大活動的線程數(shù)量也是50
問題發(fā)生的現(xiàn)場
1.在線程池中放了50+的任務(wù)村斟,這些任務(wù)都配置了@Transactional(propagation = Propagation.REQUIRES_NEW)
2.切面會先創(chuàng)建50個事務(wù)贫导,此時獲取了50個數(shù)據(jù)庫連接。
3.在執(zhí)行方法時蟆盹,掃描到注解中配置了Propagation.REQUIRES_NEW孩灯,所以要將外部事務(wù)掛起,開啟新事務(wù)逾滥。
4.問題來了峰档,新事務(wù)在獲取數(shù)據(jù)庫連接時,可用資源不足寨昙,進行等待讥巡。
5.一分鐘后個別任務(wù)超時,其余任務(wù)有可能獲得連接繼續(xù)執(zhí)行舔哪。
解決方案
1.慎用Propagation.REQUIRES_NEW
2.合理配置線程池與數(shù)據(jù)庫連接池配置
3.做壓力測試
爆出的異常信息
maxWaitThreadCount 50 , current wait Thread count 50
下面是分享
EnableTransactionManagement中使用@Import注解可以將TransactionManagementConfigurationSelector實現(xiàn)的selectImports方法返回對象交給SpringIOC管理(后續(xù)再深究如何交給spring的)欢顷。默認(rèn)返回的是AutoProxyRegistrar與ProxyTransactionManagementConfiguration接下來關(guān)注這兩個類做了些什么事情。
AutoProxyRegistrar
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
boolean candidateFound = false;
//獲取啟動類注解
Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
for (String annType : annTypes) {
//拿到注解中的屬性
AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (candidate == null) {
continue;
}
Object mode = candidate.get("mode");
Object proxyTargetClass = candidate.get("proxyTargetClass");
if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
Boolean.class == proxyTargetClass.getClass()) {
//符合開啟事務(wù)的注解 進行標(biāo)記候選
candidateFound = true;
if (mode == AdviceMode.PROXY) {
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
if ((Boolean) proxyTargetClass) {
AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
return;
}
}
}
}
一點log....
}
ProxyTransactionManagementConfiguration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)涉及到AOP的功能捉蚤,后續(xù)再進行分析吧抬驴。
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
//創(chuàng)建切面用的
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
//組裝對Transaction的增強,包括readOnly缆巧、timeout布持、rollbackFor、rollbackForClassName等
advisor.setTransactionAttributeSource(transactionAttributeSource());
advisor.setAdvice(transactionInterceptor());
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor() {
//事務(wù)處理的核心類
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource());
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
經(jīng)過服務(wù)的啟動陕悬,spring自己將各種組件幫我們初始化好题暖,配置好。接下來看看在運行時它是如何管理事務(wù)的捉超。
TransactionInterceptor 事務(wù)攔截器
AOP可以使用自定義注解(切點)+interceptor(增強Advice)構(gòu)成織入(DefaultPointcutAdvisor)來實現(xiàn)胧卤。
TransactionInterceptor實現(xiàn)了MethodInterceptor中的invoke方法,所以當(dāng)代理對象執(zhí)行目標(biāo)方法時狂秦,會執(zhí)行invoke方法灌侣,在invoke方法中最重要的是invokeWithinTransaction。
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
//獲取注解參數(shù)
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
//確定事務(wù)管理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
//進行事務(wù)處理的方法全限定名 path.class.method
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
//創(chuàng)建數(shù)據(jù)庫鏈接裂问,獲取事務(wù)侧啼,修改AutoCommit為false,此處為處理數(shù)據(jù)庫資源的核心代碼堪簿,最重要的是tm.getTransaction方法
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
//進行接下來的調(diào)用痊乾,如沒啥意外會直接調(diào)用目標(biāo)方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
//AbstractPlatformTransactionManager 提交事務(wù),包括回滾
commitTransactionAfterReturning(txInfo);
return retVal;
}
略....
}
核心方法:AbstractPlatformTransactionManager.getTransaction 讓我們繼續(xù)深入
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
//獲取一個數(shù)據(jù)庫連接
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
//判斷這個鏈接是否已經(jīng)存在事務(wù)了椭更,如果存在事務(wù)則進行特殊處理哪审,這里要注意TransactionDefinition.PROPAGATION_REQUIRES_NEW的情況
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
//核心方法
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
其中doBegin是實際執(zhí)行的方法,構(gòu)建了事務(wù)的具體實現(xiàn)虑瀑。需要注意的是txObject.getConnectionHolder().getConnection();方法湿滓,這個方法在資源不夠的情況下會循環(huán)不斷的去獲取數(shù)據(jù)庫連接滴须,導(dǎo)致程序超時。
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
//此處會去數(shù)據(jù)庫中去獲取連接叽奥,會不斷的去獲取
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
//關(guān)閉自動提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
//開啟事務(wù)設(shè)置參數(shù)
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
獲取完事務(wù)后扔水,會執(zhí)行retVal = invocation.proceedWithInvocation();進行后續(xù)的調(diào)用,最后調(diào)用commitTransactionAfterReturning(txInfo);進行事務(wù)提交或回滾的操作朝氓。