Spark詳解03Job 物理執(zhí)行圖

Job 物理執(zhí)行圖

在 Overview 里我們初步介紹了 DAG 型的物理執(zhí)行圖对室,里面包含 stages 和 tasks。這一章主要解決的問題是:

給定 job 的邏輯執(zhí)行圖刁品,如何生成物理執(zhí)行圖(也就是 stages 和 tasks)?

一個(gè)復(fù)雜 job 的邏輯執(zhí)行圖

ComplexJob

代碼貼在本章最后。給定這樣一個(gè)復(fù)雜數(shù)據(jù)依賴圖施禾,如何合理劃分 stage颁湖,并確定 task 的類型和個(gè)數(shù)宣蠕?

一個(gè)直觀想法是將前后關(guān)聯(lián)的 RDDs 組成一個(gè) stage,每個(gè)箭頭生成一個(gè) task甥捺。對于兩個(gè) RDD 聚合成一個(gè) RDD 的情況抢蚀,這三個(gè) RDD 組成一個(gè) stage。這樣雖然可以解決問題涎永,但顯然效率不高思币。除了效率問題,這個(gè)想法還有一個(gè)更嚴(yán)重的問題:大量中間數(shù)據(jù)需要存儲羡微。對于 task 來說谷饿,其執(zhí)行結(jié)果要么要存到磁盤,要么存到內(nèi)存妈倔,或者兩者皆有博投。如果每個(gè)箭頭都是 task 的話,每個(gè) RDD 里面的數(shù)據(jù)都需要存起來盯蝴,占用空間可想而知毅哗。

仔細(xì)觀察一下邏輯執(zhí)行圖會發(fā)現(xiàn):在每個(gè) RDD 中听怕,每個(gè) partition 是獨(dú)立的,也就是說在 RDD 內(nèi)部虑绵,每個(gè) partition 的數(shù)據(jù)依賴各自不會相互干擾尿瞭。因此,一個(gè)大膽的想法是將整個(gè)流程圖看成一個(gè) stage翅睛,為最后一個(gè) finalRDD 中的每個(gè) partition 分配一個(gè) task声搁。圖示如下:

ComplexTask.png

所有的粗箭頭組合成第一個(gè) task,該 task 計(jì)算結(jié)束后順便將 CoGroupedRDD 中已經(jīng)計(jì)算得到的第二個(gè)和第三個(gè) partition 存起來捕发。之后第二個(gè) task(細(xì)實(shí)線)只需計(jì)算兩步疏旨,第三個(gè) task(細(xì)虛線)也只需要計(jì)算兩步,最后得到結(jié)果扎酷。

這個(gè)想法有兩個(gè)不靠譜的地方:

  • 第一個(gè) task 太大檐涝,碰到 ShuffleDependency 后,不得不計(jì)算 shuffle 依賴的 RDDs 的所有 partitions法挨,而且都在這一個(gè) task 里面計(jì)算谁榜。
  • 需要設(shè)計(jì)巧妙的算法來判斷哪個(gè) RDD 中的哪些 partition 需要 cache。而且 cache 會占用存儲空間坷剧。

雖然這是個(gè)不靠譜的想法惰爬,但有一個(gè)可取之處,即 pipeline 思想:數(shù)據(jù)用的時(shí)候再算惫企,而且數(shù)據(jù)是流到要計(jì)算的位置的撕瞧。比如在第一個(gè) task 中,從 FlatMappedValuesRDD 中的 partition 向前推算狞尔,只計(jì)算要用的(依賴的) RDDs 及 partitions丛版。在第二個(gè) task 中,從 CoGroupedRDD 到 FlatMappedValuesRDD 計(jì)算過程中偏序,不需要存儲中間結(jié)果(MappedValuesRDD 中 partition 的全部數(shù)據(jù))页畦。

更進(jìn)一步,從 record 粒度來講研儒,如下圖中豫缨,第一個(gè) pattern 中先算 g(f(record1)),然后原始的 record1 和 f(record1) 都可以丟掉端朵,然后再算 g(f(record2))好芭,丟掉中間結(jié)果,最后算 g(f(record3))冲呢。對于第二個(gè) pattern 中的 g舍败,record1 進(jìn)入 g 后,理論上可以丟掉(除非被手動 cache)。其他 pattern 同理邻薯。

pipeline.png

回到 stage 和 task 的劃分問題裙戏,上面不靠譜想法的主要問題是碰到 ShuffleDependency 后無法進(jìn)行 pipeline。那么只要在 ShuffleDependency 處斷開厕诡,就只剩 NarrowDependency累榜,而 NarrowDependency chain 是可以進(jìn)行 pipeline 的。按照此思想灵嫌,上面 ComplexJob 的劃分圖如下:

ComplexJobStage.png

所以劃分算法就是:從后往前推算信柿,遇到 ShuffleDependency 就斷開,遇到 NarrowDependency 就將其加入該 stage醒第。每個(gè) stage 里面 task 的數(shù)目由該 stage 最后一個(gè) RDD 中的 partition 個(gè)數(shù)決定。

粗箭頭表示 task进鸠。因?yàn)槭菑暮笸巴扑愠砺虼俗詈笠粋€(gè) stage 的 id 是 0,stage 1 和 stage 2 都是 stage 0 的 parents客年。如果 stage 最后要產(chǎn)生 result霞幅,那么該 stage 里面的 task 都是 ResultTask,否則都是 ShuffleMapTask量瓜。之所以稱為 ShuffleMapTask 是因?yàn)槠溆?jì)算結(jié)果需要 shuffle 到下一個(gè) stage司恳,本質(zhì)上相當(dāng)于 MapReduce 中的 mapper。ResultTask 相當(dāng)于 MapReduce 中的 reducer(如果需要從 parent stage 那里 shuffle 數(shù)據(jù))绍傲,也相當(dāng)于普通 mapper(如果該 stage 沒有 parent stage)扔傅。

還有一個(gè)問題:算法中提到 NarrowDependency chain 可以 pipeline,可是這里的 ComplexJob 只展示了 OneToOneDependency 和 RangeDependency 的 pipeline烫饼,普通 NarrowDependency 如何 pipeline猎塞?

回想上一章里面 cartesian(otherRDD) 里面復(fù)雜的 NarrowDependency,圖示如下:

Cartesian.png

經(jīng)過算法劃分后結(jié)果如下:

cartesianPipeline.png

圖中粗箭頭展示了第一個(gè) ResultTask杠纵,其他的 task 依此類推荠耽。由于該 stage 的 task 直接輸出 result,所以這個(gè)圖包含 6 個(gè) ResultTasks比藻。與 OneToOneDependency 不同的是這里每個(gè) ResultTask 需要計(jì)算 3 個(gè) RDD铝量,讀取兩個(gè) data block,而整個(gè)讀取和計(jì)算這三個(gè) RDD 的過程在一個(gè) task 里面完成银亲。當(dāng)計(jì)算 CartesianRDD 中的 partition 時(shí)慢叨,需要從兩個(gè) RDD 獲取 records,由于都在一個(gè) task 里面群凶,不需要 shuffle插爹。這個(gè)圖說明:不管是 1:1 還是 N:1 的 NarrowDependency,只要是 NarrowDependency chain,就可以進(jìn)行 pipeline赠尾,生成的 task 個(gè)數(shù)與該 stage 最后一個(gè) RDD 的 partition 個(gè)數(shù)相同力穗。

物理圖的執(zhí)行

生成了 stage 和 task 以后,下一個(gè)問題就是 task 如何執(zhí)行來生成最后的 result气嫁?

回到 ComplexJob 的物理執(zhí)行圖当窗,如果按照 MapReduce 的邏輯,從前到后執(zhí)行寸宵,map() 產(chǎn)生中間數(shù)據(jù) map outpus崖面,經(jīng)過 partition 后放到本地磁盤。再經(jīng)過 shuffle-sort-aggregate 后生成 reduce inputs梯影,最后 reduce() 執(zhí)行得到 result巫员。執(zhí)行流程如下:

MapReduce

整個(gè)執(zhí)行流程沒有問題,但不能直接套用在 Spark 的物理執(zhí)行圖上甲棍,因?yàn)?MapReduce 的流程圖簡單简识、固定,而且沒有 pipeline感猛。

回想 pipeline 的思想是 數(shù)據(jù)用的時(shí)候再算七扰,而且數(shù)據(jù)是流到要計(jì)算的位置的。Result 產(chǎn)生的地方的就是要計(jì)算的位置陪白,要確定 “需要計(jì)算的數(shù)據(jù)”颈走,我們可以從后往前推,需要哪個(gè) partition 就計(jì)算哪個(gè) partition咱士,如果 partition 里面沒有數(shù)據(jù)立由,就繼續(xù)向前推,形成 computing chain序厉。這樣推下去拆吆,結(jié)果就是:需要首先計(jì)算出每個(gè) stage 最左邊的 RDD 中的某些 partition。

