RocketMQ 與 Spring Cloud Stream整合(七灵临、消息過濾)

RocketMQ 提供了兩種方式給 Consumer 進(jìn)行消息的過濾:

  • 基于 Tag 過濾

    標(biāo)簽(Tag):為消息設(shè)置的標(biāo)志坐求,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務(wù)單元的消息陶冷,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽钙姊。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化 RocketMQ 提供的查詢系統(tǒng)埂伦。消費(fèi)者可以根據(jù) Tag 實(shí)現(xiàn)對(duì)不同子主題的不同消費(fèi)邏輯煞额,實(shí)現(xiàn)更好的擴(kuò)展性。

  • 基于 SQL92 過濾沾谜,示例:http://www.reibang.com/p/5b13868f4451

消息過濾目前是在 Broker 端實(shí)現(xiàn)的膊毁,優(yōu)點(diǎn)是減少了 Broker 和 Consumer 之間的無用消息的網(wǎng)絡(luò)傳輸,缺點(diǎn)是增加了 Broker 的負(fù)擔(dān)基跑、而且實(shí)現(xiàn)相對(duì)復(fù)雜婚温。

一般情況下,我們使用 Tag 過濾較多媳否,我們來搭建一個(gè) RocketMQ 使用 Tag 進(jìn)行消息過濾的示例栅螟。考慮方便逆日,我們直接復(fù)用快速入門小節(jié)的項(xiàng)目:

  • 修改 [sca-stream-rocketmq-producer] 發(fā)送帶有 Tag 的消息嵌巷。
  • 從 [sca-stream-rocketmq-consumer復(fù)制出 [sca-stream-rocketmq-consumer-filter]來使用 Tag 過濾消息來消費(fèi)。

先搭建消費(fèi)者室抽。

7.1 Demo01Controller

修改 [Demo01Controller]類搪哪,增加發(fā)送 3 條帶 Tag 的消息的 HTTP 接口。代碼如下:

    /**
     * 發(fā)送3條帶有tag的message
     *
     * @return
     */
    @GetMapping("/send_tag")
    public boolean sendTag() {
        for (String tag : new String[]{"trek", "specialized", "look"}) {
            // 創(chuàng)建 Message
            Demo01Message message = new Demo01Message()
                    .setId(new Random().nextInt());
            // 創(chuàng)建 Spring Message 對(duì)象
            Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
                    .setHeader(MessageConst.PROPERTY_TAGS, tag) //  <1> 設(shè)置 Tag
                    .build();
            // 發(fā)送消息
            mySource.erbadagangOutput().send(springMessage);
        }
        return true;
    }

在 <1> 處坪圾,通過添加頭 MessageConst.PROPERTY_TAGS晓折,設(shè)置發(fā)送消息的 Tag。

再搭建消費(fèi)者兽泄。

7.2 復(fù)制項(xiàng)目

從 [sca-stream-rocketmq-consumer] 復(fù)制出 [sca-stream-rocketmq-consumer-filter] 來使用 Tag 過濾消息來消費(fèi)漓概。

9.3 配置文件

修改 [application.yml] 配置文件,設(shè)置 tags 配置項(xiàng)為 trek || look病梢,只消費(fèi)帶有 Tag 為 treklook 的消息胃珍。完整配置如下:

spring:
  application:
    name: erbadagang-consumer-application
  cloud:
    # Spring Cloud Stream 配置項(xiàng),對(duì)應(yīng) BindingServiceProperties 類
    stream:
      # Binding 配置項(xiàng)蜓陌,對(duì)應(yīng) BindingProperties Map
      bindings:
        erbadagang-input:
          destination: ERBADAGANG-TOPIC-01 # 目的地觅彰。這里使用 RocketMQ Topic
          content-type: application/json # 內(nèi)容格式。這里使用 JSON
          group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消費(fèi)者分組,命名規(guī)則:組名+topic名

      # Spring Cloud Stream RocketMQ 配置項(xiàng)
      rocketmq:
        # RocketMQ Binder 配置項(xiàng)钮热,對(duì)應(yīng) RocketMQBinderConfigurationProperties 類
        binder:
          name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
        # RocketMQ 自定義 Binding 配置項(xiàng)填抬,對(duì)應(yīng) RocketMQBindingProperties Map
        bindings:
          erbadagang-input:
            # RocketMQ Consumer 配置項(xiàng),對(duì)應(yīng) RocketMQConsumerProperties 類
            consumer:
              enabled: true # 是否開啟消費(fèi)隧期,默認(rèn)為 true
              broadcasting: false # 是否使用廣播消費(fèi)飒责,默認(rèn)為 false 使用集群消費(fèi)
              tags: trek || look # 基于 Tag 訂閱赘娄,多個(gè) Tag 使用 || 分隔,默認(rèn)為空
              sql: # 基于 SQL92 訂閱宏蛉,默認(rèn)為空
server:
  port: ${random.int[10000,19999]} # 隨機(jī)端口,方便啟動(dòng)多個(gè)消費(fèi)者

想要基于 SQL92 過濾消息檐晕,可以通過設(shè)置 sql 配置項(xiàng)。

7.4 簡單測試

① 執(zhí)行 ConsumerApplication,啟動(dòng)消費(fèi)者的實(shí)例篡石。
② 執(zhí)行 ProducerApplication芥喇,啟動(dòng)生產(chǎn)者的實(shí)例。
之后凰萨,請(qǐng)求 http://127.0.0.1:18080/demo01/send_tag 接口,發(fā)送帶有 Tag 的消息胖眷。IDEA 控制臺(tái)輸出日志如下:

2020-08-06 21:31:14.711  INFO 15396 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][線程編號(hào):76 消息內(nèi)容:Demo01Message{id=-1436746476}]
2020-08-06 21:31:14.714  INFO 15396 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][線程編號(hào):76 消息內(nèi)容:Demo01Message{id=-1283315762}]

