在上述的示例中贝搁,我們看到的都是使用集群消費。而在一些場景下芽偏,我們需要使用廣播消費雷逆。
廣播消費模式下,相同 Consumer Group 的每個 Consumer 實例都接收全量的消息污尉。
例如說膀哲,在應用中,緩存了數(shù)據(jù)字典等配置表在內存中十厢,可以通過 RocketMQ 廣播消費等太,實現(xiàn)每個應用節(jié)點都消費消息捂齐,刷新本地內存的緩存蛮放。
又例如說,我們基于 WebSocket 實現(xiàn)了 IM 聊天奠宜,在我們給用戶主動發(fā)送消息時包颁,因為我們不知道用戶連接的是哪個提供 WebSocket 的應用瞻想,所以可以通過 RocketMQ 廣播消費,每個應用判斷當前用戶是否是和自己提供的 WebSocket 服務連接娩嚼,如果是蘑险,則推送消息給用戶。
下面岳悟,我們開始本小節(jié)的示例佃迄。
5.1 Demo05Message
package com.ebadagang.springboot.rocketmq.message;
/**
* 示例 05 的 Message 消息
*/
public class Demo05Message {
public static final String TOPIC = "DEMO_05";
/**
* 編號
*/
private Integer id;
public Demo05Message setId(Integer id) {
this.id = id;
return this;
}
public Integer getId() {
return id;
}
@Override
public String toString() {
return "Demo05Message{" +
"id=" + id +
'}';
}
}
5.2 Demo05Producer
創(chuàng)建 [Demo04Producer]類,它會使用 RocketMQ-Spring 封裝提供的 RocketMQTemplate 贵少,實現(xiàn)同步發(fā)送消息呵俏。代碼如下:
package com.ebadagang.springboot.rocketmq.producer;
import com.ebadagang.springboot.rocketmq.message.Demo05Message;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Demo05Producer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult syncSend(Integer id) {
// 創(chuàng)建 Demo05Message 消息
Demo05Message message = new Demo05Message();
message.setId(id);
// 同步發(fā)送消息
return rocketMQTemplate.syncSend(Demo05Message.TOPIC, message);
}
}
5.3 Demo05Consumer
創(chuàng)建 [Demo05Consumer]類,實現(xiàn) Rocket-Spring 定義的 RocketMQListener 接口滔灶,消費消息普碎。代碼如下:
package com.ebadagang.springboot.rocketmq.consumer;
import com.ebadagang.springboot.rocketmq.message.Demo05Message;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@RocketMQMessageListener(
topic = Demo05Message.TOPIC,
consumerGroup = "demo05-consumer-group-" + Demo05Message.TOPIC,
messageModel = MessageModel.BROADCASTING // 設置為廣播消費
)
public class Demo05Consumer implements RocketMQListener<Demo05Message> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(Demo05Message message) {
logger.info("[onMessage][線程編號:{} 消息內容:{}]", Thread.currentThread().getId(), message);
}
}
- 主要是 @RocketMQMessageListener 注解,通過設置了 messageModel = MessageModel.BROADCASTING 录平,表示使用廣播消費麻车。
5.4 簡單測試
創(chuàng)建 [Demo05ProducerTest]測試類,用于測試廣播消費斗这。代碼如下:
package com.ebadagang.springboot.rocketmq.producer;
import com.ebadagang.springboot.rocketmq.Application;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo05ProducerTest {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private Demo05Producer producer;
@Test
public void test() throws InterruptedException {
// 阻塞等待动猬,保證消費
new CountDownLatch(1).await();
}
@Test
public void testSyncSend() throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
SendResult result = producer.syncSend(id);
logger.info("[testSyncSend][發(fā)送編號:[{}] 發(fā)送結果:[{}]]", id, result);
// 阻塞等待,保證消費
new CountDownLatch(1).await();
}
}
5.4.1 首先
執(zhí)行#test()
測試方法表箭,先啟動一個消費者分組 "demo05-consumer-group-DEMO_05" 的 Consumer 節(jié)點枣察。
5.4.2 然后
執(zhí)行#testSyncSend()
測試方法,先啟動一個消費者分組 "demo05-consumer-group-DEMO_05"
的Consumer
節(jié)點燃逻。同時序目,該測試方法,調用 Demo05ProducerTest#syncSend(id)
方法伯襟,同步發(fā)送了一條消息猿涨。控制臺輸出如下:
5.4.3 #testSyncSend() 方法對應的控制臺
# Producer 同步發(fā)送消息成功
2020-08-04 21:56:34.739 INFO 10824 --- [ main] c.e.s.r.producer.Demo05ProducerTest : [testSyncSend][發(fā)送編號:[1596549394] 發(fā)送結果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A2A4818B4AAC2142870A50000, offsetMsgId=6585E30D00002A9F0000000000039CC7, messageQueue=MessageQueue [topic=DEMO_05, brokerName=broker-a, queueId=0], queueOffset=0]]]
# Demo05Consumer 消費了該消息
2020-08-04 21:56:34.771 INFO 10824 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo05Consumer : [onMessage][線程編號:174 消息內容:Demo05Message{id=1596549394}]
5.4.4 #test() 方法對應的控制臺
# 另外一個 Demo05Consumer 也消費了該消息
2020-08-04 21:56:34.755 INFO 15504 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo05Consumer : [onMessage][線程編號:184 消息內容:Demo05Message{id=1596549394}]
消費者分組 "demo05-consumer-group-DEMO_05" 的兩個 Consumer 節(jié)點姆怪,都消費了這條發(fā)送的消息叛赚。符合廣播消費的預期。
底線
本文源代碼使用 Apache License 2.0開源許可協(xié)議稽揭,這里是本文源碼Gitee地址俺附,可通過命令git clone+地址
下載代碼到本地,也可直接點擊鏈接通過瀏覽器方式查看源代碼溪掀。