[譯]Spark Streaming編程指南(三)

DStreams轉(zhuǎn)換(Transformation)

和RDD類似,轉(zhuǎn)換中允許輸入DStream中的數(shù)據(jù)被修改邪锌。DStream支持很多Spark RDD上的轉(zhuǎn)換。常用的轉(zhuǎn)換如下。

轉(zhuǎn)換 含義
map(func) 將源DStream中的每個(gè)元素傳給函數(shù)func,返回新的DStream寡痰。
flatMap(func) 和map類似抗楔,但是每個(gè)輸入條目可以映射到0或多個(gè)輸出條目。
filter(func) 選擇源DStream中經(jīng)過func處理后返回true的元素拦坠,返回新的DStream连躏。
repartition(numPartitions) 改變DStream的并行級(jí)別,可創(chuàng)建更多或者更少的分區(qū)贞滨。
union(otherStream) 返回新的DStream入热,包含源DStream和otherDStream元素的union。
count() 通過對(duì)每個(gè)RDD的元素計(jì)數(shù)返回單元素RDDs的DStream晓铆。
reduce(func) 使用函數(shù)func(兩個(gè)參數(shù)和一個(gè)返回值)勺良,通過對(duì)每個(gè)RDD元素進(jìn)行聚合返回單元素RDDs的DStream。函數(shù)應(yīng)該是可結(jié)合和可交換的尤蒿,以便進(jìn)行并行計(jì)算郑气。
countByValue() 在元素類型為K的DStream上調(diào)用時(shí),返回一個(gè)(K, Long)對(duì)的DStream腰池,每個(gè)key的value是每個(gè)RDD中這個(gè)key出現(xiàn)的頻率。
reduceByKey(func, [numTasks]) 在(K, V)對(duì)的DStream上調(diào)用時(shí)忙芒,返回一個(gè)新(K, V)對(duì)的DStream示弓,每個(gè)key的value是使用給定reduce函數(shù)進(jìn)行聚合的結(jié)果。注意:默認(rèn)情況呵萨,使用Spark的默認(rèn)并行任務(wù)數(shù)量(本地模式為2奏属,集群模式數(shù)量由spark.default.parallelism配置項(xiàng)決定)進(jìn)行分組〕甭停可以傳遞可選參數(shù)numTasks任務(wù)數(shù)量囱皿。
join(otherStream, [numTasks]) 當(dāng)在(K, V)對(duì)的DStream和(K, W)對(duì)的DStream上調(diào)用時(shí),返回(K, (V, W))對(duì)的DStream忱嘹。
cogroup(otherStream, [numTasks]) 當(dāng)在(K, V)對(duì)的DStream和(K, W)對(duì)的DStream上調(diào)用時(shí)嘱腥,返回(K, Seq[V], Seq[W])元組的DStream。
transform(func) 對(duì)源DStream的每個(gè)RDD應(yīng)用一個(gè)RDD-to-RDD的函數(shù)拘悦,返回一個(gè)新DStream齿兔。可用于在DStream上進(jìn)行任意RDD操作础米。
updateStateByKey(func) 返回新的"state" DStream分苇,通過對(duì)key的前一個(gè)state和新值應(yīng)用給定方法更新每個(gè)key的state∑ㄉ#可用于維護(hù)每個(gè)key的任意state數(shù)據(jù)医寿。

其中幾個(gè)轉(zhuǎn)換需要詳細(xì)說明。

UpdateStateByKey操作
當(dāng)連續(xù)使用新信息更新state時(shí)蘑斧,updateStateByKey操作允許用戶維護(hù)任意狀態(tài)靖秩。使用這個(gè)操作艾帐,必須包含以下兩個(gè)步驟。

  1. 定義state - state可以是任意數(shù)據(jù)類型盆偿。
  2. 定義state更新函數(shù) - 用一個(gè)函數(shù)指定如何使用前一個(gè)state和來自輸入流的新值更新state柒爸。

在每個(gè)批次,Spark會(huì)對(duì)所有存在的key應(yīng)用state更新函數(shù)事扭,不管這些key是否有新數(shù)據(jù)捎稚。如果更新函數(shù)返回None,則key-value對(duì)會(huì)消除求橄。

用一個(gè)例子說明今野。維護(hù)每個(gè)單詞的運(yùn)行計(jì)數(shù),運(yùn)行計(jì)數(shù)是一個(gè)state并且是個(gè)整數(shù)罐农。定義更新函數(shù)如下:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

