spark源碼閱讀——rpc部分

rpc可以說(shuō)是一個(gè)分布式系統(tǒng)最基礎(chǔ)的組件了。這里解析一下spark的內(nèi)部rpc框架堆巧。

RpcEndpoint

RpcEndpoint 這個(gè)接口表示一個(gè)Rpc端點(diǎn),只要繼承了這個(gè)trait咧党,
就具備了收發(fā)Rpc消息的能力溜徙,主要包含以下方法

  • 接收信息類

    • def receive: PartialFunction[Any, Unit] 一個(gè)偏函數(shù),用來(lái)接受其他RpcEndpoint發(fā)來(lái)的信息糠排,其他類可以覆蓋這個(gè)方法來(lái)重寫接受信息的邏輯

    • def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] 方法和上面那個(gè)差不多舵稠,不過(guò)這個(gè)處理過(guò)邏輯之后可以返回一些信息

  • 回調(diào)類

    • def onConnected(remoteAddress: RpcAddress): Unit 當(dāng)有遠(yuǎn)程主機(jī)連接到這個(gè)RpcEndpoint時(shí)的回調(diào)
    • onStart,onStop,onDisconnected等回調(diào)

RpcEndpointRef

RpcEndpointRef表示了一個(gè)遠(yuǎn)程RpcEndpoint和當(dāng)前端點(diǎn)的一個(gè)連接,如果想發(fā)送RPC消息給其他主機(jī),可以先通過(guò)遠(yuǎn)程地址RpcAddress(一個(gè)表示遠(yuǎn)程端點(diǎn)的case class)獲取RpcEndpointRef對(duì)象哺徊。通過(guò)這個(gè)對(duì)象發(fā)送RPC消息給遠(yuǎn)程節(jié)點(diǎn)室琢。主要包括以下方法

  • 異步發(fā)送請(qǐng)求 def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
    這個(gè)方法發(fā)送任意的消息給遠(yuǎn)程端點(diǎn),并返回一個(gè)Future對(duì)象落追。當(dāng)遠(yuǎn)端返回信息的時(shí)候可以從這個(gè)對(duì)象獲取結(jié)果盈滴。

  • 同步發(fā)送請(qǐng)求 def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T 等待直到返回結(jié)果

  • 只發(fā)送信息 def send(message: Any): Unit

RpcEnv

這個(gè)接口可以說(shuō)非常重要了,保存了所有的遠(yuǎn)程端點(diǎn)信息轿钠,而且負(fù)責(zé)RPC消息的分發(fā)巢钓。每一個(gè)RpcEndpoint都有一個(gè)RpcEnv對(duì)象。如果想要與其他RpcEndpoint連接并收發(fā)信息疗垛,需要向遠(yuǎn)端RpcEndpoint注冊(cè)自己症汹,遠(yuǎn)端RpcEndpoint收到注冊(cè)信息之后,會(huì)將請(qǐng)求連接的信息保存在RpcEnv對(duì)象中贷腕,這樣就算是兩個(gè)RpcEndpoint彼此連接上了(可以雙向收發(fā)信息了)

  • Endpoint的注冊(cè)方法

    • def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
      用來(lái)一個(gè)Endpoint把自己注冊(cè)到本地的RpcEnv中背镇。一個(gè)進(jìn)程可能有多個(gè)Endpoint 比如說(shuō)一個(gè)接收心跳信息的,還有一個(gè)用來(lái)監(jiān)聽(tīng)Job的運(yùn)行狀態(tài)的泽裳,用來(lái)監(jiān)聽(tīng)Executor返回信息的等等瞒斩。
      RpcEndpoint通過(guò)RpcEnv發(fā)送信息給RpcEndpointRef
      RpcEnv內(nèi)部將接收到的信息分發(fā)給注冊(cè)在RpcEnv中的RpcEndpoint

    • def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] 異步注冊(cè)

    • def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef 同步注冊(cè)

  • 生命周期方法

    • stop
    • shutdown
    • awaitTermination

RpcCallContext

下面分析時(shí)會(huì)說(shuō),先貼出方法

