簡書 慢黑八
轉(zhuǎn)載請(qǐng)注明原創(chuàng)出處键兜,謝謝岖赋!
如果讀完覺得有收獲的話,歡迎點(diǎn)贊加關(guān)注
背景
由于某項(xiàng)目獨(dú)特的特色需要手動(dòng)開啟事務(wù)鞭莽。然而坊秸,在手動(dòng)開啟事務(wù)后,事務(wù)能否正常結(jié)束 commit or rollback
就出現(xiàn)了各式各樣的不確定情況澎怒。如果commit or rollback
未執(zhí)行或執(zhí)行失敗褒搔,將會(huì)導(dǎo)致該事務(wù)持有的數(shù)據(jù)庫連接無法正常歸還到連接池中。高并發(fā)場(chǎng)景下的現(xiàn)象就是連接池中的可用連接越來越少喷面,最后導(dǎo)致獲取連接超時(shí)
的異常星瘾。
以下為手動(dòng)事務(wù)工具類
@Service
public class TransactionTool {
//spring注入事務(wù)管理對(duì)象
@Resource(name = "transactionManager")
private PlatformTransactionManager transManager ;
public TransactionStatus getTransSatus(int propagate) {
// TransactionStatus.
// TransactionDefinition
// 事務(wù)定義
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 傳播范圍
def.setPropagationBehavior(propagate);
TransactionStatus transactionStatus = transManager.getTransaction(def);
return transactionStatus;
}
}
下面是開啟事務(wù)的業(yè)務(wù)處理邏輯
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
public void bizMethod(){
//以下代碼手動(dòng)開啟事務(wù)
TransactionStatus transactionStatus = null;
try{
transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..業(yè)務(wù)邏輯
transactionManager.commit(transactionStatus);
}catch(Exception e){
transactionManager.rollback(transactionStatus);
}finally{
//略掉一些分庫分表的特殊處理
}
}
}
主要導(dǎo)致事務(wù)沒有正常結(jié)束的三種場(chǎng)景
- 場(chǎng)景 1、處理
業(yè)務(wù)邏輯
時(shí)惧辈,拋出的是Error
而不是Exception
琳状,catch
接不住,導(dǎo)致rollback
不能正常執(zhí)行盒齿,這也意味著事務(wù)無法正衬畛眩回滾困食,造成連接泄露。 - 場(chǎng)景 2翎承、處理
業(yè)務(wù)邏輯
時(shí)硕盹,未執(zhí)行到commit
就return
了,這樣也會(huì)導(dǎo)致了該事務(wù)沒有正常結(jié)束叨咖,connection
沒有正常歸還連接池,造成泄露瘩例。 - 場(chǎng)景3、同一個(gè)方法中事務(wù)雙開甸各,雙關(guān)垛贤,按照以下順序執(zhí)行
開啟事務(wù)1(requires_new
)-> 然后開事務(wù)2(requires_new
) -> 之后提交事務(wù)1(commit
) -> 在提交事務(wù)2(commit
)
事務(wù)上下文狀態(tài)切換如下:
TS=TransactionStatus ???? TE=TransactionEvent ??? ? T=Transaction
步驟 | 事務(wù)操作 | TransactionSynchronizationManager | 掛起\執(zhí)行 |
---|---|---|---|
1 | TS1=getTransaction(REQUIRES_NEW) publish TE1 |
T1(con1)、TE1 | 掛起 NULL |
2 | TS2=getTransaction(REQUIRES_NEW) publish TE2 |
T2(con2)痴晦、TE2 | 掛起T1南吮,TE1 |
3 | commit(TS1) | TE2執(zhí)行琳彩,同步器清理T2 解掛步驟1掛起的null事務(wù)資源 |
執(zhí)行T1.commit成功 con1歸還連接池 |
4 | commit(TS2) | 當(dāng)前事務(wù)資源為null導(dǎo)致同步器 事件處理出現(xiàn)異常誊酌,導(dǎo)致con2 不能正常歸還到連接池,造成 連接泄露 |
執(zhí)行 T2.commit失敗 con2泄露 |
在開啟事務(wù)1的時(shí)候掛起的事務(wù)資源為空露乏,在commit
事務(wù)1的之后碧浊,會(huì)解掛當(dāng)前線程的事務(wù)資源為:null
,提交事務(wù)2時(shí)候瘟仿,如果當(dāng)前線程的事務(wù)資源為null
箱锐,會(huì)拋空指針異常,最后在解綁資源unbindResource()
的時(shí)候拋出以下代碼塊中的IllegalStateException
異常(遺憾的是劳较,該異常被spring框架捕獲后沒有打印出來)驹止。最終導(dǎo)致事務(wù)2持有的連接不能正常釋放。TransactionEvent
會(huì)在事務(wù)結(jié)束的時(shí)候執(zhí)行當(dāng)前TransactionSynchronizationManager
線程本地變量中的synchronizations
事件观蜗。
public static Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException(
"No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
return value;
}
以上3中情況臊恋,在自動(dòng)事務(wù)@Transactional的處理邏輯中都不會(huì)出現(xiàn)。首先spring-tx都進(jìn)行了統(tǒng)一封裝充分考慮了非正常的可以墓捻。其次抖仅,在嵌套事務(wù)雙開的時(shí)候,都是先開的事務(wù)后關(guān)砖第。所以撤卢,手動(dòng)事務(wù)一定要遵循先開的事務(wù)后關(guān)這個(gè)原則
。
監(jiān)控解決未關(guān)閉事務(wù)的幾個(gè)思路
-
思路1:采用spring的
ApplicationEventPublisher
的事件發(fā)布監(jiān)聽機(jī)制梧兼。
訂閱@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
事務(wù)完成階段的監(jiān)聽放吩,對(duì)“一定時(shí)間內(nèi)”未關(guān)閉的事件進(jìn)行預(yù)警,發(fā)現(xiàn)后整改羽杰。 -
思路2:在finally中對(duì)事務(wù)進(jìn)行統(tǒng)一關(guān)閉渡紫。
調(diào)整catch
的范圍瞭稼,從Exception
修改為Throwable
捕捉到所有Exception
或者Error
的情況,把commit移動(dòng)到finally中腻惠。commit的前置條件是transactionStatus!=null&&transactionStatus.isNewTransaction() && !transactionStatus.isCompleted()
环肘,這樣會(huì)對(duì)所有 新建的且未完成的 事務(wù)進(jìn)行commit
。如果小伙伴覺得思路2改動(dòng)方式比較激進(jìn)集灌,想暫時(shí)先觀察一下那些服務(wù)存在 事務(wù)未正常結(jié)束 的情況悔雹,可以參考思路3。
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
public void bizMethod(){
//以下代碼手動(dòng)開啟事務(wù)
TransactionStatus transactionStatus = null;
try{
transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..業(yè)務(wù)邏輯
}catch(Throwable t){
transactionManager.rollback(transactionStatus);
}finally{
//try..catch內(nèi)容可提煉成公共方法
try {
if (transactionStatus != null && transactionStatus.isNewTransaction()
&& !transactionStatus.isCompleted()) {
//TODO: arms日志輸出 堆棧相關(guān)信息
transactionManager.commit(transactionStatus);
}
} catch (Exception e) {
e.printStackTrace();
}
//略掉一些分庫分表的特殊處理
}
}
}
-
思路3: 在finally中檢查未完成的事物并進(jìn)行預(yù)警欣喧。
預(yù)警的前提條件是transactionStatus!=null&&transactionStatus.isNewTransaction() && !transactionStatus.isCompleted()
腌零,這樣會(huì)對(duì)所有 新建的且未完成的 事務(wù)進(jìn)行預(yù)警日志信息輸出。該思路在finally中增加try..catch塊進(jìn)行檢查唆阿,對(duì)應(yīng)用程序改動(dòng)影響較小益涧。
需要注意的是:這種方式仍然監(jiān)控不到上文中場(chǎng)景3連接泄露的問題,如果想解決場(chǎng)景3的問題驯鳖,需要從TransactionStatus
中獲取事務(wù)對(duì)象闲询,抽取ConnectionHolder
中的數(shù)據(jù)庫Connection
,用conn.isClosed()
來判斷連接是否已經(jīng)關(guān)閉浅辙。另外還需要修改DataSourceTransactionManager
源碼扭弧,把內(nèi)部類DataSourceTransactionObject
的訪問修飾符從private
修改為public
。
參考如下代碼:
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
public void bizMethod(){
//以下代碼手動(dòng)開啟事務(wù)
TransactionStatus transactionStatus = null;
try{
transactionStatus = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..業(yè)務(wù)邏輯
transactionManager.commit(transactionStatus);
}catch(Throwable t){
transactionManager.rollback(transactionStatus);
}finally{
//try..catch內(nèi)容可提煉成公共方法
try {
if (transactionStatus != null && transactionStatus.isNewTransaction()) {
if(!transactionStatus.isCompleted()) {
// arms日志輸出 堆棧相關(guān)信息
System.out.println("事務(wù)未結(jié)束原因[事務(wù)-未完成]");
printStackTrace(Thread.currentThread().getStackTrace());
}else {
Connection conn = null;
DefaultTransactionStatus defaultTransactionStatus = (DefaultTransactionStatus)transactionStatus;
if(defaultTransactionStatus.getTransaction().getClass().getClassLoader() == DataSourceTransactionObject.class.getClassLoader()) {
conn = ((DataSourceTransactionObject)defaultTransactionStatus.getTransaction()).getConnectionHolder().getConnection();
if(conn != null && conn.isClosed()==false) {
System.out.println("事務(wù)未結(jié)束原因[連接-未關(guān)閉]");
printStackTrace(Thread.currentThread().getStackTrace());
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
//略掉一些分庫分表的特殊處理
}
}
}
接下來說下上面三種思路的可行性
[X ] 思路1记舆,不可行
[ok] 思路2鸽捻,可行
[ok] 思路3為過渡監(jiān)控性的解決方案,可行
[ok] 思路2+思路3為最終解決方案泽腮,可行
思路1中御蒲,基于spring事件的發(fā)布訂閱模式會(huì)存在什么問題?
使用spring的ApplicationEventPublisher
的事件發(fā)布監(jiān)聽機(jī)制诊赊。
訂閱@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
事務(wù)完成階段**的監(jiān)聽厚满,對(duì)“一定時(shí)間內(nèi)”未關(guān)閉的事件進(jìn)行預(yù)警,發(fā)現(xiàn)后整改豪筝。
1痰滋、改造TransactionTool
在執(zhí)行getTransSatus
方法時(shí)調(diào)用publishTransactionEvent(transactionStatus , propagate)
發(fā)布包含transactionId 的 "新事務(wù)事件" ,然后把需要監(jiān)控的事務(wù)事件存放在aliveTransactionMap
中 续崖。
@Service
public class TransactionTool {
private AtomicLong transactionId = new AtomicLong(0);
// transcatioId,BizTransactionEvent 存儲(chǔ)存活的事務(wù)事件
public static ConcurrentHashMap<String, BizTransactionEvent> aliveTransactionMap =
new ConcurrentHashMap<String, BizTransactionEvent>();
//spring注入事務(wù)管理對(duì)象
@Resource(name = "transactionManager")
private PlatformTransactionManager transManager ;
@Autowired
private ApplicationEventPublisher publisher;
public TransactionStatus getTransSatus(int propagate) {
// TransactionStatus.
// TransactionDefinition
// 事務(wù)定義
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 傳播范圍
def.setPropagationBehavior(propagate);
TransactionStatus transactionStatus = transManager.getTransaction(def);
// 增加事務(wù)監(jiān)聽
publishTransactionEvent(transactionStatus , propagate);
return transactionStatus;
}
public void publishEvent(long tid,int propagate) {
long temp = tid;
StackTraceElement[] stackTraceElementArray = Thread.currentThread().getStackTrace();
if(stackTraceElementArray.length>2) {
if(transactionId.longValue() == Long.MAX_VALUE) {
transactionId.compareAndSet(Long.MAX_VALUE, 0);
}
BizTransactionEvent bizTransactionEvent = new BizTransactionEvent();
bizTransactionEvent.setTransactionId(""+temp);
bizTransactionEvent.setTransactionName(stackTraceElementArray[3].getClassName()+":"
+stackTraceElementArray[3].getMethodName()+":"+stackTraceElementArray[3].getLineNumber());
bizTransactionEvent.setCurrentTimeMillis(System.currentTimeMillis());
bizTransactionEvent.setStackTraceElement(stackTraceElementArray);
bizTransactionEvent.setPropagate(propagate);
System.out.println("[NEWTX"+bizTransactionEvent.getTransactionId()+"]"+bizTransactionEvent.toString());
publisher.publishEvent(bizTransactionEvent);
//在這里處理新建的事務(wù)操作敲街,可以放入一個(gè)map中
TransactionTool.aliveTransactionMap.put(bizTransactionEvent.getTransactionId(), bizTransactionEvent);
}
}
}
2、增加事物事件類1BizTransactionEvent
严望,事務(wù)監(jiān)聽類BizTransactionEventListener
多艇,通過事務(wù)commit時(shí)候,同步調(diào)用標(biāo)有注解@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
的afterCompletion
方法把aliveTransactionMap
中transactionId對(duì)應(yīng)的事務(wù)事件刪掉像吻。
事務(wù)事件監(jiān)聽類
@Component
public class BizTransactionEventListener {
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
public void afterCompletion(PayloadApplicationEvent<BizTransactionEvent> event) {
System.out.println("[NEWTX" + event.getPayload().getTransactionId() + "-REMOVE] " + event.toString()
+ "Duration:" + (System.currentTimeMillis() - event.getPayload().getCurrentTimeMillis()) + "ms");
TransactionTool.aliveTransactionMap.remove(event.getPayload().getTransactionId());
}
}
事務(wù)事件類
public class BizTransactionEvent {
private static final int STACK_TRACE_ELEMENT_DEEP = 4;
private String transactionId;
private String transactionName;
private StackTraceElement[] stackTraceElement;
private long currentTimeMillis;
private int propagate;
public String getTransactionName() {
return transactionName;
}
public void setTransactionName(String transactionName) {
this.transactionName = transactionName;
}
public int getPropagate() {
return propagate;
}
public void setPropagate(int propagate) {
this.propagate = propagate;
}
public String getTransactionId() {
return transactionId;
}
public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}
public long getCurrentTimeMillis() {
return currentTimeMillis;
}
public void setCurrentTimeMillis(long currentTimeMillis) {
this.currentTimeMillis = currentTimeMillis;
}
public StackTraceElement[] getStackTraceElement() {
return stackTraceElement;
}
public void setStackTraceElement(StackTraceElement[] stackTraceElement) {
this.stackTraceElement = stackTraceElement;
}
@Override
public String toString() {
return "BizTransactionEvent [transactionId=" + transactionId + ", transactionName=" + transactionName
+ ", stackTraceElement=" + Arrays.toString(Arrays.copyOf(stackTraceElement, STACK_TRACE_ELEMENT_DEEP))
+ ", currentTimeMillis=" + currentTimeMillis + ", propagate=" + propagate + "]";
}
}
3峻黍、我們可以通過監(jiān)控aliveTransactionMap
中的事務(wù)事件存活時(shí)間來尋找發(fā)現(xiàn)事務(wù)未關(guān)閉的業(yè)務(wù)代碼复隆。
代碼略...
4、我們看下以下邏輯中問題出在哪:
@Service
public class BizService{
@Autowired
TransactionTool transactionTool;
@Transactional
public void bizMethod(){
//以下代碼手動(dòng)開啟事務(wù)
TransactionStatus transactionStatus1 = null;
TransactionStatus transactionStatus2 = null;
try{
transactionStatus1 = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..業(yè)務(wù)邏輯
//transactionManager.commit(transactionStatus1);
}catch(Exception){
transactionManager.rollback(transactionStatus1);
}finally{
//略掉一些分庫分表的特殊處理
}
try{
transactionStatus2 = TransactionTool.getTransaction(DefaultTransactionDefinition.PROPAGATION_REQUIRES_NEW);
// ..業(yè)務(wù)邏輯
transactionManager.commit(transactionStatus2);
}catch(Exception){
transactionManager.rollback(transactionStatus2);
}finally{
//略掉一些分庫分表的特殊處理
}
}
}
事務(wù)上下文狀態(tài)切換如下:
TS=TransactionStatus ???? TE=TransactionEvent ??? ? T=Transaction
步驟 | 事務(wù)操作 | TransactionSynchronizationManager | 掛起\執(zhí)行 |
---|---|---|---|
1 | @Transactional TS0=getTransaction(REQUIRESD) | T0(con0) | 掛起 NULL |
2 | TS1=getTransaction(REQUIRES_NEW) publish TE1 |
T1(con1)姆涩、TE1 | 掛起T0 |
3 | commit(TS1)被注掉了挽拂,不執(zhí)行 | . | con1連接泄露 |
4 | TS2=getTransaction(REQUIRES_NEW) publish TE2 |
T2(con1)、TE2 | 掛起T1骨饿、TE1 |
5 | commit(TS2) | TE2執(zhí)行亏栈,同步器清理T2 解掛步驟4的T1、TE1 |
執(zhí)行T2.commit成功 con2歸還連接池 |
6 | commit(TS0) | TE1執(zhí)行宏赘,同步器清理T1 解掛步驟2的T0 |
執(zhí)行 T0.commit成功 con0歸還連接池 |
這種方式的最大問題在于绒北,程序執(zhí)行完成后,當(dāng)前線程在事務(wù)同步器中仍存在解掛的事務(wù)資源(T0)察署,并且事務(wù)commit(TS1)沒有執(zhí)行闷游,TE1卻被正常執(zhí)行了,同時(shí)aliveTransactionMap中的TE1被移除了贴汪,失去了后續(xù)的監(jiān)控基礎(chǔ)脐往。
所以對(duì)于手動(dòng)事務(wù)來說,思路1比較失敗
文末彩蛋:簡述手動(dòng)Spring事務(wù)處理邏輯
spring-tx嘶是、spring-jdbc中比較重要的四個(gè)關(guān)鍵處理類:
-
AbstractPlatformTransactionManager
:事務(wù)核心處理類钙勃,開啟事務(wù)蛛碌,掛起/恢復(fù)聂喇,釋放資源等功能 -
DataSourceTransactionManager
:數(shù)據(jù)庫操作都有這個(gè)類來完成,例如:setAutoCommit蔚携,commit希太,rollback -
TransactionSynchronizationManager
:這里的TransactionSynchronizationManager都是以線程為單位來記錄相關(guān)的資源息。resources中記錄了酝蜒,key為datasource誊辉,value為ConnectionHolder的map結(jié)構(gòu)信息。上文中publisher.publishEvent(bizTransactionEvent)會(huì)把事務(wù)事件到synchronizations中亡脑,后續(xù)事務(wù)在提交的時(shí)候會(huì)執(zhí)行synchronizations中的事件堕澄。 -
DefaultTransactionStatus
:存放當(dāng)前事務(wù),掛起的事務(wù)資源霉咨,事務(wù)定義等內(nèi)容蛙紫。
自動(dòng)事務(wù)cglib代理可參考TransactionAspectSupport
類
在事務(wù)處理的過程中參考如下步驟,偷個(gè)懶不畫時(shí)序圖了途戒,大家按照序號(hào)坑傅,腦補(bǔ)一下
[package:spring-tx]AbstractPlatformTransactionManager
1、首先調(diào)用getTransaction()方法喷斋,獲取連接唁毒,獲取當(dāng)前事務(wù)狀態(tài)
4蒜茴、調(diào)用handleExistingTransaction()處理已存在的事務(wù)
- 如果是
REQUIRES_NEW
就要掛起當(dāng)前存在事務(wù)、創(chuàng)建新事務(wù)把掛起的事務(wù)資源放入新事務(wù)中浆西,并且切換TransactionSynchronizationManager的本地線程變量為新事務(wù)相關(guān)內(nèi)容粉私,解綁當(dāng)前事務(wù)資源。 - 如果是
NESTED
則需要?jiǎng)?chuàng)建保存點(diǎn) - 如果是
REQUIRED
近零,創(chuàng)建新把newTransaction設(shè)定為false毡鉴。
5、掛起資源SuspendedResourcesHolder結(jié)構(gòu)與TransactionSynchronizationManager相同秒赤,用于解掛時(shí)恢復(fù)TransactionSynchronizationManager
中的本地線程變量猪瞬。
7、調(diào)用prepareSynchronization方法入篮,初始化當(dāng)前線程的事務(wù)同步管理器陈瘦,設(shè)置Threadlocal相關(guān)內(nèi)容,并反回新的TransactionStatus對(duì)象潮售。
以下為事務(wù)提交后的操作
8痊项、調(diào)用commit方法提交事務(wù)。這里會(huì)調(diào)用processCommit方法酥诽,在這個(gè)方法中會(huì)調(diào)用事務(wù)事件監(jiān)聽邏輯
鞍泉。通過ApplicationListenerMethodTransactionalAdapter處理各個(gè)不同階段的transactionEvent,需要注意的是待處理的transactionEvent是從TransactionSynchronizationManager.getSynchronizations()
當(dāng)前的本地線程變量中獲取的肮帐。
9咖驮、cleanupAfterCompletion設(shè)置事務(wù)狀態(tài)為完成,清理當(dāng)前線程TransactionSynchronizationManager資源训枢,解綁connection資源托修,設(shè)置autocommit=true。還原connection屬性恒界,回并且把連接歸還給連接池睦刃。
10、調(diào)用resume()方法還原掛起的資源十酣,繼續(xù)執(zhí)行涩拙。
[package:spring-jdbc]DataSourceTransactionManager
2、調(diào)用doGetTransaction() 獲取事務(wù)對(duì)象DataSourceTransactionObject
3耸采、檢索綁定到當(dāng)前線程(TransactionSynchronizationManager)的資源(ConnectionHolder)兴泥,把ConnectionHolder放入DataSourceTransactionObject中
6、調(diào)用dobegin開啟事務(wù)con.setAutoCommit(false);并且修改transactionActive為true洋幻。如果連接資源為空則獲取新的連接郁轻,并且在TransactionSynchronizationManager進(jìn)行資源綁定。
8.1、調(diào)用doCommit提交事務(wù)