發(fā)送消息的幾種模式:
- 同步發(fā)送:可靠性高,但延遲較高扇售。
- 異步發(fā)送:提高性能前塔,但需要額外處理回調(diào)。
- 單向發(fā)送:延遲最低承冰,但沒(méi)有消息確認(rèn)华弓。
- 順序發(fā)送:保證消息順序,但影響性能困乒。
- 廣播發(fā)送:每個(gè)消費(fèi)者都消費(fèi)消息寂屏,適用于廣播場(chǎng)景。
- 事務(wù)消息:支持分布式事務(wù)娜搂,適合保證消息一致性迁霎。
springboot集成rocketmq的語(yǔ)法:
同步發(fā)送:
rocketMQTemplate.syncSend(topic, message)
異步發(fā)送:
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("Send success: " + sendResult);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
單向發(fā)送:
rocketMQTemplate.sendOneWay(topic, message);
順序發(fā)送:
// 使用 MessageQueueSelector 來(lái)控制消息發(fā)送到某個(gè)隊(duì)列,從而保證順序性
rocketMQTemplate.send(topic, message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根據(jù)某個(gè)參數(shù)決定消息發(fā)送到哪個(gè)隊(duì)列
int index = Math.abs(msg.getKeys().hashCode()) % mqs.size();
return mqs.get(index);
}
});
廣播發(fā)送:
rocketMQTemplate.setMessageModel(MessageModel.BROADCASTING);
事務(wù)消息:
// 發(fā)送事務(wù)消息
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, message, null);
tag的使用的坑:
使用相同的topic和group百宇,使用不同的tag區(qū)分消息考廉,會(huì)出現(xiàn)指定消費(fèi)tagA的消息還會(huì)消費(fèi)tagB的消息,
- 輪詢消費(fèi)
是由于消息隊(duì)列的負(fù)載均衡機(jī)制 和 消費(fèi)者組的共享消費(fèi) 導(dǎo)致的携御。雖然消費(fèi)者配置了不同的 tag芝此,消息隊(duì)列的分配不受 tag 影響,導(dǎo)致多個(gè)消費(fèi)者可能從相同的消息隊(duì)列中獲取不同 tag 的消息因痛。 - 避免輪詢消費(fèi)
可以通過(guò)調(diào)整消費(fèi)者組配置婚苹、增加消息隊(duì)列數(shù)量、確保每個(gè)消費(fèi)者配置不同的 selectorExpression 來(lái)減少或避免輪詢消費(fèi)現(xiàn)象鸵膏。
參考: