RocketMQ Consumer接收消息流程

?這節(jié)介紹Consumer接收消息的流程垮抗,分為Pull和Push模式懒震。

1. 初始化

?上一節(jié)講Rebalance時提到鳄厌,Consumer接受客戶端有兩種方式:

  1. Broker發(fā)現(xiàn)客戶端列表有變化,通知所有Consumer執(zhí)行Rebalance
  2. Consumer定時每20秒自動執(zhí)行Rebalance

其中1.的通知到達Consumer后拼缝,會立即觸發(fā)Rebalance,然后會重置2.的定時器等待時間彰亥。二者最后通知Consumer的方式為

  1. Push模式:當有新的Queue分配給客戶端時咧七,會新包裝一個PullRequest,用于后續(xù)自動拉取消息任斋,具體到DefaultMQPushConsumerImpl的executePullRequestImmediately方法
  2. Pull模式:回調(diào)DefaultMQPullConsumerImpl的MessageQueueListener有Queue發(fā)生改變

2. Push模式

?executePullRequestImmediately的內(nèi)容為:

public void executePullRequestImmediately(final PullRequest pullRequest) {
    this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}

即將PullRequest對象傳給了PullMessageService的executePullRequestImmediately方法:

public void executePullRequestImmediately(final PullRequest pullRequest) {
    try {
        this.pullRequestQueue.put(pullRequest);
    } catch (InterruptedException e) {
        log.error("executePullRequestImmediately pullRequestQueue.put", e);
    }
}

PullMessageService的結構如下:

file

內(nèi)部維護著一個LinkedBlockingQueue屬性pullRequestQueue继阻,用于存儲待處理的PullRequest;還有一個ScheduledExecutorService,用于延期處理PullRequest废酷。具體流程如下:

file
  1. RebalanceImpl調(diào)用DefaultMQPushConsumerImpl的executePullRequestImmediately方法瘟檩,傳入PullRequest
  2. DefaultMQPushConsumerImpl內(nèi)部調(diào)用PullMessageService的executePullRequestImmediately方法,該方法會把傳入的PullRequest對象放到LinkedBlockingQueue中進行存儲澈蟆,等待后續(xù)處理墨辛。
  3. PullMessageService會循環(huán)從隊列中出隊一個PullRequest,并調(diào)用自身的pullMessage用于后續(xù)處理趴俘。該動作會從MQClientInstance中選擇對應的客戶端實例DefaultMQPushConsumerImpl睹簇,并委托給它的pullMessage方法。
  4. DefaultMQPushConsumerImpl會先判斷當前請求是否滿足條件寥闪,如果不滿足條件太惠,會調(diào)用PullMessage的executePullRequestLater方法,將當前請求延后處理疲憋。如果滿足條件凿渊,會封裝一個PullCallback對象用于處理異步消息,并調(diào)用PullAPIWrapper異步請求Broker拉取消息缚柳。

從上面的過程可以看出埃脏,Push模式內(nèi)部還是客戶端主動去拉取的,即所謂的封裝拉模式以實現(xiàn)推模式,簡單示意圖如下:

file

內(nèi)部通過PullMessageService循環(huán)的從PullRequest對應MessageQueue中主動拉取數(shù)據(jù)秋忙。

2.1. DefaultMQPushConsumerImpl.pullMessage(PullRequest)

?該方法用于完成從MessageQueue拉取消息的過程剂癌,主要過程如下:

file
  1. 判斷該MessageQueue對應的PullRequest是否已經(jīng)標記為drop,如果是則直接返回

  2. 進行一系列的檢查翰绊,如果檢查不通過佩谷,則等待一定時間后再放回PullMessageService的待處理隊列中旁壮,主要是通過PullMessageService中的ScheduledExecutorService來做到延遲執(zhí)行,涉及的情況包括:

    1. 如果客戶端未準備就緒(DefaultMQPushCOnsumerImpl執(zhí)行start后status為RUNNING)谐檀,則延遲PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)后再放回PullMessage的隊列中
    2. 如果是暫停狀態(tài)抡谐,則延遲PULL_TIME_DELAY_MILLS_WHEN_SUSPEND(1000)后再放回PullMessageService的等待隊列中
    3. 如果緩存的消息數(shù)大于配置的拉取線程數(shù)閾值(默認1000),則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)后再返回等待隊列中處理
    4. 如果緩存的消息大小大于配置的拉取大小閾值(默認100M)桐猬,則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)后再返回等待隊列中處理
    5. 緩存的數(shù)據(jù)offset相差的偏移量超過設定值(默認2000)麦撵,則等待PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL(50)后再返回等待隊列中處理
    6. 如果沒有訂閱MessageQueue對應的topic,則等待PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)后再返回隊列中處理
  3. 包裝PullCallback對象溃肪,并調(diào)用PullAPIWrapper發(fā)起異步請求拉取消息

