RocketMQ源碼分析----Consumer消費(fèi)進(jìn)度相關(guān)

在Consumer消費(fèi)的時(shí)候總有幾個(gè)疑問:

  • 消費(fèi)完成后育八,這個(gè)消費(fèi)進(jìn)度存在哪里
  • 消費(fèi)完成后补箍,還沒保存消費(fèi)進(jìn)度就掛了改执,會(huì)不會(huì)導(dǎo)致重復(fù)消費(fèi)

Consumer

消費(fèi)進(jìn)度保存

消費(fèi)完成后啸蜜,會(huì)返回一個(gè)ConsumeConcurrentlyStatus.CONSUME_SUCCESS告訴MQ消費(fèi)成功,以MessageListener的consumeMessage為入口分析辈挂。
消費(fèi)的時(shí)候衬横,是以ConsumeRequest類為Runnable對(duì)象,在線程池中進(jìn)行處理的终蒂,即ConsumeRequest的run方法會(huì)處理這個(gè)狀態(tài)

        @Override
        public void run() {

            //....
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            // 如果這個(gè)ProcessQueue廢棄了蜂林,則不處理
            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            }
        }

在消費(fèi)完成后,將status交給processConsumeResult處理拇泣,代碼如下

    public void processConsumeResult(//
                                     final ConsumeConcurrentlyStatus status, //
                                     final ConsumeConcurrentlyContext context, //
                                     final ConsumeRequest consumeRequest//
    ) {
         //....消費(fèi)成功或者失敗的處理
        
        // 將這批消息從ProcessQueue中移除噪叙,代表消費(fèi)完畢,并返回當(dāng)前ProcessQueue中的消息最小的offset
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            // 更新消費(fèi)進(jìn)度
            this.defaultMQPushConsumerImpl.getOffsetStore()
                .updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

在分析ProcessQueue的時(shí)候霉翔,說過removeMessage返回有兩種情況:

  1. 如果移除這批消息之后已經(jīng)沒有消息了睁蕾,那么返回ProcessQueue中最大的offset+1
  2. 如果還有消息,那么返回treeMap中最小的key债朵,即未消費(fèi)的消息中最小的offset

getOffsetStore返回RemoteBrokerOffsetStore子眶,看下其實(shí)現(xiàn)

    @Override
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            // 通過MessageQueue獲取本地的對(duì)應(yīng)的消費(fèi)進(jìn)度
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }

            if (null != offsetOld) {
                //increaseOnly 為false則直接覆蓋
                //increaseOnly為true則會(huì)判斷更新的值比老的值大才會(huì)進(jìn)行更新
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    offsetOld.set(offset);
                }
            }
        }
    }

這里的increaseOnly參數(shù)根據(jù)不同的情況傳入不同的值,有些情況下會(huì)出現(xiàn)并發(fā)修改的情況序芦,那么需要傳入true臭杰,內(nèi)部會(huì)進(jìn)行CAS的操作,能保證正確的賦值芝加,而一些場景下硅卢,只需要進(jìn)行直接覆蓋或者說沒有并發(fā)修改的問題那么傳入false就行了。

消費(fèi)進(jìn)度持久化

offsetTable是一個(gè)Map藏杖,其保存了消費(fèi)進(jìn)度将塑,這只一個(gè)內(nèi)存的結(jié)構(gòu),在Consumer啟動(dòng)的時(shí)候蝌麸,會(huì)啟動(dòng)一個(gè)定時(shí)任務(wù)將本地的數(shù)據(jù)同步到broker点寥,每persistConsumerOffsetInterval(默認(rèn)為5)秒進(jìn)行一次操作

    // mqs為需要持久化的隊(duì)列集合
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;

        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
        if (mqs != null && !mqs.isEmpty()) {
            // 遍歷本地的消費(fèi)進(jìn)度
            for(Map.Entry<MessageQueue, AtomicLong> entry:this.offsetTable.entrySet()){
                MessageQueue mq = entry.getKey();
                AtomicLong offset = entry.getValue();
                if (offset != null) {
                    // 如果該隊(duì)列在需要持久化的隊(duì)列中
                    if (mqs.contains(mq)) {
                        try {
                            // 將消費(fèi)進(jìn)度發(fā)送到broker
                            this.updateConsumeOffsetToBroker(mq, offset.get());
                        } catch (Exception e) {
                            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                        }
                    } else {//廢棄的消費(fèi)進(jìn)度
                        unusedMQ.add(mq);
                    }
                }
            }
        }
        // 如果有廢棄的MQ,則將其消費(fèi)進(jìn)度廢棄
        if (!unusedMQ.isEmpty()) {
            for (MessageQueue mq : unusedMQ) {
                this.offsetTable.remove(mq);
            }
        }
    }

