一、使用rocketmq-spring-boot-starter組件
1、添加依賴
<!-- RocketMQ Spring Boot starter dependency -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
2、添加配置
# RocketMQ settings for the rocketmq-spring-boot-starter setup.
# The nesting follows the placeholders used by the code in this article:
# ${rocketmq.nameServer}, ${rocketmq.producer.send-message-timeout},
# ${rocketmq.consumer.group} and ${rocketmq.topic}.
rocketmq:
  nameServer: xx.xx.xx.xx:9876
  producer:
    group: test-group                        # producer group; must be specified
    send-message-timeout: 3000               # send timeout in milliseconds (default 3s)
    retry-times-when-send-failed: 3          # retries after a failed synchronous send (default 2)
    retry-times-when-send-async-failed: 3    # retries after a failed asynchronous send (default 2)
  consumer:
    group: test-group
  # The listener reads this as ${rocketmq.topic}, so it sits directly under rocketmq.
  topic: test-topic
3、配置生產者
常用方法匯總,根據實際需要進行封裝
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
 * Collection of the commonly used ways to send a message through {@link RocketMQTemplate}:
 * synchronous, asynchronous, delayed, one-way and tagged, each with an optional message key.
 * Wrap or trim these methods according to what the application actually needs.
 */
@Component
public class MQProducerService {

    /** Send timeout handed to the delayed sends; bound to {@code rocketmq.producer.send-message-timeout}. */
    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer messageTimeOut;

    /** Injected template used to send messages to the broker. */
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a message synchronously: the calling thread blocks until the broker has answered
     * with the send result, which makes losing a message less likely.
     *
     * @param topic   destination topic
     * @param msgBody message payload
     * @return the send result reported by the template
     */
    public SendResult sendMsg(String topic, String msgBody) {
        return rocketMQTemplate.syncSend(topic, payloadOf(msgBody).build());
    }

    /**
     * Same as {@link #sendMsg(String, String)} but also sets the {@code KEYS} header.
     *
     * @param topic   destination topic
     * @param msgBody message payload
     * @param key     value stored in the {@code KEYS} header
     * @return the send result reported by the template
     */
    public SendResult sendKeyMsg(String topic, String msgBody, String key) {
        return rocketMQTemplate.syncSend(topic, payloadWithKey(msgBody, key).build());
    }

    /**
     * Sends a message asynchronously; the outcome is delivered to a {@link SendCallback},
     * where success and failure handling belongs. Suited to callers that are sensitive to
     * response time.
     *
     * @param topic   destination topic
     * @param msgBody message payload
     */
    public void sendAsyncMsg(String topic, String msgBody) {
        rocketMQTemplate.asyncSend(topic, payloadOf(msgBody).build(), placeholderCallback());
    }

    /**
     * Same as {@link #sendAsyncMsg(String, String)} but also sets the {@code KEYS} header.
     *
     * @param topic   destination topic
     * @param msgBody message payload
     * @param key     value stored in the {@code KEYS} header
     */
    public void sendAsyncKeyMsg(String topic, String msgBody, String key) {
        rocketMQTemplate.asyncSend(topic, payloadWithKey(msgBody, key).build(), placeholderCallback());
    }

    /**
     * Sends a delayed message synchronously, using the configured send timeout.
     *
     * <p>The starter exposes 18 delay levels:
     * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h.
     * The plain synchronous sends above correspond to delay level 0, i.e. no delay.
     *
     * @param topic      destination topic
     * @param msgBody    message payload
     * @param delayLevel delay level to apply
     */
    public void sendDelayMsg(String topic, String msgBody, int delayLevel) {
        rocketMQTemplate.syncSend(topic, payloadOf(msgBody).build(), messageTimeOut, delayLevel);
    }

    /**
     * Same as {@link #sendDelayMsg(String, String, int)} but also sets the {@code KEYS} header.
     *
     * @param topic      destination topic
     * @param msgBody    message payload
     * @param delayLevel delay level to apply
     * @param key        value stored in the {@code KEYS} header
     */
    public void sendDelayKeyMsg(String topic, String msgBody, int delayLevel, String key) {
        rocketMQTemplate.syncSend(topic, payloadWithKey(msgBody, key).build(), messageTimeOut, delayLevel);
    }