應(yīng)用在包含單詞對(duì)(pair DStream包含(word, 1)条霜,參見示例)的DStream上。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新函數(shù)會(huì)在每個(gè)單詞上進(jìn)行調(diào)用涵亏,newValues是1's的序列(來自(word, 1)對(duì))宰睡,runningCount是之前的計(jì)數(shù)。

注意气筋,使用updateStateByKey要求配置檢查點(diǎn)目錄拆内,之后詳細(xì)討論。

Transform操作
transform操作(以及其變化transformWith)可以在DStream上應(yīng)用任意RDD-to-RDD函數(shù)宠默◆锘校可用于使用任意沒有暴露給DStream API的RDD操作。例如搀矫,對(duì)數(shù)據(jù)流中每個(gè)批次的數(shù)據(jù)和另一個(gè)數(shù)據(jù)集進(jìn)行join的功能沒有直接暴露給DStream API抹沪。但是,可以簡(jiǎn)單地使用transform完成瓤球。這增加了很多可能性融欧。例如,通過對(duì)輸入數(shù)據(jù)流和預(yù)先計(jì)算的垃圾信息(也可能是Spark生成的)進(jìn)行join冰垄,完成實(shí)時(shí)數(shù)據(jù)清理蹬癌,然后基于結(jié)果進(jìn)行篩選。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}

注意虹茶,在每個(gè)批次時(shí)間間隔中逝薪,提供的函數(shù)都會(huì)被調(diào)用『铮可用于完成隨時(shí)間變化的RDD操作董济,也就是說,RDD操作要门,分區(qū)數(shù)量和廣播變量等可以在批次之間改變虏肾。

Window操作
Spark Streaming也提供了windowed計(jì)算廓啊,可在一個(gè)滑動(dòng)window數(shù)據(jù)上應(yīng)用轉(zhuǎn)換。下圖進(jìn)行說明封豪。

image.png

如上圖所示谴轮,每次在源DStream滑動(dòng)window,落在window中的源RDDs會(huì)被合并和操作用于產(chǎn)生windowed DStream的RDDs吹埠。在這個(gè)例子中第步,操作應(yīng)用在后面3個(gè)時(shí)間單位的數(shù)據(jù)上,以2個(gè)時(shí)間單位進(jìn)行滑動(dòng)缘琅。任何window操作都需要指定兩個(gè)參數(shù)粘都。

  • window長(zhǎng)度 - window持續(xù)時(shí)間(上圖是3)。
  • 滑動(dòng)間隔 - window操作執(zhí)行的間隔(上圖是2)刷袍。

這兩個(gè)參數(shù)必須是源DStream批時(shí)間間隔的倍數(shù)翩隧。

下面用一個(gè)示例說明window操作。擴(kuò)展之前的示例呻纹,生成過去30s的單詞計(jì)數(shù)堆生,每10s一次。為實(shí)現(xiàn)這個(gè)功能居暖,必須對(duì)pair DStream過去30s的數(shù)據(jù)應(yīng)用reduceByKey顽频。這里使用reduceByKeyAndWindow操作。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些通用的window操作如下太闺。所有操作都需要兩個(gè)參數(shù),windowLength和slideInterval嘁圈。

轉(zhuǎn)換 含義
window(windowLength, slideInterval) 返回新的DStream省骂,基于源DStream的windowed批次進(jìn)行計(jì)算。
countByWindow(windowLength, slideInterval) 返回?cái)?shù)據(jù)流中元素的滑動(dòng)窗口計(jì)數(shù)最住。
reduceByWindow(func, windowLength, slideInterval) 返回單元素?cái)?shù)據(jù)流钞澳,使用func函數(shù),聚合滑動(dòng)時(shí)間間隔的數(shù)據(jù)流元素進(jìn)行創(chuàng)建涨缚。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 當(dāng)在(K, V)對(duì)DStream上調(diào)用時(shí)轧粟,返回一個(gè)新的(K, V)對(duì)DStream,在滑動(dòng)window的批次上脓魏,使用給定reduce函數(shù)func通過聚合得到每個(gè)key的value兰吟。注意:默認(rèn)情況下,使用Spark的默認(rèn)并行任務(wù)數(shù)量(本地模式是2茂翔,集群模式根據(jù)配置屬性spark.default.parallelism確定)混蔼。可傳遞可選參數(shù)numTasks設(shè)置不同的任務(wù)數(shù)量珊燎。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 上面方法的另一個(gè)版本惭嚣,每個(gè)window的reduce value使用前一個(gè)window的reduce values增量計(jì)算遵湖。通過reduce進(jìn)入滑動(dòng)window的數(shù)據(jù)完成這個(gè)操作,然后逆向reduce離開window的舊數(shù)據(jù)晚吞。舉個(gè)例子延旧,加減keys的數(shù)量作為window slides。但是槽地,只適用于"可逆向reduce函數(shù)"迁沫,也就是說,有對(duì)應(yīng)"逆向reduce"函數(shù)的那些reduce函數(shù)(即invFunc參數(shù))闷盔。和上面的方法類似弯洗,reduce任務(wù)的數(shù)量可以通過可選參數(shù)進(jìn)行配置。注意逢勾,使用這個(gè)操作必須啟用檢查點(diǎn)牡整。
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 當(dāng)在(K, V)對(duì)DStream上調(diào)用時(shí),返回一個(gè)新的(K, Long)對(duì)DStream溺拱,每個(gè)key的value是這個(gè)key在滑動(dòng)window中的頻次逃贝。和reduceByKeyAndWindow類似,reduce任務(wù)的數(shù)量可以通過可選參數(shù)進(jìn)行配置迫摔。

Join操作
最后沐扳,值得強(qiáng)調(diào)的是,可以很容易地在Spark Streaming中執(zhí)行不同類型的join句占。

Stream-stream joins

數(shù)據(jù)流可以很容易地與其它數(shù)據(jù)流進(jìn)行join沪摄。

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)

這里,在每個(gè)批時(shí)間間隔中纱烘,stream1生成的RDD都會(huì)和stream2生成deRDD進(jìn)行join杨拐。也可以使用leftOuterJoinrightOuterJoinfullOuterJoin擂啥。另外哄陶,在流的window之間進(jìn)行join非常有用。

val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

Stream-dataset joins

之前解釋DStream.transform操作時(shí)進(jìn)行了說明哺壶。這里是另外一個(gè)windowed數(shù)據(jù)流和數(shù)據(jù)集之間進(jìn)行join的示例屋吨。

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

實(shí)際上,可以動(dòng)態(tài)修改要進(jìn)行join的數(shù)據(jù)集山宾。提供給transform的函數(shù)會(huì)在每個(gè)批時(shí)間間隔進(jìn)行評(píng)估至扰,然后dataset指向的當(dāng)前數(shù)據(jù)集。

完整的DStream轉(zhuǎn)換列表在API文件中塌碌。Scala請(qǐng)參考DStreamPairDStreamFunctions渊胸。Java請(qǐng)參考avaDStreamJavaPairDStream。Python請(qǐng)參考DStream台妆。

DStreams輸出操作

輸出操作允許DStream的數(shù)據(jù)寫入到外部系統(tǒng)中翎猛,如數(shù)據(jù)庫或者文件系統(tǒng)胖翰。由于輸出操作實(shí)際上允許轉(zhuǎn)換數(shù)據(jù)被外部系統(tǒng)消費(fèi),所以輸出操作出發(fā)了所有DStream轉(zhuǎn)換的額實(shí)際執(zhí)行(類似RDDs的action)切厘。目前萨咳,定義了如下輸出操作:

輸出操作 含義
print() 在運(yùn)行streaming應(yīng)用程序的驅(qū)動(dòng)節(jié)點(diǎn)上輸出DStream中每個(gè)批次的前10個(gè)元素。 對(duì)于開發(fā)和調(diào)試非常有用疫稿。Python API要調(diào)用pprint()培他。
saveAsTextFiles(prefix, [suffix]) 將DStream的內(nèi)容存儲(chǔ)為文件。每個(gè)批次時(shí)間間隔的文件名字基于prefix和suffix生成:"prefix-TIME_IN_MS[.suffix]"遗座。
saveAsObjectFiles(prefix, [suffix]) 將DStream的內(nèi)容存儲(chǔ)為Java對(duì)象序列化的SequenceFiles舀凛。每個(gè)批次時(shí)間間隔的文件名字基于prefix和suffix生成:"prefix-TIME_IN_MS[.suffix]"。Python API不支持途蒋。
saveAsHadoopFiles(prefix, [suffix]) 將DStream的內(nèi)容存儲(chǔ)為Hadoop文件猛遍。每個(gè)批次時(shí)間間隔的文件名字基于prefix和suffix生成:"prefix-TIME_IN_MS[.suffix]"。Python API不支持号坡。
foreachRDD(func) 最通用的輸出操作符懊烤,在stream生成的每個(gè)RDD上應(yīng)用函數(shù)func。這個(gè)函數(shù)應(yīng)該講每個(gè)RDD的數(shù)據(jù)發(fā)送到外部系統(tǒng)宽堆,如保存RDD到文件或者通過網(wǎng)絡(luò)寫入到數(shù)據(jù)庫腌紧。注意,函數(shù)func在運(yùn)行streaming應(yīng)用程序的驅(qū)動(dòng)進(jìn)程中執(zhí)行畜隶,并且函數(shù)中通常會(huì)有RDD actions出發(fā)streaming RDDs的計(jì)算壁肋。

