[Spark] Spark的shuffle寫過(guò)程

相關(guān)博客鏈接:https://blog.csdn.net/hkl15111093042/article/details/94376896
http://www.reibang.com/p/1d714f0c5e07
簡(jiǎn)答:【Spark shuffle處于一個(gè)寬依賴魁蒜,可以實(shí)現(xiàn)類似混洗的功能般堆,可將相同的Key分發(fā)至同一個(gè)Reducer進(jìn)行處理(即將多個(gè)節(jié)點(diǎn)上的同一類數(shù)據(jù)匯集到某一節(jié)點(diǎn)進(jìn)行計(jì)算)】

Shuffle的重要性:無(wú)論是在Hadoop還是Spark中心剥,Shuffle階段會(huì)涉及到磁盤的讀寫和網(wǎng)絡(luò)傳輸尘喝,因此Shuffle的性能高低直接影響整個(gè)程序的性能和吞吐量贴膘。

1.Hadoop MapReduce中的shuffle

在mapreduce中,shuffle是連接Map和Reduce之間的橋梁胜茧,Map操作的輸出要到Reduce中必須經(jīng)過(guò)shuffle操作抑片。


image.png

mapreduce中的shuffle過(guò)程:
首先就是加載文件路徑:
Path inPath=new Path(args[0])
FileInputFormat.addInputPath(job,inPath)
之后的過(guò)程大概如下所示:


image.png

【說(shuō)明:】
partitioner:決定map的數(shù)據(jù)寫到哪一個(gè)分區(qū)塊中,被哪一個(gè)reduce獲攘蛱琛茧痕;
sort:保證map的輸出結(jié)果按照指定的排序方式進(jìn)入到reduce中;
combiner:說(shuō)明數(shù)據(jù)在map端是否需要進(jìn)行一次規(guī)約(reduce)恼除;
group:將同一個(gè)reduce中踪旷,相同的key規(guī)約到同一個(gè)組中曼氛;

【mapreduce中shuffle的調(diào)優(yōu)點(diǎn):】設(shè)置combiner,減少?gòu)膍ap到reduce的數(shù)量量和io開(kāi)銷令野。

注意舀患,另外的面試點(diǎn):
?气破?mapper和reducer的數(shù)量由什么決定聊浅,mapper的數(shù)量不可以指定,reducer的數(shù)量可以指定

2.Spark中Shuffle的寫操作

2.1 基于哈希的shuffle寫操作

Spark避免了Hadoop中多余的排序(即在Reduce之前獲取的數(shù)據(jù)要經(jīng)過(guò)排序)现使,提供基于哈希的Shuffle寫操作低匙。

1.【原理:】
<1> 每一個(gè)Mapper會(huì)根據(jù)Reducer的數(shù)量創(chuàng)建出對(duì)應(yīng)數(shù)量的bucket(bucket的數(shù)量就是M*R);
<2> Mapper生成的結(jié)果會(huì)根據(jù)設(shè)置的Partition算法填充到每個(gè)bucket中碳锈,這里的bucket是抽象的概念顽冶,在該機(jī)制中每個(gè)bucket對(duì)應(yīng)一個(gè)文件,
<3>當(dāng)Reduce啟動(dòng)時(shí)售碳,會(huì)根據(jù)任務(wù)的編號(hào)和所依賴的Mapper的編號(hào)强重,從遠(yuǎn)端或者本地獲取相應(yīng)的bucket作為Reduce任務(wù)的輸入

image.png

2.【HashShuffleManager】
博客鏈接:https://blog.csdn.net/weixin_39216383/article/details/81174950

image.png

這里的M*R個(gè),相當(dāng)于executor數(shù) * 每個(gè)executor上的task數(shù)目


image.png

優(yōu)化點(diǎn):如果我們使用HashShuffleManager贸人,可以設(shè)置spark.shuffle.consolidateFiles间景,該參數(shù)默認(rèn)置為false,將其設(shè)置為true即可開(kāi)啟優(yōu)化機(jī)制艺智。

image.png
image.png

3.【HashShuffleManager源碼流程】
見(jiàn)【Spark】HashShuffleManager

2.2 基于排序的shuffle寫操作
1.【HashShuffle的不足】:

