詳細(xì)探究Spark0.8的shuffle實(shí)現(xiàn)

Background
在MapReduce框架中墙贱,shuffle是連接Map和Reduce之間的橋梁捧韵,Map的輸出要用到Reduce中必須經(jīng)過shuffle這個(gè)環(huán)節(jié)颤枪,shuffle的性能高低直接影響了整個(gè)程序的性能和吞吐量渤昌。Spark作為MapReduce框架的一種實(shí)現(xiàn)寿烟,自然也實(shí)現(xiàn)了shuffle的邏輯澈驼,本文就深入研究Spark的shuffle是如何實(shí)現(xiàn)的,有什么優(yōu)缺點(diǎn)筛武,與Hadoop MapReduce的shuffle有什么不同盅藻。
Shuffle
Shuffle是MapReduce框架中的一個(gè)特定的phase,介于Map phase和Reduce phase之間畅铭,當(dāng)Map的輸出結(jié)果要被Reduce使用時(shí)氏淑,輸出結(jié)果需要按key哈希,并且分發(fā)到每一個(gè)Reducer上去硕噩,這個(gè)過程就是shuffle假残。由于shuffle涉及到了磁盤的讀寫和網(wǎng)絡(luò)的傳輸,因此shuffle性能的高低直接影響到了整個(gè)程序的運(yùn)行效率炉擅。
下面這幅圖清晰地描述了MapReduce算法的整個(gè)流程辉懒,其中shuffle phase是介于Map phase和Reduce phase之間。


mapreduce running process

概念上shuffle就是一個(gè)溝通數(shù)據(jù)連接的橋梁谍失,那么實(shí)際上shuffle這一部分是如何實(shí)現(xiàn)的的呢眶俩,下面我們就以Spark為例講一下shuffle在Spark中的實(shí)現(xiàn)。
Spark Shuffle進(jìn)化史
先以圖為例簡(jiǎn)單描述一下Spark中shuffle的整一個(gè)流程:


spark shuffle process

首先每一個(gè)Mapper會(huì)根據(jù)Reducer的數(shù)量創(chuàng)建出相應(yīng)的bucket快鱼,bucket的數(shù)量是M
×
R

" role="presentation" style="display: inline; font-style: normal; font-weight: normal; line-height: normal; font-size: 14px; text-indent: 0px; text-align: left; text-transform: none; letter-spacing: normal; word-spacing: normal; word-wrap: normal; white-space: nowrap; float: none; direction: ltr; max-width: none; max-height: none; min-width: 0px; min-height: 0px; border: 0px; padding: 0px; margin: 0px; position: relative;">M×R
M
×
R

颠印,其中M

" role="presentation" style="display: inline; font-style: normal; font-weight: normal; line-height: normal; font-size: 14px; text-indent: 0px; text-align: left; text-transform: none; letter-spacing: normal; word-spacing: normal; word-wrap: normal; white-space: nowrap; float: none; direction: ltr; max-width: none; max-height: none; min-width: 0px; min-height: 0px; border: 0px; padding: 0px; margin: 0px; position: relative;">M
M

是Map的個(gè)數(shù),R

" role="presentation" style="display: inline; font-style: normal; font-weight: normal; line-height: normal; font-size: 14px; text-indent: 0px; text-align: left; text-transform: none; letter-spacing: normal; word-spacing: normal; word-wrap: normal; white-space: nowrap; float: none; direction: ltr; max-width: none; max-height: none; min-width: 0px; min-height: 0px; border: 0px; padding: 0px; margin: 0px; position: relative;">R
R

是Reduce的個(gè)數(shù)抹竹。
其次Mapper產(chǎn)生的結(jié)果會(huì)根據(jù)設(shè)置的partition算法填充到每個(gè)bucket中去线罕。這里的partition算法是可以自定義的,當(dāng)然默認(rèn)的算法是根據(jù)key哈希到不同的bucket中去窃判。
當(dāng)Reducer啟動(dòng)時(shí)钞楼,它會(huì)根據(jù)自己task的id和所依賴的Mapper的id從遠(yuǎn)端或是本地的block manager中取得相應(yīng)的bucket作為Reducer的輸入進(jìn)行處理。

這里的bucket是一個(gè)抽象概念袄琳,在實(shí)現(xiàn)中每個(gè)bucket可以對(duì)應(yīng)一個(gè)文件询件,可以對(duì)應(yīng)文件的一部分或是其他等。
接下來我們分別從shuffle writeshuffle fetch這兩塊來講述一下Spark的shuffle進(jìn)化史唆樊。
Shuffle Write
在Spark 0.6和0.7的版本中宛琅,對(duì)于shuffle數(shù)據(jù)的存儲(chǔ)是以文件的方式存儲(chǔ)在block manager中,與rdd.persist(StorageLevel.DISk_ONLY)
采取相同的策略窗轩,可以參看:
override def run(attemptId: Long): MapStatus = {

val numOutputSplits = dep.partitioner.numPartitions

...

// Partition the map output.

val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])

for (elem <- rdd.iterator(split, taskContext)) {

val pair = elem.asInstanceOf[(Any, Any)]

val bucketId = dep.partitioner.getPartition(pair._1)

buckets(bucketId) += pair

}

...

val blockManager = SparkEnv.get.blockManager

for (i <- 0 until numOutputSplits) {

val blockId = "shuffle_" + dep.shuffleId + "" + partition + "" + i

// Get a Scala iterator from Java map

val iter: Iterator[(Any, Any)] = buckets(i).iterator

val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)

totalBytes += size

}

...

}

我已經(jīng)將一些干擾代碼刪去夯秃。可以看到Spark在每一個(gè)Mapper中為每個(gè)Reducer創(chuàng)建一個(gè)bucket痢艺,并將RDD計(jì)算結(jié)果放進(jìn)bucket中仓洼。需要注意的是每個(gè)bucket是一個(gè)ArrayBuffer
,也就是說Map的輸出結(jié)果是會(huì)先存儲(chǔ)在內(nèi)存堤舒。
接著Spark會(huì)將ArrayBuffer中的Map輸出結(jié)果寫入block manager所管理的磁盤中色建,這里文件的命名方式為:shuffle_ + shuffle_id + "" + map partition id + "" + shuffle partition id

早期的shuffle write有兩個(gè)比較大的問題:
Map的輸出必須先全部存儲(chǔ)到內(nèi)存中舌缤,然后寫入磁盤箕戳。這對(duì)內(nèi)存是一個(gè)非常大的開銷,當(dāng)內(nèi)存不足以存儲(chǔ)所有的Map output時(shí)就會(huì)出現(xiàn)OOM国撵。
每一個(gè)Mapper都會(huì)產(chǎn)生Reducer number個(gè)shuffle文件陵吸,如果Mapper個(gè)數(shù)是1k,Reducer個(gè)數(shù)也是1k介牙,那么就會(huì)產(chǎn)生1M個(gè)shuffle文件壮虫,這對(duì)于文件系統(tǒng)是一個(gè)非常大的負(fù)擔(dān)。同時(shí)在shuffle數(shù)據(jù)量不大而shuffle文件又非常多的情況下环础,隨機(jī)寫也會(huì)嚴(yán)重降低IO的性能囚似。

在Spark 0.8版本中,shuffle write采用了與RDD block write不同的方式线得,同時(shí)也為shuffle write單獨(dú)創(chuàng)建了ShuffleBlockManager
饶唤,部分解決了0.6和0.7版本中遇到的問題。
首先我們來看一下Spark 0.8的具體實(shí)現(xiàn):
override def run(attemptId: Long): MapStatus = {

...

val blockManager = SparkEnv.get.blockManager

var shuffle: ShuffleBlocks = null

var buckets: ShuffleWriterGroup = null

try {

// Obtain all the block writers for shuffle blocks.

val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)

shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)

buckets = shuffle.acquireWriters(partition)

// Write the map output to its associated buckets.

for (elem <- rdd.iterator(split, taskContext)) {

val pair = elem.asInstanceOf[Product2[Any, Any]]

val bucketId = dep.partitioner.getPartition(pair._1)

buckets.writers(bucketId).write(pair)

}

// Commit the writes. Get the size of each bucket block (total block size).

var totalBytes = 0L

val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>

writer.commit()

writer.close()

val size = writer.size()

totalBytes += size

MapOutputTracker.compressSize(size)

}

...

} catch { case e: Exception =>

// If there is an exception from running the task, revert the partial writes

// and throw the exception upstream to Spark.

if (buckets != null) {

buckets.writers.foreach(_.revertPartialWrites())

}

throw e

} finally {

// Release the writers back to the shuffle block manager.

if (shuffle != null && buckets != null) {

shuffle.releaseWriters(buckets)

}

// Execute the callbacks on task completion.

taskContext.executeOnCompleteCallbacks()

}

}

}

在這個(gè)版本中為shuffle write添加了一個(gè)新的類ShuffleBlockManager
贯钩,由ShuffleBlockManager
來分配和管理bucket募狂。同時(shí)ShuffleBlockManager
為每一個(gè)bucket分配一個(gè)DiskObjectWriter
,每個(gè)write handler擁有默認(rèn)100KB的緩存角雷,使用這個(gè)write handler將Map output寫入文件中熬尺。可以看到現(xiàn)在的寫入方式變?yōu)閎uckets.writers(bucketId).write(pair)
谓罗,也就是說Map output的key-value pair是逐個(gè)寫入到磁盤而不是預(yù)先把所有數(shù)據(jù)存儲(chǔ)在內(nèi)存中在整體flush到磁盤中去粱哼。
ShuffleBlockManager
的代碼如下所示:
private[spark]

class ShuffleBlockManager(blockManager: BlockManager) {

def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {

new ShuffleBlocks {

// Get a group of writers for a map task.

override def acquireWriters(mapId: Int): ShuffleWriterGroup = {

val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024

val writers = Array.tabulateBlockObjectWriter { bucketId =>

val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, mapId)

blockManager.getDiskBlockWriter(blockId, serializer, bufferSize)

}

new ShuffleWriterGroup(mapId, writers)

}

override def releaseWriters(group: ShuffleWriterGroup) = {

// Nothing really to release here.

}

}

}

}

Spark 0.8顯著減少了shuffle的內(nèi)存壓力,現(xiàn)在Map output不需要先全部存儲(chǔ)在內(nèi)存中檩咱,再flush到硬盤揭措,而是record-by-record寫入到磁盤中。同時(shí)對(duì)于shuffle文件的管理也獨(dú)立出新的ShuffleBlockManager
進(jìn)行管理刻蚯,而不是與rdd cache文件在一起了绊含。
但是這一版Spark 0.8的shuffle write仍然有兩個(gè)大的問題沒有解決:
首先依舊是shuffle文件過多的問題,shuffle文件過多一是會(huì)造成文件系統(tǒng)的壓力過大炊汹,二是會(huì)降低IO的吞吐量躬充。
其次雖然Map output數(shù)據(jù)不再需要預(yù)先在內(nèi)存中evaluate顯著減少了內(nèi)存壓力,但是新引入的DiskObjectWriter
所帶來的buffer開銷也是一個(gè)不容小視的內(nèi)存開銷。假定我們有1k個(gè)Mapper和1k個(gè)Reducer充甚,那么就會(huì)有1M個(gè)bucket以政,于此同時(shí)就會(huì)有1M個(gè)write handler,而每一個(gè)write handler默認(rèn)需要100KB內(nèi)存伴找,那么總共需要100GB的內(nèi)存盈蛮。這樣的話僅僅是buffer就需要這么多的內(nèi)存,內(nèi)存的開銷是驚人的技矮。當(dāng)然實(shí)際情況下這1k個(gè)Mapper是分時(shí)運(yùn)行的話抖誉,所需的內(nèi)存就只有cores * reducer numbers * 100KB
大小了。但是reducer數(shù)量很多的話衰倦,這個(gè)buffer的內(nèi)存開銷也是蠻厲害的袒炉。

為了解決shuffle文件過多的情況,Spark 0.8.1引入了新的shuffle consolidation樊零,以期顯著減少shuffle文件的數(shù)量我磁。
首先我們以圖例來介紹一下shuffle consolidation的原理。


spark shuffle consolidation process

假定該job有4個(gè)Mapper和4個(gè)Reducer淹接,有2個(gè)core十性,也就是能并行運(yùn)行兩個(gè)task。我們可以算出Spark的shuffle write共需要16個(gè)bucket塑悼,也就有了16個(gè)write handler劲适。在之前的Spark版本中,每一個(gè)bucket對(duì)應(yīng)的是一個(gè)文件厢蒜,因此在這里會(huì)產(chǎn)生16個(gè)shuffle文件霞势。
而在shuffle consolidation中每一個(gè)bucket并非對(duì)應(yīng)一個(gè)文件,而是對(duì)應(yīng)文件中的一個(gè)segment斑鸦,同時(shí)shuffle consolidation所產(chǎn)生的shuffle文件數(shù)量與Spark core的個(gè)數(shù)也有關(guān)系愕贡。在上面的圖例中,job的4個(gè)Mapper分為兩批運(yùn)行巷屿,在第一批2個(gè)Mapper運(yùn)行時(shí)會(huì)申請(qǐng)8個(gè)bucket固以,產(chǎn)生8個(gè)shuffle文件;而在第二批Mapper運(yùn)行時(shí)嘱巾,申請(qǐng)的8個(gè)bucket并不會(huì)再產(chǎn)生8個(gè)新的文件憨琳,而是追加寫到之前的8個(gè)文件后面,這樣一共就只有8個(gè)shuffle文件旬昭,而在文件內(nèi)部這有16個(gè)不同的segment篙螟。因此從理論上講shuffle consolidation所產(chǎn)生的shuffle文件數(shù)量為C
×
R

" role="presentation" style="display: inline; font-style: normal; font-weight: normal; line-height: normal; font-size: 14px; text-indent: 0px; text-align: left; text-transform: none; letter-spacing: normal; word-spacing: normal; word-wrap: normal; white-space: nowrap; float: none; direction: ltr; max-width: none; max-height: none; min-width: 0px; min-height: 0px; border: 0px; padding: 0px; margin: 0px; position: relative;">C×R
C
×
R

,其中C

" role="presentation" style="display: inline; font-style: normal; font-weight: normal; line-height: normal; font-size: 14px; text-indent: 0px; text-align: left; text-transform: none; letter-spacing: normal; word-spacing: normal; word-wrap: normal; white-space: nowrap; float: none; direction: ltr; max-width: none; max-height: none; min-width: 0px; min-height: 0px; border: 0px; padding: 0px; margin: 0px; position: relative;">C
C

是Spark集群的core number问拘,R

" role="presentation" style="display: inline; font-style: normal; font-weight: normal; line-height: normal; font-size: 14px; text-indent: 0px; text-align: left; text-transform: none; letter-spacing: normal; word-spacing: normal; word-wrap: normal; white-space: nowrap; float: none; direction: ltr; max-width: none; max-height: none; min-width: 0px; min-height: 0px; border: 0px; padding: 0px; margin: 0px; position: relative;">R
R

是Reducer的個(gè)數(shù)遍略。
需要注意的是當(dāng) M
=
C

" role="presentation" style="display: inline; font-style: normal; font-weight: normal; line-height: normal; font-size: 14px; text-indent: 0px; text-align: left; text-transform: none; letter-spacing: normal; word-spacing: normal; word-wrap: normal; white-space: nowrap; float: none; direction: ltr; max-width: none; max-height: none; min-width: 0px; min-height: 0px; border: 0px; padding: 0px; margin: 0px; position: relative;">M=C
M
=
C

時(shí)shuffle consolidation所產(chǎn)生的文件數(shù)和之前的實(shí)現(xiàn)是一樣的惧所。
Shuffle consolidation顯著減少了shuffle文件的數(shù)量,解決了之前版本一個(gè)比較嚴(yán)重的問題绪杏,但是writer handler的buffer開銷過大依然沒有減少下愈,若要減少writer handler的buffer開銷,我們只能減少Reducer的數(shù)量寞忿,但是這又會(huì)引入新的問題驰唬,下文將會(huì)有詳細(xì)介紹顶岸。
講完了shuffle write的進(jìn)化史腔彰,接下來要講一下shuffle fetch了,同時(shí)還要講一下Spark的aggregator辖佣,這一塊對(duì)于Spark實(shí)際應(yīng)用的性能至關(guān)重要霹抛。
Shuffle Fetch and Aggregator
Shuffle write寫出去的數(shù)據(jù)要被Reducer使用,就需要shuffle fetcher將所需的數(shù)據(jù)fetch過來卷谈,這里的fetch包括本地和遠(yuǎn)端杯拐,因?yàn)閟huffle數(shù)據(jù)有可能一部分是存儲(chǔ)在本地的。Spark對(duì)shuffle fetcher實(shí)現(xiàn)了兩套不同的框架:NIO通過socket連接去fetch數(shù)據(jù)世蔗;OIO通過netty server去fetch數(shù)據(jù)端逼。分別對(duì)應(yīng)的類是BasicBlockFetcherIterator
和NettyBlockFetcherIterator

