Extending DataX with a hudiwriter Plugin

Preface

My company recently kicked off a project, planned to run for two years, to build a data platform. It covers data synchronization (real-time/offline), mapping (real-time/offline), data warehousing (real-time/offline), metadata management, data lineage, scheduling, BI, and more. The architecture is layered from top to bottom: business applications on top, middle-platform services in the middle, and a bottom layer providing foundational capabilities.

The project is ambitious, but every tall building starts from the ground up, so we begin at the source: data ingestion. Data synchronization splits into real-time and batch; batch synchronization further splits into full, incremental, and incremental-update. The official DataX already supports full and incremental synchronization, but not incremental updates, so I decided to extend DataX with a hudiwriter plugin to cover update scenarios.

DataX Architecture

(Architecture diagram)

DataX is built on a Framework + plugin architecture. The Framework handles technical concerns such as buffering, rate limiting, concurrency, and context loading, while reading from and writing to data sources is abstracted behind Reader/Writer interfaces. If the built-in plugins do not cover a scenario, developers can write their own plugin.

From a business point of view, plugins are divided into readers and writers:

  • A reader is the data collection module: it reads data from the source and hands it to the Framework.
  • A writer is the data writing module: it continuously fetches data from the Framework and writes it to the destination.

From an execution point of view, a plugin's work is divided into jobs and tasks:

  • A Job describes a synchronization from one source to one destination and is the smallest business unit of DataX data synchronization, for example syncing one MySQL table into a specific partition of an ODPS table.
  • A Task is the smallest execution unit obtained by splitting a Job to maximize concurrency. For example, a Job that reads a MySQL database sharded into 1,024 tables is split into 1,024 read Tasks, which are then executed with a configurable number of concurrent workers.
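
A custom plugin fills in exactly this Writer SPI: the Job clones its configuration into one slice per Task, and each Task pulls records from the Framework and writes them to the target. Below is only a minimal sketch (the package name demowriter is made up for illustration); the full HudiWriter source later in this post follows the same contract.

package com.alibaba.datax.plugin.writer.demowriter;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;

import java.util.ArrayList;
import java.util.List;

public class DemoWriter extends Writer {
    public static class Job extends Writer.Job {
        private Configuration originalConfig;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            // one configuration clone per Task; the Framework derives mandatoryNumber from the channel setting
            List<Configuration> configs = new ArrayList<>();
            for (int i = 0; i < mandatoryNumber; i++) {
                configs.add(originalConfig.clone());
            }
            return configs;
        }

        @Override
        public void destroy() {
        }
    }

    public static class Task extends Writer.Task {
        @Override
        public void init() {
        }

        @Override
        public void startWrite(RecordReceiver recordReceiver) {
            Record record;
            // keep pulling records from the Framework until the reader is exhausted
            while ((record = recordReceiver.getFromReader()) != null) {
                // convert the record and write it to the target storage
            }
        }

        @Override
        public void destroy() {
        }
    }
}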

Project Structure

The hudiwriter module contains:
  • HudiWriter: the main flow
  • Key, HudiWriterErrorCode: classes used by the business logic; not strictly required
  • package.xml: the global assembly descriptor; the plugin's packaging entries are added here (see the example after this list)
  • plugin.json: describes the plugin itself; the key fields are name (the plugin name) and class (the plugin's entry class), and both must be exactly right
  • plugin_job_template.json: the plugin's sample job configuration
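
For the plugin to end up in the DataX distribution it also has to be registered in the build. The snippets below show what a typical registration looks like, assuming the standard DataX multi-module layout (the exact paths may differ in your checkout).

Add the module to the root pom.xml:

    <module>hudiwriter</module>

Add a fileSet for the plugin's packaged output to the root package.xml:

    <fileSet>
        <directory>hudiwriter/target/datax/</directory>
        <includes>
            <include>**/*.*</include>
        </includes>
        <outputDirectory>datax</outputDirectory>
    </fileSet>

Then build the distribution with the usual DataX command:

    mvn -U clean package assembly:assembly -Dmaven.test.skip=true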

Code

GitHub: https://github.com/dongpengfei2/DataX/tree/evyd-1.0.0

HudiWriter.java

package com.alibaba.datax.plugin.writer.hudiwriter;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static com.alibaba.datax.plugin.writer.hudiwriter.HudiWriterErrorCode.HUDI_ERROR_TABLE;
import static com.alibaba.datax.plugin.writer.hudiwriter.HudiWriterErrorCode.HUDI_PARAM_LOST;
import static com.alibaba.datax.plugin.writer.hudiwriter.Key.*;

/**
 * Created by david.dong on 22-8-21.
 */
public class HudiWriter extends Writer {
    public static class Job extends Writer.Job {

        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration originalConfig;

        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
        }

        @Override
        public void prepare() {

        }

        @Override
        public void post() {

        }

        @Override
        public void destroy() {

        }

        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> list = new ArrayList<>();
            for (int i = 0; i < mandatoryNumber; i++) {
                list.add(originalConfig.clone());
            }
            return list;
        }

    }

    public static class Task extends Writer.Task {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
        private String primaryKey;
        private String partitionFields;
        private String writeOption;
        private int batchSize;
        private Configuration sliceConfig;
        private List<Configuration> columnsList;

        private List<String> partitionList;

        Schema avroSchema;

        private HoodieJavaWriteClient<HoodieAvroPayload> client;

        @Override
        public void init() {
            // get the configuration slice for this task
            this.sliceConfig = super.getPluginJobConf();
            String tableName = sliceConfig.getNecessaryValue(Key.HUDI_TABLE_NAME, HUDI_ERROR_TABLE);
            String tablePath = sliceConfig.getNecessaryValue(Key.HUDI_TABLE_PATH, HUDI_PARAM_LOST);
            String tableType = sliceConfig.getNecessaryValue(Key.HUDI_TABLE_TYPE, HUDI_PARAM_LOST);
            primaryKey = sliceConfig.getNecessaryValue(Key.HUDI_PRIMARY_KEY, HUDI_PARAM_LOST);
            partitionFields = sliceConfig.getString(Key.HUDI_PARTITION_FIELDS);
            writeOption = sliceConfig.getNecessaryValue(Key.HUDI_WRITE_OPTION, HUDI_PARAM_LOST);
            columnsList = sliceConfig.getListConfiguration(Key.HUDI_COLUMN);
            batchSize = sliceConfig.getInt(HUDI_BATCH_SIZE);

            partitionList = StringUtils.isEmpty(partitionFields) ? new ArrayList<>() : Arrays.asList(partitionFields.split(","));

            org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
            try {
                // whether Kerberos authentication is enabled
                Boolean haveKerberos = sliceConfig.getBool(HAVE_KERBEROS, false);
                if(haveKerberos){
                    String kerberosKeytabFilePath = sliceConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
                    String kerberosPrincipal = sliceConfig.getString(Key.KERBEROS_PRINCIPAL);
                    hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
                    this.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
                }
                // initialize the table path on HDFS (create the Hudi table if it does not exist yet)
                Path path = new Path(tablePath);
                FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
                if (!fs.exists(path)) {
                    HoodieTableMetaClient.withPropertyBuilder()
                        .setTableType(HUDI_WRITE_TYPE_MOR.equals(tableType) ? HoodieTableType.MERGE_ON_READ : HoodieTableType.COPY_ON_WRITE)
                        .setTableName(tableName)
                        .setPayloadClassName(HoodieAvroPayload.class.getName())
                        .initTable(hadoopConf, tablePath);
                }
            } catch (IOException e) {
                LOG.error(ExceptionUtils.getStackTrace(e));
            }
            JSONArray fields = new JSONArray();
            for (Configuration columnConfig : columnsList) {
                JSONObject confObject = new JSONObject();
                confObject.put("name", columnConfig.getString("name"));
                String configType = columnConfig.getString("type");
                confObject.put("type", "date".equals(configType) || "datetime".equals(configType) ? "string" : configType);
                fields.add(confObject);
            }

            JSONObject schemaObject = new JSONObject();
            schemaObject.put("type", "record");
            schemaObject.put("name", "triprec");
            schemaObject.put("fields", fields);
            String schemaStr = schemaObject.toJSONString();

            avroSchema = new Schema.Parser().parse(schemaStr);

            // Create the write client to write some records in
            HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
                .withSchema(schemaStr).withParallelism(2, 2)
                .withDeleteParallelism(2).forTable(tableName)
                .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
                .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
            client =
                new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
        }

        @Override
        public void prepare() {

        }

        @Override
        public void startWrite(RecordReceiver recordReceiver) {
            Record record;
            DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
            DateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            AtomicLong counter = new AtomicLong(0);
            List<HoodieRecord<HoodieAvroPayload>> writeRecords = new ArrayList<>();
            while ((record = recordReceiver.getFromReader()) != null) {
                GenericRecord row = new GenericData.Record(avroSchema);
                for (int i=0; i<columnsList.size(); i++) {
                    Configuration configuration = columnsList.get(i);
                    String columnName = configuration.getString("name");
                    String columnType = configuration.getString("type");
                    Column column = record.getColumn(i);
                    Object rawData = column.getRawData();
                    if (rawData == null) {
                        row.put(columnName, null);
                        continue;
                    }
                    switch (columnType) {
                        case "int":
                            row.put(columnName, Integer.parseInt(rawData.toString()));
                            break;
                        case "float":
                            row.put(columnName, Float.parseFloat(rawData.toString()));
                            break;
                        case "double":
                            row.put(columnName, Double.parseDouble(rawData.toString()));
                            break;
                        case "date":
                            row.put(columnName, dateFormat.format(rawData));
                            break;
                        case "datetime":
                            row.put(columnName, dateTimeFormat.format(rawData));
                            break;
                        case "string":
                        default:
                            row.put(columnName, rawData.toString());
                    }
                }
                String partitionPath = "";
                if (!partitionList.isEmpty()) {
                    List<Object> values = partitionList.stream().map(row::get).collect(Collectors.toList());
                    partitionPath = StringUtils.join(values, "/");
                }
                HoodieKey key = new HoodieKey(row.get(primaryKey).toString(), partitionPath);
                HoodieRecord<HoodieAvroPayload> hoodieAvroPayload = new HoodieRecord<>(key, new HoodieAvroPayload(Option.of(row)));
                writeRecords.add(hoodieAvroPayload);
                long num = counter.incrementAndGet();

                if (num >= batchSize) {
                    flushCache(writeRecords);
                    writeRecords.clear();
                    counter.set(0L);
                }
            }
            if (!writeRecords.isEmpty()) {
                flushCache(writeRecords);
            }
        }

        private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf){
            if(StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)){
                UserGroupInformation.setConfiguration(hadoopConf);
                try {
                    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
                } catch (Exception e) {
                    String message = String.format("Kerberos authentication failed. Please verify that kerberosKeytabFilePath[%s] and kerberosPrincipal[%s] are correct",
                            kerberosKeytabFilePath, kerberosPrincipal);
                    LOG.error(message);
                    throw DataXException.asDataXException(HudiWriterErrorCode.KERBEROS_LOGIN_ERROR, e);
                }
            }
        }

        private void flushCache(List<HoodieRecord<HoodieAvroPayload>> writeRecords) {
            String commitTime = client.startCommit();
            LOG.info("Starting commit " + commitTime);
            switch (writeOption) {
                case HUDI_WRITE_OPTION_INSERT:
                    client.insert(writeRecords, commitTime);
                    break;
                case HUDI_WRITE_OPTION_BULK_INSERT:
                    client.bulkInsert(writeRecords, commitTime);
                    break;
                case HUDI_WRITE_OPTION_UPSERT:
                    client.upsert(writeRecords, commitTime);
                    break;
            }
        }

        @Override
        public void post() {

        }

        @Override
        public void destroy() {
            if (client!=null) {
                client.close();
            }
        }
    }
}

Key.java

package com.alibaba.datax.plugin.writer.hudiwriter;

public class Key {
    public static final String HUDI_TABLE_NAME = "tableName";
    public static final String HUDI_TABLE_PATH = "tablePath";
    public static final String HUDI_PRIMARY_KEY = "primaryKey";
    public static final String HUDI_PARTITION_FIELDS = "partitionFields";
    public static final String HUDI_TABLE_TYPE = "tableType";
    public static final String HUDI_BATCH_SIZE = "batchSize";
    public static final String HUDI_WRITE_OPTION = "writeOption";
    public static final String HUDI_COLUMN = "column";

    public static final String HUDI_WRITE_OPTION_INSERT = "insert";
    public static final String HUDI_WRITE_OPTION_BULK_INSERT = "bulk_insert";
    public static final String HUDI_WRITE_OPTION_UPSERT = "upsert";

    public static final String HUDI_WRITE_TYPE_COW = "cow";
    public static final String HUDI_WRITE_TYPE_MOR = "mor";

    // Kerberos
    public static final String HAVE_KERBEROS = "haveKerberos";
    public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
    public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";

    public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
}

HudiWriterErrorCode.java

package com.alibaba.datax.plugin.writer.hudiwriter;

import com.alibaba.datax.common.spi.ErrorCode;

public enum HudiWriterErrorCode implements ErrorCode {

    HUDI_ERROR_TABLE("Hudi Error Table", "您的參數(shù)配置錯(cuò)誤."),
    HUDI_PARAM_LOST("Hudi Param Lost", "您缺失了必須填寫的參數(shù)值."),
    HDFS_CONNECT_ERROR("Hdfs Connect Error", "與HDFS建立連接時(shí)出現(xiàn)IO異常."),
    KERBEROS_LOGIN_ERROR("Hdfs Login Error", "KERBEROS認(rèn)證失敗");

    private final String code;
    private final String description;

    HudiWriterErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }

    @Override
    public String getCode() {
        return this.code;
    }

    @Override
    public String getDescription() {
        return this.description;
    }

    @Override
    public String toString() {
        return String.format("Code:[%s], Description:[%s].", this.code,
                this.description);
    }
}

plugin.json

{
    "name": "hudiwriter",
    "class": "com.alibaba.datax.plugin.writer.hudiwriter.HudiWriter",
    "description": "useScene: test. mechanism: use datax framework to transport data to hudi. warn: The more you know about the data, the less problems you encounter.",
    "developer": "alibaba"
}

plugin_job_template.json

{
    "name": "hudiwriter",
    "parameter": {
        "tableName": "",
        "tablePath": "",
        "tableType": "",
        "writeOption": "",
        "primaryKey": "",
        "partitionFields": "",
        "batchSize": "",
        "column": []
    }
}

Testing

The job below syncs a MySQL table into a Hudi MOR table. Note that the writer maps columns to the reader's output by position (record.getColumn(i)), so the writer's column list must be in the same order as the reader's.

wlapp_user_mysql_to_hudi.json

{
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "name",
                            "age",
                            "dt",
                            "score",
                            "create_at",
                            "update_at"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/wlapp?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
                                ],
                                "table": [
                                    "user"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hudiwriter",
                    "parameter": {
                        "tableName": "user",
                        "tablePath": "hdfs://localhost:9000/user/hive/warehouse/wlapp.db/user",
                        "tableType": "mor",
                        "writeOption": "upsert",
                        "primaryKey": "id",
                        "partitionFields": "dt",
                        "batchSize": 100,
                        "column": [
                            {
                                "name": "id",
                                "type": "int"
                            },
                            {
                                "name": "name",
                                "type": "string"
                            },
                            {
                                "name": "age",
                                "type": "int"
                            },
                            {
                                "name": "dt",
                                "type": "date"
                            },
                            {
                                "name": "score",
                                "type": "double"
                            },
                            {
                                "name": "create_at",
                                "type": "datetime"
                            },
                            {
                                "name": "update_at",
                                "type": "datetime"
                            }
                        ]
                    }
                }
            }
        ]
    }
}

Launch command

python bin/datax.py job/wlapp_user_mysql_to_hudi.json;

Launch command with parameters

# replaces the ${date} variable in wlapp_user_mysql_to_hudi.json
python bin/datax.py -p"-Ddate='2022-08-25 00:00:00'" job/wlapp_user_mysql_to_hudi.json;
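
Note that the sample job above leaves the reader's where clause empty, so there is nothing for -Ddate to substitute into. To make the parameter take effect, reference ${date} somewhere in the job file, for example with a (hypothetical) incremental filter on update_at in the mysqlreader parameters:

    "where": "update_at >= '${date}'"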

Conclusion

The plugin has been running normally in testing so far; I will keep updating this post as issues come up in production use.
