Spark內(nèi)置框架rpc通訊機(jī)制及RpcEnv基礎(chǔ)設(shè)施-Spark商業(yè)環(huán)境實(shí)戰(zhàn)

版權(quán)聲明:本套技術(shù)專欄是作者(秦凱新)平時(shí)工作的總結(jié)和升華漠畜,通過從真實(shí)商業(yè)環(huán)境抽取案例進(jìn)行總結(jié)和分享粤铭,并給出商業(yè)應(yīng)用的調(diào)優(yōu)建議和集群環(huán)境容量規(guī)劃等內(nèi)容渡八,請持續(xù)關(guān)注本套博客迂求。版權(quán)聲明:禁止轉(zhuǎn)載,歡迎學(xué)習(xí)欢嘿。

Spark商業(yè)環(huán)境實(shí)戰(zhàn)及調(diào)優(yōu)進(jìn)階系列

1. Spark 內(nèi)置框架rpc通訊機(jī)制

TransportContext 內(nèi)部握有創(chuàng)建TransPortClient和TransPortServer的方法實(shí)現(xiàn)衰琐,但卻屬于最底層的RPC通訊設(shè)施。為什么呢炼蹦?

因?yàn)槌蓡T變量RPCHandler是抽象的羡宙,并沒有具體的消息處理,而且TransportContext功能也在于創(chuàng)建TransPortClient客戶端和TransPortServer服務(wù)端掐隐。具體解釋如下:

 Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
 setup Netty Channel pipelines with a
 {@link org.apache.spark.network.server.TransportChannelHandler}.

所以TransportContext只能為最底層的通訊基礎(chǔ)狗热。上層為NettyRPCEnv高層封裝,并持有TransportContext引用虑省,在TransportContext中傳入NettyRpcHandler實(shí)體匿刮,來實(shí)現(xiàn)netty通訊回調(diào)Handler處理。TransportContext代碼片段如下:

 /* The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
 * channel. As each TransportChannelHandler contains a TransportClient, this enables server
 * processes to send messages back to the client on an existing channel.
 */
  public class TransportContext {
  private final Logger logger = LoggerFactory.getLogger(TransportContext.class);
  private final TransportConf conf;
  private final RpcHandler rpcHandler;
  private final boolean closeIdleConnections;

  private final MessageEncoder encoder;
  private final MessageDecoder decoder;

  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    this(conf, rpcHandler, false);
  }

1.1 客戶端和服務(wù)端統(tǒng)一的消息接收處理器 TransportChannelHandlerer

TransportClient 和TransportServer 在配置Netty的pipeLine的handler處理器時(shí)探颈,均采用TransportChannelHandler, 來做統(tǒng)一的消息receive處理熟丸。為什么呢?在于統(tǒng)一消息處理入口伪节,TransportChannelHandlerer根據(jù)消息類型執(zhí)行不同的處理光羞,代碼片段如下:

 public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
    if (request instanceof RequestMessage) {
      requestHandler.handle((RequestMessage) request);
   } else if (request instanceof ResponseMessage) {
      responseHandler.handle((ResponseMessage) request);
   } else {
      ctx.fireChannelRead(request);
   }

}

TransportContext初始化Pipeline的代碼片段:

  public TransportChannelHandler initializePipeline(
  SocketChannel channel,
  RpcHandler channelRpcHandler) {
  try {
    
  TransportChannelHandler channelHandler = createChannelHandler(channel,
  
  channelRpcHandler);
  channel.pipeline()
    .addLast("encoder", ENCODER)
    .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
    .addLast("decoder", DECODER)
    .addLast("idleStateHandler", new IdleStateHandler(0, 0,   
                   conf.connectionTimeoutMs() / 1000))
                   
    .addLast("handler", channelHandler);
    
  return channelHandler;
} catch (RuntimeException e) {
  logger.error("Error while initializing Netty pipeline", e);
  throw e;
}

客戶端和服務(wù)端統(tǒng)一的消息接收處理器 TransportChannelHandlerer 是這個(gè)函數(shù):createChannelHandler(channel, channelRpcHandler)實(shí)現(xiàn)的绩鸣,也即統(tǒng)一了這個(gè)netty的消息接受處理,代碼片段如下:

    /**
    * Creates the server- and client-side handler which is used to handle both RequestMessages and
    * ResponseMessages. The channel is expected to have been successfully created, though certain
    * properties (such as the remoteAddress()) may not be available yet.
    */
    
    private TransportChannelHandler createChannelHandler(Channel channel,                                    RpcHandler rpcHandler) {
    
    TransportResponseHandler responseHandler = new                     
    TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
    rpcHandler, conf.maxChunksBeingTransferred());
    
    return new TransportChannelHandler(client, responseHandler, requestHandler,
        conf.connectionTimeoutMs(), closeIdleConnections);
    }

不過transportClient對應(yīng)的是TransportResponseHander纱兑,TransportServer對應(yīng)的的是TransportRequestHander呀闻。
在進(jìn)行消息處理時(shí),首先會(huì)經(jīng)過TransportChannelHandler根據(jù)消息類型進(jìn)行處理器選擇潜慎,分別進(jìn)行netty的消息生命周期管理:

  • exceptionCaught
  • channelActive
  • channelInactive
  • channelRead
  • userEventTriggered

1.2 transportClient對應(yīng)的是ResponseMessage

客戶端一旦發(fā)送消息(均為Request消息)捡多,就會(huì)在

private final Map<Long, RpcResponseCallback> outstandingRpcs;

private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches

中緩存,用于回調(diào)處理勘纯。

image

1.3 transportServer對應(yīng)的是RequestMessage

服務(wù)端接收消息類型(均為Request消息)

  • ChunkFetchRequest
  • RpcRequest
  • OneWayMessage
  • StremRequest

服務(wù)端響應(yīng)類型(均為Response消息):

  • ChunkFetchSucess
  • ChunkFetchFailure
  • RpcResponse
  • RpcFailure

2. Spark RpcEnv基礎(chǔ)設(shè)施

2.1 上層建筑NettyRPCEnv

上層建筑NettyRPCEnv局服,持有TransportContext引用钓瞭,在TransportContext中傳入NettyRpcHandler實(shí)體驳遵,來實(shí)現(xiàn)netty通訊回調(diào)Handler處理

  • Dispatcher
  • TransportContext
  • TransPortClientFactroy
  • TransportServer
  • TransportConf

2.2 RpcEndPoint 與 RPCEndPointRef 端點(diǎn)

  • RpcEndPoint 為服務(wù)端
  • RPCEndPointRef 為客戶端

2.2 Dispacher 與 Inbox 與 Outbox

  • 一個(gè)端點(diǎn)對應(yīng)一個(gè)Dispacher,一個(gè)Inbox , 多個(gè)OutBox
  1. RpcEndpoint:RPC端點(diǎn) 山涡,Spark針對于每個(gè)節(jié)點(diǎn)(Client/Master/Worker)都稱之一個(gè)Rpc端點(diǎn) ,且都實(shí)現(xiàn)RpcEndpoint接口堤结,內(nèi)部根據(jù)不同端點(diǎn)的需求,設(shè)計(jì)不同的消息和不同的業(yè)務(wù)處理鸭丛,如果需要發(fā)送(詢問)則調(diào)用Dispatcher
  2. RpcEnv:RPC上下文環(huán)境竞穷,每個(gè)Rpc端點(diǎn)運(yùn)行時(shí)依賴的上下文環(huán)境稱之為RpcEnv
  3. Dispatcher:消息分發(fā)器,針對于RPC端點(diǎn)需要發(fā)送消息或者從遠(yuǎn)程RPC接收到的消息鳞溉,分發(fā)至對應(yīng)的指令收件箱/發(fā)件箱瘾带。如果指令接收方是自己存入收件箱,如果指令接收方為非自身端點(diǎn)熟菲,則放入發(fā)件箱
  4. Inbox:指令消息收件箱看政,一個(gè)本地端點(diǎn)對應(yīng)一個(gè)收件箱,Dispatcher在每次向Inbox存入消息時(shí)抄罕,都將對應(yīng)EndpointData加入內(nèi)部待Receiver Queue中允蚣,另外Dispatcher創(chuàng)建時(shí)會(huì)啟動(dòng)一個(gè)單獨(dú)線程進(jìn)行輪詢Receiver Queue,進(jìn)行收件箱消息消費(fèi)
  5. OutBox:指令消息發(fā)件箱呆贿,一個(gè)遠(yuǎn)程端點(diǎn)對應(yīng)一個(gè)發(fā)件箱嚷兔,當(dāng)消息放入Outbox后,緊接著將消息通過TransportClient發(fā)送出去做入。消息放入發(fā)件箱以及發(fā)送過程是在同一個(gè)線程中進(jìn)行冒晰,這樣做的主要原因是遠(yuǎn)程消息分為RpcOutboxMessage, OneWayOutboxMessage兩種消息,而針對于需要應(yīng)答的消息直接發(fā)送且需要得到結(jié)果進(jìn)行處理
  6. TransportClient:Netty通信客戶端竟块,根據(jù)OutBox消息的receiver信息壶运,請求對應(yīng)遠(yuǎn)程TransportServer
  7. TransportServer:Netty通信服務(wù)端,一個(gè)RPC端點(diǎn)一個(gè)TransportServer,接受遠(yuǎn)程消息后調(diào)用Dispatcher分發(fā)消息至對應(yīng)收發(fā)件箱
image

Spark在Endpoint的設(shè)計(jì)上核心設(shè)計(jì)即為Inbox與Outbox彩郊,其中Inbox核心要點(diǎn)為:

  1. 內(nèi)部的處理流程拆分為多個(gè)消息指令(InboxMessage)存放入Inbox
  2. 當(dāng)Dispatcher啟動(dòng)最后前弯,會(huì)啟動(dòng)一個(gè)名為【dispatcher-event-loop】的線程掃描Inbox待處理InboxMessage蚪缀,并調(diào)用Endpoint根據(jù)InboxMessage類型做相應(yīng)處理
  3. 當(dāng)Dispatcher啟動(dòng)最后,默認(rèn)會(huì)向Inbox存入OnStart類型的InboxMessage恕出,Endpoint在根據(jù)OnStart指令做相關(guān)的額外啟動(dòng)工作询枚,端點(diǎn)啟動(dòng)后所有的工作都是對OnStart指令處理衍生出來的,因此可以說OnStart指令是相互通信的源頭浙巫。
  • 注意: 一個(gè)端點(diǎn)對應(yīng)一個(gè)Dispacher金蜀,一個(gè)Inbox , 多個(gè)OutBox,可以看到 inbox在Dispacher 中且在EndPointData內(nèi)部:

     private final RpcHandler rpcHandler;
    /**
    * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
    */
     private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     private class EndpointData(
        val name: String,
        val endpoint: RpcEndpoint,
        val ref: NettyRpcEndpointRef) {
      val inbox = new Inbox(ref, endpoint)
    }
    private val endpoints = new ConcurrentHashMap[String, EndpointData]
    private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
    
    // Track the receivers whose inboxes may contain messages.
    private val receivers = new LinkedBlockingQueue[EndpointData]
    
image
  • 注意: 一個(gè)端點(diǎn)對應(yīng)一個(gè)Dispacher的畴,一個(gè)Inbox , 多個(gè)OutBox渊抄,可以看到 OutBox在NettyRpcEnv內(nèi)部:

    private[netty] class NettyRpcEnv(
      val conf: SparkConf,
      javaSerializerInstance: JavaSerializerInstance,
      host: String,
      securityManager: SecurityManager) extends RpcEnv(conf) with Logging {
      
      private val dispatcher: Dispatcher = new Dispatcher(this)
      
      private val streamManager = new NettyStreamManager(this)
      private val transportContext = new TransportContext(transportConf,
      new NettyRpcHandler(dispatcher, this, streamManager))
      
    /**
     * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
     * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
     */
    private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
    

2.3 Dispacher 與 Inbox 與 Outbox

Dispatcher的代碼片段中,包含了核心的消息發(fā)送代碼邏輯丧裁,意思是:向服務(wù)端發(fā)送一條消息护桦,也即同時(shí)放進(jìn)Dispatcher中的receiverrs中,也放進(jìn)inbox的messages中煎娇。這個(gè)高層封裝二庵,如Master和Worker端點(diǎn)發(fā)送消息都是通過NettyRpcEnv中的 Dispatcher來實(shí)現(xiàn)的。在Dispatcher中有一個(gè)線程缓呛,叫做MessageLoop催享,實(shí)現(xiàn)消息的及時(shí)處理。

 /**
 * Posts a message to a specific endpoint.
 *
 * @param endpointName name of the endpoint.
 * @param message the message to post
  * @param callbackIfStopped callback function if the endpoint is stopped.
 */
 private def postMessage(
  endpointName: String,
  message: InboxMessage,
  callbackIfStopped: (Exception) => Unit): Unit = {
   val error = synchronized {
   val data = endpoints.get(endpointName)
   
  if (stopped) {
    Some(new RpcEnvStoppedException())
  } else if (data == null) {
    Some(new SparkException(s"Could not find $endpointName."))
  } else {
  
    data.inbox.post(message)
    receivers.offer(data)
    
    None
  }
 }

注意:默認(rèn)第一條消息為onstart,為什么呢哟绊?看這里:

image
image

看到下面的 new EndpointData(name, endpoint, endpointRef) 了嗎因妙?

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
 val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
  if (stopped) {
    throw new IllegalStateException("RpcEnv has been stopped")
  }
  if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
    throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
  }
  val data = endpoints.get(name)
  endpointRefs.put(data.endpoint, data.ref)
  receivers.offer(data)  // for the OnStart message
}
endpointRef

}

注意EndpointData里面包含了inbox,因此Inbox初始化的時(shí)候票髓,放進(jìn)了onstart

 private class EndpointData(
  val name: String,
  val endpoint: RpcEndpoint,
  val ref: NettyRpcEndpointRef) {
val inbox = new Inbox(ref, endpoint)

}

onstart在Inbox初始化時(shí)出現(xiàn)了攀涵,注意每一個(gè)端點(diǎn)只有一個(gè)inbox,比如:master 節(jié)點(diǎn)炬称。


image

2.4 發(fā)送消息流程為分為兩種汁果,一種端點(diǎn)(Master)自己把消息發(fā)送到本地Inbox,一種端點(diǎn)(Master)接收到消息后玲躯,通過TransPortRequestHander接收后處理据德,扔進(jìn)Inbox

2.4.1 端點(diǎn)(Master)自己把消息發(fā)送到本地Inbox
- endpoint(Master) -> NettyRpcEnv-> Dispatcher ->  postMessage -> MessageLoop(Dispatcher) -> inbox -> process -> endpoint.receiveAndReply

解釋如下:端點(diǎn)通過自己的RPCEnv環(huán)境,向自己的Inbox中發(fā)送消息跷车,然后交由Dispatch來進(jìn)行消息的處理棘利,調(diào)用了端點(diǎn)自己的receiveAndReply方法

  • 這里著重講一下MessageLoop是什么時(shí)候啟動(dòng)的,參照Dispatcher的代碼段如下朽缴,一旦初始化就會(huì)啟動(dòng),因?yàn)槭浅蓡T變量:

      private val threadpool: ThreadPoolExecutor = {
      val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
        math.max(2, Runtime.getRuntime.availableProcessors()))
      val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
      for (i <- 0 until numThreads) {
        pool.execute(new MessageLoop)
      }
       pool
     }
    
  • 接著講nettyRpcEnv是何時(shí)初始化的善玫,Dispatcher是何時(shí)初始化的?