<1> 過(guò)程中創(chuàng)建的文件數(shù)是M*R拱燃,M是當(dāng)前Shuffle Map Task的任務(wù)數(shù),而R是后續(xù)任務(wù)的任務(wù)數(shù)力惯。
如M和R都為1000,最終會(huì)產(chǎn)生1M個(gè)文件召嘶,對(duì)于文件系統(tǒng)負(fù)擔(dān)非常大父晶,同時(shí)在shuffle數(shù)據(jù)量不大而文件數(shù)量很多的情況下,隨機(jī)寫會(huì)降低I/O性能弄跌。
<2> 寫文件時(shí)甲喝,每一個(gè)Writer Handler默認(rèn)要100KB內(nèi)存,雖然Shuffle寫是分時(shí)運(yùn)行的铛只,其內(nèi)存所需是C * F *100KB埠胖,其中C為Spark集群中運(yùn)行的核數(shù),F(xiàn)為后續(xù)任務(wù)數(shù)(即Reduce數(shù)目)淳玩,緩存占用內(nèi)存也不小

2.【原理:】

區(qū)別:
每個(gè)Shuffle Map Task不會(huì)為后續(xù)的每個(gè)Task創(chuàng)建單獨(dú)的文件直撤,而是會(huì)將所有的結(jié)果寫在同一個(gè)文件中,對(duì)應(yīng)的生成一個(gè)Index文件進(jìn)行索引蜕着。
最終臨時(shí)文件的數(shù)量就是2 * reduce task num

運(yùn)行機(jī)制:
a.普通運(yùn)行機(jī)制 b.bypass運(yùn)行機(jī)制
當(dāng)shuffle read task的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(shí)(默認(rèn)為200)谋竖,就會(huì)啟用bypass機(jī)制

image.png

https://www.cnblogs.com/itboys/p/9201750.html

3.【BypassMergeSortShuffleWriter】

BypassMergeSortShuffleWriter:
未定義aggregator并且沒(méi)有開(kāi)啟mapside combine红柱,分區(qū)數(shù)小于spark.shuffle.sort.bypassMergeThreshold指定的分區(qū)數(shù)(默認(rèn)200),則直接每個(gè)reduce分區(qū)寫一個(gè)文件蓖乘,然后再合并锤悄,避免為溢寫文件合并而兩次序列化和反序列化。
源碼流程:
(1)前幾步與HashShuffleWriter一致嘉抒,獲取rdd與dependency零聚,并根據(jù)SparkEnv獲取對(duì)應(yīng)的ShuffleManager

image.png

(2)其中manager.getWriter方法的第一個(gè)參數(shù) dep.shuffleHandle

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

registerShuffle方法的具體實(shí)現(xiàn)

 override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }

其中使用BypassMergeSortShuffleHandle的條件:
a、未開(kāi)啟mapSideCombine
b些侍、分區(qū)數(shù)小于
["spark.shuffle.sort.bypassMergeThreshold]

因?yàn)槎x了map端的合并隶症,無(wú)法避免要進(jìn)行排序;
如果分區(qū)數(shù)小于線程數(shù)娩梨,就無(wú)需排序沿腰,假如大于的話也不能忽略排序。

image.png

(3)BypassMergeSortShuffleWriter的write方法
a狈定、先根據(jù)分區(qū)數(shù)初始化partitionWriters數(shù)組


image.png

b颂龙、將數(shù)據(jù)迭代寫入
key的hash值,對(duì)partition的數(shù)目取余數(shù)


image.png

寫完提交關(guān)閉


image.png

c纽什、合并文件措嵌,寫DataFile和indexFile,同時(shí)更新MapStatus作為結(jié)果返回芦缰。


partitionLengths:long型數(shù)組企巢,表示每個(gè)分區(qū)的數(shù)據(jù)長(zhǎng)度,用于在indexFile中让蕾,表示每個(gè)分區(qū)的開(kāi)始的索引浪规,知道索引后,可以去dataFile中獲取數(shù)據(jù)

4.【SortShuffleWriter】

與ByPass機(jī)制的不同:
并不會(huì)像bypass機(jī)制一樣探孝,直接寫很多文件(按照每個(gè)Map Task對(duì)應(yīng)分區(qū)數(shù)寫對(duì)應(yīng)數(shù)量的文件)再進(jìn)行合并笋婿,而是采用了溢寫磁盤的方式產(chǎn)生臨時(shí)文件,并進(jìn)行合并顿颅。
源碼流程:
(1)前幾步與bypass一致缸濒,直到調(diào)用registerShuflle方法返回ShuffleHandle實(shí)例的時(shí)候:
在不滿足BypassMergeSortShuffleHandle和SerializedShuffleHandle的時(shí)候

image.png

