spark RDD的迭代執(zhí)行流程

1.研究的重點和方法

本文主要就是研究rdd的執(zhí)行流程媳叨,主要重點是如何形成rdd的圖譜抖所,以及如何形成遞歸的迭代器進行數(shù)據(jù)的迭代計算的饱搏;
筆者對與rdd的執(zhí)行的流程的研究了網(wǎng)絡(luò)上的相關(guān)文章叭披、相關(guān)的rdd的代碼的閱讀辽狈,簡單的樣例代碼的調(diào)試慈参,以及原理代碼的編寫;其中樣例代碼的調(diào)試個人覺得非常重要刮萌,可以清晰的實踐一下你的代碼的執(zhí)行流程懂牧。

2.RDD圖譜的形成

2.1編寫簡單的樣例代碼

因為重點是研究rdd的執(zhí)行流程,樣例代碼只設(shè)計了窄依賴的變換尊勿,因為其流程相對于款依賴相對簡單僧凤。
文件系統(tǒng)我們只讀取本地文件
集群模式為本機模式,為了調(diào)試方便元扔,只啟動一個執(zhí)行線程
樣例代碼如下:

var sparkConf = new SparkConf().setAppName("Test").setMaster("local[1]"); //啟動一個線程
var sc = new SparkContext(sparkConf);
//讀取本地文件躯保、并調(diào)用兩次簡單的窄依賴變換
sc.textFile("/home/jack/t.txt")
  .map((_, 1))
  .filter(_._2 == 1)
  .collect();   //通過collect 提交真正的action,開始執(zhí)行任務
2.2 SparkContext.textFile

執(zhí)行代碼進行調(diào)試模式澎语,斷點進入textFile途事,來看看textFile的執(zhí)行

  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

繼續(xù)跟蹤hadoopFile方法,可以看到hadoopFile關(guān)鍵邏輯是實例化了一個hadoopRDD類擅羞,并返回尸变;

    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)

基于以上,textFile的關(guān)鍵代碼是調(diào)用了hadoopFile方法减俏,hadoopFile方法返回一個HadoopRDD召烂,而hadoopRDD.map又返回了一個MapPartitionRDD,最終textFile最終返回了這個RDD。

再看hadoopRDD.map 的邏輯

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

??HadoopRDD的map方法是其父類RDD實現(xiàn)的娃承,在map方法中返回了一個新的MarPartitionRDD奏夫,并將當前RDD實例和用戶的map函數(shù)傳遞進去。
??MapPartitionRDD與HadoopRDD同樣都是繼承于RDD類历筝,MapPartitionRDD的特點是會通過傳遞進來的參數(shù)酗昼,持有父RDD和用戶的執(zhí)行函數(shù)。


RDD繼承關(guān)系

??而textFile為什么沒有直接返回HadoopRDD梳猪,我覺得應該是HadoopRDD的迭代器是一個key,Value迭代器麻削,其中key為行所在的字節(jié)offset值、value是行值春弥,而我們一般來說只關(guān)心value值呛哟,所以其做了一次map變換,只返回了value的迭代惕稻。
??在RDD類中竖共,同樣有filter、flatMap俺祠、distinct 等窄依賴等實現(xiàn)公给,基本套路都是生成一個新的MapPartitionRDD借帘,并將當前的RDD作為父RDD傳遞給新的RDD。
??綜上所述淌铐, SparkContext.textFile肺然,在核心流程上既是返回了new HadoopRDD().map(),也就是一個MapPartitionRDD腿准。

2.3 RDD.map际起、filter等變換方法

??如上文所述, map吐葱、filter街望、flatmap 等方法的思路其實大同小異,都是返回一個攜帶前置父RDD實例以及用戶傳遞的具體transform方法的MapParittionRDD實例弟跑。而這里要重點強調(diào)的是三個點:

  • MarPartitionRDD要持有前置的父RDD
    MapPartitionRDD會持有前置的灾前、變換前的RDD實例,這樣形成一個可以向前追溯的RDD圖譜孟辑,為后續(xù)的迭代計算提供依據(jù)哎甲。
  • MapParitionRDD要持有用戶傳遞的具體transform方法
    具體算法就也會在MapPartitionRDD中存儲。這樣后續(xù)的迭代變化時能夠回溯計算饲嗽;
  • map炭玫、filter、flatMap的具體迭代算法實現(xiàn)在scala的 Iterator中