master初始化RpcEnv環(huán)境時(shí)密强,調(diào)用NettyRpcEnvFactory().create(config)進(jìn)行初始化nettyRpcEnv茅郎,然后其成員變量Dispatcher開始初始化蜗元,然后Dispatcher內(nèi)部成員變量threadpool開始啟動(dòng)messageLoop,然后開始處理消息,可謂是一環(huán)套一環(huán)啊系冗。如下是Master端點(diǎn)初始化RPCEnv奕扣。


image

在NettyRpcEnv中,NettyRpcEnvFactory的create方法如下:

image

其中nettyRpcEnv.startServer掌敬,代碼段如下惯豆,然后調(diào)用底層 transportContext.createServer來創(chuàng)建Server,并初始化netty 的 pipeline:

    server = transportContext.createServer(host, port, bootstraps)
    dispatcher.registerRpcEndpoint(
     RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))

最終端點(diǎn)開始不斷向自己的Inboxz中發(fā)送消息即可,代碼段如下:

    private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
      error = synchronized {
      val data = endpoints.get(endpointName)
      if (stopped) {
           Some(new RpcEnvStoppedException())
      } else if (data == null) {
          Some(new SparkException(s"Could not find $endpointName."))
      } else {
      
         data.inbox.post(message)
         receivers.offer(data)
         
         None
      }
    }
2.4.2 端點(diǎn)(Master)接收到消息后奔害,通過TransPortRequestHander接收后處理楷兽,扔進(jìn)Inbox
- endpointRef(Worker) ->TransportChannelHandler -> channelRead0 -> TransPortRequestHander -> handle -> processRpcRequest ->NettyRpcHandler(在NettyRpcEnv中)  -> receive ->  internalReceive -> dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress)) (響應(yīng))-> dispatcher.postRemoteMessage(messageToDispatch, callback) (發(fā)送遠(yuǎn)端來的消息放進(jìn)inbox)-> postMessage -> inbox -> process

如下圖展示了整個(gè)消息接收到inbox的流程:


image

下圖展示了 TransportChannelHandler接收消息:

    @Override
 public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
 if (request instanceof RequestMessage) {
  requestHandler.handle((RequestMessage) request);
} else {
  responseHandler.handle((ResponseMessage) request);
}
 }

然后TransPortRequestHander來進(jìn)行消息匹配處理:

image

最終交給inbox的process方法,實(shí)際上由端點(diǎn) endpoint.receiveAndReply(context)方法處理:

 /**
 * Process stored messages.
 */
 def process(dispatcher: Dispatcher): Unit = {
  var message: InboxMessage = null
    inbox.synchronized {
  if (!enableConcurrent && numActiveThreads != 0) {
    return
  }
  message = messages.poll()
  if (message != null) {
    numActiveThreads += 1
  } else {
    return
  }
}
while (true) {
  safelyCall(endpoint) {
    message match {
      case RpcMessage(_sender, content, context) =>
        try {
          endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
            throw new SparkException(s"Unsupported message $message from ${_sender}")
          })
        } catch {
          case NonFatal(e) =>
            context.sendFailure(e)
            // Throw the exception -- this exception will be caught by the safelyCall function.
            // The endpoint's onError function will be called.
            throw e
        }

      case OneWayMessage(_sender, content) =>
        endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
          throw new SparkException(s"Unsupported message $message from ${_sender}")
        })

      case OnStart =>
        endpoint.onStart()
        if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
          inbox.synchronized {
            if (!stopped) {
              enableConcurrent = true
            }
          }
        }

      case OnStop =>
        val activeThreads = inbox.synchronized { inbox.numActiveThreads }
        assert(activeThreads == 1,
          s"There should be only a single active thread but found $activeThreads threads.")
        dispatcher.removeRpcEndpointRef(endpoint)
        endpoint.onStop()
        assert(isEmpty, "OnStop should be the last message")

      case RemoteProcessConnected(remoteAddress) =>
        endpoint.onConnected(remoteAddress)

      case RemoteProcessDisconnected(remoteAddress) =>
        endpoint.onDisconnected(remoteAddress)

      case RemoteProcessConnectionError(cause, remoteAddress) =>
        endpoint.onNetworkError(cause, remoteAddress)
    }
  }

  inbox.synchronized {
    // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
    // every time.
    if (!enableConcurrent && numActiveThreads != 1) {
      // If we are not the only one worker, exit
      numActiveThreads -= 1
      return
    }
    message = messages.poll()
    if (message == null) {
      numActiveThreads -= 1
      return
    }
  }
}

}

