使用 Spring Data Redis 發(fā)布訂閱消息
1. 概述
在 Redis 中君丁,發(fā)布者并沒有將消息發(fā)送給特定的訂閱者女淑。是將發(fā)布的消息被劃分為通道牍蜂,并不知道會有哪些訂閱者(如果有的話)嫩海。
類似地早抠,訂閱者表示對一個或多個主題感興趣,并且只接收感興趣的消息碑韵,而不知道有哪些發(fā)布者(如果有的話)赡茸。
發(fā)布者和訂閱者的這種解耦可以實(shí)現(xiàn)更大的可伸縮性和更動態(tài)的網(wǎng)絡(luò)拓?fù)洹?/p>
2. Redis 配置
讓我們開始添加消息隊(duì)列所需的配置。
首先祝闻,我們將定義一個 MessageListenerAdapter占卧,其中包含名為 RedisMessageSubscriber 的 MessageListener 接口的自定義實(shí)現(xiàn)。這個 bean 充當(dāng)發(fā)布-訂閱消息模型中的訂閱者:
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}
RedisMessageListenerContainer 是 Spring Data Redis 提供的一個類联喘。這是內(nèi)部調(diào)用的华蜒,根據(jù) Spring Data Redis 文檔 的說法 —— “處理監(jiān)聽、轉(zhuǎn)換和消息調(diào)度的底層細(xì)節(jié)豁遭“认玻”
@Bean
RedisMessageListenerContainer redisContainer() {
RedisMessageListenerContainer container
= new RedisMessageListenerContainer();
container.setConnectionFactory(jedisConnectionFactory());
container.addMessageListener(messageListener(), topic());
return container;
}
我們還將使用定制的 MessagePublisher 接口和 RedisMessagePublisher 實(shí)現(xiàn)創(chuàng)建 bean。這樣堤框,我們可以有一個通用的消息發(fā)布 API域滥,并讓 Redis 實(shí)現(xiàn)采用 redisTemplate 和 topic 作為構(gòu)造函數(shù)參數(shù):
@Bean
MessagePublisher redisPublisher() {
return new RedisMessagePublisher(redisTemplate(), topic());
}
最后,我們將設(shè)置一個主題蜈抓,發(fā)布者將向其發(fā)送消息启绰,訂閱者將接收消息:
@Bean
ChannelTopic topic() {
return new ChannelTopic("messageQueue");
}
3. 發(fā)布消息
3.1. 定義 MessagePublisher 接口
Spring Data Redis 沒有提供用于消息分發(fā)的 MessagePublisher 接口。:我們可以定義一個自定義接口沟使,它將在實(shí)現(xiàn)中使用 redisTemplate:
public interface MessagePublisher {
void publish(String message);
}
3.2. RedisMessagePublisher 實(shí)現(xiàn)[
我們接下來提供 MessagePublisher 接口的實(shí)現(xiàn)委可,添加消息發(fā)布的細(xì)節(jié)并使用 redisTemplate 中的函數(shù)。
該模板包含了一組非常豐富的函數(shù)腊嗡,用于廣泛的操作—— 其中 convertAndSend 能夠通過主題向隊(duì)列發(fā)送消息:
public class RedisMessagePublisher implements MessagePublisher {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ChannelTopic topic;
public RedisMessagePublisher() {
}
public RedisMessagePublisher(
RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) {
this.redisTemplate = redisTemplate;
this.topic = topic;
}
public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
如您所見着倾,發(fā)布者實(shí)現(xiàn)非常簡單。它使用 redisTemplate 的 convertAndSend() 方法格式化給定的消息并將其發(fā)布到配置的主題燕少。
主題實(shí)現(xiàn)了發(fā)布和訂閱語義:當(dāng)消息發(fā)布時卡者,它將發(fā)送給所有注冊偵聽該主題的訂閱者。
4. 訂閱消息
RedisMessageSubscriber 實(shí)現(xiàn)了 Spring Data Redis 提供的 MessageListener 接口:
@Service
public class RedisMessageSubscriber implements MessageListener {
public static List<String> messageList = new ArrayList<String>();
public void onMessage(Message message, byte[] pattern) {
messageList.add(message.toString());
System.out.println("Message received: " + message.toString());
}
}
注意客们,還有第二個參數(shù) pattern崇决,在本例中我們沒有使用它。Spring Data Redis 文檔指出底挫,該參數(shù)表示“匹配通道的模式(如果指定)”恒傻,但它可以為 null。
5. 發(fā)送與接收消息
現(xiàn)在我們把它們結(jié)合起來建邓。我們創(chuàng)建一個消息盈厘,然后使用 RedisMessagePublisher 發(fā)布它:
String message = "Message " + UUID.randomUUID();
redisMessagePublisher.publish(message);
當(dāng)我們調(diào)用 publish(message) 時,內(nèi)容被發(fā)送到 Redis官边,在那里它被路由到我們的發(fā)布者中定義的消息隊(duì)列主題沸手。然后將它分發(fā)給該主題的訂閱者外遇。
您可能已經(jīng)注意到 RedisMessageSubscriber 是一個偵聽器,它將自己注冊到隊(duì)列以檢索消息契吉。
消息到達(dá)時臀规,訂閱者定義的 onMessage() 方法被觸發(fā)。
在我們的例子中栅隐,我們可以通過檢查 RedisMessageSubscriber 中的 messageList 來驗(yàn)證我們已經(jīng)收到了已經(jīng)發(fā)布的消息:
RedisMessageSubscriber.messageList.get(0).contains(message)
6. 結(jié)論
在本文中,我們研究了使用Spring Data Redis 實(shí)現(xiàn)的發(fā)布/訂閱消息隊(duì)列玩徊。
上述示例的實(shí)現(xiàn)可以在 GitHub project 項(xiàng)目中找到租悄。
【注】本文譯自:PubSub Messaging with Spring Data Redis | Baeldung