SpringBoot消息使用RocketMQ Tag

一棒妨、Topic+Tag

使用SpringBoot框架集成RocketMQ纤垂,我們使用的是RocketMQTemplate這種方式實現(xiàn)消息的發(fā)送和接收菱蔬。如果我們只用Topic不用Tag窑睁,代碼是這樣的:

@Slf4j
@Lazy
@Component
public class TopicTagTestSender {

    private static final String PN = "TopicTag測試生產(chǎn)者, ";

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.topic.topicTagTestTopic}")
    private String topicTagTestTopic;

    /**
     * 發(fā)送消息
     */
    private void asyncSend(EsTopic topic,String tags){
        String message = JSONObject.toJSONString(topic);

        rocketMQTemplate.asyncSend(topicTagTestTopic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(PN + "消息發(fā)送成功, result: {}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.error(PN + "消息發(fā)送失敗");
                e.printStackTrace();
            }
        });
    }
}
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic0}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestHandler implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        System.out.println("TopicTagTestHandler:"+esTopic.toString());
    }
}

如果要在Topic的基礎上加上Tag骇陈,只需要topicName后面拼接:tags即可。
RocketMQTemplate.asyncSend源碼如下

    /**
     * Same to {@link #asyncSend(String, Message, SendCallback)}.
     *
     * @param destination  formats: `topicName:tags`
     * @param payload      the Object to use as payload
     * @param sendCallback {@link SendCallback}
     */
    public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
        asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout());
    }

另外左胞,@RocketMQMessageListener這個注解里selectorExpression默認是*寇仓,接收topic下全部消息。selectorExpression這個不支持配置烤宙,需要寫成常量遍烦。

二、樣例代碼

2.1躺枕、Sender
package com.qimiao.qm.content.app.rocketmq.sender;

import com.alibaba.fastjson.JSONObject;
import com.qimiao.qm.common.core.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Lazy
@Component
public class TopicTagTestSender {

    private static final String PN = "TopicTag測試生產(chǎn)者, ";

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    @Value("${rocketmq.topic.topicTagTestTopic}")
    private String topicTagTestTopic;

    public static final String ADD_TAG="ADD";

    public static final String UPATE_TAG="UPDATE";

    public void asyncSendAll(EsTopic topic){
        asyncSend(topic,null);
    }

    public void asyncSendAdd(EsTopic topic){
        asyncSend(topic,ADD_TAG);
    }

    public void asyncSendUpdate(EsTopic topic){
        asyncSend(topic,UPATE_TAG);
    }

    /**
     * 發(fā)送消息
     */
    private void asyncSend(EsTopic topic,String tags){
        //Message<EsTopic> message = MessageBuilder.withPayload(topic).build();
        String message = JSONObject.toJSONString(topic);

        String destination = topicTagTestTopic;
        if(StringUtil.isNotEmpty(tags)){
            destination = topicTagTestTopic+":"+tags;
        }

        rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info(PN + "消息發(fā)送成功, result: {}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.error(PN + "消息發(fā)送失敗");
                e.printStackTrace();
            }
        });
    }
}

2.2服猪、TopicTagTestHandler
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic0}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestHandler implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        System.out.println("TopicTagTestHandler:"+esTopic.toString());
    }
}
2.3、TopicTagTestOneHandler
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic1}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        selectorExpression = "ADD",
        //selectorExpression = ${rocketmq.tags.add} 取不到值的,
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestOneHandler implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {

        String msgId = message.getMsgId();
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        System.out.println("TopicTagTestOneHandler:"+esTopic.toString());
    }
}
2.4拐云、TopicTagTestTwoHandler
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic2}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        selectorExpression = "UPDATE",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestTwoHandler implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        System.out.println("TopicTagTestTwoHandler:"+esTopic.toString());
    }
}
2.5罢猪、TopicTagTestController
@Api(tags = "Topic-Tag測試")
@RestController
@RequestMapping("topic_tags")
public class TopicTagTestController {

    @Resource
    private TopicTagTestSender topicTagTestSender;

    @ApiOperation(value = "測試(TopicTagTestHandler收到消息)")
    @GetMapping
    public Result<Boolean> test() {
        EsTopic topic = new EsTopic("0","測試");
        topicTagTestSender.asyncSendAll(topic);
        return Result.success();
    }

    @ApiOperation(value = "測試1(TopicTagTestHandler、TopicTagTestOneHandler收到消息)")
    @GetMapping("tag_one")
    public Result<Boolean> test1() {
        EsTopic topic = new EsTopic("1","測試1");
        topicTagTestSender.asyncSendAdd(topic);
        return Result.success();
    }

    @ApiOperation(value = "測試2(TopicTagTestHandler叉瘩、TopicTagTestTwoHandler收到消息)")
    @GetMapping("tag_two")
    public Result<Boolean> test2() {
        EsTopic topic = new EsTopic("2","測試2");
        topicTagTestSender.asyncSendUpdate(topic);
        return Result.success();
    }
}

三膳帕、消費端訂閱多個TAG

如果一個消息有多個TAG,可以用||分隔薇缅。

@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic2}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        selectorExpression = "ADD||UPDATE",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestTwoHandler implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        System.out.println("TopicTagTestTwoHandler:"+esTopic.toString());
    }
}
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末危彩,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子泳桦,更是在濱河造成了極大的恐慌汤徽,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件灸撰,死亡現(xiàn)場離奇詭異谒府,居然都是意外死亡漆羔,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門狱掂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人亲轨,你說我怎么就攤上這事趋惨。” “怎么了惦蚊?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵器虾,是天一觀的道長。 經(jīng)常有香客問我蹦锋,道長兆沙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任莉掂,我火速辦了婚禮葛圃,結果婚禮上,老公的妹妹穿的比我還像新娘憎妙。我一直安慰自己库正,他們只是感情好,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布厘唾。 她就那樣靜靜地躺著褥符,像睡著了一般。 火紅的嫁衣襯著肌膚如雪抚垃。 梳的紋絲不亂的頭發(fā)上喷楣,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天,我揣著相機與錄音鹤树,去河邊找鬼铣焊。 笑死,一個胖子當著我的面吹牛魂迄,可吹牛的內(nèi)容都是我干的粗截。 我是一名探鬼主播,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼捣炬,長吁一口氣:“原來是場噩夢啊……” “哼熊昌!你這毒婦竟也來了?” 一聲冷哼從身側響起湿酸,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤婿屹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后推溃,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體昂利,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了蜂奸。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片犁苏。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖扩所,靈堂內(nèi)的尸體忽然破棺而出围详,到底是詐尸還是另有隱情,我是刑警寧澤祖屏,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布助赞,位于F島的核電站,受9級特大地震影響袁勺,放射性物質(zhì)發(fā)生泄漏雹食。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一期丰、第九天 我趴在偏房一處隱蔽的房頂上張望群叶。 院中可真熱鬧,春花似錦咐汞、人聲如沸盖呼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽几晤。三九已至,卻和暖如春植阴,著一層夾襖步出監(jiān)牢的瞬間蟹瘾,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工掠手, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留憾朴,地道東北人。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓喷鸽,卻偏偏與公主長得像众雷,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子做祝,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容