    /**
     * Sends a one-way message: fire and forget, no reply is awaited and the result is not
     * inspected (for example for log shipping).
     *
     * @param topic   destination topic
     * @param msgBody message payload
     */
    public void sendOneWayMsg(String topic, String msgBody) {
        rocketMQTemplate.sendOneWay(topic, payloadOf(msgBody).build());
    }

    /**
     * Same as {@link #sendOneWayMsg(String, String)} but also sets the {@code KEYS} header.
     *
     * @param topic   destination topic
     * @param msgBody message payload
     * @param key     value stored in the {@code KEYS} header
     */
    public void sendOneWayKeyMsg(String topic, String msgBody, String key) {
        rocketMQTemplate.sendOneWay(topic, payloadWithKey(msgBody, key).build());
    }

    /**
     * Sends a tagged message synchronously; the tag is appended to the topic as {@code ":tag"}.
     *
     * @param topic   destination topic
     * @param msgBody message payload
     * @param tag     tag appended to the topic
     * @return the send result reported by the template
     */
    public SendResult sendTagMsg(String topic, String msgBody, String tag) {
        String destination = topic + ":" + tag;
        return rocketMQTemplate.syncSend(destination, payloadOf(msgBody).build());
    }

    /**
     * Same as {@link #sendTagMsg(String, String, String)} but also sets the {@code KEYS} header.
     *
     * @param topic   destination topic
     * @param msgBody message payload
     * @param tag     tag appended to the topic
     * @param key     value stored in the {@code KEYS} header
     * @return the send result reported by the template
     */
    public SendResult sendTagKeyMsg(String topic, String msgBody, String tag, String key) {
        String destination = topic + ":" + tag;
        return rocketMQTemplate.syncSend(destination, payloadWithKey(msgBody, key).build());
    }

    /** Starts a message builder that carries only the payload. */
    private static MessageBuilder<String> payloadOf(String msgBody) {
        return MessageBuilder.withPayload(msgBody);
    }

    /** Starts a message builder that carries the payload plus the {@code KEYS} header. */
    private static MessageBuilder<String> payloadWithKey(String msgBody, String key) {
        return MessageBuilder.withPayload(msgBody).setHeader("KEYS", key);
    }

    /** Creates a fresh callback whose hooks are placeholders for application-specific handling. */
    private static SendCallback placeholderCallback() {
        return new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // Handle a successful send here.
            }

            @Override
            public void onException(Throwable throwable) {
                // Handle a failed send here.
            }
        };
    }
}
4、配置消費者
/**
 * Demo consumer: waits one second per message and then prints the received text.
 * Topic, name server and consumer group are resolved from the {@code rocketmq.*} properties
 * named in the annotation below.
 */
@RocketMQMessageListener(topic = "${rocketmq.topic}", nameServer = "${rocketmq.nameServer}", consumerGroup = "${rocketmq.consumer.group}")
@Component
public class ComsumerListener implements RocketMQListener<String> {

    /**
     * Handles one message: sleeps for a second, then prints the payload.
     *
     * @param str the message payload
     */
    @Override
    public void onMessage(String str) {
        try {
            Thread.sleep(1000);
            System.out.println("睡眠結束");
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt flag when it throws; restore it so the
            // thread that runs this listener can still observe the interruption.
            Thread.currentThread().interrupt();
            System.out.println("異常");
        }
        System.out.println("接收信息:" + str);
    }
}
5、測試推送/消費mq
/**
 * Test controller that pushes one fixed message through {@link MQProducerService}.
 *
 * <p>The value of {@code @RestController} is a bean name, not a URL prefix, so the former
 * {@code @RestController("/mq")} never mapped anything under {@code /mq}. The intended
 * {@code /mq/sned} path is now declared on the handler method itself, next to the
 * {@code /sned} path that was effectively served before.
 */
@RestController
public class RocketMqTestController {

    @Autowired
    private MQProducerService mqProducerService;

