Kafka 入門1:系統(tǒng)架構(gòu)潘悼、基本概念以及偽集群搭建方法

一、Kafka 是什么呈昔?

Apache Kafka 本質(zhì)上是一種消息中間件挥等,用來(lái)可靠傳遞消息事件,用來(lái)管理消息隊(duì)列(Message Queue)堤尾,具有如下特點(diǎn):

  • 分布式的肝劲,支持在線水平擴(kuò)展;
  • 高吞吐郭宝、高性能:kafka 具有高的吞吐量辞槐,內(nèi)部采用消息的批量處理zero-copy 機(jī)制粘室,數(shù)據(jù)的存儲(chǔ)和獲取是本地磁盤順序批量操作榄檬,具有 O(1)的復(fù)雜度,消息處理的效率很高衔统;
  • 分布式發(fā)布/訂閱(生產(chǎn)/消費(fèi))鹿榜;
  • 依賴于 Zookeeper 保存 meta 信息海雪,以保證系統(tǒng)HA;
  • Kafka的動(dòng)態(tài)擴(kuò)容是通過(guò)Zookeeper來(lái)實(shí)現(xiàn)的舱殿;
  • 起源于Linkedin奥裸,使用 Scala 語(yǔ)言編寫。


    消息隊(duì)列--發(fā)布/訂閱

二沪袭、Kafka 集群架構(gòu)

一個(gè) Kafka 系統(tǒng)架構(gòu)包括如下幾個(gè)部分:

  • Producer:向 broker 發(fā)布消息(push 方式)的 client湾宙;
  • Consumer:從 broker 消費(fèi)(之前訂閱的)消息(pull 方式)的 client;
  • Consumer Group:消費(fèi)組(同類消費(fèi)者進(jìn)行歸類)冈绊,一個(gè) Consumer 只能屬于某一個(gè)組侠鳄;
  • Topic:主題,對(duì)同類消息進(jìn)行歸類死宣,一個(gè) Topic 可有多個(gè) Consumer伟恶;
  • Partition:分區(qū),用來(lái)存儲(chǔ)消息毅该,一個(gè) Topic 可包含多個(gè)分區(qū)知押;
  • Broker:即 Kafka server(扮演兩種角色:leader、follower)鹃骂,也就是常說(shuō)的 Kafka 節(jié)點(diǎn),可以通過(guò)增加 server 對(duì)集群進(jìn)行橫向擴(kuò)展罢绽;
  • Zookeeper 集群:管理 Kafka 的 meta 數(shù)據(jù)(比如partition offset畏线、消費(fèi)者生產(chǎn)者的狀態(tài)),當(dāng) Consumer Group 發(fā)生變化時(shí) rebalance良价。
Kafka 集群架構(gòu)圖

為便于理解寝殴,可以將 Kafka 系統(tǒng)類比為一個(gè)工廠倉(cāng)庫(kù):

kafka 系統(tǒng) 工廠倉(cāng)庫(kù)
消息 加工件
broker 庫(kù)管員
topic 貨架(同類加工件放在同一個(gè)貨架)
partition 貨架上的某一層
producer 上一級(jí)工序的產(chǎn)出
consumer 下一級(jí)工序的輸入

2.1 Partition 內(nèi)部結(jié)構(gòu)

Producer 向 broker push消息時(shí),會(huì)將該消息寫入到某topic的某partition下的某 LogSegment 文件中明垢。每個(gè) consumer 會(huì)保留它讀取到某個(gè) partition 的 offset蚣常,而 consumer 是通過(guò)zookeeper來(lái)保留offset的。
上面提到痊银,一個(gè)broker 可以管理多個(gè) Topic抵蚊,一個(gè) Topic 也可能包含多個(gè) Partition。每個(gè) Partition都是一個(gè)有序的溯革、不可變的結(jié)構(gòu)化的提交日志記錄的序列贞绳。

topic 中的 partition

一個(gè) Partition 會(huì)映射到一個(gè)邏輯 log 文件,一個(gè) partition 又包含多個(gè) LogSegment(每個(gè)segment大小一樣)致稀,一個(gè) LogSegment 中又包含多條message 記錄冈闭,LogSegment 中使用唯一標(biāo)識(shí) offset(64位Long型)來(lái)唯一標(biāo)識(shí)某條 message。

Partition 的 log 文件內(nèi)部結(jié)構(gòu)

LogSegment文件由兩部分組成抖单,分別為:

  • .index 文件: segment 索引文件萎攒;
  • .log 文件:數(shù)據(jù)文件遇八。

