1. 導(dǎo)入依賴
compile 'org.apache.rocketmq:rocketmq-client:4.5.2'
2. 編寫application.yml配置
3. 引入配置信息
為了方便,在這里消費(fèi)者和生產(chǎn)者都放在一個(gè)項(xiàng)目里
引入生產(chǎn)者配置信息
package utry.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author
* 消息生產(chǎn)者配置信息
*/
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class ProducerPropertiesConfig {
@Value("${namesrvAddr}")
private String namesrvAddr;
private String groupName;
private Integer maxMessageSize;
private Integer sendMsgTimeout;
private Integer retryTimesWhenSendFailed;
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public Integer getMaxMessageSize() {
return maxMessageSize;
}
public void setMaxMessageSize(Integer maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
public Integer getSendMsgTimeout() {
return sendMsgTimeout;
}
public void setSendMsgTimeout(Integer sendMsgTimeout) {
this.sendMsgTimeout = sendMsgTimeout;
}
public Integer getRetryTimesWhenSendFailed() {
return retryTimesWhenSendFailed;
}
public void setRetryTimesWhenSendFailed(Integer retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
@Override
public String toString() {
return "ProducerConfig [namesrvAddr=" + namesrvAddr + ", groupName=" + groupName + "]";
}
}
編寫生產(chǎn)者
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author
* 消息生產(chǎn)者
*/
@Configuration
public class ProducerConfigure {
Logger logger = LoggerFactory.getLogger(ProducerConfigure.class);
@Autowired
private ProducerPropertiesConfig producerPropertiesConfig;
@Bean
public DefaultMQProducer defaultProducer() throws MQClientException {
logger.info(producerPropertiesConfig.toString());
logger.info("defaultProducer 正在創(chuàng)建---------------------------------------");
DefaultMQProducer producer = new DefaultMQProducer(producerPropertiesConfig.getGroupName());
producer.setNamesrvAddr(producerPropertiesConfig.getNamesrvAddr());
producer.setVipChannelEnabled(false);
//其他屬性自行設(shè)置矾瑰,這里才用默認(rèn)
producer.start();
logger.info("rocketmq producer server開啟成功---------------------------------.");
return producer;
}
}
引入消費(fèi)者配置信息
package utry.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author
* 消費(fèi)者屬性配置類
*/
@ConfigurationProperties(prefix = "rocketmq.consumer")
@Configuration
public class ConsumerPropertiesConfig {
private String groupName;
@Value("${namesrvAddr}")
private String namesrvAddr;
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getNamesrvAddr() {
return namesrvAddr;
}
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
@Override
public String toString() {
return "ConsumerConfig [groupName=" + groupName + ", namesrvAddr=" + namesrvAddr + "]";
}
}
編寫消費(fèi)者
先編寫一個(gè)抽象類,再寫具體的實(shí)現(xiàn)
- 編寫抽象類
package utry.config;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author
* 抽象的消息消費(fèi)者
*/
@Service
public abstract class DefaultConsumerConfigure {
Logger log = LoggerFactory.getLogger(DefaultConsumerConfigure.class);
@Autowired
private ConsumerPropertiesConfig consumerConfig;
/**
* 開啟消費(fèi)者監(jiān)聽服務(wù)
* @param topic
* @param tag
* @throws MQClientException
*/
public void listener(String topic, String tag) throws MQClientException {
log.info("開啟" + topic + ":" + tag + "消費(fèi)者-------------------");
log.info(consumerConfig.toString());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerConfig.getGroupName());
consumer.setNamesrvAddr(consumerConfig.getNamesrvAddr());
consumer.subscribe(topic, tag);
// 開啟內(nèi)部類實(shí)現(xiàn)監(jiān)聽
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
return DefaultConsumerConfigure.this.dealBody(messageExtList);
}
});
consumer.start();
log.info("rocketmq啟動(dòng)成功---------------------------------------");
}
/**
* 處理body的業(yè)務(wù)
* @param messageExtList
* @return
*/
public abstract ConsumeConcurrentlyStatus dealBody(List<MessageExt> messageExtList);
}
- 編寫消費(fèi)者
package utry.config;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import java.util.List;
/**
* @author
* 消息消費(fèi)者
*/
@Configuration
public class CustomConsumer extends DefaultConsumerConfigure implements ApplicationListener<ContextRefreshedEvent> {
Logger log = LoggerFactory.getLogger(CustomConsumer.class);
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
super.listener("t_TopicTest", "Tag1");
} catch (MQClientException e) {
log.error("消費(fèi)者監(jiān)聽器啟動(dòng)失敗", e);
}
}
@Override
public ConsumeConcurrentlyStatus dealBody(List<MessageExt> messageExtList) {
log.info("接收到消息");
for (MessageExt msg : messageExtList) {
try {
String msgStr = new String(msg.getBody(), "utf-8");
log.info(msgStr);
} catch (Exception e) {
log.error("body轉(zhuǎn)字符串解析失敗");
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
編寫Controller測(cè)試
package utry.controller;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import utry.config.CustomConsumer;
@RestController
public class ProducerController {
Logger log = LoggerFactory.getLogger(CustomConsumer.class);
@Autowired
private DefaultMQProducer producer;
@GetMapping("/msg/product")
public void test(String info) throws Exception {
Message message = new Message("t_TopicTest", "Tag1", "12345", info.getBytes());
// 這里用到了這個(gè)mq的異步處理,類似ajax,可以得到發(fā)送到mq的情況壶愤,并做相應(yīng)的處理
// 不過要注意的是這個(gè)是異步的
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("傳輸成功");
log.info(JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("傳輸失敗", e);
}
});
}
}
github: