Spark架構師3-spark初始化和任務啟動源碼

1、上次總結 spark初始化環(huán)境資源 0:18:00~ 0:41:00

1牙言、Spark RPC(Endpoint:DriverEndpoint ClientEndpoint)
2览濒、利用 akka(endpoint類似于actor) 模擬實現(xiàn) YARN(Flink 就是基于 akka實現(xiàn)的 RPC)
3、Spark Standalone 集群啟動腳本start-all.sh 分析
4、Master 啟動分析
5、Worker 啟動分析
6领猾、Spark APP 提交腳本 spark-submit 腳本分析
7、SparkSubmit 分析(重點是進入main方法骇扇,通過反射的方式運行用戶編寫的application的主類main方法)

8瘤运、SparkContext 初始化

1.1 SparkContext 初始化 分析 0:10~ 0:41

Spark任務執(zhí)行流程分析.jpg

示例程序demo sparkPI
val spark: SparkSession = SparkSession.builder.appName("Spark Pi").getOrCreate()

注: 上圖粉紅色半邊 ,7件事

0:18:16 代碼開始

 ——》SparkSession#Builder#getOrCreate()
                val sparkContext: SparkContext = userSuppliedContext.getOrElse{
                     SparkContext.getOrCreate(sparkConf)
         ——》 SparkContext#getOrCreate()
                   setActiveContext(new SparkContext(config), allowMultipleContexts = false)                
                   ——》SparkContext類塊#try
                            //以下為類級代碼塊 
                           try {
                                       _conf = config.clone()
                                       _conf.validateSettings()
 ############# 注釋:第一步   創(chuàng)建Spark Env############         
                                * 
                               *  除了創(chuàng)建 sparkEnv之外匠题,還創(chuàng)建了各種 manager 對象。                                
                                */ Create the Spark execution environment (cache, map output tracker, etc)
                                   _env = createSparkEnv(_conf, isLocal, listenerBus)
                          
                                ——》SparkEnv#createDriverEnv
                                    ——》SparkEnv#create
                                               //注釋: 初始化 SecurityManager
                                          val securityManager = new SecurityManager(conf, ioEncryptionKey)                                              
                                                   //注釋: 初始化 NettyRpcEnv
                                           val systemName = if (isDriver) driverSystemName  
                                           val rpcEnv = RpcEnv.create(systemName, 
                                                       bindAddress, advertiseAddress, 
                                                        port.getOrElse(-1), conf, securityManager,
                                                         numUsableCores, !isDriver)
                                         //注釋: 初始化 SerializerManager   
        val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)      
                                    //注釋: 初始化 BroadcastManager      
        val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
                                //注釋: 初始化 MapOutputTracker 
        val mapOutputTracker = if (isDriver) {
            new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
                               //注釋: 初始化 SortShuffleManager    
        val shortShuffleMgrNames = Map("sort" ->  
                               // 注釋: 初始化 UnifiedMemoryManager  統(tǒng)一內(nèi)存管理模型
                                // StaticMemoryManaager  靜態(tài)內(nèi)存管理模型
        val memoryManager: MemoryManager = if (useLegacyMemoryManager) 
                                //注釋: 初始化 BlockManagerMaster 
        val blockManagerMaster = new BlockManagerMaster(
                                //注釋: 初始化 BlockManager 
        val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,  
                               //注釋: 初始化 OutputCommitCoordinator 
        val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse 
                               //注釋: 正式初始化SparkEnv
        val envInstance = new SparkEnv(executorId, rpcEnv, serializer, closureSerializer,
         return   envInstance

 ############# 注釋:第二步   創(chuàng)建SparkUI############         
/**   注釋:第二步
         *  創(chuàng)建并初始化Spark UI
         */
        _ui = if (conf.getBoolean("spark.ui.enabled", true)) {
            // TODO_MA 注釋:_jobProgressListener跟蹤要在UI中顯示的任務級別信息但金,startTime就是SparkContext的初始時的系統(tǒng)時間
            // TODO_MA 注釋:返回SparkUI韭山,它的父類是WebUI,和MasterWebUI是一個級別的
            Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "", startTime))
        } else {
            // For tests, do not enable the UI
            None
        }    
 ############# 注釋:第三步   hadoop相關配置以及Executor環(huán)境變量############    
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
if (jars != null) {
            jars.foreach(addJar)
        }
        
        if (files != null) {
            files.foreach(addFile)
        }

 executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
        executorEnvs ++= _conf.getExecutorEnv
        executorEnvs("SPARK_USER") = sparkUser

 ############# 注釋: 第四步:創(chuàng)建心跳接收器 ############
            *  1冷溃、我們需要在“createTaskScheduler”之前注冊“HeartbeatReceiver”钱磅,因為Executor將在構造函數(shù)中檢索“HeartbeatReceiver”
         *  2、創(chuàng)建一個HeartbeatReceiver 的RpcEndpoint注冊到RpcEnv中似枕,每分鐘給自己發(fā)送ExpireDeadHosts盖淡,去檢測Executor是否存在心跳,
         *  3凿歼、如果當前時間減去最一次心跳時間褪迟,大于1分鐘冗恨,就會用CoarseGrainedSchedulerBackend將Executor殺死
         */
        
        _heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
 

 ############# 注釋:第五步:創(chuàng)建任務調(diào)度TaskScheduler############           

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts

 ############# 注釋:第六步:創(chuàng)建和啟動DAGScheduler############      
     
 /*  *  1、內(nèi)部初始化了一個:DAGSchedulerEventProcessLoop 用來處理各種任務
         *  2味赃、在 DAGSchedulerEventProcessLoop 創(chuàng)建的時候掀抹,構造函數(shù)的內(nèi)部的最后一句代碼執(zhí)行了 DAGSchedulerEventProcessLoop的啟動。
         *  將來任務的提交心俗,取消等傲武,都會發(fā)送一個事件給 DAGSchedulerEventProcessLoop
         *  從而觸發(fā) dagScheduler.onRecevie() 的運行。
         */
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)



 ############# 注釋:第七步:TaskScheduler的啟動城榛,backend.start()############  
 /** :第七步:TaskScheduler的啟動揪利,主要任務:backend.start()
         */
        
        _taskScheduler.start()
                                       ↓
        TaskSchedulerImpl#start()
                  .......
                               backend.start()
                                                    ↓
StandaloneSchedulerBackend.start()
                                                     ▼
/** :調(diào)用父類方法 start() 方法啟動一個 DriverEndPoint
         *   super 粗粒度的 CoarseGrainedSchedulerBackend
         */
                           super.start()        
                                            ↓
                CoarseGrainedSchedulerBackend.start()
                                          ▼
                              /**  創(chuàng)建一個 DriverEndPoint 負責跟 master 打交道的         
                             driverEndpoint = createDriverEndpointRef(properties)

                          /**  注釋:里面維護了一個 DriverEndPoint 主要用來向 Executor分發(fā)任務 
        client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
        
        /*    注釋:client start  啟動了 ClientEndpoint 
                        client.start()

            ——》 StandaloneAppClient.start()
                     endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
                            ——》 ClientEndpoint#Onstart()方法
                                    /** 注釋:clientEndPoint 向 Master 執(zhí)行注冊 
                                       registerWithMaster(1)
                                                      ▼
                                       registerMasterFutures.set(tryRegisterAllMasters())
                                    ——》tryRegisterAllMasters()
                                                      ▼
                             /** 注釋:創(chuàng)建了一個 Master 的 RPC 代理
                         */
                        val masterRef = rpcEnv.setupEndpointRef(masterAddress,  
                        
                        /**
                         *   注釋:注冊
                         */
                        masterRef.send(RegisterApplication(appDescription, self))
                  ——》RegisterApplication()
                                            ↓
                                Master#類 receive 方法 
                                         ...................
                                     case RegisterApplication(description, driver) => 
                                          ...................
                                     case RegisterWorker
                                                schedule()
                                                    launchDriver(worker, driver)    啟動driver
                                              ——》startExecutorsOnWorkers() 
                                                                      ▼
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
                    allocateWorkerResourceToExecutors(app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
                }
                                                  ——》allocateWorkerResourceToExecutors
                                                       ——》launchExecutor(worker, exec)
                                                                                  ▼
                                                           /* 注釋:發(fā)送命令讓 worker 啟動 executor
         *   worker 節(jié)點的一個 RPC 節(jié)點,負責通信的狠持。
         */
        worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
        
        / 注釋:發(fā)消息告訴 Driver 該 worker 上的 executor 已經(jīng)啟動
         */
        exec.application.driver.send(ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))          
                                                                           ↓
                                                       Worker類  receive 方法
                                                                      ▼            
/* 注釋: 接收到 Master 發(fā)送過來的啟動 Executor 的命令
     */
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => 
                
/* 注釋:把啟動Executor必要的一些信息疟位,封裝在 ExeuctorRunner 中
             */
            val manager = new ExecutorRunner(appId, execId, appDesc.copy(command =
                 /* : 啟動好了 Executor 之后,返回給 Master一個信號工坊。
             *   信息封裝在 ExecutorStateChanged 對象中献汗。
             */
            sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))

           /* 啟動 executor
             *   worker.send(ExecutorStateChanged)
             */
            manager.start()
                          ↓
   ——》ExecutorRunner#start
              ——》fetchAndRunExecutor()
                     /* :構建jvm  進程啟動命令  */
            process = builder.start()
            val header = "Spark Executor Command: %s\n%s\n\n".format(
                           ↓   跳轉(zhuǎn)到Executor類構造方法
            Executor # 
                        // 實際上是構建了一個用于執(zhí)行task的線程池
    private val threadPool = {
        val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Executor task launch worker-%d")
            .setThreadFactory(new ThreadFactory {

2、 本次內(nèi)容概述 spark 任務提交 分析 0:41:00 ~

1王污、Spark Application 提交流程分析
2罢吃、Spark Application 的 DAG 生成和 Stage 切分分析
3、Spark 的 Task 分發(fā)和執(zhí)行源碼分析
4昭齐、Spark 的 Shuffle 機制源碼分析

2.1 Spark Application 提交流程分析 0:43 ~0:58

入口:spark application 中的 action 算子D蛘小(SparkPi 程序中的 reduce 函數(shù))
以 SparkPi 程序舉例:reduce() 算子就是提交 job 的入口

reduce()

sc.runJob
——》SparkContext#runJob
——》DAGScheduler#runJob

/*
* 1、應用程序調(diào)用action算子
* 2阱驾、sparkcontext就谜。runjob
* 3、dagscheduler里覆。runjob
* 4丧荐、taskscheduler。submittasks
* 5喧枷、schedulerbackend虹统。driverEndpoint 提交任務
/
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
——》DAGScheduler#runJob

/
* 注釋: 提交任務
* 參數(shù)解析:
* 1、rdd:要在其上運行任務的參數(shù)RDD目標RDD
* 2隧甚、func:在RDD的每個分區(qū)上運行的函數(shù)
* 3车荔、partitions:要運行的分區(qū)的集;某些作業(yè)可能不希望在目標RDD的所有分區(qū)上進行計算戚扳,例如忧便,對于 first() 之類的操作。
* 4帽借、callSite:在用戶程序中調(diào)用此作業(yè)的位置
* 5珠增、resultHandler:回調(diào)函數(shù)超歌,以將每個分區(qū)結果傳遞給Xxx
* 6、properties:要附加到此作業(yè)的scheduler屬性切平,例如fair scheduler pool name
*/
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

——》DAGScheduler#runJob

/**
* 第一步:封裝一個JobWaiter對象握础;
* 第二步:將JobWaiter對象賦值給JobSubmitted的listener屬性,
* 并將JobSubmitted(DAGSchedulerEvent事件)對象傳遞給eventProcessLoop事件循環(huán)處理器悴品。eventProcessLoop
* 內(nèi)部事件消息處理線程將會接收JobSubmitted事件禀综,并調(diào)用dagScheduler.handleJobSubmitted(...)方法來處理事件;
* 第三步:返回JobWaiter對象苔严。
*/
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)

    /**         
     *   注釋:這是提交任務運行
     *   eventProcessLoop 就是當初 DAGScheduler 在初始化的時候定枷,創(chuàng)建的一個 DAGSchedulerEventProcessLoop
     *   這個組件主要負責:任務的提交執(zhí)行
     */
    eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))

——》 eventProcessLoop 構造函數(shù) DAGSchedulerEventProcessLoop
——》EventLoop#post

   ——》DAGSchedulerEventProcessLoop#onRecive   
        ——》DAGSchedulerEventProcessLoop#doOnRecive
          ——》DAGSchedulerEventProcessLoop#handleJobSubmitted 

從此,任務的提交就交給了 dagScheduler#handleJobSubmitted 方法

/* 注釋: RDD DAG劃分Stages:Stage的劃分是從最后一個Stage開始逆推的届氢,

  • 每遇到一個寬依賴處欠窒,就分裂成另外一個Stage

    • 依此類推直到Stage劃分完畢為止。并且退子,只有最后一個Stage的類型是ResultStage類型岖妄。
    • 注意Dataset、DataFrame寂祥、sparkSession.sql("select ...")
    • 經(jīng)過catalyst代碼解析會將代碼轉(zhuǎn)化為RDD
  • 做了2件最主要的事

  • 1荐虐、stage切分

  • 2、 stage 提交
    /
    /
    注釋: Stage 切分
    * 這個 finalRDD 就是 rdd鏈條中的最后一個 RDD丸凭,也就是觸發(fā) sc.runJob() 方法執(zhí)行的 RDD
    */

         finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
     /* 
      *   注釋: 提交 Stage
      */
     submitStage(finalStage)
    

2.2 Spark Application 的 DAG 生成和 Stage 切分分析 0:58~ 1:46

入口:EventLoop 中的 eventQueue.take() 方法

如果任務提交衙传,則有 JobSubmitted 事件提交到 eventQueue 中撞叨,則 eventQueue.take() 阻塞返回驾凶,此時的 event 就是 JobSubmitted马昨。
根據(jù)事件機制,跳轉(zhuǎn)到:DAGScheduler.handleJobSubmitted() 方法
根據(jù) driver 發(fā)送過來的 事件類型虽界,來決定到底做什么汽烦!

兩個核心的方法:

// stage切分入口
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// 提交stage執(zhí)行入口
submitStage(finalStage)

方法依賴關系:
1、createResultStage(傳入finalRDD獲得ResultStage) ->2
2莉御、getOrCreateParentStages(傳入rdd獲得父stage) ->3->4
3刹缝、getShuffleDependencies(傳入rdd獲得寬依賴)
4、getOrCreateShuffleMapStage(傳入寬依賴獲得ShuffleMapStage) ->5->6
5颈将、getMissingAncestorShuffleDependencies(傳入一個rdd獲得所有寬依賴) ->3
6、createShuffleMapStage(傳入寬依賴獲得ShuffleMapStage) ->2

image.png

image.png

RDD任務切分中間分為:Application言疗、Job晴圾、Stage 和 Task
1、Application:初始化一個 SparkContext 即生成一個 Application噪奄;
2死姚、Job:一個 Action 算子就會生成一個 Job人乓;
3、Stage:Stage 等于寬依賴的個數(shù)加 1都毒;
4色罚、Task:一個 Stage 階段中,最后一個 RDD 的分區(qū)個數(shù)就是 Task 的個數(shù)账劲。
注意:Application->Job->Stage->Task每一層都是1對n的關系

dagScheduler#handleJobSubmitted 主方法

一 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)


// TODO_MA 注釋:獲取當前Stage的parent Stage戳护,這個方法是劃分Stage的核心實現(xiàn)
val parents = getOrCreateParentStages(rdd, jobId)

image.png

image.png


1 getShuffleDependencies(rdd).map { shuffleDep => 2 getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList

                  1 ——》DAGScheduler#getShuffleDependencies   
                                               ▼
              /**  重點方法  找RDD依賴鏈 
              * Returns shuffle dependencies that are immediate parents of the given RDD.
              * This function will not return more distant ancestors.
              * For example, if C has a shuffle dependency on B which has a shuffle dependency on A:
               * A <-- B <-- C
                 * calling this function with rdd C will only return the B <-- C dependency.
               * This function is scheduler-visible for the purpose of unit testing.
                * TODO_  采用的是深度優(yōu)先遍歷找到Action算子的父依賴中的寬依賴
                  */
        
                         2 ——》   getOrCreateShuffleMapStage
               /* TODO_MA 如果shuffleIdToMapStage中存在shuffle,則獲取shuffle map stage瀑焦。
                  *  否則腌且,如果shuffle map stage不存在,該方法將創(chuàng)建shuffle map stage
                  *   以及任何丟失的parent shuffle map stage榛瓮。
   ***************************************

二 铺董、 /* 注釋: 遞歸提交 Stage /
submitStage(finalStage)
——>submitMissingTasks()

/
注釋: 把 stage 變成 Tasks
* Step3: 為每個需要計算的partiton生成一個task
*/
val tasks: Seq[Task[_]] = try {
//如果是 ShuffleMapStage 階段的 Task,則構建 ShuffleMapTask
case stage: ShuffleMapStage => stage.pendingPartitions.clear()
.............
//如果是 ResultStage 階段的 Task禀晓,則構建 ResultTask
case stage: ResultStage => partitionsToCompute.map

   /* 注釋: 如果該階段有 Task 需要執(zhí)行
     *   Step4: 提交tasks
     */
    if (tasks.size > 0) {          
           //注釋: taskScheduler 的具體類型是:TaskSchedulerImpl
          taskScheduler.submitTasks(new TaskSet(tasks.toArray, 

2.3 Spark Task 分發(fā)和執(zhí)行分析 2:13 ~ 2:45

入口接上面的: taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id,
taskScheduler 的具體類型是:TaskSchedulerImpl

TaskSchedulerImpl#submitTasks 方法

/** 注釋:TaskScheduler提交job精续,最后交由 SchedulerBackEnd 進行提交
*/
backend.reviveOffers()

CoarseGrainedSchedulerBackend.reviveOffers()

// 給 DriverEndpoint (自己 )發(fā)送 ReviveOffers 消息
driverEndpoint.send(ReviveOffers)

CoarseGrainedSchedulerBackend#DriverEndpoint#receive

case ReviveOffers => makeOffers()

scheduler.resourceOffers(workOffers) //申請計算資源
.........
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)

// 注釋: 發(fā)送 LaunchTask 消息給:CoarseGrainedExecutorBackend 類
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

CoarseGrainedExecutorBackend# receive()
▼ //data 是任務的反序列化
case LaunchTask(data) => if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else
{ ....... executor.launchTask(this, taskDesc)
}

Executor#launchTask

// TODO_MA 注釋: 封裝一個 TaskRunner 線程對象,來運行一個 Task
val tr = new TaskRunner(context, taskDescription)

  • 注釋: 提交到線程池運行粹懒,那么就轉(zhuǎn)到 TaskRunner 的 run() 方法
    */
    threadPool.execute(tr)
    ↓ 具體運行一個 Task 的地方
    Executor#TaskRunner#run()

    val value = Utils.tryWithSafeFinally {
    val res = task.run(taskAttemptId = taskId,

    Task#run()//

    runTask(context)// 兩種task 重付, 一種是shuffer ,一種是result
    ↓shuffer
    1. ShuffleMapTask#runTask()

writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get


2.3 Spark Suffle 源碼分析 2:45~ 3:05
HashShuffleManager的運行原理 2:45~ 2:52
SortShuffleManager運行原理 2:52 ~3:13

                                       ↓   有4個writer 崎淳; 以SortShuffleWriter為例
               SortShuffleWriter#write
                                      ▼ 
                *   注釋: 先排序 
    sorter = if (dep.mapSideCombine) {
                插入數(shù)據(jù)到排序區(qū)
    sorter.insertAll(records)          

val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

     val tmp = Utils.tempFileWith(output)

val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)



                                              ↓     Result
                       2.     ResultMapTask#runTask()
                                               ▼ 
image.png
image.png
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末堪夭,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子拣凹,更是在濱河造成了極大的恐慌森爽,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件嚣镜,死亡現(xiàn)場離奇詭異爬迟,居然都是意外死亡,警方通過查閱死者的電腦和手機菊匿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門付呕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人跌捆,你說我怎么就攤上這事徽职。” “怎么了佩厚?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵姆钉,是天一觀的道長。 經(jīng)常有香客問我,道長潮瓶,這世上最難降的妖魔是什么陶冷? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮毯辅,結果婚禮上埂伦,老公的妹妹穿的比我還像新娘。我一直安慰自己思恐,他們只是感情好沾谜,可當我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著壁袄,像睡著了一般类早。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上嗜逻,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天涩僻,我揣著相機與錄音,去河邊找鬼栈顷。 笑死逆日,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的萄凤。 我是一名探鬼主播室抽,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼靡努!你這毒婦竟也來了坪圾?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤惑朦,失蹤者是張志新(化名)和其女友劉穎兽泄,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體漾月,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡病梢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了梁肿。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蜓陌。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖吩蔑,靈堂內(nèi)的尸體忽然破棺而出钮热,到底是詐尸還是另有隱情,我是刑警寧澤烛芬,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布隧期,位于F島的核電站痴奏,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏厌秒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一擅憔、第九天 我趴在偏房一處隱蔽的房頂上張望鸵闪。 院中可真熱鬧,春花似錦暑诸、人聲如沸蚌讼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽篡石。三九已至,卻和暖如春西采,著一層夾襖步出監(jiān)牢的瞬間凰萨,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工械馆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留胖眷,地道東北人。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓霹崎,卻偏偏與公主長得像珊搀,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子尾菇,可洞房花燭夜當晚...
    茶點故事閱讀 45,044評論 2 355