這兩個(gè)文件的命令規(guī)則為:partition 全局的第一個(gè) segment 從0開(kāi)始,后續(xù)每個(gè) segment 文件名為上一個(gè) segment 文件最后一條消息的 offset 值耍休。

需要注意的是:

  1. 消息在Kafka中的保存時(shí)間是有有效期的刃永,一旦超過(guò)有效期即被丟棄;
  2. partition中的數(shù)據(jù)是有序的羹应,不同partition間的數(shù)據(jù)丟失了數(shù)據(jù)的順序揽碘。如果topic有多個(gè)partition,消費(fèi)數(shù)據(jù)時(shí)就不能保證數(shù)據(jù)的順序园匹。在需要嚴(yán)格保證消息的消費(fèi)順序的場(chǎng)景下雳刺,需要將partition數(shù)目設(shè)為1。

2.2 分區(qū)備份和負(fù)載均衡

為保證容錯(cuò)性和 HA裸违,Kafka 集群會(huì)將某個(gè)topic下的某partition 同時(shí)備份在多個(gè) broker中掖桦。至于備份多少份、備份在哪些 broker上是可以配置的供汛。


分區(qū)備份

在一個(gè)kafka集群中枪汪,每個(gè)broker 通常會(huì)扮演兩個(gè)角色:

  1. 在一個(gè) partition 中扮演 leader(broker 的 leader 選舉由 Zookeeper 幫助完成);
  2. 在其它的 partition 中扮演 followers怔昨。

Leader是最繁忙的雀久,要處理讀寫請(qǐng)求。這樣將leader均分到不同的broker上趁舀,目的自然是要確保負(fù)載均衡赖捌。


三、Mac 上安裝配置 Kafka 偽集群

Kafka 集群的概念是指由多臺(tái)機(jī)器組成的集群中每臺(tái)機(jī)器上各運(yùn)行著一個(gè)Kafka-server 進(jìn)程(即broker 進(jìn)程)矮烹,偽集群即指在一臺(tái)機(jī)器上運(yùn)行著多個(gè)Kafka-server 進(jìn)程越庇,是對(duì)集群的一種模擬。

3.1 搭建 Kafka 偽集群

1. 首先奉狈,確保系統(tǒng)安裝了JDK卤唉、Scala 以及 Zookeeper

  • Kafka 集群運(yùn)作依賴于Zookeeper;
  • Kafka 使用 Scala編寫仁期。

2. 其次桑驱,下載解壓 kafka 安裝包

  • Kafka 官網(wǎng)下載對(duì)應(yīng)版本的Kafka安裝包,比如筆者下載的是 kafka_2.11-2.4.0跛蛋,其中2.11 是scala版本碰纬;
  • 解壓安裝包:
tar -zxvf ~/Downloads/kafka_2.11-2.4.0 -C ~/software-package-install/kafka_install/

解壓縮后可以看到如下目錄:

-rw-r--r--@   1 ycaha  1699762527  32216 Dec 10 00:46 LICENSE
-rw-r--r--@   1 ycaha  1699762527    337 Dec 10 00:46 NOTICE
drwxr-xr-x@  35 ycaha  1699762527   1120 Jan 14 11:35 bin/
drwxr-xr-x@  19 ycaha  1699762527    608 Jan 14 13:31 config/
drwxr-xr-x@ 104 ycaha  1699762527   3328 Dec 10 00:51 libs/
drwxr-xr-x   35 ycaha  1699762527   1120 Jan 14 13:08 logs/
drwxr-xr-x@   3 ycaha  1699762527     96 Dec 10 00:51 site-docs/

其中:

  • config 目錄:存放各種配置文件;
  • bin 目錄:存放各種命令文件问芬,比如啟動(dòng)停止集群悦析、創(chuàng)建查看 topic 等。

3. 最后此衅,添加修改配置項(xiàng)
要搭建集群模式强戴,有兩個(gè)配置項(xiàng) broker.id 和 listeners 是必須要進(jìn)行修改的:

  • broker.id: kafka-server 的 id亭螟,每個(gè) kafka-server 進(jìn)程對(duì)應(yīng)的 id 都是唯一的;
  • listener:監(jiān)聽(tīng)骑歹,PLAINTEXT://ip:port预烙。

如果是偽集群(單機(jī))模式,由于 ip 是一致的道媚,因此需要用不同 port 來(lái)區(qū)分不同的 kafka-server 進(jìn)程扁掸。此外還有一個(gè)配置項(xiàng) log.dirs 表示服務(wù)器的 log 文件的存放路徑,由于是單機(jī)最域,因此需要為不同的 kafka-server 進(jìn)程設(shè)置不同的 log.dirs谴分。

${kafka_home}/config/server.properties 文件的配置項(xiàng):

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# The address the socket server listens on. 
listeners=PLAINTEXT://localhost:9092
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

復(fù)制 server.properties 為 server-1.properties 和 server-2.properties:

cp  server.properties  server-1.properties 
cp  server.properties  server-2.properties 

${kafka_home}/config/server-1.properties 文件的配置項(xiàng):

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# The address the socket server listens on. 
listeners=PLAINTEXT://localhost:9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs1

${kafka_home}/config/server-2.properties 文件的配置項(xiàng):

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# The address the socket server listens on. 
listeners=PLAINTEXT://localhost:9094
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs2

至此 偽集群搭建完成,接下來(lái)可以啟動(dòng)3個(gè)(因?yàn)?個(gè)不同的配置文件)不同的 kafka-server 進(jìn)程镀脂。


四牺蹄、常見(jiàn)的 Kafka操作(shell和java API)

4.1 Kafka shell 操作

Kafka 提供了若干個(gè)腳本來(lái)完成集群的啟動(dòng)關(guān)閉、topic 的創(chuàng)建等薄翅,這些腳本就在目錄 ${kafka_home}/bin 中沙兰。

1. 啟動(dòng)關(guān)閉 Kafka 集群
啟動(dòng) Kafka 之前必須先啟動(dòng) Zookeeper 集群。

# 分別啟動(dòng)3個(gè) kafka-server 進(jìn)程
cd ${kafka_home}/bin
sh kafka-server-start.sh ../config/server.properties &
sh kafka-server-start.sh ../config/server-1.properties &
sh kafka-server-start.sh ../config/server-2.properties &

# 查看啟動(dòng)的3個(gè) kafka-server 進(jìn)程
ps -ef | grep kafka

# 關(guān)閉 kafka-server 進(jìn)程
sh kafka-server-stop.sh 

2. 創(chuàng)建查看 topic

# 創(chuàng)建名為  test05 的 topic
sh kafka-topics.sh --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic test05
# 列出所有的 topic
sh kafka-topics.sh --list --zookeeper localhost:2181
# 獲取所有 topic 的描述
sh kafka-topics.sh --describe --zookeeper localhost:2181
# 獲取 topic test06 的描述
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic test06
# 刪除 topic test05
sh kafka-topics.sh --zookeeper localhost:2181 --delete --topic test05

更多的命令解釋可以通過(guò)在 shell 中使用--help命令來(lái)獲取翘魄,比如關(guān)于 topic 方面的:

sh kafka-topics.sh --help


五鼎天、測(cè)試 producer 產(chǎn)生事件 & consumer 消費(fèi)事件

1. 首先打開(kāi)一個(gè)終端shell,啟動(dòng) producer console

sh kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test1

進(jìn)入 producer 控制臺(tái)暑竟,push 事件到 broker 的 topic test1 中:

>hello world
>this is kafka
>test1

2. 然后新開(kāi)一個(gè)終端shell训措,啟動(dòng) consumer console

sh kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --topic test1

進(jìn)入 consumer 控制臺(tái),發(fā)現(xiàn) consumer 自動(dòng)從 broker 的 topic test1 中pull 了事件:

hello world
this is kafka
test1


六光羞、Java API 操作 Kafka

  • 模擬 Kafka producer 發(fā)布產(chǎn)生事件和 consumer 訂閱消費(fèi)事件;
  • 自定義Partitioner怀大;

maven pom.xml

<properties>
  <kafka.version>2.4.0</kafka.version>
</properties> 
<dependency>
  <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.11</artifactId>
   <version>${kafka.version}</version>
</dependency>

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class MyKafkaPartitioner extends AbstractPartitionAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic =
                consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet()) {
            assignment.put(memberId, new ArrayList<>());
        }

        // 針對(duì)每一個(gè)topic進(jìn)行分區(qū)分配
        for (Map.Entry<String, List<String>> topicEntry :
                consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            List<String> consumersForTopic = topicEntry.getValue();
            int consumerSize = consumersForTopic.size();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null) {
                continue;
            }

            // 當(dāng)前topic下的所有分區(qū)
            List<TopicPartition> partitions =
                    AbstractPartitionAssignor.partitions(topic,
                            numPartitionsForTopic);
            // 將每個(gè)分區(qū)隨機(jī)分配給一個(gè)消費(fèi)者
            for (TopicPartition partition : partitions) {
                int rand = new Random().nextInt(consumerSize);
                String randomConsumer = consumersForTopic.get(rand);
                assignment.get(randomConsumer).add(partition);
            }
        }
        return assignment;
