在上述的示例中,我們看到的都是使用集群消費突倍,也是最常用的消費模式腔稀。而在一些場景下,我們需要使用廣播消費羽历。
廣播消費模式下焊虏,相同 Consumer Group 的每個 Consumer 實例都接收全量的消息。
例如說秕磷,在應用中诵闭,緩存了數(shù)據(jù)字典等配置表在內存中,可以通過 RocketMQ 廣播消費澎嚣,實現(xiàn)每個應用節(jié)點都消費消息疏尿,刷新本地內存的緩存。
又例如說易桃,我們基于 WebSocket 實現(xiàn)了 IM 聊天褥琐,在我們給用戶主動發(fā)送消息時,因為我們不知道用戶連接的是哪個提供 WebSocket 的應用晤郑,所以可以通過 RocketMQ 廣播消費敌呈,每個應用判斷當前用戶是否是和自己提供的 WebSocket 服務連接,如果是造寝,則推送消息給用戶驱富。
下面,我們來搭建一個 Spring Cloud Stream 消費異常處理機制的示例匹舞。考慮方便线脚,我們直接復用[快速入門]文章的項目赐稽,使用 [sca-stream-rocketmq-producer
]發(fā)送消息,從 [sca-stream-rocketmq-consumer
]復制出 [sca-stream-rocketmq-consumer-broadcasting
] 來演示廣播消費浑侥。
5.1 復制項目
使用 [sca-stream-rocketmq-producer
]發(fā)送消息姊舵,從 [sca-stream-rocketmq-consumer
]復制出 [sca-stream-rocketmq-consumer-broadcasting
]。
5.2 配置文件
修改 [application.yml
]配置文件寓落,只改了一個參數(shù)括丁,設置 broadcasting
配置項為 true
,開啟廣播消費的模式伶选。完整配置如下:
spring:
application:
name: erbadagang-consumer-application
cloud:
# Spring Cloud Stream 配置項史飞,對應 BindingServiceProperties 類
stream:
# Binding 配置項尖昏,對應 BindingProperties Map
bindings:
erbadagang-input:
destination: ERBADAGANG-TOPIC-01 # 目的地。這里使用 RocketMQ Topic
content-type: application/json # 內容格式构资。這里使用 JSON
group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消費者分組,命名規(guī)則:組名+topic名
trek-input:
destination: TREK-TOPIC-01 # 目的地抽诉。這里使用 RocketMQ Topic
content-type: application/json # 內容格式。這里使用 JSON
group: trek-consumer-group-TREK-TOPIC-01 # 消費者分組,命名規(guī)則:組名+topic名
# Spring Cloud Stream RocketMQ 配置項
rocketmq:
# RocketMQ Binder 配置項吐绵,對應 RocketMQBinderConfigurationProperties 類
binder:
name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定義 Binding 配置項迹淌,對應 RocketMQBindingProperties Map
bindings:
erbadagang-input:
# RocketMQ Consumer 配置項,對應 RocketMQConsumerProperties 類
consumer:
enabled: true # 是否開啟消費己单,默認為 true
broadcasting: true # 是否使用廣播消費唉窃,默認為 false(使用集群消費)
server:
port: ${random.int[10000,19999]} # 隨機端口,方便啟動多個消費者
5.3 簡單測試
① 執(zhí)行 ConsumerApplication 兩次纹笼,啟動兩個消費者的實例纹份,從而實現(xiàn)在消費者分組 erbadagang-consumer-group-ERBADAGANG-TOPIC-01
下有兩個消費者實例。
② 執(zhí)行 ProducerApplication允乐,啟動生產者的實例矮嫉。
之后牍疏,請求 http://127.0.0.1:18080/demo01/send 接口三次蠢笋,發(fā)送三條消息。此時在 IDEA 控制臺看到消費者打印日志如下:
// ConsumerApplication 控制臺 01
2020-08-06 17:07:35.633 INFO 8444 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][線程編號:94 消息內容:Demo01Message{id=1167829440}]
// ConsumerApplication 控制臺 02
2020-08-06 17:07:35.633 INFO 15132 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][線程編號:93 消息內容:Demo01Message{id=1167829440}]
從日志可以看出鳞陨,每條消息僅被每個消費者消費了一次昨寞。
底線
本文源代碼使用 Apache License 2.0開源許可協(xié)議,這里是本文源碼Gitee地址厦滤,可通過命令git clone+地址
下載代碼到本地援岩,也可直接點擊鏈接通過瀏覽器方式查看源代碼。