Spark 中的消息通信主要涉及 RpcEnv妹懒、RpcEndpoint 及 RpcEndpointRef 幾個類雀监,下面進(jìn)行簡單介紹
RpcEnv、RpcEndpoint 及 RpcEndpointRef
RPCEndpoints 定義了如何處理消息(即眨唬,使用哪個函數(shù)來處理指定消息),在通過 name 完成注冊后会前,RpcEndpoint 就一直存放在 RpcEnv 中。RpcEndpoint 的生命周期按順序是 onStart
匾竿,receive
及 onStop
瓦宜,receive
可以被同時調(diào)用,如果希望 receive
是線程安全的岭妖,可以使用 ThreadSafeRpcEndpoint
RpcEndpointRef
是 RpcEnv 中的 RpcEndpoint 的引用临庇,是一個序列化的實(shí)體以便于通過網(wǎng)絡(luò)傳送或保存以供之后使用。一個 RpcEndpointRef 有一個地址和名字区转√蓿可以調(diào)用 RpcEndpointRef
的 send
方法發(fā)送異步的單向的消息給對應(yīng)的 RpcEndpoint
RpcEnv 管理各個 RpcEndpoint 并將發(fā)送自 RpcEndpointRef 或遠(yuǎn)程節(jié)點(diǎn)的消息分發(fā)給對應(yīng)的 RpcEndpoint。對于 RpcEnv 沒有 catch 到的異常废离,會通過 RpcCallContext.sendFailure
將該異常發(fā)回給消息發(fā)送者或記日志
RpcEnvFactory
RpcEnvFactory 是構(gòu)造 RpcEnv 的工廠類侄泽,調(diào)用其 create(config: RpcEnvConfig): RpcEnv
會 new 一個 RpcEnv 實(shí)例并返回。
Spark 中實(shí)現(xiàn)了兩種 RpcEnvFactory:
-
org.apache.spark.rpc.netty.NettyRpcEnvFactory
使用netty
-
org.apache.spark.rpc.akka.AkkaRpcEnvFactory
使用akka
其中在 Spark 2.0 已經(jīng)沒有了 AkkaRpcEnvFactory
蜻韭,僅保留了 NettyRpcEnvFactory
悼尾。在 Spark 1.6 中可以通過設(shè)置 spark.rpc
值為 netty
(默認(rèn))來使用 NettyRpcEnvFactory
或設(shè)置為 akka
來使用 AkkaRpcEnvFactory
,例如:
$ ./bin/spark-shell --conf spark.rpc=netty
$ ./bin/spark-shell --conf spark.rpc=akka
RpcAddress 與 RpcEndpointAddress
RpcAddress 是一個 RpcEnv 的邏輯地址肖方,包含 hostname 和端口闺魏,RpcAddress 像 Spark URL 一樣編碼,比如:spark://host:port
俯画。RpcEndpointAddress 是向一個 RpcEnv 注冊的 RpcEndpoint 的邏輯地址析桥,包含 RpcAddress 及名字,格式如:spark://[name]@[rpcAddress.host]:[rpcAddress.port]