RocketMQ源碼閱讀(十)-Consumer消費(fèi)消息

1.消費(fèi)方式和消費(fèi)者組

1.消費(fèi)方式: 拉取和推送兩種(事實(shí)上所有從遠(yuǎn)程獲取數(shù)據(jù)都是這兩種方式).
2.消費(fèi)者組與消費(fèi)模式
多個(gè)消費(fèi)者組成一個(gè)消費(fèi)組, 兩種模式: 集群(消息被其中任何一個(gè)消息者消費(fèi)), 廣播模式(全部消費(fèi)者消費(fèi)).

2.Consumer消費(fèi)消息的基本流程

RocketMQ 分別使用 DefaultMQPullConsumer 和 DefaultMQPushConsumer 實(shí)現(xiàn)了拉取和推送兩種方式. 下面主要以DefaultMQPullConsumer為例進(jìn)行分析.

先看源碼中給出的Demo:

public class PullConsumerTest {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); //@1
        consumer.start(); //@2

        try {
            MessageQueue mq = new MessageQueue();
            mq.setQueueId(0);
            mq.setTopic("TopicTest3");
            mq.setBrokerName("vivedeMacBook-Pro.local");

            long offset = 26;

            long beginTime = System.currentTimeMillis();
            PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32); //@3
            System.out.printf("%s%n", System.currentTimeMillis() - beginTime);
            System.out.printf("%s%n", pullResult);
        } catch (Exception e) {
            e.printStackTrace();
        }

        consumer.shutdown();
    }
}

首先在@1處構(gòu)建Consumer并且制定其所屬的消費(fèi)者組. 在@2處啟動(dòng)Consumer, 并且在@3處拉取消息.

Consumer啟動(dòng)

事實(shí)上DefaultMQPullConsumer將所有操作都委托給DefaultMQPullConsumerImpl, 下面看DefaultMQPullConsumerImpl#start.

public void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();  //@1 

            this.copySubscription(); //@2

            if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPullConsumer.changeInstanceNameToPID(); //@3
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); //@4

            //@5            
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(//
                mQClientFactory, //
                this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
            
            //@6            
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            //@7
            if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPullConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
            }

            this.offsetStore.load();

            //@8
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;

                throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            //@9
            mQClientFactory.start();
            log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
            //@10
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, "http://
                + this.serviceState//
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
}

首先判斷當(dāng)前consumer的狀態(tài), 除了CREATE_JUST之外, 全部是非法狀態(tài).(這個(gè)容易理解, 因?yàn)闀r(shí)剛剛啟動(dòng), 不應(yīng)該處于其他狀態(tài)).
狀態(tài)合法后, 大體過(guò)程如下:

  • @1校驗(yàn)各配置項(xiàng)是否合法(consumerGroup, allocateMessageQueueStrategy, messageModel)
  • @2將當(dāng)前defaultMQPullConsumer中的訂閱關(guān)系復(fù)制到當(dāng)前rebalanceImpl(負(fù)載均衡器, 主要負(fù)責(zé)決定, 當(dāng)前的consumer應(yīng)該從哪些Queue中消費(fèi)消息)中.
  • @3如果是集群模式,則將當(dāng)前defaultMQPullConsumer實(shí)例名改為線程ID.
  • @4實(shí)例化MQClientInstance(這個(gè)類是一個(gè)大雜燴,負(fù)責(zé)管理client(consumer, producer), 并提供多中功能接口供各個(gè)Service(Rebalance, PullMessage等)調(diào)用)
  • @5初始化rebalance變量
  • @6初始化pullAPIWrapper(長(zhǎng)連接, 負(fù)責(zé)從broker處拉取消息, 然后利用ConsumeMessageService回調(diào)用戶的Listener執(zhí)行消息消費(fèi)邏輯)
  • @7構(gòu)建offsetStore消費(fèi)進(jìn)度存儲(chǔ)對(duì)象(有兩種實(shí)現(xiàn), Local和Rmote, Local存儲(chǔ)在本地磁盤上, 適用于BROADCASTING廣播消費(fèi)模式; 而Remote則將消費(fèi)進(jìn)度存儲(chǔ)在Broker上, 適用于CLUSTERING集群消費(fèi)模式).
  • @8向mqClientFactory注冊(cè)本消費(fèi)者
  • @9啟動(dòng)mqClientFactory(啟動(dòng)各種定時(shí)任務(wù), 如定時(shí)獲取nameserver地址, 定時(shí)清理下線的borker, 啟動(dòng)各種service, 如拉消息服務(wù), 負(fù)載均衡服務(wù))
  • @10將serviceState修改為ServiceState.RUNNING

Consumer獲取消息

Consumer獲取消息使用pullBlockIfNotFound方法, 方法簽名如下:

PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) 
      throws MQClientException, RemotingException, MQBrokerException, InterruptedException

