Redis 不僅提供一個(gè)NoSQL數(shù)據(jù)庫燕耿,同時(shí)提供了一套消息系統(tǒng),在開發(fā)過程中憔晒,應(yīng)用場(chǎng)景非常多创千,根據(jù)不同的業(yè)務(wù)需求,可以實(shí)現(xiàn)相應(yīng)的功能皆串。
我的業(yè)務(wù)需求是淹办,在分布式系統(tǒng)中,當(dāng)其中一個(gè)節(jié)點(diǎn)產(chǎn)生一條消息時(shí)恶复,需要同時(shí)通知其他節(jié)點(diǎn)做相應(yīng)的處理怜森。
相關(guān)類的解釋:
RedisMessageListenerContainer
Redis訂閱發(fā)布的監(jiān)聽容器,通過Redis的消息發(fā)布谤牡、訂閱配置都在這里面實(shí)現(xiàn)
- addMessageListener(MessageListenerAdapter副硅,PatternTopic) 新增訂閱頻道及訂閱者,訂閱者必須有相關(guān)方法處理收到的消息
- setTopicSerializer(RedisSerializer) 對(duì)頻道內(nèi)容進(jìn)行序列化解析
MessageListenerAdapter
監(jiān)聽適配器
- MessageListenerAdapter(Object , defaultListenerMethod) 創(chuàng)建監(jiān)聽適配器翅萤,綁定訂閱接收器和接收消息的方法
RedisTemplate
Redis模版類
- convertAndSend(String channel, Object message) 發(fā)布者向Redis發(fā)布消息
一恐疲、新增依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
二、Redis連接信息配置
spring:
redis:
host: 127.0.0.1
port: 6381
### Redis數(shù)據(jù)庫索引(默認(rèn)為0)
database: 0
### 連接超時(shí)時(shí)間(毫秒)
timeout: 60000ms
password:
lettuce:
pool:
### 最大連接數(shù)(使用負(fù)值表示沒有限制) 默認(rèn)8
max-active: 8
### 最小空閑連接 默認(rèn)8
min-idle: 0
### 連接池中的最大空閑連接 默認(rèn)8
max-idle: 8
### 連接池最大阻塞等待時(shí)間(使用負(fù)值表示沒有限制)
max-wait: -1ms
三断序、RedisConfig核心類流纹,實(shí)現(xiàn)了Redis連接,訂閱以及發(fā)布配置
@Configuration
public class RedisConfig {
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
redisTemplate.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// valuevalue采用jackson序列化方式
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value采用jackson序列化方式
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
//開啟事務(wù)支持
redisTemplate.setEnableTransactionSupport(true);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* Redis消息監(jiān)聽器容器
* 可以添加多個(gè)監(jiān)聽不同話題的redis監(jiān)聽器违诗,只需要把消息監(jiān)聽器和相應(yīng)的消息訂閱處理器綁定漱凝,
* 該消息監(jiān)聽器通過反射技術(shù)調(diào)用消息訂閱處理器的相關(guān)方法進(jìn)行一些業(yè)務(wù)處理
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(connectionFactory);
/**
* 加入消息監(jiān)聽器(可以加入多個(gè)主題監(jiān)聽器,監(jiān)聽器也可以監(jiān)聽多個(gè)主題)
**/
// 加入WebSocket監(jiān)聽器
final String TOPIC_NAME = "TEST_TOPIC"; // 訂閱主題
MessageListenerAdapter webSocketListenerAdapter = webSocketListenerAdapter();
redisMessageListenerContainer.addMessageListener(webSocketListenerAdapter, new PatternTopic(TOPIC_NAME));
/**
* 設(shè)置序列化對(duì)象
* 特別注意:1. 發(fā)布的時(shí)候需要設(shè)置序列化诸迟;訂閱方也需要設(shè)置序列化
* 2. 設(shè)置序列化對(duì)象必須放在[加入消息監(jiān)聽器]這一步后面茸炒,否則會(huì)導(dǎo)致接收器接收不到消息
*/
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
redisMessageListenerContainer.setTopicSerializer(seria);
return redisMessageListenerContainer;
}
/**
* 綁定WebSocket消息推送接收器和接收方法
*/
@Bean
public MessageListenerAdapter webSocketListenerAdapter() {
WebSocketReceiver webSocketReceiver = new WebSocketReceiver(); // 消息接收器
final String RECEIVE_MESSAGE_METHOD = "receiveMessage"; // 消息接收器的方法名稱
return new MessageListenerAdapter(webSocketReceiver, RECEIVE_MESSAGE_METHOD);
}
}
四、封裝消息對(duì)象
@Data
public class MessageDTO implements Serializable {
private String type;
private String title;
private String content;
}
五阵苇、消息接收器
public class WebSocketReceiver {
/**
* 接收WebSocket推送的消息并處理
* @param message
*/
public void receiveMessage(String message) {
//序列化對(duì)象(特別注意:發(fā)布的時(shí)候需要設(shè)置序列化壁公;訂閱方也需要設(shè)置序列化)
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(WebSocketMessageDTO.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
MessageDTO message = (MessageDTO ) seria.deserialize(message.getBytes());
// 接收到消息對(duì)象,自己實(shí)現(xiàn)相關(guān)的業(yè)務(wù)處理...
}
}
六绅项、測(cè)試發(fā)布消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestRedis {
@Resource
private RedisTemplate redisTemplate;
@Test
public void test() {
final String TOPIC_NAME = "TEST_TOPIC"; // 訂閱主題
MessageDTO message = new MessageDTO();
message.setTitle("訂閱發(fā)布測(cè)試...");
// 發(fā)布消息
redisTemplate.convertAndSend(TOPIC_NAME, message);
}
}