在Spark 0.7和更早的版本中污淋,只支持BasicBlockFetcherIterator
顶滩,而BasicBlockFetcherIterator
在shuffle數(shù)據(jù)量比較大的情況下performance始終不是很好,無法充分利用網(wǎng)絡(luò)帶寬寸爆,為了解決這個(gè)問題礁鲁,添加了新的shuffle fetcher來試圖取得更好的性能。對(duì)于早期shuffle性能的評(píng)測(cè)可以參看Spark usergroup赁豆。當(dāng)然現(xiàn)在BasicBlockFetcherIterator
的性能也已經(jīng)好了很多仅醇,使用的時(shí)候可以對(duì)這兩種實(shí)現(xiàn)都進(jìn)行測(cè)試比較。
接下來說一下aggregator魔种。我們都知道在Hadoop MapReduce的shuffle過程中析二,shuffle fetch過來的數(shù)據(jù)會(huì)進(jìn)行merge sort,使得相同key下的不同value按序歸并到一起供Reducer使用节预,這個(gè)過程可以參看下圖:

mapreduce shuffle process

所有的merge sort都是在磁盤上進(jìn)行的叶摄,有效地控制了內(nèi)存的使用,但是代價(jià)是更多的磁盤IO心铃。
那么Spark是否也有merge sort呢准谚,還是以別的方式實(shí)現(xiàn),下面我們就細(xì)細(xì)說明去扣。
首先雖然Spark屬于MapReduce體系柱衔,但是對(duì)傳統(tǒng)的MapReduce算法進(jìn)行了一定的改變樊破。Spark假定在大多數(shù)用戶的case中,shuffle數(shù)據(jù)的sort不是必須的唆铐,比如word count哲戚,強(qiáng)制地進(jìn)行排序只會(huì)使性能變差,因此Spark并不在Reducer端做merge sort艾岂。既然沒有merge sort那Spark是如何進(jìn)行reduce的呢顺少?這就要說到aggregator了。
aggregator本質(zhì)上是一個(gè)hashmap王浴,它是以map output的key為key脆炎,以任意所要combine的類型為value的hashmap。當(dāng)我們?cè)谧鰓ord count reduce計(jì)算count值的時(shí)候氓辣,它會(huì)將shuffle fetch到的每一個(gè)key-value pair更新或是插入到hashmap中(若在hashmap中沒有查找到秒裕,則插入其中;若查找到則更新value值)钞啸。這樣就不需要預(yù)先把所有的key-value進(jìn)行merge sort几蜻,而是來一個(gè)處理一個(gè),省下了外部排序這一步驟体斩。但同時(shí)需要注意的是reducer的內(nèi)存必須足以存放這個(gè)partition的所有key和count值梭稚,因此對(duì)內(nèi)存有一定的要求。
在上面word count的例子中絮吵,因?yàn)関alue會(huì)不斷地更新弧烤,而不需要將其全部記錄在內(nèi)存中,因此內(nèi)存的使用還是比較少的源武《笸剩考慮一下如果是group by key這樣的操作,Reducer需要得到key對(duì)應(yīng)的所有value粱栖。在Hadoop MapReduce中话浇,由于有了merge sort,因此給予Reducer的數(shù)據(jù)已經(jīng)是group by key了闹究,而Spark沒有這一步幔崖,因此需要將key和對(duì)應(yīng)的value全部存放在hashmap中,并將value合并成一個(gè)array渣淤∩涂埽可以想象為了能夠存放所有數(shù)據(jù),用戶必須確保每一個(gè)partition足夠小到內(nèi)存能夠容納价认,這對(duì)于內(nèi)存是一個(gè)非常嚴(yán)峻的考驗(yàn)嗅定。因此Spark文檔中建議用戶涉及到這類操作的時(shí)候盡量增加partition,也就是增加Mapper和Reducer的數(shù)量用踩。
增加Mapper和Reducer的數(shù)量固然可以減小partition的大小渠退,使得內(nèi)存可以容納這個(gè)partition忙迁。但是我們?cè)趕huffle write中提到,bucket和對(duì)應(yīng)于bucket的write handler是由Mapper和Reducer的數(shù)量決定的碎乃,task越多姊扔,bucket就會(huì)增加的更多,由此帶來write handler所需的buffer也會(huì)更多梅誓。在一方面我們?yōu)榱藴p少內(nèi)存的使用采取了增加task數(shù)量的策略恰梢,另一方面task數(shù)量增多又會(huì)帶來buffer開銷更大的問題,因此陷入了內(nèi)存使用的兩難境地梗掰。
為了減少內(nèi)存的使用嵌言,只能將aggregator的操作從內(nèi)存移到磁盤上進(jìn)行,Spark社區(qū)也意識(shí)到了Spark在處理數(shù)據(jù)規(guī)模遠(yuǎn)遠(yuǎn)大于內(nèi)存大小時(shí)所帶來的問題愧怜。因此PR303提供了外部排序的實(shí)現(xiàn)方案呀页,相信在Spark 0.9 release的時(shí)候妈拌,這個(gè)patch應(yīng)該能merge進(jìn)去拥坛,到時(shí)候內(nèi)存的使用量可以顯著地減少。
End
本文詳細(xì)地介紹了Spark的shuffle實(shí)現(xiàn)是如何進(jìn)化的尘分,以及遇到問題解決問題的過程猜惋。shuffle作為Spark程序中很重要的一個(gè)環(huán)節(jié),直接影響了Spark程序的性能培愁,現(xiàn)如今的Spark版本雖然shuffle實(shí)現(xiàn)還存在著種種問題著摔,但是相比于早期版本,已經(jīng)有了很大的進(jìn)步定续。開源代碼就是如此不停地迭代推進(jìn)谍咆,隨著Spark的普及程度越來越高,貢獻(xiàn)的人越來越多私股,相信后續(xù)的版本會(huì)有更大的提升摹察。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市倡鲸,隨后出現(xiàn)的幾起案子供嚎,更是在濱河造成了極大的恐慌,老刑警劉巖峭状,帶你破解...
    沈念sama閱讀 218,284評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件克滴,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡优床,警方通過查閱死者的電腦和手機(jī)劝赔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來胆敞,“玉大人着帽,你說我怎么就攤上這事罩阵。” “怎么了启摄?”我有些...
    開封第一講書人閱讀 164,614評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵稿壁,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我歉备,道長(zhǎng)傅是,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,671評(píng)論 1 293
  • 正文 為了忘掉前任蕾羊,我火速辦了婚禮喧笔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘龟再。我一直安慰自己书闸,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評(píng)論 6 392
  • 文/花漫 我一把揭開白布利凑。 她就那樣靜靜地躺著浆劲,像睡著了一般。 火紅的嫁衣襯著肌膚如雪哀澈。 梳的紋絲不亂的頭發(fā)上牌借,一...
    開封第一講書人閱讀 51,562評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音割按,去河邊找鬼膨报。 笑死,一個(gè)胖子當(dāng)著我的面吹牛适荣,可吹牛的內(nèi)容都是我干的现柠。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼弛矛,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼够吩!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起汪诉,我...
    開封第一講書人閱讀 39,223評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤废恋,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后扒寄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鱼鼓,經(jīng)...
    沈念sama閱讀 45,668評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評(píng)論 3 336
  • 正文 我和宋清朗相戀三年该编,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了迄本。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,981評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡课竣,死狀恐怖嘉赎,靈堂內(nèi)的尸體忽然破棺而出置媳,到底是詐尸還是另有隱情,我是刑警寧澤公条,帶...
    沈念sama閱讀 35,705評(píng)論 5 347
  • 正文 年R本政府宣布拇囊,位于F島的核電站,受9級(jí)特大地震影響靶橱,放射性物質(zhì)發(fā)生泄漏寥袭。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評(píng)論 3 330
  • 文/蒙蒙 一关霸、第九天 我趴在偏房一處隱蔽的房頂上張望传黄。 院中可真熱鬧,春花似錦队寇、人聲如沸膘掰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽识埋。三九已至,卻和暖如春苍日,著一層夾襖步出監(jiān)牢的瞬間惭聂,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評(píng)論 1 270
  • 我被黑心中介騙來泰國打工相恃, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人笨觅。 一個(gè)月前我還...
    沈念sama閱讀 48,146評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親蛆封。 傳聞我的和親對(duì)象是個(gè)殘疾皇子喻杈,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容