最近在看源碼發(fā)現(xiàn)一個(gè)問(wèn)題,在看到DefaultSqlSession這個(gè)類的源碼的時(shí)候,發(fā)現(xiàn)這個(gè)類上有一句注釋,
Note that this class is not Thread-Safe意思說(shuō)說(shuō),此類不是線程安全的,及既然不是線程安全的,怎么還是默認(rèn)實(shí)現(xiàn)那
接下來(lái),我們就一起從源碼的角度分析一下,我們寫一個(gè)小案例,然后通過(guò)案例一起分析下,這里我們以查詢?yōu)橹?代碼很簡(jiǎn)單,就是一個(gè)簡(jiǎn)單的查詢,我們定義了一個(gè)線程,通過(guò)countDownLauntch讓他們同時(shí)請(qǐng)求,我們先執(zhí)行下,看看結(jié)果
@RunWith(SpringRunner.class)
@SpringBootTest
public class DefaultSqlSessionTest {
private static final int COUNT = 10;
private static CountDownLatch count = new CountDownLatch(COUNT);
private SqlSession sqlSession;
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Before
public void init(){
//這里的sqlSession是DefaultSqlSession中的sqlSession
sqlSession = sqlSessionFactory.openSession();
}
@After
public void destory(){
//直接調(diào)用DefaultSqlSession,一定記得手動(dòng)關(guān)閉下sqlSession
sqlSession.close();
}
@Test
public void defaultSqlSessionSafeTest() throws InterruptedException {
for (int i = 0;i<10;i++){
new Thread(() ->{
try {
count.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
getAccount2();
}).start();
count.countDown();
}
Thread.sleep(5000);
}
private void getAccount1() {
sqlSession.select("selectByPrimaryKey", 1,resultContext ->{
RyxAccount ryxAccount = (RyxAccount)resultContext.getResultObject();
System.out.println(ryxAccount);
});
}
private void getAccount2(){
sqlSession.selectList("selectByPrimaryKey",1);
}
執(zhí)行完之后,我們看到的是,報(bào)錯(cuò)了,報(bào)的是一個(gè)強(qiáng)轉(zhuǎn)異常,怎嘛會(huì)報(bào)這個(gè)強(qiáng)轉(zhuǎn)異常嘞,我們器跟著源碼,看看,根據(jù)打印的堆棧信息,我們進(jìn)入到源碼的DefaultSqlSession這個(gè)類一探究竟
當(dāng)我們執(zhí)行查詢的時(shí)候,會(huì)調(diào)用DefaultSqlSession類下的selectList這個(gè)方法,我們接著往下看
@Override
public <E> List<E> selectList(String statement, Object parameter) {
return this.selectList(statement, parameter, RowBounds.DEFAULT);
}
@Override
public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
try {
//獲取執(zhí)行的sql語(yǔ)句
MappedStatement ms = configuration.getMappedStatement(statement);
//執(zhí)行查詢
return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
進(jìn)入到CacheExecuter類下的 query方法
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
//獲取執(zhí)行的sql語(yǔ)句
BoundSql boundSql = ms.getBoundSql(parameterObject);
//創(chuàng)建緩存,注意,這個(gè)地方,調(diào)用的BaseExecuter中的createCacheKey方法
CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
這個(gè)方法具體就是將當(dāng)前的sql語(yǔ)句,等一些類信息,按照指定規(guī)則拼裝成一個(gè)key,然后返回,具體就不再分析了
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
CacheKey cacheKey = new CacheKey();
cacheKey.update(ms.getId());
cacheKey.update(rowBounds.getOffset());
cacheKey.update(rowBounds.getLimit());
cacheKey.update(boundSql.getSql());
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
// mimic DefaultParameterHandler logic
for (ParameterMapping parameterMapping : parameterMappings) {
if (parameterMapping.getMode() != ParameterMode.OUT) {
Object value;
String propertyName = parameterMapping.getProperty();
if (boundSql.hasAdditionalParameter(propertyName)) {
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
value = parameterObject;
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
cacheKey.update(value);
}
}
if (configuration.getEnvironment() != null) {
// issue #176
cacheKey.update(configuration.getEnvironment().getId());
}
return cacheKey;
}
接下來(lái),我們拿到了一個(gè)這個(gè)key,接著往下看,delegate.<E> query這個(gè)方法,是真正的查詢加添加到緩存中的方法實(shí)現(xiàn),這段代碼比較簡(jiǎn)單,就不做分析了,直接進(jìn)入到下一個(gè)方法,BaseExecuter.query
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
Cache cache = ms.getCache();
if (cache != null) {
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, parameterObject, boundSql);
@SuppressWarnings("unchecked")
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
BaseExecuter類
@SuppressWarnings("unchecked")
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
//這是一個(gè)三元運(yùn)算符,resultHandler 是否為空,如果為空,就去緩存中取內(nèi)容,否則設(shè)置為null,
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
return list;
}
2.1
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
localCache.removeObject(key);
}
localCache.putObject(key, list);
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
return list;
}
我們重點(diǎn)分析下,一個(gè)三元代碼 list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
試想一個(gè)場(chǎng)景,當(dāng)兩個(gè)線程現(xiàn)在開始執(zhí)行查詢賬戶的業(yè)務(wù),線程T_SQL_1和T_SQL_2
1:T_SQL_1先拿到線程執(zhí)行權(quán),會(huì)先調(diào)用createCacheKey,如果沒有,則會(huì)創(chuàng)建這個(gè)key,此時(shí)假如是第一次查詢,localCache.getObject(key)中還不存在key,則list為null
2:執(zhí)行queryFromDatabase(見2.1)方法,會(huì)在這里先給key添加一個(gè)默認(rèn)占位符EXECUTION_PLACEHOLDER
3:然后在這個(gè)時(shí)候,T_SQL_2獲得了線程執(zhí)行權(quán),調(diào)用上面的localCache.getObject(key),獲得value:EXECUTION_PLACEHOLDER
4:localCache.getObject(key)此時(shí)就不是null了,然后程序開始轉(zhuǎn)換啊,就會(huì)變成如下代碼,我們模擬下這個(gè)解析過(guò)程,看個(gè)demo
如圖,簡(jiǎn)單模擬了下,如下過(guò)程,得到就是強(qiáng)轉(zhuǎn)異常,說(shuō)明問(wèn)題就是出現(xiàn)在這里,由于線程爭(zhēng)奪資源的問(wèn)題,這里拿到的key其實(shí)是占位符
而不是具體從數(shù)據(jù)庫(kù)查詢出來(lái)的值,謎底終于解開了,原來(lái)問(wèn)題出現(xiàn)在這里,
我們繼續(xù)研究這句代碼list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
resultHandler 這個(gè)參數(shù),如果不為空,也就意味著,一級(jí)緩存也就失效了,也就不用去緩存中取找了,所以當(dāng)你使用流式查詢的時(shí)候,是不會(huì)出現(xiàn)這個(gè)問(wèn)題的,因?yàn)榫筒粫?huì)走緩存,都是查詢數(shù)據(jù)庫(kù),不存在緩存的問(wèn)題
最終的結(jié)論是,我們最好不要自己輕易使用DefaultSqlseesion直接去調(diào)用查詢sql,很容易因?yàn)椴l(fā)問(wèn)題導(dǎo)致轉(zhuǎn)換異常
當(dāng)然,既然mybatis的源碼大神們?cè)缍贾肋@個(gè)DefaultSqlSession這個(gè)類線程安全的問(wèn)題,肯定要處理啊,我們接下來(lái)看看他們是怎么處理的,我們看源碼中sqlSession接口實(shí)現(xiàn)類中看到了一共有三個(gè)實(shí)現(xiàn)類如圖
分別是
1:DefaultSqlSession(已分析)
2:SqlSessionManager(mybatis處理DefaultSqlSession的線程安全管理類)
3:SqlSessionTemplete(spring框架處理mybatis的線程安全的處理框架)
我們先分析下SqlSessionManager,看一下這個(gè)類,我們截取一段代碼
我們又看到了熟悉的jdk代理技術(shù),當(dāng)調(diào)用SqlSessioManager的查詢語(yǔ)句的時(shí)候,會(huì)先調(diào)用SqlSessionInterceptor
這里翻譯為攔截器很恰當(dāng),我們看到,會(huì)去ThreadLocal中獲取sqSession,獲取不到,就去創(chuàng)建一個(gè)DefaultSqSession對(duì)象
這樣的話,相當(dāng)于每個(gè)線程持有自己的DefaultSqlSession對(duì)象,所以,當(dāng)不同的線程訪問(wèn)的時(shí)候,一級(jí)緩存也就失效了,
public class SqlSessionManager implements SqlSessionFactory, SqlSession {
private final SqlSessionFactory sqlSessionFactory;
private final SqlSession sqlSessionProxy;
private final ThreadLocal<SqlSession> localSqlSession = new ThreadLocal<SqlSession>();
private SqlSessionManager(SqlSessionFactory sqlSessionFactory) {
this.sqlSessionFactory = sqlSessionFactory;
this.sqlSessionProxy = (SqlSession) Proxy.newProxyInstance(
SqlSessionFactory.class.getClassLoader(),
new Class[]{SqlSession.class},
new SqlSessionInterceptor());
}
public static SqlSessionManager newInstance(Reader reader) {
return new SqlSessionManager(new SqlSessionFactoryBuilder().build(reader, null, null));
}
.....省略
private class SqlSessionInterceptor implements InvocationHandler {
public SqlSessionInterceptor() {
// Prevent Synthetic Access
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//去ThreadLocal中獲取sqlSesison,如果獲取不到,
final SqlSession sqlSession = SqlSessionManager.this.localSqlSession.get();
if (sqlSession != null) {
try {
return method.invoke(sqlSession, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
} else {
//當(dāng)qlSession為null,調(diào)用openSession方法,然后去調(diào)用sqlSessionFactory.openSession()
//創(chuàng)建一個(gè)DefaultSqlSession的對(duì)象,
final SqlSession autoSqlSession = openSession();
try {
final Object result = method.invoke(autoSqlSession, args);
autoSqlSession.commit();
return result;
} catch (Throwable t) {
autoSqlSession.rollback();
throw ExceptionUtil.unwrapThrowable(t);
} finally {
autoSqlSession.close();
}
}
}
}
SqlSessionFactory
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
Transaction tx = null;
try {
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
closeTransaction(tx); // may have fetched a connection so lets call close()
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
分析了這么多,我們來(lái)個(gè)案例,來(lái)驗(yàn)證下SqlSessionManager,案例很簡(jiǎn)單,就不做分析了,我改了下源碼,打印了日志,
@RunWith(SpringRunner.class)
@SpringBootTest
public class DefaultSqlSessionManagerTest {
private static final int COUNT_THREAD = 10;
private static CountDownLatch count = new CountDownLatch(COUNT_THREAD);
private SqlSessionManager sqlSessionManager;
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Before
public void init(){
sqlSessionManager = SqlSessionManager.newInstance(sqlSessionFactory);
}
@Test
public void sqlSessionManagerTest() throws InterruptedException {
for (int i = 0;i<COUNT_THREAD;i++){
new Thread(() ->{
try {
count.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
getAccount1();
}).start();
count.countDown();
}
Thread.sleep(5000);
}
private void getAccount1() {
sqlSessionManager.startManagedSession();
sqlSessionManager.
selectList("selectByPrimaryKey",1);
}
}
這是一部分日志,可以看到每個(gè)線程生成了新的Slqsession,所以也就保證了線程安全
SqlSessionManager可以允許我們將sqlSession設(shè)置到ThreadLoacl中,這樣也可以保證DefaultSqlSession線程安全
具體就是添加一句如下代碼sqlSessionManager.startManagedSession();這樣,我們就為每個(gè)線程分配了一個(gè)SqlSession并存儲(chǔ)到
ThreadLocal中,這樣也是一樣的效果,通過(guò)ThreadLocal,get方法會(huì)獲取到具體的sqlSession對(duì)象,但是這里有個(gè)問(wèn)題,由于這個(gè)ThradLocal是私有的,set完之后,在關(guān)閉后,清除ThreadLocal中的內(nèi)容實(shí)在關(guān)閉sqlSession后,就是在這里
@Override
public void close() {
final SqlSession sqlSession = localSqlSession.get();
if (sqlSession == null) {
throw new SqlSessionException("Error: Cannot close. No managed session is started.");
}
try {
sqlSession.close();
} finally {
//直接將ThreadLocal中的當(dāng)前線程變量sqlSession設(shè)置為null
localSqlSession.set(null);
}
}
ok,SqlSessionManager就分析到這里,代碼還是比較簡(jiǎn)單的,就到這里,下一期,我們一起看下spring到底是怎樣保證defaultSqlSession線程安全的,
Thanks!
更多博客,請(qǐng)移步到博主技術(shù)博客https://renyuanxin.top