private[spark] trait RpcCallContext {

  /**
   * Reply a message to the sender. If the sender is [[RpcEndpoint]], its [[RpcEndpoint.receive]]
   * will be called.
   */
  def reply(response: Any): Unit

  /**
   * Report a failure to the sender.
   */
  def sendFailure(e: Throwable): Unit

  /**
   * The sender of this message.
   */
  def senderAddress: RpcAddress
}

spark 中使用了Netty實(shí)現(xiàn)了這些Rpc接口涮总,下面看一看使用netty的實(shí)現(xiàn)济瓢。

NettyRpcEnvFactory

private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {

  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
    // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }
}

用來(lái)創(chuàng)建NettyRpcEnv對(duì)象一個(gè)工廠,創(chuàng)建了一個(gè)NettyRpcEnv對(duì)象妹卿。
并啟動(dòng)了一個(gè)Netty服務(wù)器(nettyEnv.startServer方法)

NettyRpcEnv

這個(gè)對(duì)象主要包含了一個(gè)Dispatcher

private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager) extends RpcEnv(conf) with Logging {

  ...
  private val dispatcher: Dispatcher = new Dispatcher(this)
  ...
  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))
  ...
  @volatile private var server: TransportServer = _
  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
  ... 

  def startServer(bindAddress: String, port: Int): Unit = {
        .....
        server = transportContext.createServer(bindAddress, port, bootstraps)
        dispatcher.registerRpcEndpoint(
        RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }
}

上面說(shuō)到調(diào)用了startServer方法
而這個(gè)方法內(nèi)部則向dispatcher對(duì)象注冊(cè)了一個(gè)RpcEndpointVerifier旺矾,這個(gè)對(duì)象其實(shí)也是一個(gè)RpcEndpoint

private[netty] class RpcEndpointVerifier(override val rpcEnv: RpcEnv, dispatcher: Dispatcher)
  extends RpcEndpoint {

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RpcEndpointVerifier.CheckExistence(name) => context.reply(dispatcher.verify(name))
  }
}

private[netty] object RpcEndpointVerifier {
  val NAME = "endpoint-verifier"

  /** A message used to ask the remote [[RpcEndpointVerifier]] if an `RpcEndpoint` exists. */
  case class CheckExistence(name: String)
}

這里便是我們遇到的第一個(gè)RpcEndpoint 如果收到了CheckExistence這個(gè)類型的信息則調(diào)用dispatcherverify方法。

我們先看一下這個(gè)dispatcher對(duì)象夺克。

Dispatcher

這個(gè)對(duì)象的職責(zé)便是將收到的Rpc信息分發(fā)給不同的Endpoint,可以看到內(nèi)部有一個(gè)ConcurrentHashMap用來(lái)保存所有注冊(cè)的RpcEndpoint

private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {

  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)
  }

  private val endpoints: ConcurrentMap[String, EndpointData] =
    new ConcurrentHashMap[String, EndpointData]

  private val receivers = new LinkedBlockingQueue[EndpointData]
  ....

}

上面說(shuō)到的registerRpcEndpoint方法實(shí)際上將RpcEndpointVerifier放入了這兩個(gè)容器中箕宙。
RpcEndpointVerifier則被其他Endpoint用來(lái)判斷自己是否被成功注冊(cè)到這個(gè)RpcEnv中。
遠(yuǎn)程Endpoint發(fā)送一個(gè)包含自己名字的信息給這個(gè)RpcEnv中的這個(gè)RpcEndpointVerifier隨后會(huì)檢查保存Endpoint信息的容器中是否包含注冊(cè)信息铺纽,并將結(jié)果返回

NettyRpcEndpointRef

前面說(shuō)過(guò)RpcEndpointRef代表遠(yuǎn)端的Endpoint柬帕,可以用來(lái)發(fā)送RPC信息


private[netty] class NettyRpcEndpointRef(
    @transient private val conf: SparkConf,
    private val endpointAddress: RpcEndpointAddress,
    @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) {
    ...

    override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
        nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
    }
}

讓我們回到RpcEnv.ask方法

