一、Topic+Tag
使用SpringBoot框架集成RocketMQ,我們使用的是RocketMQTemplate這種方式實現消息的發送和接收。如果我們只用Topic不用Tag,代碼是這樣的:
@Slf4j
@Lazy
@Component
public class TopicTagTestSender {
private static final String PN = "TopicTag測試生產(chǎn)者, ";
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.topic.topicTagTestTopic}")
private String topicTagTestTopic;
/**
* 發(fā)送消息
*/
private void asyncSend(EsTopic topic,String tags){
String message = JSONObject.toJSONString(topic);
rocketMQTemplate.asyncSend(topicTagTestTopic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info(PN + "消息發(fā)送成功, result: {}", sendResult);
}
@Override
public void onException(Throwable e) {
log.error(PN + "消息發(fā)送失敗");
e.printStackTrace();
}
});
}
}
/**
 * Consumer for the Topic/Tag demo that declares no {@code selectorExpression},
 * so the annotation's default tag filter applies.
 *
 * <p>Listens on the topic configured under {@code rocketmq.topic.topicTagTestTopic}
 * in clustering mode, using the consumer group configured under
 * {@code rocketmq.consumer.group.topicTagTestTopic0}.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic0}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestHandler implements RocketMQListener<MessageExt> {

    /**
     * Deserializes the message body into an {@code EsTopic} and logs it together
     * with the message id.
     *
     * @param message the received RocketMQ message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Parameterized logging tolerates a null payload, unlike esTopic.toString().
        log.info("TopicTagTestHandler:{}, msgId: {}", esTopic, message.getMsgId());
    }
}
如果要在Topic的基礎上加上Tag,只需要在topicName後面拼接「:tags」即可。
RocketMQTemplate.asyncSend 源碼如下:
/**
* Same to {@link #asyncSend(String, Message, SendCallback)}.
* <p>
* Convenience overload: forwards all three arguments to the four-argument
* {@code asyncSend}, supplying {@code producer.getSendMsgTimeout()} as the fourth
* argument (presumably the send timeout).
*
* @param destination formats: `topicName:tags`
* @param payload the Object to use as payload
* @param sendCallback {@link SendCallback}
*/
public void asyncSend(String destination, Object payload, SendCallback sendCallback) {
asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout());
}
另外,@RocketMQMessageListener 這個注解裡的 selectorExpression 默認是 *,即接收topic下的全部消息。
selectorExpression 這個屬性不支持通過 ${...} 佔位符從配置中讀取(實測取不到值),需要寫成常量。
二、樣例代碼
2.1、Sender
package com.qimiao.qm.content.app.rocketmq.sender;
import com.alibaba.fastjson.JSONObject;
import com.qimiao.qm.common.core.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Lazy
@Component
public class TopicTagTestSender {
private static final String PN = "TopicTag測試生產(chǎn)者, ";
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.topic.topicTagTestTopic}")
private String topicTagTestTopic;
public static final String ADD_TAG="ADD";
public static final String UPATE_TAG="UPDATE";
public void asyncSendAll(EsTopic topic){
asyncSend(topic,null);
}
public void asyncSendAdd(EsTopic topic){
asyncSend(topic,ADD_TAG);
}
public void asyncSendUpdate(EsTopic topic){
asyncSend(topic,UPATE_TAG);
}
/**
* 發(fā)送消息
*/
private void asyncSend(EsTopic topic,String tags){
//Message<EsTopic> message = MessageBuilder.withPayload(topic).build();
String message = JSONObject.toJSONString(topic);
String destination = topicTagTestTopic;
if(StringUtil.isNotEmpty(tags)){
destination = topicTagTestTopic+":"+tags;
}
rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info(PN + "消息發(fā)送成功, result: {}", sendResult);
}
@Override
public void onException(Throwable e) {
log.error(PN + "消息發(fā)送失敗");
e.printStackTrace();
}
});
}
}
2.2、TopicTagTestHandler
/**
 * Consumer for the Topic/Tag demo that declares no {@code selectorExpression},
 * so the annotation's default tag filter applies.
 *
 * <p>Listens on the topic configured under {@code rocketmq.topic.topicTagTestTopic}
 * in clustering mode, using the consumer group configured under
 * {@code rocketmq.consumer.group.topicTagTestTopic0}.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic0}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestHandler implements RocketMQListener<MessageExt> {

    /**
     * Deserializes the message body into an {@code EsTopic} and logs it together
     * with the message id.
     *
     * @param message the received RocketMQ message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Parameterized logging tolerates a null payload, unlike esTopic.toString().
        log.info("TopicTagTestHandler:{}, msgId: {}", esTopic, message.getMsgId());
    }
}
2.3、TopicTagTestOneHandler
/**
 * Consumer for the Topic/Tag demo that subscribes with the tag filter
 * {@code "ADD"}.
 *
 * <p>Listens on the topic configured under {@code rocketmq.topic.topicTagTestTopic}
 * in clustering mode, using the consumer group configured under
 * {@code rocketmq.consumer.group.topicTagTestTopic1}.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic1}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        // The tag is written as a literal: the author notes that a placeholder such
        // as ${rocketmq.tags.add} was not resolved for selectorExpression.
        selectorExpression = "ADD",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestOneHandler implements RocketMQListener<MessageExt> {

    /**
     * Deserializes the message body into an {@code EsTopic} and logs it together
     * with the message id.
     *
     * @param message the received RocketMQ message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Parameterized logging tolerates a null payload, unlike esTopic.toString().
        log.info("TopicTagTestOneHandler:{}, msgId: {}", esTopic, message.getMsgId());
    }
}
2.4、TopicTagTestTwoHandler
/**
 * Consumer for the Topic/Tag demo that subscribes with the tag filter
 * {@code "UPDATE"}.
 *
 * <p>Listens on the topic configured under {@code rocketmq.topic.topicTagTestTopic}
 * in clustering mode, using the consumer group configured under
 * {@code rocketmq.consumer.group.topicTagTestTopic2}.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic2}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        selectorExpression = "UPDATE",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestTwoHandler implements RocketMQListener<MessageExt> {

    /**
     * Deserializes the message body into an {@code EsTopic} and logs it together
     * with the message id.
     *
     * @param message the received RocketMQ message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Parameterized logging tolerates a null payload, unlike esTopic.toString().
        log.info("TopicTagTestTwoHandler:{}, msgId: {}", esTopic, message.getMsgId());
    }
}
2.5、TopicTagTestController
/**
 * REST endpoints under {@code topic_tags} that trigger the Topic/Tag demo
 * producer, one endpoint per tag variant.
 */