該方法一共有4個(gè)參數(shù).

  • mq, 從哪個(gè)隊(duì)列中拉取消息;
  • subExpression, SubscriptionData中的subString;
  • offset, 消息拉取的offset;
  • maxNums, 最大拉取的消息數(shù)目.
    跟蹤該方法,最終會(huì)委托給DefaultMQPullConsumerImpl中的pullSyncImpl方法執(zhí)行, 代碼如下:
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();     //@1
    //@2
    if (null == mq) {
        throw new MQClientException("mq is null", null);

    }

    if (offset < 0) {
        throw new MQClientException("offset < 0", null);
    }

    if (maxNums <= 0) {
        throw new MQClientException("maxNums <= 0", null);
    }

    this.subscriptionAutomatically(mq.getTopic()); //@3

    int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); //@4
    //@5
    SubscriptionData subscriptionData;
    try {
        subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
            mq.getTopic(), subExpression);
    } catch (Exception e) {
        throw new MQClientException("parse subscription error", e);
    }

    long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;   //@6

    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
        mq, 
        subscriptionData.getSubString(), 
        0L, 
        offset, 
        maxNums, 
        sysFlag, 
        0, 
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), 
        timeoutMillis, 
        CommunicationMode.SYNC, 
        null
    );   //@7
    this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);  //@8
    //@9
    if (!this.consumeMessageHookList.isEmpty()) {
        ConsumeMessageContext consumeMessageContext = null;
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setConsumerGroup(this.groupName());
        consumeMessageContext.setMq(mq);
        consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
        consumeMessageContext.setSuccess(false);
        this.executeHookBefore(consumeMessageContext);
        consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
        consumeMessageContext.setSuccess(true);
        this.executeHookAfter(consumeMessageContext);
    }
    return pullResult;
}

過(guò)程概括如下:

  • @1檢驗(yàn)當(dāng)前consumer客戶端是否處于RUNNING狀態(tài), 否則非法;
  • @2檢查mq, offset, maxNums三個(gè)參數(shù)是否合法;
  • @3構(gòu)建rebalanceImpl中的SubscriptionData;
  • @4構(gòu)建sysFlag;
  • @5構(gòu)建當(dāng)前consumer的SubscriptionData(這一步和@3有點(diǎn)重復(fù));
  • @6從broker拉取消息時(shí)的超時(shí)時(shí)間;
  • @7從broker拉取消息;
  • @8對(duì)pullresult進(jìn)行處理, 這一步主要進(jìn)行兩個(gè)操作, a.更新消息隊(duì)列拉取消息Broker編號(hào)的映射, b.解析消息赦肋,并根據(jù)訂閱信息消息tagCode匹配合適消息.
  • @9如果HookList不為空, 執(zhí)行HookList中的操作.
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末锭部,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子扣猫,更是在濱河造成了極大的恐慌琳钉,老刑警劉巖似枕,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件求厕,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡离唬,警方通過(guò)查閱死者的電腦和手機(jī)后专,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)输莺,“玉大人戚哎,你說(shuō)我怎么就攤上這事∩┯茫” “怎么了型凳?”我有些...
    開(kāi)封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)嘱函。 經(jīng)常有香客問(wèn)我甘畅,道長(zhǎng),這世上最難降的妖魔是什么往弓? 我笑而不...
    開(kāi)封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任疏唾,我火速辦了婚禮,結(jié)果婚禮上函似,老公的妹妹穿的比我還像新娘槐脏。我一直安慰自己,他們只是感情好缴淋,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布准给。 她就那樣靜靜地躺著泄朴,像睡著了一般重抖。 火紅的嫁衣襯著肌膚如雪露氮。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天钟沛,我揣著相機(jī)與錄音畔规,去河邊找鬼。 笑死恨统,一個(gè)胖子當(dāng)著我的面吹牛叁扫,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播畜埋,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼莫绣,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了悠鞍?” 一聲冷哼從身側(cè)響起对室,我...
    開(kāi)封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎咖祭,沒(méi)想到半個(gè)月后掩宜,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡么翰,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年牺汤,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片浩嫌。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡檐迟,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出码耐,到底是詐尸還是另有隱情追迟,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布伐坏,位于F島的核電站怔匣,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏桦沉。R本人自食惡果不足惜每瞒,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望纯露。 院中可真熱鬧剿骨,春花似錦、人聲如沸埠褪。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至贷掖,卻和暖如春嫡秕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背苹威。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工昆咽, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人牙甫。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓掷酗,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親窟哺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子泻轰,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,722評(píng)論 13 425
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)且轨,斷路器浮声,智...
    卡卡羅2017閱讀 134,659評(píng)論 18 139
  • 背景介紹 Kafka簡(jiǎn)介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)殖告。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O...
    高廣超閱讀 12,835評(píng)論 8 167
  • consumer 1.啟動(dòng) 有別于其他消息中間件由broker做負(fù)載均衡并主動(dòng)向consumer投遞消息秘蛔,Rock...
    veShi文閱讀 4,936評(píng)論 0 2
  • 馬驍在哄笑中極力鎮(zhèn)定著自己的情緒沿量,想盡快入睡灶轰。 可是事與愿違其垄。 總是這樣,在人們極力追求和夢(mèng)想的事情上爽丹,老天總是那...
    行走的M閱讀 371評(píng)論 0 0