只消費(fèi)了兩條消息,目測 Tag 為 specialized 的消息已經(jīng)被過濾了珊搀。要注意冶忱,被過濾掉的消息,后續(xù)是無法被消費(fèi)組消費(fèi)掉了境析,效果和消費(fèi)成功是一樣的。

7.5 Consumer 過濾器

上面我們看到的是 RocketMQ 獨(dú)有的 Broker級(jí)別的消息過濾機(jī)制劳淆,而 Spring Cloud Stream 提供了通用的 Consumer 級(jí)別的效率過濾器機(jī)制。我們只需要使用 @StreamListener 注解的 condition 屬性沛鸵,設(shè)置消息滿足指定 Spring EL 表達(dá)式的情況下曲掰,才進(jìn)行消費(fèi)疾捍。

/**
* A condition that must be met by all items that are dispatched to this method.
* @return a SpEL expression that must evaluate to a {@code boolean} value.
*/
String condition() default "";

修改 [Demo01Consumer]類蜈缤,使用 @StreamListener 注解的 condition 屬性來過濾消息。代碼如下:

package com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.listener;

import com.erbadagang.springcloudalibaba.stream.rocketmq.consumer.message.Demo01Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class Demo01Consumer {

    private Logger logger = LoggerFactory.getLogger(getClass());
/*

    @StreamListener(MySink.ERBADAGANG_INPUT)
    public void onMessage(@Payload Demo01Message message) {
        logger.info("[onMessage][線程編號(hào):{} 消息內(nèi)容:{}]", Thread.currentThread().getId(), message);
    }
*/

    @StreamListener(value = MySink.ERBADAGANG_INPUT, condition = "headers['rocketmq_TAGS'] == 'trek'")
    public void onMessage(@Payload Demo01Message message) {
        logger.info("[onMessage][線程編號(hào):{} 消息內(nèi)容:{}]", Thread.currentThread().getId(), message);
    }
}

這里我們?cè)O(shè)置消息的 Header 帶有的rocketmq_TAGS值為 trek 時(shí)咙鞍,才進(jìn)行消費(fèi)。
多個(gè)條件適宜SpEL表達(dá)式:@StreamListener(value = MySink.ERBADAGANG_INPUT, condition = "headers['rocketmq_TAGS'] == 'trek'||headers['rocketmq_TAGS'] == 'look'")

