使用阿里雲的 RocketMQ 訂閱與發送消息,直接上代碼:
1.引入pom依賴
<!-- Aliyun ONS client library; the com.aliyun.openservices.ons.api classes imported by the code below come from this artifact. -->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.8.Final</version>
</dependency>
2.新建RocketMqUtil類
package com.**.***.***.utils;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.***.***.***.listener.MqMessageListener;
import com.***.***.***.listener.MqTimeMessageListener;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the Aliyun ONS (RocketMQ) producer and consumer beans.
 *
 * <p>Connection settings are injected from the {@code rocketMq.*} configuration properties.
 */
@Slf4j
@Configuration
public class RocketMqUtil {

    /** Tag filter expression shared by both subscriptions registered in {@link #consumer()}. */
    private static final String APPROVAL_TAG_EXPRESSION = "start_approve||process_approve";

    @Value("${rocketMq.groupId}")
    private String groupId;

    @Value("${rocketMq.accessKey}")
    private String accessKey;

    @Value("${rocketMq.secretKey}")
    private String secretKey;

    @Value("${rocketMq.nameSrvAddr}")
    private String nameSrvAddr;

    @Value("${rocketMq.topic}")
    private String topic;

    /** Injected but not read by this class; kept so the existing configuration binding is unchanged. */
    @Value("${rocketMq.startApprovalTag}")
    private String startApprovalTag;

    @Value("${rocketMq.timeout}")
    private String timeout;

    @Autowired
    private MqMessageListener messageListener;

    @Autowired
    private MqTimeMessageListener timeMessageListener;

    /**
     * Creates the message producer bean.
     *
     * <p>The bean is only configured here; the container invokes {@code start} after creation
     * and {@code shutdown} on destruction, as declared on the {@code @Bean} annotation.
     *
     * @return the configured producer
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public Producer producer() {
        Properties properties = connectionProperties();
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, timeout);

        ProducerBean producer = new ProducerBean();
        producer.setProperties(properties);
        log.info("rocketMq創建生產者成功");
        return producer;
    }

    /**
     * Creates the message consumer bean and registers both listeners on the configured topic.
     *
     * <p>NOTE(review): both registrations use the same topic and the same tag expression. If
     * {@code Subscription} keys compare equal by topic (TODO confirm against the ons-client
     * version in use), the second registration replaces the first in the table and
     * {@code messageListener} never receives messages. A warning is logged when that happens;
     * timed/delayed messages most likely need their own topic.
     *
     * @return the configured consumer
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean consumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        consumerBean.setProperties(connectionProperties());

        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        // Ordinary messages.
        subscribe(subscriptionTable, topic, APPROVAL_TAG_EXPRESSION, messageListener);
        // Timed / delayed messages.
        subscribe(subscriptionTable, topic, APPROVAL_TAG_EXPRESSION, timeMessageListener);
        consumerBean.setSubscriptionTable(subscriptionTable);

        log.info("rocketMq訂閱成功");
        return consumerBean;
    }

    /** Builds the connection properties shared by the producer and the consumer. */
    private Properties connectionProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        return properties;
    }

    /** Adds one subscription to the table and warns when it displaces an earlier registration. */
    private void subscribe(Map<Subscription, MessageListener> table, String subscribedTopic,
            String expression, MessageListener listener) {
        Subscription subscription = new Subscription();
        subscription.setTopic(subscribedTopic);
        subscription.setExpression(expression);

        MessageListener replaced = table.put(subscription, listener);
        if (replaced != null) {
            // Surface the otherwise silent overwrite instead of losing a listener unnoticed.
            log.warn("rocketMq subscription for topic {} was already registered; listener {} is replaced by {}",
                    subscribedTopic, replaced.getClass().getName(), listener.getClass().getName());
        }
    }
}
3.普通消息監聽類MqMessageListener
package com.***.***.***.listener;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
 * Listener for ordinary MQ messages.
 */