private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
    val promise = Promise[Any]()
    val remoteAddr = message.receiver.address

    def onFailure(e: Throwable): Unit = { ... }
    def onSuccess(reply: Any): Unit = reply match { ... }

    try {
      if (remoteAddr == address) {
        val p = Promise[Any]()
        p.future.onComplete {
          case Success(response) => onSuccess(response)
          case Failure(e) => onFailure(e)
        }(ThreadUtils.sameThread)
        dispatcher.postLocalMessage(message, p)
      } else {
        val rpcMessage = RpcOutboxMessage(message.serialize(this),
          onFailure,
          (client, response) => onSuccess(deserialize[Any](client, response)))
        postToOutbox(message.receiver, rpcMessage)
        promise.future.onFailure {
          case _: TimeoutException => rpcMessage.onTimeout()
          case _ =>
        }(ThreadUtils.sameThread)
      }

      val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
        override def run(): Unit = {
          onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +
            s"in ${timeout.duration}"))
        }
      }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
      promise.future.onComplete { v =>
        timeoutCancelable.cancel(true)
      }(ThreadUtils.sameThread)
    } catch { ... }
    promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
  }

這個(gè)方法由3部分構(gòu)成
第一部分:判斷消息是否是發(fā)給本地注冊(cè)的RpcEndpoint的,是則發(fā)送本地信息
第二部分:如果是發(fā)給遠(yuǎn)程Endpoint的狡门,放到OutBox里面陷寝,等待處理
第三部分:超時(shí)處理,起了一個(gè)定時(shí)任務(wù)其馏,如果超時(shí)則報(bào)異常凤跑。同時(shí)給聲明的Promise對(duì)象增加了一個(gè)回調(diào),當(dāng)rpc調(diào)用在超時(shí)前完成則取消之前起的定時(shí)任務(wù)叛复。

我們首先看dispatcher.postLocalMessage,這個(gè)方法封裝了調(diào)用信息仔引,

def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
    val rpcCallContext =
      new LocalNettyRpcCallContext(message.senderAddress, p)
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    postMessage(message.receiver.name, rpcMessage, (e) => p.tryFailure(e))
  }

實(shí)際上走了dispatcher.postMessage方法扔仓,實(shí)際做了3件事:

1.獲取到EndpointData對(duì)象
2.往這個(gè)對(duì)象的inbox對(duì)象發(fā)信息
3.將EndpointData對(duì)象放入 receivers隊(duì)列中

       
private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit ={
       ...
      val data = endpoints.get(endpointName)
      data.inbox.post(message)
      receivers.offer(data)
       ...
}

inbox對(duì)象實(shí)際就保存了發(fā)往Endpoint對(duì)象的信息。發(fā)到這里其實(shí)Endpoint 已經(jīng)收到信息了咖耘。 但是post方法只是將消息放到隊(duì)列里面翘簇,那么實(shí)際是怎么發(fā)送給Endpoint的呢?

private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()
  ...
 
  def post(message: InboxMessage): Unit = inbox.synchronized {
    if (stopped) {
      // We already put "OnStop" into "messages", so we should drop further messages
      onDrop(message)
    } else {
      messages.add(message)
      false
    }
  ...
  }

Dispatcher對(duì)象里面有一個(gè)線程池,每個(gè)線程會(huì)不斷的從receivers隊(duì)列中獲取EndpointData并處理其中的inbox對(duì)象保存的信息

private val threadpool: ThreadPoolExecutor = {
    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
      math.max(2, Runtime.getRuntime.availableProcessors()))
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    for (i <- 0 until numThreads) {
      pool.execute(new MessageLoop)
    }
    pool
  }

private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            val data = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

我們?cè)倩氐?code>inbox.process方法

