Spark源碼分析之Master的啟動流程

準備

本文主要對Master的啟動流程源碼進行分析。Spark源碼版本為2.3.1德撬。

閱讀源碼首先從啟動腳本入手铲咨,看看首先加載的是哪個類,我們看一下start-master.sh啟動腳本中的具體內(nèi)容蜓洪。

腳本代碼

可以看到這里加載的類是org.apache.spark.deploy.master.Master纤勒,好那我們的源碼尋覓之旅就從這開始...

源碼分析

打開源碼,我們發(fā)現(xiàn)Master是伴生關(guān)系的一組類隆檀,我們直接定位到Master的main函數(shù)

//主方法
  def main(argStrings: Array[String]) {
    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
      exitOnUncaughtException = false))
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    /**
      * 創(chuàng)建RPC 環(huán)境和Endpoint (RPC 遠程過程調(diào)用),在Spark中 Driver摇天, Master ,Worker角色都有各自的Endpoint恐仑,相當于各自的通信郵箱泉坐。
      *
      */
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

發(fā)現(xiàn)該函數(shù)除了做了一些配置文件和args參數(shù)的準備之外,調(diào)用了startRpcEnvAndEndpoint函數(shù)裳仆,我們跟進去看看

/**
   * Start the Master and return a three tuple of:
   *   (1) The Master RpcEnv
   *   (2) The web UI bound port
   *   (3) The REST server bound port, if any
   */
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)

    /**
      * 創(chuàng)建RPC(Remote Procedure Call )環(huán)境  ,Remote Procedure Call
      * 這里只是創(chuàng)建準備好Rpc的環(huán)境腕让,后面會向RpcEnv中注冊 角色【Driver,Master,Worker,Executor】
      */
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
   .....
  }

首先上面這段代碼通過RpcEnv 構(gòu)建了一個RPC通信環(huán)境,為之后的RPC通信做準備歧斟,Spark底層的通信是基于Netty的NIO模型记某,
這里每個Rpc端點運行時依賴的上下文環(huán)境稱之為RpcEnv。

接下來我們直接跟蹤到create方法中

def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      numUsableCores: Int,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      numUsableCores, clientMode)
    //創(chuàng)建NettyRpc 環(huán)境
    new NettyRpcEnvFactory().create(config)
  }

可以看到构捡,這里構(gòu)建了NettyRpcEnvFactory來創(chuàng)建NettyRpc 環(huán)境,NettyRpcEnvFactory的create函數(shù)又做了那些事呢?

def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
    // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    /**
      * 創(chuàng)建nettyRPC通信環(huán)境壳猜。
      * 當new NettyRpcEnv時會做一些初始化:
      *   Dispatcher:這個對象中有存放消息的隊列和消息的轉(zhuǎn)發(fā)
      *   TransportContext:可以創(chuàng)建了NettyRpcHandler
      */
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
    if (!config.clientMode) {
      //啟動nettyRPCEnv
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        //以上  startNettyRpcEnv 匿名函數(shù)在此處會最終被調(diào)用勾徽,當匿名函數(shù)被調(diào)用時,重點方法是483行 nettyEnv.startServer 方法
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }

可以看到上面首先構(gòu)建了一個序列化的實例對象统扳,然后開始著手構(gòu)架nettyRPC通信環(huán)境喘帚。在new NettyRpcEnv()時會做一些初始化的工作畅姊,如下所示

 /**
    * dispatcher 這個對象中有消息隊列和消息的循環(huán)獲取轉(zhuǎn)發(fā)
    */
  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

 /**
    * TransportContext 中會創(chuàng)建 NettyPpcHandler
    * TransportContext 這個對象中參數(shù)類型 RpcHandler  就是這里的 NettyRpcHandler
    */
  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

這里的Dispatcher是一個消息分發(fā)器,針對于RPC端點需要發(fā)送消息或者從遠程RPC接收到的消息吹由,分發(fā)至對應(yīng)的指令收件箱/發(fā)件箱若未。

這里的transportContext 是構(gòu)建傳輸?shù)纳舷挛沫h(huán)境,用于創(chuàng)建TransportServer和TransportClientFactory同時倾鲫,通過TransportChannelHandler來設(shè)置Netty的Channel pipelines粗合。

下面我們回到NettyRpcEnvFactory的create方法

val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
    if (!config.clientMode) {
      //啟動nettyRPCEnv
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        //以上  startNettyRpcEnv 匿名函數(shù)在此處會最終被調(diào)用,當匿名函數(shù)被調(diào)用時乌昔,重點方法是483行 nettyEnv.startServer 方法
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }

nettyEnv 準備好了之后隙疚,會構(gòu)建一個函數(shù)startNettyRpcEnv,然后startServiceOnPort在會調(diào)用startNettyRpcEnv磕道,進而調(diào)用nettyEnv的startServer函數(shù)供屉,來啟動Server

//啟動Rpc 服務(wù)
  def startServer(bindAddress: String, port: Int): Unit = {
    val bootstraps: java.util.List[TransportServerBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }

    /**
      * transportContext已經(jīng)被創(chuàng)建,這里createServer 就會綁定地址和端口溺蕉,啟動Netty Rpc 服務(wù)
      */
    server = transportContext.createServer(bindAddress, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }

由于直接初始化時transportContext就已經(jīng)被創(chuàng)建伶丐,這里createServer 就會綁定地址和端口,啟動Netty Rpc 服務(wù)疯特,createServer最在構(gòu)建TransportServer實例時哗魂,會調(diào)用init初始化方法,在這里以前了解過Netty的同學就會非常熟悉了.

private void init(String hostToBind, int portToBind) {

    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    EventLoopGroup bossGroup =
      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
    EventLoopGroup workerGroup = bossGroup;

    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, allocator)
      .childOption(ChannelOption.ALLOCATOR, allocator);

    this.metrics = new NettyMemoryMetrics(
      allocator, conf.getModuleName() + "-server", conf);

    if (conf.backLog() > 0) {
      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
    }

    if (conf.receiveBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
    }

    if (conf.sendBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
    }

    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) {
        RpcHandler rpcHandler = appRpcHandler;
        for (TransportServerBootstrap bootstrap : bootstraps) {
          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
        }
        //初始化網(wǎng)絡(luò)通信管道
        context.initializePipeline(ch, rpcHandler);
      }
    });

    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
    channelFuture.syncUninterruptibly();

    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
    logger.debug("Shuffle server started on port: {}", port);
}

init 方法就是去初始化一個綁定的主機和端口辙芍,創(chuàng)建nettyRPC通信啡彬,通過之前的rpcHandler來初始化網(wǎng)絡(luò)通信管道,同時會調(diào)用createChannelHandler函數(shù)故硅,創(chuàng)建處理消息的 channelHandler用于處理客戶端請求消息和服務(wù)端回應(yīng)消息

private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler, conf.maxChunksBeingTransferred());
    /**
     *   由以上  responseHandler   client    requestHandler  三個handler構(gòu)建 TransportChannelHandler
     *   new TransportChannelHandler 這個對象中有 【channelRead() 方法】,用于讀取接收到的消息
     */
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }

最終Rpc的環(huán)境就準備好了庶灿,后面會向RpcEnv中注冊 角色 Driver,Master,Worker,Executor。我們回到Master的startRpcEnvAndEndpoint函數(shù)

 /**
      * 向RpcEnv 中 注冊Master
      *
      * rpcEnv.setupEndpoint(name,new Master)
      * 這里new Master 的Master 是一個伴生類吃衅,繼承了 ThreadSafeRpcEndpoint往踢,歸根結(jié)底繼承到了 Trait 接口  RpcEndpoint
    val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)

這里會向RpcEnv中注冊Master,調(diào)用了setupEndpoint函數(shù)徘层,傳入了Master的實例對象峻呕,這里的Master 是一個伴生類,繼承了 ThreadSafeRpcEndpoint趣效,也是RpcEndpoint的實現(xiàn)類瘦癌,而什么是RpcEndpoint?

RpcEndpoint:RPC端點 跷敬,Spark針對于每個節(jié)點(Client/Master/Worker)都稱之一個Rpc端點 ,且都實現(xiàn)RpcEndpoint接口讯私,內(nèi)部根據(jù)不同端點的需求,設(shè)計不同的消息和不同的業(yè)務(wù)處理,如果需要發(fā)送(詢問)則調(diào)用Dispatcher斤寇。

EndPoint中存在

  • onstart() :啟動當前Endpoint
  • receive() :負責收消息
  • receiveAndReply():接受消息并回復

同時Endpoint 還有各自的引用桶癣,方便其他Endpoint發(fā)送消息,直接引用對方的EndpointRef 即可找到對方的Endpoint娘锁,上面源碼中的masterEndpoint 就是Master的Endpoint引用 RpcEndpointRef 牙寞。

RpcEndpointRef中存在

  • send():發(fā)送消息
  • ask() :請求消息,并等待回應(yīng)莫秆。

接下來我們看看NettyRpcEnv中的setupEndpoint函數(shù)

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    dispatcher.registerRpcEndpoint(name, endpoint)
  }

上面直接調(diào)用了dispatcher實例间雀,注冊RpcEndpoint

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      //這里 new EndpointData使用到了endpoint,當new Inbox 時向消息隊列中放入OnStart樣例類標識
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      //獲取剛剛封裝的EndPointData
      val data: EndpointData = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)

      /**
        * receivers 這個消息隊列中放著應(yīng)該去哪個Endpoint 中獲取Message 處理
        * 這里其實就是進入 Dispatcher 當前這個類中的 MessageLoop 方法馏锡。這個方法當new Dispatcher后會一直運行雷蹂。
        * 將消息放入待處理的消息隊列中,消息首先找到對應(yīng)的Endpoint ,再會獲取當前Endpoint的Inbox 中message,使用process 方法處理
        */
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }

上面的new EndpointData使用到了endpoint杯道,EndpointData其實就是對endpoint和endpointRef的封裝匪煌,同時內(nèi)部還構(gòu)建了Inbox

private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    //將endpoint封裝到Inbox中
    val inbox = new Inbox(ref, endpoint)
  }

這里的Inbox是其實就是消息收件箱,一個本地端點對應(yīng)一個收件箱党巾,Dispatcher在每次向Inbox存入消息時萎庭,都將對應(yīng)EndpointData加入內(nèi)部待Receiver Queue中,另外Dispatcher創(chuàng)建時會啟動一個單獨線程進行輪詢Receiver Queue齿拂,進行收件箱消息消費驳规。

其實Inbox實例在構(gòu)建過程中也會有消息的存入。

private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {
....
@GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  // OnStart should be the first message to process
  //當注冊endpoint時都會調(diào)用這個異步方法署海,messags中放入一個OnStart樣例類消息對象
  inbox.synchronized {
    messages.add(OnStart)
  }
....

可以看到吗购,構(gòu)建inbox時會向messages中加入一個OnStart消息,該消息會被inbox類中的process所消費砸狞,只不過觸發(fā)時機還在后面捻勉。

回到Dispatcher的registerRpcEndpoint函數(shù)

      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      //獲取剛剛封裝的EndPointData
      val data: EndpointData = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)

      /**
        * receivers 這個消息隊列中放著應(yīng)該去哪個Endpoint 中獲取Message 處理
        * 這里其實就是進入 Dispatcher 當前這個類中的 MessageLoop 方法。這個方法當new Dispatcher后會一直運行刀森。
        * 將消息放入待處理的消息隊列中踱启,消息首先找到對應(yīng)的Endpoint ,再會獲取當前Endpoint的Inbox 中message,使用process 方法處理
        */
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef

這里發(fā)現(xiàn)剛才構(gòu)建的EndpointData會被放入到endpoints中,endpoints和endpointRefs的類型都是ConcurrentHashMap研底,對這兩個容器進行相應(yīng)的操作之后埠偿,就會調(diào)用receivers.offer(data)。

receivers是Dispatcher中的一個消息隊列榜晦,這個消息隊列還有一個線程池來進行消息的消費冠蒋,整個模式構(gòu)成一個生產(chǎn)者消費者模式,因此這里將data消息加入到消息隊列中乾胶,會觸發(fā)線程消費浊服。

  private val threadpool: ThreadPoolExecutor = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
      math.max(2, availableCores))
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    for (i <- 0 until numThreads) {
      pool.execute(new MessageLoop)
    }
    pool
  }

/** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            //take 出來消息一直處理
            val data: EndpointData = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            //調(diào)用process 方法處理消息
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

可以看到上面的MessageLoop中统屈,通過消息隊列receivers取出一條消息,該消息的類型為EndpointData牙躺,內(nèi)部都封裝了inbox實例,因此直接調(diào)用該實例的process函數(shù)去處理消息腕扶,我們之前在inbox實例構(gòu)建過程中孽拷,發(fā)現(xiàn)會向inbox內(nèi)部的消息容器中放入一條onStart消息,因此我們看一下process函數(shù)是如何處理該消息的半抱。

def process(dispatcher: Dispatcher): Unit = {

case OnStart =>
            //調(diào)用Endpoint 的onStart函數(shù)
            endpoint.onStart()
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }
}

process中對onStart的處理為脓恕,調(diào)用endpoint的onStart函數(shù),而endpoint我們還記得是啥嗎窿侈,這里是我們直接構(gòu)建的Master實例炼幔,因此我們回到Master中去看看onStart()函數(shù)的處理過程。

override def onStart(): Unit = {
.....
 val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, serializer)
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, serializer)
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
          .newInstance(conf, serializer)
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
}

onStart函數(shù)涉及到webUI的啟動史简,applicationMetricsSystem和masterMetricsSystem的啟動過程乃秀,如果設(shè)置了restServer還涉及到啟動過程。

同時Master的onStart函數(shù)的部分代碼如上所示圆兵,我么可以看到跺讯,這里涉及到如何在HA的環(huán)境下選主的過程。內(nèi)部會根據(jù)配置決定采用哪種方式殉农,是ZOOKEEPER還是文件系統(tǒng)的方式來進行刀脏。

生產(chǎn)環(huán)境下一般采用Zookeeper做HA,Zookeeper會自動化管理 Master的切換超凳;

采用Zookeeper做HA的時候愈污,Zookeeper會負責保存整個Spark集群運行時候的元數(shù)據(jù):workers、Drivers轮傍、Applications暂雹、Executors;

Zookeeper遇到當前Active級別的Master出現(xiàn)故障的時候會從StandbyMaster中選取一臺作為Active Master金麸,但是要注意擎析,被選舉后到成為真正的ActiveMaster之間需要從Zookeeper中獲取集群當前運行狀態(tài)的元數(shù)據(jù)信息并進行恢復;

在Master切換的過程中挥下,所有的已經(jīng)在運行的程序皆正常運行揍魂!因為Spark Application在運行前就已經(jīng)通過ClusterManager獲得了計算資源,所以在運行時Job本身的調(diào)度和處理和Master是沒有任何關(guān)系的棚瘟!

在Master的切換過程中唯一的影響是不能提交新的job:一方面不能提交新的應(yīng)用程序給集群现斋,因為只有ActiveMaster才能接受新的程序的提交請求;另外一方面偎蘸,已經(jīng)運行的程序中也不能夠因為Action操作觸發(fā)新的Job的提交請求庄蹋;

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末瞬内,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子限书,更是在濱河造成了極大的恐慌虫蝶,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,729評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件倦西,死亡現(xiàn)場離奇詭異能真,居然都是意外死亡,警方通過查閱死者的電腦和手機扰柠,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,226評論 3 399
  • 文/潘曉璐 我一進店門粉铐,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人卤档,你說我怎么就攤上這事蝙泼。” “怎么了劝枣?”我有些...
    開封第一講書人閱讀 169,461評論 0 362
  • 文/不壞的土叔 我叫張陵汤踏,是天一觀的道長。 經(jīng)常有香客問我哨免,道長茎活,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,135評論 1 300
  • 正文 為了忘掉前任琢唾,我火速辦了婚禮载荔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘采桃。我一直安慰自己懒熙,他們只是感情好,可當我...
    茶點故事閱讀 69,130評論 6 398
  • 文/花漫 我一把揭開白布普办。 她就那樣靜靜地躺著工扎,像睡著了一般。 火紅的嫁衣襯著肌膚如雪衔蹲。 梳的紋絲不亂的頭發(fā)上肢娘,一...
    開封第一講書人閱讀 52,736評論 1 312
  • 那天,我揣著相機與錄音舆驶,去河邊找鬼橱健。 笑死,一個胖子當著我的面吹牛沙廉,可吹牛的內(nèi)容都是我干的拘荡。 我是一名探鬼主播,決...
    沈念sama閱讀 41,179評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼撬陵,長吁一口氣:“原來是場噩夢啊……” “哼珊皿!你這毒婦竟也來了网缝?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,124評論 0 277
  • 序言:老撾萬榮一對情侶失蹤蟋定,失蹤者是張志新(化名)和其女友劉穎粉臊,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體溢吻,經(jīng)...
    沈念sama閱讀 46,657評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡维费,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,723評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了促王。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,872評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡而晒,死狀恐怖蝇狼,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情倡怎,我是刑警寧澤迅耘,帶...
    沈念sama閱讀 36,533評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站监署,受9級特大地震影響颤专,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜钠乏,卻給世界環(huán)境...
    茶點故事閱讀 42,213評論 3 336
  • 文/蒙蒙 一栖秕、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧晓避,春花似錦簇捍、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,700評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至锅必,卻和暖如春事格,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背搞隐。 一陣腳步聲響...
    開封第一講書人閱讀 33,819評論 1 274
  • 我被黑心中介騙來泰國打工驹愚, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人尔许。 一個月前我還...
    沈念sama閱讀 49,304評論 3 379
  • 正文 我出身青樓么鹤,卻偏偏與公主長得像,于是被迫代替她去往敵國和親味廊。 傳聞我的和親對象是個殘疾皇子蒸甜,可洞房花燭夜當晚...
    茶點故事閱讀 45,876評論 2 361

推薦閱讀更多精彩內(nèi)容