RocketMQ 順序消費源碼分析

背景

rocketmq支持順序消費,是很多業(yè)務中要用的一個場景慨代,我就好奇他是怎么實現(xiàn)的,需要了解背后的原理啸如,是怎么支持順序消費的侍匙,這樣有問題的時候我們才能快速的定位問題,這是一個合格的架構師必備的能力叮雳。

分配MessageQueue

rocketmq 在啟動消費時想暗,會對topic的mq進行reblance,如果是新分配的message queue帘不,如果是順序消費说莫,即isorder為true。則需要先對該
message queue 獲取分布式鎖寞焙,獲取成功才能真正開始消費储狭,代碼入心:

boolean allMQLocked = true;
        List<PullRequest> pullRequestList = new ArrayList<>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                //新分配的message queue 如果是順序消費,需要先獲取鎖捣郊,獲取成功
                //則創(chuàng)建messagequeue 開始拉起數(shù)據辽狈,否則不能消費給mq。
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    //如果獲取失敗呛牲,則不消費這個mq刮萌。
                    allMQLocked = false;
                    continue;
                }
                //如果是順序消費,只有獲取成功娘扩,才開始消費的準備工作尊勿。
                this.removeDirtyOffset(mq);
                ProcessQueue pq = createProcessQueue(topic);
                pq.setLocked(true);
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }

        }

獲取鎖

獲取鎖的代碼不需要看,我們只需要關心下請求參數(shù)即可畜侦,因為關鍵實現(xiàn)在broker端:

 LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.getMqSet().add(mq);

            try {
                Set<MessageQueue> lockedMq =
                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                for (MessageQueue mmqq : lockedMq) {
                    ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                    if (processQueue != null) {
                        processQueue.setLocked(true);
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                    }
                }

                boolean lockOK = lockedMq.contains(mq);
                log.info("message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
                return lockOK;
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + mq, e);
            }

順序消費獲取鎖的代碼可用看出,需要告訴broker端三個參數(shù):

  • consumer group 消費分組躯保。
  • 客戶端id旋膳,即consumer的標識
  • mq,即message queue 是對那個queue的順序消費途事。

請求類型是LOCK_BATCH_MQ验懊,broker server 會用默認的processor來處理這個請擅羞。如果沒有獲取到鎖,則lockedMq是空的义图,沒有直减俏,則返回false,所以接下來碱工,我們看下服務端是怎么做的娃承,來保證這個順序消費。

Broker鎖實現(xiàn)

broker server 處理LOCK_BATCH_MQ 的請求時通過defaultRequestProcessorPair來負責處理怕篷,defaultRequestProcessorPair是AdminBrokerProcessor,實現(xiàn)邏輯在lockBatchMQ方法历筝,代碼如下:

private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);

        Set<MessageQueue> lockOKMQSet = new HashSet<>();
        //根據group和mq,嘗試對沒有被其他consumer鎖定會加鎖廊谓,只有沒有枷鎖的messagequeue梳猪,或者其他的鎖已經過期了,才能上鎖蒸痹。
        //selfLockOKMQSet 是成功獲取鎖的message queue
        Set<MessageQueue> selfLockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
            requestBody.getConsumerGroup(),
            requestBody.getMqSet(),
            requestBody.getClientId());
        //看是否要請求其他的server春弥,客戶端發(fā)起的時false,broker發(fā)起的是true
        if (requestBody.isOnlyThisBroker() || !brokerController.getBrokerConfig().isLockInStrictMode()) {
            lockOKMQSet = selfLockOKMQSet;
        } else {
            //設置OnlyThisBroker為true叠荠,讓其他的server接到請求時不再請求其他的server了
            requestBody.setOnlyThisBroker(true);
            //獲取副本數(shù)
            int replicaSize = this.brokerController.getMessageStoreConfig().getTotalReplicas();
            //計算過半quorum
            int quorum = replicaSize / 2 + 1;

            if (quorum <= 1) {
                //如果就一個匿沛,則不需要再請求其他的broker server
                lockOKMQSet = selfLockOKMQSet;
            } else {
                //有多個副本,對所有broker嘗試加鎖蝙叛。
                final ConcurrentMap<MessageQueue, Integer> mqLockMap = new ConcurrentHashMap<>();
                //先對本地加鎖的mq 標記為1
                for (MessageQueue mq : selfLockOKMQSet) {
                    if (!mqLockMap.containsKey(mq)) {
                        mqLockMap.put(mq, 0);
                    }
                    mqLockMap.put(mq, mqLockMap.get(mq) + 1);
                }

                BrokerMemberGroup memberGroup = this.brokerController.getBrokerMemberGroup();

                if (memberGroup != null) {
                    Map<Long, String> addrMap = new HashMap<>(memberGroup.getBrokerAddrs());
                    addrMap.remove(this.brokerController.getBrokerConfig().getBrokerId());
                    final CountDownLatch countDownLatch = new CountDownLatch(addrMap.size());
                    requestBody.setMqSet(selfLockOKMQSet);
                    requestBody.setOnlyThisBroker(true);
                    for (Long brokerId : addrMap.keySet()) {
                        try {
                            this.brokerController.getBrokerOuterAPI().lockBatchMQAsync(addrMap.get(brokerId),
                                requestBody, 1000, new LockCallback() {
                                    @Override
                                    public void onSuccess(Set<MessageQueue> lockOKMQSet) {
                                        for (MessageQueue mq : lockOKMQSet) {
                                            if (!mqLockMap.containsKey(mq)) {
                                                mqLockMap.put(mq, 0);
                                            }
                                            //加鎖成功俺祠,對加鎖次數(shù)加1
                                            mqLockMap.put(mq, mqLockMap.get(mq) + 1);
                                        }
                                        countDownLatch.countDown();
                                    }

                                    @Override
                                    public void onException(Throwable e) {
                                        LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
                                        countDownLatch.countDown();
                                    }
                                });
                        } catch (Exception e) {
                            LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
                            countDownLatch.countDown();
                        }
                    }
                    try {
                        countDownLatch.await(2000, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        LOGGER.warn("lockBatchMQ exception on {}, {}", this.brokerController.getBrokerConfig().getBrokerName(), e);
                    }
                }
                //計算哪些mq是成功實現(xiàn)過半加鎖的,返回給客戶端
                for (MessageQueue mq : mqLockMap.keySet()) {
                    if (mqLockMap.get(mq) >= quorum) {
                        lockOKMQSet.add(mq);
                    }
                }
            }
        }

上面的代碼挺多借帘,主要是實現(xiàn)了兩個關鍵點蜘渣,分別是對本地mq 加鎖,和對其他的broker server 獲取鎖肺然,計算加鎖成功的broker server是否過半蔫缸,過半則成功,否則失敗际起。

  • 對本地message queue 加鎖
    看本broker server 的message queue 嘗試獲取鎖拾碌,能加鎖成功的條件是沒有加鎖的mq,或者已經加鎖了街望,但是已經過期了校翔,其他的都是被其他的客戶端鎖定中,關鍵代碼如下:
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
        ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
        if (groupValue != null) {
            LockEntry lockEntry = groupValue.get(mq);
            if (lockEntry != null) {
                //檢查clientid和是否過期
                boolean locked = lockEntry.isLocked(clientId);
                if (locked) {
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                }

                return locked;
            }
        }

        return false;
    }
  • 分布式鎖
    分布式我們都知道需要通過zk灾前,redis防症,consul等實現(xiàn),但是rocketmq并沒有這樣做,個人理解是rocketmq 不想因為這個問題要依賴其他的外部組件蔫敲,因為依賴一個組件你還要對依賴組件的穩(wěn)定性饲嗽,所以自己巧妙的實現(xiàn)了對所有broker server message queue 加鎖時,應用了leader選舉的思想奈嘿,因為broker肯定是集群部署貌虾,不同的客戶端同時發(fā)起順序消費時,很有可能鏈接的不同的broker server裙犹,如果只對單broker server判斷獲取鎖成功是有問題的尽狠,通過對所有的broker server都獲取鎖,如果有一半以上獲取鎖成功伯诬,則肯定是只有一個客戶端能獲取到鎖晚唇,類似leader選舉的思路,是值得學習的地方盗似。

定期刷新鎖