根據(jù)BaseShuffleHandle匹配,返回ShuffleWriter為


image.png

(2)根據(jù)獲取到的rdd和dependecny粱腻,是否有mapSideCombine庇配,是否定義aggregator,來(lái)決定構(gòu)建不同的ExternalSorter(第二種情況中绍些,并不會(huì)在意每個(gè)分區(qū)中是否根據(jù)key進(jìn)行排序捞慌,如果是sortByKey算子也是會(huì)在reduce端進(jìn)行排序。)


image.png

ExternalSorter類的注釋上有這么一段:
需要在map端combine柬批,則使用PartitionedAppendOnlyMap寫內(nèi)存數(shù)據(jù)buffer卿闹;否則的話使用PartitionedPairBuffer揭糕。


image.png

(3)調(diào)用sorter的insertAll方法,將數(shù)據(jù)先寫入到buffer中锻霎,有需要的話再溢寫到磁盤中著角。
(4)insertAll方法中,

  def insertAll(records: Iterator[Product2[K, V]]): Unit = {
    // TODO: stop combining if we find that the reduction factor isn't high
    val shouldCombine = aggregator.isDefined

    if (shouldCombine) {
      // Combine values in-memory first using our AppendOnlyMap
      val mergeValue = aggregator.get.mergeValue
      val createCombiner = aggregator.get.createCombiner
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      while (records.hasNext) {
        addElementsRead()
        kv = records.next()
        map.changeValue((getPartition(kv._1), kv._1), update)
        maybeSpillCollection(usingMap = true)
      }
    } else {
      // Stick values into our buffer
      while (records.hasNext) {
        addElementsRead()
        val kv = records.next()
        buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
        maybeSpillCollection(usingMap = false)
      }
    }
  }

首先根據(jù)是否定義aggregator旋恼,決定寫內(nèi)存數(shù)據(jù)的buffer類型吏口,將數(shù)據(jù)迭代寫入,同時(shí)會(huì)判斷是否需要溢寫到磁盤上maybeSpillCollection冰更,溢寫條件判斷在maybeSpill方法中产徊,
最終的條件就是當(dāng)前的內(nèi)存占用大于等于初始化的內(nèi)存閾值[spark.shuffle.spill.initialMemoryThreshold],默認(rèn)值是5 * 1024 * 1024或者當(dāng)緩存的條數(shù)達(dá)到一定的量就進(jìn)行溢寫

image.png

maybeSpill過(guò)程描述:每寫入32條數(shù)據(jù)檢查1次蜀细,向內(nèi)存管理器申請(qǐng)執(zhí)行內(nèi)存(granted代表內(nèi)存真正分配的)舟铜,如果內(nèi)存真正占用超過(guò)了新的閾值,那么就進(jìn)行溢寫奠衔。

(5)溢寫文件調(diào)用的是ExternalSorter的spill方法

 override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    spills += spillFile
  }

下圖方法返回分區(qū)的寫迭代器谆刨,會(huì)根據(jù)是否進(jìn)行mapSideCombine有兩種實(shí)現(xiàn)


image.png

a、PartitionedPairBuffer
的partitionedDestructiveSortedIterator归斤,返回的僅僅是基于partitionId進(jìn)行排序的一個(gè)迭代器
b痊夭、PartitionedAppendOnlyMap的
partitionedDestructiveSortedIterator返回的迭代器排序是基于(PartitionId,key)為key的排序迭代器

(6)寫入內(nèi)存buffer(或進(jìn)一步溢寫磁盤)后,合并DataFile及創(chuàng)建IndexFile

image.png

過(guò)程分析:
構(gòu)建blockId脏里,并調(diào)用writePartitionedFile進(jìn)入將數(shù)據(jù)寫入磁盤的動(dòng)作

val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)

writePartitionedFile:


image.png