@Api(tags = "Topic-Tag測試")
@RestController
@RequestMapping("topic_tags")
public class TopicTagTestController {

    @Resource
    private TopicTagTestSender topicTagTestSender;

    /**
     * Sends a sample message without a tag.
     *
     * @return the value of {@code Result.success()}
     */
    @ApiOperation(value = "測試(TopicTagTestHandler收到消息)")
    @GetMapping
    public Result<Boolean> test() {
        EsTopic topic = new EsTopic("0", "測試");
        topicTagTestSender.asyncSendAll(topic);
        return Result.success();
    }

    /**
     * Sends a sample message through {@code asyncSendAdd}.
     *
     * @return the value of {@code Result.success()}
     */
    @ApiOperation(value = "測試1(TopicTagTestHandler、TopicTagTestOneHandler收到消息)")
    @GetMapping("tag_one")
    public Result<Boolean> test1() {
        EsTopic topic = new EsTopic("1", "測試1");
        topicTagTestSender.asyncSendAdd(topic);
        return Result.success();
    }

    /**
     * Sends a sample message through {@code asyncSendUpdate}.
     *
     * @return the value of {@code Result.success()}
     */
    @ApiOperation(value = "測試2(TopicTagTestHandler、TopicTagTestTwoHandler收到消息)")
    @GetMapping("tag_two")
    public Result<Boolean> test2() {
        EsTopic topic = new EsTopic("2", "測試2");
        topicTagTestSender.asyncSendUpdate(topic);
        return Result.success();
    }
}
三、消費端訂閱多個TAG
如果消費端需要訂閱多個TAG,可以在selectorExpression中用||分隔。
/**
 * Consumer for the Topic/Tag demo that subscribes with the tag filter
 * {@code "ADD||UPDATE"}, i.e. two tags joined by {@code ||}.
 *
 * <p>Listens on the topic configured under {@code rocketmq.topic.topicTagTestTopic}
 * in clustering mode, using the consumer group configured under
 * {@code rocketmq.consumer.group.topicTagTestTopic2}.
 */
@Slf4j
@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group.topicTagTestTopic2}",
        topic = "${rocketmq.topic.topicTagTestTopic}",
        selectorExpression = "ADD||UPDATE",
        messageModel = MessageModel.CLUSTERING)
public class TopicTagTestTwoHandler implements RocketMQListener<MessageExt> {

    /**
     * Deserializes the message body into an {@code EsTopic} and logs it together
     * with the message id.
     *
     * @param message the received RocketMQ message
     */
    @Override
    public void onMessage(MessageExt message) {
        EsTopic esTopic = JSONObject.parseObject(message.getBody(), EsTopic.class);
        // Parameterized logging tolerates a null payload, unlike esTopic.toString().
        log.info("TopicTagTestTwoHandler:{}, msgId: {}", esTopic, message.getMsgId());
    }
}