@Component
public class MqMessageListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(MqMessageListener.class);

    /**
     * Logs the received message and runs the business handling.
     *
     * @param message the delivered message
     * @param context the consume context passed in by the client; not used here
     * @return {@code Action.CommitMessage} on success, {@code Action.ReconsumeLater} when
     *         handling throws
     */
    @Override
    public Action consume(Message message, ConsumeContext context) {
        logger.info("接收到MQ普通消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(),
                bodyAsText(message));
        try {
            // Business handling goes here.
            return Action.CommitMessage;
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged, not just its message.
            logger.error("消費MQ消息失敗! msgId:{}", message.getMsgID(), e);
            return Action.ReconsumeLater;
        }
    }

    /**
     * Decodes the message body for logging.
     *
     * <p>Uses UTF-8 explicitly rather than the platform default charset so the log output does
     * not vary between hosts (assumes producers encode bodies as UTF-8 — TODO confirm).
     *
     * @return the decoded body, or {@code null} when the message carries no body
     */
    private static String bodyAsText(Message message) {
        byte[] body = message.getBody();
        return body == null ? null : new String(body, java.nio.charset.StandardCharsets.UTF_8);
    }
}
4.異步/定時/延時消息監聽類MqTimeMessageListener
package com.***.***.***.listener;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Listener for timed / delayed MQ messages.
 */
@Component
public class MqTimeMessageListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(MqTimeMessageListener.class);

    /** Injected from configuration; not read by the code currently in this class. */
    @Value("${rocketMq.startedApprovalTag}")
    private String startedApprovalTag;

    /** Injected from configuration; not read by the code currently in this class. */
    @Value("${rocketMq.processedApprove}")
    private String processedApprove;

    /**
     * Logs the received message and runs the business handling.
     *
     * <p>The message is committed even when handling throws.
     * NOTE(review): this means a failed message is not redelivered — confirm this is intended,
     * or return {@code Action.ReconsumeLater} from the catch block instead.
     *
     * @param message the delivered message
     * @param context the consume context passed in by the client; not used here
     * @return always {@code Action.CommitMessage}
     */
    @Override
    public Action consume(Message message, ConsumeContext context) {
        logger.info("接收到MQ定時/延時消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(),
                bodyAsText(message));
        try {
            // Business handling goes here.
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is logged, not just its message.
            logger.error("消費MQ消息失敗! msgId:{}", message.getMsgID(), e);
        }
        return Action.CommitMessage;
    }

    /**
     * Decodes the message body for logging.
     *
     * <p>Uses UTF-8 explicitly rather than the platform default charset so the log output does
     * not vary between hosts (assumes producers encode bodies as UTF-8 — TODO confirm).
     *
     * @return the decoded body, or {@code null} when the message carries no body
     */
    private static String bodyAsText(Message message) {
        byte[] body = message.getBody();
        return body == null ? null : new String(body, java.nio.charset.StandardCharsets.UTF_8);
    }
}
5.消息發送RocketMqProducer
package com.***.***.***.utils;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
import java.util.Properties;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RocketMqProducer {
@Value("${rocketMq.groupId}")
private String groupId;
@Value("${rocketMq.sendMegGroup}")
private String sendMegGroup;
@Value("${rocketMq.accessKey}")
private String accessKey;
@Value("${rocketMq.secretKey}")
private String secretKey;
@Value("${rocketMq.nameSrvAddr}")
private String nameSrvAddr;
@Value("${rocketMq.topic}")
private String topic;
@Value("${rocketMq.startApprovalTag}")
private String startApprovalTag;
@Value("${rocketMq.timeout}")
private String timeout;
@Resource
private ProducerBean producer;
/**
* 發(fā)送異步消息
* @param tag
* @param msgKey
* @param messageBody
*/
public void sendAsyncMsg(String tag, String msgKey, byte[] messageBody) {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, sendMegGroup);
properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
properties.put(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, timeout);
Message msg = new Message(topic, tag, msgKey, messageBody);
try {
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
log.info("rocketMq發(fā)送成功澈蟆,msg:{}", JSON.toJSONString(msg));
}
@Override
public void onException(final OnExceptionContext context) {
log.info("rocketMq發(fā)送失斚韬觥:tag:{},topic:{},body:{}", tag, context.getTopic(), new String(messageBody), context.getException());
// todo 持久化失敗消息雾家,定時補償
}
});
} catch (ONSClientException e) {
log.info("rocketMq發(fā)送異常:", e);
}
}
}
6.調用消息發送
@Autowired
private RocketMqProducer rocketMqProducer;

// Inside a method of the calling bean: sendAsyncMsg takes the body as byte[],
// so encode the text explicitly instead of passing a String.
rocketMqProducer.sendAsyncMsg("tag", "msgKey", "msg".getBytes(java.nio.charset.StandardCharsets.UTF_8));