def process(dispatcher: Dispatcher): Unit = {
    var message: InboxMessage = null
    inbox.synchronized {
      ... 
      message = messages.poll()
      ...
    }
    while (true) {
      safelyCall(endpoint) {
        message match {
          case RpcMessage(_sender, content, context) =>
            try {
              endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                throw new SparkException(s"Unsupported message $message from ${_sender}")
              })
            } catch { ... }

          case OneWayMessage(_sender, content) =>
            endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })

          case OnStart =>
            endpoint.onStart()
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }

          case OnStop =>
            val activeThreads = inbox.synchronized { inbox.numActiveThreads }
            ...
            dispatcher.removeRpcEndpointRef(endpoint)
            endpoint.onStop()
            ...

          case RemoteProcessConnected(remoteAddress) =>
            endpoint.onConnected(remoteAddress)

          case RemoteProcessDisconnected(remoteAddress) =>
            endpoint.onDisconnected(remoteAddress)

          case RemoteProcessConnectionError(cause, remoteAddress) =>
            endpoint.onNetworkError(cause, remoteAddress)
        }
      }

      inbox.synchronized {
        ... 
        message = messages.poll()
        if (message == null) {
          numActiveThreads -= 1
          return
        }
      }
    }
  }

可以看到這個(gè)方法不停的從messages隊(duì)列中獲取對(duì)象直到隊(duì)列里面沒(méi)有信息
之前發(fā)送給本地的Endpoint的消息是InboxMessage這個(gè)對(duì)應(yīng)的模式匹配中的哪個(gè)對(duì)象呢儿倒?

private[netty] sealed trait InboxMessage

private[netty] case class OneWayMessage(
    senderAddress: RpcAddress,
    content: Any) extends InboxMessage

private[netty] case class RpcMessage(
    senderAddress: RpcAddress,
    content: Any,
    context: NettyRpcCallContext) extends InboxMessage

private[netty] case object OnStart extends InboxMessage

private[netty] case object OnStop extends InboxMessage

之前發(fā)送的本地消息是RpcMessage類型的眶掌,InboxEndpoint是一一對(duì)應(yīng)的距糖,所以會(huì)直接調(diào)用endpoint.receiveAndReply方法進(jìn)行相應(yīng)的處理横侦,也就是說(shuō)這時(shí)候消息已經(jīng)發(fā)送到Endpoint了随静。(可以參考RpcEndpointVerifier.receiveAndReply,這是其中一種RpcEndpoint慷吊,在這個(gè)流程中可以理解為袖裕,本地的RpcEndpoint向本地的RpcEnv確認(rèn)是否成功注冊(cè))

那么我們看一下發(fā)送消息給遠(yuǎn)程的RpcEndpoint消息被封裝成RpcOutboxMessage,并調(diào)用了postToOutbox方法

private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
    if (receiver.client != null) {
      message.sendWith(receiver.client)
    } else {
      ...
      val targetOutbox = {
        val outbox = outboxes.get(receiver.address)
        ...
      }
      if (stopped.get) { ... } else {
        targetOutbox.send(message)
      }
    }
  }

private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {
outbox => // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  private val messages = new java.util.LinkedList[OutboxMessage]

  @GuardedBy("this")
  private var client: TransportClient = null

  @GuardedBy("this")
  private var connectFuture: java.util.concurrent.Future[Unit] = null

  def send(message: OutboxMessage): Unit = {
    val dropped = synchronized {
      if (stopped) { ... } else {
        messages.add(message)
        false
      }
    }
    if (dropped) { ... } else {
      drainOutbox()
    }
  }
 

每個(gè)Outbox里面包含

  • 一個(gè)保存消息的隊(duì)列
  • 一個(gè)TransportClient 連接遠(yuǎn)程的RpcEndpoint并用來(lái)發(fā)送信息

drainOutbox方法實(shí)際做了2件事

  1. 檢查是否和遠(yuǎn)端的 RpcEndpoint建立了連接,沒(méi)有則起一個(gè)線程建立連接
  2. 遍歷隊(duì)列溉瓶,發(fā)送信息給遠(yuǎn)端的RpcEnvTransportServer這個(gè)信息會(huì)被遠(yuǎn)端的 NettyRpcHandler處理
private[netty] class NettyRpcHandler(
    dispatcher: Dispatcher,
    nettyEnv: NettyRpcEnv,
    streamManager: StreamManager) extends RpcHandler with Logging {

  // A variable to track the remote RpcEnv addresses of all clients
  private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()

  override def receive(
      client: TransportClient,
      message: ByteBuffer,
      callback: RpcResponseCallback): Unit = {
    val messageToDispatch = internalReceive(client, message)
    dispatcher.postRemoteMessage(messageToDispatch, callback)
  }
}
def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
    val rpcCallContext =
      new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
  }