上面通過PullAPIWrapper收到結果后會將結果包裝為PullResult對象并回調(diào)PullCallback免胃。PullCallback和PullResult的定義如下:

public interface PullCallback {
    void onSuccess(final PullResult pullResult);
    
    void onException(final Throwable e);
}
public class PullResult {
    private final PullStatus pullStatus;//請求狀態(tài)
    private final long nextBeginOffset;//Broker返回的下一次開始消費的offset
    private final long minOffset;
    private final long maxOffset;
    private List<MessageExt> msgFoundList;//消息列表,一次請求返回一批消息
}

下面為pullMessage方法處理異步返回結果的流程:

file
  1. 如果請求失敗惫撰,則等待PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION(3000)后再放回PullMessageService的待處理隊列中羔沙;處理成功則進入2.
  2. 調(diào)用PullAPIWrapper對結果進行預處理
  3. 根據(jù)請求狀態(tài)進行處理
    1. 有新消息(FOUND)
      1. 設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset
      2. 如果結果列表為空則不延遲,立馬放到PullMessageService的待處理隊列中厨钻,否則進入3
      3. 將PullResult中的結果List<MessageExt>放入ProcessQueue的緩存中扼雏,并通知ConsumeMessageService處理
      4. 將該PullRequest放回待處理隊列中等待再次處理,如果有設置拉取的間隔時間夯膀,則等待該時間后再翻到隊列中等待處理诗充,否則直接放到隊列中等待處理
    2. 沒有新消息(NO_NEW_MSG)
      1. 設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset
      2. 如果緩存的待消費消息數(shù)為0,則更新offset存儲
      3. 將PullRequest立馬放到PullMessageService的待處理隊列中
    3. 沒有匹配的消息(NO_MATCHED_MSG)
      1. 設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset
      2. 如果緩存的待消費消息數(shù)為0诱建,則更新offset存儲
      3. 將PullRequest立馬放到PullMessageService的待處理隊列中
    4. 不合法的偏移量(OFFSET_ILLEGAL)
      1. 設置PullRequest下次開始消費的起始位置為PullResult的nextBeginOffset
      2. 標記該PullRequset為drop
      3. 10s后再更新并持久化消費offset蝴蜓;再通知Rebalance移除該MessageQueue

?下面先介紹下ProcessQueue,這里只標識幾個相關的屬性:

public class ProcessQueue {
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    //緩存的待消費消息,按照消息的起始offset排序
    private final TreeMap</*消息的起始offset*/Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
    //緩存的待消費消息數(shù)量
    private final AtomicLong msgCount = new AtomicLong();
    //緩存的待消費消息大小
    private final AtomicLong msgSize = new AtomicLong();
    private final Lock lockConsume = new ReentrantLock();
    /**
     * A subset of msgTreeMap, will only be used when orderly consume
     */
    private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
    private final AtomicLong tryUnlockTimes = new AtomicLong(0);
    private volatile long queueOffsetMax = 0L;
    private volatile boolean dropped = false;
    //最近執(zhí)行pull的時間
    private volatile long lastPullTimestamp = System.currentTimeMillis();
    //最近被客戶端消費的時間
    private volatile long lastConsumeTimestamp = System.currentTimeMillis();
    private volatile boolean locked = false;
    private volatile long lastLockTimestamp = System.currentTimeMillis();
    //當前是否在消費俺猿,用于順序消費模式励翼,對并行消費無效
    private volatile boolean consuming = false;
    private volatile long msgAccCnt = 0;
}

ProcessQueue展示了MessageQueue的消費情況。上面提到辜荠,發(fā)起pull請求后如果有數(shù)據(jù)汽抚,會先放到ProcessQueue的緩存中,即msgTreeMap屬性伯病,因而緩存的消息會按照消息的起始offset被排序存儲造烁。通過ProcessQueue可以查看MessageQueue當前的處理情況,ProcessQueue還用于輔助實現(xiàn)順序消費午笛。

2.2 ConsumeMessageService

?異步返回的消息內(nèi)容將交給ConsumeMessageService處理惭蟋,ConsumeMessageService是個接口,方法定義如下:

public interface ConsumeMessageService {
    void start();

    void shutdown();

    void updateCorePoolSize(int corePoolSize);

    void incCorePoolSize();

    void decCorePoolSize();

    int getCorePoolSize();

    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);

    void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume);
}

