RocketMQ源碼解析(三)-Producer

發(fā)送方式

producer發(fā)送消息支持3種方式惭笑,同步、異步和Oneway瞪浸。

  • 同步發(fā)送:客戶(hù)端提交消息到broker后會(huì)等待返回結(jié)果儒将,相對(duì)來(lái)說(shuō)是最常用的方式。
  • 異步發(fā)送:調(diào)用發(fā)送接口時(shí)會(huì)注冊(cè)一個(gè)callback類(lèi)对蒲,發(fā)送線程繼續(xù)其它業(yè)務(wù)邏輯钩蚊,producer在收到broker結(jié)果后回調(diào)。比較適合不想發(fā)送結(jié)果影響正常業(yè)務(wù)邏輯的情況蹈矮。
  • Oneway:Producer提交消息后砰逻,無(wú)論broker是否正常接收消息都不關(guān)心。適合于追求高吞吐泛鸟、能容忍消息丟失的場(chǎng)景蝠咆,比如日志收集。

發(fā)送實(shí)例

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
            DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

發(fā)送消息首先需要初始化一個(gè)DefaultMQProducer,設(shè)置group name和nameserv的地址刚操。Producer啟動(dòng)后就可以往指定的topic發(fā)送消息闸翅。

MQProducer初始化

Producer的調(diào)用關(guān)系是
MQProducer -> DefaultMQProducer ->DefaultMQProducerImpl
DefaultMQProducer是一個(gè)Facade類(lèi),封裝了DefaultMQProducerImpl內(nèi)部實(shí)現(xiàn)菊霜。我們來(lái)看下Producer的啟動(dòng)過(guò)程坚冀,DefaultMQProducerImpl.start()

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                //參數(shù)檢查鉴逞,不能使用系統(tǒng)默認(rèn)的GroupName
                this.checkConfig();
                //設(shè)置clientInstanceName记某,使用進(jìn)程ID (PID)
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
               // 初始化MQClientInstance,一個(gè)進(jìn)程只會(huì)存在一個(gè)MQClientInstance, 設(shè)置clientId (IP@PID)
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
               //將當(dāng)前Producer注冊(cè)進(jìn)MQClientInsance构捡,保證一個(gè)producerName值對(duì)應(yīng)一個(gè)Producer
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                //啟動(dòng)MQClientInstance
                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                //設(shè)置狀態(tài)為RUNNING
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
       //向所有broker發(fā)送一次心跳 
       this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }

從上面的代碼可以看出液南,start的過(guò)程主要就是初始化和啟動(dòng)一個(gè)MQClientInstance,將producer注冊(cè)到instance中叭喜。我們來(lái)看下MQClientInstance的啟動(dòng)過(guò)程贺拣。
MQClientInstance啟動(dòng)過(guò)程

public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 1、如果NameservAddr為空捂蕴,嘗試從http server獲取nameserv的地址
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    // 2譬涡、啟動(dòng)MQClientAPIImpl,初始化NettyClient
                    this.mQClientAPIImpl.start();
                    // 3啥辨、開(kāi)啟Client的定時(shí)任務(wù)
                    this.startScheduledTask();
                    // 4涡匀、Start pull service,開(kāi)始處理PullRequest
                    this.pullMessageService.start();
                    // 5、Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    //6溉知、啟動(dòng)Client內(nèi)置的producer
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

1陨瘩、如果producer在初始化的時(shí)候沒(méi)有設(shè)置nameserv的地址,則會(huì)嘗試從一個(gè)http server獲取nameserv级乍。這個(gè)httpserver是可以配置的舌劳,這種方式非常適合于有統(tǒng)一配置中心的系統(tǒng)
3、這里開(kāi)啟的定時(shí)任務(wù)有以下幾個(gè):
1)獲取nameserv地址玫荣,就是重復(fù)的做第1步甚淡,這樣就可以動(dòng)態(tài)切換nameserv的地址
2)從nameserv更新topicRouteInfo,對(duì)于producer來(lái)說(shuō)topic的路由信息是最重要的
3)將緩存的broker信息和最新的topicRouteInfo做對(duì)比捅厂,清除已經(jīng)下線的broker
4)向broker發(fā)送心跳
4 ~ 6贯卦,producer和consumer公用一個(gè)MQClientInstance的實(shí)現(xiàn)。這幾步初始化是給consumer用的焙贷,后面講consumer的時(shí)候再講撵割。
Producer啟動(dòng)完成以后,就可以發(fā)送消息了辙芍,下面我們來(lái)看下一條普通的message的發(fā)送過(guò)程