傳入的是當(dāng)前Consumer分配的MessageQueue列表来吩,rebalance之后敢辩,可能分配的MessageQueue已經(jīng)變化,所以offsetTable里有些消費(fèi)進(jìn)度的隊(duì)列時(shí)不需要的弟疆,所以將它的消費(fèi)進(jìn)度廢棄
updateConsumeOffsetToBroker方法就是簡單的網(wǎng)絡(luò)請(qǐng)求戚长,將offset發(fā)送給Broker

消費(fèi)進(jìn)度提交

除了定時(shí)提交消費(fèi)進(jìn)度之外,在拉取消息的時(shí)候怠苔,會(huì)順便將本地的消費(fèi)進(jìn)度一起傳到broker同廉,例如查看拉取消息的方法DefaultMQPushConsumerImpl#pullMessage中的一段代碼

boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        // 集群消費(fèi)模式
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            // 通過offsetStore獲取當(dāng)前消費(fèi)進(jìn)度
            // ReadOffsetType.READ_FROM_MEMORY表示從本地獲取(即offsetTable)
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {//
                // 傳給Broker,讓其判斷是否需要保存消費(fèi)進(jìn)度
                commitOffsetEnable = true;
            }
        }
        // 構(gòu)造一些標(biāo)志位,這里主要看commitOffsetEnable值
        // 將commitOffsetEnable放到一個(gè)int類型的值中迫肖,讓broker判斷是否需要保存消費(fèi)進(jìn)度
                int sysFlag = PullSysFlag.buildSysFlag(//
                commitOffsetEnable, // commitOffset
                true, // suspend
                subExpression != null, // subscription
                classFilter // class filter
        );
        //....
            // 通過拉取消息請(qǐng)求锅劝,將commitOffsetValue和sysFlag傳給broker
            this.pullAPIWrapper.pullKernelImpl(//
                    pullRequest.getMessageQueue(), // 1
                    subExpression, // 2
                    subscriptionData.getSubVersion(), // 3
                    pullRequest.getNextOffset(), // 4
                    this.defaultMQPushConsumer.getPullBatchSize(), // 5
                    sysFlag, // 6
                    commitOffsetValue, // 7
                    BrokerSuspendMaxTimeMillis, // 8
                    ConsumerTimeoutMillisWhenSuspend, // 9
                    CommunicationMode.ASYNC, // 10
                    pullCallback// 11
            );

具體broker對(duì)消費(fèi)進(jìn)度的處理看后面分析

Broker

消費(fèi)進(jìn)度保存

RocketMQ的網(wǎng)絡(luò)請(qǐng)求都有一個(gè)RequestCode,更新消費(fèi)進(jìn)度的Code為UPDATE_CONSUMER_OFFSET蟆湖,通過查到其使用的地方故爵,找到對(duì)應(yīng)的Processor為ClientManageProcessor,其processRequest處理對(duì)應(yīng)的請(qǐng)求

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT:
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT:
                return this.unregisterClient(ctx, request);
            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                return this.getConsumerListByGroup(ctx, request);
            case RequestCode.UPDATE_CONSUMER_OFFSET:
                return this.updateConsumerOffset(ctx, request);
            case RequestCode.QUERY_CONSUMER_OFFSET:
                return this.queryConsumerOffset(ctx, request);
            default:
                break;
        }
        return null;
    }

更新消費(fèi)進(jìn)度的方法為updateConsumerOffset隅津,里面解析了請(qǐng)求體之后又調(diào)用了ConsumerOffsetManager.commitOffset方法

    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
        // topic@group 
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(clientHost, key, queueId, offset);
    }

    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}",
               clientHost, key, queueId, offset, storeOffset);
            }
        }
    }