使用foreachRDD的設(shè)計(jì)模式

dstream.foreachRDD是一個(gè)強(qiáng)大的原語,允許數(shù)據(jù)發(fā)送到外部系統(tǒng)籽慢。但是墩划,理解如何正確高效地使用這個(gè)原語非常重要。避免一些常見錯(cuò)誤的方式如下嗡综。

通常寫數(shù)據(jù)到外部系統(tǒng)要求創(chuàng)建連接對(duì)象(如TCP連接到遠(yuǎn)程服務(wù))并使用連接發(fā)送數(shù)據(jù)到遠(yuǎn)程系統(tǒng)。為達(dá)到這個(gè)目的杜漠,開發(fā)者可能大一地在Spark driver創(chuàng)建連接對(duì)象极景,然后嘗試在Spark worker中使用連接來保存RDDs中的記錄。例如:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

這種做法是錯(cuò)誤驾茴,因?yàn)檫@要求連接對(duì)象要序列化并且發(fā)送到worker中去盼樟。這樣的連接對(duì)象很少跨機(jī)器進(jìn)行傳遞。這個(gè)錯(cuò)誤可能會(huì)顯示為序列化錯(cuò)誤(連接對(duì)象未進(jìn)行序列化)锈至,初始化錯(cuò)誤(連接對(duì)象需要在worker節(jié)點(diǎn)初始化)等等晨缴。正確的方法是在worker節(jié)點(diǎn)創(chuàng)建連接對(duì)象。

但是峡捡,這會(huì)導(dǎo)致另外一個(gè)常見錯(cuò)誤 - 為每條記錄創(chuàng)建一個(gè)新的連接击碗。例如筑悴,

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

創(chuàng)建連接對(duì)象需要時(shí)間和資源開銷。因此稍途,為每條記錄創(chuàng)建和銷毀連接對(duì)象會(huì)引發(fā)不必要的高開銷并會(huì)顯著降低系統(tǒng)的吞吐量阁吝。好的解決防范是使用rdd.foreachPartition - 創(chuàng)建一個(gè)連接對(duì)象,然后使用它發(fā)送一個(gè)RDD分區(qū)中的所有記錄械拍。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

這種方式將連接創(chuàng)建的開銷平攤到多條記錄當(dāng)中了突勇。

最后,可以通過在多個(gè)RDDs/批次之間重用連接對(duì)象進(jìn)一步優(yōu)化坷虑〖撞觯可以維護(hù)一個(gè)連接對(duì)象的靜態(tài)池,重用連接對(duì)象將RDDs的多個(gè)批次發(fā)送到外部系統(tǒng)迄损,進(jìn)一步減少開銷定躏。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

注意,池子中的連接應(yīng)該是按照需要懶惰創(chuàng)建的并且如果一定時(shí)間不用會(huì)超時(shí)海蔽。這實(shí)現(xiàn)了向外部系統(tǒng)發(fā)送數(shù)據(jù)最有效的方式共屈。

其它需要記住的點(diǎn):

  • DStream通過輸出操作懶惰執(zhí)行,就想RDDs通過RDD action懶惰執(zhí)行党窜。DStream輸出操作中的RDD actions會(huì)強(qiáng)制接收數(shù)據(jù)的處理拗引。因此,如果應(yīng)用程序沒有任何輸出操作或者有dstream.foreachRDD()這種不帶任何RDD action的輸出操作幌衣,什么也不會(huì)執(zhí)行矾削。系統(tǒng)會(huì)簡(jiǎn)單地接收數(shù)據(jù)并丟棄數(shù)據(jù)。
  • 默認(rèn)地豁护,輸出操作只會(huì)執(zhí)行一次哼凯。會(huì)按照在應(yīng)用程序的定義順序執(zhí)行。

DataFrame和SQL操作