對于沒有 parent stage 的 stage脂矫,該 stage 最左邊的 RDD 是可以立即計(jì)算的枣耀,而且每計(jì)算出一個(gè) record 后便可以流入 f 或 g(見前面圖中的 patterns)。如果 f 中的 record 關(guān)系是 1:1 的庭再,那么 f(record1) 計(jì)算結(jié)果可以立即順著 computing chain 流入 g 中捞奕。如果 f 的 record 關(guān)系是 N:1,record1 進(jìn)入 f() 后也可以被回收拄轻÷В總結(jié)一下,computing chain 從后到前建立恨搓,而實(shí)際計(jì)算出的數(shù)據(jù)從前到后流動院促,而且計(jì)算出的第一個(gè) record 流動到不能再流動后筏养,再計(jì)算下一個(gè) record。這樣常拓,雖然是要計(jì)算后續(xù) RDD 的 partition 中的 records渐溶,但并不是要求當(dāng)前 RDD 的 partition 中所有 records 計(jì)算得到后再整體向后流動。

對于有 parent stage 的 stage弄抬,先等著所有 parent stages 中 final RDD 中數(shù)據(jù)計(jì)算好茎辐,然后經(jīng)過 shuffle 后,問題就又回到了計(jì)算 “沒有 parent stage 的 stage”掂恕。

代碼實(shí)現(xiàn):每個(gè) RDD 包含的 getDependency() 負(fù)責(zé)確立 RDD 的數(shù)據(jù)依賴拖陆,compute() 方法負(fù)責(zé)接收 parent RDDs 或者 data block 流入的 records,進(jìn)行計(jì)算懊亡,然后輸出 record依啰。經(jīng)常可以在 RDD 中看到這樣的代碼firstParent[T].iterator(split, context).map(f)店枣。firstParent 表示該 RDD 依賴的第一個(gè) parent RDD孔飒,iterator() 表示 parentRDD 中的 records 是一個(gè)一個(gè)流入該 RDD 的,map(f) 表示每流入一個(gè) recod 就對其進(jìn)行 f(record) 操作艰争,輸出 record。為了統(tǒng)一接口桂对,這段 compute() 仍然返回一個(gè) iterator甩卓,來迭代 map(f) 輸出的 records。

總結(jié)一下:整個(gè) computing chain 根據(jù)數(shù)據(jù)依賴關(guān)系自后向前建立蕉斜,遇到 ShuffleDependency 后形成 stage逾柿。在每個(gè) stage 中,每個(gè) RDD 中的 compute() 調(diào)用 parentRDD.iter() 來將 parent RDDs 中的 records 一個(gè)個(gè) fetch 過來宅此。

如果要自己設(shè)計(jì)一個(gè) RDD机错,那么需要注意的是 compute() 只負(fù)責(zé)定義 parent RDDs => output records 的計(jì)算邏輯,具體依賴哪些 parent RDDs 由 getDependency() 定義父腕,具體依賴 parent RDD 中的哪些 partitions 由 dependency.getParents() 定義弱匪。

例如,在 CartesianRDD 中璧亮,

 // RDD x = (RDD a).cartesian(RDD b)
 // 定義 RDD x 應(yīng)該包含多少個(gè) partition萧诫,每個(gè) partition 是什么類型
 override def getPartitions: Array[Partition] = {
    // create the cross product split
    val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

  // 定義 RDD x 中的每個(gè) partition 怎么計(jì)算得到
  override def compute(split: Partition, context: TaskContext) = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    // s1 表示 RDD x 中的 partition 依賴 RDD a 中的 partitions(這里只依賴一個(gè))
    // s2 表示 RDD x 中的 partition 依賴 RDD b 中的 partitions(這里只依賴一個(gè))
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }

  // 定義 RDD x 中的 partition i 依賴于哪些 RDD 中的哪些 partitions
  //
  // 這里 RDD x 依賴于 RDD a,同時(shí)依賴于 RDD b枝嘶,都是 NarrowDependency
  // 對于第一個(gè)依賴帘饶,RDD x 中的 partition i 依賴于 RDD a 中的
  //    第 List(i / numPartitionsInRdd2) 個(gè) partition
  // 對于第二個(gè)依賴,RDD x 中的 partition i 依賴于 RDD b 中的
  //    第 List(id % numPartitionsInRdd2) 個(gè) partition
  override def getDependencies: Seq[Dependency[_]] = List(
    new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )

生成 job

前面介紹了邏輯和物理執(zhí)行圖的生成原理群扶,那么及刻,怎么觸發(fā) job 的生成镀裤?已經(jīng)介紹了 task,那么 job 是什么缴饭?

下表列出了可以觸發(fā)執(zhí)行圖生成的典型 action()暑劝,其中第二列是 processPartition(),定義如何計(jì)算 partition 中的 records 得到 result茴扁。第三列是 resultHandler()铃岔,定義如何對從各個(gè) partition 收集來的 results 進(jìn)行計(jì)算來得到最終結(jié)果。

Action finalRDD(records) => result compute(results)
reduce(func) (record1, record2) => result, (result, record i) => result (result1, result 2) => result, (result, result i) => result
collect() Array[records] => result Array[result]
count() count(records) => result sum(result)
foreach(f) f(records) => result Array[result]
take(n) record (i<=n) => result Array[result]
first() record 1 => result Array[result]
takeSample() selected records => result Array[result]
takeOrdered(n, [ordering]) TopN(records) => result TopN(results)
saveAsHadoopFile(path) records => write(records) null
countByKey() (K, V) => Map(K, count(K)) (Map, Map) => Map(K, count(K))

用戶的 driver 程序中一旦出現(xiàn) action()峭火,就會生成一個(gè) job毁习,比如 foreach() 會調(diào)用sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f)),向 DAGScheduler 提交 job卖丸。如果 driver 程序后面還有 action()纺且,那么其他 action() 也會生成 job 提交。所以稍浆,driver 有多少個(gè) action()载碌,就會生成多少個(gè) job。這就是 Spark 稱 driver 程序?yàn)?application(可能包含多個(gè) job)而不是 job 的原因衅枫。

每一個(gè) job 包含 n 個(gè) stage嫁艇,最后一個(gè) stage 產(chǎn)生 result。比如弦撩,第一章的 GroupByTest 例子中存在兩個(gè) job步咪,一共產(chǎn)生了兩組 result。在提交 job 過程中益楼,DAGScheduler 會首先劃分 stage猾漫,然后先提交無 parent stage 的 stages,并在提交過程中確定該 stage 的 task 個(gè)數(shù)及類型感凤,并提交具體的 task悯周。無 parent stage 的 stage 提交完后,依賴該 stage 的 stage 才能夠提交陪竿。從 stage 和 task 的執(zhí)行角度來講禽翼,一個(gè) stage 的 parent stages 執(zhí)行完后,該 stage 才能執(zhí)行族跛。

提交 job 的實(shí)現(xiàn)細(xì)節(jié)

下面簡單分析下 job 的生成和提交代碼捐康,提交過程在 Architecture 那一章也會有圖文并茂的分析:

  1. rdd.action() 會調(diào)用 DAGScheduler.runJob(rdd, processPartition, resultHandler) 來生成 job。
  2. runJob() 會首先通過rdd.getPartitions()來得到 finalRDD 中應(yīng)該存在的 partition 的個(gè)數(shù)和類型:Array[Partition]庸蔼。然后根據(jù) partition 個(gè)數(shù) new 出來將來要持有 result 的數(shù)組 Array[Result](partitions.size)解总。
  3. 最后調(diào)用 DAGScheduler 的runJob(rdd, cleanedFunc, partitions, allowLocal, resultHandler)來提交 job。cleanedFunc 是 processParittion 經(jīng)過閉包清理后的結(jié)果姐仅,這樣可以被序列化后傳遞給不同節(jié)點(diǎn)的 task花枫。
  4. DAGScheduler 的 runJob 繼續(xù)調(diào)用submitJob(rdd, func, partitions, allowLocal, resultHandler) 來提交 job刻盐。
  5. submitJob() 首先得到一個(gè) jobId,然后再次包裝 func劳翰,向 DAGSchedulerEventProcessActor 發(fā)送 JobSubmitted 信息敦锌,該 actor 收到信息后進(jìn)一步調(diào)用dagScheduler.handleJobSubmitted()來處理提交的 job。之所以這么麻煩佳簸,是為了符合事件驅(qū)動模型乙墙。
  6. handleJobSubmmitted() 首先調(diào)用 finalStage = newStage() 來劃分 stage,然后submitStage(finalStage)生均。由于 finalStage 可能有 parent stages听想,實(shí)際先提交 parent stages,等到他們執(zhí)行完马胧,finalStage 需要再次提交執(zhí)行汉买。再次提交由 handleJobSubmmitted() 最后的 submitWaitingStages() 負(fù)責(zé)。