7.5.1 再次測試

① 執(zhí)行 ConsumerApplication翰守,啟動(dòng)消費(fèi)者的實(shí)例疲酌。
② 執(zhí)行 ProducerApplication,啟動(dòng)生產(chǎn)者的實(shí)例朗恳。
之后,請(qǐng)求 http://127.0.0.1:18080/demo01/send_tag 接口油航,發(fā)送帶有 Tag 的消息怀浆。IDEA 控制臺(tái)輸出日志如下:

// tag為trek的正常消費(fèi)了。
2020-08-06 21:50:14.672  INFO 11016 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][線程編號(hào):77 消息內(nèi)容:Demo01Message{id=880062806}]
//這個(gè)消息雖然在yml中訂閱了执赡,但java方法沒有配@StreamListener所以報(bào)錯(cuò)镰踏。
2020-08-06 21:50:14.673  WARN 11016 --- [MessageThread_1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: a7eb0b98-df97-f2ec-d566-ca514c1482c2

只消費(fèi)了一條消息沙合,Tag 為 specialized 的消息被 Broker 過濾,Tag 為 look 的消息被 Consumer 過濾芳来。要注意猜拾,被過濾掉的消息,后續(xù)是無法被消費(fèi)掉了挎袜,效果和消費(fèi)成功是一樣的。

7.5.2 控制臺(tái)查看

  • trek這條消息是被正常消費(fèi)的紊搪。


    trek message
  • specialized被broker過濾了全景,但是等同已消費(fèi)。


    specialized

底線


本文源代碼使用 Apache License 2.0開源許可協(xié)議滞伟,這里是本文源碼Gitee地址,可通過命令git clone+地址下載代碼到本地梆奈,也可直接點(diǎn)擊鏈接通過瀏覽器方式查看源代碼。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末乓梨,一起剝皮案震驚了整個(gè)濱河市清酥,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌焰轻,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異荸频,居然都是意外死亡客冈,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門和悦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來渠缕,“玉大人,你說我怎么就攤上這事亦鳞。” “怎么了燕差?”我有些...
    開封第一講書人閱讀 157,354評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵徒探,是天一觀的道長。 經(jīng)常有香客問我测暗,道長磨澡,這世上最難降的妖魔是什么蹋辅? 我笑而不...
    開封第一講書人閱讀 56,498評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮秩命,結(jié)果婚禮上褒傅,老公的妹妹穿的比我還像新娘。我一直安慰自己殿托,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評(píng)論 6 386
  • 文/花漫 我一把揭開白布旋廷。 她就那樣靜靜地躺著礼搁,像睡著了一般。 火紅的嫁衣襯著肌膚如雪馒吴。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,829評(píng)論 1 290
  • 那天豪治,我揣著相機(jī)與錄音扯罐,去河邊找鬼。 笑死篮赢,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的启泣。 我是一名探鬼主播,決...
    沈念sama閱讀 38,979評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼遣蚀,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了险耀?” 一聲冷哼從身側(cè)響起玖喘,我...
    開封第一講書人閱讀 37,722評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎累奈,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體搞乏,經(jīng)...
    沈念sama閱讀 44,189評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡戒努,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了侍筛。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片撒穷。...
    茶點(diǎn)故事閱讀 38,654評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖桥滨,靈堂內(nèi)的尸體忽然破棺而出弛车,到底是詐尸還是另有隱情,我是刑警寧澤喻括,帶...
    沈念sama閱讀 34,329評(píng)論 4 330
  • 正文 年R本政府宣布贫奠,位于F島的核電站,受9級(jí)特大地震影響唤崭,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜谢肾,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望冕杠。 院中可真熱鬧,春花似錦分预、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至纺座,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間净响,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評(píng)論 1 266
  • 我被黑心中介騙來泰國打工赞别, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留配乓,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,382評(píng)論 2 360
  • 正文 我出身青樓崎页,卻偏偏與公主長得像腰埂,于是被迫代替她去往敵國和親飒焦。 傳聞我的和親對(duì)象是個(gè)殘疾皇子屿笼,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評(píng)論 2 349