通過定義可見药磺,要求實現(xiàn)類提供異步處理的功能告组。內(nèi)部提供的實現(xiàn)類有:

file

ConsumeMessageConcurrentlyService:并行消費;ConsumeMessageOrderlyService:順序消費癌佩,這里重點看ConsumeMessageConcurrentlyService木缝。異步請求后會將拉取的新消息列表交給submitConsumeRequest方法處理便锨,如下:

file

該方法會將傳入的消息列表分裝為一個ConsumeRequest,并提到到線程池中等待處理我碟。如果傳入的消息列表長度超過設定值(默認為1)放案,則會分多個批處理。

?在介紹消費具體過程之前先回顧客戶端啟動流程的Demo矫俺,接收消息的寫法如下:

public class Consumer {

    public static void main (String[] args) throws InterruptedException, MQClientException {

        // 實例化消費者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest");

        // 設置NameServer的地址
        consumer.setNamesrvAddr ("localhost:9876");

        // 訂閱一個或者多個Topic吱殉,以及Tag來過濾需要消費的消息
        consumer.subscribe ("TopicTest", "*");
        // 注冊回調(diào)實現(xiàn)類來處理從broker拉取回來的消息
        consumer.registerMessageListener (new MessageListenerConcurrently () {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs);
                // 標記該消息已經(jīng)被成功消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動消費者實例
        consumer.start ();
        System.out.printf ("Consumer Started.%n");
    }
}

其中注冊了一個MessageListenerConcurrently,該類將用于用戶端處理消息厘托。

?回過來看ConsumeRequest友雳,該類實現(xiàn)了Runnable接口,會在run方法完成主要的處理工作铅匹,主要動作為:

  1. 調(diào)用DefaultMQPushConsumerImpl.executeHookBefore執(zhí)行前置hook動作
  2. 調(diào)用MessageListenerConcurrently.consumeMessage通知用戶端處理消息押赊,即上面demo內(nèi)容,會返回處理結果ConsumeConcurrentlyStatus
  3. 調(diào)用DefaultMQPushConsumerImpl.executeHookAfter執(zhí)行后置hook動作
  4. ConsumeMessageConcurrentlyService.processConsumeResult根據(jù)ConsumeConcurrentlyStatus執(zhí)行收尾動作

2.2.1. MessageListenerConcurrently.consumeMessage

?用戶真正接收消息并執(zhí)行處理動作的地方伊群,需要返回ConsumeConcurrentlyStatus告知框架處理結果。這里在方法里最好不要做耗時長的任務策精,快速處理后返回給框架結果舰始,避免消息堆積在線程池中⊙释啵可以將消息內(nèi)容復制一遍后再放到線程池中進行分發(fā)處理丸卷。

2.2.2. ConsumeMessageConcurrentlyService.processConsumeResult

?該方法主要在用戶消費完數(shù)據(jù)后進行收尾動作,過程如下:

file

ConsumerRequest在run方法的開始處询刹,實例化了一個ConsumeConcurrentlyContext對象谜嫉,用于后續(xù)傳遞內(nèi)容,該定義為:

public class ConsumeConcurrentlyContext {
    private final MessageQueue messageQueue;
    //重試的延遲級別,-1:不重試;0:由broker控制;>0由客戶端控制
    private int delayLevelWhenNextConsume = 0;
    //消息列表最后一個正常消費的消息索引號
    private int ackIndex = Integer.MAX_VALUE;
}

其中ackIndex表示最后一個正常消費的消息索引號(0從開始,0~ackIndex為正常消費)凹联,該位置后的消息表示沒法正常消費沐兰。該值由用戶端控制,可以通過ackIndex來控制需要重發(fā)的消息。

?ackIndex默認值為Integer.MAX_VALUE蔽挠,如果為該值則認為所有消息正常消費住闯,不存在錯誤。上面流程中統(tǒng)計成功和失敗也是根據(jù)ackIndex來判斷的澳淑,對于ackIndex后的消息比原,如果是集群消費模式,則會先嘗試發(fā)送回broker杠巡,由broker控制重試時機量窘;如果重試失敗,會收集這些失敗的消息氢拥,延遲5秒后再調(diào)用一次ConsumeMessageService.submitConsumeRequest讓用戶端再次處理蚌铜。最后會將處理成功的消息從ProcessQueue中移除锨侯,更新緩存,然后將q消費的偏移量記錄下來厘线,等待后臺線程同步到broker或者本地识腿。

?綜合上面的介紹,Push模式下的處理流程大致如下:

file

Push模式通過PullMessageService循環(huán)從監(jiān)聽的MessageQueue中以Pull模式拉取消息造壮,并分發(fā)給用戶注冊的MesageListenerConsurrently對象處理渡讼,完了之后會自動處理消息的重試,offset更新等動作耳璧,從而模擬消息從Broker端主動推動過來成箫。

2. Pull模式

?同Push模式一樣,Pull模式的觸發(fā)也是通過Rebalance旨枯,如下:

file

同開頭提及的一樣蹬昌,會回調(diào)DefaultMQPullConsumerImpl的MessageQueueListener有Queue發(fā)生改變。

?系統(tǒng)提供了MQPullConsumerScheduleService攀隔,可以定時以Pull模式拉取消息皂贩,并將結果通知MessageQueueListener,內(nèi)部的實現(xiàn)為:

class MessageQueueListenerImpl implements MessageQueueListener {
    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {//mqAll該topic下的所有q昆汹,mqDivided該實例分配到的q
        MessageModel messageModel =
            MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
        switch (messageModel) {
            case BROADCASTING:
                MQPullConsumerScheduleService.this.putTask(topic, mqAll);//通知該topic下的監(jiān)聽器明刷,最新的所有q
                break;
            case CLUSTERING:
                MQPullConsumerScheduleService.this.putTask(topic, mqDivided);//通知該topic下的監(jiān)聽器,該實例分配的q
                break;
            default:
                break;
        }
    }
}

putTask會將分配到的新的MessageQueue包裝為一個PullTaskImpl满粗,PullTaskImpl實現(xiàn)了Runnable,會在后臺一直執(zhí)行辈末;而將不屬于自己處理的MessageQueue對應的PullTaskImpl停掉。PullTaskImpl會查找該MessageQueue所監(jiān)聽topic對應的處理類PullTaskCallback映皆,調(diào)用doPullTask挤聘,將具體動作讓用戶處理。

?MQPullConsumerScheduleService的例子為:

public class PullScheduleService {