分析一下 newStage() 如何劃分 stage:

  1. 該方法在 new Stage() 的時(shí)候會調(diào)用 finalRDD 的 getParentStages()佩脊。
  2. getParentStages() 從 finalRDD 出發(fā)蛙粘,反向 visit 邏輯執(zhí)行圖,遇到 NarrowDependency 就將依賴的 RDD 加入到 stage威彰,遇到 ShuffleDependency 切開 stage出牧,并遞歸到 ShuffleDepedency 依賴的 stage。
  3. 一個(gè) ShuffleMapStage(不是最后形成 result 的 stage)形成后歇盼,會將該 stage 最后一個(gè) RDD 注冊到MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)舔痕,這一步很重要,因?yàn)?shuffle 過程需要 MapOutputTrackerMaster 來指示 ShuffleMapTask 輸出數(shù)據(jù)的位置旺遮。

分析一下 submitStage(stage) 如何提交 stage 和 task:

  1. 先確定該 stage 的 missingParentStages,使用getMissingParentStages(stage)盈咳。如果 parentStages 都可能已經(jīng)執(zhí)行過了耿眉,那么就為空了。
  2. 如果 missingParentStages 不為空鱼响,那么先遞歸提交 missing 的 parent stages鸣剪,并將自己加入到 waitingStages 里面,等到 parent stages 執(zhí)行結(jié)束后丈积,會觸發(fā)提交 waitingStages 里面的 stage筐骇。
  3. 如果 missingParentStages 為空,說明該 stage 可以立即執(zhí)行江滨,那么就調(diào)用submitMissingTasks(stage, jobId)來生成和提交具體的 task铛纬。如果 stage 是 ShuffleMapStage,那么 new 出來與該 stage 最后一個(gè) RDD 的 partition 數(shù)相同的 ShuffleMapTasks唬滑。如果 stage 是 ResultStage告唆,那么 new 出來與 stage 最后一個(gè) RDD 的 partition 個(gè)數(shù)相同的 ResultTasks棺弊。一個(gè) stage 里面的 task 組成一個(gè) TaskSet,最后調(diào)用taskScheduler.submitTasks(taskSet)來提交一整個(gè) taskSet擒悬。
  4. 這個(gè) taskScheduler 類型是 TaskSchedulerImpl模她,在 submitTasks() 里面,每一個(gè) taskSet 被包裝成 manager: TaskSetMananger懂牧,然后交給schedulableBuilder.addTaskSetManager(manager)侈净。schedulableBuilder 可以是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 調(diào)度器。submitTasks() 最后一步是通知backend.reviveOffers()去執(zhí)行 task僧凤,backend 的類型是 SchedulerBackend畜侦。如果在集群上運(yùn)行,那么這個(gè) backend 類型是 SparkDeploySchedulerBackend拼弃。
  5. SparkDeploySchedulerBackend 是 CoarseGrainedSchedulerBackend 的子類夏伊,backend.reviveOffers()其實(shí)是向 DriverActor 發(fā)送 ReviveOffers 信息。SparkDeploySchedulerBackend 在 start() 的時(shí)候吻氧,會啟動 DriverActor溺忧。DriverActor 收到 ReviveOffers 消息后,會調(diào)用launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId))))) 來 launch tasks盯孙。scheduler 就是 TaskSchedulerImpl鲁森。scheduler.resourceOffers()從 FIFO 或者 Fair 調(diào)度器那里獲得排序后的 TaskSetManager,并經(jīng)過TaskSchedulerImpl.resourceOffer()振惰,考慮 locality 等因素來確定 task 的全部信息 TaskDescription歌溉。調(diào)度細(xì)節(jié)這里暫不討論。
  6. DriverActor 中的 launchTasks() 將每個(gè) task 序列化骑晶,如果序列化大小不超過 Akka 的 akkaFrameSize痛垛,那么直接將 task 送到 executor 那里執(zhí)行executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))

Discussion

至此桶蛔,我們討論了:

  • driver 程序如何觸發(fā) job 的生成
  • 如何從邏輯執(zhí)行圖得到物理執(zhí)行圖
  • pipeline 思想與實(shí)現(xiàn)
  • 生成與提交 job 的實(shí)際代碼

還有很多地方?jīng)]有深入討論匙头,如:

  • 連接 stage 的 shuffle 過程
  • task 運(yùn)行過程及運(yùn)行位置

