0. RpcEnv
RpcEnv is the core of the whole communication layer. It builds the environment for communication and starts the server; it sets up RpcEndpoints, and every RpcEndpoint (each providing some kind of service) must be registered with the RpcEnv; and it routes messages, meaning all communication between RpcEndpoints goes through the RpcEnv. It hides the difference between an RPC call and a local call, so upper layers can focus on endpoint design while all communication details stay encapsulated inside RpcEnv. The only implementation today is NettyRpcEnv, which builds RPC on top of Netty.
NettyRpcEnv
[spark-core] org.apache.spark.rpc.netty.NettyRpcEnv
class NettyRpcEnv extends RpcEnv with Logging {
val role //driver or executor
val transportConf: TransportConf //spark.rpc.*
val dispatcher: Dispatcher //
val streamManager: NettyStreamManager //
val transportContext: TransportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this, streamManager))//
val clientFactory: TransportClientFactory //
//A separate client factory for file downloads. This avoids using the same RPC handler as
//the main RPC context, so that events caused by these clients are kept isolated from the main RPC traffic.
var fileDownloadFactory: TransportClientFactory //dedicated factory for file downloads, so they do not disturb the main RPC traffic
val timeoutScheduler: ScheduledExecutorService
// Because TransportClientFactory.createClient is blocking, we need to run it in this thread pool
// to implement non-blocking send/ask.
val clientConnectionExecutor: ThreadPoolExecutor
var server: TransportServer
val outboxes: ConcurrentHashMap[RpcAddress, Outbox]
lazy val address: RpcAddress
}
RpcEndpointRef is the initiating side of a communication. It is bound to a specific RpcEndpoint, and all calls go through the ref. A ref is expressed as a URI:
Remote: spark://{endpointName}@{ip}:{port}
Client: spark-client://{endpointName}
The dispatcher routes messages by endpoint name and hands each one to the corresponding endpoint for processing.
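To make the roles concrete, here is a minimal sketch of registering an endpoint and calling it through a ref (EchoEndpoint, the port and the names are invented for illustration; setupEndpoint and askSync are the real RpcEnv/RpcEndpointRef APIs, but note these classes are private[spark], so code like this only compiles inside Spark's own packages):
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
//a hypothetical endpoint that simply echoes strings back
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg") //the reply is routed back to the caller's ref
  }
}
val conf = new SparkConf()
//RpcEnv builds the communication environment and starts the underlying server
val env: RpcEnv = RpcEnv.create("demo", "localhost", 52345, conf, new SecurityManager(conf))
//every endpoint must be registered with the RpcEnv; the returned ref is how callers reach it
val ref: RpcEndpointRef = env.setupEndpoint("echo", new EchoEndpoint(env))
//whether this is a remote rpc or a local call is hidden by RpcEnv
val answer: String = ref.askSync[String]("hello")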
2. Client-side setup and communication
NettyRpcEnv
A NettyRpcEnv is created on both the driver and the executors; we will analyze the source code by following a single request.
Here we walk through how an executor talks to the DriverEndpoint to fetch the SparkAppConfig. The TransportServer created on the driver side plays the server role, and the executor acts as the client that issues the request for the configuration.
The DriverEndpoint is created while the SparkContext is being initialized; concretely, it is constructed as a field of CoarseGrainedSchedulerBackend.
[spark-core] org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
extends ExecutorAllocationClient with SchedulerBackend with Logging {
...
//setup driverEndpoint
val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())
...
}
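ENDPOINT_NAME here is "CoarseGrainedScheduler", which is why the driverUrl the executor receives later looks like spark://CoarseGrainedScheduler@{ip}:{port}. As a small illustration of how such a URL relates to an endpoint address (RpcEndpointAddress is the real helper class, though it is private[spark]; the host and port below are made up):
import org.apache.spark.rpc.RpcEndpointAddress
//building the address produces the spark:// URI form used for driverUrl
val addr = RpcEndpointAddress("10.0.0.1", 7077, "CoarseGrainedScheduler")
assert(addr.toString == "spark://CoarseGrainedScheduler@10.0.0.1:7077")
//and an existing URI can be parsed back into host, port and endpoint name
val parsed = RpcEndpointAddress("spark://CoarseGrainedScheduler@10.0.0.1:7077")
assert(parsed.name == "CoarseGrainedScheduler")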
Initiating the request
CoarseGrainedExecutorBackend is the launcher class for a Spark executor. During startup it needs to fetch the SparkAppConfig.
[spark-core] org.apache.spark.executor.CoarseGrainedExecutorBackend
object CoarseGrainedExecutorBackend extends Logging {
def main(args: Array[String]): Unit = {
//anonymous function that creates the backend instance
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt, resourceProfile)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
System.exit(0)
}
//
def run(
arguments: Arguments,
backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend): Unit = {
...
//a temporary NettyRpcEnv used only to fetch the driver's RpcEndpointRef
val fetcher = RpcEnv.create(
"driverPropsFetcher",
arguments.bindAddress,
arguments.hostname,
-1,
executorConf,
new SecurityManager(executorConf),
numUsableCores = 0,
clientMode = true)
...
//build an RpcEndpointRef pointing at the driver
// spark://CoarseGrainedScheduler@{ip}:{port}
driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
...
//make an rpc call through the ref to fetch the SparkAppConfig
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId))
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
fetcher.shutdown()
...
}
}
driver.askSync is a synchronous request: it blocks until the result comes back. All requests made through a ref are ultimately delegated to NettyRpcEnv.
[spark-core] org.apache.spark.rpc.netty.NettyRpcEnv#askAbortable
private[netty] def askAbortable[T: ClassTag](
message: RequestMessage, timeout: RpcTimeout): AbortableRpcFuture[T] = {
...
//at this point we are on the executor
//remoteAddr is the driver's address (non-null); address is null on the executor side
if (remoteAddr == address) {
val p = Promise[Any]()
p.future.onComplete {
case Success(response) => onSuccess(response)
case Failure(e) => onFailure(e)
}(ThreadUtils.sameThread)
dispatcher.postLocalMessage(message, p)
} else {
//note how messages get wrapped differently at different layers; here the request becomes an RpcOutboxMessage
val rpcMessage = RpcOutboxMessage(message.serialize(this),
onFailure,
//callback that handles the response; it is invoked from the netty channel handler
(client, response) => onSuccess(deserialize[Any](client, response)))
rpcMsg = Option(rpcMessage)
//the key step: put the message into the Outbox
postToOutbox(message.receiver, rpcMessage)
...
}
}
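Stripped of the Spark-specific types, the mechanism behind ask/askSync is simply a callback that completes a Promise, while the synchronous caller blocks on the matching Future. A minimal standalone sketch of that pattern (names invented, plain Scala standard library only):
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

//stand-in for the network layer: it invokes the callback on some other thread
def fakeSend(payload: String)(onReply: String => Unit): Unit =
  new Thread(() => onReply(s"reply to $payload")).start()

def ask(payload: String): Future[String] = {
  val promise = Promise[String]()
  //the callback plays the role of RpcOutboxMessage's onSuccess: it completes the promise
  fakeSend(payload)(reply => promise.trySuccess(reply))
  promise.future
}

//askSync = ask + a blocking wait, just like timeout.awaitResult does in RpcEndpointRef#askSync
val answer: String = Await.result(ask("RetrieveSparkAppConfig"), 10.seconds)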
Sending the request message
Outbox
废睦,每一個(gè)rpc地址都維護(hù)了這樣一個(gè)消息隊(duì)列伺绽,所有發(fā)送到同一個(gè)RpcAddress的消息都放到一個(gè)隊(duì)列中,等待TransportClient發(fā)送到對(duì)應(yīng)的server。
[spark-core] org.apache.spark.rpc.netty.Outbox
class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {
val messages = new java.util.LinkedList[OutboxMessage]
var client: TransportClient = null
...
//this method triggers the actual sending of messages
//it may be called from multiple threads, but only one thread actually drains the queue and sends
private def drainOutbox(): Unit = {
var message: OutboxMessage = null
synchronized {
if (stopped) {
return
}
//another thread is already setting up a connection, so the current thread has nothing to do
if (connectFuture != null) {
// We are connecting to the remote address, so just exit
return
}
//no connection yet: submit a background task to create the TransportClient
//launchConnectTask calls drainOutbox again once the client is ready, so the current thread can simply return and let the connecting thread carry on
if (client == null) {
// There is no connect task but client is null, so we need to launch the connect task.
launchConnectTask()
return
}
if (draining) {
// There is some thread draining, so just exit
return
}
message = messages.poll()
if (message == null) {
return
}
draining = true
}
//keep taking messages until the queue is drained
while (true) {
try {
val _client = synchronized { client }
if (_client != null) {
message.sendWith(_client)
} else {
assert(stopped)
}
} catch {
case NonFatal(e) =>
handleNetworkFailure(e)
return
}
synchronized {
if (stopped) {
return
}
message = messages.poll()
if (message == null) {
draining = false
return
}
}
}
}
}
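The concurrency pattern in drainOutbox is worth isolating: any number of threads may enqueue messages and trigger a drain, but the draining flag guarantees that only one of them loops over the queue at a time, and the send itself happens outside the lock. A condensed, generic sketch of that pattern (not Spark code):
import java.util.LinkedList

class SingleDrainerQueue[T](send: T => Unit) {
  private val messages = new LinkedList[T]()
  private var draining = false

  def post(msg: T): Unit = {
    synchronized { messages.add(msg) }
    drain()
  }

  private def drain(): Unit = {
    var msg: T = null.asInstanceOf[T]
    synchronized {
      if (draining) return        //some other thread is already sending
      msg = messages.poll()
      if (msg == null) return     //nothing to send
      draining = true
    }
    while (true) {
      send(msg)                   //the actual send happens outside the lock
      synchronized {
        msg = messages.poll()
        if (msg == null) { draining = false; return }
      }
    }
  }
}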
TransportClient is a wrapper around a Netty Channel, so calling message.sendWith(_client) hands the message over to Netty for transmission.
TransportClients are created by the TransportClientFactory, which keeps track of all TransportClients in the process and maintains a connection pool for each remote address.
[common/network-common] org.apache.spark.network.client.TransportClientFactory
public class TransportClientFactory implements Closeable {
//public entry point: reuse a cached connection if one exists, otherwise create a new one
public TransportClient createClient(String remoteHost, int remotePort){
...
// Create the ClientPool if we don't have it yet.
ClientPool clientPool = connectionPool.get(unresolvedAddress);
if (clientPool == null) {
connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
clientPool = connectionPool.get(unresolvedAddress);
}
...// omitted: if the randomly chosen slot already holds a usable client, it is returned directly
//no client at the chosen slot, so build a new one
synchronized (clientPool.locks[clientIndex]) {
cachedClient = clientPool.clients[clientIndex];
...//double check
//create new client
clientPool.clients[clientIndex] = createClient(resolvedAddress);
return clientPool.clients[clientIndex];
}
}
//build the TransportClient, i.e. the netty client
private TransportClient createClient(InetSocketAddress address) {
//the familiar netty bootstrap style
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(socketChannelClass)
// Disable Nagle's Algorithm since we don't want packets to wait
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
.option(ChannelOption.ALLOCATOR, pooledAllocator);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
//the processing logic is wrapped in a TransportChannelHandler
//context is the TransportContext, which builds the channel handlers for both server and client
//all we need to know here is that once the new socket is established, message handling is handed to the TransportChannelHandler
TransportChannelHandler clientHandler = context.initializePipeline(ch);
clientRef.set(clientHandler.getClient());
channelRef.set(ch);
}
});
ChannelFuture cf = bootstrap.connect(address);
...//wait for the connection to be established
return client;
}
}
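The pool being navigated above is simple in structure: for each remote address there is a fixed-size array of clients (numConnectionsPerPeer slots), a request picks a random slot, reuses the client there if it is still active, and otherwise creates a new one under that slot's lock. A rough sketch of the idea (placeholder types, not the real implementation):
import java.util.concurrent.ThreadLocalRandom

trait Client { def isActive: Boolean }

class PerAddressPool(numConnectionsPerPeer: Int, connect: () => Client) {
  private val clients = new Array[Client](numConnectionsPerPeer)
  private val locks   = Array.fill(numConnectionsPerPeer)(new Object)

  def get(): Client = {
    val i = ThreadLocalRandom.current().nextInt(numConnectionsPerPeer) //pick a random slot
    val cached = clients(i)
    if (cached != null && cached.isActive) return cached               //fast path: reuse
    locks(i).synchronized {                                            //slow path: connect under the slot lock
      if (clients(i) == null || !clients(i).isActive) clients(i) = connect()
      clients(i)
    }
  }
}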
From the code above we can see that the message here is an RpcOutboxMessage, a class defined in the Outbox source file.
case class RpcOutboxMessage(
content: ByteBuffer,
_onFailure: (Throwable) => Unit,
_onSuccess: (TransportClient, ByteBuffer) => Unit)
//the message is both the payload and a callback that is invoked when the response comes back
extends OutboxMessage with RpcResponseCallback with Logging {
private var client: TransportClient = _
private var requestId: Long = _
//send the message through the TransportClient
override def sendWith(client: TransportClient): Unit = {
this.client = client
this.requestId = client.sendRpc(content, this)
}
}
Now we arrive at TransportClient.
[common/network-common] org.apache.spark.network.client.TransportClient
public class TransportClient implements Closeable {
private final Channel channel;
private final TransportResponseHandler handler;
@Nullable private String clientId;
...
//the only constructor; Channel is the netty channel
public TransportClient(Channel channel, TransportResponseHandler handler) {
this.channel = Preconditions.checkNotNull(channel);
this.handler = Preconditions.checkNotNull(handler);
this.timedOut = false;
}
...
//
public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}
long requestId = requestId();
//another key point: the callback is what processes the response
//the handler was created together with the client
handler.addRpcRequest(requestId, callback);
//this listener invokes the callback only for the write-failure case
RpcChannelListener listener = new RpcChannelListener(requestId, callback);
channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
.addListener(listener);
return requestId;
}
}
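Because many RPCs are multiplexed over one channel, each response has to be matched back to its request. sendRpc does this with the requestId: the callback is stored under the id before the write, and later the response handler looks it up by the same id carried in the response. In sketch form (invented names, not the real classes):
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

class Correlator[Resp] {
  private val nextId = new AtomicLong(0)
  private val outstanding = new ConcurrentHashMap[Long, Resp => Unit]()

  //caller side: register the callback first, then send the id along with the payload
  def register(onReply: Resp => Unit): Long = {
    val id = nextId.incrementAndGet()
    outstanding.put(id, onReply)
    id
  }

  //channel-handler side: look the callback up by the id carried in the response
  def complete(id: Long, resp: Resp): Unit = {
    val cb = outstanding.remove(id)
    if (cb != null) cb(resp) //unknown ids are simply ignored, as TransportResponseHandler does
  }
}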
At this point the request message has been sent out through netty. Next, let's look at how the returned message is handled.
Receiving the response message
On the netty client side, responses are handled by the handler added to the Bootstrap, and that handler is wired up while the TransportClient is being created:
TransportClientFactory#createClient
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
TransportChannelHandler clientHandler = context.initializePipeline(ch);
clientRef.set(clientHandler.getClient());
channelRef.set(ch);
}
});
The context here is the member variable of NettyRpcEnv:
val transportContext: TransportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this, streamManager))
This calls for a closer look at TransportContext. TransportContext creates the TransportClientFactory and the TransportServer, and it configures netty's ChannelHandlers.
[common/network-common] org.apache.spark.network.TransportContext
public class TransportContext implements Closeable {
...
private final TransportConf conf;
//rpcHandler: handles request messages
private final RpcHandler rpcHandler;
//client
public TransportClientFactory createClientFactory(...);
//server
public TransportServer createServer(...)
//configure the ChannelHandlers
public TransportChannelHandler initializePipeline(SocketChannel channel) {
return initializePipeline(channel, rpcHandler);
}
public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
//this handler wraps the handlers for both the server and the client side
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
ChannelPipeline pipeline = channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("idleStateHandler",
new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);
// Use a separate EventLoopGroup to handle ChunkFetchRequest messages for shuffle rpcs.
if (chunkFetchWorkers != null) {
ChunkFetchRequestHandler chunkFetchHandler = new ChunkFetchRequestHandler(
channelHandler.getClient(), rpcHandler.getStreamManager(),
conf.maxChunksBeingTransferred(), true /* syncModeEnabled */);
pipeline.addLast(chunkFetchWorkers, "chunkFetchHandler", chunkFetchHandler);
}
return channelHandler;
} catch (RuntimeException e) {...}
}
/**
* Creates the server- and client-side handler which is used to handle both RequestMessages and
* ResponseMessages. The channel is expected to have been successfully created, though certain
* properties (such as the remoteAddress()) may not be available yet.
*/
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
ChunkFetchRequestHandler chunkFetchRequestHandler = null;
if (!separateChunkFetchRequest) {
chunkFetchRequestHandler = new ChunkFetchRequestHandler(
client, rpcHandler.getStreamManager(),
conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
}
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
}
}
With the ChannelHandler in place, we can see how data read from the Channel is processed.
[common/network-common] org.apache.spark.network.server.TransportChannelHandler
public class TransportChannelHandler extends SimpleChannelInboundHandler<Message> {
private final TransportClient client;
private final TransportResponseHandler responseHandler;
private final TransportRequestHandler requestHandler;
private final TransportContext transportContext;
...
//message handling for both the server and the client side
public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
if (request instanceof RequestMessage) {
requestHandler.handle((RequestMessage) request);
} else if (request instanceof ResponseMessage) {
responseHandler.handle((ResponseMessage) request);
} else {
ctx.fireChannelRead(request);
}
}
}
Since we are following the response path here, the message goes into TransportResponseHandler.
[common/network-common] org.apache.spark.network.server.TransportResponseHandler#handle
public void handle(ResponseMessage message) throws Exception {
...//we only care about the executor's SparkAppConfig request here
else if (message instanceof RpcResponse) {
RpcResponse resp = (RpcResponse) message;
//TransportClient#sendRpc saved the mapping from requestId to callback; now is when that callback gets used
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
resp.requestId, getRemoteAddress(channel), resp.body().size());
} else {
outstandingRpcs.remove(resp.requestId);
try {
//notify that the response has arrived; we are on a netty worker thread at this point
//the listener here is the RpcOutboxMessage itself
listener.onSuccess(resp.body().nioByteBuffer());
} finally {
resp.body().release();
}
}
}
}
At this point the response has flowed back to where the RpcOutboxMessage was created, i.e. back to NettyRpcEnv#askAbortable, so let's look at how the callback handles it.
private[netty] def askAbortable[T: ClassTag](
message: RequestMessage, timeout: RpcTimeout): AbortableRpcFuture[T] = {
...
def onSuccess(reply: Any): Unit = reply match {
case RpcFailure(e) => onFailure(e)
case rpcReply =>
//at this point the Future's state becomes success
if (!promise.trySuccess(rpcReply)) {
logWarning(s"Ignored message: $reply")
}
}
...
//create the outbox message and define its callbacks
val rpcMessage = RpcOutboxMessage(message.serialize(this),
onFailure,
(client, response) => onSuccess(deserialize[Any](client, response)))
}
With the promise completed, awaitResult inside driverEndpointRef#askSync can finally return from blocking:
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
val future = ask[T](message, timeout)
timeout.awaitResult(future)
}
The executor now has the SparkAppConfig, which completes one full request from the client side. In a follow-up we will look at the server side: how an incoming message is routed to the correct RpcEndpoint, and how the reply is sent back after the request is processed.
Communication is the core that drives the whole application: information exchange, data transfer and signal propagation all depend on it, which is why Spark's communication layer opens this source-code analysis series.
As a newcomer to the big data field, I hope to learn from more experienced readers; if anything here is misunderstood, corrections are very welcome.
Note: the source code is based on Apache Spark 3.0.
Author: pokerwu
This work is licensed under the Creative Commons Attribution-NonCommercial 4.0 International License.