消息發(fā)送

Producer默認(rèn)采用SYNC方式提交消息啡彬,消息提交給broker收到response后返回。方法是DefaultMQProducerImpl.send( Message msg)

/**
     * DEFAULT SYNC -------------------------------------------------------
     */
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
    public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //1、參數(shù)檢查庶灿,消息不能發(fā)給系統(tǒng)預(yù)留的topic注簿,消息體是否超過(guò)最大長(zhǎng)度
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        //2、根據(jù)消息的topic跳仿,獲取該topic的路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            ....
            //3、發(fā)送重試次數(shù)
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0; 
            //用來(lái)緩存發(fā)送和重試中已經(jīng)用過(guò)的broker
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //4捐晶、從所有topic可用queue中選擇一個(gè)queue
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {//獲取Queue成功
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        ...
                        //5菲语、提交消息到mq
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        //6、成功惑灵,更新本次調(diào)用時(shí)間到MQFaultStrategy中
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            //異步和ONEWAY調(diào)用后就直接返回了
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            //7山上、如果broker存儲(chǔ)失敗,判斷是否要重試
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    // 8英支、調(diào)用接口異常佩憾,更新?tīng)顟B(tài)到MQFaultStrategy中
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        ...
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        ...
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        ...
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        ...
                        throw e;
                    }
                } else {
                    break;
                }
            }
            //9、成功則返回結(jié)果
            if (sendResult != null) {
                return sendResult;
            }

            ...
            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }
            //超過(guò)重試次數(shù)后干花,根據(jù)不同的錯(cuò)誤設(shè)置拋出異常類(lèi)型
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }
        ...
    }

從上面的發(fā)送邏輯可以看出妄帘,無(wú)論哪種發(fā)送方式,最終都是調(diào)用的sendDefaultImpl來(lái)提交消息池凄。
第2步:獲取topic的所有路由信息抡驼,詳細(xì)邏輯后面講
第3步:SYNC發(fā)送可以設(shè)置失敗重試次數(shù)
第4步:因?yàn)槊總€(gè)topic會(huì)在集群的多個(gè)broker上存在多個(gè)queue,所以這里會(huì)選擇一個(gè)合適的queue肿仑,也就是在producer端實(shí)現(xiàn)負(fù)載均衡的功能致盟,詳細(xì)邏輯后面講
第6和8步:無(wú)論提交消息成功或者失敗,都會(huì)更新結(jié)果到MQFaultStrategy中尤慰,也就是第4中選取queue時(shí)采用的策略
第7步:對(duì)于消息提交成功馏锡,不止有SUCCESS一種狀態(tài),還有別的情況下也會(huì)認(rèn)為成功的伟端,比如broker接收和處理消息成功了杯道,但是寫(xiě)給slave失敗了,或者數(shù)據(jù)落盤(pán)失敗了等荔泳。針對(duì)于存儲(chǔ)失敗的情況蕉饼,客戶(hù)端可以選擇是否要重新發(fā)送。
以上就是消息發(fā)送的整個(gè)流程玛歌,下面分解下每一步的實(shí)現(xiàn)

獲取topic路由

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //如果TopicPushlishInfo不存在昧港,則會(huì)嘗試從Nameserv更新topic路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
 }

根據(jù)topic直接從內(nèi)存的緩存中獲取路由信息,緩存的更新在前面的定時(shí)任務(wù)已經(jīng)講過(guò)支子。
如果TopicPushlishInfo不存在创肥,則會(huì)嘗試從Nameserv更新信息。更新策略是:
1)按topicName去nameserv找指定topic的route信息;
2)如果第一步?jīng)]獲取到則嘗試獲取默認(rèn)創(chuàng)建topic(TBW102)的route信息叹侄,前提是broker支持默認(rèn)創(chuàng)建巩搏。
最終,如果沒(méi)有獲取到topic的route信息趾代,則報(bào)錯(cuò)中止消息發(fā)送

Queue選取策略

