RocketMQ 提供了兩種方式給 Consumer 進(jìn)行消息的過濾:
-
基于 Tag 過濾
標(biāo)簽(Tag):為消息設(shè)置的標(biāo)志坐求,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務(wù)單元的消息陶冷,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽钙姊。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化 RocketMQ 提供的查詢系統(tǒng)埂伦。消費(fèi)者可以根據(jù) Tag 實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯煞额,實(shí)現(xiàn)更好的擴(kuò)展性。
基于 SQL92 過濾沾谜,示例:http://www.reibang.com/p/5b13868f4451
消息過濾目前是在 Broker 端實(shí)現(xiàn)的膊毁,優(yōu)點(diǎn)是減少了 Broker 和 Consumer 之間的無用消息的網(wǎng)絡(luò)傳輸,缺點(diǎn)是增加了 Broker 的負(fù)擔(dān)基跑、而且實(shí)現(xiàn)相對(duì)復(fù)雜婚温。
一般情況下,我們使用 Tag 過濾較多媳否,我們來搭建一個(gè) RocketMQ 使用 Tag 進(jìn)行消息過濾的示例栅螟。考慮方便逆日,我們直接復(fù)用快速入門小節(jié)的項(xiàng)目:
- 修改 [
sca-stream-rocketmq-producer
] 發(fā)送帶有 Tag 的消息嵌巷。 - 從 [
sca-stream-rocketmq-consumer
復(fù)制出 [sca-stream-rocketmq-consumer-filter
]來使用 Tag 過濾消息來消費(fèi)。
先搭建消費(fèi)者室抽。
7.1 Demo01Controller
修改 [Demo01Controller]類搪哪,增加發(fā)送 3 條帶 Tag 的消息的 HTTP 接口。代碼如下:
/**
* 發(fā)送3條帶有tag的message
*
* @return
*/
@GetMapping("/send_tag")
public boolean sendTag() {
for (String tag : new String[]{"trek", "specialized", "look"}) {
// 創(chuàng)建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 創(chuàng)建 Spring Message 對(duì)象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TAGS, tag) // <1> 設(shè)置 Tag
.build();
// 發(fā)送消息
mySource.erbadagangOutput().send(springMessage);
}
return true;
}
在 <1> 處坪圾,通過添加頭 MessageConst.PROPERTY_TAGS晓折,設(shè)置發(fā)送消息的 Tag。
再搭建消費(fèi)者兽泄。
7.2 復(fù)制項(xiàng)目
從 [sca-stream-rocketmq-consumer
] 復(fù)制出 [sca-stream-rocketmq-consumer-filter
] 來使用 Tag 過濾消息來消費(fèi)漓概。
9.3 配置文件
修改 [application.yml
] 配置文件,設(shè)置 tags
配置項(xiàng)為 trek || look
病梢,只消費(fèi)帶有 Tag 為 trek
或 look
的消息胃珍。完整配置如下:
spring:
application:
name: erbadagang-consumer-application
cloud:
# Spring Cloud Stream 配置項(xiàng),對(duì)應(yīng) BindingServiceProperties 類
stream:
# Binding 配置項(xiàng)蜓陌,對(duì)應(yīng) BindingProperties Map
bindings:
erbadagang-input:
destination: ERBADAGANG-TOPIC-01 # 目的地觅彰。這里使用 RocketMQ Topic
content-type: application/json # 內(nèi)容格式。這里使用 JSON
group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消費(fèi)者分組,命名規(guī)則:組名+topic名
# Spring Cloud Stream RocketMQ 配置項(xiàng)
rocketmq:
# RocketMQ Binder 配置項(xiàng)钮热,對(duì)應(yīng) RocketMQBinderConfigurationProperties 類
binder:
name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定義 Binding 配置項(xiàng)填抬,對(duì)應(yīng) RocketMQBindingProperties Map
bindings:
erbadagang-input:
# RocketMQ Consumer 配置項(xiàng),對(duì)應(yīng) RocketMQConsumerProperties 類
consumer:
enabled: true # 是否開啟消費(fèi)隧期,默認(rèn)為 true
broadcasting: false # 是否使用廣播消費(fèi)飒责,默認(rèn)為 false 使用集群消費(fèi)
tags: trek || look # 基于 Tag 訂閱赘娄,多個(gè) Tag 使用 || 分隔,默認(rèn)為空
sql: # 基于 SQL92 訂閱宏蛉,默認(rèn)為空
server:
port: ${random.int[10000,19999]} # 隨機(jī)端口,方便啟動(dòng)多個(gè)消費(fèi)者
想要基于 SQL92 過濾消息檐晕,可以通過設(shè)置 sql 配置項(xiàng)。
7.4 簡單測試
① 執(zhí)行 ConsumerApplication,啟動(dòng)消費(fèi)者的實(shí)例篡石。
② 執(zhí)行 ProducerApplication芥喇,啟動(dòng)生產(chǎn)者的實(shí)例。
之后凰萨,請(qǐng)求 http://127.0.0.1:18080/demo01/send_tag 接口,發(fā)送帶有 Tag 的消息胖眷。IDEA 控制臺(tái)輸出日志如下:
2020-08-06 21:31:14.711 INFO 15396 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][線程編號(hào):76 消息內(nèi)容:Demo01Message{id=-1436746476}]
2020-08-06 21:31:14.714 INFO 15396 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][線程編號(hào):76 消息內(nèi)容:Demo01Message{id=-1283315762}]
只消費(fèi)了兩條消息,目測 Tag 為 specialized
的消息已經(jīng)被過濾了珊搀。要注意冶忱,被過濾掉的消息,后續(xù)是無法被消費(fèi)組消費(fèi)掉了境析,效果和消費(fèi)成功是一樣的。
7.5 Consumer 過濾器
上面我們看到的是 RocketMQ 獨(dú)有的 Broker級(jí)別的消息過濾機(jī)制劳淆,而 Spring Cloud Stream 提供了通用的 Consumer 級(jí)別的效率過濾器機(jī)制。我們只需要使用 @StreamListener 注解的 condition 屬性沛鸵,設(shè)置消息滿足指定 Spring EL 表達(dá)式的情況下曲掰,才進(jìn)行消費(fèi)疾捍。
/**
* A condition that must be met by all items that are dispatched to this method.
* @return a SpEL expression that must evaluate to a {@code boolean} value.
*/
String condition() default "";
修改 [Demo01Consumer]類蜈缤,使用 @StreamListener
注解的 condition
屬性來過濾消息。代碼如下:
package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;
import com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.message.Demo01Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class Demo01Consumer {
private Logger logger = LoggerFactory.getLogger(getClass());
/*
@StreamListener(MySink.ERBADAGANG_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][線程編號(hào):{} 消息內(nèi)容:{}]", Thread.currentThread().getId(), message);
}
*/
@StreamListener(value = MySink.ERBADAGANG_INPUT, condition = "headers['rocketmq_TAGS'] == 'trek'")
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][線程編號(hào):{} 消息內(nèi)容:{}]", Thread.currentThread().getId(), message);
}
}
這里我們?cè)O(shè)置消息的 Header
帶有的rocketmq_TAGS
值為 trek
時(shí)咙鞍,才進(jìn)行消費(fèi)。
多個(gè)條件適宜SpEL表達(dá)式:@StreamListener(value = MySink.ERBADAGANG_INPUT, condition = "headers['rocketmq_TAGS'] == 'trek'||headers['rocketmq_TAGS'] == 'look'")
7.5.1 再次測試
① 執(zhí)行 ConsumerApplication翰守,啟動(dòng)消費(fèi)者的實(shí)例疲酌。
② 執(zhí)行 ProducerApplication,啟動(dòng)生產(chǎn)者的實(shí)例朗恳。
之后,請(qǐng)求 http://127.0.0.1:18080/demo01/send_tag 接口油航,發(fā)送帶有 Tag 的消息怀浆。IDEA 控制臺(tái)輸出日志如下:
// tag為trek的正常消費(fèi)了。
2020-08-06 21:50:14.672 INFO 11016 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][線程編號(hào):77 消息內(nèi)容:Demo01Message{id=880062806}]
//這個(gè)消息雖然在yml中訂閱了执赡,但java方法沒有配@StreamListener所以報(bào)錯(cuò)镰踏。
2020-08-06 21:50:14.673 WARN 11016 --- [MessageThread_1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: a7eb0b98-df97-f2ec-d566-ca514c1482c2
只消費(fèi)了一條消息沙合,Tag 為 specialized 的消息被 Broker 過濾,Tag 為 look 的消息被 Consumer 過濾芳来。要注意猜拾,被過濾掉的消息,后續(xù)是無法被消費(fèi)掉了挎袜,效果和消費(fèi)成功是一樣的。
7.5.2 控制臺(tái)查看
-
trek這條消息是被正常消費(fèi)的紊搪。
-
specialized被broker過濾了全景,但是等同已消費(fèi)。
底線
本文源代碼使用 Apache License 2.0開源許可協(xié)議滞伟,這里是本文源碼Gitee地址,可通過命令git clone+地址
下載代碼到本地梆奈,也可直接點(diǎn)擊鏈接通過瀏覽器方式查看源代碼。