RocketMQ 生產(chǎn)者 Producer 啟動過程

MQProducer

從類關(guān)系中可以看出检眯,MQProducer 有兩種實現(xiàn)方式溯警。一個是 DefaultMQProducer帘皿,另一個是 TransactionMQProducer揭北。

  • DefaultMQProducer: 我們常用的生產(chǎn)者寨辩。
  • TransactionMQProducer:繼承自 DefaultMQProducer圃庭,并支持事務(wù)消息锄奢。

下面我們來分析下 DefaultMQProducer 啟動的過程。

啟動示例

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("....");
            ......
            producer.start();
            ......
        }catch(Exception e){}
    }
}

創(chuàng)建 DefaultMQProducer 實例剧腻,然后制定一些參數(shù)拘央,調(diào)用 start() 方法就開啟了生產(chǎn)者。

DefaultMQProducer 參數(shù)分析

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    //producer 組名
    private String producerGroup;

    // Topic 名字书在,默認為“TBW102”
    private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;

    // 創(chuàng)建 Topic 默認的4個隊列
    private volatile int defaultTopicQueueNums = 4;

    // 發(fā)送消息超時時間
    private int sendMsgTimeout = 3000;

    // 當發(fā)送的消息大于 4K 時灰伟,開始壓縮消息。
    private int compressMsgBodyOverHowmuch = 1024 * 4;

    //同步發(fā)送消息,發(fā)送失敗時再嘗試發(fā)送2次數(shù)
    private int retryTimesWhenSendFailed = 2;

    // 異步發(fā)送消息栏账,發(fā)送失敗時再嘗試發(fā)送2次數(shù)
    private int retryTimesWhenSendAsyncFailed = 2;

    //發(fā)送broker消息存儲失敗時帖族,是否嘗試去試發(fā)送其他的broker
    private boolean retryAnotherBrokerWhenNotStoreOK = false;

    //最大允許發(fā)送字節(jié)數(shù)
    private int maxMessageSize = 1024 * 1024 * 4; // 4M

DefaultMQProducer 中定義的類屬性

  • producerGroup: 生產(chǎn)者組名
  • createTopicKey :Topic 名字,默認為“TBW102”
  • defaultTopicQueueNums :創(chuàng)建 Topic 默認的4個隊列
  • sendMsgTimeout :默認發(fā)送消息3秒超時
  • compressMsgBodyOverHowmuch :當發(fā)送的消息大于 4K 時挡爵,開始壓縮消息竖般。
  • retryTimesWhenSendFailed :同步發(fā)送消息,發(fā)送失敗時再嘗試發(fā)送2次數(shù)茶鹃。
  • retryTimesWhenSendAsyncFailed :異步發(fā)送消息涣雕,發(fā)送失敗時再嘗試發(fā)送2次數(shù)
  • retryAnotherBrokerWhenNotStoreOK :發(fā)送broker消息存儲失敗時,是否嘗試去試發(fā)送其他的broker

DefaultMQProducer 還有可以設(shè)置其他的參數(shù)闭翩,這里就不說明了挣郭。

Producer 啟動

public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
}

public void start() throws MQClientException {
    this.start(true);
}

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        // 1. 只有 serviceState 狀態(tài)為 CREATE_JUST 時,才啟動 Producer
        case CREATE_JUST:
            //2. 防止啟動多個 Producer疗韵,先把 serviceState 狀態(tài)修改為 START_FAILED兑障。
            this.serviceState = ServiceState.START_FAILED;
            // 3. 檢查 groupName 是否合法
            this.checkConfig();

            //4. 判斷是否需要設(shè)置 InstanceName 。
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // 5. 構(gòu)建 MQClientInstance 對象伶棒。
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // 6. 將 DefaultMQProducerImpl 對象注冊到 ConcurrentHashMap<String/* group */, MQProducerInner> producerTable 中
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // 7.以主題名"TBW102"為key值旺垒,新初始化的TopicPublishInfo對象為value值存入DefaultMQProducerImpl.topicPublishInfoTable變量中
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            // 8. 啟動 第五步創(chuàng)建的 MQClientInstance 實例。
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            // 9. 設(shè)置DefaultMQProducerImpl的ServiceState為RUNNING
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // 10.  向所有的 broker 發(fā)送心跳和上傳 FilterClass
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
  1. 啟動Producer的時候判斷 serviceState 的當前狀態(tài)肤无,只有 serviceState 狀態(tài)為 CREATE_JUST 時先蒋,才啟動 Producer。否則拋出異常信息宛渐。

2竞漾、同時防止啟動多個 Producer,先把 serviceState 狀態(tài)修改為 START_FAILED窥翩。

3业岁、 檢查 groupName 是否合法。比如不能為空寇蚊,是否符合正則 ^[%|a-zA-Z0-9_-]+$笔时,并且最大長度不能超過 255(CHARACTER_MAX_LENGTH = 255);
groupName 也不能等于 DEFAULT_PRODUCER仗岸。只要滿足上面條件允耿,則拋異常信息。

4扒怖、如果 producerGroup 不等于 CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER" ,然后調(diào)用 changeInstanceNameToPID() 方法判斷名字不是 "DEFAULT" 則更改 instanceName较锡。

public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}
public static int getPid() {
    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
    String name = runtime.getName(); // format: "pid@hostname"
    try {
        return Integer.parseInt(name.substring(0, name.indexOf('@')));
    .....
}

5、構(gòu)建 MQClientInstance 對象盗痒。

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }
  • 5.1 首先生成 clientId:ip@instanceName 或 ip@instanceName@unitName
  • 5.2 如果 factoryTable 中是不已經(jīng)存在 MQClientInstance 實例蚂蕴,則創(chuàng)建。 (下面有單獨分析該源碼)

6、將 DefaultMQProducerImpl 對象注冊到 ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    if (null == group || null == producer) {
        return false;
    }
    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
    if (prev != null) {
        log.warn("the producer group[{}] exist already.", group);
        return false;
    }
    return true;
}

7骡楼、以主題名"TBW102"為key值熔号,新初始化的TopicPublishInfo對象為value值存入DefaultMQProducerImpl.topicPublishInfoTable變量中

8、調(diào)用 第五步創(chuàng)建的 MQClientInstance 實例 的start() 方法鸟整。
該方法做了很多事情:

  • 獲取NameServer地址
  • 啟動 Netty 客戶端服務(wù)
  • 設(shè)置多個定時任務(wù)
  • 開啟 pullMessageService 服務(wù)
  • 開啟 rebalanceService 服務(wù)
  • 開啟 發(fā)送消息服務(wù)

下面有具體代碼分析MQClientInstance.start() 方法跨嘉。

9、設(shè)置DefaultMQProducerImpl的ServiceState為RUNNING
10吃嘿、向所有的 broker 發(fā)送心跳和上傳 FilterClass

創(chuàng)建MQClientInstance實例(第5.2步)

上面 5.2 步驟中創(chuàng)建MQClientInstance 的代碼如下:

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    // Netty 中注冊接收請求的處理器。
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    //設(shè)置 NameServer 地址梦重。
    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }
    // 客戶端ID
    this.clientId = clientId;
    //創(chuàng)建 MQAdminImpl 對象進行和 NameServer 進行交互兑燥,比如創(chuàng)建Topic、獲取 Queue等
    this.mQAdminImpl = new MQAdminImpl(this);
    // 創(chuàng)建 pullMessageService 服務(wù)
    this.pullMessageService = new PullMessageService(this);
    // 創(chuàng)建 rebalanceService  服務(wù)
    this.rebalanceService = new RebalanceService(this);
    // 創(chuàng)建 DefaultMQProducer 服務(wù)
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);
    // 開啟 Comsumer 統(tǒng)計服務(wù)
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

    log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
        this.instanceIndex,
        this.clientId,
        this.clientConfig,
        MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}

