發(fā)送方式
producer發(fā)送消息支持3種方式惭笑,同步、異步和Oneway瞪浸。
- 同步發(fā)送:客戶(hù)端提交消息到broker后會(huì)等待返回結(jié)果儒将,相對(duì)來(lái)說(shuō)是最常用的方式。
- 異步發(fā)送:調(diào)用發(fā)送接口時(shí)會(huì)注冊(cè)一個(gè)callback類(lèi)对蒲,發(fā)送線程繼續(xù)其它業(yè)務(wù)邏輯钩蚊,producer在收到broker結(jié)果后回調(diào)。比較適合不想發(fā)送結(jié)果影響正常業(yè)務(wù)邏輯的情況蹈矮。
- Oneway:Producer提交消息后砰逻,無(wú)論broker是否正常接收消息都不關(guān)心。適合于追求高吞吐泛鸟、能容忍消息丟失的場(chǎng)景蝠咆,比如日志收集。
發(fā)送實(shí)例
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
發(fā)送消息首先需要初始化一個(gè)DefaultMQProducer
,設(shè)置group name和nameserv的地址刚操。Producer啟動(dòng)后就可以往指定的topic發(fā)送消息闸翅。
MQProducer初始化
Producer的調(diào)用關(guān)系是
MQProducer -> DefaultMQProducer ->DefaultMQProducerImpl
DefaultMQProducer是一個(gè)Facade類(lèi),封裝了DefaultMQProducerImpl
內(nèi)部實(shí)現(xiàn)菊霜。我們來(lái)看下Producer的啟動(dòng)過(guò)程坚冀,DefaultMQProducerImpl.start()
。
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
//參數(shù)檢查鉴逞,不能使用系統(tǒng)默認(rèn)的GroupName
this.checkConfig();
//設(shè)置clientInstanceName记某,使用進(jìn)程ID (PID)
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 初始化MQClientInstance,一個(gè)進(jìn)程只會(huì)存在一個(gè)MQClientInstance, 設(shè)置clientId (IP@PID)
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//將當(dāng)前Producer注冊(cè)進(jìn)MQClientInsance构捡,保證一個(gè)producerName值對(duì)應(yīng)一個(gè)Producer
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
//啟動(dòng)MQClientInstance
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
//設(shè)置狀態(tài)為RUNNING
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//向所有broker發(fā)送一次心跳
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
從上面的代碼可以看出液南,start的過(guò)程主要就是初始化和啟動(dòng)一個(gè)MQClientInstance
,將producer注冊(cè)到instance中叭喜。我們來(lái)看下MQClientInstance的啟動(dòng)過(guò)程贺拣。
MQClientInstance啟動(dòng)過(guò)程
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 1、如果NameservAddr為空捂蕴,嘗試從http server獲取nameserv的地址
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
// 2譬涡、啟動(dòng)MQClientAPIImpl,初始化NettyClient
this.mQClientAPIImpl.start();
// 3啥辨、開(kāi)啟Client的定時(shí)任務(wù)
this.startScheduledTask();
// 4涡匀、Start pull service,開(kāi)始處理PullRequest
this.pullMessageService.start();
// 5、Start rebalance service
this.rebalanceService.start();
// Start push service
//6溉知、啟動(dòng)Client內(nèi)置的producer
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
1陨瘩、如果producer在初始化的時(shí)候沒(méi)有設(shè)置nameserv的地址,則會(huì)嘗試從一個(gè)http server獲取nameserv级乍。這個(gè)httpserver是可以配置的舌劳,這種方式非常適合于有統(tǒng)一配置中心的系統(tǒng)
3、這里開(kāi)啟的定時(shí)任務(wù)有以下幾個(gè):
1)獲取nameserv地址玫荣,就是重復(fù)的做第1步甚淡,這樣就可以動(dòng)態(tài)切換nameserv的地址
2)從nameserv更新topicRouteInfo,對(duì)于producer來(lái)說(shuō)topic的路由信息是最重要的
3)將緩存的broker信息和最新的topicRouteInfo做對(duì)比捅厂,清除已經(jīng)下線的broker
4)向broker發(fā)送心跳
4 ~ 6贯卦,producer和consumer公用一個(gè)MQClientInstance的實(shí)現(xiàn)。這幾步初始化是給consumer用的焙贷,后面講consumer的時(shí)候再講撵割。
Producer啟動(dòng)完成以后,就可以發(fā)送消息了辙芍,下面我們來(lái)看下一條普通的message的發(fā)送過(guò)程
消息發(fā)送
Producer默認(rèn)采用SYNC方式提交消息啡彬,消息提交給broker收到response后返回。方法是DefaultMQProducerImpl.send( Message msg)
/**
* DEFAULT SYNC -------------------------------------------------------
*/
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//1、參數(shù)檢查庶灿,消息不能發(fā)給系統(tǒng)預(yù)留的topic注簿,消息體是否超過(guò)最大長(zhǎng)度
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
//2、根據(jù)消息的topic跳仿,獲取該topic的路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
....
//3、發(fā)送重試次數(shù)
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
//用來(lái)緩存發(fā)送和重試中已經(jīng)用過(guò)的broker
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//4捐晶、從所有topic可用queue中選擇一個(gè)queue
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {//獲取Queue成功
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
...
//5菲语、提交消息到mq
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
//6、成功惑灵,更新本次調(diào)用時(shí)間到MQFaultStrategy中
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
//異步和ONEWAY調(diào)用后就直接返回了
case ASYNC:
return null;
case ONEWAY:
return null;
//7山上、如果broker存儲(chǔ)失敗,判斷是否要重試
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
// 8英支、調(diào)用接口異常佩憾,更新?tīng)顟B(tài)到MQFaultStrategy中
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
...
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
...
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
...
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
...
throw e;
}
} else {
break;
}
}
//9、成功則返回結(jié)果
if (sendResult != null) {
return sendResult;
}
...
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
//超過(guò)重試次數(shù)后干花,根據(jù)不同的錯(cuò)誤設(shè)置拋出異常類(lèi)型
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
...
}
從上面的發(fā)送邏輯可以看出妄帘,無(wú)論哪種發(fā)送方式,最終都是調(diào)用的sendDefaultImpl
來(lái)提交消息池凄。
第2步:獲取topic的所有路由信息抡驼,詳細(xì)邏輯后面講
第3步:SYNC發(fā)送可以設(shè)置失敗重試次數(shù)
第4步:因?yàn)槊總€(gè)topic會(huì)在集群的多個(gè)broker上存在多個(gè)queue,所以這里會(huì)選擇一個(gè)合適的queue肿仑,也就是在producer端實(shí)現(xiàn)負(fù)載均衡的功能致盟,詳細(xì)邏輯后面講
第6和8步:無(wú)論提交消息成功或者失敗,都會(huì)更新結(jié)果到MQFaultStrategy
中尤慰,也就是第4中選取queue時(shí)采用的策略
第7步:對(duì)于消息提交成功馏锡,不止有SUCCESS一種狀態(tài),還有別的情況下也會(huì)認(rèn)為成功的伟端,比如broker接收和處理消息成功了杯道,但是寫(xiě)給slave失敗了,或者數(shù)據(jù)落盤(pán)失敗了等荔泳。針對(duì)于存儲(chǔ)失敗的情況蕉饼,客戶(hù)端可以選擇是否要重新發(fā)送。
以上就是消息發(fā)送的整個(gè)流程玛歌,下面分解下每一步的實(shí)現(xiàn)
獲取topic路由
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//如果TopicPushlishInfo不存在昧港,則會(huì)嘗試從Nameserv更新topic路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
根據(jù)topic直接從內(nèi)存的緩存中獲取路由信息,緩存的更新在前面的定時(shí)任務(wù)已經(jīng)講過(guò)支子。
如果TopicPushlishInfo不存在创肥,則會(huì)嘗試從Nameserv更新信息。更新策略是:
1)按topicName去nameserv找指定topic的route信息;
2)如果第一步?jīng)]獲取到則嘗試獲取默認(rèn)創(chuàng)建topic(TBW102)的route信息叹侄,前提是broker支持默認(rèn)創(chuàng)建巩搏。
最終,如果沒(méi)有獲取到topic的route信息趾代,則報(bào)錯(cuò)中止消息發(fā)送
Queue選取策略
選擇Queue
Queue的選取是發(fā)送端實(shí)現(xiàn)負(fù)責(zé)均衡的核心贯底,根據(jù)client是否開(kāi)啟了延時(shí)容錯(cuò),實(shí)現(xiàn)輪詢(xún)和加可用性輪詢(xún)的選取策略撒强。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//直接調(diào)用MQFaultStrategy的方法
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
//MQFaultStrategy的方法實(shí)現(xiàn)
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//開(kāi)啟了延時(shí)容錯(cuò)
if (this.sendLatencyFaultEnable) {
try {
//1禽捆、首先獲取上次使用的Queue index+1
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
//2、找到index對(duì)應(yīng)的queue
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
//3飘哨、如果queue對(duì)應(yīng)的broker可用胚想,則使用該broker
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
//4、如果上一步?jīng)]找個(gè)合適的broker芽隆,則從所有的broker中選擇一個(gè)相對(duì)合適的浊服,并且broker是可寫(xiě)的。
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
//5胚吁、如果以上都沒(méi)找到牙躺,則直接按順序選擇下一個(gè)
return tpInfo.selectOneMessageQueue();
}
//6、未開(kāi)啟延時(shí)容錯(cuò)囤采,直接按順序選下一個(gè)
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
Producer為每個(gè)topic緩存了一個(gè)全局index述呐,每次發(fā)送之后+1,然后從所有queue列表中選擇index位置上的queue,這樣就實(shí)現(xiàn)了輪詢(xún)的效果蕉毯。
如果開(kāi)啟了延時(shí)容錯(cuò)乓搬,則會(huì)考慮broker的可用性:
第1) 2)步:根據(jù)全局index找到queue
第3)步:如果根據(jù)延時(shí)容錯(cuò)判斷queue所在的broker當(dāng)前可用,并且是第一次發(fā)送代虾,或者是重試并且和上次用的broker是同一個(gè)进肯,則使用這個(gè)queue。這里面有兩個(gè)邏輯棉磨,一個(gè)是broker的可用性是如何判斷的江掩,這個(gè)我們下面說(shuō);第二個(gè)是為什么重試的時(shí)候要選上次的broker乘瓤,下面說(shuō)下我的理解环形。
由前面的發(fā)送邏輯中的第6和8步知道,有兩種情況會(huì)重試衙傀,一種是broker返回處理成功但是store失敗抬吟,一種是broker返回失敗。
對(duì)于返回失敗的情況统抬,其實(shí)會(huì)直接更新broker為短時(shí)不可用狀態(tài),這個(gè)在第一個(gè)if條件就已經(jīng)通不過(guò)了火本;而對(duì)于store失敗的情況危队,說(shuō)明broker當(dāng)前是正常的,重發(fā)還是發(fā)給同一個(gè)broker有利于防止消息重復(fù)钙畔。
第4)步:如果將所有queue按照第3)步的情況過(guò)一遍茫陆,發(fā)現(xiàn)都不符合條件,則從所有broker中選擇一個(gè)相對(duì)好的擎析。
第5)步:如果第4不中的broker不支持寫(xiě)入簿盅,則跟未開(kāi)啟延時(shí)容錯(cuò)一樣的邏輯,直接選下一個(gè)queue
Broker延時(shí)控制邏輯
由上面的queue的選擇策略可以知道揍魂,queue的選擇除了輪詢(xún)以外挪鹏,就是根據(jù)Broker的可用性∮淅樱回看下消息發(fā)送的第6步和第8步,在消息發(fā)送后會(huì)更新時(shí)間和發(fā)送狀態(tài)到MQFaultStrategy
中,代碼如下:
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
//1解取、根據(jù)發(fā)送結(jié)果步责,計(jì)算broker不可用時(shí)長(zhǎng)
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
//2、更新Broker不可用時(shí)長(zhǎng)
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
第1步:根據(jù)上次消息發(fā)送時(shí)長(zhǎng)和結(jié)果禀苦,計(jì)算Broker應(yīng)該多長(zhǎng)時(shí)間不可用蔓肯,如果上次發(fā)送失敗的話,發(fā)送時(shí)長(zhǎng)按30秒計(jì)算振乏。
MQFaultStrategy
維護(hù)了一個(gè)broker延時(shí)列表蔗包,如下:
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
以上兩個(gè)列表是一一對(duì)應(yīng)的,當(dāng)發(fā)送時(shí)長(zhǎng)低于100ms時(shí)慧邮,設(shè)置broker不可用時(shí)長(zhǎng)為0调限,之后依次增加,如果超過(guò)15秒误澳,則有10分鐘不可用耻矮。可以看到如果上次發(fā)送失敗的話忆谓,也是10分鐘不可用裆装,如果重試肯定不會(huì)選擇相同的broker。
消息提交sendKernelImpl()
Producer發(fā)送消息最終是調(diào)用sendKernelImpl()
完成提交的倡缠,代碼如下:
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
//根據(jù)brokerName從緩存中獲取broker的地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
//Double check,如果地址為空哨免,則從nameserv中再獲取一次
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
//切換到VIP channel
//Broker啟動(dòng)時(shí)會(huì)開(kāi)啟2個(gè)端口接收客戶(hù)端數(shù)據(jù),其中一個(gè)端口只接收producer的消息昙沦,
//不接受consumer的拉取請(qǐng)求琢唾,被稱(chēng)為VIP channel
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
//客戶(hù)端設(shè)置的id
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
//如果消息body過(guò)長(zhǎng),則壓縮并設(shè)置標(biāo)記位
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
//回調(diào)Forbidden Hook
if (hasCheckForbiddenHook()) {
...
...
}
// 回調(diào)SendMessage Hook
if (this.hasSendMessageHook()) {
...
...
}
//設(shè)置消息頭
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
//要求重新發(fā)送的消息桅滋,設(shè)置重試次數(shù)和延時(shí)時(shí)間
//僅Consumer用
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
//通過(guò)NettyClient發(fā)送消息到Broker
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
msg.setBody(prevBody);
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
//回調(diào)Send message Hook
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException e) {
...
} catch (MQBrokerException e) {
...
} catch (InterruptedException e) {
...
} finally {
msg.setBody(prevBody);
}
}
//Broker地址獲取失敗慧耍,拋出異常中止發(fā)送
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
最后一步身辨,通過(guò)前面選擇的queue和broker獲取broker 地址,封裝消息包并發(fā)送到broker芍碧,客戶(hù)端支持單條消息發(fā)送煌珊,也支持多條消息封裝到一個(gè)包中發(fā)送。Client會(huì)和broker保持長(zhǎng)連接泌豆,提高發(fā)送速度定庵。