下一章重點(diǎn)討論 shuffle 過程。

從邏輯執(zhí)行圖的建立仔雷,到將其轉(zhuǎn)換成物理執(zhí)行圖的過程很經(jīng)典蹂析,過程中的 dependency 劃分,pipeline碟婆,stage 分割电抚,task 生成 都是有條不紊,有理有據(jù)的竖共。

ComplexJob 的源代碼

package internals

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner


object complexJob {
  def main(args: Array[String]) {

    val sc = new SparkContext("local", "ComplexJob test")
    
    val data1 = Array[(Int, Char)](
      (1, 'a'), (2, 'b'),
      (3, 'c'), (4, 'd'),
      (5, 'e'), (3, 'f'),
      (2, 'g'), (1, 'h'))
    val rangePairs1 = sc.parallelize(data1, 3)
    
    val hashPairs1 = rangePairs1.partitionBy(new HashPartitioner(3))

    
    val data2 = Array[(Int, String)]((1, "A"), (2, "B"),
      (3, "C"), (4, "D"))

    val pairs2 = sc.parallelize(data2, 2)
    val rangePairs2 = pairs2.map(x => (x._1, x._2.charAt(0)))

    
    val data3 = Array[(Int, Char)]((1, 'X'), (2, 'Y'))
    val rangePairs3 = sc.parallelize(data3, 2)

    
    val rangePairs = rangePairs2.union(rangePairs3)

    
    val result = hashPairs1.join(rangePairs)

    result.foreachWith(i => i)((x, i) => println("[result " + i + "] " + x))

    println(result.toDebugString)
  }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蝙叛,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子公给,更是在濱河造成了極大的恐慌甥温,老刑警劉巖锻煌,帶你破解...
    沈念sama閱讀 221,576評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異姻蚓,居然都是意外死亡宋梧,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評論 3 399
  • 文/潘曉璐 我一進(jìn)店門狰挡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來捂龄,“玉大人,你說我怎么就攤上這事加叁【氩祝” “怎么了?”我有些...
    開封第一講書人閱讀 168,017評論 0 360
  • 文/不壞的土叔 我叫張陵它匕,是天一觀的道長展融。 經(jīng)常有香客問我,道長豫柬,這世上最難降的妖魔是什么告希? 我笑而不...
    開封第一講書人閱讀 59,626評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮烧给,結(jié)果婚禮上燕偶,老公的妹妹穿的比我還像新娘。我一直安慰自己础嫡,他們只是感情好指么,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,625評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著榴鼎,像睡著了一般伯诬。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上巫财,一...
    開封第一講書人閱讀 52,255評論 1 308
  • 那天盗似,我揣著相機(jī)與錄音,去河邊找鬼翁涤。 笑死桥言,一個(gè)胖子當(dāng)著我的面吹牛萌踱,可吹牛的內(nèi)容都是我干的葵礼。 我是一名探鬼主播,決...
    沈念sama閱讀 40,825評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼并鸵,長吁一口氣:“原來是場噩夢啊……” “哼鸳粉!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起园担,我...
    開封第一講書人閱讀 39,729評論 0 276
  • 序言:老撾萬榮一對情侶失蹤届谈,失蹤者是張志新(化名)和其女友劉穎枯夜,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體艰山,經(jīng)...
    沈念sama閱讀 46,271評論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡湖雹,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,363評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了曙搬。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片摔吏。...
    茶點(diǎn)故事閱讀 40,498評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖纵装,靈堂內(nèi)的尸體忽然破棺而出征讲,到底是詐尸還是另有隱情,我是刑警寧澤橡娄,帶...
    沈念sama閱讀 36,183評論 5 350
  • 正文 年R本政府宣布诗箍,位于F島的核電站,受9級特大地震影響挽唉,放射性物質(zhì)發(fā)生泄漏滤祖。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,867評論 3 333
  • 文/蒙蒙 一橱夭、第九天 我趴在偏房一處隱蔽的房頂上張望氨距。 院中可真熱鬧,春花似錦棘劣、人聲如沸俏让。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽首昔。三九已至,卻和暖如春糙俗,著一層夾襖步出監(jiān)牢的瞬間勒奇,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評論 1 272
  • 我被黑心中介騙來泰國打工巧骚, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留赊颠,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,906評論 3 376
  • 正文 我出身青樓劈彪,卻偏偏與公主長得像竣蹦,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子沧奴,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,507評論 2 359

推薦閱讀更多精彩內(nèi)容