邏輯也很簡單就不多說了诬垂,有意思的是,Broker的保存消費(fèi)進(jìn)度的結(jié)構(gòu)和Consumer類似伦仍,Broker多了一個(gè)維度剥纷,因?yàn)锽roker接收的是所有消費(fèi)者的進(jìn)度,而Consumer保存的是自己的
在Consumer的消費(fèi)進(jìn)度上報(bào)到Broker之后呢铆,Broker只是保存到內(nèi)存晦鞋,這并不可靠,大概也能猜出棺克,和Consumer一樣悠垛,也有一個(gè)定時(shí)任務(wù)將消費(fèi)進(jìn)度持久化。這時(shí)娜谊,先看下ConsumerOffsetManager這個(gè)類的繼承關(guān)系确买,他的父類是ConfigManager,這個(gè)東西很重要纱皆,是幾個(gè)重要配置信息持久化類湾趾,看下其繼承關(guān)系:


image.png

分別是訂閱關(guān)系管理,消費(fèi)進(jìn)度管理派草,Topic信息管理搀缠,和延遲隊(duì)列信息管理,這4個(gè)配置信息都需要通過ConfigManager去持久化和加載近迁,看下ConfigManager的幾個(gè)方法

public abstract class ConfigManager {
    // 將對(duì)象轉(zhuǎn)換成json串
    public abstract String encode();

    //將文件里內(nèi)容(json格式)的轉(zhuǎn)換成對(duì)象
    public boolean load() {
        String fileName = null;
            // 獲取文件地址
            fileName = this.configFilePath();
            // 將文件里的內(nèi)容讀取出來
            String jsonString = MixAll.file2String(fileName);
            // json轉(zhuǎn)換成指定對(duì)象的數(shù)據(jù)
            this.decode(jsonString);
    }
    // 配置文件地址
    public abstract String configFilePath();
    
    // 與load類似
    private boolean loadBak() {
        String fileName = null;
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName + ".bak");
            this.decode(jsonString);
        return true;
    }
    // json轉(zhuǎn)換成指定對(duì)象的數(shù)據(jù)
    public abstract void decode(final String jsonString);
    // 將對(duì)象里的數(shù)據(jù)轉(zhuǎn)換成json并持久化到configFilePath()文件中
    public synchronized void persist() {
        String jsonString = this.encode(true);
            String fileName = this.configFilePath();
                MixAll.string2File(jsonString, fileName);
        
    }

    public abstract String encode(final boolean prettyFormat);

那么ConsumerOffsetManager會(huì)實(shí)現(xiàn)encode和decode方法并在某個(gè)地方定時(shí)調(diào)用persist方法艺普,查看其使用的地方,找到BrokerController的initialize方法鉴竭,有段定時(shí)任務(wù)如下:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

可以看到歧譬,每flushConsumerOffsetInterval(默認(rèn)5000)毫秒會(huì)進(jìn)行一次持久化

拉取消息的時(shí)候保存消費(fèi)進(jìn)度

拉取消息的Code為RequestCode.PULL_MESSAGE,對(duì)應(yīng)的Processor為PullMessageProcessor搏存,找到其中消費(fèi)進(jìn)度處理的地方

// 上面說的consumer傳過來的commitOffsetEnable
// 當(dāng)Consumer本地消費(fèi)進(jìn)度大于0的時(shí)候這個(gè)參數(shù)為true
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.

// brokerAllowSuspend在處理消息請(qǐng)求的時(shí)候?yàn)閠rue瑰步,hold請(qǐng)求自己處理是false
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
// Master才需要保存進(jìn)度,slave只是同步broker的消息
storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
    this.brokerController.getConsumerOffsetManager().commitOffset(
        RemotingHelper.parseChannelRemoteAddr(channel),
        requestHeader.getConsumerGroup(), 
        requestHeader.getTopic(), 
        requestHeader.getQueueId(), 
        requestHeader.getCommitOffset());//consumer傳上來的offset
}

總的來說:
當(dāng)broker為master的時(shí)候璧眠,且Consumer消費(fèi)進(jìn)度大于0則在拉取消息的時(shí)候順便將消費(fèi)進(jìn)度保存到broker

問題分析

重復(fù)消費(fèi)問題