以map方法為例展開講述以上三點貌虾,我們看到返回的MapPartitionRDD的構(gòu)造形式是:

new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))

??this即當前調(diào)用map的父RDD實例吞加,這個較容易理解。
??cleanF是用戶傳遞的map具體用戶函數(shù)酝惧, 但(context,pid,iter)=>iter.map(cleanF)這個形式榴鼎,如果scala不那么熟悉的話,一眼望過去著實看得有點吃力晚唇,其實這個類似于與c語言中的函數(shù)指針、java中的函數(shù)接口盗似,相當與定義了一個方法的類型哩陕;例如在scala中 var f:(Int)=>Int,即定義了一個方法類型:入?yún)镮nt赫舒,返回值為Int悍及,翻到MapPartitionRDD的源碼,可以看到第二個參數(shù)的定義為為 f: (TaskContext, Int, Iterator[T]) => Iterator[U], 那么這個f函數(shù)變量到底干了啥呢接癌,結(jié)合map具體傳遞形式心赶,我們可以看到f就是一個函數(shù),內(nèi)部調(diào)用了第三個參數(shù)傳遞進來的迭代器(Iterator[T])缺猛, 調(diào)用到Iterator的map方法缨叫,map方法的參數(shù)是用戶的cleanF椭符,并返回一個新的迭代器(Iterator[U]); 這些形成的transform的方法最終也會形成一個調(diào)用鏈耻姥, 最終由ResultTask的runTask從rdd的最后一個節(jié)點開始調(diào)用销钝,這個下文再詳細介紹。
??關(guān)于迭代器算法的實現(xiàn)琐簇,最終都在Iterator中蒸健,這些實現(xiàn)并非Spark框架中的,而都是scala語言框架的婉商,我想這也是spark首選scala來實現(xiàn)其理念的原因吧似忧。Iterator的map、filter丈秩、flatMap 等方法同樣接收用戶的方法盯捌,并返回一個新的Iterator,持續(xù)的進行迭代癣籽,以下就是Iterator中map的實現(xiàn)挽唉,可以看出他的核心代碼就是返回一個迭代器,而next方法的返回值筷狼, 是把前置的Iterator (self)的next方法返回值傳遞給用戶定義的回調(diào)方法(f)進行返回瓶籽,從而形成了一個遞歸的迭代調(diào)用:

  def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
  }

??綜上所述,RDD通過不斷的map埂材、filter塑顺、flatMap等變換,生成了一系列保存前置rdd和用戶變換函數(shù)的新的RDD俏险, 這些rdd按照調(diào)用順序形成了一個有序的圖譜严拒,等待最后的action動作最終的執(zhí)行,如下圖所示竖独。在《spark性能調(diào)優(yōu)與原理分析》一書中裤唠,作者把該過程做了一個我認為比較形象的比喻:rdd的圖譜形成的過程就像一個一步步形成的菜譜,每次transform調(diào)用都會記錄rdd的一次轉(zhuǎn)換莹痢,菜譜會記錄做菜的先后步驟种蘸,但只有要吃這道菜的時候,才會按照菜譜進行操作竞膳。而吃菜的動作就是后續(xù)我們要講述的collect航瞭、foreach等action操作觸發(fā)的數(shù)據(jù)迭代計算的過程。


RDD執(zhí)行變換流程.png

3. RDD數(shù)據(jù)的遞歸迭代計算

??在rdd的圖譜形成后坦辟,終歸我們是要調(diào)用一些action動作函數(shù)觸發(fā)數(shù)據(jù)的迭代計算的刊侯,最終返回我們的數(shù)據(jù)結(jié)果。

3.1 觸發(fā)計算的時機和位置

我們通過實際的方法調(diào)用的堆棧中最終找到锉走,觸發(fā)數(shù)據(jù)計算的位置是在 ResultTask中的runTask方法中滨彻;

  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    func(context, rdd.iterator(partition, context))
  }

runTask的最后藕届,會調(diào)用最后一個子rdd的iterator方法,從而開始調(diào)用鏈條的頭端iterator疮绷。