主要功能:

  • 創(chuàng)建 MQAdminImpl 對象進行和 NameServer 進行交互琴拧,比如創(chuàng)建Topic降瞳、獲取 Queue等
  • 創(chuàng)建 pullMessageService 服務(wù)
  • 創(chuàng)建 rebalanceService 服務(wù),供 Consumer 端使用
  • 創(chuàng)建 DefaultMQProducer 服務(wù)蚓胸,
  • 開啟 Comsumer 統(tǒng)計服務(wù)挣饥。統(tǒng)計最近一段時間內(nèi),消費成功個數(shù)沛膳、消費失敗個數(shù)等信息扔枫。

啟動MQClientInstance 服務(wù) (第8步)

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 1. 如果配置NameServer地址,則從默認服務(wù)器地址中獲取(該地址不可改變)
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // 2. 啟動 Netty 客戶端服務(wù)
                this.mQClientAPIImpl.start();
                // 3. 設(shè)置多個定時任務(wù)
                this.startScheduledTask();
                // 4. 開啟 pullMessageService 服務(wù)
                this.pullMessageService.start();
                // 5. 開啟 rebalanceService 服務(wù)
                this.rebalanceService.start();
                // 6. 開啟 發(fā)送消息服務(wù)
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

主要這幾步驟操作锹安。

  • 1短荐、獲取NameServer地址
    如果啟動 Producer 時沒有指定 NameServer,則程序會向一個Http地址發(fā)送請求來獲取NameServer地址叹哭。通過這種方式可以動態(tài)的配置 NameServer忍宋。從而達到動態(tài)增加和刪除NameServer服務(wù)。
public static String getWSAddr() {
    String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
    String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
    String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
    if (wsDomainName.indexOf(":") > 0) {
        wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
    }
    return wsAddr;
}
  • 2风罩、啟動 Netty 客戶端服務(wù)
  • 3糠排、調(diào)用startScheduledTask() 方法設(shè)置多個定時任務(wù)
  • 4、開啟 pullMessageService 服務(wù)
  • 5超升、開啟 rebalanceService 服務(wù)
  • 6入宦、開啟 發(fā)送消息服務(wù)

startScheduledTask() 方法:定時任務(wù)