    public static void main(String[] args) throws MQClientException {
        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");

        scheduleService.setMessageModel(MessageModel.CLUSTERING);
        scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {//注冊topic的監(jiān)聽器

            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                try {

                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;

                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    System.out.printf("%s%n", offset + "\t" + mq + "\t" + pullResult);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());//上報消費的offset捅彻,消費完后要主動上報

                    context.setPullNextDelayTimeMillis(100);//設置下一次觸發(fā)間隔
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        scheduleService.start();
    }
}

?也可以自己手動執(zhí)行pull组去,如下面的例子:

public class PullConsumer {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");

        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}

?相較于Push模式,Pull模式則需要用戶自己控制消息的重試步淹,offset更新等動作添怔。下面附上該部分當時源碼閱讀過程做的筆記簡圖:

file

更多原創(chuàng)內(nèi)容請搜索微信公眾號:啊駝(doubaotaizi)

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市贤旷,隨后出現(xiàn)的幾起案子广料,更是在濱河造成了極大的恐慌,老刑警劉巖幼驶,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件艾杏,死亡現(xiàn)場離奇詭異,居然都是意外死亡盅藻,警方通過查閱死者的電腦和手機购桑,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進店門畅铭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人勃蜘,你說我怎么就攤上這事硕噩。” “怎么了缭贡?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵炉擅,是天一觀的道長。 經(jīng)常有香客問我阳惹,道長谍失,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任莹汤,我火速辦了婚禮快鱼,結果婚禮上,老公的妹妹穿的比我還像新娘纲岭。我一直安慰自己抹竹,他們只是感情好,可當我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布止潮。 她就那樣靜靜地躺著窃判,像睡著了一般。 火紅的嫁衣襯著肌膚如雪沽翔。 梳的紋絲不亂的頭發(fā)上兢孝,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天窿凤,我揣著相機與錄音仅偎,去河邊找鬼。 笑死雳殊,一個胖子當著我的面吹牛橘沥,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播夯秃,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼座咆,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了仓洼?” 一聲冷哼從身側響起介陶,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎色建,沒想到半個月后哺呜,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡箕戳,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年某残,在試婚紗的時候發(fā)現(xiàn)自己被綠了国撵。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡玻墅,死狀恐怖介牙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情澳厢,我是刑警寧澤环础,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站赏酥,受9級特大地震影響喳整,放射性物質發(fā)生泄漏。R本人自食惡果不足惜裸扶,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一框都、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧呵晨,春花似錦魏保、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至季二,卻和暖如春狰闪,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背市埋。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工谋旦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人桑嘶。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓炊汹,卻偏偏與公主長得像,于是被迫代替她去往敵國和親逃顶。 傳聞我的和親對象是個殘疾皇子讨便,可洞房花燭夜當晚...
    茶點故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內(nèi)容