3.2 iterator和compute方法

??iterator 方法是由RDD類實現(xiàn)的翰舌,其返回一個Iterator迭代器,iterator的內(nèi)部經(jīng)過幾次判定最終還是調(diào)用到rdd的compute方法冬骚;

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

而翻看到compute方法椅贱,發(fā)現(xiàn)其是個抽象方法,約定了其返回值類型是Iterator只冻,其實現(xiàn)是下方給MapPartitionRDD或者 HadoopRDD來實現(xiàn)的庇麦。

  • MapPartitionRDD的 compute
    我們先來看MapPartitionRDD的實現(xiàn)
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

??可以看到調(diào)用了其保存的函數(shù)變量f, 而重要的第三個參數(shù)是firstParent.iterator()喜德, 其實就是調(diào)用了父RDD對象的iterator()方法山橄,而父iterator的調(diào)用就是父rdd的compute,例如如果是map調(diào)用舍悯,那就是iter.map(cleanF)航棱,f(,,firstParent.iterator)的展開就是 firstParent[T].iterator().map(clieanF);這樣依次的觸發(fā)上一級rdd的iterator萌衬,形成了一個迭代式的調(diào)用饮醇,最終得到返回的數(shù)據(jù)。

  • HadoopRDD的 compute
    ??這時我們肯定會想秕豫,sc.TextFile第一次創(chuàng)建的HadoopRDD的compute返回的迭代器是什么呢朴艰?這個迭代器總歸要去讀取數(shù)據(jù)源了吧;我們翻開HadoopRDD的compute實現(xiàn):
  override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {
      //....................刪除了很多代碼
      private var reader: RecordReader[K, V] = null
      private val inputFormat = getInputFormat(jobConf)
      reader =   inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()

      override def getNext(): (K, V) = {
        try {
          finished = !reader.next(key, value)
        } catch {
          case e: FileNotFoundException if ignoreMissingFiles =>
            logWarning(s"Skipped missing file: ${split.inputSplit}", e)
            finished = true
          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
          case e: FileNotFoundException if !ignoreMissingFiles => throw e
          case e: IOException if ignoreCorruptFiles =>
            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
            finished = true
        }
        if (!finished) {
          inputMetrics.incRecordsRead(1)
        }
        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
          updateBytesRead()
        }
        (key, value)
      }
    }
    new InterruptibleIterator[(K, V)](context, iter)
  }

??果然compute方法不一樣了混移, 代碼比較多祠墅,刪除了很多,但其核心實現(xiàn)還是返回一個Iterator歌径;而這個Iterator的getNext方法毁嗦,是利用一個LineRecorderReader,不斷的去讀取數(shù)據(jù)源中的行來實現(xiàn)的回铛。也即是迭代一次金矛,就去數(shù)據(jù)源讀取一行,直到所在分區(qū)數(shù)據(jù)結(jié)束勺届,具體的數(shù)據(jù)的行數(shù)據(jù)的迭代原理可以參看hadoop的LineRecordReader、UncompressedSplitLineReader娶耍、SplitLineReader以及LineReader等實現(xiàn)免姿。

3.2 迭代計算數(shù)據(jù)

??最后我們可以看到數(shù)據(jù)的迭代計算是與圖譜的形成反向觸發(fā)的的,圖譜形成后榕酒,action動作觸發(fā)的計算由最后的子rdd開始觸發(fā)胚膊,通過iterator和compute方法故俐,依次向前獲取數(shù)據(jù),最終到達首個hadoopRdd的compute紊婉,讀取文件中的一行药版,這樣形成一個類似迭代計算的流,如圖所示:


rdd調(diào)用迭代流程.png

而迭代的計算也有一個非常巨大的優(yōu)勢,既是我們在讀取文件的時候不用一次性的把文件都加載都內(nèi)存中,而是形成一個不斷的計算流一條條的加載都许、計算逐工、保存,節(jié)省了巨量的內(nèi)存对嚼,這種思路是非常值得我們參考和學習的。

4.使用java來實現(xiàn)一個簡單的rdd迭代調(diào)用

4.1 基本思路

