問題描述
在redis集成到spring后吹零, 使用了redis的訂閱與發(fā)布的功能晰房,在每次發(fā)布的時候, 會出現(xiàn)瘋狂日志報錯窍荧。 由于該問題比較隱蔽辉巡,在這邊記錄并提供解決辦法。
集成說明
- spring 版本: 4.1.4
- redis客戶端: jedis 2.7.2
- 連接池druid
使用application.xml配置redis連接池等的信息蕊退。
對jedis的簡單封裝
在項目中郊楣, 我對jedis客戶端做了簡單的封裝, 封裝代碼如下:
@Repository("redisClientTemplate")
public class RedisClientTemplate {
@Autowired
private ShardedJedisPool shardedJedisPool;
@Autowired
private JedisPool jedisPool;
/**
*
* @param ec
* @return
* @throws IllegalArgumentException when ec is null
* @throws RuntimeException
*/
public <T>T execute(Executor<T> ec, Class<? extends JedisCommands> jedisClass){
Assert.notNull(ec, "the redis executor is null");
JedisCommands jedisCommands = null;
try {
if (jedisClass == Jedis.class) {
jedisCommands = jedisPool.getResource();
} else {
jedisCommands = shardedJedisPool.getResource();
}
return ec.execute(jedisCommands);
}catch (Exception e){
throw new RuntimeException(e);
}finally {
if (jedisCommands!=null && jedisCommands instanceof Closeable){
try {
((Closeable)jedisCommands).close();
} catch (IOException e) {
// log
}
}
}
}
}
/**
* 具體的執(zhí)行邏輯
*/
public abstract class Executor<T> {
T execute(JedisCommands jedisCommands){
if (jedisCommands instanceof Jedis){
return doExecute((Jedis)jedisCommands);
}else if (jedisCommands instanceof ShardedJedis){
return doExecute((ShardedJedis)jedisCommands);
}else {
return doExecute((ShardedJedis)jedisCommands);
}
}
protected T doExecute(ShardedJedis jedisCommands) {
return null;
}
protected T doExecute(Jedis jedisCommands) {
return null;
}
}
在這里說明一下瓤荔, 由于jedis存在jedis和shardedJedis, 2種api不一樣输硝, 并且應(yīng)用場景也不一樣今瀑, 比如對于key的模糊搜索ShardedJedis此操作。
使用example:
redisClientTemplate.execute(ExecutorUtils.addSet(key, value), ShardedJedis.class);
在ExecutorUtils
public static Executor<Boolean> addSet(final String key, final String... value){
if (key == null || key.equals("")) {
return null; // todo illegalArgument
}
return new Executor<Boolean>() {
@Override
public Boolean doExecute(ShardedJedis shardedJedis) {
Long i = shardedJedis.sadd(key, value); // todo test
return true;
}
};
}
jedis的api 使用起來很簡單橘荠, 在項目中哥童, 使用了redis的訂閱發(fā)布功能, 如何使用呢, 請看下面代碼
@Component
@Slf4j
public class SubscriberDemo extends JedisPubSub {
Gson gson = new Gson();
@Autowired
RedisClientTemplate redisClientTemplate;
@Override
public void onMessage(String channel, String message) {
doSomethingWithDatabase();
}
@PostConstruct
public void init (){
new Thread(new Runnable() {
@Override
public void run() {
redisClientTemplate.execute(new Executor<Object>() {
@Override
protected Object doExecute(Jedis jedisCommands) {
jedisCommands.subscribe(SubscriberDemo.this,"channel");
return null;
}
}, Jedis.class);
}
},"threadName").start();
}
@PreDestroy
public void destroy(){
this.unsubscribe("channel");
}
}
問題詳情
在使用jedis時,我并沒有加入destroy方法毛仪, 導(dǎo)致每次生產(chǎn)發(fā)布后芯勘, 在日志中都會出現(xiàn)大量druid獲取的連接已關(guān)閉的錯誤, 然而我發(fā)現(xiàn)服務(wù)器上的業(yè)務(wù)時正常運(yùn)行的衡怀。
是什么原因?qū)е逻@個問題的產(chǎn)生呢抛杨?
經(jīng)過一定的校驗我發(fā)現(xiàn), 在發(fā)布生產(chǎn)時玉罐,數(shù)據(jù)源會被關(guān)閉吊输, 然而redis客戶端還在訂閱服務(wù)端的chennel,此時服務(wù)器又啟動透硝,在這個類初始化之前, 繼而調(diào)用init函數(shù),此時又會有一個redis客戶端訂閱該channe
簡單畫一個圖解:
解決辦法
解決辦法正如我上面寫的, 寫一個destry方法浴井, 在該對象銷毀之前洪囤, 取消訂閱channel即可。 這樣始終保持每次發(fā)布訂閱一個channel锦溪, 結(jié)束銷毀一個channel牺丙。
其他注意點(diǎn):
還會發(fā)現(xiàn), 我們這邊在訂閱的時候?qū)懥藗€線程肖揣,原因是因為redis訂閱是同步的, 其實我們通過jedis源碼很容易看到, 在內(nèi)部是通過一個do while循環(huán)來監(jiān)聽消息。所以我們將該訂閱交給一個線程執(zhí)行一屋, 使得應(yīng)用能夠正常被啟動诽嘉。
如何提高消息的處理能力呢?
很簡單,我們可以在onMessage中加入線程池來處理消息。
spring-data-redis
spring-data-redis為我們在jedis做了更好的封裝, 使得我們對redis客戶端能夠很簡單的應(yīng)用宙帝, 下面簡單介紹以下spring-data-redis的發(fā)布與訂閱功能步脓。
其中我們只需要進(jìn)行相關(guān)的bean的配置要出, 無需關(guān)心channel的訂閱
@Configuration
public class RedisSubListenerConfig {
final ExecutorService executorService = Executors.newFixedThreadPool(20);
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter service) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(dataWarning, new PatternTopic(TOPIC));
return container;
}
@Bean
MessageListenerAdapter dataAdapter(Service service) {
MessageListenerAdapter adapter = new MessageListenerAdapter(service, "doSomething");
adapter.setSerializer(new FastJsonRedisSerializer<>(Message.class));
return adapter;
}
}