    /**
     * Sends a fixed test message to {@code test-topic} and prints a confirmation line.
     * {@code @ResponseBody} is not repeated here because {@code @RestController} already
     * implies it.
     *
     * @return {@code true} whenever the send call returns normally
     */
    @GetMapping({"/mq/sned", "/sned"})
    public Boolean snedMqMsg() {
        mqProducerService.sendMsg("test-topic", "發(fā)送mq消息:1111");
        System.out.println("發(fā)送成功");
        return true;
    }
}
測試結果:
二、使用rocketmq-client組件
1、添加依賴
<!-- Plain RocketMQ client dependency (used without the Spring Boot starter) -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.1</version>
</dependency>
2、添加配置
# Settings for the plain rocketmq-client setup, grouped under rocketmq.config.
# NOTE(review): the @Value placeholders in RocketMqProducerConfig must point at these
# exact key paths (rocketmq.config.*) for the values to be picked up.
rocketmq:
  config:
    topic: test-topic
    # Several name servers are listed in one value, separated by ';'.
    nameServerAddress: xx.xx.xxx.xxx:9876;xx.xx.xxx.xxx:9876
    consumerGroupId: test-group
添加配置類
/**
 * Copies the RocketMQ settings injected by Spring into static fields so that code which is
 * not managed by Spring (static initializers, manually created objects) can read them through
 * the static getters.
 *
 * <p>The static getters return {@code null} until Spring has created this bean and invoked
 * {@link #getConfig()}.
 */
@Component
public class RocketMqProducerConfig {

    private static String nameServerAddress;
    private static String topic;
    private static String groupId;

    /**
     * Name server address list. Read from {@code rocketmq.config.nameServerAddress}, next to
     * {@code rocketmq.config.topic}; falls back to the previously used key
     * {@code rocketmq.nameServerAddress} when the new one is absent.
     */
    @Value("${rocketmq.config.nameServerAddress:${rocketmq.nameServerAddress}}")
    private String appAddress;

    /** Topic name, read from {@code rocketmq.config.topic}. */
    @Value("${rocketmq.config.topic}")
    private String appTopic;

    /**
     * Group id. Read from {@code rocketmq.config.consumerGroupId}; falls back to the
     * previously used key {@code rocketmq.esSync.consumerGroupId} when the new one is absent.
     */
    @Value("${rocketmq.config.consumerGroupId:${rocketmq.esSync.consumerGroupId}}")
    private String appGroupId;

    /** Publishes the injected values to the static fields once injection has completed. */
    @PostConstruct
    public void getConfig() {
        topic = this.appTopic;
        nameServerAddress = this.appAddress;
        groupId = this.appGroupId;
    }

    /**
     * @return the configured name server address list, or {@code null} before initialization
     */
    public static String getNameServerAddress() {
        return nameServerAddress;
    }

    /**
     * @return the configured topic, or {@code null} before initialization
     */
    public static String getTopic() {
        return topic;
    }

    /**
     * @return the configured group id, or {@code null} before initialization
     */
    public static String getGroupId() {
        return groupId;
    }
}
添加消息dto
/**
 * Message payload exchanged over the queue; it carries a single push id.
 */
public class MqMsgDto implements Serializable {

    /** Push id carried by the message; {@code null} until set. */
    private String pushId;

    /**
     * @return the current push id, possibly {@code null}
     */
    public String getPushId() {
        return this.pushId;
    }

    /**
     * @param pushId the push id to store; {@code null} is stored as is
     */
    public void setPushId(final String pushId) {
        this.pushId = pushId;
    }
}
3、配置生產者
/**
 * Producer side of the plain rocketmq-client setup: owns one shared {@link DefaultMQProducer}
 * and pushes {@link MqMsgDto} payloads to the configured topic.
 *
 * <p>The producer is created in a static initializer that reads the static getters of
 * {@code RocketMqProducerConfig}.
 * NOTE(review): {@code @DependsOn("rocketMqProducerConfig")} is presumably what ensures those
 * values are populated before this class is initialized; confirm nothing touches this class
 * earlier.
 */
@Service
@DependsOn("rocketMqProducerConfig")
public class CalendarPushMqServiceImpl implements ICalendarPushMqApi {

    protected static final Logger logger = LoggerFactory.getLogger(CalendarPushMqServiceImpl.class);

    private static DefaultMQProducer rocketProducer = null;

    static {
        try {
            // Create the producer for the configured group.
            rocketProducer = new DefaultMQProducer(RocketMqProducerConfig.getGroupId());
            // Point it at the configured name server(s).
            rocketProducer.setNamesrvAddr(RocketMqProducerConfig.getNameServerAddress());
            rocketProducer.start();
            // Successful start-up is informational, not an error.
            logger.info("rocketMq生產者初始化成功");
        } catch (Exception e) {
            // Start-up failure is only logged; later sends then fail and are logged in pushMsg.
            logger.error("rocketMq生產者初始化失斆谏瘛:", e);
        }
    }