??在研究完spark的rdd的調(diào)用模式后,發(fā)現(xiàn)其實現(xiàn)思路還是挺不錯的剩盒,再研究過程中因為spark的rdd部分都是用scala語言編寫的,因為scala的不熟悉帶來了一些難度慨蛙,在研究的過程中其實筆者還采取了一個辦法就是把相關(guān)的代碼看的大體明白后辽聊,用java又實現(xiàn)了一個簡約模式,當java版的rdd成功運行后期贫,也對其模式調(diào)用有了一些更深的理解跟匆。

4.2 基本實現(xiàn)類

java 簡約版的rdd實現(xiàn)包括了如下的組件:

  • Iterable類, 其實就是iterator唯灵,模仿了scala的Iterator贾铝,參考其實現(xiàn)了最基礎(chǔ)的map、filter和foreach
  • RDD類埠帕, java版rdd垢揩,實現(xiàn)了spark中的RDD,實現(xiàn)RDD的map敛瓷、filter和foreach叁巨,實現(xiàn)iterator,就是返回compute呐籽, 留著 compute作為抽象方法
  • MapRDD類锋勺, 模仿MapPartitionRDD,重寫了compute方法
  • ListRDD類狡蝶, 模仿HadoopRDD庶橱,不同的是ListRDD的數(shù)據(jù)源是一個ArrayList,compute返回的迭代器是對ArrayList的迭代贪惹。
4.2 實現(xiàn)代碼

Iterable類

import java.util.NoSuchElementException;

public abstract class Iterable<T> {
    @FunctionalInterface
    interface MapCallback<T1, T2> {
        T2 invoke(T1 num);
    }

    @FunctionalInterface
    interface ForeachCallback<T1> {
        void invoke(T1 num);
    }

    @FunctionalInterface
    interface FilterCallback<T1> {
        boolean invoke(T1 t);
    }

    public <RT> Iterable<RT> map(MapCallback<T, RT> callback) {
        Iterable<T> self = Iterable.this;
        return new Iterable<RT>() {
            @Override
            public boolean hasNext() {
                return self.hasNext();
            }

            @Override
            public RT next() {
                return callback.invoke(self.next());
            }
        };
    }

    public Iterable<T> filter(FilterCallback<T> callback) {

        return new Iterable<T>() {
            Iterable<T> self = Iterable.this;
            T nextValue = null;

            @Override
            public boolean hasNext() {
                if (self.hasNext() == false) {
                    nextValue = null;
                    return false;
                }
                else {
                    nextValue = self.next();
                    while (callback.invoke(nextValue) == false) {
                        if (self.hasNext() == false) {
                            nextValue = null;
                            return false;
                        }
                        nextValue = self.next();
                    }
                    return true;
                }
            }

            @Override
            public T next() {
                if (nextValue != null) {
                    T curr = nextValue;
                    nextValue = null;
                    return curr;
                }
                else if (this.hasNext()) {
                    T curr = nextValue;
                    nextValue = null;
                    return curr;
                }
                return null;
            }
        };
    }

    public void foreach(ForeachCallback<T> callback) {
        while (this.hasNext()) {
            callback.invoke(this.next());
        }
    }

    public abstract boolean hasNext();

    public abstract T next();

}

RDD類

public abstract class RDD<T> {
    interface RDDCallback<T1, T2> {
        Iterable<T2> invoke(Iterable<T1> it);
    }

    public Iterable<T> iterator() {
        return compute();
    }

    public <U> RDD<U> map(Iterable.MapCallback<T, U> callback) {
        return new MapRDD<U, T>(this, (it -> it.map(callback)));
    }

    public RDD<T> filter(Iterable.FilterCallback<T> callback) {
        return new MapRDD<T, T>(this, (it -> it.filter(callback)));
    }

    public abstract Iterable<T> compute();

    public void foreach(Iterable.ForeachCallback<T> callback) {
        this.iterator().foreach(callback);
    }
    
}

MapRDD

public class MapRDD<U, T> extends RDD<U> {

    RDDCallback<T, U> f;
    RDD<T> parent;

    public MapRDD(RDD<T> parent, RDDCallback<T, U> f) {
        this.parent = parent;
        this.f = f;
    }

    @Override
    public Iterable<U> compute() {
        Iterable<U> it = f.invoke(parent.iterator());
        return it;
    }
}

ListRDD

