RPC is arguably the most fundamental component of a distributed system. This post walks through Spark's internal RPC framework.
RpcEndpoint

`RpcEndpoint` is the trait that represents an RPC endpoint: anything that extends it gains the ability to send and receive RPC messages. Its main methods are:
- Message handling

  `def receive: PartialFunction[Any, Unit]`
  A partial function used to receive messages sent by other `RpcEndpoint`s; subclasses override it to supply their own handling logic.

  `def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]`
  Much the same as `receive`, except the handler can send a reply back through the `RpcCallContext` after processing the message.
- Callbacks

  `def onConnected(remoteAddress: RpcAddress): Unit`
  Invoked when a remote host connects to this `RpcEndpoint`.

  `onStart`, `onStop`, `onDisconnected`, and other lifecycle callbacks.
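To make the contract concrete, here is a minimal hand-rolled endpoint. The `EchoEndpoint` class and its `Ping`/`Pong` messages are hypothetical, invented for this post; also note that these traits are `private[spark]`, so code like this only compiles inside the `org.apache.spark` package.

import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpoint, RpcEnv}

case class Ping(id: Int)
case class Pong(id: Int)

class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  // One-way messages (sent via `send`) land here; nothing is replied.
  override def receive: PartialFunction[Any, Unit] = {
    case Ping(id) => println(s"one-way ping $id")
  }

  // Request/reply messages (sent via `ask`/`askSync`) land here; the
  // RpcCallContext carries the reply back to the sender.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Ping(id) => context.reply(Pong(id))
  }

  override def onConnected(remoteAddress: RpcAddress): Unit =
    println(s"$remoteAddress connected")
}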
RpcEndpointRef

An `RpcEndpointRef` represents a connection between the current endpoint and a remote `RpcEndpoint`. To send an RPC message to another host, first obtain the `RpcEndpointRef` for its remote address, an `RpcAddress` (a case class identifying the remote endpoint), and then send messages to the remote node through that ref. Its main methods:
Asynchronous request
`def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]`
Sends an arbitrary message to the remote endpoint and returns a `Future`; once the remote side replies, the result can be read from it.

Synchronous request
`def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T`
Blocks until the result comes back.

Fire-and-forget
`def send(message: Any): Unit`
Sends a one-way message, expecting no reply.
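A quick sketch of the three call styles, reusing the hypothetical `Ping`/`Pong` messages above and assuming `ref` is an `RpcEndpointRef` we already hold:

import scala.concurrent.Future
import scala.concurrent.duration._
import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout}

def demo(ref: RpcEndpointRef): Unit = {
  val timeout = new RpcTimeout(30.seconds, "spark.rpc.askTimeout")

  // Asynchronous: returns immediately; the Future completes with the reply.
  val futurePong: Future[Pong] = ref.ask[Pong](Ping(1), timeout)

  // Synchronous: blocks until the reply arrives, or throws on timeout.
  val pong: Pong = ref.askSync[Pong](Ping(2), timeout)

  // Fire-and-forget: no reply expected; handled by the remote `receive`.
  ref.send(Ping(3))
}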
RpcEnv

This trait is arguably the most important one: it keeps the information about all remote endpoints and is responsible for dispatching RPC messages. Every `RpcEndpoint` has an `RpcEnv`. To connect to another `RpcEndpoint` and exchange messages, an endpoint registers itself with the remote side; on receiving the registration, the remote side records the connection information in its `RpcEnv`, and from then on the two endpoints are connected and can exchange messages in both directions.
- Endpoint registration

  `def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef`
  Used by an endpoint to register itself with its local `RpcEnv`. A single process may host several endpoints, for example one that receives heartbeats, one that tracks job status, one that listens for messages coming back from `Executor`s, and so on.
An `RpcEndpoint` sends messages to an `RpcEndpointRef` through the `RpcEnv`, and the `RpcEnv` internally dispatches incoming messages to whichever `RpcEndpoint` is registered with it.
- Obtaining refs to remote endpoints

  `def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]`
  Asynchronously looks up a remote endpoint and returns a ref to it.

  `def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef`
  The synchronous equivalent.
- Lifecycle methods

  `stop`, `shutdown`, `awaitTermination`
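Putting the pieces so far together, a sketch of the lifecycle around a single endpoint. It reuses the hypothetical `EchoEndpoint` from earlier; `RpcEnv.create` and friends are `private[spark]`, and the host and port values are made up:

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

val conf = new SparkConf()
val env = RpcEnv.create("demo", "localhost", 52345, conf, new SecurityManager(conf))

// Register under the name "echo"; the returned ref can be used locally.
val echoRef = env.setupEndpoint("echo", new EchoEndpoint(env))

// ... exchange messages through echoRef ...

env.shutdown()          // stop the server and the dispatcher
env.awaitTermination()  // block until shutdown has finished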
RpcCallContext

This will come up in the analysis below; for now, here is the trait:
private[spark] trait RpcCallContext {

  /**
   * Reply a message to the sender. If the sender is [[RpcEndpoint]], its [[RpcEndpoint.receive]]
   * will be called.
   */
  def reply(response: Any): Unit

  /**
   * Report a failure to the sender.
   */
  def sendFailure(e: Throwable): Unit

  /**
   * The sender of this message.
   */
  def senderAddress: RpcAddress
}
Spark implements these RPC interfaces on top of Netty; let's look at the Netty-based implementation.
NettyRpcEnvFactory
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {

  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
    // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }
}
This factory creates a `NettyRpcEnv` object and, when not in client mode, starts a Netty server (the `nettyEnv.startServer` call).
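For orientation, the factory is reached through the `RpcEnv` companion object. The following is condensed from Spark 2.2-era sources; the exact `RpcEnvConfig` parameter list varies between Spark versions, so treat it as a sketch:

private[spark] object RpcEnv {

  def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean): RpcEnv = {
    // Bundle all settings into an RpcEnvConfig and delegate to the factory.
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port,
      securityManager, clientMode)
    new NettyRpcEnvFactory().create(config)
  }
}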
NettyRpcEnv

The central piece of this object is a `Dispatcher`:
private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager) extends RpcEnv(conf) with Logging {

  ...

  private val dispatcher: Dispatcher = new Dispatcher(this)

  ...

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  ...

  @volatile private var server: TransportServer = _

  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

  ...

  def startServer(bindAddress: String, port: Int): Unit = {
    ...
    server = transportContext.createServer(bindAddress, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }
}
As mentioned above, `startServer` is called during creation, and internally it registers an `RpcEndpointVerifier` with the `dispatcher`. This object is itself an `RpcEndpoint`:
private[netty] class RpcEndpointVerifier(override val rpcEnv: RpcEnv, dispatcher: Dispatcher)
  extends RpcEndpoint {

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RpcEndpointVerifier.CheckExistence(name) => context.reply(dispatcher.verify(name))
  }
}

private[netty] object RpcEndpointVerifier {
  val NAME = "endpoint-verifier"

  /** A message used to ask the remote [[RpcEndpointVerifier]] if an `RpcEndpoint` exists. */
  case class CheckExistence(name: String)
}
This is the first `RpcEndpoint` we meet: on receiving a `CheckExistence` message it calls the `dispatcher`'s `verify` method. A usage sketch follows; after that we turn to the `dispatcher` object itself.
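A sketch of how a client consults the verifier, mirroring what `setupEndpointRef` does internally. It assumes `conf: SparkConf` and `nettyEnv: NettyRpcEnv` are in scope, and the remote host, port, and endpoint name are made up:

// Point a ref at the remote env's built-in "endpoint-verifier" endpoint.
val verifierRef = new NettyRpcEndpointRef(
  conf,
  RpcEndpointAddress(RpcAddress("remote-host", 52345), RpcEndpointVerifier.NAME),
  nettyEnv)

// Ask whether an endpoint named "echo" is registered on the remote env.
val exists: Boolean =
  verifierRef.askSync[Boolean](RpcEndpointVerifier.CheckExistence("echo"))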
Dispatcher

This object's job is to route incoming RPC messages to the right endpoint. Internally it keeps a `ConcurrentHashMap` holding every registered `RpcEndpoint`:
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {

  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)
  }

  private val endpoints: ConcurrentMap[String, EndpointData] =
    new ConcurrentHashMap[String, EndpointData]
  private val receivers = new LinkedBlockingQueue[EndpointData]

  ...
}
The `registerRpcEndpoint` method mentioned above effectively puts the `RpcEndpointVerifier` into these two containers (a condensed version is shown below). Other endpoints then use the `RpcEndpointVerifier` to check whether they have been successfully registered with this `RpcEnv`: a remote endpoint sends a message carrying its own name to the `RpcEndpointVerifier` in this `RpcEnv`, which checks whether the endpoint container holds a matching registration and sends the result back.
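For reference, a condensed version of `Dispatcher.registerRpcEndpoint`; note that `endpointRefs` is a further internal map (endpoint to ref) that the excerpt above omits:

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data) // for the OnStart message already queued in the Inbox
  }
  endpointRef
}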
NettyRpcEndpointRef

As described earlier, an `RpcEndpointRef` stands for a remote `Endpoint` and is used to send RPC messages to it:
private[netty] class NettyRpcEndpointRef(
    @transient private val conf: SparkConf,
    private val endpointAddress: RpcEndpointAddress,
    @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) {

  ...

  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
    nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
  }
}
Let's follow the call into the `NettyRpcEnv.ask` method:
private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
  val promise = Promise[Any]()
  val remoteAddr = message.receiver.address

  def onFailure(e: Throwable): Unit = { ... }

  def onSuccess(reply: Any): Unit = reply match { ... }

  try {
    if (remoteAddr == address) {
      val p = Promise[Any]()
      p.future.onComplete {
        case Success(response) => onSuccess(response)
        case Failure(e) => onFailure(e)
      }(ThreadUtils.sameThread)
      dispatcher.postLocalMessage(message, p)
    } else {
      val rpcMessage = RpcOutboxMessage(message.serialize(this),
        onFailure,
        (client, response) => onSuccess(deserialize[Any](client, response)))
      postToOutbox(message.receiver, rpcMessage)
      promise.future.onFailure {
        case _: TimeoutException => rpcMessage.onTimeout()
        case _ =>
      }(ThreadUtils.sameThread)
    }

    val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
      override def run(): Unit = {
        onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +
          s"in ${timeout.duration}"))
      }
    }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
    promise.future.onComplete { v =>
      timeoutCancelable.cancel(true)
    }(ThreadUtils.sameThread)
  } catch { ... }

  promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
}
The method has three parts:

1. If the message is addressed to an `RpcEndpoint` registered locally, post it as a local message.
2. If it is addressed to a remote `Endpoint`, put it into the `OutBox` to await delivery.
3. Timeout handling: a timer task is scheduled that fails the call with an exception if the timeout elapses, and a callback added to the declared `Promise` cancels that timer whenever the RPC call completes before the deadline (sketched below).
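The interplay between the timer and the promise in part three is worth isolating. Here is a self-contained sketch of the same pattern, using a plain Java scheduler in place of Spark's `timeoutScheduler`:

import java.util.concurrent.{Executors, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Promise}

val scheduler = Executors.newSingleThreadScheduledExecutor()
val promise = Promise[String]()

// Schedule the failure path: if nothing completes the promise within
// 30 seconds, fail it with a TimeoutException.
val timeoutTask = scheduler.schedule(new Runnable {
  override def run(): Unit = {
    promise.tryFailure(new TimeoutException("no reply in 30 seconds"))
  }
}, 30, TimeUnit.SECONDS)

// Whichever way the promise completes first, cancel the pending timer.
promise.future.onComplete(_ => timeoutTask.cancel(true))(ExecutionContext.global)

// A reply arriving in time wins the race; trySuccess is a no-op if the
// timeout already fired.
promise.trySuccess("reply")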
Let's first look at `dispatcher.postLocalMessage`, which wraps up the call information:
def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
  val rpcCallContext =
    new LocalNettyRpcCallContext(message.senderAddress, p)
  val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
  postMessage(message.receiver.name, rpcMessage, (e) => p.tryFailure(e))
}
It delegates to `dispatcher.postMessage`, which does three things:

1. Look up the `EndpointData` object by name.
2. Post the message to that object's `inbox`.
3. Put the `EndpointData` object into the `receivers` queue.
private def postMessage(
    endpointName: String,
    message: InboxMessage,
    callbackIfStopped: (Exception) => Unit): Unit = {
  ...
  val data = endpoints.get(endpointName)
  data.inbox.post(message)
  receivers.offer(data)
  ...
}
The `inbox` object simply stores the messages destined for its `Endpoint`; once a message lands here, the `Endpoint` has effectively received it. But `post` only appends the message to a queue, so how does it actually reach the `Endpoint`?
private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {

  inbox => // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  ...

  def post(message: InboxMessage): Unit = inbox.synchronized {
    if (stopped) {
      // We already put "OnStop" into "messages", so we should drop further messages
      onDrop(message)
    } else {
      messages.add(message)
      false
    }
  }

  ...
}
The `Dispatcher` object holds a thread pool in which each thread repeatedly takes an `EndpointData` from the `receivers` queue and processes the messages stored in its `inbox`:
private val threadpool: ThreadPoolExecutor = {
  val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    math.max(2, Runtime.getRuntime.availableProcessors()))
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
  for (i <- 0 until numThreads) {
    pool.execute(new MessageLoop)
  }
  pool
}

private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case ie: InterruptedException => // exit
    }
  }
}
Now back to the `inbox.process` method:
def process(dispatcher: Dispatcher): Unit = {
  var message: InboxMessage = null
  inbox.synchronized {
    ...
    message = messages.poll()
    ...
  }
  while (true) {
    safelyCall(endpoint) {
      message match {
        case RpcMessage(_sender, content, context) =>
          try {
            endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })
          } catch { ... }

        case OneWayMessage(_sender, content) =>
          endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
            throw new SparkException(s"Unsupported message $message from ${_sender}")
          })

        case OnStart =>
          endpoint.onStart()
          if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
            inbox.synchronized {
              if (!stopped) {
                enableConcurrent = true
              }
            }
          }

        case OnStop =>
          val activeThreads = inbox.synchronized { inbox.numActiveThreads }
          ...
          dispatcher.removeRpcEndpointRef(endpoint)
          endpoint.onStop()
          ...

        case RemoteProcessConnected(remoteAddress) =>
          endpoint.onConnected(remoteAddress)

        case RemoteProcessDisconnected(remoteAddress) =>
          endpoint.onDisconnected(remoteAddress)

        case RemoteProcessConnectionError(cause, remoteAddress) =>
          endpoint.onNetworkError(cause, remoteAddress)
      }
    }

    inbox.synchronized {
      ...
      message = messages.poll()
      if (message == null) {
        numActiveThreads -= 1
        return
      }
    }
  }
}
As you can see, this method keeps pulling messages off the `messages` queue until it is empty. The message we posted to the local `Endpoint` earlier was an `InboxMessage`; which case of the pattern match does it correspond to?
private[netty] sealed trait InboxMessage

private[netty] case class OneWayMessage(
    senderAddress: RpcAddress,
    content: Any) extends InboxMessage

private[netty] case class RpcMessage(
    senderAddress: RpcAddress,
    content: Any,
    context: NettyRpcCallContext) extends InboxMessage

private[netty] case object OnStart extends InboxMessage

private[netty] case object OnStop extends InboxMessage
The local message posted earlier was of type `RpcMessage`. Since an `Inbox` maps one-to-one to an `Endpoint`, `endpoint.receiveAndReply` is invoked directly to handle it, which means the message has by now reached the `Endpoint`. (See `RpcEndpointVerifier.receiveAndReply` for one concrete `RpcEndpoint`; in this flow it amounts to a local `RpcEndpoint` asking its local `RpcEnv` whether it has been registered successfully.)
Now let's look at sending a message to a remote `RpcEndpoint`. The message is wrapped in an `RpcOutboxMessage`, and `postToOutbox` is called:
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
  if (receiver.client != null) {
    message.sendWith(receiver.client)
  } else {
    ...
    val targetOutbox = {
      val outbox = outboxes.get(receiver.address)
      ...
    }
    if (stopped.get) { ... } else {
      targetOutbox.send(message)
    }
  }
}
private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {

  outbox => // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  private val messages = new java.util.LinkedList[OutboxMessage]

  @GuardedBy("this")
  private var client: TransportClient = null

  @GuardedBy("this")
  private var connectFuture: java.util.concurrent.Future[Unit] = null

  def send(message: OutboxMessage): Unit = {
    val dropped = synchronized {
      if (stopped) { ... } else {
        messages.add(message)
        false
      }
    }
    if (dropped) { ... } else {
      drainOutbox()
    }
  }
}
Each `Outbox` contains:

- a queue holding the pending messages
- a `TransportClient` that connects to the remote `RpcEndpoint` and is used to send them

The `drainOutbox` method does two things (a condensed version follows this list):

- check whether a connection to the remote `RpcEndpoint` has been established, and if not, start a thread to establish one
- walk the queue, sending each message to the `TransportServer` of the remote `RpcEnv`
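Condensed from `Outbox.drainOutbox` (error handling elided; `draining` is an internal flag guarded by the lock):

private def drainOutbox(): Unit = {
  var message: OutboxMessage = null
  synchronized {
    if (stopped) return
    if (connectFuture != null) return      // a connect attempt is in flight
    if (client == null) {
      // Task one: no connection yet, so start one on a separate thread.
      launchConnectTask()
      return
    }
    if (draining) return                   // another thread is already draining
    message = messages.poll()
    if (message == null) return
    draining = true
  }
  // Task two: walk the queue and push each message through the client.
  while (true) {
    val _client = synchronized { client }
    if (_client != null) message.sendWith(_client)
    synchronized {
      message = messages.poll()
      if (message == null) {
        draining = false
        return
      }
    }
  }
}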
On the remote side, this message is handled by the `NettyRpcHandler`:
private[netty] class NettyRpcHandler(
    dispatcher: Dispatcher,
    nettyEnv: NettyRpcEnv,
    streamManager: StreamManager) extends RpcHandler with Logging {

  // A variable to track the remote RpcEnv addresses of all clients
  private val remoteAddresses = new ConcurrentHashMap[RpcAddress, RpcAddress]()

  override def receive(
      client: TransportClient,
      message: ByteBuffer,
      callback: RpcResponseCallback): Unit = {
    val messageToDispatch = internalReceive(client, message)
    dispatcher.postRemoteMessage(messageToDispatch, callback)
  }
}
def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
  val rpcCallContext =
    new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
  val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
  postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
}
Here we meet `postMessage` again, only this time it is the `postMessage` of the `Dispatcher` in the remote `RpcEnv`, so the message ends up delivered to the `RpcEndpoint` registered there. The remote `RpcEndpoint` has now received the message from the local side, completing the RPC exchange.
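To close, an end-to-end sketch tying the whole flow together: a "server" env registers the hypothetical `EchoEndpoint` from earlier, and a "client" env obtains a ref by address (which runs the `RpcEndpointVerifier` handshake described above) and asks it a question. Hosts and ports are made up, and again all of these APIs are `private[spark]`.

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcAddress, RpcEnv}

val conf = new SparkConf()

// Server side: create an env (starts the Netty server) and register "echo".
val serverEnv = RpcEnv.create("server", "localhost", 52345, conf, new SecurityManager(conf))
serverEnv.setupEndpoint("echo", new EchoEndpoint(serverEnv))

// Client side: a client-mode env starts no server of its own.
val clientEnv = RpcEnv.create("client", "localhost", 0, conf,
  new SecurityManager(conf), clientMode = true)

// Verifies that "echo" exists remotely, then returns a ref for it.
val echoRef = clientEnv.setupEndpointRef(RpcAddress("localhost", 52345), "echo")
println(echoRef.askSync[Pong](Ping(42)))  // Pong(42)

clientEnv.shutdown()
serverEnv.shutdown()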