//        return null;
    }

    // 獲取每個(gè)topic所對(duì)應(yīng)的消費(fèi)者列表纱兑,即:[topic, List[consumer]]
    private Map<String, List<String>> consumersPerTopic(
            Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry :
                consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics())
                put(res, topic, consumerId);
        }
        return res;
    }

    @Override
    public String name() {
//        return null;
        return "MyKafkaPartitioner";
    }
}




import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;

public class Main {
    public static void main(String[] args) {
        initProducer();
        initConsumer();

    }

    private static void initProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 2);
        props.put("linger.ms", 1);
//        props.put("partitioner.class", "com.example.demo.MyPartitioner");
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"com.saicmotor.kafka.MyKafkaPartitioner");
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("powerTopic", Integer.toString(i), Integer.toString(i)));
        }

        producer.close();
    }


    private static void initConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("powerTopic", "topic1"), new ConsumerRebalanceListener() {
            // kafka在有新消費(fèi)者加入或者撤出時(shí),會(huì)觸發(fā)rebalance操作化借,在subscribe訂閱主題的時(shí)候潜慎,我們可以編寫回掉函數(shù),在觸發(fā)rebalance操作之前和之后蓖康,提交相應(yīng)偏移量和獲取偏移量
            // 注意enable.auto.commit為false铐炫,防止自動(dòng)提交偏移量
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            //在rebalance操作之前調(diào)用,用于我們提交消費(fèi)者偏移

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                //onPartitionAssigned會(huì)在rebalance操作之后調(diào)用蒜焊,用于我們接取新的分配區(qū)的偏移量
                Map<TopicPartition,Long> beginningOffset = consumer.beginningOffsets(collection);
                for(Map.Entry<TopicPartition,Long> entry : beginningOffset.entrySet()) {
                    consumer.seekToBeginning(collection);
                }
            }

        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("partition:" + record.partition() + ",key:" + record.key() + ",value:" + record.value());
                consumer.commitAsync();
            }
        }
    }
    
}


七倒信、常問(wèn)問(wèn)題

7.1 Kafka 針對(duì) Topic 為什么要進(jìn)行分區(qū)?日志為什么要分 segment泳梆?

https://www.zhihu.com/question/28925721

7.2 Kafka 可靠性如何保證鳖悠?

replication 副本榜掌;


參考:
https://cloud.tencent.com/developer/article/1446017

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市乘综,隨后出現(xiàn)的幾起案子憎账,更是在濱河造成了極大的恐慌,老刑警劉巖卡辰,帶你破解...
    沈念sama閱讀 211,639評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胞皱,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡九妈,警方通過(guò)查閱死者的電腦和手機(jī)反砌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)允蚣,“玉大人于颖,你說(shuō)我怎么就攤上這事∪峦茫” “怎么了森渐?”我有些...
    開(kāi)封第一講書人閱讀 157,221評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)冒晰。 經(jīng)常有香客問(wèn)我同衣,道長(zhǎng),這世上最難降的妖魔是什么壶运? 我笑而不...
    開(kāi)封第一講書人閱讀 56,474評(píng)論 1 283
  • 正文 為了忘掉前任耐齐,我火速辦了婚禮,結(jié)果婚禮上蒋情,老公的妹妹穿的比我還像新娘埠况。我一直安慰自己,他們只是感情好棵癣,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布辕翰。 她就那樣靜靜地躺著,像睡著了一般狈谊。 火紅的嫁衣襯著肌膚如雪喜命。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 49,816評(píng)論 1 290
  • 那天河劝,我揣著相機(jī)與錄音壁榕,去河邊找鬼。 笑死赎瞎,一個(gè)胖子當(dāng)著我的面吹牛牌里,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播务甥,決...
    沈念sama閱讀 38,957評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼二庵,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼贪染!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起催享,我...
    開(kāi)封第一講書人閱讀 37,718評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤杭隙,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后因妙,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體痰憎,經(jīng)...
    沈念sama閱讀 44,176評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評(píng)論 2 327
  • 正文 我和宋清朗相戀三年攀涵,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了铣耘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,646評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡以故,死狀恐怖蜗细,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情怒详,我是刑警寧澤炉媒,帶...
    沈念sama閱讀 34,322評(píng)論 4 330
  • 正文 年R本政府宣布,位于F島的核電站昆烁,受9級(jí)特大地震影響吊骤,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜静尼,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評(píng)論 3 313
  • 文/蒙蒙 一白粉、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧鼠渺,春花似錦鸭巴、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,755評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至掌敬,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間池磁,已是汗流浹背奔害。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,987評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留地熄,地道東北人华临。 一個(gè)月前我還...
    沈念sama閱讀 46,358評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像端考,于是被迫代替她去往敵國(guó)和親雅潭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子揭厚,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容