Apache Spark RPC (Part 1)

[Figure: SparkNetty.png — overview of Spark's Netty-based RPC components]

0. RpcEnv

RpcEnv is the core of the entire communication layer. It builds the environment for communication and starts the server; it hosts RpcEndpoints, and every RpcEndpoint (each providing some kind of service) must be registered with the RpcEnv; and it performs message routing, i.e. all communication between RpcEndpoints goes through the RpcEnv. It hides the difference between RPC calls and local calls, so upper layers can focus on designing endpoints while the communication details stay encapsulated inside the RpcEnv. Currently the only implementation is NettyRpcEnv, which builds the RPC layer on top of Netty.
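To make "core of the communication layer" concrete, here is an abbreviated sketch of the RpcEnv abstraction (trimmed down from org.apache.spark.rpc.RpcEnv; only the members discussed in this post are kept):

private[spark] abstract class RpcEnv(conf: SparkConf) {
  // The address the TransportServer of this RpcEnv listens on (client-mode envs have none).
  def address: RpcAddress
  // Register an endpoint under a name; the returned ref is how others talk to it.
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
  // Resolve a remote endpoint from a URI such as spark://{endpointName}@{ip}:{port}.
  def setupEndpointRefByURI(uri: String): RpcEndpointRef
  def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef
  // Lifecycle.
  def stop(endpoint: RpcEndpointRef): Unit
  def shutdown(): Unit
  def awaitTermination(): Unit
}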

NettyRpcEnv

[spark-core] org.apache.spark.rpc.netty.NettyRpcEnv

class NettyRpcEnv extends RpcEnv with Logging {
  val role // "driver" or "executor"
  val transportConf: TransportConf // built from the spark.rpc.* configuration
  val dispatcher: Dispatcher // routes inbound messages to the registered RpcEndpoints
  val streamManager: NettyStreamManager // serves files, jars and directories over RPC
  val transportContext: TransportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager)) // builds client factories, servers and channel handlers
  val clientFactory: TransportClientFactory // creates TransportClients for outbound connections
  // A separate client factory for file downloads. This avoids using the same RPC handler as
  // the main RPC context, so that events caused by these clients are kept isolated from the main RPC traffic.
  var fileDownloadFactory: TransportClientFactory // dedicated to file downloads so they do not disturb RPC traffic
  val timeoutScheduler: ScheduledExecutorService
  // Because TransportClientFactory.createClient is blocking, we need to run it in this thread pool
  // to implement non-blocking send/ask.
  val clientConnectionExecutor: ThreadPoolExecutor
  var server: TransportServer // started only when the RpcEnv is not in client mode
  val outboxes: ConcurrentHashMap[RpcAddress, Outbox] // one Outbox per remote address
  lazy val address: RpcAddress
}

1. RpcEndpointRef

RpcEndpointRef is the initiating side of the communication. It is associated with a specific RpcEndpoint, and all communication with that endpoint goes through the ref. A ref is expressed as a URI:

Remote: spark://{endpointName}@{ip}:{port}

Client: spark-client://{endpointName}

The Dispatcher distributes incoming messages according to the endpoint name and hands each one to the corresponding endpoint for processing.
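Putting RpcEnv, RpcEndpoint and RpcEndpointRef together, a minimal usage sketch looks roughly like the following. The EchoEndpoint, the "echo" name and the rpcEnv/clientRpcEnv/host/port values are stand-ins for illustration; setupEndpoint, setupEndpointRefByURI, receiveAndReply, RpcCallContext.reply and askSync are the real APIs:

// A hypothetical endpoint that answers ask() requests.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg") // the reply travels back to the asking ref
  }
}

// Server side: register the endpoint with the RpcEnv under a name.
val localRef: RpcEndpointRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))

// Client side: resolve the endpoint by URI and talk to it through the ref.
val remoteRef = clientRpcEnv.setupEndpointRefByURI(s"spark://echo@$host:$port")
val answer = remoteRef.askSync[String]("hello") // blocks until the reply arrives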

2. Client Setup and Communication

A NettyRpcEnv is created on both the driver and the executors. Let's walk through the source code by following a single request.

Here we look at how an executor talks to the DriverEndpoint to fetch the SparkAppConfig. The driver side has already started a TransportServer, and the executor acts as the client that requests the configuration.

The DriverEndpoint is created while the SparkContext is being initialized; concretely, it is constructed as a field of CoarseGrainedSchedulerBackend.

[spark-core] org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
  extends ExecutorAllocationClient with SchedulerBackend with Logging {
    ...
    //setup driverEndpoint
    val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())
    ...
  }
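The ENDPOINT_NAME used here is defined in the companion object, and it is exactly the name that shows up later in the driver URL:

object CoarseGrainedSchedulerBackend {
  val ENDPOINT_NAME = "CoarseGrainedScheduler" // hence spark://CoarseGrainedScheduler@{ip}:{port}
}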

Initiating the Request

CoarseGrainedExecutorBackend is the class that launches a Spark executor. Right after starting, it needs to fetch the SparkAppConfig from the driver.
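The request and its reply are ordinary case classes defined in CoarseGrainedClusterMessages. A partial sketch, reconstructed from memory for Spark 3.0 (the real SparkAppConfig carries a few more fields, which are omitted here):

// Sent by the executor; resourceProfileId matches the argument seen in the code below.
case class RetrieveSparkAppConfig(resourceProfileId: Int) extends CoarseGrainedClusterMessage

// Returned by the driver; sparkProperties is what the executor merges into its own configuration.
case class SparkAppConfig(sparkProperties: Seq[(String, String)] /*, ... other fields omitted */)
  extends CoarseGrainedClusterMessage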

[spark-core] org.apache.spark.executor.CoarseGrainedExecutorBackend

object CoarseGrainedExecutorBackend extends Logging {
  def main(args: Array[String]): Unit = {
    // Anonymous function that creates the backend object
    val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
      new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
        arguments.resourcesFileOpt, resourceProfile)
    }
    run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
    System.exit(0)
  }
  //
   def run(
      arguments: Arguments,
      backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
        CoarseGrainedExecutorBackend): Unit = {
     ...
     // A temporary NettyRpcEnv used only to obtain the driver's RpcEndpointRef
     val fetcher = RpcEnv.create(
        "driverPropsFetcher",
        arguments.bindAddress,
        arguments.hostname,
        -1,
        executorConf,
        new SecurityManager(executorConf),
        numUsableCores = 0,
        clientMode = true)
         ...
        // Build an RpcEndpointRef pointing at the driver:
        // spark://CoarseGrainedScheduler@{ip}:{port}
          driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
      ...
      // Make an RPC call through the ref to fetch the SparkAppConfig
      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId))
      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
      fetcher.shutdown()
     ...
   }
}

driver.askSync is a synchronous request: it blocks until the result comes back. Requests made through a ref are ultimately delegated to the NettyRpcEnv.
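That delegation is thin. A simplified sketch of NettyRpcEndpointRef (based on org.apache.spark.rpc.netty.NettyRpcEndpointRef, reconstructed from memory): ask wraps the user message in a RequestMessage and hands it to the NettyRpcEnv, whose ask in turn calls the askAbortable shown next.

private[netty] class NettyRpcEndpointRef(
    conf: SparkConf,
    private val endpointAddress: RpcEndpointAddress,
    @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) {

  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
    // Wrap the payload together with sender and receiver info, then delegate to the env.
    nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
  }
}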

private[netty] def askAbortable[T: ClassTag](
      message: RequestMessage, timeout: RpcTimeout): AbortableRpcFuture[T] = {
  ...
  // At this point we are on the executor
  // remoteAddr is the driver's address and is not null; address is null, so we take the else branch
  if (remoteAddr == address) {
    val p = Promise[Any]()
    p.future.onComplete {
      case Success(response) => onSuccess(response)
      case Failure(e) => onFailure(e)
    }(ThreadUtils.sameThread)
    dispatcher.postLocalMessage(message, p)
  } else {
    // Note how the message gets wrapped at each layer; here the request is wrapped as an RpcOutboxMessage
    val rpcMessage = RpcOutboxMessage(message.serialize(this),
      onFailure,
      // Callback that handles the response; invoked from the Netty channel handler
      (client, response) => onSuccess(deserialize[Any](client, response)))
    rpcMsg = Option(rpcMessage)
    // The key step: append the message to the Outbox
    postToOutbox(message.receiver, rpcMessage)
    ...
  }
}
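postToOutbox is where the per-address Outbox comes into play. A slightly abbreviated sketch of NettyRpcEnv#postToOutbox (reconstructed from the Spark 3.0 source; the stopped-env handling is omitted):

private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
  if (receiver.client != null) {
    // The ref already has a TransportClient attached (e.g. refs created on the server side), send directly.
    message.sendWith(receiver.client)
  } else {
    // Look up, or lazily create, the Outbox for the target address, then enqueue the message.
    val targetOutbox = {
      val outbox = outboxes.get(receiver.address)
      if (outbox == null) {
        val newOutbox = new Outbox(this, receiver.address)
        val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
        if (oldOutbox == null) newOutbox else oldOutbox
      } else {
        outbox
      }
    }
    targetOutbox.send(message)
  }
}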

Sending the Request Message

An Outbox is maintained for every remote RPC address: all messages destined for the same RpcAddress are appended to that address's queue, where they wait for a TransportClient to deliver them to the corresponding server.

[spark-core] org.apache.spark.rpc.netty.Outbox

class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {
  val messages = new java.util.LinkedList[OutboxMessage]
  var client: TransportClient = null
  ...
  // This method triggers the actual sending of messages.
  // It may be called from multiple threads, but only one thread drains the queue at a time
  private def drainOutbox(): Unit = {
    var message: OutboxMessage = null
    synchronized {
      if (stopped) {
        return
      }
      // Another thread is already establishing the connection, so the current thread has nothing to do
      if (connectFuture != null) {
        // We are connecting to the remote address, so just exit
        return
      }
      // The connection has not been established yet, so submit a background task to create the TransportClient.
      // launchConnectTask calls drainOutbox again once the client is ready, so the current thread can simply
      // return and let the connecting thread carry on from there
      if (client == null) {
        // There is no connect task but client is null, so we need to launch the connect task.
        launchConnectTask()
        return
      }
      if (draining) {
        // There is some thread draining, so just exit
        return
      }
      message = messages.poll()
      if (message == null) {
        return
      }
      draining = true
    }
    // Keep polling messages until the queue is drained
    while (true) {
      try {
        val _client = synchronized { client }
        if (_client != null) {
          message.sendWith(_client)
        } else {
          assert(stopped)
        }
      } catch {
        case NonFatal(e) =>
          handleNetworkFailure(e)
          return
      }
      synchronized {
        if (stopped) {
          return
        }
        message = messages.poll()
        if (message == null) {
          draining = false
          return
        }
      }
    }
  }
}
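drainOutbox is triggered from Outbox#send, which first appends the message to the queue and then attempts to drain it. A simplified sketch from the same file (abbreviated from the Spark 3.0 source):

def send(message: OutboxMessage): Unit = {
  val dropped = synchronized {
    if (stopped) {
      true
    } else {
      messages.add(message) // enqueue first ...
      false
    }
  }
  if (dropped) {
    message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
  } else {
    drainOutbox() // ... then try to drain; may return immediately if another thread is already draining
  }
}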

TransportClient is a wrapper around a Netty Channel, so the call to message.sendWith(_client) is where we enter Netty's territory for actually sending the message.

TransportClients are created by the TransportClientFactory, which keeps track of all TransportClients in the process and maintains a connection pool per RpcAddress.

[common/network-common] org.apache.spark.network.client.TransportClientFactory

public class TransportClientFactory implements Closeable {
  // Public entry point: reuse a cached connection if one exists, otherwise create a new one
   public TransportClient createClient(String remoteHost, int remotePort){
     ...
     // Create the ClientPool if we don't have it yet.
    ClientPool clientPool = connectionPool.get(unresolvedAddress);
    if (clientPool == null) {
      connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
      clientPool = connectionPool.get(unresolvedAddress);
    }
    ... // omitted: if there is a usable client in the pool, the method returns it directly
    // No client at the randomly chosen slot, so create a new one
    synchronized (clientPool.locks[clientIndex]) {
      cachedClient = clientPool.clients[clientIndex];
      ...//double check
      //create new client
      clientPool.clients[clientIndex] = createClient(resolvedAddress);
      return clientPool.clients[clientIndex];
    }
   }
  // Create a TransportClient, i.e. a Netty client
  private TransportClient createClient(InetSocketAddress address) {
    // The familiar Netty bootstrap style
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(workerGroup)
      .channel(socketChannelClass)
      // Disable Nagle's Algorithm since we don't want packets to wait
      .option(ChannelOption.TCP_NODELAY, true)
      .option(ChannelOption.SO_KEEPALIVE, true)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
      .option(ChannelOption.ALLOCATOR, pooledAllocator);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) {
        // The processing logic is encapsulated in TransportChannelHandler.
        // context is the TransportContext, which builds the channel handlers for both server and client.
        // For now it is enough to know that once a new socket is established, message handling is delegated to TransportChannelHandler
        TransportChannelHandler clientHandler = context.initializePipeline(ch);
        clientRef.set(clientHandler.getClient());
        channelRef.set(ch);
      }
    });
    ChannelFuture cf = bootstrap.connect(address);
    ... // wait for the connection to be established
    return client;
  }
}

From the code above we can see that the Message here is an RpcOutboxMessage, which is defined in the Outbox source file.

case class RpcOutboxMessage(
    content: ByteBuffer,
    _onFailure: (Throwable) => Unit,
    _onSuccess: (TransportClient, ByteBuffer) => Unit)
// The message is both the payload and a callback: it is invoked when the response comes back
  extends OutboxMessage with RpcResponseCallback with Logging {

  private var client: TransportClient = _
  private var requestId: Long = _
// Send the message through the TransportClient
  override def sendWith(client: TransportClient): Unit = {
    this.client = client
    this.requestId = client.sendRpc(content, this)
  }
}

Now let's move on to TransportClient.

[common/network-common] org.apache.spark.network.client.TransportClient

public class TransportClient implements Closeable {
  private final Channel channel;
  private final TransportResponseHandler handler;
  @Nullable private String clientId;
  ...
  // The only constructor; Channel is the underlying Netty channel
  public TransportClient(Channel channel, TransportResponseHandler handler) {
    this.channel = Preconditions.checkNotNull(channel);
    this.handler = Preconditions.checkNotNull(handler);
    this.timedOut = false;
  }
  ...
  //
  public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
    if (logger.isTraceEnabled()) {
      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
    }

    long requestId = requestId();
    // This is another key point: the callback handles the response value,
    // and the handler was created together with the client
    handler.addRpcRequest(requestId, callback);

    // The listener below uses the callback only for the write-failure case
    RpcChannelListener listener = new RpcChannelListener(requestId, callback);
    channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
      .addListener(listener);

    return requestId;
  }
}

At this point the request message has been sent out through Netty. Next let's look at how the returned message is handled.

Receiving the Response

The Netty client handles responses via the handler added to the Bootstrap; this is set up while the TransportClient is being created.

TransportClientFactory#createClient

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) {
        TransportChannelHandler clientHandler = context.initializePipeline(ch);
        clientRef.set(clientHandler.getClient());
        channelRef.set(ch);
      }
    });

The context here is the member variable of NettyRpcEnv:

val transportContext: TransportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

This is where we need to take a closer look at TransportContext.

TransportContext is responsible for creating the TransportClientFactory and the TransportServer, and for configuring Netty's ChannelHandlers.

[common/network-common] org.apache.spark.network.TransportContext

public class TransportContext implements Closeable {
  ...
  private final TransportConf conf;
  // rpcHandler processes request messages
  private final RpcHandler rpcHandler; 
  //client
  public TransportClientFactory createClientFactory(...);
  //server
  public TransportServer createServer(...)
  
  // Configure the ChannelHandlers on the pipeline
  public TransportChannelHandler initializePipeline(SocketChannel channel) {
    return initializePipeline(channel, rpcHandler);
  }
  
  public TransportChannelHandler initializePipeline(
      SocketChannel channel,
      RpcHandler channelRpcHandler) {
    try {
      // This handler wraps the handlers for both the server side and the client side
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
      ChannelPipeline pipeline = channel.pipeline()
        .addLast("encoder", ENCODER)
        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
        .addLast("decoder", DECODER)
        .addLast("idleStateHandler",
          new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
        // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
        // would require more logic to guarantee if this were not part of the same event loop.
        .addLast("handler", channelHandler);
      // Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs.
      if (chunkFetchWorkers != null) {
        ChunkFetchRequestHandler chunkFetchHandler = new ChunkFetchRequestHandler(
          channelHandler.getClient(), rpcHandler.getStreamManager(),
          conf.maxChunksBeingTransferred(), true /* syncModeEnabled */);
        pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
      }
      return channelHandler;
    } catch (RuntimeException e) {...}
  }
   /**
   * Creates the server- and client-side handler which is used to handle both RequestMessages and
   * ResponseMessages. The channel is expected to have been successfully created, though certain
   * properties (such as the remoteAddress()) may not be available yet.
   */
  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
    ChunkFetchRequestHandler chunkFetchRequestHandler = null;
    if (!separateChunkFetchRequest) {
      chunkFetchRequestHandler = new ChunkFetchRequestHandler(
        client, rpcHandler.getStreamManager(),
        conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
    }
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
  }
}

With the ChannelHandler in place, we can now see how data read from the Channel is processed.

[common/network-common] org.apache.spark.network.server.TransportChannelHandler

public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
  private final TransportClient client;
  private final TransportResponseHandler responseHandler;
  private final TransportRequestHandler requestHandler; 
  private final TransportContext transportContext;
  ...
  // Handles inbound messages on both the server side and the client side
  public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
    if (request instanceof RequestMessage) {
      requestHandler.handle((RequestMessage) request);
    } else if (request instanceof ResponseMessage) {
      responseHandler.handle((ResponseMessage) request);
    } else {
      ctx.fireChannelRead(request);
    }
  }
}

Since we are following the response path, the message ends up in TransportResponseHandler.

[common/network-common] org.apache.spark.network.server.TransportResponseHandler#handle

public void handle(ResponseMessage message) throws Exception {
  ... // here we only care about the executor requesting the SparkAppConfig
  else if (message instanceof RpcResponse) {
      RpcResponse resp = (RpcResponse) message;
        // TransportClient#sendRpc stored the requestId-to-callback mapping; this is where the callback is finally used
      RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
      if (listener == null) {
        logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
          resp.requestId, getRemoteAddress(channel), resp.body().size());
      } else {
        outstandingRpcs.remove(resp.requestId);
        try {
          // Notify that the response has arrived; at this point we are on a Netty worker thread.
          // The listener here is the RpcOutboxMessage itself
          listener.onSuccess(resp.body().nioByteBuffer());
        } finally {
          resp.body().release();
        }
      }
    }
}

At this point the response arrives back where the RpcOutboxMessage was created, i.e. NettyRpcEnv#askAbortable. Let's look at how the callback handles it.

private[netty] def askAbortable[T: ClassTag](
      message: RequestMessage, timeout: RpcTimeout): AbortableRpcFuture[T] = {
  ...
  def onSuccess(reply: Any): Unit = reply match {
      case RpcFailure(e) => onFailure(e)
      case rpcReply =>
        // The Future is completed successfully here
        if (!promise.trySuccess(rpcReply)) {
          logWarning(s"Ignored message: $reply")
        }
    }
  ...
  // Create the message with the callbacks defined above
  val rpcMessage = RpcOutboxMessage(message.serialize(this),
          onFailure,
          (client, response) => onSuccess(deserialize[Any](client, response)))
}
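For completeness, the rest of askAbortable ties the promise to a timeout and turns it into the Future that askSync blocks on. A rough sketch reconstructed from memory (the abort support of AbortableRpcFuture is omitted):

  // Fail the promise if no reply arrives within the timeout.
  val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
    override def run(): Unit = {
      onFailure(new TimeoutException(s"Cannot receive any reply from $remoteAddr in ${timeout.duration}"))
    }
  }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)

  // Cancel the scheduled timeout as soon as the promise completes, successfully or not.
  promise.future.onComplete { _ =>
    timeoutCancelable.cancel(true)
  }(ThreadUtils.sameThread)

  // The caller gets a typed Future; askSync blocks on it via timeout.awaitResult.
  promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)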

With the promise completed, the awaitResult call in driverEndpointRef#askSync returns from blocking.

def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    val future = ask[T](message, timeout)
    timeout.awaitResult(future)
  }

The executor now has the SparkAppConfig, and one complete client-side request is done. In a later post we'll look at the server side: how a received message is routed to the correct RpcEndpoint, and how the reply is sent back after the request is processed.

Communication is what drives the whole application: information exchange, data transfer and signal propagation all depend on it, which is why Spark's communication layer is the opening topic of this source-code series.

As a newcomer to big data, I hope to learn from more experienced readers; if any of my understanding is off, please don't hesitate to point it out.

Note: the source code is based on Apache Spark 3.0.

Author: pokerwu
This work is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.