private void startScheduledTask() {
    // 1.如果 NameServer 地址默認沒配置,則定時向一個Http地址獲取 
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // 2. 定時的從 NameServer 中獲取 Topic廓俭、broker云石、queue 相關(guān)信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // 3. 定時清理無效的Broker,并向所有的Broker 發(fā)送心跳數(shù)據(jù)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // 4. 定時的持久化 Consumer 端消費每個 queue的 offset 數(shù)據(jù)研乒。
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // 5. 調(diào)整消費端的線程數(shù)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
  • 1汹忠、定時更新 NameServer 地址
    每個2分鐘,程序會向一個Http地址發(fā)送請求來獲取NameServer地址來動態(tài)更新NameServer地址。

  • 2宽菜、 定時的從 NameServer 中獲取 Topic谣膳、broker、queue 相關(guān)信息
    默認每隔 30秒去 NameServer 中獲取Topic铅乡、broker继谚、queue等相關(guān)信息。
    如果有新broker注冊或下線阵幸,producer端會在30秒之內(nèi)感知花履。

  • 3、定時清理無效的Broker挚赊,并向所有的Broker 發(fā)送心跳數(shù)據(jù).
    默認每隔 30 秒向 Broker 發(fā)送心跳數(shù)據(jù) 和 用戶自定義的 filterclass 類诡壁。

  • 4、定時的持久化 Consumer 端消費每個 queue的 offset 數(shù)據(jù)荠割。
    默認每隔 5 秒持久或 Consumer 消費的 queue 的 offset信息妹卿。
    持久化分為,遠程持久化和本地持久化蔑鹦。
    MessageModel.CLUSTERING 模式 queue的offset 保存到 broker上夺克。
    BROADCASTING("BROADCASTING") 模式 queue 的 offset 保存在本地。

  • 5嚎朽、調(diào)整消費端的線程數(shù)
    每隔 1 分鐘計算每一個queue中消息擠壓的數(shù)量铺纽,如果超過100000條,則增加消費線程的并發(fā)數(shù)火鼻,如果小于80000條則減少消費者的線程數(shù)室囊。
    不過進入源碼中看,調(diào)整消費者的線程數(shù)都注釋掉了魁索。


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末融撞,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子粗蔚,更是在濱河造成了極大的恐慌尝偎,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,561評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件鹏控,死亡現(xiàn)場離奇詭異致扯,居然都是意外死亡,警方通過查閱死者的電腦和手機当辐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,218評論 3 385
  • 文/潘曉璐 我一進店門抖僵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人缘揪,你說我怎么就攤上這事耍群∫骞穑” “怎么了?”我有些...
    開封第一講書人閱讀 157,162評論 0 348
  • 文/不壞的土叔 我叫張陵蹈垢,是天一觀的道長慷吊。 經(jīng)常有香客問我,道長曹抬,這世上最難降的妖魔是什么溉瓶? 我笑而不...
    開封第一講書人閱讀 56,470評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮谤民,結(jié)果婚禮上堰酿,老公的妹妹穿的比我還像新娘。我一直安慰自己张足,他們只是感情好胞锰,可當我...
    茶點故事閱讀 65,550評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著兢榨,像睡著了一般。 火紅的嫁衣襯著肌膚如雪顺饮。 梳的紋絲不亂的頭發(fā)上吵聪,一...
    開封第一講書人閱讀 49,806評論 1 290
  • 那天,我揣著相機與錄音兼雄,去河邊找鬼吟逝。 笑死,一個胖子當著我的面吹牛赦肋,可吹牛的內(nèi)容都是我干的块攒。 我是一名探鬼主播,決...
    沈念sama閱讀 38,951評論 3 407
  • 文/蒼蘭香墨 我猛地睜開眼佃乘,長吁一口氣:“原來是場噩夢啊……” “哼囱井!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起趣避,我...
    開封第一講書人閱讀 37,712評論 0 266
  • 序言:老撾萬榮一對情侶失蹤庞呕,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后程帕,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體住练,經(jīng)...
    沈念sama閱讀 44,166評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,510評論 2 327
  • 正文 我和宋清朗相戀三年愁拭,在試婚紗的時候發(fā)現(xiàn)自己被綠了讲逛。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,643評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡岭埠,死狀恐怖盏混,靈堂內(nèi)的尸體忽然破棺而出蔚鸥,到底是詐尸還是另有隱情,我是刑警寧澤括饶,帶...
    沈念sama閱讀 34,306評論 4 330
  • 正文 年R本政府宣布株茶,位于F島的核電站,受9級特大地震影響图焰,放射性物質(zhì)發(fā)生泄漏启盛。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,930評論 3 313
  • 文/蒙蒙 一技羔、第九天 我趴在偏房一處隱蔽的房頂上張望僵闯。 院中可真熱鬧,春花似錦藤滥、人聲如沸鳖粟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,745評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽向图。三九已至,卻和暖如春标沪,著一層夾襖步出監(jiān)牢的瞬間榄攀,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,983評論 1 266
  • 我被黑心中介騙來泰國打工金句, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留檩赢,地道東北人。 一個月前我還...
    沈念sama閱讀 46,351評論 2 360
  • 正文 我出身青樓违寞,卻偏偏與公主長得像贞瞒,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子趁曼,可洞房花燭夜當晚...
    茶點故事閱讀 43,509評論 2 348

推薦閱讀更多精彩內(nèi)容