問(wèn)題描述:
軟件環(huán)境:Spring Boot版本: 2.0.2.RELEASE、MyBatis版本: 3.4.6、Druid版本: 1.1.8籍嘹、: mysql-connector-java-5.1.46
Spring Boot + MyBatis + Druid 整合中使用批處理的 SqlSessionTemplate 當(dāng)在Service層拋出RuntimeException(或子類(lèi))時(shí),
Druid代理的連接類(lèi)com.alibaba.druid.pool.DruidPooledConnection的rollback并不會(huì)調(diào)用目標(biāo)數(shù)據(jù)庫(kù)連接(比如MySql的數(shù)據(jù)庫(kù)連接)rollback方法,
而Druid代理的DruidPooledConnection的close方法的邏輯并不是關(guān)閉實(shí)際的數(shù)據(jù)庫(kù)連接,而是將其引用計(jì)數(shù)器減一,然后將連接重新添加到空閑連接池中,
這個(gè)時(shí)候因?yàn)闆](méi)有調(diào)用rollback, 當(dāng)其它線(xiàn)程獲取到這個(gè)連接的時(shí)候不是一個(gè)純凈的Connection,它里面有上次未提交的sql語(yǔ)句,如果此時(shí)提交將會(huì)把上次回滾的sql和當(dāng)前操作的sql一起提交,這個(gè)操作很危險(xiǎn)的。
定位問(wèn)題,分析過(guò)程如下:
MyBatis默認(rèn)的SqlSession是SIMPLE, 當(dāng)配置了:
@Bean
public SqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
//return new SqlSessionTemplate(sqlSessionFactory); // 不使用批處理 SqlSession, 推薦用這種方式
return new SqlSessionTemplate(sqlSessionFactory, ExecutorType.BATCH); // 使用批處理 SqlSession, 不推薦用這種方式, 如果主鍵自增, 批處理方式無(wú)法獲取到自增的id
}
這段代碼后, 內(nèi)部使用的 Executor 是 org.apache.ibatis.executor.BatchExecutor, 而不是默認(rèn)的 org.apache.ibatis.executor.SimpleExecutor
對(duì)數(shù)據(jù)庫(kù)進(jìn)行DML之后, 最終都會(huì)轉(zhuǎn)到 BatchExecutor 或 SimpleExecutor 的 doUpdate(MappedStatement ms, Object parameterObject) 方法 [注: 根據(jù)配置選取是BatchExecutor還是SimpleExecutor]
1余蟹、先看能正呈芳颍回滾數(shù)據(jù)的情況,也就是 SimpleExecutor, 它的doUpdate(MappedStatement ms, Object parameterObject) 方法, 內(nèi)部包含如下代碼:
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
// 省略其它代碼
int count = handler.update(stmt); // handler 的具體類(lèi)型是: org.apache.ibatis.executor.statement.RoutingStatementHandler , 我們跟蹤進(jìn)入這個(gè)方法
// 省略其它代碼
}
RoutingStatementHandler.java 的 update 方法如下:
public int update(Statement statement) throws SQLException {
return delegate.update(statement); // delegate 的具體類(lèi)型是: org.apache.ibatis.executor.statement.PreparedStatementHandler , 我們跟蹤進(jìn)入這個(gè)方法
}
PreparedStatementHandler.java 的 update 方法如下:
@Override
public int update(Statement statement) throws SQLException {
PreparedStatement ps = (PreparedStatement) statement;
// ps.execute() 這行代碼是關(guān)鍵: ps的具體實(shí)現(xiàn)類(lèi)是: com.alibaba.druid.pool.DruidPooledPreparedStatement , 進(jìn)入這個(gè)方法
ps.execute();
// 省略其它代碼
}
DruidPooledPreparedStatement.java 的 execute() 方法如下:
@Override
public boolean execute() throws SQLException {
// 省略其它代碼
transactionRecord(sql); // 這行代碼是關(guān)鍵, 跟進(jìn)去
// 省略其它代碼
}
DruidPooledPreparedStatement.java 的 transactionRecord 方法如下:
protected void transactionRecord(String sql) throws SQLException {
// 觸發(fā)Connection創(chuàng)建事務(wù)
conn.transactionRecord(sql); // conn 的具體實(shí)現(xiàn)類(lèi)是: com.alibaba.druid.pool.DruidPooledConnection , 跟進(jìn)去查看這個(gè)方法
}
DruidPooledConnection.java
protected void transactionRecord(String sql) throws SQLException {
// 這個(gè)非空判斷是關(guān)鍵: 實(shí)際調(diào)試的結(jié)果是進(jìn)入了這個(gè)判斷, 也就是 new 了一個(gè) TransactionInfo
if (transactionInfo == null && (!conn.getAutoCommit())) {
DruidAbstractDataSource dataSource = holder.getDataSource();
dataSource.incrementStartTransactionCount();
transactionInfo = new TransactionInfo(dataSource.createTransactionId()); // 進(jìn)入了這里, 創(chuàng)建了 TransactionInfo
}
// 省略其它代碼
}
以上流程結(jié)束后, 當(dāng)Service拋出RuntimeException 時(shí)需要回滾事務(wù),進(jìn)入 DruidPooledConnection 的 rollback 方法,
public void rollback() throws SQLException {
// 此時(shí) transactionInfo 和 holder 都不為空, 沒(méi)有進(jìn)入下面的兩個(gè)中斷判斷中, 事務(wù)正程海回滾
if (transactionInfo == null) { // 中斷判斷, SimpleExecutor 沒(méi)有進(jìn)入此判斷
return;
}
if (holder == null) { // 中斷判斷, SimpleExecutor 沒(méi)有進(jìn)入此判斷
return;
}
// 省略其它代碼
conn.rollback(); // 這里調(diào)用實(shí)際的數(shù)據(jù)庫(kù)連接回滾事務(wù), 事務(wù)正趁姿撸回滾
// 省略其它代碼
}
SimpleExecutor 分析完畢
2菱蔬、再看 BatchExecutor 不能正常回滾數(shù)據(jù)的情況,
BatchExecutor.java 的 update 方法如下:
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
// 節(jié)省篇幅省略了部分代碼
final Statement stmt;
if (sql.equals(currentSql) && ms.equals(currentStatement)) {
// 省略其它代碼
} else { // 進(jìn)入了 esle 分支
Connection connection = getConnection(ms.getStatementLog());
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt); //fix Issues 322
currentSql = sql;
currentStatement = ms;
statementList.add(stmt); // 這里把 Statement 存在集合里面了, 后面一定會(huì)有清除集合的代碼, 否則將導(dǎo)致內(nèi)存泄漏
batchResultList.add(new BatchResult(ms, sql, parameterObject));
}
handler.batch(stmt); // handler 的具體實(shí)現(xiàn)類(lèi)型是: org.apache.ibatis.executor.statement.RoutingStatementHandler , 進(jìn)入這個(gè)方法
return BATCH_UPDATE_RETURN_VALUE;
}
RoutingStatementHandler.java 的 batch 方法如下:
public void batch(Statement statement) throws SQLException {
delegate.batch(statement); // delegate 的具體實(shí)現(xiàn)類(lèi)是: org.apache.ibatis.executor.statement.PreparedStatementHandler , 進(jìn)入這個(gè)方法
}
PreparedStatementHandler.java 的 batch 方法如下:
public void batch(Statement statement) throws SQLException {
PreparedStatement ps = (PreparedStatement) statement;
ps.addBatch();// ps 的實(shí)現(xiàn)類(lèi)是: com.alibaba.druid.pool.DruidPooledPreparedStatement , 進(jìn)入這個(gè)方法
}
DruidPooledPreparedStatement.java 的 addBatch 方法如下:
public void addBatch() throws SQLException {
// 省略其它代碼
// 特別注意: 這里的addBatch()和 SimpleExecutor 最終調(diào)用 PreparedStatement 的 execute() 方法不同, 這個(gè)沒(méi)有創(chuàng)建 TransactionInfo
// 當(dāng)以上流程結(jié)束后, 當(dāng)Service拋出RuntimeException 時(shí)需要回滾事務(wù),進(jìn)入 DruidPooledConnection 的 rollback 方法,
stmt.addBatch();
}
DruidPooledConnection.java 的 rollback 方法如下:
public void rollback() throws SQLException {
// 此時(shí) transactionInfo 為空, 進(jìn)入下面的中斷判斷中, 從而導(dǎo)致后面的 conn.rollback(); 沒(méi)有執(zhí)行
if (transactionInfo == null) { // 中斷判斷, BatchExecutor 進(jìn)入了此判斷, 從而導(dǎo)致后面的 conn.rollback(); 沒(méi)有執(zhí)行
return;
}
if (holder == null) {
return;
}
// 省略其它代碼
conn.rollback(); // 使用 BatchExecutor 時(shí)不能執(zhí)行此操作
// 省略其它代碼
}
至此: 我們提出的問(wèn)題產(chǎn)生的原因分析完畢.
嘗試解決方案如下(在如下兩個(gè)類(lèi)新增了兩處代碼):
org.apache.ibatis.executor.BatchExecutor.java 修改如下:
public class BatchExecutor extends BaseExecutor {
... ...
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException { // 為 true 是回滾數(shù)據(jù), line: 111
try {
List<BatchResult> results = new ArrayList<BatchResult>();
if (isRollback) {
// 調(diào)試后新增: 2017-07-02 ======================> 開(kāi)始 ==================================================================>
// 批處理方式需要清除加入的 Statement 的
if(statementList!=null) {
// 缺一個(gè)清除 Statement 中的批處理腳本的邏輯
for(Statement stmt : statementList) {
closeStatement(stmt);
// 清除批處理緩存
stmt.clearBatch();
}
}
// 清除批處理的緩存信息
currentSql = null;
statementList.clear();
batchResultList.clear();
// Connection conn = transaction.getConnection(); // 最好不要再這里關(guān)閉數(shù)據(jù)庫(kù)連接
// if(conn!=null) {
// conn.close();
// }
// 調(diào)試后新增: 2017-07-02 ======================>結(jié)束 ==================================================================>
// 返回空集合
return Collections.emptyList();
}
}
... ...
}
com.alibaba.druid.pool.DruidPooledPreparedStatement.java 修改如下:
public class DruidPooledPreparedStatement extends DruidPooledStatement implements PreparedStatement {
... ...
@Override
public void addBatch() throws SQLException { // line: 549
checkOpen();
try {
// 調(diào)試后新增: 2017-07-02 ======================> 開(kāi)始 ==================================================================>
if(null == conn.getTransactionInfo()) {
conn.createTransactionInfo(); // 創(chuàng)建 TransactionInfo , 當(dāng)執(zhí)行 DruidPooledConnection 的 rollback 時(shí)不會(huì)進(jìn)入中斷方法
}
// 調(diào)試后新增: 2017-07-02 ======================>結(jié)束 ==================================================================>
stmt.addBatch();
} catch (Throwable t) {
throw checkException(t);
}
}
... ...
}
后記:
整體來(lái)看,是因?yàn)镈ruidPooledPreparedStatement.java 的 TransactionInfo 沒(méi)有在 addBatch() 方法中創(chuàng)建,而是延遲到了 executeBatch() 方法調(diào)用時(shí)才創(chuàng)建,
而 BaseExecutor.java 當(dāng)出現(xiàn)異常時(shí)不執(zhí)行DruidPooledPreparedStatement的executeBatch()方法,進(jìn)而引發(fā)事務(wù)沒(méi)有被回滾。