在Spring Data Redis提供了RedisTemplate對redis進(jìn)行讀寫操作并且支持事務(wù)蚕苇。
如果在同一線程(比如Web環(huán)境的一次請求中)中存在下面操作將會造成讀操作無法直接讀取出數(shù)據(jù)
1.先在非事務(wù)環(huán)境下執(zhí)行reids操作(調(diào)用沒有加@Transactional注解)
2.然后在事務(wù)環(huán)境下執(zhí)行redis操作(調(diào)用添加了@Transactional注解的方法)
可以從RedisTemplate源碼中找到原因
RedisTemplate中對Redis的各種數(shù)據(jù)類型的操作都抽象出了相對于的操作類 如 ValueOperations哩掺,ListOperations,SetOperations等涩笤,而這些類在執(zhí)行操作時最終還是會調(diào)用RedisTemplate的public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline)嚼吞,這個方法是RedisTemplate的操作Reids的核心方法
public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it");
Assert.notNull(action, "Callback object must not be null");
RedisConnectionFactory factory = getConnectionFactory();
RedisConnection conn = null;
try {
if (enableTransactionSupport) {
// only bind resources in case of potential transaction synchronization
//如果設(shè)置了啟用事務(wù),則調(diào)用bindConnection
conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport);
} else {
conn = RedisConnectionUtils.getConnection(factory);
}
boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
//預(yù)留鉤子函數(shù)可在執(zhí)行具體操作前對connection做一些處理
RedisConnection connToUse = preProcessConnection(conn, existingConnection);
boolean pipelineStatus = connToUse.isPipelined();
if (pipeline && !pipelineStatus) {
connToUse.openPipeline();
}
RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse));
T result = action.doInRedis(connToExpose);
// close pipeline
if (pipeline && !pipelineStatus) {
connToUse.closePipeline();
}
// TODO: any other connection processing?
//預(yù)留鉤子函數(shù)可在執(zhí)行具體操作后對connection做一些處理
return postProcessResult(result, connToUse, existingConnection);
} finally {
RedisConnectionUtils.releaseConnection(conn, factory);
}
}
可以看出這個方法是個模板方法蹬碧,實(shí)現(xiàn)了整個操作的流程
RedisConnectionUtils是獲取連接的工具類舱禽,在配置RedisTemplate是如果設(shè)置了enableTransactionSupport=true時,則會通過bindConnection方法獲取連接
//bindConnection調(diào)用了doGetConnection
public static RedisConnection bindConnection(RedisConnectionFactory factory, boolean enableTranactionSupport) {
return doGetConnection(factory, true, true, enableTranactionSupport);
}
public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
boolean enableTransactionSupport) {
Assert.notNull(factory, "No RedisConnectionFactory specified");
//從當(dāng)前線程中獲取連接
RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);
if (connHolder != null) {
if (enableTransactionSupport) {
//開啟reids事務(wù)
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
if (!allowCreate) {
throw new IllegalArgumentException("No connection found and allowCreate = false");
}
if (log.isDebugEnabled()) {
log.debug("Opening RedisConnection");
}
//如果當(dāng)前線程中不存在連接則創(chuàng)建連接
RedisConnection conn = factory.getConnection();
if (bind) {
RedisConnection connectionToBind = conn;
//如果開啟的事務(wù)且調(diào)用添加了@Transactional的方法恩沽,這里會創(chuàng)建一個連接的代理對象
if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) {
connectionToBind = createConnectionProxy(conn, factory);
}
connHolder = new RedisConnectionHolder(connectionToBind);
//綁定連接到當(dāng)前線程中
TransactionSynchronizationManager.bindResource(factory, connHolder);
if (enableTransactionSupport) {
//開啟reids事務(wù)
potentiallyRegisterTransactionSynchronisation(connHolder, factory);
}
return connHolder.getConnection();
}
return conn;
}
//開啟reids事務(wù)
private static void potentiallyRegisterTransactionSynchronisation(RedisConnectionHolder connHolder,
final RedisConnectionFactory factory) {
if (isActualNonReadonlyTransactionActive()) {
if (!connHolder.isTransactionSyncronisationActive()) {
connHolder.setTransactionSyncronisationActive(true);
RedisConnection conn = connHolder.getConnection();
conn.multi();
//注冊一個事務(wù)完成時的回調(diào)誊稚,用于提交或回滾redis事務(wù)
TransactionSynchronizationManager.registerSynchronization(new RedisTransactionSynchronizer(connHolder, conn,
factory));
}
}
}
上面代碼可以看出獲取連接的整個流程
- TransactionSynchronizationManager.getResource(factory)(從當(dāng)前線程中獲取連接,TransactionSynchronizationManager使用ThreadLocal把連接綁定到當(dāng)前線程上罗心。
- 如果獲取到連接則開啟事務(wù)里伯,返回連接,如果沒有獲取到則創(chuàng)建連接
- 創(chuàng)建完連接后會判斷當(dāng)前操作是否在事務(wù)中isActualNonReadonlyTransactionActive (是否添加了@Transactional注解渤闷,并且事務(wù)不是ReadOnly的)
- 如果操作實(shí)在事務(wù)中疾瓮,則會創(chuàng)建一個連接的代理對象
- TransactionSynchronizationManager.bindResource(factory, connHolder); 綁定事務(wù)到當(dāng)前線程中
- potentiallyRegisterTransactionSynchronisation(connHolder, factory); 開啟redis事務(wù)
- 返回連接
從上面流程可以看出在事務(wù)中執(zhí)行和不在事務(wù)中執(zhí)行的關(guān)鍵區(qū)別在于,是否創(chuàng)建了一個連接的代理對象肤晓,下面看一下createConnectionProxy的代碼
//創(chuàng)建了一個ConnectionSplittingInterceptor類用于攔截RedisConnection所有方法
private static RedisConnection createConnectionProxy(RedisConnection connection, RedisConnectionFactory factory) {
ProxyFactory proxyFactory = new ProxyFactory(connection);
proxyFactory.addAdvice(new ConnectionSplittingInterceptor(factory));
return RedisConnection.class.cast(proxyFactory.getProxy());
}
上面代碼中創(chuàng)建了一個ConnectionSplittingInterceptor類用于攔截RedisConnection中的所有方法爷贫,ConnectionSplittingInterceptor中的核心代碼是intecepter方法
@Override
public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
RedisCommand commandToExecute = RedisCommand.failsafeCommandLookup(method.getName());
//判斷命令是否為只讀命令,如果是則新開一個連接執(zhí)行度操作补憾,如果是寫命令則放在事務(wù)中執(zhí)行
if (isPotentiallyThreadBoundCommand(commandToExecute)) {
if (log.isDebugEnabled()) {
log.debug(String.format("Invoke '%s' on bound conneciton", method.getName()));
}
return invoke(method, obj, args);
}
if (log.isDebugEnabled()) {
log.debug(String.format("Invoke '%s' on unbound conneciton", method.getName()));
}
RedisConnection connection = factory.getConnection();
try {
return invoke(method, connection, args);
} finally {
// properly close the unbound connection after executing command
if (!connection.isClosed()) {
connection.close();
}
}
}
intecepter方法中會判斷這次執(zhí)行的命令是否是讀命令漫萄。如果不是,會用當(dāng)前線程中的連接執(zhí)行也就是放在事務(wù)中執(zhí)行盈匾,如果是讀操作腾务,會創(chuàng)建一個新的連接執(zhí)行,這樣就能立即獲得讀取的數(shù)據(jù)削饵。
通過代碼可以看出出錯的大致流程:
- 調(diào)用沒有使用事務(wù)的reids操作
- 創(chuàng)建一個連接并綁定到當(dāng)前線程中(由于沒有使用事務(wù)岩瘦,不會創(chuàng)建連接的代理對象)
- 執(zhí)行reids操作 (操作完成后并沒有把當(dāng)前線程中的連接清除)
- 調(diào)用使用事務(wù)的redis操作(方法上添加了@Transactional注解)
- 獲取連接方向當(dāng)前線程中已經(jīng)存在了連接不再重新創(chuàng)建(獲取到的是沒有使用事務(wù)時創(chuàng)建的連接,此連接對象不是代理對象)
- 開啟事務(wù)
- 執(zhí)行操作(如果執(zhí)行的是讀操作窿撬,由于連接對象不是代理對象启昧,讀操作并不會重新創(chuàng)建一個連接,而是使用當(dāng)前連接劈伴,并且放在事務(wù)中運(yùn)行密末,因此讀操作并不會立即執(zhí)行而是等到事務(wù)提交時才能執(zhí)行,導(dǎo)致讀操作讀取的結(jié)果為null)
解決方案:
此問題關(guān)鍵在于如果執(zhí)行了為使用事務(wù)的reids操作跛璧,在操作完成后要將當(dāng)前線程中綁定的連接對象給清除掉严里,或者在使用的事務(wù)的reids操作之前,判斷獲取到的連接是否是代理對象追城,如果不是則清除掉刹碾,重新獲取連接。在RedisTemplate的execute方法中我們看到了 reids為我們預(yù)留了兩個鉤子函數(shù)座柱,
preProcessConnection(conn, existingConnection) 和 postProcessResult(result, connToUse, existingConnection) 因此我們可以繼承RedisTemplate來對連接進(jìn)行處理
public class CustomRedisTemplate<K, V> extends RedisTemplate<K, V> {
private boolean enableTransactionSupport = false;
private static boolean isActualNonReadonlyTransactionActive() {
return TransactionSynchronizationManager.isActualTransactionActive()
&& !TransactionSynchronizationManager.isCurrentTransactionReadOnly();
}
/**
* 解決 redis先非事務(wù)中運(yùn)行迷帜,然后又在事務(wù)中運(yùn)行,出現(xiàn)取到的連接還是非事務(wù)連接的問題
* 在事務(wù)環(huán)境中用非事務(wù)連接色洞,讀取操作無法馬上讀出數(shù)據(jù)
*
* @param connection
* @param existingConnection
* @return
*/
@Override
protected RedisConnection preProcessConnection(RedisConnection connection, boolean existingConnection) {
if (existingConnection && !Proxy.isProxyClass(connection.getClass()) && isActualNonReadonlyTransactionActive()) {
RedisConnectionUtils.unbindConnection(getConnectionFactory());
List<TransactionSynchronization> list = new ArrayList<>(TransactionSynchronizationManager.getSynchronizations());
TransactionSynchronizationManager.clearSynchronization();
TransactionSynchronizationManager.initSynchronization();
//移除最后一個回調(diào)(由于之前回去連接是會注冊一個事務(wù)回調(diào)瞬矩,下面如果再獲取連接會導(dǎo)致注冊兩個事務(wù)回調(diào)。事務(wù)完成后會執(zhí)行兩次回調(diào)锋玲,
// 回調(diào)中會清除資源景用,第一次已經(jīng)清除,第二次再清的時候回拋出異常)
list.remove(list.size() - 1);
list.forEach(TransactionSynchronizationManager::registerSynchronization);
connection = RedisConnectionUtils.bindConnection(getConnectionFactory(), enableTransactionSupport);
}
return connection;
}
@Override
public void setEnableTransactionSupport(boolean enableTransactionSupport) {
super.setEnableTransactionSupport(enableTransactionSupport);
this.enableTransactionSupport = enableTransactionSupport;
}
}