主要想回答兩個問題:
- map端(shuffle-write)如何對數(shù)據(jù)進行分片?
- reduce端(shuffle-read)如何讀取數(shù)據(jù)哼鬓?
ShuffleMapTask中,指定此task運算真對上游RDD的那個partition棍掐,即map端的partition票唆,writer.write操作的時候缕探,根據(jù)RDD的partitioner生成新的partitionId缅疟,然后寫入,完成shuffle-write淮菠,下游shuffle-read的時候男公,拉取相應得partition數(shù)據(jù)即可;
下面插入一段說一下Spark中netty block server的實現(xiàn):
- NettyRpcEnv :: TransportContext-> createServer -> new TransportServer
- TransportServer中appRpcHandler就是上層處理邏輯合陵,默認沒有安全配置的情況下枢赔,bootstraps集合為空;
- TransportServer -> init 初始化bootstrap拥知,其中childHandler定義了對請求的處理邏輯踏拜,即context.initializePipeline(ch, rpcHandler);
- TransportContext :: initializePipeLine 定義了處理請求的pipeline,pipeline中包括對req低剔,rap的encoder速梗,decoder,TransportChannelHandler户侥;
- TransportChannelHandler :: channelRead0 根據(jù)message的不同,分別調用requestHandler和responseHandler進行處理峦嗤,上層的RpcHandler就包含在RequestHandler中蕊唐;
當reduce端讀取數(shù)據(jù)的時候,ShuffleBlockFetcherIterator :: sendRequest 調用 NettyBlockTransferService :: fetchBlocks 調用OneForOneBlockFetcher::start 首先調用TransportClient :: sendRpcSync 發(fā)送OpenBlocks發(fā)送到上面提到的netty block server烁设,然后發(fā)送ChunkFetchRequest替梨,獲取對應的chunk钓试,這里面的chunk其實就是一個一個的block,一個(shuffleId, mapId, bucketId(reduceId))唯一確定一個block副瀑,也即下游RDD的一個partition弓熏;
shuffle-read其實是從上游executor以block為單位獲取數(shù)據(jù),這里就遇到了一個問題糠睡,如果數(shù)據(jù)分布不均勻挽鞠,導致下游某個partition過大,即這個block過大狈孔,就會出現(xiàn)OOM信认,Netty會報錯direct buffer out of memory;
上面說的OOM是Netty處理數(shù)據(jù)時堆外內存的OOM均抽,如果限制使用堆外內存(為Executor增加配置-Dio.netty.noUnsafe=true嫁赏,就可以讓shuffle不使用堆外內存),會報堆內內存OOM油挥,java.lang.OutOfMemoryError: Java heap space潦蝇;
如何解決?
其實在對Block處理過程中深寥,無論是Client端還是Server端攘乒,都是以ManagedBuffer來處理的,具體實現(xiàn)類有FileSegmentManagedBuffer翩迈,NettyManagedBuffer等持灰,Server端收到請求之后,會將返回的Block封裝在FileSegmentmanagedBuffer负饲,這個類內部不cache數(shù)據(jù)堤魁,提供從文件中讀取block data的方法,但是過rpc server時通過encoder會進行封裝返十,從FIleChannel零拷貝寫入SocketChannel妥泉,具體實現(xiàn)就是在MessageEncoder里面將FileSegmentBuffer converToNetty,其實生成時FileRegion洞坑,后面封裝到MessageWithHeader也是FileRegion盲链,寫出到List<Object> out,Netty會調用FileRegion中的transferTo迟杂,將內容寫到目標channel刽沾,寫入是直接調用file.transfer,實現(xiàn)零拷貝排拷;
所以是否可以嘗試添加一個新的協(xié)議侧漓,在OneForOneBlockFetcher中,判斷监氢,如果一個block小于某值布蔗,比如100M藤违,使用原來的方式fetch數(shù)據(jù),否則纵揍,服務端收到請求之后返回數(shù)據(jù)流顿乒,客戶端收到數(shù)據(jù)流之后,將數(shù)據(jù)寫到本地文件泽谨,形成新的FileSegmentManagedBuffer璧榄,供后續(xù)處理,對比原來的實現(xiàn)隔盛,就是將客戶端直接處理NettyManagedBuffer變成直接處理FileSegmentManagedBuffer犹菱;