Dataset Partitioning

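Sqoop turns a data-transfer job into a corresponding Hadoop job, and a Hadoop job works on a partitioned dataset: each map task processes a different subset of the data.
So how does Sqoop partition the dataset?
This is defined by the following class.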

/**
 * This allows connector to define how input data from the FROM source can be partitioned.
 * The number of data partitions also determines the degree of parallelism.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class Partitioner<LinkConfiguration, FromJobConfiguration> {

  /**
   * Partition input data into partitions.
   *
   * Each partition will be then processed in separate extractor.
   *
   * @param context Partitioner context object
   * @param linkConfiguration link configuration object
   * @param fromJobConfiguration FROM job configuration object
   * @return list of partitions, each of which is processed by a separate extractor
   */
  public abstract List<Partition> getPartitions(PartitionerContext context,
      LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration);
}

Below is the corresponding partitioning implementation for the generic JDBC connector.


public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {

  private static final BigDecimal NUMERIC_MIN_INCREMENT = new BigDecimal(10000 * Double.MIN_VALUE);


  private long numberPartitions;
  private String partitionColumnName;
  private int partitionColumnType;
  private String partitionMinValue;
  private String partitionMaxValue;
  private Boolean allowNullValueInPartitionColumn;

  @Override
  public List<Partition> getPartitions(PartitionerContext context, LinkConfiguration linkConfig,
      FromJobConfiguration fromJobConfig) {
    List<Partition> partitions = new LinkedList<Partition>();

    numberPartitions = context.getMaxPartitions();
    partitionColumnName = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME);
    partitionColumnType = context.getInt(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNTYPE, -1);
    partitionMinValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MINVALUE);
    partitionMaxValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_MAXVALUE);

    allowNullValueInPartitionColumn = fromJobConfig.fromJobConfig.allowNullValueInPartitionColumn;
    if (allowNullValueInPartitionColumn == null) {
      allowNullValueInPartitionColumn = false;
    }

    if (partitionMinValue == null && partitionMaxValue == null) {
      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(partitionColumnName + " IS NULL");
      partitions.add(partition);
      return partitions;
    }

    if (allowNullValueInPartitionColumn) {
      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(partitionColumnName + " IS NULL");
      partitions.add(partition);
      numberPartitions -= 1;
    }

    switch (partitionColumnType) {
    case Types.TINYINT:
    case Types.SMALLINT:
    case Types.INTEGER:
    case Types.BIGINT:
      // Integer column
      partitions.addAll(partitionIntegerColumn());
      break;

    case Types.REAL:
    case Types.FLOAT:
    case Types.DOUBLE:
      // Floating point column
      partitions.addAll(partitionFloatingPointColumn());
      break;

    case Types.NUMERIC:
    case Types.DECIMAL:
      // Decimal column
      partitions.addAll(partitionNumericColumn());
      break;

    case Types.BIT:
    case Types.BOOLEAN:
      // Boolean column
      return partitionBooleanColumn();

    case Types.DATE:
    case Types.TIME:
    case Types.TIMESTAMP:
      // Date time column
      partitions.addAll(partitionDateTimeColumn());
      break;

    case Types.CHAR:
    case Types.VARCHAR:
    case Types.LONGVARCHAR:
      // Text column
      partitions.addAll(partitionTextColumn());
      break;

    default:
      throw new SqoopException(
          GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0011,
          String.valueOf(partitionColumnType));
    }

    return partitions;
  }

  protected List<Partition> partitionDateTimeColumn() {
    List<Partition> partitions = new LinkedList<Partition>();

    long minDateValue = 0;
    long maxDateValue = 0;
    SimpleDateFormat sdf = null;
    switch(partitionColumnType) {
      case Types.DATE:
        sdf = new SimpleDateFormat("yyyy-MM-dd");
        minDateValue = Date.valueOf(partitionMinValue).getTime();
        maxDateValue = Date.valueOf(partitionMaxValue).getTime();
        break;
      case Types.TIME:
        sdf = new SimpleDateFormat("HH:mm:ss");
        minDateValue = Time.valueOf(partitionMinValue).getTime();
        maxDateValue = Time.valueOf(partitionMaxValue).getTime();
        break;
      case Types.TIMESTAMP:
        sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        minDateValue = Timestamp.valueOf(partitionMinValue).getTime();
        maxDateValue = Timestamp.valueOf(partitionMaxValue).getTime();
        break;
    }


    minDateValue += TimeZone.getDefault().getOffset(minDateValue);
    maxDateValue += TimeZone.getDefault().getOffset(maxDateValue);

    sdf.setTimeZone(TimeZone.getTimeZone("GMT"));

    long interval =  (maxDateValue - minDateValue) / numberPartitions;
    long remainder = (maxDateValue - minDateValue) % numberPartitions;

    if (interval == 0) {
      numberPartitions = (int)remainder;
    }

    long lowerBound;
    long upperBound = minDateValue;

    Object objLB = null;
    Object objUB = null;

    for (int i = 1; i < numberPartitions; i++) {
      lowerBound = upperBound;
      upperBound = lowerBound + interval;
      upperBound += (i <= remainder) ? 1 : 0;

      switch(partitionColumnType) {
        case Types.DATE:
          objLB = new Date(lowerBound);
          objUB = new Date(upperBound);
          break;
        case Types.TIME:
          objLB = new Time(lowerBound);
          objUB = new Time(upperBound);

          break;
        case Types.TIMESTAMP:
          objLB = new Timestamp(lowerBound);
          objUB = new Timestamp(upperBound);
          break;
      }

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(
          constructDateConditions(sdf, objLB, objUB, false));
      partitions.add(partition);
    }

    switch(partitionColumnType) {
      case Types.DATE:
        objLB = new Date(upperBound);
        objUB = new Date(maxDateValue);
        break;
      case Types.TIME:
        objLB = new Time(upperBound);
        objUB = new Time(maxDateValue);
        break;
      case Types.TIMESTAMP:
        objLB = new Timestamp(upperBound);
        objUB = new Timestamp(maxDateValue);
        break;
    }


    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(
        constructDateConditions(sdf, objLB, objUB, true));
    partitions.add(partition);
    return partitions;
  }

  protected List<Partition> partitionTextColumn() {
    List<Partition> partitions = new LinkedList<Partition>();

    String minStringValue = null;
    String maxStringValue = null;

    // Remove common prefix if any as it does not affect outcome.
    int maxPrefixLen = Math.min(partitionMinValue.length(),
        partitionMaxValue.length());
    // Calculate common prefix length
    int cpLen = 0;

    for (cpLen = 0; cpLen < maxPrefixLen; cpLen++) {
      char c1 = partitionMinValue.charAt(cpLen);
      char c2 = partitionMaxValue.charAt(cpLen);
      if (c1 != c2) {
        break;
      }
    }

    // The common prefix has length 'cpLen'. Extract it from both.
    String prefix = partitionMinValue.substring(0, cpLen);
    minStringValue = partitionMinValue.substring(cpLen);
    maxStringValue = partitionMaxValue.substring(cpLen);

    BigDecimal minStringBD = textToBigDecimal(minStringValue);
    BigDecimal maxStringBD = textToBigDecimal(maxStringValue);

    // Having one single value means that we can create only one single split
    if(minStringBD.equals(maxStringBD)) {
      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(constructTextConditions(prefix, 0, 0,
        partitionMinValue, partitionMaxValue, true, true));
      partitions.add(partition);
      return partitions;
    }

    // Get all the split points together.
    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();

    BigDecimal splitSize = divide(maxStringBD.subtract(minStringBD),
        new BigDecimal(numberPartitions));
    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
      splitSize = NUMERIC_MIN_INCREMENT;
    }

    BigDecimal curVal = minStringBD;

    int parts = 0;

    while (curVal.compareTo(maxStringBD) <= 0 && parts < numberPartitions) {
      splitPoints.add(curVal);
      curVal = curVal.add(splitSize);
      // bigDecimalToText approximates to next comparison location.
      // Make sure we are still in range
      String text = bigDecimalToText(curVal);
      curVal = textToBigDecimal(text);
      ++parts;
    }

    if (splitPoints.size() == 0
        || splitPoints.get(0).compareTo(minStringBD) != 0) {
      splitPoints.add(0, minStringBD);
    }

    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxStringBD) != 0
        || splitPoints.size() == 1) {
      splitPoints.add(maxStringBD);
    }

    // Turn the split points into a set of string intervals.
    BigDecimal start = splitPoints.get(0);
    for (int i = 1; i < splitPoints.size(); i++) {
      BigDecimal end = splitPoints.get(i);

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(constructTextConditions(prefix, start, end,
        partitionMinValue, partitionMaxValue, i == 1, i == splitPoints.size() - 1));
      partitions.add(partition);

      start = end;
    }

    return partitions;
  }


  protected List<Partition> partitionIntegerColumn() {
    List<Partition> partitions = new LinkedList<Partition>();

    long minValue = partitionMinValue == null ? Long.MIN_VALUE
      : Long.parseLong(partitionMinValue);
    long maxValue = Long.parseLong(partitionMaxValue);

    long interval =  (maxValue - minValue) / numberPartitions;
    long remainder = (maxValue - minValue) % numberPartitions;

    if (interval == 0) {
      numberPartitions = (int)remainder;
    }

    long lowerBound;
    long upperBound = minValue;
    for (int i = 1; i < numberPartitions; i++) {
      lowerBound = upperBound;
      upperBound = lowerBound + interval;
      upperBound += (i <= remainder) ? 1 : 0;

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(
          constructConditions(lowerBound, upperBound, false));
      partitions.add(partition);
    }

    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(
        constructConditions(upperBound, maxValue, true));
    partitions.add(partition);

    return partitions;
  }

  protected List<Partition> partitionFloatingPointColumn() {
    List<Partition> partitions = new LinkedList<Partition>();


    double minValue = partitionMinValue == null ? Double.MIN_VALUE
      : Double.parseDouble(partitionMinValue);
    double maxValue = Double.parseDouble(partitionMaxValue);

    double interval =  (maxValue - minValue) / numberPartitions;

    double lowerBound;
    double upperBound = minValue;
    for (int i = 1; i < numberPartitions; i++) {
      lowerBound = upperBound;
      upperBound = lowerBound + interval;

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(
          constructConditions(lowerBound, upperBound, false));
      partitions.add(partition);
    }

    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(
        constructConditions(upperBound, maxValue, true));
    partitions.add(partition);

    return partitions;
  }

  protected List<Partition> partitionNumericColumn() {
    List<Partition> partitions = new LinkedList<Partition>();
    // Having one end in null is not supported
    if (partitionMinValue == null || partitionMaxValue == null) {
      throw new SqoopException(GenericJdbcConnectorError.GENERIC_JDBC_CONNECTOR_0015);
    }

    BigDecimal minValue = new BigDecimal(partitionMinValue);
    BigDecimal maxValue = new BigDecimal(partitionMaxValue);

    // Having one single value means that we can create only one single split
    if(minValue.equals(maxValue)) {
      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(constructConditions(minValue));
      partitions.add(partition);
      return partitions;
    }

    // Get all the split points together.
    List<BigDecimal> splitPoints = new LinkedList<BigDecimal>();

    BigDecimal splitSize = divide(maxValue.subtract(minValue), new BigDecimal(numberPartitions));

    if (splitSize.compareTo(NUMERIC_MIN_INCREMENT) < 0) {
      splitSize = NUMERIC_MIN_INCREMENT;
    }

    BigDecimal curVal = minValue;

    while (curVal.compareTo(maxValue) <= 0) {
      splitPoints.add(curVal);
      curVal = curVal.add(splitSize);
    }

    if (splitPoints.get(splitPoints.size() - 1).compareTo(maxValue) != 0 || splitPoints.size() == 1) {
      splitPoints.remove(splitPoints.size() - 1);
      splitPoints.add(maxValue);
    }

    // Turn the split points into a set of intervals.
    BigDecimal start = splitPoints.get(0);
    for (int i = 1; i < splitPoints.size(); i++) {
      BigDecimal end = splitPoints.get(i);

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(constructConditions(start, end, i == splitPoints.size() - 1));
      partitions.add(partition);

      start = end;
    }

    return partitions;
  }

  protected  List<Partition> partitionBooleanColumn() {
    List<Partition> partitions = new LinkedList<Partition>();


    Boolean minValue = parseBooleanValue(partitionMinValue);
    Boolean maxValue = parseBooleanValue(partitionMaxValue);

    StringBuilder conditions = new StringBuilder();

    // Having one single value means that we can create only one single split
    if(minValue.equals(maxValue)) {
      GenericJdbcPartition partition = new GenericJdbcPartition();

      conditions.append(partitionColumnName).append(" = ")
          .append(maxValue);
      partition.setConditions(conditions.toString());
      partitions.add(partition);
      return partitions;
    }

    GenericJdbcPartition partition = new GenericJdbcPartition();

    if (partitionMinValue == null) {
      conditions = new StringBuilder();
      conditions.append(partitionColumnName).append(" IS NULL");
      partition.setConditions(conditions.toString());
      partitions.add(partition);
    }
    partition = new GenericJdbcPartition();
    conditions = new StringBuilder();
    conditions.append(partitionColumnName).append(" = TRUE");
    partition.setConditions(conditions.toString());
    partitions.add(partition);
    partition = new GenericJdbcPartition();
    conditions = new StringBuilder();
    conditions.append(partitionColumnName).append(" = FALSE");
    partition.setConditions(conditions.toString());
    partitions.add(partition);
    return partitions;
  }

  private Boolean parseBooleanValue(String value) {
    if (value == null) {
      return null;
    }
    if (value.equals("1")) {
      return Boolean.TRUE;
    } else if (value.equals("0")) {
      return Boolean.FALSE;
    } else {
      return Boolean.parseBoolean(value);
    }
  }

  protected BigDecimal divide(BigDecimal numerator, BigDecimal denominator) {
    try {
      return numerator.divide(denominator);
    } catch (ArithmeticException ae) {
      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
    }
  }

  protected String constructConditions(
      Object lowerBound, Object upperBound, boolean lastOne) {
    StringBuilder conditions = new StringBuilder();
    conditions.append(lowerBound);
    conditions.append(" <= ");
    conditions.append(partitionColumnName);
    conditions.append(" AND ");
    conditions.append(partitionColumnName);
    conditions.append(lastOne ? " <= " : " < ");
    conditions.append(upperBound);
    return conditions.toString();
  }

  protected String constructConditions(Object value) {
    return new StringBuilder()
      .append(partitionColumnName)
      .append(" = ")
      .append(value)
      .toString()
     ;
  }

  protected String constructDateConditions(SimpleDateFormat sdf,
      Object lowerBound, Object upperBound, boolean lastOne) {
    StringBuilder conditions = new StringBuilder();
    conditions.append('\'').append(sdf.format((java.util.Date)lowerBound)).append('\'');
    conditions.append(" <= ");
    conditions.append(partitionColumnName);
    conditions.append(" AND ");
    conditions.append(partitionColumnName);
    conditions.append(lastOne ? " <= " : " < ");
    conditions.append('\'').append(sdf.format((java.util.Date)upperBound)).append('\'');
    return conditions.toString();
  }

  protected String constructTextConditions(String prefix, Object lowerBound, Object upperBound,
      String lowerStringBound, String upperStringBound, boolean firstOne, boolean lastOne) {
    StringBuilder conditions = new StringBuilder();
    String lbString = prefix + bigDecimalToText((BigDecimal)lowerBound);
    String ubString = prefix + bigDecimalToText((BigDecimal)upperBound);
    conditions.append('\'').append(firstOne ? lowerStringBound : lbString).append('\'');
    conditions.append(" <= ");
    conditions.append(partitionColumnName);
    conditions.append(" AND ");
    conditions.append(partitionColumnName);
    conditions.append(lastOne ? " <= " : " < ");
    conditions.append('\'').append(lastOne ? upperStringBound : ubString).append('\'');
    return conditions.toString();
  }

  /**
   *  Converts a string to a BigDecimal representation in Base 2^21 format.
   *  The maximum Unicode code point value defined is 10FFFF.  Although
   *  not all database system support UTF16 and mostly we expect UCS2
   *  characters only, for completeness, we assume that all the unicode
   *  characters are supported.
   *  Given a string 's' containing characters s_0, s_1, .. s_n,
   *  the string is interpreted as the number 0.s_0 s_1 s_2 .. s_n in base 2^21.
   *  This can be split and each split point can be converted back to
   *  a string value for comparison purposes.   The number of characters
   *  is restricted to prevent repeating fractions and rounding errors
   *  towards the higher fraction positions.
   */
  private static final BigDecimal UNITS_BASE = new BigDecimal(0x200000);
  private static final int MAX_CHARS_TO_CONVERT = 4;

  private BigDecimal textToBigDecimal(String str) {
    BigDecimal result = BigDecimal.ZERO;
    BigDecimal divisor = UNITS_BASE;

    int len = Math.min(str.length(), MAX_CHARS_TO_CONVERT);

    for (int n = 0; n < len; ) {
      int codePoint = str.codePointAt(n);
      n += Character.charCount(codePoint);
      BigDecimal val = divide(new BigDecimal(codePoint), divisor);
      result = result.add(val);
      divisor = divisor.multiply(UNITS_BASE);
    }

    return result;
  }

  private String bigDecimalToText(BigDecimal bd) {
    BigDecimal curVal = bd.stripTrailingZeros();
    StringBuilder sb = new StringBuilder();

    for (int n = 0; n < MAX_CHARS_TO_CONVERT; ++n) {
      curVal = curVal.multiply(UNITS_BASE);
      int cp = curVal.intValue();
      if (0 >= cp) {
        break;
      }

      if (!Character.isDefined(cp)) {
        int t_cp = Character.MAX_CODE_POINT < cp ? 1 : cp;
        // We are guaranteed to find at least one character
        while(!Character.isDefined(t_cp)) {
          ++t_cp;
          if (t_cp == cp) {
            break;
          }
          if (t_cp >= Character.MAX_CODE_POINT || t_cp <= 0)  {
            t_cp = 1;
          }
        }
        cp = t_cp;
      }
      curVal = curVal.subtract(new BigDecimal(cp));
      sb.append(Character.toChars(cp));
    }

    return sb.toString();
  }

}
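
The text-column branch above relies on reading a string as a base-2^21 fraction so that string ranges can be divided numerically. The following is a minimal, simplified sketch of that idea only. The class name `TextSplitSketch` and its method names are hypothetical, and the handling of undefined code points done by `bigDecimalToText` is deliberately left out.

```java
import java.math.BigDecimal;

// Hypothetical stand-alone sketch; not part of Sqoop.
class TextSplitSketch {
  static final BigDecimal BASE = new BigDecimal(0x200000); // 2^21, as UNITS_BASE
  static final int MAX_CHARS = 4;                          // as MAX_CHARS_TO_CONVERT

  // Interprets the first few characters as digits of a base-2^21 fraction.
  static BigDecimal toBigDecimal(String s) {
    BigDecimal result = BigDecimal.ZERO;
    BigDecimal divisor = BASE;
    int len = Math.min(s.length(), MAX_CHARS);
    for (int n = 0; n < len; ) {
      int cp = s.codePointAt(n);
      n += Character.charCount(cp);
      // The divisor is a power of two, so this exact division always terminates.
      result = result.add(new BigDecimal(cp).divide(divisor));
      divisor = divisor.multiply(BASE);
    }
    return result;
  }

  // Reverses the mapping by peeling off one base-2^21 digit at a time.
  // Simplification: assumes every digit is a valid, defined code point.
  static String toText(BigDecimal value) {
    StringBuilder sb = new StringBuilder();
    BigDecimal cur = value;
    for (int n = 0; n < MAX_CHARS; n++) {
      cur = cur.multiply(BASE);
      int cp = cur.intValue();
      if (cp <= 0) {
        break;
      }
      cur = cur.subtract(new BigDecimal(cp));
      sb.appendCodePoint(cp);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    BigDecimal lo = toBigDecimal("abc");
    BigDecimal hi = toBigDecimal("abe");
    // The numeric midpoint of the two strings maps back to a string between them.
    BigDecimal mid = lo.add(hi).divide(new BigDecimal(2));
    System.out.println(toText(lo));
    System.out.println(toText(mid));
  }
}
```

Because the mapping preserves ordering for the converted characters, evenly spaced numeric split points can be converted back into string boundaries for the WHERE clause.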

In the class above, the partitioning strategy is chosen according to the data type of the partition column. The supported types are:

TINYINT
SMALLINT
INTEGER
BIGINT
REAL
FLOAT
DOUBLE
NUMERIC
DECIMAL
BIT
BOOLEAN
DATE
TIME
TIMESTAMP
CHAR
VARCHAR
LONGVARCHAR
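
The grouping of these types into strategies can be summarised with a small sketch. It mirrors the `switch` in `getPartitions`, but the class name `PartitionStrategySketch`, the returned labels, and the use of `IllegalArgumentException` (in place of `SqoopException`) are assumptions made only for illustration.

```java
import java.sql.Types;

// Hypothetical stand-alone sketch; not part of Sqoop.
class PartitionStrategySketch {

  // Returns a label for the strategy that would handle the given java.sql.Types code.
  static String strategyFor(int sqlType) {
    switch (sqlType) {
      case Types.TINYINT:
      case Types.SMALLINT:
      case Types.INTEGER:
      case Types.BIGINT:
        return "integer";
      case Types.REAL:
      case Types.FLOAT:
      case Types.DOUBLE:
        return "floating-point";
      case Types.NUMERIC:
      case Types.DECIMAL:
        return "decimal";
      case Types.BIT:
      case Types.BOOLEAN:
        return "boolean";
      case Types.DATE:
      case Types.TIME:
      case Types.TIMESTAMP:
        return "date-time";
      case Types.CHAR:
      case Types.VARCHAR:
      case Types.LONGVARCHAR:
        return "text";
      default:
        // The real partitioner throws a SqoopException for unsupported types.
        throw new IllegalArgumentException("Unsupported partition column type: " + sqlType);
    }
  }

  public static void main(String[] args) {
    System.out.println(strategyFor(Types.BIGINT));
    System.out.println(strategyFor(Types.VARCHAR));
  }
}
```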

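The default partition column is the primary key.
The range is split using an interval of:
(upper boundary - lower boundary) / (max partitions)
This yields a lower and an upper bound for each partition, and the resulting condition is what the Extractor uses when it extracts the data.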
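
As a worked example of the interval formula for an integer column, the sketch below follows the same arithmetic as `partitionIntegerColumn`. The class name `IntegerSplitSketch`, the `split` method, and the column name `id` are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; not part of Sqoop.
class IntegerSplitSketch {

  // Builds one WHERE-clause condition per partition for the range [min, max].
  static List<String> split(String column, long min, long max, long maxPartitions) {
    List<String> conditions = new ArrayList<>();
    long interval = (max - min) / maxPartitions;
    long remainder = (max - min) % maxPartitions;

    // The range is narrower than the partition count: use fewer partitions.
    if (interval == 0) {
      maxPartitions = remainder;
    }

    long lower;
    long upper = min;
    for (long i = 1; i < maxPartitions; i++) {
      lower = upper;
      // The first 'remainder' partitions are one value wider than the rest.
      upper = lower + interval + (i <= remainder ? 1 : 0);
      conditions.add(lower + " <= " + column + " AND " + column + " < " + upper);
    }
    // The last partition is closed on both ends so that max is included.
    conditions.add(upper + " <= " + column + " AND " + column + " <= " + max);
    return conditions;
  }

  public static void main(String[] args) {
    // min = 0, max = 10, 3 partitions: interval = 3, remainder = 1
    for (String condition : split("id", 0, 10, 3)) {
      System.out.println(condition);
    }
    // 0 <= id AND id < 4
    // 4 <= id AND id < 7
    // 7 <= id AND id <= 10
  }
}
```

Every partition except the last uses a half-open interval, so no row is read by two extractors and the maximum value is still covered.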