選擇Queue
Queue的選取是發(fā)送端實(shí)現(xiàn)負(fù)責(zé)均衡的核心贯底,根據(jù)client是否開(kāi)啟了延時(shí)容錯(cuò),實(shí)現(xiàn)輪詢(xún)和加可用性輪詢(xún)的選取策略撒强。

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //直接調(diào)用MQFaultStrategy的方法
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }
     //MQFaultStrategy的方法實(shí)現(xiàn)
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //開(kāi)啟了延時(shí)容錯(cuò)
        if (this.sendLatencyFaultEnable) {
            try {
               //1禽捆、首先獲取上次使用的Queue index+1
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    //2、找到index對(duì)應(yīng)的queue
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //3飘哨、如果queue對(duì)應(yīng)的broker可用胚想,則使用該broker
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                //4、如果上一步?jīng)]找個(gè)合適的broker芽隆,則從所有的broker中選擇一個(gè)相對(duì)合適的浊服,并且broker是可寫(xiě)的。
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
           //5胚吁、如果以上都沒(méi)找到牙躺,則直接按順序選擇下一個(gè)
            return tpInfo.selectOneMessageQueue();
        }
        //6、未開(kāi)啟延時(shí)容錯(cuò)囤采,直接按順序選下一個(gè)
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

Producer為每個(gè)topic緩存了一個(gè)全局index述呐,每次發(fā)送之后+1,然后從所有queue列表中選擇index位置上的queue,這樣就實(shí)現(xiàn)了輪詢(xún)的效果蕉毯。
如果開(kāi)啟了延時(shí)容錯(cuò)乓搬,則會(huì)考慮broker的可用性:
第1) 2)步:根據(jù)全局index找到queue
第3)步:如果根據(jù)延時(shí)容錯(cuò)判斷queue所在的broker當(dāng)前可用,并且是第一次發(fā)送代虾,或者是重試并且和上次用的broker是同一個(gè)进肯,則使用這個(gè)queue。這里面有兩個(gè)邏輯棉磨,一個(gè)是broker的可用性是如何判斷的江掩,這個(gè)我們下面說(shuō);第二個(gè)是為什么重試的時(shí)候要選上次的broker乘瓤,下面說(shuō)下我的理解环形。

由前面的發(fā)送邏輯中的第6和8步知道,有兩種情況會(huì)重試衙傀,一種是broker返回處理成功但是store失敗抬吟,一種是broker返回失敗。
對(duì)于返回失敗的情況统抬,其實(shí)會(huì)直接更新broker為短時(shí)不可用狀態(tài),這個(gè)在第一個(gè)if條件就已經(jīng)通不過(guò)了火本;而對(duì)于store失敗的情況危队,說(shuō)明broker當(dāng)前是正常的,重發(fā)還是發(fā)給同一個(gè)broker有利于防止消息重復(fù)钙畔。

第4)步:如果將所有queue按照第3)步的情況過(guò)一遍茫陆,發(fā)現(xiàn)都不符合條件,則從所有broker中選擇一個(gè)相對(duì)好的擎析。
第5)步:如果第4不中的broker不支持寫(xiě)入簿盅,則跟未開(kāi)啟延時(shí)容錯(cuò)一樣的邏輯,直接選下一個(gè)queue
Broker延時(shí)控制邏輯
由上面的queue的選擇策略可以知道揍魂,queue的選擇除了輪詢(xún)以外挪鹏,就是根據(jù)Broker的可用性∮淅樱回看下消息發(fā)送的第6步和第8步,在消息發(fā)送后會(huì)更新時(shí)間和發(fā)送狀態(tài)到MQFaultStrategy中,代碼如下:

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            //1解取、根據(jù)發(fā)送結(jié)果步责,計(jì)算broker不可用時(shí)長(zhǎng)
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            //2、更新Broker不可用時(shí)長(zhǎng)
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

第1步:根據(jù)上次消息發(fā)送時(shí)長(zhǎng)和結(jié)果禀苦,計(jì)算Broker應(yīng)該多長(zhǎng)時(shí)間不可用蔓肯,如果上次發(fā)送失敗的話,發(fā)送時(shí)長(zhǎng)按30秒計(jì)算振乏。
MQFaultStrategy維護(hù)了一個(gè)broker延時(shí)列表蔗包,如下:

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

以上兩個(gè)列表是一一對(duì)應(yīng)的,當(dāng)發(fā)送時(shí)長(zhǎng)低于100ms時(shí)慧邮,設(shè)置broker不可用時(shí)長(zhǎng)為0调限,之后依次增加,如果超過(guò)15秒误澳,則有10分鐘不可用耻矮。可以看到如果上次發(fā)送失敗的話忆谓,也是10分鐘不可用裆装,如果重試肯定不會(huì)選擇相同的broker。

