上篇博文我們整理了RabbitMQ的交換機(jī)牙躺、隊(duì)列以及路由綁定等相關(guān)知識佩迟,并且了解了RabbitMQ是如何發(fā)送消息給隊(duì)列的酬诀,以及重要的RoutingKey等重要知識點(diǎn)磺芭,這篇博文我們來重點(diǎn)了解下RabbitMQ是如何發(fā)送消息有梆,消費(fèi)消息的是尖,本片博文我們還是以代碼為主,簡要說明為輔泥耀,同時(shí)對消息的發(fā)送和接收功能進(jìn)行簡單的封裝饺汹,可以作為一個(gè)jar包給第三方進(jìn)行使用。
本博文是在上一篇博文《SpringBoot整合RabbitMQ——交換機(jī)和隊(duì)列的管理和綁定》的基礎(chǔ)上進(jìn)行重構(gòu)和新增消息的發(fā)送和接收的功能
如果我們要提供一個(gè)類似jar包的可以讓第三方來作為依賴引入痰催,從而在代碼中簡單集成我們提供的rabbitMQ的通用的功能首繁,我們的項(xiàng)目需要滿足以下的需求:
- 消息發(fā)送方需要知道消息是否真正的到達(dá)了RabbitMQ,如果發(fā)送不到rabbitMQ,如何保證發(fā)送方能夠處理發(fā)送失敗的業(yè)務(wù)數(shù)據(jù)
- 如何保證消息可靠的達(dá)到消費(fèi)者陨囊,并且消費(fèi)者能否成功消費(fèi)消息弦疮,并且告知發(fā)送者消費(fèi)者的具體消費(fèi)情況(改業(yè)務(wù)需求主要是滿足消息發(fā)送方需要知道消息消費(fèi)方是否消費(fèi)成功),對于是否通知消息發(fā)送方具體的消費(fèi)情況需要做到用戶自定義
- 為了確保消息的正確和安全蜘醋,對于消息要做到持久化胁塞,防止出現(xiàn)異常導(dǎo)致消息丟失
- 保證客戶端對于消息能自主的進(jìn)行消息的確認(rèn),并且對了消費(fèi)失敗的數(shù)據(jù)能在業(yè)務(wù)方保存日志压语、
- 客戶端引入我們的依賴啸罢,可以很方便的在程序中動態(tài)實(shí)現(xiàn)隊(duì)列的注冊和交換機(jī)的綁定,并且很方便的實(shí)現(xiàn)消息的發(fā)送以及發(fā)送失敗的處理
- 客戶端能很方便的繼承消息的消費(fèi)胎食,動態(tài)的指定處理器自定義處理消息扰才,并且提供消息消費(fèi)失敗(包括業(yè)務(wù)流程的失敗和系統(tǒng)代碼錯(cuò)誤引起的失敳蘖)后消息回傳的功能衩匣,回傳功能做到業(yè)務(wù)可控
- 在客戶端消費(fèi)消息存在日志記錄,并且能與業(yè)務(wù)邏輯解耦粥航,實(shí)現(xiàn)業(yè)務(wù)與日志分離
- 能針對客戶端發(fā)出的各類不同類型的消息進(jìn)行處理琅捏,并且是實(shí)現(xiàn)各種不同業(yè)務(wù)的處理擴(kuò)展
消息的發(fā)送
在進(jìn)行消息的發(fā)送之前,我們需要了解下參數(shù)mandatory
:
- 當(dāng)其值為true時(shí)递雀,交換器無法根據(jù)自身的類型和路由鍵匹配到符合條件的隊(duì)列柄延,這是rabbitMQ就會通過回調(diào)函數(shù)將消息返回給生產(chǎn)者。
- 當(dāng)其值為false時(shí)缀程,如果出現(xiàn)上述情形搜吧,則消息會丟失
如果需要處理發(fā)送rabbitMQ失敗的話市俊,在SpringBoot中我們需要在配置文件中配置如下:
spring:
rabbitmq:
template:
mandatory: true
publisher-confirms: true
publisher-returns: true
對應(yīng)的代碼如下:
// RabbitMQConfig類中添加屬性
/**
* 消息發(fā)送失敗,是否回調(diào)給發(fā)送者
*/
@Value("${spring.rabbitmq.template.mandatory:false}")
private Boolean mandatory;
/**
* 是否確認(rèn)
*/
@Value("${spring.rabbitmq.publisher-confirms:false}")
private Boolean publisherConfirms;
/**
* 如果mandatorys設(shè)置成true滤奈,該值也設(shè)置 成true
*/
@Value("${spring.rabbitmq.publisher-returns:false}")
private Boolean publisherReturns;
// RabbitMQConfig中定義connectionFactory中設(shè)置屬性
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(this.addresses);
cachingConnectionFactory.setUsername(this.username);
cachingConnectionFactory.setPassword(this.password);
cachingConnectionFactory.setVirtualHost(this.virtualHost);
// 如果消息要設(shè)置成回調(diào)秕衙,則以下的配置必須要設(shè)置成true
cachingConnectionFactory.setPublisherConfirms(this.publisherConfirms);
cachingConnectionFactory.setPublisherReturns(this.publisherReturns);
return cachingConnectionFactory;
}
// 同時(shí)為了調(diào)用SpringBoot集成rabbitMQ提供的發(fā)送的方法,我們需要注入rabbitTemplate
/**
* 因?yàn)橐O(shè)置回調(diào)類僵刮,所以應(yīng)是prototype類型据忘,如果是singleton類型,則回調(diào)類為最后一次設(shè)置
* 主要是為了設(shè)置回調(diào)類
*
* @return
*/
@Bean(name = "rabbitTemplate")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
以上我們就完成了配置類的修改搞糕,下面我們來對發(fā)送消息的方法進(jìn)行封裝勇吊,并且支持用戶自定義相關(guān)屬性
在發(fā)送消息之前,我們需要先創(chuàng)建隊(duì)列窍仰,并且將交換機(jī)(這里采用默認(rèn)的交換機(jī)mq.direct)和隊(duì)列進(jìn)行綁定汉规,路由鍵就設(shè)置成隊(duì)列名,方法中提供自定義的綁定方法驹吮,如有需要可以自行進(jìn)行封裝使用
/**
* Copyright ? 2018 五月工作室. All rights reserved.
*
* @Package com.amos.common.send
* @ClassName SendService
* @Description 發(fā)送消息的抽象類针史,子類可以實(shí)現(xiàn)該類來處理對應(yīng)的業(yè)務(wù)邏輯
* <p/>
* 抽象類實(shí)現(xiàn)了ConfirmCallback和ReturnCallback接口,
* confirmCallback來實(shí)現(xiàn)業(yè)務(wù)日志記錄碟狞,并且自定義處理各自的業(yè)務(wù)處理邏輯
* returnCallback來實(shí)現(xiàn)消息發(fā)送失敗時(shí)的業(yè)務(wù)處理啄枕,并且自定義各自的業(yè)務(wù)處理邏輯
* @Author Amos
* @Modifier
* @Date 2019/7/1 15:11
* @Version 1.0
**/
public abstract class AbstractSendService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
public final Log logger = LogFactory.getLog(this.getClass());
public final static String DEFAULT_EXCHANGE = "amq.direct";
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 簡單的發(fā)送消息
* 發(fā)送的交換機(jī)是默認(rèn)的 amq.direct交換機(jī),該交換機(jī)的類型是DIRECT類型族沃,開啟持久化機(jī)制
* 發(fā)送的隊(duì)列即為RoutingKey,需要綁定隊(duì)列時(shí)
*
* @param queue 隊(duì)列频祝,默認(rèn)是跟路由鍵是相同的
* @param content 發(fā)送的內(nèi)容
*/
public void send(String queue, String content) {
if (StringUtils.isEmpty(queue)) {
RabbitMQExceptionUtils.throwRabbitMQException("發(fā)送的隊(duì)列不能為空");
}
if (StringUtils.isEmpty(content)) {
RabbitMQExceptionUtils.throwRabbitMQException("內(nèi)容不能為空");
}
this.send(MqExchange.DEFAULT_DIRECT_EXCHANGE, queue, content, null, UUIDUtils.generateUuid());
}
/**
* 發(fā)送一條有過期時(shí)間的消息
*
* @param queue 隊(duì)列,默認(rèn)是跟路由鍵相同的
* @param content 發(fā)送的內(nèi)容
* @param expireTime 過期時(shí)間 時(shí)間毫秒
*/
public void send(String queue, String content, int expireTime) {
if (StringUtils.isEmpty(queue)) {
RabbitMQExceptionUtils.throwRabbitMQException("發(fā)送的隊(duì)列不能為空");
}
if (StringUtils.isEmpty(content)) {
RabbitMQExceptionUtils.throwRabbitMQException("內(nèi)容不能為空");
}
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 設(shè)置消息的過期時(shí)間
message.getMessageProperties().setExpiration(expireTime + "");
return message;
}
};
this.send(MqExchange.DEFAULT_DIRECT_EXCHANGE, queue, content, messagePostProcessor, UUIDUtils.generateUuid());
}
/**
* 按照給定的交換機(jī)脆淹、路由鍵常空、發(fā)送內(nèi)容、發(fā)送的自定義屬性來發(fā)送消息
* TODO 待完善交互方式
*
* @param exchange 交換機(jī)名稱
* @param routingKey 路由鍵
* @param object 發(fā)送的內(nèi)容
* @param messagePostProcessor 發(fā)送消息自定義處理
* @param messageId 消息ID
*/
public void send(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor, String messageId) {
if (StringUtils.isEmpty(exchange)) {
RabbitMQExceptionUtils.throwRabbitMQException("交換機(jī)不能為空");
}
if (StringUtils.isEmpty(routingKey)) {
RabbitMQExceptionUtils.throwRabbitMQException("路由鍵不能為空");
}
if (StringUtils.isEmpty(object)) {
RabbitMQExceptionUtils.throwRabbitMQException("發(fā)送的內(nèi)容不能為空");
}
CorrelationData correlationData = new CorrelationData();
correlationData.setId(StringUtils.isEmpty(messageId) ? UUIDUtils.generateUuid() : messageId);
MqMessage mqMessage = new MqMessage();
mqMessage.setMessageBody(object);
mqMessage.setMessageId(correlationData.getId());
mqMessage.setExchangeName(exchange);
mqMessage.setQueueName(routingKey);
mqMessage.setRoutingKey(routingKey);
if (StringUtils.isEmpty(messagePostProcessor)) {
this.rabbitTemplate.convertAndSend(exchange, routingKey, mqMessage, correlationData);
} else {
// 發(fā)送對應(yīng)的消息
this.rabbitTemplate.convertAndSend(exchange, routingKey, mqMessage, messagePostProcessor, correlationData);
}
}
/**
* 默認(rèn)實(shí)現(xiàn)發(fā)送確認(rèn)的處理方法
* 子類需要重寫該方法盖溺,實(shí)現(xiàn)自己的業(yè)務(wù)處理邏輯
*
* @param messageId 消息
* @param ack
* @param cause
*/
public abstract void handleConfirmCallback(String messageId, boolean ack, String cause);
/**
* 默認(rèn)實(shí)現(xiàn)發(fā)送匹配不上隊(duì)列時(shí) 回調(diào)函數(shù)的處理
*
* @param message
* @param replyCode
* @param replyText
* @param routingKey
*/
public abstract void handleReturnCallback(Message message, int replyCode, String replyText,
String routingKey);
/**
* 交換機(jī)如果根據(jù)自身的類型和路由鍵匹配上對應(yīng)的隊(duì)列時(shí)漓糙,是否調(diào)用returnCallback回調(diào)函數(shù)
* true: 調(diào)用returnCallback回調(diào)函數(shù)
* false: 不調(diào)用returnCallback回調(diào)函數(shù) 這樣在匹配不上對應(yīng)的隊(duì)列時(shí),會導(dǎo)致消息丟失
*/
@Value("${spring.message.mandatory:false} ")
private Boolean mandatory;
/**
* 默認(rèn)隊(duì)列的優(yōu)先級
*/
public static final int MESSAGE_PRIORITY = 1;
@PostConstruct
public final void init() {
this.logger.info("sendservice 初始化...... ");
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
}
/**
* 確認(rèn)后回調(diào)方法
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public final void confirm(CorrelationData correlationData, boolean ack, String cause) {
this.logger.info("confirm-----correlationData:" + correlationData.toString() + "---ack:" + ack + "----cause:" + cause);
// TODO 記錄日志(數(shù)據(jù)庫或者es)
this.handleConfirmCallback(correlationData.getId(), ack, cause);
}
/**
* 失敗后回調(diào)方法
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public final void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
this.logger.info("return-----message:" + message.toString() + "---replyCode:" + replyCode + "----replyText:" + replyText + "----exchange:" + exchange + "----routingKey:" + routingKey);
// TODO 記錄日志(數(shù)據(jù)庫或者es)
this.handleReturnCallback(message, replyCode, replyText, routingKey);
}
}
消息的接收
消息發(fā)送RabbitMQ之后烘嘱,我們需要定義監(jiān)聽來監(jiān)控隊(duì)列昆禽, 并且消費(fèi)隊(duì)列上的消息,本類方法中對消息消費(fèi)進(jìn)行了封裝拙友,添加了消費(fèi)信息日志和狀態(tài)的記錄为狸,并且支持用戶自定義消費(fèi)方法。消費(fèi)完成之后遗契,可以自定義設(shè)置是否返回給消息發(fā)送者消息消費(fèi)的具體情況,并且針對不同類型的消息 病曾,封裝了命令模式來處理不同類型的消息牍蜂,方便用戶后期對消息的處理的擴(kuò)展漾根,具體的代碼可以參考Gitee上項(xiàng)目
rabbitmq
主要代碼邏輯如下:
- 注冊隊(duì)列,并且使用隊(duì)列名來與交換機(jī)進(jìn)行綁定
- 為該隊(duì)列添加好消息接收處理
- 封裝消息接收的處理邏輯鲫竞,并且對于消息接收失敗的進(jìn)行重發(fā)
下面是重要代碼說明:
注冊隊(duì)列辐怕,并且為該隊(duì)列設(shè)置消息監(jiān)聽
/**
* Copyright ? 2018 五月工作室. All rights reserved.
*
* @Project: rabbitmq
* @ClassName: RegisterQueue
* @Package: com.amos.common.register
* @author: zhuqb
* @Description: 注冊隊(duì)列并且設(shè)置監(jiān)聽
* @date: 2019/7/2 0002 下午 15:32
* @Version: V1.0
*/
@Data
public abstract class AbstractRegisterQueue {
public final Log logger = LogFactory.getLog(this.getClass());
@Autowired
AmBindDeclare amBindDeclare;
@Autowired
AmQueueDeclare amQueueDeclare;
@Autowired
MessageListen messageListen;
@Value("${spring.rabbitmq.queue.isAck:false}")
private Boolean isAck;
/**
* 子類提供自定義的消息監(jiān)聽
*
* @return
*/
public abstract AbstractMessageHandler messageHandler();
/**
* 實(shí)例化隊(duì)列名
*
* @param queue
* @return
*/
public AbstractRegisterQueue queue(String queue) {
this.queue = queue;
return this;
}
/**
* 實(shí)例化交換機(jī)
*
* @param exchange
* @return
*/
public AbstractRegisterQueue exchange(String exchange) {
this.exchange = exchange;
return this;
}
/**
* 實(shí)例化路由鍵
*
* @param routingKey
* @return
*/
public AbstractRegisterQueue routingKey(String routingKey) {
this.routingKey = routingKey;
return this;
}
/**
* 實(shí)例化結(jié)構(gòu)化屬性
*
* @param properties
* @return
*/
public AbstractRegisterQueue properties(Map<String, Object> properties) {
this.properties = properties;
return this;
}
/**
* 隊(duì)列名
*/
private String queue;
/**
* 交換機(jī) 默認(rèn)是 amq.direct 交換機(jī)
*/
private String exchange = MqExchange.DEFAULT_DIRECT_EXCHANGE;
/**
* 路由鍵 默認(rèn)是隊(duì)列名
*/
private String routingKey = this.getQueue();
/**
* 結(jié)構(gòu)化屬性
*/
private Map<String, Object> properties;
public String getRoutingKey() {
if (StringUtils.isEmpty(this.routingKey)) {
return this.getQueue();
}
return this.routingKey;
}
/**
* 注冊隊(duì)列,并且監(jiān)聽隊(duì)列
*
* @return
*/
public boolean registerQueue() {
MqQueue mqQueue = new MqQueue().name(this.queue);
this.amQueueDeclare.declareQueue(mqQueue);
boolean tag = this.amBindDeclare.bind(this.queue, Binding.DestinationType.QUEUE, this.exchange, this.getRoutingKey(), this.properties);
if (tag) {
try {
this.messageListen.addMessageLister(this.queue, this.messageHandler(), this.isAck);
return Boolean.TRUE;
} catch (Exception e) {
if (this.logger.isDebugEnabled()) {
e.printStackTrace();
}
return Boolean.FALSE;
}
}
return tag;
}
}
上面類主要是用來注冊隊(duì)列从绘,并且注冊成功之后為其新增消息監(jiān)聽類
我們再來新增消息監(jiān)聽以及消息接收處理的代碼:
/**
* Copyright ? 2018 五月工作室. All rights reserved.
*
* @Package com.amos.common.listen
* @ClassName AbstractMessageHandle
* @Description 隊(duì)列設(shè)置監(jiān)聽基類
* @Author Amos
* @Modifier
* @Date 2019/7/1 20:21
* @Version 1.0
**/
@Component
public class MessageListen {
public final Log logger = LogFactory.getLog(this.getClass());
@Autowired
private ConnectionFactory connectionFactory;
/**
* 在容器中加入消息監(jiān)聽
*
* @param queue
* @param messageHandler
* @param isAck
* @throws Exception
*/
public void addMessageLister(String queue, AbstractMessageHandler messageHandler, boolean isAck) throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(this.connectionFactory);
container.setQueueNames(queue);
AcknowledgeMode ack = AcknowledgeMode.NONE;
if (isAck) {
ack = AcknowledgeMode.MANUAL;
}
messageHandler.setAck(queue, ack);
container.setAcknowledgeMode(ack);
MessageListenerAdapter adapter = new MessageListenerAdapter(messageHandler);
container.setMessageListener(adapter);
container.start();
this.logger.info("------ 已成功監(jiān)聽異步消息觸發(fā)通知隊(duì)列:" + queue + " ------");
}
}
指明隊(duì)列的監(jiān)聽類寄疏,并且維護(hù)是否手動ack消息
/**
* Copyright ? 2018 五月工作室. All rights reserved.
*
* @Package com.amos.common.listen
* @ClassName AbstractMessageHandler
* @Description 消息接收處理類
* <p/>
* 實(shí)現(xiàn) ChannelAwareMessageListener接口 重寫onMessage方法來實(shí)現(xiàn)業(yè)務(wù)的處理
* @Author Amos
* @Modifier
* @Date 2019/7/1 20:09
* @Version 1.0
**/
@Component
public abstract class AbstractMessageHandler implements ChannelAwareMessageListener {
public final Log logger = LogFactory.getLog(this.getClass());
@Value("${spring.message.queue.retryTimes:5}")
private Integer retryTimes;
/**
* 用戶自定義消息處理
*
* @param message 消息
*/
public abstract void handleMessage(String message, Channel channel);
private ConcurrentHashMap<String, AcknowledgeMode> ackMap = new ConcurrentHashMap<>(8);
/**
* 消息處理
*
* @param message 消息體
* @param channel channel通道
* @throws Exception
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
this.logger.info("接收到發(fā)送的消息.......");
// 業(yè)務(wù)處理是否成功
boolean handleResult = false;
// 消息處理標(biāo)識
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 獲取消費(fèi)的隊(duì)列名
String queue = message.getMessageProperties().getConsumerQueue();
MqMessage mqMessage = null;
// TODO 進(jìn)行自己的業(yè)務(wù)處理 比如記錄日志
try {
String msg = new String(message.getBody());
mqMessage = JSONObject.parseObject(msg, MqMessage.class);
// 自定義業(yè)務(wù)處理
this.handleMessage(JSONObject.toJSONString(mqMessage.getMessageBody()), channel);
} catch (Exception e) {
if (this.logger.isDebugEnabled()) {
e.printStackTrace();
}
}
// TODO 如果消息處理失敗,處理失敗的采取措施僵井, 確保消息不丟失
this.onMessageCompleted(mqMessage, queue, channel, deliveryTag, handleResult);
}
/**
* 消息處理結(jié)束后進(jìn)行復(fù)處理
*
* @param mqMessage 消息實(shí)體
* @param queue
* @param channel
* @param deliveryTag
* @param handleResult 業(yè)務(wù)處理是否成功
*/
private void onMessageCompleted(MqMessage mqMessage, String queue, Channel channel, long deliveryTag, boolean handleResult) {
this.logger.info("消息:" + mqMessage.toString() + "處理完成陕截,等待事務(wù)提交和狀態(tài)更新");
if (!handleResult) {
// TODO 業(yè)務(wù)處理失敗,需要更新狀態(tài)
return;
}
AcknowledgeMode ack = this.ackMap.get(queue);
if (ack.isManual()) {
//重試5次
int retryTimes = 5;
//進(jìn)行消息
RetryTemplate oRetryTemplate = new RetryTemplate();
SimpleRetryPolicy oRetryPolicy = new SimpleRetryPolicy();
oRetryPolicy.setMaxAttempts(retryTimes);
oRetryTemplate.setRetryPolicy(oRetryPolicy);
try {
// obj為doWithRetry的返回結(jié)果批什,可以為任意類型
Integer result = oRetryTemplate.execute(new RetryCallback<Integer, Exception>() {
int count = 0;
@Override
public Integer doWithRetry(RetryContext context) throws Exception {//開始重試
channel.basicAck(deliveryTag, false);
AbstractMessageHandler.this.logger.info("消息" + mqMessage.toString() + "已簽收");
return ++this.count;
}
}, new RecoveryCallback<Integer>() {
@Override
public Integer recover(RetryContext context) throws Exception { //重試多次后都失敗了
AbstractMessageHandler.this.logger.info("消息" + mqMessage.toString() + "簽收失敗");
return Integer.MAX_VALUE;
}
});
if (result.intValue() <= retryTimes) {
//消息簽收成功 更改狀態(tài)
} else {
//MQ服務(wù)器或網(wǎng)絡(luò)出現(xiàn)問題农曲,簽收失敗 更改狀態(tài)
}
} catch (Exception e) {
this.logger.error("消息" + mqMessage.toString() + "簽收出現(xiàn)異常:" + e.getMessage());
}
} else {
this.logger.info("消息自動簽收");
}
}
/**
* @param ack
* @Title: setAck
* @date: 2018年9月14日 上午11:17:41
* @Description: 注入消息簽收模式
*/
public final void setAck(String queue, AcknowledgeMode ack) {
this.ackMap.put(queue, ack);
this.logger.info("注入隊(duì)列 " + queue + " 消息簽收模式: " + ack.name());
}
}
上面代碼主要是封裝了消息接收處理的代碼邏輯
- 定義抽象類方便讓子類來繼承實(shí)現(xiàn)基類中的方法,其次類實(shí)現(xiàn)了
ChannelAwareMessageListener
接口驻债,實(shí)現(xiàn)onMessage
方法的重寫 - 該方法中除了進(jìn)行自身業(yè)務(wù)的處理乳规,同時(shí)也調(diào)用業(yè)務(wù)的自定義消息處理邏輯
- 對于消息接收處理失敗后,進(jìn)行消息重發(fā)合呐,并且可以業(yè)務(wù)進(jìn)行日志記錄
- 在對隊(duì)列進(jìn)行設(shè)置監(jiān)聽時(shí)暮的,指定是否手動ack消息
- 業(yè)務(wù)新增類繼承
AbstractMessageHandler
基類,并且將該監(jiān)聽類與隊(duì)列動態(tài)綁定即可
以上就是本博文對消息的發(fā)送和接收處理進(jìn)行的簡單的封裝淌实,其中核心的業(yè)務(wù)都已經(jīng)實(shí)現(xiàn)青扔,待后期與elasticsearch集合完善日志記錄相關(guān)的功能
附: 不同類型消息的處理
之所以不跟上面的代碼整合在一起,主要是因?yàn)楸緍abbitMQ的項(xiàng)目主要是為了對rabbitMQ的常用業(yè)務(wù)進(jìn)行封裝翩伪,消息的處理大多數(shù)是業(yè)務(wù)方面的工作微猖,如果整合在一起的會造成代碼的耦合,不利于rabbitMQ功能代碼的剝離缘屹。
不同消息類型的處理的業(yè)務(wù)流程如下:
- 聲明一個(gè)接口定義消息處理的通用方法
/**
* Copyright ? 2018 五月工作室. All rights reserved.
*
* @Package com.amos.common.send
* @ClassName SendService
* @Description 對消息進(jìn)行處理
* @Author Amos
* @Modifier
* @Date 2019/7/1 15:11
* @Version 1.0
**/
public interface Receiver {
/**
* 對消息進(jìn)行處理
*
* @param messageData
* @return
*/
HandleResult handleMessage(MessageData messageData);
}
- 定義消息接收基類凛剥,該類實(shí)現(xiàn)
Receiver
接口,積累中定義不同類型處理的自定義方法以及處理成功和處理失敗的業(yè)務(wù)邏輯
/**
* Copyright ? 2018 五月工作室. All rights reserved.
*
* @Package com.amos.common.send
* @ClassName SendService
* @Description 定義通用消息接收處理基類
* @Author Amos
* @Modifier
* @Date 2019/7/1 15:11
* @Version 1.0
**/
public abstract class AbstractReceiver implements Receiver {
private static final Logger logger = LoggerFactory.getLogger(AbstractReceiver.class);
/**
* 用戶自定義消息處理
*
* @param messageData
* @return
*/
public abstract HandleResult exec(MessageData messageData) throws Exception;
/**
* 用戶自定義驗(yàn)證
*
* @param messageData
* @return
*/
public abstract Result validate(MessageData messageData);
/**
* 成功處理
*
* @param messageData
* @return
*/
public abstract HandleResult handleSuccess(MessageData messageData);
/**
* 失敗處理
*
* @param messageData
* @return
*/
public abstract HandleResult handleFail(MessageData messageData);
/**
* 處理
*
* @param messageData
* @return
*/
@Override
public final HandleResult handleMessage(MessageData messageData) {
logger.info(this.getClass().getSimpleName() + "-->handleMessage()參數(shù) unicomData:{}", messageData.toString());
HandleResult handleResult = null;
try {
// 如果自定義驗(yàn)證不通過
Result result = this.validate(messageData);
if (!ResultEnum.success().equals(result.getCode())) {
// 如果驗(yàn)證失敗 進(jìn)行失敗處理
return this.handleFail(messageData);
}
// 根據(jù)自行處理的返回結(jié)果
handleResult = this.exec(messageData);
// 執(zhí)行成功處理的邏輯
handleResult = this.handleSuccess(messageData);
} catch (Exception e) {
e.printStackTrace();
messageData.setContent(e.getMessage());
return this.handleFail(messageData);
}
return handleResult;
}
}
- 針對不同的消息類型實(shí)體進(jìn)行處理 ,這里可以采用命令模式來封裝代碼轻姿,首先定義命令的基類犁珠,基類中定義好處理的方法,子類實(shí)現(xiàn)該基類互亮,并且設(shè)置對應(yīng)的命令來處理其業(yè)務(wù)邏輯犁享,該種設(shè)計(jì)模式將命令與處理者之間進(jìn)行了松耦合,可以很方便的維護(hù)命令與處理者之間關(guān)系
//命令的基類
/**
* Copyright ? 2018 五月工作室. All rights reserved.
*
* @Package com.amos.common.send
* @ClassName SendService
* @Description 命令抽象類
* <p/>
* 把接收消息的類型封裝成一個(gè)命令 并且交給指定的接收者出處理
* 方便擴(kuò)展每個(gè)命令的處理
* @Author Amos
* @Modifier
* @Date 2019/7/1 15:11
* @Version 1.0
**/
public abstract class AbstractCommand {
/**
* 每個(gè)命令都必須被處理
*
* @param messageData
* @return
*/
public abstract HandleResult execute(MessageData messageData);
}
// 回調(diào)命令的處理豹休,在回調(diào)命令中設(shè)置了抽象處理者炊昆,處理者交由子類去具現(xiàn)
/**
* Copyright ? 2018 五月工作室. All rights reserved.
*
* @Package com.amos.common.send
* @ClassName SendService
* @Description 回調(diào)函數(shù) 消息處理
* @Author Amos
* @Modifier
* @Date 2019/7/1 15:11
* @Version 1.0
**/
@Component
public class CallBackCommand extends AbstractCommand {
/**
* 定義handler來進(jìn)行命令處理
*/
private AbstractHandler handler;
public CallBackCommand(AbstractHandler handler) {
this.handler = handler;
}
public CallBackCommand init(AbstractHandler handler) {
this.handler = handler;
return this;
}
/**
* 執(zhí)行業(yè)務(wù)處理
*
* @param unicomData
* @return
*/
@Override
public HandleResult execute(MessageData unicomData) {
return this.handler.handle(unicomData);
}
}
- 定義具體的處理者來處理不同的消息
// 定義處理者基類
/**
* Copyright ? 2018 五月工作室. All rights reserved.
*
* @Package com.amos.common.send
* @ClassName SendService
* @Description 處理消息 抽象類
* <p/>
* 業(yè)務(wù)處理需要繼承該基類,實(shí)現(xiàn)處理的方法
* @Author Amos
* @Modifier
* @Date 2019/7/1 15:11
* @Version 1.0
**/
public abstract class AbstractHandler {
/**
* 自定義處理
*
* @param data
* @return
*/
public abstract HandleResult handle(MessageData data);
}
// 子類實(shí)現(xiàn)處理者基類并且實(shí)現(xiàn)具體的處理方法
/**
* Copyright ? 2018 五月工作室. All rights reserved.
*
* @Package com.amos.common.send
* @ClassName SendService
* @Description 回調(diào)消息處理者
* @Author Amos
* @Modifier
* @Date 2019/7/1 15:11
* @Version 1.0
**/
@Component
public class CallBackHandler extends AbstractHandler {
/**
* 修改消息
*
* @param data
* @return
*/
@Override
public HandleResult handle(MessageData data) {
// TODO 自定義業(yè)務(wù)邏輯處理
return new HandleResult.CallBack(true).callback(false).msg("處理成功").builder();
}
}