import java.util.Iterator;
import java.util.List;

public class ListRDD<T> extends RDD<T> {
    List<T> list;

    private ListRDD(List<T> list) {
        this.list = list;
    }

    @Override
    public Iterable<T> compute() {
        return new Iterable<T>() {
            Iterator<T> iterator = list.iterator();

            @Override
            public boolean hasNext() {
                return iterator.hasNext();
            }

            @Override
            public T next() {
                return iterator.next();
            }
        };
    }

    public static <T> RDD<T> makeRDD(List<T> list) {
        return new ListRDD<>(list);
    }
}

ok,下面是主函數(shù)苏章,開始調(diào)用

import java.util.ArrayList;
import java.util.List;

public class JavaMain {
    public static void main(String[] args) {
        //創(chuàng)建一個List作為數(shù)據(jù)源
        List<Integer>  list = new ArrayList<>();
        for(int i = 0; i <= 5; i++) list.add(i);
        //通過list創(chuàng)建一個ListRDD,并依次調(diào)用map、filter和foreach
        ListRDD.makeRDD(list)
                .map((num)->new Pair<>(String.valueOf(num), num))
                .filter(p->p.getValue() > 1)
                .foreach((p)->{System.out.println(p.toString());});
    }
}

運行后枫绅,輸出結(jié)果

2=2
3=3
4=4
5=5
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末泉孩,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子并淋,更是在濱河造成了極大的恐慌寓搬,老刑警劉巖,帶你破解...
    沈念sama閱讀 210,914評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件县耽,死亡現(xiàn)場離奇詭異句喷,居然都是意外死亡,警方通過查閱死者的電腦和手機酬诀,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評論 2 383
  • 文/潘曉璐 我一進店門脏嚷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人瞒御,你說我怎么就攤上這事父叙。” “怎么了肴裙?”我有些...
    開封第一講書人閱讀 156,531評論 0 345
  • 文/不壞的土叔 我叫張陵趾唱,是天一觀的道長。 經(jīng)常有香客問我蜻懦,道長甜癞,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,309評論 1 282
  • 正文 為了忘掉前任宛乃,我火速辦了婚禮悠咱,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘征炼。我一直安慰自己析既,他們只是感情好,可當我...
    茶點故事閱讀 65,381評論 5 384
  • 文/花漫 我一把揭開白布谆奥。 她就那樣靜靜地躺著眼坏,像睡著了一般。 火紅的嫁衣襯著肌膚如雪酸些。 梳的紋絲不亂的頭發(fā)上宰译,一...
    開封第一講書人閱讀 49,730評論 1 289
  • 那天,我揣著相機與錄音魄懂,去河邊找鬼沿侈。 笑死,一個胖子當著我的面吹牛市栗,可吹牛的內(nèi)容都是我干的肋坚。 我是一名探鬼主播,決...
    沈念sama閱讀 38,882評論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼智厌!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起盲赊,我...
    開封第一講書人閱讀 37,643評論 0 266
  • 序言:老撾萬榮一對情侶失蹤铣鹏,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后哀蘑,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體诚卸,經(jīng)...
    沈念sama閱讀 44,095評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,448評論 2 325
  • 正文 我和宋清朗相戀三年绘迁,在試婚紗的時候發(fā)現(xiàn)自己被綠了合溺。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,566評論 1 339
  • 序言:一個原本活蹦亂跳的男人離奇死亡缀台,死狀恐怖棠赛,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情膛腐,我是刑警寧澤睛约,帶...
    沈念sama閱讀 34,253評論 4 328
  • 正文 年R本政府宣布,位于F島的核電站哲身,受9級特大地震影響辩涝,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜勘天,卻給世界環(huán)境...
    茶點故事閱讀 39,829評論 3 312
  • 文/蒙蒙 一怔揩、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧脯丝,春花似錦商膊、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至砰苍,卻和暖如春潦匈,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背赚导。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評論 1 264
  • 我被黑心中介騙來泰國打工茬缩, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人吼旧。 一個月前我還...
    沈念sama閱讀 46,248評論 2 360
  • 正文 我出身青樓凰锡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子掂为,可洞房花燭夜當晚...
    茶點故事閱讀 43,440評論 2 348

推薦閱讀更多精彩內(nèi)容