可對(duì)流數(shù)據(jù)使用DataFrames and SQL操作楚里。需要使用SparkContext創(chuàng)建一個(gè)SparkSession断部。必須這樣做,才能從驅(qū)動(dòng)程序錯(cuò)誤中恢復(fù)重啟班缎。通過創(chuàng)建SparkSession的懶實(shí)例化單例完成蝴光。下面是一個(gè)示例。修改了之前的示例达址,使用DataFrames和SQL生成單詞計(jì)數(shù)蔑祟。每個(gè)RDD會(huì)轉(zhuǎn)換為DataFrame,作為臨時(shí)表注冊(cè)沉唠,然后使用SQL查詢疆虚。

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame = 
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

完整代碼參見source code

也可以從另外一個(gè)線程在表上執(zhí)行查詢(異步運(yùn)行StreamingContext)。只要保證設(shè)置StreamingContext記住查詢需要的足量數(shù)據(jù)即可径簿。否則StreamingContext罢屈,無法識(shí)別異步查詢,會(huì)在查詢完成前刪除舊的流數(shù)據(jù)牍帚。例如儡遮,如果想查詢上一個(gè)批次的數(shù)據(jù),但是你的查詢可能需要運(yùn)行5分鐘暗赶,則調(diào)用streamingContext.remember(Minutes(5))鄙币。

關(guān)于DataFrame的更多信息參見DataFrames and SQL

MLib操作

可以使用MLlib提供的機(jī)器學(xué)習(xí)算法蹂随。首先十嘿,streaming機(jī)器學(xué)習(xí)算法(如Streaming Linear RegressionStreaming KMeans等)岳锁,可以同時(shí)從流數(shù)據(jù)中學(xué)習(xí)并在流數(shù)據(jù)上應(yīng)用模型绩衷。除了這些,對(duì)于更大一類的機(jī)器學(xué)習(xí)算法激率,可以離線學(xué)習(xí)模型(如使用歷史數(shù)據(jù))咳燕,然后將模型應(yīng)用于在線流數(shù)據(jù)。具體參見MLlib乒躺。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末招盲,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子嘉冒,更是在濱河造成了極大的恐慌曹货,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,816評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件讳推,死亡現(xiàn)場(chǎng)離奇詭異顶籽,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)银觅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門礼饱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人究驴,你說我怎么就攤上這事慨仿。” “怎么了纳胧?”我有些...
    開封第一講書人閱讀 158,300評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)帘撰。 經(jīng)常有香客問我跑慕,道長(zhǎng),這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,780評(píng)論 1 285
  • 正文 為了忘掉前任核行,我火速辦了婚禮牢硅,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘芝雪。我一直安慰自己减余,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評(píng)論 6 385
  • 文/花漫 我一把揭開白布惩系。 她就那樣靜靜地躺著位岔,像睡著了一般。 火紅的嫁衣襯著肌膚如雪堡牡。 梳的紋絲不亂的頭發(fā)上抒抬,一...
    開封第一講書人閱讀 50,084評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音晤柄,去河邊找鬼擦剑。 笑死,一個(gè)胖子當(dāng)著我的面吹牛芥颈,可吹牛的內(nèi)容都是我干的惠勒。 我是一名探鬼主播,決...
    沈念sama閱讀 39,151評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼爬坑,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼纠屋!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起妇垢,我...
    開封第一講書人閱讀 37,912評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤巾遭,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后闯估,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體灼舍,經(jīng)...
    沈念sama閱讀 44,355評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評(píng)論 2 327
  • 正文 我和宋清朗相戀三年涨薪,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了骑素。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,809評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡刚夺,死狀恐怖献丑,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情侠姑,我是刑警寧澤创橄,帶...
    沈念sama閱讀 34,504評(píng)論 4 334
  • 正文 年R本政府宣布,位于F島的核電站莽红,受9級(jí)特大地震影響妥畏,放射性物質(zhì)發(fā)生泄漏邦邦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評(píng)論 3 317
  • 文/蒙蒙 一醉蚁、第九天 我趴在偏房一處隱蔽的房頂上張望燃辖。 院中可真熱鬧,春花似錦网棍、人聲如沸黔龟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽氏身。三九已至,卻和暖如春罗捎,著一層夾襖步出監(jiān)牢的瞬間观谦,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評(píng)論 1 267
  • 我被黑心中介騙來泰國(guó)打工桨菜, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留豁状,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,628評(píng)論 2 362
  • 正文 我出身青樓倒得,卻偏偏與公主長(zhǎng)得像泻红,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子霞掺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容