在ProcessQueue的removeMessage的第二種情況有個(gè)問題缩焦,假設(shè)有如下情況:
批量拉取了4條消息ABCD兵钮,分別對(duì)應(yīng)的offset為400|401|402|403,此時(shí)consumeBatchSize(批量消費(fèi)數(shù)量舌界,默認(rèn)為1,即一條一條消費(fèi))泰演,那么會(huì)分4個(gè)線程去消費(fèi)這幾個(gè)消息呻拌,出現(xiàn)下面消費(fèi)次序
消費(fèi)D -> removeMessage -> 返回400(情況2)
消費(fèi)C -> removeMessage -> 返回400(情況2)
消費(fèi)B -> removeMessage -> 返回400(情況2)
消費(fèi)A -> removeMessage -> 返回404(情況1)

在消費(fèi)A之前,本地消費(fèi)進(jìn)度持久化到Broker之后睦焕,應(yīng)用宕機(jī)了藐握,那么此時(shí)Broker保存的是offset=400(準(zhǔn)確來說,在消費(fèi)完A且保存消費(fèi)進(jìn)度到broker之前垃喊,offset都是400)猾普。那么會(huì)有什么問題呢?
先假設(shè)消費(fèi)完DCB且消費(fèi)進(jìn)度上傳完成宕機(jī)本谜,然后重啟應(yīng)用初家,這時(shí)候會(huì)先從broker獲取應(yīng)該從哪里消費(fèi)(),因?yàn)镈CB消費(fèi)完成后都是保存400這個(gè)消費(fèi)進(jìn)度乌助,那么返回的是400溜在,這時(shí)候consumer會(huì)請(qǐng)求offset為400的消費(fèi),到這里他托,已經(jīng)重復(fù)消費(fèi)了DCB掖肋。

消費(fèi)進(jìn)度保存在哪里

  1. consumer保存在內(nèi)存,定時(shí)上傳broker
  2. broker保存在內(nèi)存赏参,定時(shí)刷新到磁盤文件

:以上沒有特別聲明的都是并發(fā)消費(fèi)模式

整體流程圖

image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末志笼,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子把篓,更是在濱河造成了極大的恐慌纫溃,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件韧掩,死亡現(xiàn)場離奇詭異皇耗,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)揍很,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門郎楼,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人窒悔,你說我怎么就攤上這事呜袁。” “怎么了简珠?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵阶界,是天一觀的道長虹钮。 經(jīng)常有香客問我,道長膘融,這世上最難降的妖魔是什么芙粱? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮氧映,結(jié)果婚禮上春畔,老公的妹妹穿的比我還像新娘。我一直安慰自己岛都,他們只是感情好律姨,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著臼疫,像睡著了一般择份。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上烫堤,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天荣赶,我揣著相機(jī)與錄音,去河邊找鬼鸽斟。 笑死讯壶,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的湾盗。 我是一名探鬼主播伏蚊,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼格粪!你這毒婦竟也來了躏吊?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤帐萎,失蹤者是張志新(化名)和其女友劉穎比伏,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體疆导,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡赁项,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了澈段。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片悠菜。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖败富,靈堂內(nèi)的尸體忽然破棺而出悔醋,到底是詐尸還是另有隱情,我是刑警寧澤兽叮,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布芬骄,位于F島的核電站猾愿,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏账阻。R本人自食惡果不足惜蒂秘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望淘太。 院中可真熱鬧姻僧,春花似錦、人聲如沸琴儿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽造成。三九已至,卻和暖如春雄嚣,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國打工季惯, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留邮辽,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓港谊,卻偏偏與公主長得像骇吭,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子歧寺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,721評(píng)論 13 425
  • metaq是阿里團(tuán)隊(duì)的消息中間件燥狰,之前也有用過和了解過kafka,據(jù)說metaq是基于kafka的源碼改過來的斜筐,他...
    菜鳥小玄閱讀 32,882評(píng)論 0 14
  • consumer 1.啟動(dòng) 有別于其他消息中間件由broker做負(fù)載均衡并主動(dòng)向consumer投遞消息龙致,Rock...
    veShi文閱讀 4,934評(píng)論 0 2
  • 連日數(shù)陰晴, 新芽臨寒風(fēng)顷链。 春曉寂寂冷目代, 晨鵲恰恰鳴。
    楓之然閱讀 128評(píng)論 6 16
  • 在廣袤的森林盡頭嗤练,佇立著一座古老巍峨的城堡榛了。城堡里住著一對(duì)姐妹,姐姐凱莉和妹妹雪莉煞抬。 雪莉有一雙烏黑明亮的大眼睛忽冻,...
    喜樂圓子閱讀 396評(píng)論 1 9