【Alibaba Middleware Series】「RocketMQ Topics」Exploring the Implementation Principles and Source Code of DefaultMQPushConsumer

A Quick Recap of RocketMQ

RocketMQ is a distributed, queue-model message middleware with the following characteristics:

  1. Strict message ordering guarantees
  2. Rich message pull modes
  3. Efficient horizontal scaling of subscribers
  4. Real-time message subscription
  5. The ability to accumulate messages at the scale of hundreds of millions

Why Use RocketMQ

  1. No single point of failure in the cluster; every node is highly available and horizontally scalable
  2. Massive message-accumulation capacity, with low write latency even after messages pile up
  3. Support for tens of thousands of queues
  4. Retry mechanism for failed message consumption
  5. Messages can be queried
  6. Active open-source community
  7. Maturity proven by Taobao's Double Eleven traffic

How RocketMQ Has Evolved

The open-source edition of RocketMQ uses files for persistence; Alibaba's internal, unreleased edition performs even better and uses OceanBase as its persistence layer.
RocketMQ 1.x and 2.x used ZooKeeper to manage the cluster; starting with 3.x the lighter-weight NameServer replaced ZooKeeper. In addition, the client offers two consumption styles: DefaultMQPushConsumer and DefaultMQPullConsumer.

Maven Dependency for DefaultMQPushConsumer

<dependency>
   <groupId>org.apache.rocketmq</groupId>
   <artifactId>rocketmq-client</artifactId>
   <version>4.3.0</version>
</dependency>

DefaultMQPushConsumer Usage Example

The consumeFromWhere setting controls where a consumer starts:

  1. CONSUME_FROM_LAST_OFFSET: on the very first start, consume from the tail of the queue; on later starts, resume from the last recorded progress
  2. CONSUME_FROM_FIRST_OFFSET: on the very first start, consume from the head of the queue; on later starts, resume from the last recorded progress
  3. CONSUME_FROM_TIMESTAMP: on the very first start, consume from the specified point in time; on later starts, resume from the last recorded progress

"First start" here means a consumer that has never consumed before. Once a consumer has consumed, the broker records its consumption position; if that consumer crashes and restarts, it automatically resumes from the last recorded progress.
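As a small illustration of the third option (a sketch, not from the original article; the group name and timestamp are placeholders), DefaultMQPushConsumer exposes setConsumeTimestamp, which takes a yyyyMMddHHmmss string and only takes effect the first time the group consumes:

// Sketch: start a brand-new consumer group from a fixed point in time.
// These lines would replace the corresponding configuration in the example below.
DefaultMQPushConsumer tsConsumer = new DefaultMQPushConsumer("rocketMqGroup2");
tsConsumer.setNamesrvAddr("name-server1-ip:9876");
tsConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
tsConsumer.setConsumeTimestamp("20230101000000"); // yyyyMMddHHmmss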

public class MQPushConsumer {
    public static void main(String[] args) throws MQClientException {
        String groupName = "rocketMqGroup1";
        // The group name organizes multiple consumers together to raise parallel processing capacity
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        // Set the NameServer addresses; separate multiple addresses with ';'
        consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // Subscribe to a topic and optionally filter by tag, e.g. "tag1||tag2||tag3";
        // "*" or null means every message under the topic
        consumer.subscribe("order-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
  • CLUSTERING: the default mode. Within one ConsumerGroup (same groupName), each consumer consumes only a portion of the subscribed messages; together, all consumers in the group cover the whole topic, which achieves load balancing.
  • BROADCASTING: every consumer in the ConsumerGroup consumes all messages of the subscribed topic, i.e. each message is delivered multiple times and consumed by every consumer.

When the listener returns ConsumeConcurrentlyStatus.RECONSUME_LATER, the broker retries the message according to the configured messageDelayLevel, 16 times by default.
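A common pattern (a sketch, not from the original article) is to return RECONSUME_LATER when processing fails and to give up after a few attempts; MessageExt.getReconsumeTimes() reports how many times a message has already been redelivered:

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            try {
                handle(msg); // hypothetical business method
            } catch (Exception e) {
                if (msg.getReconsumeTimes() >= 3) {
                    // Give up after three redeliveries, e.g. log it or park it for manual handling
                    continue;
                }
                // Ask the broker to redeliver later according to messageDelayLevel
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});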

The main responsibilities of the objects inside DefaultMQPushConsumerImpl are as follows:

RebalancePushImpl: decides which queues the current consumer should consume from (see the sketch after this list);

  • 1) PullAPIWrapper: holds the long connection and pulls messages from the broker, which are then handed to ConsumeMessageService to call back the user's Listener and run the consumption logic;
  • 2) ConsumeMessageService: implements the "push" (passive) consumption model; messages pulled from the broker are wrapped into ConsumeRequest objects and submitted to this service, which calls back the user's Listener;
  • 3) OffsetStore: maintains the consumer's consumption progress (offset). There are two implementations: Local stores offsets on local disk and suits the BROADCASTING model, while Remote stores them on the broker and suits the CLUSTERING model;
  • 4) MQClientFactory: manages clients (consumers and producers) and exposes the interfaces that the various services (Rebalance, PullMessage, etc.) call; most of the logic lives in this class;
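The queue-allocation decision made by RebalancePushImpl is pluggable; as a small sketch (not from the original article), the strategy can be set explicitly on the consumer before start(). AllocateMessageQueueAveragely is the client's default:

// Sketch: choose how queues are spread across consumers in the same group.
// This is the strategy that rebalanceImpl picks up in the start() method shown later.
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());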

The execution path of consumer.registerMessageListener:

/**
 * Register a callback to execute on message arrival for concurrent consuming.
 *
 * @param messageListener message handling callback.
 */
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
    this.messageListener = messageListener;
    this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}

The source shows that the real work happens in DefaultMQPushConsumerImpl: consumer.start() calls DefaultMQPushConsumerImpl's synchronized start() method:

public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                  this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                this.consumeMessageService.start();
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }

Following mQClientFactory.start(), we find that it calls:

public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                  this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

Several services are started in this method; the one we focus on is pullMessageService.start(). This is where we discover that RocketMQ's push mode is actually implemented on top of pull. Let's see what logic PullMessageService handles:

private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

So it still delegates to DefaultMQPushConsumerImpl's pullMessage method for the actual message-handling logic.

How PullRequest Pulling Works

A note on PullRequest: as mentioned above, RocketMQ's push mode is implemented as a wrapper over pull mode, and PullRequest achieves the push effect through long polling.

Long polling combines the advantages of pull with the real-time behavior of push.

  • In push mode, the server actively pushes each message to the client as soon as it arrives, which gives excellent latency. The drawbacks are that the server does a lot of work, which hurts its performance, and that clients vary in processing capacity and are not under the server's control: a client that cannot keep up ends up with a message backlog that disrupts normal business.

  • In pull mode, the client pulls messages from the server in a loop, so the client is in control: it finishes one message before pulling the next. The drawback is that the polling interval is hard to choose: too short and the client busy-waits and wastes CPU; too long and throughput drops and some messages are handled late (see the sketch after this list).
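For contrast, here is a minimal pull-mode sketch (not part of the original article; group and topic names are placeholders, imports and exception handling are omitted). DefaultMQPullConsumer's pullBlockIfNotFound blocks on the broker when the queue is empty, which is essentially the long-polling behavior the push consumer builds on:

// Sketch: explicit pull with a blocking call (long polling on the broker side).
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("rocketMqPullGroup1");
pullConsumer.setNamesrvAddr("name-server1-ip:9876");
pullConsumer.start();
for (MessageQueue mq : pullConsumer.fetchSubscribeMessageQueues("order-topic")) {
    // For a brand-new group fetchConsumeOffset may return -1; handling that is omitted here.
    long offset = pullConsumer.fetchConsumeOffset(mq, true);
    // Holds the request on the broker until messages arrive or the suspend timeout expires,
    // instead of returning an empty result immediately.
    PullResult result = pullConsumer.pullBlockIfNotFound(mq, null, offset, 32);
    if (result.getPullStatus() == PullStatus.FOUND) {
        // process result.getMsgFoundList() ...
    }
    pullConsumer.updateConsumeOffset(mq, result.getNextBeginOffset());
}
pullConsumer.shutdown();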

Long polling combines the strengths of both. DefaultMQPushConsumerImpl.pullMessage handles a PullRequest roughly as follows:
  1. Check whether the dropped flag of the ProcessQueue held by the PullRequest is true (when the RebalanceService thread creates pull requests for the MessageQueues of a topic it maintains a ProcessQueue for each queue; if the Consumer no longer subscribes to that topic, dropped is set to true). If so, the request is treated as cancelled and the method returns immediately;
  2. Update the ProcessQueue's timestamp (ProcessQueue.lastPullTimestamp) to the current time;
  3. Check that the Consumer is running, i.e. DefaultMQPushConsumerImpl.serviceState is RUNNING. If it is not running, or it is paused (DefaultMQPushConsumerImpl.pause == true), call PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay) with timeDelay = 3000 so that the PullRequest is put back into PullMessageService.pullRequestQueue three seconds later, and return;
  4. Apply flow control. If the ProcessQueue's msgCount exceeds the consumer-side threshold (DefaultMQPushConsumer.pullThresholdForQueue, default 1000), call PullMessageService.executePullRequestLater to requeue the PullRequest into PullMessageService.pullRequestQueue 50 milliseconds later, and return;
  5. If consumption is not orderly (DefaultMQPushConsumerImpl.consumeOrderly == false), check the span between the first and last keys of the ProcessQueue's msgTreeMap: TreeMap<Long, MessageExt> (the keys are queue offsets). If the span exceeds the threshold (DefaultMQPushConsumer.consumeConcurrentlyMaxSpan, default 2000), call PullMessageService.executePullRequestLater to requeue the PullRequest 50 milliseconds later, and return;
  6. Look up the SubscriptionData for the topic of PullRequest.messageQueue in RebalanceImpl.subscriptionInner: ConcurrentHashMap<String, SubscriptionData>. If it is null (which can happen because of concurrency), call executePullRequestLater to retry later, and return;
  7. If the message model is clustering (RebalanceImpl.messageModel == CLUSTERING), call readOffset(MessageQueue mq, ReadOffsetType type) on DefaultMQPushConsumerImpl.offsetStore (initialized as a RemoteBrokerOffsetStore) with the PullRequest's MessageQueue and type = READ_FROM_MEMORY to read the consumption offset from local memory. If the offset is greater than 0, set the temporary variable commitOffsetEnable to true, otherwise false. The offset is passed to pullKernelImpl as the commitOffset parameter; after the pull, the Broker decides based on commitOffsetEnable whether to use it to update the consumption progress. readOffset itself looks up the MessageQueue in RemoteBrokerOffsetStore.offsetTable: ConcurrentHashMap<MessageQueue, AtomicLong> and returns the stored offset, or -1 if there is none;
  8. If the subscription should be re-posted on every pull (DefaultMQPushConsumer.postSubscriptionWhenPull, default false) and the SubscriptionData for the topic has classFilterMode == false (the default), set the third bit of sysFlag to 1, otherwise 0;
  9. Set the first bit of sysFlag to commitOffsetEnable, the second bit (the suspend flag) to 1, and the fourth bit to classFilterMode;
  10. Create the anonymous PullCallback, which implements onSuccess/onException; these are only called back for asynchronous requests;
  11. Call the low-level pull API:

PullAPIWrapper.pullKernelImpl

PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback) performs the actual pull.

The PullCallback is passed into this method; when messages are pulled asynchronously, its methods are called back once the response is received.

public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            return;
        }
        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }
        long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }
                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }
        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }
        final long beginTimestamp = System.currentTimeMillis();
        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);
                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }
                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }
                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}",
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);
                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }
            @Override
            public void onException(Throwable e) {
                if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("execute the pull request exception", e);
                }
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            }
        };
        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }
        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }
            classFilter = sd.isClassFilterMode();
        }
        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            // Follow this method next; it is where the client actually pulls messages
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                // The pull uses the asynchronous communication mode
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
    }

Sending the Remote Pull Request

In MQClientAPIImpl.pullMessage, the communicationMode argument selects between asynchronous and synchronous pulling.

Whichever mode is used, a ResponseFuture object is constructed before the pull request is sent and stored in NettyRemotingAbstract.responseTable: ConcurrentHashMap<Integer, ResponseFuture>, keyed by the request's sequence number. This table is handled in several situations (a simplified sketch follows the list):

  1. If the send fails, the corresponding entry is removed from responseTable immediately;
  2. When a response arrives, the ResponseFuture is looked up by the sequence number carried in the response (the server echoes the request's sequence number) and its responseCommand field is set. For a synchronous call this wakes up the thread waiting in ResponseFuture.waitResponse; for an asynchronous call, ResponseFuture.executeInvokeCallback() is invoked to run the callback logic;
  3. When NettyRemotingClient.start() runs, it also schedules a task that scans responseTable every second, checks each ResponseFuture for a response timeout, and on timeout calls ResponseFuture.executeInvokeCallback() and removes the entry from the table;
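To make the request/response correlation concrete, here is a stripped-down sketch of the pattern (my own illustration with made-up names, not RocketMQ's actual ResponseFuture/NettyRemotingAbstract code): a future keyed by the request's sequence number, completed by the response handler or abandoned on timeout.

import java.util.concurrent.*;

// Illustrative only: the correlation pattern described above.
final class ResponseTableSketch {
    // keyed by the request sequence number ("opaque")
    static final ConcurrentHashMap<Integer, CompletableFuture<String>> RESPONSES = new ConcurrentHashMap<>();

    // Sender side: register a future, send the request, then wait (synchronous style).
    static String invokeSync(int opaque, long timeoutMillis) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<>();
        RESPONSES.put(opaque, future);
        try {
            // channel.writeAndFlush(request) would happen here
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            RESPONSES.remove(opaque); // drop the entry on success, failure or timeout
        }
    }

    // Receiver side: the I/O thread completes the matching future when a response arrives.
    static void onResponse(int opaque, String responseCommand) {
        CompletableFuture<String> future = RESPONSES.remove(opaque);
        if (future != null) {
            future.complete(responseCommand);
        }
    }
}

MQClientAPIImpl.pullMessage itself simply dispatches on the communication mode: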
public PullResult pullMessage(
        final String addr,
        final PullMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
        switch (communicationMode) {
            case ONEWAY:
                assert false;
                return null;
            case ASYNC:
                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
                return null;
            case SYNC:
                return this.pullMessageSync(addr, request, timeoutMillis);
            default:
                assert false;
                break;
        }
        return null;
    }

Synchronous Pull

For the synchronous mode, MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis) is called. The rough steps are:

  1. Call RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis):
    • Get the Channel for the broker address: look up the ChannelWrapper in RemotingClient.channelTables: ConcurrentHashMap<String, ChannelWrapper> by the broker address and return its Channel; if there is no ChannelWrapper yet, open a new connection to the broker and cache it in channelTables for later use;
    • If NettyRemotingClient.rpcHook: RPCHook is not null (it is supplied by the application when it constructs the DefaultMQPushConsumer or DefaultMQPullConsumer), call RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request);
    • Call NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis), whose logic is:
      • A) Create a ResponseFuture from the request's sequence number (opaque) and the timeout, and put it into NettyRemotingAbstract.responseTable: ConcurrentHashMap<Integer, ResponseFuture>;
      • B) Call Channel.writeAndFlush(Object msg) to send the RemotingCommand to the broker, then call addListener(GenericFutureListener<? extends Future<? super Void>> listener) to register an anonymous ChannelFutureListener. When the send completes, its operationComplete method checks ChannelFuture.isSuccess(): on success it sets ResponseFuture.sendRequestOK = true and returns from the callback to wait for the response; on failure it sets sendRequestOK = false, removes the entry for this sequence number (opaque) from NettyRemotingAbstract.responseTable, sets ResponseFuture.responseCommand = null and wakes up ResponseFuture.waitResponse(long timeoutMillis);
      • C) Call ResponseFuture.waitResponse(long timeoutMillis) to wait for the response; the wait is released when the send fails, when the response arrives (see the responseTable handling above), or when the timeout expires, and it returns ResponseFuture.responseCommand;
      • D) If the returned responseCommand is null, throw an exception: RemotingTimeoutException if ResponseFuture.sendRequestOK is true, otherwise RemotingSendRequestException;
      • E) If the returned responseCommand is not null, return it;
    • If NettyRemotingClient.rpcHook: RPCHook is not null, call RPCHook.doAfterResponse;
  • With the RemotingCommand returned by the previous step, call MQClientAPIImpl.processPullResponse(RemotingCommand response) to parse and wrap it into a PullResultExt for the caller. The response code maps to a pull status as follows (a simplified sketch of this mapping follows the list):
    • If the RemotingCommand's code is SUCCESS, PullResultExt.pullStatus = FOUND;
    • If the RemotingCommand's code is PULL_NOT_FOUND, PullResultExt.pullStatus = NO_NEW_MSG;
    • If the RemotingCommand's code is PULL_RETRY_IMMEDIATELY, PullResultExt.pullStatus = NO_MATCHED_MSG;
    • If the RemotingCommand's code is PULL_OFFSET_MOVED, PullResultExt.pullStatus = OFFSET_ILLEGAL;
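A sketch of what that mapping looks like in code (simplified; the real MQClientAPIImpl.processPullResponse also decodes the response header and message bodies, and the rocketmq-client/rocketmq-common imports are omitted here):

// Simplified sketch of the response-code to pull-status mapping described above.
private PullStatus toPullStatus(int responseCode) throws MQBrokerException {
    switch (responseCode) {
        case ResponseCode.SUCCESS:
            return PullStatus.FOUND;
        case ResponseCode.PULL_NOT_FOUND:
            return PullStatus.NO_NEW_MSG;
        case ResponseCode.PULL_RETRY_IMMEDIATELY:
            return PullStatus.NO_MATCHED_MSG;
        case ResponseCode.PULL_OFFSET_MOVED:
            return PullStatus.OFFSET_ILLEGAL;
        default:
            throw new MQBrokerException(responseCode, "unexpected response code");
    }
}

NettyRemotingClient.invokeSync itself looks like this: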
@Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel != null && channel.isActive()) {
            try {
                if (this.rpcHook != null) {
                    this.rpcHook.doBeforeRequest(addr, request);
                }
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call timeout");
                }
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                if (this.rpcHook != null) {
                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                }
                return response;
            } catch (RemotingSendRequestException e) {
                log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

getMQClientAPIImpl().pullMessage ultimately writes the request to the channel and flushes it. On the broker side, the rough logic is: when a pull request arrives and the queue has no messages, the broker does not return immediately. Instead it loops, calling waitForRunning for a stretch (5 seconds by default) and then re-checking; if no new message has arrived by the time the wait exceeds suspendMaxTimeMills, it returns an empty result. If a new message arrives while waiting, notifyMessageArriving is called and the result is returned right away. The core of "long polling" is that the broker holds the client's request for a short while and, if a new message arrives within that window, uses the existing connection to return it to the Consumer immediately. The initiative stays with the consumer: even if the broker has a large backlog of messages, it never pushes them to the consumer unasked.
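A conceptual sketch of that hold-and-notify behavior (my own simplification, not the broker's actual PullRequestHoldService; the class and method names are made up):

// Conceptual sketch of broker-side long polling: park an empty-handed pull request
// until either a new message arrives or the suspend window expires.
final class HoldSketch {
    private final Object lock = new Object();
    private volatile boolean newMessageArrived = false;

    // Called by the pull-request handler when the queue is currently empty.
    // Returns true if a new message arrived within suspendMaxTimeMillis.
    boolean holdRequest(long suspendMaxTimeMillis, long checkIntervalMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + suspendMaxTimeMillis;
        synchronized (lock) {
            while (!newMessageArrived && System.currentTimeMillis() < deadline) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                // periodic waitForRunning-style wakeup (the broker's default check interval is ~5s)
                lock.wait(Math.min(checkIntervalMillis, remaining));
            }
            return newMessageArrived;
        }
    }

    // The analogue of notifyMessageArriving: called when a message is written to the queue.
    void notifyMessageArriving() {
        synchronized (lock) {
            newMessageArrived = true;
            lock.notifyAll();
        }
    }
}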
