概述
RocketMQ消息發(fā)送方式:同步(sync)、異步(async)枣购、單向(oneway)
消息Message簡介
org.apache.rocketmq.common.message.Message
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
private String topic;//消息主題
private int flag;//消息flag颓芭,RocketMQ不做處理
private Map<String, String> properties;//擴展屬性
private byte[] body;//消息體
private String transactionId;//事務消息事務id
其中擴展屬性包括以下四個:
- tag:消息tag,用于消息過濾
- keys:Message索引鍵,多個用空格隔開基跑,RocketMQ根據(jù)這些key快速檢索消息
- waitStoreMsgOK:消息發(fā)送時瞧挤,是否等待消息存儲完成后再返回锡宋。
- delayTimeLevel:消息延遲級別,用于延遲消息或消息重試
生產(chǎn)者啟動流程
org.apache.rocketmq.client.producer.DefaultMQProducer
核心屬性如下
屬性名 | 含義 |
---|---|
producerGroup | 生產(chǎn)組 |
createTopicKey | 默認topic |
defaultTopicQueueNums | 默認主題在每一個broker隊列的數(shù)量 |
sendMsgTimeout | 消息發(fā)送超時時間 3秒 |
compressMsgBodyOverHowmuch | 消息體超過該值則壓縮 |
retryTimesWhenSendFailed | 同步消息重試次數(shù)特恬,2次执俩。包含發(fā)送一次,一個消息最終為三次 |
retryTimesWhenSendAsyncFailed | 異步發(fā)送消息重試次數(shù)癌刽,默認2次 |
retryAnotherBrokerWhenNotStoreOK | 消息重試選擇另一個broker時役首,是否不等待存儲結果就返回尝丐,默認為false |
maxMessageSize | 4M |
消息生產(chǎn)者啟動流程
org.apache.rocketmq.client.producer.DefaultMQProducer#start
啟動流程如下圖:
1.jpg
消息發(fā)送流程
2.jpg
核心流程我們可以分為以下步驟查看:
DefaultMQProducerImpl#sendDefaultImpl
這個方法作為消息發(fā)送的核心方法,主要處理以下:
- tryToFindTopicPublishInfo 尋找消息的路由信息
- selectOneMessageQueue 選擇消息隊列需要發(fā)送的隊列
- sendKernelImpl 發(fā)送消息
- updateFaultItem 更新發(fā)送失敗的broker信息
- 發(fā)送消息出現(xiàn)異常時衡奥,處理各個異常信息
- 返回最終的結果
this.tryToFindTopicPublishInfo(msg.getTopic())
消息發(fā)送的核心方法爹袁,方法查找對應的消息路由信息,為消息發(fā)送做準備失息,該方法返回TopicPublishInfo。
TopicPublishInfo 屬性如下:
屬性 | |
---|---|
private boolean orderTopic = false; | 是否為順序消息 |
private List<MessageQueue> messageQueueList | 主題隊列的消息隊列 |
private volatile ThreadLocalIndex sendWhichQueue | 沒選擇一次消息隊列乏屯,該值會增加1 |
private TopicRouteData topicRouteData | topicRouteData |
TopicRouteData 屬性如下
屬性 | |
---|---|
private String orderTopicConf | 順序消息配置 |
private List<QueueData> queueDatas | topic隊列元數(shù)據(jù)根时,包含brokerName,readQueueNums辰晕,writeQueueNums蛤迎,perm,topicSynFlag |
List<BrokerData> brokerDatas | topic分布的broker元數(shù)據(jù)含友,包含 cluster替裆,brokerName,brokerAddrs |
HashMap<String/* brokerAddr /, List<String>/ Filter Server */> filterServerTable | broker上的過濾服務器地址列表 |
DefaultMQProducerImpl#tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//本地緩存獲取 TopicPublishInfo
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//如果沒有緩存窘问,或者緩存失效則從NameServer獲取
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
最終調用 MQClientInstance#updateTopicRouteInfoFromNameServer
- 從NameServer獲取TopicRouteData 數(shù)據(jù)
簡化后代碼
TopicRouteData topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
- 判斷TopicRouteData是否改變過辆童,改變過則更新
- 需要更新則更新對應的brokerAddrTable、topicRouteTable
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
獲取對應要發(fā)送的消息隊列惠赫。一個MessageQueue 包含topic把鉴、brokerName、queueId儿咱。
- 默認sendLatencyFaultEnable 為false庭砍,則直接調用tpInfo.selectOneMessageQueue(lastBrokerName)
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
獲取增量index,取余 然后返回對應的MessageQueue混埠。
如果上次已經(jīng)發(fā)送消息怠缸,存在lastBrokerName,則需要獲取非lastBrokerName的broker钳宪。
- 如果故障延遲機制啟用揭北,即sendLatencyFaultEnable 為true。
-1. 如果啟用故障延遲吏颖,則通過latencyFaultTolerance.isAvailable判斷對應的broker是否可用
-2. 如果可用則會維護latencyFaultTolerance搔体。
DefaultMQProducerImpl#sendKernelImpl
/**
*
* @param msg 待發(fā)送消息
* @param mq 消息將發(fā)送到該隊列
* @param communicationMode 消息發(fā)送模式 sync/async/oneway
* @param sendCallback 消息回調函數(shù)
* @param topicPublishInfo 主題路由信息
* @param timeout 消息超時時間
*/
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout)
- 獲取broker地址
- 消息設置全局id,消息體超過4k侦高,則壓縮并設置sysFlag 為MessageSysFlag.COMPRESSED_FLAG嫉柴。事務消息則設置sysFlag為MessageSysFlag.TRANSACTION_PREPARED_TYPE。
- 注冊消息hook函數(shù)
- 構建消息SendMessageRequestHeader
- MQClientAPIImpl#sendMessage
- NettyRemotingClient#invokeSync