消息提交sendKernelImpl()

Producer發(fā)送消息最終是調(diào)用sendKernelImpl()完成提交的倡缠,代碼如下:

private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        //根據(jù)brokerName從緩存中獲取broker的地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        //Double check,如果地址為空哨免,則從nameserv中再獲取一次
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
           //切換到VIP channel
           //Broker啟動(dòng)時(shí)會(huì)開(kāi)啟2個(gè)端口接收客戶(hù)端數(shù)據(jù),其中一個(gè)端口只接收producer的消息昙沦,
           //不接受consumer的拉取請(qǐng)求琢唾,被稱(chēng)為VIP channel
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                //客戶(hù)端設(shè)置的id
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }
                //如果消息body過(guò)長(zhǎng),則壓縮并設(shè)置標(biāo)記位
                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }
               //回調(diào)Forbidden Hook
                if (hasCheckForbiddenHook()) {
                    ...
                    ...
                }
                // 回調(diào)SendMessage Hook
                if (this.hasSendMessageHook()) {
                    ...
                    ...
                }
                //設(shè)置消息頭
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                //要求重新發(fā)送的消息桅滋,設(shè)置重試次數(shù)和延時(shí)時(shí)間
                //僅Consumer用
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }
                //通過(guò)NettyClient發(fā)送消息到Broker
                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            msg.setBody(prevBody);
                        }
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }
                //回調(diào)Send message Hook
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                ...
            } catch (MQBrokerException e) {
                ...
            } catch (InterruptedException e) {
               ...
            } finally {
                msg.setBody(prevBody);
            }
        }
       //Broker地址獲取失敗慧耍,拋出異常中止發(fā)送
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

最后一步身辨,通過(guò)前面選擇的queue和broker獲取broker 地址,封裝消息包并發(fā)送到broker芍碧,客戶(hù)端支持單條消息發(fā)送煌珊,也支持多條消息封裝到一個(gè)包中發(fā)送。Client會(huì)和broker保持長(zhǎng)連接泌豆,提高發(fā)送速度定庵。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市踪危,隨后出現(xiàn)的幾起案子蔬浙,更是在濱河造成了極大的恐慌,老刑警劉巖贞远,帶你破解...
    沈念sama閱讀 217,907評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件畴博,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蓝仲,警方通過(guò)查閱死者的電腦和手機(jī)俱病,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)袱结,“玉大人亮隙,你說(shuō)我怎么就攤上這事」讣校” “怎么了溢吻?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,298評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)果元。 經(jīng)常有香客問(wèn)我促王,道長(zhǎng),這世上最難降的妖魔是什么而晒? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,586評(píng)論 1 293
  • 正文 為了忘掉前任硼砰,我火速辦了婚禮,結(jié)果婚禮上欣硼,老公的妹妹穿的比我還像新娘题翰。我一直安慰自己,他們只是感情好诈胜,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,633評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布豹障。 她就那樣靜靜地躺著,像睡著了一般焦匈。 火紅的嫁衣襯著肌膚如雪血公。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,488評(píng)論 1 302
  • 那天缓熟,我揣著相機(jī)與錄音累魔,去河邊找鬼摔笤。 笑死,一個(gè)胖子當(dāng)著我的面吹牛垦写,可吹牛的內(nèi)容都是我干的吕世。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼梯投,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼命辖!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起分蓖,我...
    開(kāi)封第一講書(shū)人閱讀 39,176評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤尔艇,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后么鹤,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體终娃,經(jīng)...
    沈念sama閱讀 45,619評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,819評(píng)論 3 336
  • 正文 我和宋清朗相戀三年蒸甜,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了尝抖。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,932評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡迅皇,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出衙熔,到底是詐尸還是另有隱情登颓,我是刑警寧澤,帶...
    沈念sama閱讀 35,655評(píng)論 5 346
  • 正文 年R本政府宣布红氯,位于F島的核電站框咙,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏痢甘。R本人自食惡果不足惜喇嘱,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,265評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望塞栅。 院中可真熱鬧者铜,春花似錦、人聲如沸放椰。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,871評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)砾医。三九已至拿撩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間如蚜,已是汗流浹背压恒。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,994評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工影暴, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人探赫。 一個(gè)月前我還...
    沈念sama閱讀 48,095評(píng)論 3 370
  • 正文 我出身青樓型宙,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親期吓。 傳聞我的和親對(duì)象是個(gè)殘疾皇子早歇,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,884評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容