    /**
     * Serializes the DTO to JSON and sends it to the configured topic with delay level 2.
     * The push id is used as the message key and the current time in milliseconds as the tag.
     * Any failure (including a {@code null} argument) is logged and not propagated.
     *
     * @param mqMsgDto the payload to push
     */
    public void pushMsg(MqMsgDto mqMsgDto) {
        try {
            Message message = new Message();
            message.setTopic(RocketMqProducerConfig.getTopic());
            // Encode explicitly as UTF-8: the consumer decodes the body as UTF-8, and the
            // no-argument getBytes() would depend on the platform default charset.
            message.setBody(JsonUtil.json(mqMsgDto).getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // Key the message by the DTO's push id (the only identifier MqMsgDto exposes).
            message.setKeys(mqMsgDto.getPushId());
            message.setTags(Long.toString(System.currentTimeMillis()));
            // Delayed delivery: level 2 corresponds to 5s.
            message.setDelayTimeLevel(2);
            rocketProducer.send(message);
        } catch (Exception e) {
            logger.error("推送mq出錯:", e);
        }
    }
}
4、配置消費者
/**
 * Spring configuration that creates and starts the push consumer for the plain
 * rocketmq-client setup.
 */
@Configuration
public class RocketMqConfiguration {

    protected static final Logger logger = LoggerFactory.getLogger(RocketMqConfiguration.class);

    /**
     * Push to open search: the ordinary (concurrent) consumer queue.
     *
     * <p>Group, name server and topic come from the static getters of
     * {@code RocketMqProducerConfig}. Those are only populated once that bean has been
     * initialized, hence the explicit {@code @DependsOn}; without it they may still be
     * {@code null} when this method runs.
     *
     * @return the started consumer, or {@code null} when creation or start-up failed
     *         (the failure is logged, not propagated)
     */
    @Bean
    @DependsOn("rocketMqProducerConfig")
    public DefaultMQPushConsumer testConsumer() {
        try {
            // Create the consumer for the configured group.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqProducerConfig.getGroupId());
            // Point it at the configured name server(s).
            consumer.setNamesrvAddr(RocketMqProducerConfig.getNameServerAddress());
            // Subscribe to the topic; "*" accepts every tag.
            consumer.subscribe(RocketMqProducerConfig.getTopic(), "*");
            consumer.registerMessageListener(new RocketMqMessageListener());
            consumer.start();
            logger.info("初始化消費者成功");
            return consumer;
        } catch (Exception e) {
            logger.error("初始化消費隊列失敾都省:", e);
        }
        return null;
    }
}
監聽器配置
/**
 * Concurrent message listener: decodes each message body as UTF-8 JSON into a
 * {@link MqMsgDto} and hands it to {@code Test#update}.
 */
@Component
public class RocketMqMessageListener implements MessageListenerConcurrently {

    private static final Logger logger = LoggerFactory.getLogger(RocketMqMessageListener.class);

    // Looked up through SpringBeanUtil when this object is constructed.
    // NOTE(review): this requires SpringBeanUtil to be usable at construction time - confirm.
    private Test test = SpringBeanUtil.getBean(Test.class);

    /**
     * Processes a batch of messages. A message that cannot be decoded or processed is logged
     * and skipped, so the batch is always acknowledged as consumed.
     *
     * @param list                       the delivered messages; may be empty
     * @param consumeConcurrentlyContext consume context supplied by the client (unused)
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        for (MessageExt message : list) {
            try {
                String mdmqMessage = new String(message.getBody(), StandardCharsets.UTF_8);
                MqMsgDto dto = JSON.parseObject(mdmqMessage, MqMsgDto.class);
                if (Objects.isNull(dto)) {
                    continue;
                }
                // Routine receipt is informational; the error level is kept for failures.
                logger.info("接收信息:" + JSON.toJSONString(dto));
                test.update(dto);
            } catch (Exception e) {
                // Deliberate best effort: log the bad message and move on to the next one.
                logger.error("接收無效消息母市,自動過濾", e);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}