Spark Network 模塊分析
為什么用Netty通信框架代替Akka
一直以來,基于Akka實現(xiàn)的RPC通信框架是Spark引以為豪的主要特性斤儿,也是與Hadoop等分布式計算框架對比過程中一大亮點准潭,但是時代和技術都在演化,從Spark1.3.1版本開始,為了解決大塊數(shù)據(jù)(如Shuffle)的傳輸問題碾盟,Spark引入了Netty通信框架妆丘,到了1.6.0版本锄俄,Netty完全取代了Akka,承擔Spark內(nèi)部所有的RPC通信以及數(shù)據(jù)流傳輸勺拣。
JAVA IO也經(jīng)歷了幾次演化奶赠,從最早的BIO(阻塞式/非阻塞IO),到1.4版本的NIO(IO復用)药有,到1.7版本的NIO2.0/AIO(異步IO)毅戈。
基于早期BIO來實現(xiàn)高并發(fā)網(wǎng)絡服務器都是依賴多線程來實現(xiàn),但是線程開銷較大愤惰,BIO的瓶頸明顯苇经,NIO的出現(xiàn)解決了這一大難題,基于IO復用解決了IO高并發(fā)宦言。
但是NIO有也有幾個缺點:
- API可用性較低(拿ByteBuffer來說扇单,共用一個curent指針,讀寫切換需要進行flip和rewind蜡励,相當麻煩)
- 僅僅是API令花,如果想在NIO上實現(xiàn)一個網(wǎng)絡模型,還需要自己寫很多比如線程池凉倚,解碼兼都,半包/粘包,限流等邏輯
- 著名的NIO-Epoll死循環(huán)的BUG
因為這幾個原因稽寒,促使了很多JAVA-IO通信框架的出現(xiàn)扮碧,Netty就是其中一員,它也因為高度的穩(wěn)定性,功能性慎王,性能等特性蚓土,成為Java開發(fā)的首選
那么Netty和JDK-NIO之間到底是什么關系?
首先是NIO的上層封裝赖淤,Netty提供了NioEventLoopGroup / NioSocketChannel / NioServerSocketChannel的組合來完成實際IO操作蜀漆,繼而在此之上實現(xiàn)數(shù)據(jù)流Pipeline以及EventLoop線程池等功能。
另外它又重寫了NIO咱旱,JDK-NIO底層是基于Epoll的LT模式來實現(xiàn)确丢,而Netty是基于Epoll的ET模式實現(xiàn)的一組IO操作EpollEventLoopGroup / EpollSocketChannel / EpollServerSocketChannel
Netty對兩種實現(xiàn)進行完美的封裝,可以根據(jù)業(yè)務的需求來選擇不同的實現(xiàn)
Epoll的ET和LT模式真的有很大的性能差別嗎吐限?單從Epoll的角度來看鲜侥,ET肯定是比LT要性能好那么一點。如果為了編碼簡潔性诸典,LT還是首選描函,ET如果用戶層邏輯實現(xiàn)不夠優(yōu)美,相比ET還會帶來更大大性能開銷
那么Akka又是什么狐粱?
從Akka出現(xiàn)背景來說舀寓,它是基于Actor的RPC通信系統(tǒng),它的核心概念也是Message脑奠,它是基于協(xié)程的基公,性能不容置疑;基于scala的偏函數(shù)宋欺,易用性也沒有話說,但是它畢竟只是RPC通信胰伍,無法適用大的package/stream的數(shù)據(jù)傳輸齿诞,這也是Spark早期引入Netty的原因。
那么Netty為什么可以取代Akka骂租?
首先不容置疑的是Akka可以做到的祷杈,Netty也可以做到,但是Netty可以做到渗饮,Akka卻無法做到但汞。原因是啥?在軟件棧中互站,Akka相比Netty要Higher一點私蕾,它專門針對RPC做了很多事情,而Netty相比更加基礎一點胡桃,可以為不同的應用層通信協(xié)議(RPC踩叭,F(xiàn)TP,HTTP等)提供支持,在早期的Akka版本容贝,底層的NIO通信就是用的Netty自脯。
其次一個優(yōu)雅的工程師是不會允許一個系統(tǒng)中容納兩套通信框架闷哆!最后沽瘦,雖然Netty沒有Akka協(xié)程級的性能優(yōu)勢褥民,但是Netty內(nèi)部高效的Reactor線程模型价脾,無鎖化的串行設計康二,高效的序列化采章,零拷貝烁兰,內(nèi)存池等特性也保證了Netty不會存在性能問題辅斟。
那么Spark是怎么用Netty來取代Akka呢脚囊?一句話龟糕,利用偏函數(shù)的特性,基于Netty“仿造”出一個簡約版本的Actor模型悔耘。
Spark Network Common的實現(xiàn)
Byte的表示
對于Network通信讲岁,不管傳輸?shù)氖切蛄谢蟮膶ο筮€是文件,在網(wǎng)絡上表現(xiàn)的都是字節(jié)流衬以。在傳統(tǒng)IO中缓艳,字節(jié)流表示為Stream;在NIO中看峻,字節(jié)流表示為ByteBuffer阶淘;在Netty中字節(jié)流表示為ByteBuff或FileRegion;在Spark中互妓,針對Byte也做了一層包裝溪窒,支持對Byte和文件流進行處理,即ManagedBuffer冯勉;
ManagedBuffer包含了三個函數(shù)createInputStream()澈蚌,nioByteBuffer(),convertToNetty()來對Buffer進行“類型轉(zhuǎn)換”灼狰,分別獲取stream宛瞄,ByteBuffer,ByteBuff或FileRegion交胚;NioManagedBuffer / NettyManagedBuffer / FileSegmentManagedBuffer也是針對性提供了具體的實現(xiàn)份汗。
更好的理解ManagedBuffer:比如Shuffle BlockManager模塊需要在內(nèi)存中維護本地executor生成的shuffle-map輸出的文件引用,從而可以提供給shuffleFetch進行遠程讀取蝴簇,此時文件表示為FileSegmentManagedBuffer杯活,shuffleFetch遠程調(diào)用FileSegmentManagedBuffer.nioByteBuffer / createInputStream函數(shù)從文件中讀取為Bytes,并進行后面的網(wǎng)絡傳輸军熏。如果已經(jīng)在內(nèi)存中bytes就更好理解了轩猩,比如將一個字符數(shù)組表示為NettyManagedBuffer。
Protocol的表示
協(xié)議是應用層通信的基礎,它提供了應用層通信的數(shù)據(jù)表示均践,以及編碼和解碼的能力晤锹。在Spark Network Common中,繼承AKKA中的定義彤委,將協(xié)議命名為Message鞭铆,它繼承Encodable,提供了encode的能力焦影。
Message根據(jù)請求響應可以劃分為RequestMessage和ResponseMessage兩種车遂;對于Response,根據(jù)處理結(jié)果斯辰,可以劃分為Failure和Success兩種類型舶担;根據(jù)功能的不同,主要劃分為Stream彬呻,ChunkFetch衣陶,Rpc。
Stream消息就是上面提到的ManagedBuffer中的Stream流闸氮,在Spark內(nèi)部剪况,比如SparkContext.addFile操作會在Driver中針對每一個add進來的file / jar會分配唯一的StreamID(file / [filename],jars / [filename])蒲跨;worker通過該StreamID向Driver發(fā)起一個StreamRequest的請求译断,Driver將文件轉(zhuǎn)換為FileSegmentManagedBuffer返回給Worker,這就是StreamMessage的用途之一或悲;
ChunkFetch也有一個類似Stream的概念孙咪,ChunkFetch的對象是“一個內(nèi)存中的Iterator[ManagedBuffer]”,即一組Buffer巡语,每一個Buffer對應一個chunkIndex该贾,整個Iterator[ManagedBuffer]由一個StreamID標識。Client每次的ChunkFetch請求是由(streamId捌臊,chunkIndex)組成的唯一的StreamChunkId,Server端根據(jù)StreamChunkId獲取為一個Buffer并返回給Client兜材; 不管是Stream還是ChunkFetch理澎,在Server的內(nèi)存中都需要管理一組由StreamID與資源之間映射,即StreamManager類曙寡,它提供了getChunk和openStream兩個接口來分別響應ChunkFetch與Stream兩種操作糠爬,并且針對Server的ChunkFetch提供一個registerStream接口來注冊一組Buffer,比如可以將BlockManager中一組BlockID對應的Iterator[ManagedBuffer]注冊到StreamManager举庶,從而支持遠程Block Fetch操作执隧。
Case:對于ExternalShuffleService(一種單獨shuffle服務進程,對其他計算節(jié)點提供本節(jié)點上面的所有shuffle map輸出),它為遠程Executor提供了一種OpenBlocks的RPC接口镀琉,即根據(jù)請求的appid峦嗤,executorid,blockid(appid+executor對應本地一組目錄屋摔,blockid拆封出)從本地磁盤中加載一組FileSegmentManagedBuffer到內(nèi)存烁设,并返回加載后的streamId返回給客戶端,從而支持后續(xù)的ChunkFetch的操作钓试。
RPC是第三種核心的Message装黑,和Stream/ChunkFetch的Message不同,每次通信的Body是類型是確定的弓熏,在rpcHandler可以根據(jù)每種Body的類型進行相應的處理恋谭。 在Spark1.6.*版本中,也正式使用基于Netty的RPC框架來替代Akka挽鞠。
Server的結(jié)構(gòu)
Server構(gòu)建在Netty之上疚颊,它提供兩種模型NIO和Epoll,可以通過參數(shù)(spark.[module].io.mode)進行配置滞谢,最基礎的module就是shuffle串稀,不同的IOMode選型,對應了Netty底層不同的實現(xiàn)狮杨,Server的Init過程中母截,最重要的步驟就是根據(jù)不同的IOModel完成EventLoop和Pipeline的構(gòu)造
EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) {
switch (mode) {
case NIO:
return new NioEventLoopGroup(numThreads, threadFactory);
case EPOLL:
return new EpollEventLoopGroup(numThreads, threadFactory);
}
}
public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
switch(mode) {
case NIO:
return NioServerSocketChannel.class;
case EPOLL:
return EpollServerSocketChannel.class;
}
}
Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
switch (mode) {
case NIO:
return NioServerSocketChannel.class;
case EPOLL:
return EpollServerSocketChannel.class;
}
}
channel.pipeline()
.addLast("encoder", this.encoder)
.addLast("frameDecoder", NettyUtils.createFrameDecoder())
.addLast("decoder", this.decoder)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, this.conf.connectionTimeoutMs() / 1000))
.addLast("handler", channelHandler);
其中,MessageEncoder/Decoder針對網(wǎng)絡包到Message的編碼和解碼橄教,而最為核心就TransportRequestHandler清寇,它封裝了對所有請求/響應的處理;
TransportChannelHandler內(nèi)部實現(xiàn)也很簡單护蝶,它封裝了responseHandler和requestHandler华烟,當從Netty中讀取一條Message以后,根據(jù)判斷路由給相應的responseHandler和requestHandler持灰。
public void handle(RequestMessage request) {
if (request instanceof ChunkFetchRequest) {
this.processFetchRequest((ChunkFetchRequest)request);
} else if (request instanceof RpcRequest) {
this.processRpcRequest((RpcRequest)request);
} else if (request instanceof OneWayMessage) {
this.processOneWayMessage((OneWayMessage)request);
} else {
if (!(request instanceof StreamRequest)) {
throw new IllegalArgumentException("Unknown request type: " + request);
}
this.processStreamRequest((StreamRequest)request);
}
}
public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
if (request instanceof RequestMessage) {
this.requestHandler.handle((RequestMessage)request);
} else {
this.responseHandler.handle((ResponseMessage)request);
}
}
Sever提供的RPC盔夜,ChunkFecth,Stream的功能都是依賴TransportRequestHandler來實現(xiàn)的堤魁;從原理上來說喂链,RPC與ChunkFecth / Stream還是有很大不同的,其中RPC對于TransportRequestHandler來說是功能依賴妥泉,而ChunkFecth / Stream對于TransportRequestHandler來說只是數(shù)據(jù)依賴椭微。
怎么理解?即TransportRequestHandler已經(jīng)提供了ChunkFecth / Stream的實現(xiàn)盲链,只需要在構(gòu)造的時候蝇率,向TransportRequestHandler提供一個streamManager迟杂,告訴RequestHandler從哪里可以讀取到Chunk或者Stream。而RPC需要向TransportRequestHandler注冊一個rpcHandler本慕,針對每個RPC接口進行功能實現(xiàn)排拷,同時RPC與ChunkFecth / Stream都會有同一個streamManager的依賴,因此注入到TransportRequestHandler中的streamManager也是依賴rpcHandler來實現(xiàn)间狂,即rpcHandler中提供了RPC功能實現(xiàn)和streamManager的數(shù)據(jù)依賴攻泼。
Client的結(jié)構(gòu)
Server是通過監(jiān)聽一個端口,注入rpcHandler和streamManager從而對外提供RPC鉴象,ChunkFecth忙菠,Stream的服務,而Client即為一個客戶端類纺弊,通過該類牛欢,可以將一個streamId / chunkIndex對應的ChunkFetch請求,streamId對應的Stream請求淆游,以及一個RPC數(shù)據(jù)包對應的RPC請求發(fā)送到服務端傍睹,并監(jiān)聽和處理來自服務端的響應;其中最重要的兩個類即為TransportClient和TransportResponseHandler分別為上述的“客戶端類”和“監(jiān)聽和處理來自服務端的響應"犹菱。
那么TransportClient和TransportResponseHandler是怎么配合一起完成Client的工作呢拾稳?由TransportClient將用戶的RPC,ChunkFecth腊脱,Stream的請求進行打包并發(fā)送到Server端访得,同時將用戶提供的回調(diào)函數(shù)注冊到TransportResponseHandler,TransportResponseHandler是TransportChannelHandler的一部分陕凹,在TransportChannelHandler接收到數(shù)據(jù)包悍抑,并判斷為響應包以后,將包數(shù)據(jù)路由到TransportResponseHandler中杜耙,在TransportResponseHandler中通過注冊的回調(diào)函數(shù)搜骡,將響應包的數(shù)據(jù)返回給客戶端
Spark Network的功能應用--BlockTransfer&&Shuffle
無論是BlockTransfer還是ShuffleFetch都需要跨executor的數(shù)據(jù)傳輸,在每一個executor里面都需要運行一個Server線程(后面也會分析到佑女,對于Shuffle也可能是一個獨立的ShuffleServer進程存在)來提供對Block數(shù)據(jù)的遠程讀寫服務
在每個Executor里面记靡,都有一個BlockManager模塊,它提供了對當前Executor所有的Block的“本地管理”团驱,并對進程內(nèi)其他模塊暴露getBlockData(blockId: BlockId): ManagedBuffer的Block讀取接口簸呈,但是這里GetBlockData僅僅是提供本地的管理功能,對于跨遠程的Block傳輸店茶,則由NettyBlockTransferService提供服務。
NettyBlockTransferService本身即是Server劫恒,為其他其他遠程Executor提供Block的讀取功能贩幻,同時它即為Client轿腺,為本地其他模塊暴露fetchBlocks的接口,支持通過host/port拉取任何Executor上的一組的Blocks丛楚。
源碼位置 spark-core: org.apache.spark.network.netty
NettyBlockTransferService作為一個Server
NettyBlockTransferService作為一個Server族壳,與Executor或Driver里面其他的服務一樣,在進程啟動時趣些,由SparkEnv初始化構(gòu)造并啟動服務仿荆,在整個運行時的一部分。
SparkEnv.create
val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
val envInstance = new SparkEnv(... blockTransferService ...)
一個Server的構(gòu)造依賴RpcHandler提供RPC的功能注入以及提供streamManager的數(shù)據(jù)注入坏平。對于NettyBlockTransferService拢操,該RpcHandler即為NettyBlockRpcServer,在構(gòu)造的過程中舶替,需要與本地的BlockManager進行管理令境,從而支持對外提供本地BlockMananger中管理的數(shù)據(jù)
RpcHandler提供RPC的功能注入在這里還是屬于比較“簡陋的”,畢竟他是屬于數(shù)據(jù)傳輸模塊顾瞪,Server中提供的chunkFetch和stream已經(jīng)足夠滿足他的功能需要舔庶,那現(xiàn)在問題就是怎么從streamManager中讀取數(shù)據(jù)來提供給chunkFetch和stream進行使用呢?
就是NettyBlockRpcServer作為RpcHandler提供的一個Rpc接口之一:OpenBlocks陈醒,它接受由Client提供一個Blockids列表惕橙,Server根據(jù)該BlockIds從BlockManager獲取到相應的數(shù)據(jù)并注冊到streamManager中,同時返回一個StreamID钉跷,后續(xù)Client即可以使用該StreamID發(fā)起ChunkFetch的操作弥鹦。
message match {
case openBlocks: OpenBlocks =>
val blocks: Seq[ManagedBuffer] =
openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData)
val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
logTrace(s"Registered streamId $streamId with ${blocks.size} buffers")
responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
}
NettyBlockTransferService作為一個Client
從NettyBlockTransferService作為一個Server,我們基本可以推測NettyBlockTransferService作為一個Client支持fetchBlocks的功能的基本方法:
- Client將一組Blockid表示為一個openMessage請求尘应,發(fā)送到服務端惶凝,服務針對該組Blockid返回一個唯一的streamId
- Client針對該streamId發(fā)起size(blockids)個fetchChunk操作
override def uploadBlock(
hostname: String,
port: Int,
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
level: StorageLevel): Future[Unit] = {
//發(fā)出openMessage請求
client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
streamHandle = (StreamHandle)response;//獲取streamId
//針對streamid發(fā)出一組fetchChunk
for (int i = 0; i < streamHandle.numChunks; i++) {
client.fetchChunk(streamHandle.streamId, i, chunkCallback);
}
}
})
result.future
}
同時,為了提高服務端穩(wěn)定性犬钢,針對fetchBlocks操作NettyBlockTransferService提供了非重試版本和重試版本的BlockFetcher苍鲜,分別為OneForOneBlockFetcher和RetryingBlockFetcher,通過參數(shù)(spark.[module].io.maxRetries)進行配置玷犹,默認是重試3次
在Spark混滔,Block有各種類型,可以是ShuffleBlock歹颓,也可以是BroadcastBlock等等坯屿,對于ShuffleBlock的Fetch,除了由Executor內(nèi)部的NettyBlockTransferService提供服務以外巍扛,也可以由外部的ShuffleService來充當Server的功能领跛,并由專門的ExternalShuffleClient來與其進行交互,從而獲取到相應Block數(shù)據(jù)撤奸。功能的原理和實現(xiàn)吠昭,基本一致喊括,但是問題來了,為什么需要一個專門的ShuffleService服務呢矢棚?主要原因還是為了做到任務隔離郑什,即減輕因為fetch帶來對Executor的壓力,讓其專心的進行數(shù)據(jù)的計算蒲肋。
其實外部的ShuffleService最終是來自Hadoop的AuxiliaryService概念蘑拯,AuxiliaryService為計算節(jié)點NodeManager常駐的服務線程,早期的MapReduce是進程級別的調(diào)度兜粘,ShuffleMap完成shuffle文件的輸出以后申窘,即立即退出,在ShuffleReduce過程中由誰來提供文件的讀取服務呢妹沙?即AuxiliaryService偶洋,每一個ShuffleMap都會將自己在本地的輸出,注冊到AuxiliaryService距糖,由AuxiliaryService提供本地數(shù)據(jù)的清理以及外部讀取的功能玄窝。
在目前Spark中,也提供了這樣的一個AuxiliaryService:YarnShuffleService悍引,但是對于Spark不是必須的恩脂,如果你考慮到需要“通過減輕因為fetch帶來對Executor的壓力”,那么就可以嘗試嘗試趣斤。
同時俩块,如果啟用了外部的ShuffleService,對于shuffleClient也不是使用上面的NettyBlockTransferService浓领,而是專門的ExternalShuffleClient玉凯,功能邏輯基本一致!
Spark Network的功能應用--新的RPC框架
Akka的通信模型是基于Actor联贩,一個Actor可以理解為一個Service服務對象漫仆,它可以針對相應的RPC請求進行處理,如下所示泪幌,定義了一個最為基本的Actor:
class HelloActor extends Actor {
def receive = {
case "hello" => println("world")
case _ => println("huh?")
}
}
Actor內(nèi)部只有唯一一個變量(當然也可以理解為函數(shù)了)盲厌,即Receive,它為一個偏函數(shù)祸泪,通過case語句可以針對Any信息可以進行相應的處理吗浩,這里Any消息在實際項目中就是消息包。
另外一個很重要的概念就是ActorSystem没隘,它是一個Actor的容器懂扼,多個Actor可以通過name->Actor的注冊到Actor中,在ActorSystem中可以根據(jù)請求不同將請求路由給相應的Actor右蒲。ActorSystem和一組Actor構(gòu)成一個完整的Server端微王,此時客戶端通過host:port與ActorSystem建立連接屡限,通過指定name就可以相應的Actor進行通信,這里客戶端就是ActorRef炕倘。所有Akka整個RPC通信系列是由Actor,ActorRef翰撑,ActorSystem組成罩旋。
Spark基于這個思想在上述的Network的基礎上實現(xiàn)一套自己的RPC Actor模型,從而取代Akka眶诈。其中RpcEndpoint對應Actor涨醋,RpcEndpointRef對應ActorRef,RpcEnv即對應了ActorSystem逝撬。
private[spark] trait RpcEndpoint {
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException()
}
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException())
}
//onStart(),onStop()
}
RpcEndpoint與Actor一樣浴骂,不同RPC Server可以根據(jù)業(yè)務需要指定相應receive/receiveAndReply的實現(xiàn),在Spark內(nèi)部現(xiàn)在有N多個這樣的Actor宪潮,比如Executor就是一個Actor溯警,它處理來自Driver的LaunchTask/KillTask等消息。
RpcEnv相對于ActorSystem:
- 首先它作為一個Server狡相,它通過NettyRpcHandler來提供了Server的服務能力
- 其次它作為RpcEndpoint的容器梯轻,它提供了setupEndpoint(name,endpoint)接口尽棕,從而實現(xiàn)將一個RpcEndpoint以一個Name對應關系注冊到容器中喳挑,從而通過Server對外提供Service
- 最后它作為Client的適配器,它提供了setupEndpointRef/setupEndpointRefByURI接口滔悉,通過指定Server端的Host和PORT伊诵,并指定RpcEndpointName,從而獲取一個與指定Endpoint通信的引用回官。
RpcEndpointRef即為與相應Endpoint通信的引用曹宴,它對外暴露了send/ask等接口,實現(xiàn)將一個Message發(fā)送到Endpoint中孙乖。
這就是新版本的RPC框架的基本功能浙炼,它的實現(xiàn)基本上與Akka無縫對接,業(yè)務的遷移的功能很小唯袄,目前基本上都全部遷移完了弯屈。
RpcEnv內(nèi)部實現(xiàn)原理
RpcEnv不僅從外部接口與Akka基本一致,在內(nèi)部的實現(xiàn)上恋拷,也基本差不多资厉,都是按照MailBox的設計思路來實現(xiàn)的;
RpcEnv即充當著Server蔬顾,同時也為Client內(nèi)部實現(xiàn)宴偿。
當作為Server湘捎,RpcEnv會初始化一個Server,并注冊NettyRpcHandler窄刘。RpcHandler的receive接口負責對每一個請求進行處理窥妇,一般情況下,簡單業(yè)務可以在RpcHandler直接完成請求的處理娩践,但是考慮一個RpcEnv的Server上會掛載了很多個RpcEndpoint活翩,每個RpcEndpoint的RPC請求頻率不可控,因此需要對一定的分發(fā)機制和隊列來維護這些請求翻伺,其中Dispatcher為分發(fā)器材泄,InBox即為請求隊列;
在將RpcEndpoint注冊到RpcEnv過程中吨岭,也間接的將RpcEnv注冊到Dispatcher分發(fā)器中拉宗,Dispatcher針對每個RpcEndpoint維護一個InBox,在Dispatcher維持一個線程池(線程池大小默認為系統(tǒng)可用的核數(shù)辣辫,當然也可以通過spark.rpc.netty.dispatcher.numThreads進行配置)旦事,線程針對每個InBox里面的請求進行處理。當然實際的處理過程是由RpcEndpoint來完成络它。
其次RpcEnv也完成Client的功能實現(xiàn)族檬,RpcEndpointRef是以RpcEndpoint為單位,即如果一個進程需要和遠程機器上N個RpcEndpoint服務進行通信化戳,就對應N個RpcEndpointRef(后端的實際的網(wǎng)絡連接是公用单料,這個是TransportClient內(nèi)部提供了連接池來實現(xiàn)的),當調(diào)用一個RpcEndpointRef的ask/send等接口時候点楼,會將把“消息內(nèi)容+RpcEndpointRef+本地地址”一起打包為一個RequestMessage扫尖,交由RpcEnv進行發(fā)送。注意這里打包的消息里面包括RpcEndpointRef本身是很重要的掠廓,從而可以由Server端識別出這個消息對應的是哪一個RpcEndpoint换怖。
和發(fā)送端一樣,在RpcEnv中蟀瞧,針對每個remote端的host:port維護一個隊列沉颂,即OutBox,RpcEnv的發(fā)送僅僅是把消息放入到相應的隊列中悦污,但是和發(fā)送端不一樣的是:在OutBox中沒有維護一個所謂的線程池來定時清理OutBox铸屉,而是通過一堆synchronized來實現(xiàn)的,add之后立刻消費切端。
摘自:Github/ColZer