a她我、如果沒(méi)有溢寫文件,無(wú)需合并的步驟直接構(gòu)建迭代器迫横,根據(jù)分區(qū)按照文件塊寫文件番舆。構(gòu)建迭代器的時(shí)候跟spill里的方式一樣
b、如果有溢寫文件矾踱,其迭代器 this.partitionedIterator 會(huì)根據(jù)是否有溢寫合蔽,是否需要排序分了三種情況:

  def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
    val usingMap = aggregator.isDefined
    val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
    if (spills.isEmpty) {
      // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
      // we don't even need to sort by anything other than partition ID
      if (!ordering.isDefined) {
        // The user hasn't requested sorted keys, so only sort by partition ID, not key
        groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
      } else {
        // We do need to sort by both partition ID and key
        groupByPartition(destructiveIterator(
          collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
      }
    } else {
      // Merge spilled and in-memory data
      merge(spills, destructiveIterator(
        collection.partitionedDestructiveSortedIterator(comparator)))
    }
  }

其中第三種情況,有溢寫的話介返,需要合并溢寫數(shù)據(jù)和內(nèi)存數(shù)據(jù):
定義了aggregator,要進(jìn)行基于aggregator的合并沃斤;
沒(méi)有定義aggregator圣蝎,但是有順序(如sortByKey),需要對(duì)數(shù)據(jù)排序但不需要合并merge衡瓶;
既沒(méi)有aggregator徘公,也沒(méi)有順序,直接返回哮针。

  private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
      : Iterator[(Int, Iterator[Product2[K, C]])] = {
    val readers = spills.map(new SpillReader(_))
    val inMemBuffered = inMemory.buffered
    (0 until numPartitions).iterator.map { p =>
      val inMemIterator = new IteratorForPartition(p, inMemBuffered)
      val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
      if (aggregator.isDefined) {
        // Perform partial aggregation across partitions
        (p, mergeWithAggregation(
          iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
      } else if (ordering.isDefined) {
        // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
        // sort the elements without trying to merge them
        (p, mergeSort(iterators, ordering.get))
      } else {
        (p, iterators.iterator.flatten)
      }
    }
  }

最后关面,遍歷每個(gè)MapTask的所有數(shù)據(jù)迭代器坦袍,將數(shù)據(jù)寫入數(shù)據(jù)文件;接著寫索引文件和構(gòu)建MapStatus等太。

5.【UnsafeShuffleWriter】"鎢絲計(jì)劃"

執(zhí)行過(guò)程圖與SortShuffleWriter基本一致捂齐,也是先寫內(nèi)存buffer,再溢寫到磁盤缩抡。
源碼流程:
(1)前幾步與bypass一致奠宜,直到調(diào)用SortShuffleManager.registerShuflle方法返回ShuffleHandle實(shí)例的時(shí)候:

image.png

在排除不是bypassMergeSortShuffleHandle的基礎(chǔ)上,滿足 1.對(duì)象需要支持序列化瞻想,2.不能定義aggregator压真,3.partition數(shù)目小于等于[MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE] (16777216個(gè))
注意:不能大于該數(shù)目的原因是
image.png

根據(jù)ShuffleHandle類型,返回UnsafeShuffleWriter實(shí)現(xiàn)類蘑险。
(2)UnsafeShuffleWriter的write方法

  boolean success = false;
    try {
      while (records.hasNext()) {
        insertRecordIntoSorter(records.next());
      }
      closeAndWriteOutput();
      success = true;
    } finally {
      if (sorter != null) {
        try {
          sorter.cleanupResources();
        } catch (Exception e) {
          // Only throw this error if we won't be masking another
          // error.
          if (success) {
            throw e;
          } else {
            logger.error("In addition to a failure during writing, we failed during " +
                         "cleanup.", e);
          }
        }
      }
    }

其中insertRecordIntoSorter方法滴肿,
定義一個(gè)會(huì)清空的serBuffer,大小為1024*1024(就是1M)佃迄,將數(shù)據(jù)寫入到序列化流中并且刷新泼差。

image.png

最后的sorter.insertRecord,該方法內(nèi)會(huì)判斷是否需要溢寫文件:
內(nèi)存中的record數(shù)目和屎,是否大于設(shè)定的[numElementsForSpillThreshold]拴驮,由屬性 spark.shuffle.spill.numElementsForceSpillThreshold指定

  public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {

    // for tests
    assert(inMemSorter != null);
    if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
      logger.info("Spilling data because number of spilledRecords crossed the threshold " +
        numElementsForSpillThreshold);
    // 需要溢寫操作,調(diào)用spill函數(shù)
      spill();
    }
    // 判斷是否需要將數(shù)組增大柴信,以便接受數(shù)據(jù)套啤,如果空間不足,則觸發(fā)spill
    growPointerArrayIfNecessary();
    // Need 4 bytes to store the record length.
    final int required = length + 4;
    // 如果空間不足随常,就重新申請(qǐng)新的page潜沦,假如資源不足觸發(fā)spill
    // 已經(jīng)根據(jù)申請(qǐng)的字節(jié)數(shù),修改了currentPage和pageCursor的值
    acquireNewPageIfNecessary(required);

    assert(currentPage != null);
    /**
    * 獲取內(nèi)存地址绪氛,通過(guò)一個(gè)內(nèi)存地址(off-heap)或者一個(gè)對(duì)象引用的偏移(on-heap)追蹤
    * 尋址方式對(duì)比:
    * on-heap:先找到對(duì)象唆鸡,然后再找索引
    * off-heap:根據(jù)地址找到索引
    */
    final Object base = currentPage.getBaseObject();
    final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
    Platform.putInt(base, pageCursor, length);
    pageCursor += 4;
    Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
    pageCursor += length;
    inMemSorter.insertRecord(recordAddress, partitionId);
  }

on-heap和off-heap兩種模式是由下圖決定:


image.png

滿足的條件為:


image.png

(3)再分析下spill方法,調(diào)用的是自身ShuffleExternalSorter的spill方法枣察,其中進(jìn)一步調(diào)用的是writeSortedFile方法争占。
(方法太復(fù)雜,會(huì)另寫文章介紹具體過(guò)程)

(4)在writeSortedFile方法中序目,會(huì)先進(jìn)行排序臂痕,再將數(shù)據(jù)寫入磁盤
(5)不需要溢寫,會(huì)先判斷資源是否寫一條記錄猿涨,不足夠時(shí)會(huì)先進(jìn)行資源擴(kuò)展握童,然后將數(shù)據(jù)寫到內(nèi)存里面。如果資源不夠會(huì)觸發(fā)溢寫操作叛赚。
(6)在合并文件時(shí)澡绩,先將內(nèi)存殘余刷寫到磁盤中稽揭,接著進(jìn)行文件合并。
spill聚合文件的時(shí)候分為高效和低效兩種方式肥卡,spark.shuffle.unsafe.fastMergeEnabled為true時(shí)溪掀,并且沒(méi)有開(kāi)啟壓縮或者壓縮方式為snappy|LZF,可以采用非常高效的合并transferTo召调;否則只能使用fileStream-based膨桥。
(7)最后產(chǎn)生indexFile并且更新mapstatus

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市唠叛,隨后出現(xiàn)的幾起案子只嚣,更是在濱河造成了極大的恐慌,老刑警劉巖艺沼,帶你破解...
    沈念sama閱讀 206,839評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件册舞,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡障般,警方通過(guò)查閱死者的電腦和手機(jī)调鲸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)挽荡,“玉大人藐石,你說(shuō)我怎么就攤上這事《猓” “怎么了于微?”我有些...
    開(kāi)封第一講書(shū)人閱讀 153,116評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)青自。 經(jīng)常有香客問(wèn)我株依,道長(zhǎng),這世上最難降的妖魔是什么延窜? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,371評(píng)論 1 279
  • 正文 為了忘掉前任恋腕,我火速辦了婚禮,結(jié)果婚禮上逆瑞,老公的妹妹穿的比我還像新娘荠藤。我一直安慰自己,他們只是感情好获高,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布哈肖。 她就那樣靜靜地躺著,像睡著了一般谋减。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上扫沼,一...
    開(kāi)封第一講書(shū)人閱讀 49,111評(píng)論 1 285
  • 那天出爹,我揣著相機(jī)與錄音庄吼,去河邊找鬼。 笑死严就,一個(gè)胖子當(dāng)著我的面吹牛总寻,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播梢为,決...
    沈念sama閱讀 38,416評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼渐行,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了铸董?” 一聲冷哼從身側(cè)響起祟印,我...
    開(kāi)封第一講書(shū)人閱讀 37,053評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎粟害,沒(méi)想到半個(gè)月后蕴忆,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,558評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡悲幅,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評(píng)論 2 325
  • 正文 我和宋清朗相戀三年套鹅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片汰具。...
    茶點(diǎn)故事閱讀 38,117評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡卓鹿,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出留荔,到底是詐尸還是另有隱情吟孙,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評(píng)論 4 324
  • 正文 年R本政府宣布存谎,位于F島的核電站拔疚,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏既荚。R本人自食惡果不足惜稚失,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望恰聘。 院中可真熱鬧句各,春花似錦、人聲如沸晴叨。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,315評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)兼蕊。三九已至初厚,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背产禾。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,539評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工排作, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人亚情。 一個(gè)月前我還...
    沈念sama閱讀 45,578評(píng)論 2 355
  • 正文 我出身青樓妄痪,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親楞件。 傳聞我的和親對(duì)象是個(gè)殘疾皇子衫生,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容