于是我們又看到了postMesage這個(gè)方法急鳄,而這次是調(diào)用的遠(yuǎn)端的RpcEnvDispatcherpostMessage,消息最后也會(huì)被發(fā)送給注冊(cè)到遠(yuǎn)端的RpcEnv中的RpcEndpoint,這樣遠(yuǎn)端的RpcEndpoint便收到了來(lái)自本地的信息。完成了RPC通信堰酿。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末疾宏,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子触创,更是在濱河造成了極大的恐慌坎藐,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,270評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件哼绑,死亡現(xiàn)場(chǎng)離奇詭異岩馍,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)抖韩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,489評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門蛀恩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人茂浮,你說(shuō)我怎么就攤上這事双谆。” “怎么了席揽?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,630評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵顽馋,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我幌羞,道長(zhǎng)寸谜,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,906評(píng)論 1 295
  • 正文 為了忘掉前任新翎,我火速辦了婚禮程帕,結(jié)果婚禮上住练,老公的妹妹穿的比我還像新娘地啰。我一直安慰自己愁拭,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,928評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布亏吝。 她就那樣靜靜地躺著岭埠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蔚鸥。 梳的紋絲不亂的頭發(fā)上惜论,一...
    開(kāi)封第一講書(shū)人閱讀 51,718評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音止喷,去河邊找鬼馆类。 笑死,一個(gè)胖子當(dāng)著我的面吹牛弹谁,可吹牛的內(nèi)容都是我干的乾巧。 我是一名探鬼主播,決...
    沈念sama閱讀 40,442評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼预愤,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼沟于!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起植康,我...
    開(kāi)封第一講書(shū)人閱讀 39,345評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤旷太,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后销睁,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體供璧,經(jīng)...
    沈念sama閱讀 45,802評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,984評(píng)論 3 337
  • 正文 我和宋清朗相戀三年冻记,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嗜傅。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,117評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡檩赢,死狀恐怖吕嘀,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情贞瞒,我是刑警寧澤偶房,帶...
    沈念sama閱讀 35,810評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站军浆,受9級(jí)特大地震影響棕洋,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜乒融,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,462評(píng)論 3 331
  • 文/蒙蒙 一掰盘、第九天 我趴在偏房一處隱蔽的房頂上張望摄悯。 院中可真熱鬧,春花似錦愧捕、人聲如沸奢驯。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,011評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)瘪阁。三九已至,卻和暖如春邮偎,著一層夾襖步出監(jiān)牢的瞬間管跺,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,139評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工禾进, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留豁跑,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,377評(píng)論 3 373
  • 正文 我出身青樓泻云,卻偏偏與公主長(zhǎng)得像艇拍,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子壶愤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,060評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理淑倾,服務(wù)發(fā)現(xiàn),斷路器征椒,智...
    卡卡羅2017閱讀 134,665評(píng)論 18 139
  • 本文基于spark源碼2.11 1. 概要 spark中網(wǎng)絡(luò)通信無(wú)處不在娇哆,例如 driver和master的通信,...
    aaron1993閱讀 3,410評(píng)論 1 3
  • Spark RPC層設(shè)計(jì)概況 spark2.0的RPC框架是基于優(yōu)秀的網(wǎng)絡(luò)通信框架Netty開(kāi)發(fā)的勃救,我們先把Spa...
    ZanderXu閱讀 4,967評(píng)論 0 12
  • 本文會(huì)為大家介紹Spark中的RPC通信機(jī)制蒙秒,詳細(xì)闡述“Spark RPC到底是個(gè)什么鬼勃黍?”,閑話少敘晕讲,讓我們來(lái)進(jìn)...
    sun4lower閱讀 3,056評(píng)論 0 8
  • \d覆获,\w,\s,[a-zA-Z0-9],\b,.,*,+,?,x{3},^$分別是什么? \d是查找數(shù)字(dig...
    柯良勇閱讀 435評(píng)論 0 0