3 結(jié)語

本文花了將近兩天時(shí)間進(jìn)行剖析Spark的 Rpc 工作原理华临,真是不容易芯杀,關(guān)鍵是你看懂了嗎?歡迎評論

秦凱新 于深圳 2018-10-28

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末银舱,一起剝皮案震驚了整個(gè)濱河市瘪匿,隨后出現(xiàn)的幾起案子跛梗,更是在濱河造成了極大的恐慌寻馏,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,817評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件核偿,死亡現(xiàn)場離奇詭異诚欠,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)漾岳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,329評論 3 385
  • 文/潘曉璐 我一進(jìn)店門轰绵,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人尼荆,你說我怎么就攤上這事左腔。” “怎么了捅儒?”我有些...
    開封第一講書人閱讀 157,354評論 0 348
  • 文/不壞的土叔 我叫張陵液样,是天一觀的道長。 經(jīng)常有香客問我巧还,道長鞭莽,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,498評論 1 284
  • 正文 為了忘掉前任麸祷,我火速辦了婚禮澎怒,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘阶牍。我一直安慰自己喷面,他們只是感情好星瘾,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,600評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著惧辈,像睡著了一般死相。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上咬像,一...
    開封第一講書人閱讀 49,829評論 1 290
  • 那天算撮,我揣著相機(jī)與錄音,去河邊找鬼县昂。 笑死肮柜,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的倒彰。 我是一名探鬼主播审洞,決...
    沈念sama閱讀 38,979評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼待讳!你這毒婦竟也來了芒澜?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,722評論 0 266
  • 序言:老撾萬榮一對情侶失蹤创淡,失蹤者是張志新(化名)和其女友劉穎痴晦,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體琳彩,經(jīng)...
    沈念sama閱讀 44,189評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡誊酌,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,519評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了露乏。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片碧浊。...
    茶點(diǎn)故事閱讀 38,654評論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖瘟仿,靈堂內(nèi)的尸體忽然破棺而出箱锐,到底是詐尸還是另有隱情,我是刑警寧澤劳较,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布驹止,位于F島的核電站,受9級特大地震影響兴想,放射性物質(zhì)發(fā)生泄漏幢哨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,940評論 3 313
  • 文/蒙蒙 一嫂便、第九天 我趴在偏房一處隱蔽的房頂上張望捞镰。 院中可真熱鬧,春花似錦、人聲如沸岸售。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,762評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽凸丸。三九已至拷邢,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間屎慢,已是汗流浹背瞭稼。 一陣腳步聲響...
    開封第一講書人閱讀 31,993評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留腻惠,地道東北人环肘。 一個(gè)月前我還...
    沈念sama閱讀 46,382評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像集灌,于是被迫代替她去往敵國和親悔雹。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,543評論 2 349

推薦閱讀更多精彩內(nèi)容