順序消費的這個鎖也是一個鎖租約的機制哩陕,到了時間不續(xù)租,就釋放了赫舒,所以broker分布式鎖除了兩看consumer的客戶端id悍及,還有一個時間的限制,如果客戶端出現(xiàn)問題接癌,沒有主動更新鎖的時間心赶,則會被其他的客戶端獲取到鎖,續(xù)租也有可能是和其他的客戶端并發(fā)的缺猛,所以就有可能鎖續(xù)租失敗缨叫,失敗了就不能消費這個message queue了,所以在消費的時候需要檢查是否持有鎖荔燎,更新是通過一個定時任務更新的耻姥,時間周期為20秒一次,通過rocketmq.client.rebalance.lockInterval 變量控制有咨。

還有一個值得注意的是琐簇,一個topic有多個message queue,兩個客戶端同時發(fā)起順序消費時座享,在獲取分布式鎖時婉商,有可能兩個分別獲得部分mq的鎖,rocketmq的順序是保證在mq級別的渣叛。

分發(fā)消息

獲取到對應message queue的鎖后丈秩,就可以創(chuàng)建pullRequest請求到隊列messageRequestQueue 中,這時候拉消息的線程就會被換醒淳衙,去拉消息癣籽,拉到消息后挽唉,會把消息緩存在一個treeMap中,這個和并發(fā)消費是一樣的筷狼,添加到treeMap中,返回結果判斷是否需要提交新的ConsumeRequest task匠童,如果前面的消費任務已經消費完了埂材,則會返回true,即需要提交新的ConsumeRequest汤求,代碼如下:

public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }

順序消費在分發(fā)的時候俏险,不像并發(fā)消費一樣,默認一個請求提交一個ConsumeRequest task到線程執(zhí)行扬绪,來實現(xiàn)并發(fā)消費竖独。

順序消費如果沒有入在消費的判斷,在把消息加入到processQueue時會判斷有沒有線程在消費挤牛,如果有莹痢,則不能提交消費任務,只有沒有線程消費的時候墓赴,才創(chuàng)建一個ConsumeRequest task到線程池執(zhí)行, 因為有提交一個任務后竞膳,會不斷的從processQueue 的treemap 里獲取message,如果獲取不到了诫硕,才把consuming的標記設置為false坦辟,下次拉到消息時,就重新提交一個新的ConsumeRequest章办。

ConsumeRequest 的run 方法如下:

public void run() {
            
            .....
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
                    final long beginTime = System.currentTimeMillis();
                    for (boolean continueConsume = true; continueConsume; ) {
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }

                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {
                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && this.processQueue.isLockExpired()) {
                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

                        long interval = System.currentTimeMillis() - beginTime;
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }
                        //consumeBatchSize 默認是1锉走,從tree map里取出一批消息,默認是一條消息
                        final int consumeBatchSize =
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                        List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                        if (!msgs.isEmpty()) {
                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                            ConsumeOrderlyStatus status = null;

                            ConsumeMessageContext consumeMessageContext = null;
                                                  
                            //.... hook partion
                            long beginTimestamp = System.currentTimeMillis();
                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                            boolean hasException = false;
                            try {
                                //這里需要加鎖藕届,一定是等前面一條消息處理完后挪蹭,才能繼續(xù)消費下一條消息。
                                this.processQueue.getConsumeLock().lock();
                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }
                                //執(zhí)行業(yè)務的消費代碼
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                                    UtilAll.exceptionSimpleDesc(e),
                                    ConsumeMessageOrderlyService.this.consumerGroup,
                                    msgs,
                                    messageQueue), e);
                                hasException = true;
                            } finally {
                                this.processQueue.getConsumeLock().unlock();
                            }

                            //去掉部分代碼
                            long consumeRT = System.currentTimeMillis() - beginTimestamp;
                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false;
                        }
                    }
                } else {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

代碼有點多翰舌,省掉了部分非關鍵的代碼嚣潜,ConsumeRequest 的run 方法主要干了如下幾件事情:

    1. 首先獲取鎖,這個鎖是以message queue為單位的椅贱,就是為每個message queue 創(chuàng)建了一個object懂算,通過對synchronized 對object 加鎖,防止并發(fā)執(zhí)行庇麦。
    1. 檢查processqueue 是否還被鎖住计技,就是前面說的,會定期更新鎖山橄,即續(xù)租成功垮媒,就還是locked,如果失敗,則不能消費睡雇。
    1. 檢查消費的時間萌衬,如果持續(xù)消費超過了1分鐘,說明消費有瓶頸它抱,則等10毫秒再繼續(xù)消費秕豫。
    1. 取消息,從msgTreeMap里獲取消息观蓄,默認是一次獲取1條混移,這里還有對這條消息做了一個暫存,存在consumingMsgOrderlyTreeMap里面侮穿,是用來消費成功后歌径,做commit offset的。
  • 5.獲取 processqueue的consumer lock亲茅,拿到鎖后回铛,即開始執(zhí)行業(yè)務的消費代碼,這里的鎖不是很理解芯急,順序消費的task 同時只有一個線程在運行勺届,前面已經對message queue加了一個大鎖。

  • 6.執(zhí)行業(yè)務的消費代碼娶耍,獲取消費結果免姿。

  • 7.處理消費結果,如果成功的情況下榕酒,會更新本地的offset胚膊,這里不更新到broker server端,還是統(tǒng)一通過定時任務上報給broker server的想鹰。

總結時刻

本文對rocketmq 的順序消費模式的代碼擼了一遍紊婉,讓我們了解了順序消費背后的原理和邏輯,即是怎么保證客戶端能順序消費消息的辑舷,主要有下幾點:

  1. 順序消費時group級別對message queue保證有順序喻犁。
  2. 開始消費message queue前需要獲取分布式鎖,這里和選舉leader一樣的思路何缓,通過對集群的broker都獲取鎖肢础,有一半獲取成功就說明加鎖成功。
  3. 順序消費時拉到消息后碌廓,只提交一個ConsumeRequest任務传轰,甚至有可能不提交,如果前面一個還在消費的情況下谷婆,通過一個ConsumeRequest來循環(huán)從msgTree里獲取慨蛙,默認一次取一條消息辽聊,來執(zhí)行業(yè)務的消費代碼,也就是單線程在執(zhí)行期贫,雖然是線程池跟匆。
  4. 每消費完一條消息,更新一次消費的offset唯灵。

注:目前看機會中贾铝,關注基礎架構,中間件埠帕,高并發(fā)系統(tǒng)建設和治理。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末玖绿,一起剝皮案震驚了整個濱河市敛瓷,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌斑匪,老刑警劉巖呐籽,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蚀瘸,居然都是意外死亡狡蝶,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進店門贮勃,熙熙樓的掌柜王于貴愁眉苦臉地迎上來贪惹,“玉大人,你說我怎么就攤上這事寂嘉∽嗨玻” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵泉孩,是天一觀的道長硼端。 經常有香客問我,道長寓搬,這世上最難降的妖魔是什么珍昨? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮句喷,結果婚禮上镣典,老公的妹妹穿的比我還像新娘。我一直安慰自己脏嚷,他們只是感情好骆撇,可當我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著父叙,像睡著了一般神郊。 火紅的嫁衣襯著肌膚如雪肴裙。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天涌乳,我揣著相機與錄音蜻懦,去河邊找鬼。 笑死夕晓,一個胖子當著我的面吹牛宛乃,可吹牛的內容都是我干的。 我是一名探鬼主播蒸辆,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼征炼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了躬贡?” 一聲冷哼從身側響起谆奥,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎拂玻,沒想到半個月后酸些,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡檐蚜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年魄懂,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片闯第。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡市栗,死狀恐怖,靈堂內的尸體忽然破棺而出乡括,到底是詐尸還是另有隱情肃廓,我是刑警寧澤,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布诲泌,位于F島的核電站盲赊,受9級特大地震影響,放射性物質發(fā)生泄漏敷扫。R本人自食惡果不足惜哀蘑,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望葵第。 院中可真熱鬧绘迁,春花似錦、人聲如沸卒密。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽哮奇。三九已至膛腐,卻和暖如春睛约,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背哲身。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工辩涝, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人勘天。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓怔揩,卻偏偏與公主長得像,于是被迫代替她去往敵國和親脯丝。 傳聞我的和親對象是個殘疾皇子商膊,可洞房花燭夜當晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內容