為了維護(hù)消息的有效性我擂,當(dāng)消費(fèi)消息時(shí)候處理失敗時(shí)候衬以,不進(jìn)行消費(fèi),需要我們根據(jù)業(yè)務(wù)區(qū)返回ACK校摩,本項(xiàng)目我使用Redis和ack機(jī)制雙重保險(xiǎn),保障消息一定能夠正確的消費(fèi)
首先,接著上部分內(nèi)容看峻,使用Topic,機(jī)制(不明白的,可以回顧上部分內(nèi)容)
上部分內(nèi)容,我們使用SpringBoot注解,去實(shí)現(xiàn)衙吩,但是控制權(quán)不完全賬務(wù)互妓,當(dāng)進(jìn)行大規(guī)模項(xiàng)目時(shí)候,不太建議使用
@RabbitListener(queues = TopicRabbitConfig.USER_QUEUE)
@RabbitHandler
public void processUser(String message) {
threadPool.execute(new Runnable() {
@Override
public void run() {
logger.info("用戶側(cè)流水:{}",message);
}
});
}
根據(jù)源碼分析坤塞,當(dāng)然這里不分析源碼冯勉,有興趣的可以多失敗幾次就ok明白了
在配置類中定義監(jiān)聽(tīng)器,監(jiān)聽(tīng)這個(gè)序列(
AcknowledgeMode.MANUAL
是必須的哦)
/**
* 接受消息的監(jiān)聽(tīng)尺锚,這個(gè)監(jiān)聽(tīng)客戶交易流水的消息
* 針對(duì)消費(fèi)者配置
* @return
*/
@Bean
public SimpleMessageListenerContainer messageContainer1(ConnectionFactory connectionFactory, TransactionConsumeImpl transactionConsume) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queueMessage());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn)
container.setMessageListener(transactionConsume);
return container;
}
這個(gè) TransactionConsumeImpl
要繼承ChannelAwareMessageListener
珠闰,主要說(shuō)的手動(dòng)返回ACK就是channel。調(diào)用
@Component
public class TransactionConsumeImpl implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(TransactionConsumeImpl.class);
private static final Gson gson = new Gson();
@Autowired
JedisShardInfo jedisShardInfo;
@Autowired
ExecutorService threadPool;
@Autowired
BoluomeFlowService boluomeFlowService;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String boby = new String(message.getBody(), "utf-8");//轉(zhuǎn)換消息瘫辩,我們是使用json數(shù)據(jù)格式
threadPool.execute(new Runnable() { //多線程處理
@Override
public void run() {
Jedis jedis = jedisShardInfo.createResource();
jedis.sadd(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//添加到key為當(dāng)前消息類型的集合里面伏嗜,防止丟失消息
BoluomeFlow flow = gson.fromJson(boby, BoluomeFlow.class);
String json = gson.toJson(flow);
if (boluomeFlowService.insert(flow)) { //當(dāng)添加成功時(shí)候返回成功
logger.info("客戶交易流水添加1條記錄:{}", json);
jedis.srem(TopicRabbitConfig.TRANSACTION_QUEUE, boby);//從當(dāng)前消息類型集合中移除已經(jīng)消費(fèi)過(guò)的消息
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工返回ACK坛悉,通知此消息已經(jīng)爭(zhēng)取消費(fèi)
} catch (IOException ie) {
logger.error("消費(fèi)成功回調(diào)成功,io操作異常");
}
} else {
logger.info("客戶交易流水添加失敗記錄:{}", json);
}
}
});
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 消息的標(biāo)識(shí)承绸,false只確認(rèn)當(dāng)前一個(gè)消息收到裸影,true確認(rèn)所有consumer獲得的消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack返回false,并重新回到隊(duì)列军熏,api里面解釋得很清楚
- channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒絕消息
- true 發(fā)送給下一個(gè)消費(fèi)者
- false 誰(shuí)都不接受轩猩,從隊(duì)列中刪除