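Introduction
This article analyzes how Flink's low-level RPC calls are executed and how they work. Flink RPC is built on the Akka actor framework, so some basic knowledge of Akka actors is recommended before reading.
To simplify working with actors, Flink wraps them in a series of abstractions. The important interfaces and classes are: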
- RpcService: defines the behavior of an RPC service on both sides (caller and callee). Remote calls need an address and a port. It covers starting and stopping services and connecting to other, remote RpcServices.
- AkkaRpcService: the implementation of RpcService.
- RpcServer: the RPC server. It is itself also an RpcGateway.
- AkkaInvocationHandler: the implementation of RpcServer. When an RpcService connects to a remote RpcService, it actually returns a proxy object, and the real logic of that proxy lives in AkkaInvocationHandler.
- RpcGateway: the RPC gateway, which is the entry point for calling another RpcService. Connecting an RpcService to a remote RpcService returns an object of type RpcGateway. This object usually implements another interface as well: the same one implemented by the remote RpcEndpoint being called. As mentioned above, the returned object is a proxy. When we call it, Flink uses Akka under the hood to invoke the method with the same name on the remote side.
- RpcEndpoint: the callee side of an RPC. It holds an RpcService and implements its own business-logic interface. Those methods run when an RpcGateway calls them remotely.
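The following minimal, in-process sketch uses only the JDK to make the division of roles concrete. It is not Flink code. GreeterGateway, GreeterEndpoint and connect are hypothetical names, and a single-threaded executor stands in for an actor's mailbox instead of Akka. The sketch assumes that every gateway method returns a CompletableFuture.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class MiniRpcSketch {

    // Hypothetical gateway interface (plays the role of an RpcGateway sub-interface).
    interface GreeterGateway {
        CompletableFuture<String> greet(String name);
    }

    // Hypothetical endpoint implementing the gateway (plays the role of an RpcEndpoint).
    static final class GreeterEndpoint implements GreeterGateway {
        private int calls = 0; // only ever modified on the single "main thread"

        @Override
        public CompletableFuture<String> greet(String name) {
            calls++;
            return CompletableFuture.completedFuture("Hello, " + name + " (#" + calls + ")");
        }
    }

    // Hypothetical "connect": returns a JDK proxy implementing gatewayClass. Every call is
    // handed to mainThread (standing in for an actor mailbox) and answered via a future.
    @SuppressWarnings("unchecked")
    static <G> G connect(Object endpoint, Class<G> gatewayClass, ExecutorService mainThread) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(endpoint, args); // toString/equals/hashCode stay local
            }
            CompletableFuture<Object> reply = new CompletableFuture<>();
            mainThread.execute(() -> {
                try {
                    // Assumption: every gateway method returns a CompletableFuture.
                    CompletableFuture<Object> result =
                            (CompletableFuture<Object>) method.invoke(endpoint, args);
                    result.whenComplete((value, error) -> {
                        if (error != null) {
                            reply.completeExceptionally(error);
                        } else {
                            reply.complete(value);
                        }
                    });
                } catch (Exception e) {
                    reply.completeExceptionally(e);
                }
            });
            return reply;
        };
        return (G) Proxy.newProxyInstance(
                gatewayClass.getClassLoader(), new Class<?>[] {gatewayClass}, handler);
    }

    public static void main(String[] args) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            GreeterGateway gateway =
                    connect(new GreeterEndpoint(), GreeterGateway.class, mainThread);
            System.out.println(gateway.greet("Flink").join()); // Hello, Flink (#1)
        } finally {
            mainThread.shutdown();
        }
    }
}
```

Flink's handler sends an Akka message rather than submitting work to a local executor. The caller's view is the same in both cases: a plain interface whose calls come back as futures.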
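Once wrapped by Flink, Akka is very simple to use. Take the canRespondWithSerializedValueRemotely method of the Flink unit test RemoteAkkaRpcActorTest as an example. It uses remoteGateway to remotely call getSerializedValueSynchronously of AkkaRpcActorTest.SerializedValueRespondingEndpoint, and it also calls the asynchronous variant getSerializedValue. The code is as follows: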
```java
@Test
public void canRespondWithSerializedValueRemotely() throws Exception {
    try (final AkkaRpcActorTest.SerializedValueRespondingEndpoint endpoint =
            new AkkaRpcActorTest.SerializedValueRespondingEndpoint(rpcService)) {
        endpoint.start();

        final AkkaRpcActorTest.SerializedValueRespondingGateway remoteGateway =
                otherRpcService
                        .connect(
                                endpoint.getAddress(),
                                AkkaRpcActorTest.SerializedValueRespondingGateway.class)
                        .join();

        assertThat(
                remoteGateway.getSerializedValueSynchronously(),
                equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE));

        final CompletableFuture<SerializedValue<String>> responseFuture =
                remoteGateway.getSerializedValue();

        assertThat(
                responseFuture.get(),
                equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE));
    }
}
```
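The unit test code that creates the two RpcService instances (rpcService and otherRpcService) is not shown here.
This article starts from how TaskManagerRunner creates its RpcService. From there it analyzes how Flink wraps Akka actors and walks through the whole call flow.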
RpcService
Creating the RpcService in the TaskManager
The createRpcService method of TaskManagerRunner creates the RpcService from the Flink configuration and the high-availability services:
```java
@VisibleForTesting
static RpcService createRpcService(
        final Configuration configuration, final HighAvailabilityServices haServices)
        throws Exception {

    checkNotNull(configuration);
    checkNotNull(haServices);

    return AkkaRpcServiceUtils.createRemoteRpcService(
            configuration,
            determineTaskManagerBindAddress(configuration, haServices),
            configuration.getString(TaskManagerOptions.RPC_PORT),
            configuration.getString(TaskManagerOptions.BIND_HOST),
            configuration.getOptional(TaskManagerOptions.RPC_BIND_PORT));
}
```
The logic for creating the RpcService lives in the AkkaRpcServiceUtils utility class. Next, we look at RpcService itself and at how it is created.
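RpcService definition
RpcService is responsible for executing RPCs with Akka actors. Its implementation is AkkaRpcService. A process typically holds one main RpcService, which coordinates RPC in two directions:
- It starts a server for a local endpoint so that remote callers can invoke it. Here the RpcEndpoint is the callee.
- It connects to a remote RPC service and creates an RpcGateway. Here the RpcGateway is the caller and is used to invoke the remote side.
The RpcService interface is shown below: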
```java
public interface RpcService {

    /**
     * Return the hostname or host address under which the rpc service can be reached. If the rpc
     * service cannot be contacted remotely, then it will return an empty string.
     *
     * @return Address of the rpc service or empty string if local rpc service
     */
    // Returns the address of the RPC service;
    // returns an empty string for a local RPC service
    String getAddress();

    /**
     * Return the port under which the rpc service is reachable. If the rpc service cannot be
     * contacted remotely, then it will return -1.
     *
     * @return Port of the rpc service or -1 if local rpc service
     */
    // Returns the port of the RPC service;
    // returns -1 for a local RPC service
    int getPort();

    /**
     * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can be
     * used to communicate with the rpc server. If the connection failed, then the returned future
     * is failed with a {@link RpcConnectionException}.
     *
     * @param address Address of the remote rpc server
     * @param clazz Class of the rpc gateway to return
     * @param <C> Type of the rpc gateway to return
     * @return Future containing the rpc gateway or an {@link RpcConnectionException} if the
     *     connection attempt failed
     */
    // Connects to the remote RPC service at the given address and
    // returns an RPC gateway of type C for communicating with the remote side
    <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);

    /**
     * Connect to a remote fenced rpc server under the provided address. Returns a fenced rpc
     * gateway which can be used to communicate with the rpc server. If the connection failed, then
     * the returned future is failed with a {@link RpcConnectionException}.
     *
     * @param address Address of the remote rpc server
     * @param fencingToken Fencing token to be used when communicating with the server
     * @param clazz Class of the rpc gateway to return
     * @param <F> Type of the fencing token
     * @param <C> Type of the rpc gateway to return
     * @return Future containing the fenced rpc gateway or an {@link RpcConnectionException} if the
     *     connection attempt failed
     */
    // Creates an RPC gateway with fencing support.
    // Fencing guards against split brain; how it works is analyzed in the AkkaRpcActor part
    <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
            String address, F fencingToken, Class<C> clazz);

    /**
     * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.
     *
     * @param rpcEndpoint Rpc protocol to dispatch the rpcs to
     * @param <C> Type of the rpc endpoint
     * @return Self gateway to dispatch remote procedure calls to oneself
     */
    // Starts an RPC server that forwards incoming remote calls to rpcEndpoint
    <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint);

    /**
     * Fence the given RpcServer with the given fencing token.
     *
     * <p>Fencing the RpcServer means that we fix the fencing token to the provided value. All RPCs
     * will then be enriched with this fencing token. This expects that the receiving RPC endpoint
     * extends {@link FencedRpcEndpoint}.
     *
     * @param rpcServer to fence with the given fencing token
     * @param fencingToken to fence the RpcServer with
     * @param <F> type of the fencing token
     * @return Fenced RpcServer
     */
    // Fences an existing RpcServer with the given fencing token (split-brain protection)
    <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken);

    /**
     * Stop the underlying rpc server of the provided self gateway.
     *
     * @param selfGateway Self gateway describing the underlying rpc server
     */
    // Stops the RPC server
    void stopServer(RpcServer selfGateway);

    /**
     * Trigger the asynchronous stopping of the {@link RpcService}.
     *
     * @return Future which is completed once the {@link RpcService} has been fully stopped.
     */
    // Stops the RPC service asynchronously
    CompletableFuture<Void> stopService();

    /**
     * Returns a future indicating when the RPC service has been shut down.
     *
     * @return Termination future
     */
    // Returns a CompletableFuture that completes once the RPC service has fully shut down
    CompletableFuture<Void> getTerminationFuture();

    /**
     * Gets the executor, provided by this RPC service. This executor can be used for example for
     * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @return The execution context provided by the RPC service
     */
    // Returns the RPC service's executor, usable for async callbacks such as handleAsync
    Executor getExecutor();

    /**
     * Gets a scheduled executor from the RPC service. This executor can be used to schedule tasks
     * to be executed in the future.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @return The RPC service provided scheduled executor
     */
    // Returns the scheduled executor
    ScheduledExecutor getScheduledExecutor();

    /**
     * Execute the runnable in the execution context of this RPC Service, as returned by {@link
     * #getExecutor()}, after a scheduled delay.
     *
     * @param runnable Runnable to be executed
     * @param delay The delay after which the runnable will be executed
     */
    // Schedules the runnable to run after the given delay
    ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);

    /**
     * Execute the given runnable in the executor of the RPC service. This method can be used to run
     * code outside of the main thread of a {@link RpcEndpoint}.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @param runnable to execute
     */
    // Runs the runnable in the RPC service's executor
    void execute(Runnable runnable);

    /**
     * Execute the given callable and return its result as a {@link CompletableFuture}. This method
     * can be used to run code outside of the main thread of a {@link RpcEndpoint}.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @param callable to execute
     * @param <T> is the return value type
     * @return Future containing the callable's future result
     */
    // Runs the callable asynchronously in the RPC service's executor;
    // the result is returned as a CompletableFuture
    <T> CompletableFuture<T> execute(Callable<T> callable);
}
```
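The Javadoc above warns that getExecutor() does not isolate concurrent invocations, so any state of an RpcEndpoint should only be changed on that endpoint's main thread. The sketch below illustrates that rule with plain JDK executors. MainThreadSketch and its methods are hypothetical, and this is not Flink's MainThreadExecutor. Several threads from a shared pool request increments, but only one single-threaded executor ever touches the non-thread-safe counter, so no update is lost.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class MainThreadSketch {

    private int counter = 0; // deliberately not thread-safe

    // Hypothetical single-threaded "main thread" executor of one endpoint.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> incrementInMainThread() {
        return CompletableFuture.runAsync(() -> counter++, mainThread);
    }

    // Caller threads from a shared pool (playing the role of getExecutor()) request
    // increments; only the main thread ever reads or writes `counter`.
    static int run(int callers, int incrementsPerCaller) {
        MainThreadSketch endpoint = new MainThreadSketch();
        ExecutorService shared = Executors.newFixedThreadPool(callers);
        Queue<CompletableFuture<Void>> increments = new ConcurrentLinkedQueue<>();
        try {
            CompletableFuture<?>[] callerTasks = new CompletableFuture<?>[callers];
            for (int c = 0; c < callers; c++) {
                callerTasks[c] = CompletableFuture.runAsync(() -> {
                    for (int i = 0; i < incrementsPerCaller; i++) {
                        increments.add(endpoint.incrementInMainThread());
                    }
                }, shared);
            }
            CompletableFuture.allOf(callerTasks).join();
            CompletableFuture.allOf(increments.toArray(new CompletableFuture<?>[0])).join();
            // Read the state on the main thread too, so every write is visible.
            return CompletableFuture.supplyAsync(() -> endpoint.counter, endpoint.mainThread).join();
        } finally {
            shared.shutdown();
            endpoint.mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(4, 2500)); // 10000
    }
}
```

If the pool threads ran counter++ directly instead, the increments would race and the result would usually fall short of 10000.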
AkkaRpcServiceUtils
AkkaRpcServiceUtils is a utility class responsible for creating AkkaRpcService.
Returning to the RpcService creation from the first section, here is the createRemoteRpcService method of AkkaRpcServiceUtils:
```java
public static AkkaRpcService createRemoteRpcService(
        Configuration configuration,
        @Nullable String externalAddress,
        String externalPortRange,
        @Nullable String bindAddress,
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType") Optional<Integer> bindPort)
        throws Exception {
    // Create the service builder
    final AkkaRpcServiceBuilder akkaRpcServiceBuilder =
            AkkaRpcServiceUtils.remoteServiceBuilder(
                    configuration, externalAddress, externalPortRange);

    // Pass in the bind address and bind port, if configured
    if (bindAddress != null) {
        akkaRpcServiceBuilder.withBindAddress(bindAddress);
    }

    bindPort.ifPresent(akkaRpcServiceBuilder::withBindPort);

    // Create and start the RpcService
    return akkaRpcServiceBuilder.createAndStart();
}
```
AkkaRpcService is built with the builder pattern. Once akkaRpcServiceBuilder has enough configuration, its createAndStart method creates the AkkaRpcService.
```java
public AkkaRpcService createAndStart() throws Exception {
    // Read the thread pool (parallelism) configuration
    if (actorSystemExecutorConfiguration == null) {
        actorSystemExecutorConfiguration =
                BootstrapTools.ForkJoinExecutorConfiguration.fromConfiguration(
                        configuration);
    }

    final ActorSystem actorSystem;

    // Without an external address, create a local ActorSystem
    if (externalAddress == null) {
        // create local actor system
        actorSystem =
                BootstrapTools.startLocalActorSystem(
                        configuration,
                        actorSystemName,
                        logger,
                        actorSystemExecutorConfiguration,
                        customConfig);
    } else {
        // Otherwise create a remote ActorSystem
        // create remote actor system
        actorSystem =
                BootstrapTools.startRemoteActorSystem(
                        configuration,
                        actorSystemName,
                        externalAddress,
                        externalPortRange,
                        bindAddress,
                        Optional.ofNullable(bindPort),
                        logger,
                        actorSystemExecutorConfiguration,
                        customConfig);
    }

    // Return the AkkaRpcService instance (analyzed in a later section)
    return new AkkaRpcService(
            actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
}
```
Next, we analyze how the ActorSystem is created.
BootstrapTools
startLocalActorSystem
This method creates an ActorSystem for local calls.
The startLocalActorSystem method of BootstrapTools is as follows:
```java
public static ActorSystem startLocalActorSystem(
        Configuration configuration,
        String actorSystemName,
        Logger logger,
        ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {

    logger.info("Trying to start local actor system");

    try {
        // Build the Akka config with empty externalAddress and bindAddress,
        // which yields the configuration of a local ActorSystem
        Config akkaConfig =
                AkkaUtils.getAkkaConfig(
                        configuration,
                        scala.Option.empty(),
                        scala.Option.empty(),
                        actorSystemExecutorConfiguration.getAkkaConfig());

        // If a custom config is given, merge it with the base config.
        // customConfig.withFallback(akkaConfig): for duplicate keys the custom config wins
        if (customConfig != null) {
            akkaConfig = customConfig.withFallback(akkaConfig);
        }

        // Start the ActorSystem
        return startActorSystem(akkaConfig, actorSystemName, logger);
    } catch (Throwable t) {
        throw new Exception("Could not create actor system", t);
    }
}
```
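With customConfig.withFallback(akkaConfig), keys present in customConfig take precedence and akkaConfig only fills in the missing ones. The sketch below uses a flat Map to illustrate that precedence rule. It is not the Typesafe Config API, which also merges nested objects recursively. FallbackSketch and its withFallback helper are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FallbackSketch {

    // Map-based illustration of withFallback: keys in `primary` win,
    // `fallback` only contributes keys that `primary` does not have.
    static Map<String, String> withFallback(Map<String, String> primary, Map<String, String> fallback) {
        Map<String, String> merged = new LinkedHashMap<>(fallback);
        merged.putAll(primary); // primary overwrites duplicate keys
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> akkaConfig = new LinkedHashMap<>();
        akkaConfig.put("akka.loglevel", "INFO");
        akkaConfig.put("akka.daemonic", "off");
        Map<String, String> customConfig = Map.of("akka.loglevel", "DEBUG");

        System.out.println(withFallback(customConfig, akkaConfig));
        // {akka.loglevel=DEBUG, akka.daemonic=off}
    }
}
```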
startRemoteActorSystem
This method creates an ActorSystem that can be called remotely.
The startRemoteActorSystem method is as follows:
```java
public static ActorSystem startRemoteActorSystem(
        Configuration configuration,
        String actorSystemName,
        String externalAddress,
        String externalPortRange,
        String bindAddress,
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType") Optional<Integer> bindPort,
        Logger logger,
        ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {

    // parse port range definition and create port iterator
    Iterator<Integer> portsIterator;
    try {
        // Parse the port range string into an iterator of integer ports
        portsIterator = NetUtils.getPortRangeFromString(externalPortRange);
    } catch (Exception e) {
        throw new IllegalArgumentException(
                "Invalid port range definition: " + externalPortRange);
    }

    while (portsIterator.hasNext()) {
        final int externalPort = portsIterator.next();

        // Try the ports in the range one by one until the ActorSystem starts
        try {
            return startRemoteActorSystem(
                    configuration,
                    actorSystemName,
                    externalAddress,
                    externalPort,
                    bindAddress,
                    bindPort.orElse(externalPort),
                    logger,
                    actorSystemExecutorConfiguration,
                    customConfig);
        } catch (Exception e) {
            // we can continue to try if this contains a netty channel exception
            Throwable cause = e.getCause();
            if (!(cause instanceof org.jboss.netty.channel.ChannelException
                    || cause instanceof java.net.BindException)) {
                throw e;
            } // else fall through the loop and try the next port
        }
    }

    // Reaching this point means no port in the range could be used
    // if we come here, we have exhausted the port range
    throw new BindException(
            "Could not start actor system on any port in port range " + externalPortRange);
}
```
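The loop above parses a port range and then tries the ports one by one, skipping a port only when binding fails. The JDK-only sketch below follows the same pattern. parsePortRange assumes a format such as "50100-50200,50300", with comma-separated single ports or inclusive ranges. It is a hypothetical stand-in for NetUtils.getPortRangeFromString, not Flink's implementation, and a plain ServerSocket replaces the ActorSystem.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

final class PortRangeSketch {

    // Hypothetical parser: "50100-50102,50300" -> [50100, 50101, 50102, 50300].
    static List<Integer> parsePortRange(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            int dash = p.indexOf('-');
            if (dash < 0) {
                ports.add(Integer.parseInt(p));
            } else {
                int start = Integer.parseInt(p.substring(0, dash).trim());
                int end = Integer.parseInt(p.substring(dash + 1).trim());
                if (start > end) {
                    throw new IllegalArgumentException("Invalid port range: " + p);
                }
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    // Tries each port in order: a BindException moves on to the next port,
    // any other I/O failure is rethrown (as the Flink loop rethrows non-bind errors).
    static ServerSocket bindFirstFree(String host, String spec) throws IOException {
        for (int port : parsePortRange(spec)) {
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(host, port));
                return socket;
            } catch (IOException e) {
                socket.close();
                if (!(e instanceof BindException)) {
                    throw e;
                }
            }
        }
        throw new BindException("Could not bind to any port in port range " + spec);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(parsePortRange("50100-50102,50300")); // [50100, 50101, 50102, 50300]
        try (ServerSocket socket = bindFirstFree("127.0.0.1", "50100-50102,50300")) {
            System.out.println("bound to port " + socket.getLocalPort());
        }
    }
}
```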
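Note:
Since Flink 1.11, the TaskManager and the JobManager can use different addresses and ports externally and locally. This makes Docker and NAT port mapping possible. See FLINK-15911 and FLINK-15154 for details.
Options for the external (advertised) address and port that other components use to connect: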
- jobmanager.rpc.address
- jobmanager.rpc.port
- taskmanager.host
- taskmanager.rpc.port
- taskmanager.data.port
Options for the local bind (listening) address and port:
- jobmanager.bind-host
- jobmanager.rpc.bind-port
- taskmanager.bind-host
- taskmanager.rpc.bind-port
- taskmanager.data.bind-port
The method above ends by calling an overloaded method, whose body is similar to startLocalActorSystem:
```java
private static ActorSystem startRemoteActorSystem(
        Configuration configuration,
        String actorSystemName,
        String externalAddress,
        int externalPort,
        String bindAddress,
        int bindPort,
        Logger logger,
        ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {

    // Normalize the addresses and ports into host:port strings
    String externalHostPortUrl =
            NetUtils.unresolvedHostAndPortToNormalizedString(externalAddress, externalPort);
    String bindHostPortUrl =
            NetUtils.unresolvedHostAndPortToNormalizedString(bindAddress, bindPort);
    logger.info(
            "Trying to start actor system, external address {}, bind address {}.",
            externalHostPortUrl,
            bindHostPortUrl);

    try {
        // Same as startLocalActorSystem, but also passes
        // externalAddress/externalPort and bindAddress/bindPort
        Config akkaConfig =
                AkkaUtils.getAkkaConfig(
                        configuration,
                        new Some<>(new Tuple2<>(externalAddress, externalPort)),
                        new Some<>(new Tuple2<>(bindAddress, bindPort)),
                        actorSystemExecutorConfiguration.getAkkaConfig());

        if (customConfig != null) {
            akkaConfig = customConfig.withFallback(akkaConfig);
        }

        return startActorSystem(akkaConfig, actorSystemName, logger);
    } catch (Throwable t) {
        if (t instanceof ChannelException) {
            Throwable cause = t.getCause();
            if (cause != null && t.getCause() instanceof BindException) {
                throw new IOException(
                        "Unable to create ActorSystem at address "
                                + bindHostPortUrl
                                + " : "
                                + cause.getMessage(),
                        t);
            }
        }
        throw new Exception("Could not create actor system", t);
    }
}
```
startActorSystem
Finally, the ActorSystem is created through AkkaUtils.
```java
private static ActorSystem startActorSystem(
        Config akkaConfig, String actorSystemName, Logger logger) {
    logger.debug("Using akka configuration\n {}", akkaConfig);
    ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig);

    logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem));
    return actorSystem;
}
```
AkkaUtils
AkkaUtils generates the Akka configuration and creates the ActorSystem. Here we focus on how it generates the Akka configuration.
Below is the getAkkaConfig method. Note that this class is written in Scala.
```scala
@throws(classOf[UnknownHostException])
def getAkkaConfig(configuration: Configuration,
                  externalAddress: Option[(String, Int)],
                  bindAddress: Option[(String, Int)],
                  executorConfig: Config): Config = {
  // Basic Akka config, falling back to the executor config
  val defaultConfig = getBasicAkkaConfig(configuration).withFallback(executorConfig)

  // Build different configs depending on whether the external and bind
  // hostname/port are given
  externalAddress match {
    case Some((externalHostname, externalPort)) =>
      bindAddress match {
        case Some((bindHostname, bindPort)) =>
          val remoteConfig = getRemoteAkkaConfig(
            configuration, bindHostname, bindPort, externalHostname, externalPort)
          remoteConfig.withFallback(defaultConfig)
        case None =>
          val remoteConfig = getRemoteAkkaConfig(configuration,
            // the wildcard IP lets us bind to all network interfaces
            NetUtils.getWildcardIPAddress, externalPort, externalHostname, externalPort)
          remoteConfig.withFallback(defaultConfig)
      }
    case None =>
      defaultConfig
  }
}
```
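The branching in getAkkaConfig decides what Akka advertises to peers (hostname/port) and what it actually listens on (bind-hostname/bind-port). The Java sketch below mirrors those three cases so they can be checked in isolation. The describe method and its string output are hypothetical, and the constant "0.0.0.0" is an assumed stand-in for NetUtils.getWildcardIPAddress().

```java
import java.util.Optional;

final class AkkaAddressSketch {

    // Assumption: stands in for NetUtils.getWildcardIPAddress().
    static final String WILDCARD_ADDRESS = "0.0.0.0";

    // Mirrors getAkkaConfig: empty result = local-only ActorSystem (basic config only);
    // otherwise the advertised (external) and bound (listening) host/port pairs.
    static Optional<String> describe(
            Optional<String> externalHost, int externalPort, Optional<String> bindHost, int bindPort) {
        if (!externalHost.isPresent()) {
            return Optional.empty(); // case None: no remoting section
        }
        // No bind address: listen on all interfaces, on the external port.
        String effectiveBindHost = bindHost.orElse(WILDCARD_ADDRESS);
        int effectiveBindPort = bindHost.isPresent() ? bindPort : externalPort;
        return Optional.of("hostname=" + externalHost.get() + " port=" + externalPort
                + " bind-hostname=" + effectiveBindHost + " bind-port=" + effectiveBindPort);
    }

    public static void main(String[] args) {
        // NAT/Docker style: advertise port 30000 to peers, listen on 6123 inside the container.
        System.out.println(
                describe(Optional.of("203.0.113.7"), 30000, Optional.of("0.0.0.0"), 6123).get());
        // hostname=203.0.113.7 port=30000 bind-hostname=0.0.0.0 bind-port=6123
    }
}
```

The NAT case in main is the scenario that the separate external and bind options make possible: peers connect to the advertised address, and the port mapping forwards traffic to the bind port.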
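Akka configuration
AkkaUtils contains the templates for the basic Akka configuration and for the remote-call configuration. These templates are very detailed, and they use Scala string interpolation to fill the configuration values in.
Basic configuration
The basic Akka configuration is generated in the getBasicAkkaConfig method. The assembled configuration is: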
```scala
val config =
  s"""
    |akka {
    | daemonic = off
    |
    | loggers = ["akka.event.slf4j.Slf4jLogger"]
    | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
    | log-config-on-start = off
    | logger-startup-timeout = 30s
    |
    | jvm-exit-on-fatal-error = $jvmExitOnFatalError
    |
    | serialize-messages = off
    |
    | loglevel = $logLevel
    | stdout-loglevel = OFF
    |
    | log-dead-letters = $logLifecycleEvents
    | log-dead-letters-during-shutdown = $logLifecycleEvents
    |
    | actor {
    |   guardian-supervisor-strategy = $supervisorStrategy
    |
    |   warn-about-java-serializer-usage = off
    |
    |   default-dispatcher {
    |     throughput = $akkaThroughput
    |   }
    |
    |   supervisor-dispatcher {
    |     type = Dispatcher
    |     executor = "thread-pool-executor"
    |     thread-pool-executor {
    |       core-pool-size-min = 1
    |       core-pool-size-max = 1
    |     }
    |   }
    | }
    |}
  """.stripMargin
```
Remote configuration
The remote Akka configuration is assembled in getRemoteAkkaConfig. Remote calls are implemented with the akka-remote module.
```scala
val configString =
  s"""
    |akka {
    |  actor {
    |    provider = "akka.remote.RemoteActorRefProvider"
    |  }
    |
    |  remote {
    |    startup-timeout = $startupTimeout
    |
    |    transport-failure-detector{
    |      acceptable-heartbeat-pause = $transportHeartbeatPause
    |      heartbeat-interval = $transportHeartbeatInterval
    |      threshold = $transportThreshold
    |    }
    |
    |    netty {
    |      tcp {
    |        transport-class = "akka.remote.transport.netty.NettyTransport"
    |        port = $externalPort
    |        bind-port = $port
    |        connection-timeout = $akkaTCPTimeout
    |        maximum-frame-size = $akkaFramesize
    |        tcp-nodelay = on
    |
    |        client-socket-worker-pool {
    |          pool-size-min = $clientSocketWorkerPoolPoolSizeMin
    |          pool-size-max = $clientSocketWorkerPoolPoolSizeMax
    |          pool-size-factor = $clientSocketWorkerPoolPoolSizeFactor
    |        }
    |
    |        server-socket-worker-pool {
    |          pool-size-min = $serverSocketWorkerPoolPoolSizeMin
    |          pool-size-max = $serverSocketWorkerPoolPoolSizeMax
    |          pool-size-factor = $serverSocketWorkerPoolPoolSizeFactor
    |        }
    |      }
    |    }
    |
    |    log-remote-lifecycle-events = $logLifecycleEvents
    |
    |    retry-gate-closed-for = ${retryGateClosedFor + " ms"}
    |  }
    |}
  """.stripMargin

val hostnameConfigString =
  s"""
    |akka {
    |  remote {
    |    netty {
    |      tcp {
    |        hostname = "$effectiveHostname"
    |        bind-hostname = "$bindAddress"
    |      }
    |    }
    |  }
    |}
  """.stripMargin

val sslConfigString = if (akkaEnableSSLConfig) {
  s"""
    |akka {
    |  remote {
    |
    |    enabled-transports = ["akka.remote.netty.ssl"]
    |
    |    netty {
    |
    |      ssl = $${akka.remote.netty.tcp}
    |
    |      ssl {
    |
    |        enable-ssl = $akkaEnableSSL
    |        ssl-engine-provider = org.apache.flink.runtime.akka.CustomSSLEngineProvider
    |        security {
    |          key-store = "$akkaSSLKeyStore"
    |          key-store-password = "$akkaSSLKeyStorePassword"
    |          key-password = "$akkaSSLKeyPassword"
    |          trust-store = "$akkaSSLTrustStore"
    |          trust-store-password = "$akkaSSLTrustStorePassword"
    |          protocol = $akkaSSLProtocol
    |          enabled-algorithms = $akkaSSLAlgorithms
    |          random-number-generator = ""
    |          require-mutual-authentication = on
    |          cert-fingerprints = $akkaSSLCertFingerprints
    |        }
    |      }
    |    }
    |  }
    |}
  """.stripMargin
} else {
  ""
}
```
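Finally, ConfigFactory.parseString(configString + hostnameConfigString + sslConfigString).resolve() merges these three fragments into the complete configuration. The call to resolve() also resolves substitutions such as ${akka.remote.netty.tcp} in the SSL fragment.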
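The sketch below shows how the three fragments are concatenated, including the way the SSL fragment collapses to an empty string when SSL is disabled. It reproduces only a handful of keys, uses plain Java string building instead of Scala interpolation, and deliberately stops before parsing, so no Typesafe Config dependency is needed. AkkaConfigStringSketch and remoteConfig are hypothetical names.

```java
final class AkkaConfigStringSketch {

    // Hypothetical helper: builds three HOCON fragments like getRemoteAkkaConfig does
    // (only a few keys are reproduced) and concatenates them.
    static String remoteConfig(
            String externalHost, int externalPort, String bindHost, int bindPort, boolean sslEnabled) {
        String configString = String.join("\n",
                "akka.remote.netty.tcp {",
                "  port = " + externalPort,
                "  bind-port = " + bindPort,
                "}");
        String hostnameConfigString = String.join("\n",
                "akka.remote.netty.tcp {",
                "  hostname = \"" + externalHost + "\"",
                "  bind-hostname = \"" + bindHost + "\"",
                "}");
        // The SSL fragment is simply empty when SSL is disabled.
        String sslConfigString = sslEnabled
                ? String.join("\n",
                        "akka.remote.enabled-transports = [\"akka.remote.netty.ssl\"]",
                        "akka.remote.netty.ssl = ${akka.remote.netty.tcp}")
                : "";
        return configString + "\n" + hostnameConfigString + "\n" + sslConfigString;
    }

    public static void main(String[] args) {
        System.out.println(remoteConfig("203.0.113.7", 30000, "0.0.0.0", 6123, false));
    }
}
```

When HOCON parses this text, the two akka.remote.netty.tcp blocks are merged into one object. resolve() then replaces ${akka.remote.netty.tcp} with that merged object, which is how the ssl section inherits the TCP settings.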
AkkaRpcService
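The previous sections described how the RpcService is created. Now we take a closer look at AkkaRpcService.
AkkaRpcService is the implementation of RpcService. Besides holding a reference to the ActorSystem, it keeps references to all registered RpcEndpoints. It assigns each RpcEndpoint an ActorRef and stores the mapping between them: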
```java
@GuardedBy("lock")
private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4);
```
We start with the constructor:
```java
@VisibleForTesting
public AkkaRpcService(
        final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) {
    this.actorSystem = checkNotNull(actorSystem, "actor system");
    this.configuration = checkNotNull(configuration, "akka rpc service configuration");

    Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);

    // Host of the ActorSystem address
    if (actorSystemAddress.host().isDefined()) {
        address = actorSystemAddress.host().get();
    } else {
        address = "";
    }

    // Port number; -1 if not configured
    if (actorSystemAddress.port().isDefined()) {
        port = (Integer) actorSystemAddress.port().get();
    } else {
        port = -1;
    }

    // Whether to capture the call stack of ask operations
    captureAskCallstacks = configuration.captureAskCallStack();

    // Adapter wrapping the ActorSystem scheduler
    internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);

    // Future completed once the RPC service has terminated
    terminationFuture = new CompletableFuture<>();

    // Mark the RPC service as running
    stopped = false;

    // Start the SupervisorActor
    supervisor = startSupervisorActor();
}
```
Note that the last line of the constructor starts the SupervisorActor. One AkkaRpcService can serve multiple RpcEndpoints. It creates one actor for each RpcEndpoint, and all of these actors are children of the SupervisorActor.
```java
private Supervisor startSupervisorActor() {
    final ExecutorService terminationFutureExecutor =
            Executors.newSingleThreadExecutor(
                    new ExecutorThreadFactory(
                            "AkkaRpcService-Supervisor-Termination-Future-Executor"));
    // Create the SupervisorActor
    final ActorRef actorRef =
            SupervisorActor.startSupervisorActor(actorSystem, terminationFutureExecutor);

    // Wrap the SupervisorActor and terminationFutureExecutor and return them
    return Supervisor.create(actorRef, terminationFutureExecutor);
}
```
startServer
The startServer method is called when an RpcEndpoint is constructed:
```java
protected RpcEndpoint(final RpcService rpcService) {
    this(rpcService, UUID.randomUUID().toString());
}

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    this.rpcService = checkNotNull(rpcService, "rpcService");
    this.endpointId = checkNotNull(endpointId, "endpointId");

    // The RpcEndpoint receives its RpcService on construction and calls its startServer method
    this.rpcServer = rpcService.startServer(this);

    this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
```
Next, we analyze the startServer method of AkkaRpcService:
```java
@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
    checkNotNull(rpcEndpoint, "rpc endpoint");

    // Register with the SupervisorActor, which creates a new actor
    final SupervisorActor.ActorRegistration actorRegistration =
            registerAkkaRpcActor(rpcEndpoint);
    // Get that actor's ActorRef
    final ActorRef actorRef = actorRegistration.getActorRef();
    // Future completed when that actor terminates
    final CompletableFuture<Void> actorTerminationFuture =
            actorRegistration.getTerminationFuture();

    LOG.info(
            "Starting RPC endpoint for {} at {} .",
            rpcEndpoint.getClass().getName(),
            actorRef.path());

    // Determine the Akka address and the hostname
    final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
    final String hostname;
    Option<String> host = actorRef.path().address().host();
    if (host.isEmpty()) {
        hostname = "localhost";
    } else {
        hostname = host.get();
    }

    // Collect all RpcGateway interfaces implemented by the endpoint class into a HashSet
    Set<Class<?>> implementedRpcGateways =
            new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

    // Also add the RpcServer and AkkaBasedEndpoint interfaces
    implementedRpcGateways.add(RpcServer.class);
    implementedRpcGateways.add(AkkaBasedEndpoint.class);

    final InvocationHandler akkaInvocationHandler;

    // Create the RpcServer proxy; the InvocationHandler type
    // depends on the type of the RpcEndpoint
    if (rpcEndpoint instanceof FencedRpcEndpoint) {
        // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
        akkaInvocationHandler =
                new FencedAkkaInvocationHandler<>(
                        akkaAddress,
                        hostname,
                        actorRef,
                        configuration.getTimeout(),
                        configuration.getMaximumFramesize(),
                        actorTerminationFuture,
                        ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
                        captureAskCallstacks);

        implementedRpcGateways.add(FencedMainThreadExecutable.class);
    } else {
        akkaInvocationHandler =
                new AkkaInvocationHandler(
                        akkaAddress,
                        hostname,
                        actorRef,
                        configuration.getTimeout(),
                        configuration.getMaximumFramesize(),
                        actorTerminationFuture,
                        captureAskCallstacks);
    }

    // Rather than using the System ClassLoader directly, we derive the ClassLoader
    // from this class . That works better in cases where Flink runs embedded and all Flink
    // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
    ClassLoader classLoader = getClass().getClassLoader();

    // Create the RpcServer as a JDK dynamic proxy; calls on the RpcServer are
    // actually executed by akkaInvocationHandler (analyzed in a dedicated section below)
    @SuppressWarnings("unchecked")
    RpcServer server =
            (RpcServer)
                    Proxy.newProxyInstance(
                            classLoader,
                            implementedRpcGateways.toArray(
                                    new Class<?>[implementedRpcGateways.size()]),
                            akkaInvocationHandler);

    return server;
}
```
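startServer hands Proxy.newProxyInstance every RpcGateway interface that the endpoint class implements, plus RpcServer and AkkaBasedEndpoint. The resulting proxy can therefore be cast to any of them. The sketch below reproduces that idea with hypothetical interfaces. extractGateways walks the class hierarchy in the spirit of RpcUtils.extractImplementedRpcGateways, but it is not Flink's code. Gateway and Server stand in for RpcGateway and RpcServer.

```java
import java.lang.reflect.Proxy;
import java.util.LinkedHashSet;
import java.util.Set;

final class GatewayInterfacesSketch {

    interface Gateway {}                                          // stand-in for RpcGateway
    interface TaskGateway extends Gateway { String submit(String task); }
    interface MetricsGateway extends Gateway { int metricCount(); }
    interface Server extends Gateway { String address(); }        // stand-in for RpcServer
    interface Unrelated { void foo(); }                           // not a gateway: excluded

    static class Endpoint implements TaskGateway, MetricsGateway, Unrelated {
        public String submit(String task) { return "accepted " + task; }
        public int metricCount() { return 3; }
        public void foo() {}
    }

    // Collect every interface extending Gateway, walking up the superclass chain.
    static Set<Class<?>> extractGateways(Class<?> clazz) {
        Set<Class<?>> result = new LinkedHashSet<>();
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            for (Class<?> iface : c.getInterfaces()) {
                if (Gateway.class.isAssignableFrom(iface)) {
                    result.add(iface);
                }
            }
        }
        return result;
    }

    // One proxy implementing all gateway interfaces plus Server.
    static Object createServerProxy(Object endpoint) {
        Set<Class<?>> interfaces = extractGateways(endpoint.getClass());
        interfaces.add(Server.class); // like adding RpcServer.class / AkkaBasedEndpoint.class
        return Proxy.newProxyInstance(
                GatewayInterfacesSketch.class.getClassLoader(),
                interfaces.toArray(new Class<?>[0]),
                (proxy, method, args) -> method.getDeclaringClass() == Server.class
                        ? "local://endpoint"              // answered by the handler itself
                        : method.invoke(endpoint, args)); // forwarded to the endpoint
    }

    public static void main(String[] args) {
        Object server = createServerProxy(new Endpoint());
        System.out.println(((TaskGateway) server).submit("t1")); // accepted t1
        System.out.println(((Server) server).address());         // local://endpoint
        System.out.println(server instanceof Unrelated);         // false
    }
}
```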
The registerAkkaRpcActor method has the SupervisorActor create a child actor of type FencedAkkaRpcActor or AkkaRpcActor.
```java
private <C extends RpcEndpoint & RpcGateway>
        SupervisorActor.ActorRegistration registerAkkaRpcActor(C rpcEndpoint) {
    final Class<? extends AbstractActor> akkaRpcActorType;

    // Pick the actor class that matches the type of the RpcEndpoint
    if (rpcEndpoint instanceof FencedRpcEndpoint) {
        akkaRpcActorType = FencedAkkaRpcActor.class;
    } else {
        akkaRpcActorType = AkkaRpcActor.class;
    }

    synchronized (lock) {
        // The RpcService must not be stopped
        checkState(!stopped, "RpcService is stopped");

        // Ask the SupervisorActor to create a new actor
        final SupervisorActor.StartAkkaRpcActorResponse startAkkaRpcActorResponse =
                SupervisorActor.startAkkaRpcActor(
                        // the SupervisorActor
                        supervisor.getActor(),
                        // factory that creates the actor's Props
                        actorTerminationFuture ->
                                Props.create(
                                        akkaRpcActorType,
                                        rpcEndpoint,
                                        actorTerminationFuture,
                                        getVersion(),
                                        configuration.getMaximumFramesize()),
                        // and finally the endpoint id
                        rpcEndpoint.getEndpointId());

        // Take the registration, or throw an AkkaRpcRuntimeException if creation failed
        final SupervisorActor.ActorRegistration actorRegistration =
                startAkkaRpcActorResponse.orElseThrow(
                        cause ->
                                new AkkaRpcRuntimeException(
                                        String.format(
                                                "Could not create the %s for %s.",
                                                AkkaRpcActor.class.getSimpleName(),
                                                rpcEndpoint.getEndpointId()),
                                        cause));

        // Remember which RpcEndpoint the new actor belongs to
        actors.put(actorRegistration.getActorRef(), rpcEndpoint);

        return actorRegistration;
    }
}
```
Next, we analyze SupervisorActor.startAkkaRpcActor, the method that creates the actor:
```java
public static StartAkkaRpcActorResponse startAkkaRpcActor(
        ActorRef supervisor, StartAkkaRpcActor.PropsFactory propsFactory, String endpointId) {
    // Ask the SupervisorActor and block until the answer arrives;
    // the message sent to it is created by createStartAkkaRpcActorMessage
    return Patterns.ask(
                    supervisor,
                    createStartAkkaRpcActorMessage(propsFactory, endpointId),
                    RpcUtils.INF_DURATION)
            .toCompletableFuture()
            .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
            .join();
}
```
Next, we follow how SupervisorActor responds to the request to create an actor:
```java
@Override
public Receive createReceive() {
    return receiveBuilder()
            .match(StartAkkaRpcActor.class, this::createStartAkkaRpcActorMessage)
            .matchAny(this::handleUnknownMessage)
            .build();
}
```
When SupervisorActor receives a message of type StartAkkaRpcActor, it handles the message with createStartAkkaRpcActorMessage. The code is as follows:
```java
private void createStartAkkaRpcActorMessage(StartAkkaRpcActor startAkkaRpcActor) {
    // Get the endpoint id from the received message
    final String endpointId = startAkkaRpcActor.getEndpointId();
    // Create the AkkaRpcActorRegistration
    final AkkaRpcActorRegistration akkaRpcActorRegistration =
            new AkkaRpcActorRegistration(endpointId);

    // Create the Props with the factory carried in the message.
    // This makes it easy to pass in the InternalTerminationFuture: the Props are built
    // by logic supplied by the RpcService, but the InternalTerminationFuture is never
    // exposed to the RpcService
    final Props akkaRpcActorProps =
            startAkkaRpcActor
                    .getPropsFactory()
                    .create(akkaRpcActorRegistration.getInternalTerminationFuture());

    LOG.debug(
            "Starting {} with name {}.",
            akkaRpcActorProps.actorClass().getSimpleName(),
            endpointId);

    try {
        // Create the child actor, named after the endpoint id
        final ActorRef actorRef = getContext().actorOf(akkaRpcActorProps, endpointId);

        // Remember this actor
        registeredAkkaRpcActors.put(actorRef, akkaRpcActorRegistration);

        // Report success back to the sender, together with the newly created actor
        getSender()
                .tell(
                        StartAkkaRpcActorResponse.success(
                                ActorRegistration.create(
                                        actorRef,
                                        akkaRpcActorRegistration
                                                .getExternalTerminationFuture())),
                        getSelf());
    } catch (AkkaException akkaException) {
        getSender().tell(StartAkkaRpcActorResponse.failure(akkaException), getSelf());
    }
}
```
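Together, startAkkaRpcActor and the handler above form a blocking ask: the RpcService sends a request, the SupervisorActor processes it on its own mailbox, creates a named child, and replies with either success or failure. The JDK-only sketch below models that round trip. A single-threaded executor plays the mailbox, and a map of names to termination futures plays the children. SupervisorSketch and StartResponse are hypothetical, and StartResponse is only loosely modelled on StartAkkaRpcActorResponse. The duplicate-name failure reflects the fact that Akka rejects a second child with the same name, which the handler above catches as an AkkaException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SupervisorSketch {

    // Hypothetical reply: exactly one of childName / failure is set.
    static final class StartResponse {
        private final String childName;
        private final Exception failure;

        private StartResponse(String childName, Exception failure) {
            this.childName = childName;
            this.failure = failure;
        }

        static StartResponse success(String childName) { return new StartResponse(childName, null); }

        static StartResponse failed(Exception cause) { return new StartResponse(null, cause); }

        String orElseThrow() {
            if (failure != null) {
                throw new IllegalStateException("Could not create child actor", failure);
            }
            return childName;
        }
    }

    // One thread = one mailbox: requests are processed strictly one at a time.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    // Child name -> termination future; only accessed on the mailbox thread.
    private final Map<String, CompletableFuture<Void>> children = new HashMap<>();

    // "ask": send a start request and receive the reply as a future.
    CompletableFuture<StartResponse> askStartChild(String endpointId) {
        return CompletableFuture.supplyAsync(() -> {
            if (children.containsKey(endpointId)) {
                return StartResponse.failed(
                        new IllegalArgumentException("name not unique: " + endpointId));
            }
            children.put(endpointId, new CompletableFuture<>());
            return StartResponse.success(endpointId);
        }, mailbox);
    }

    // Blocking variant, similar in shape to ask(...).toCompletableFuture().join().
    String startChild(String endpointId) {
        return askStartChild(endpointId).join().orElseThrow();
    }

    void shutdown() {
        mailbox.shutdown();
    }

    public static void main(String[] args) {
        SupervisorSketch supervisor = new SupervisorSketch();
        try {
            System.out.println(supervisor.startChild("endpoint-1")); // endpoint-1
            supervisor.startChild("endpoint-1");                      // duplicate name
        } catch (IllegalStateException e) {
            System.out.println("second start failed: " + e.getCause().getMessage());
        } finally {
            supervisor.shutdown();
        }
    }
}
```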
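The connect method
This section analyzes the connect method, which connects to a remote RPC service at the given address. The second parameter, clazz, is the type of the RpcGateway. The RpcGateway is the channel for communicating with the remote ActorSystem. Once the connection is established, the caller can use the returned object of type C to call the remote actor.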
```java
@Override
public <C extends RpcGateway> CompletableFuture<C> connect(
        final String address, final Class<C> clazz) {

    return connectInternal(
            address,
            clazz,
            // A factory that creates the InvocationHandler from an ActorRef;
            // connectInternal uses it once the ActorRef is available
            (ActorRef actorRef) -> {
                // Extract the actor's address and hostname
                Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);

                // As before, create an AkkaInvocationHandler
                return new AkkaInvocationHandler(
                        addressHostname.f0,
                        addressHostname.f1,
                        actorRef,
                        configuration.getTimeout(),
                        configuration.getMaximumFramesize(),
                        null,
                        captureAskCallstacks);
            });
}
```
The connectInternal method is as follows:
private <C extends RpcGateway> CompletableFuture<C> connectInternal(
final String address,
final Class<C> clazz,
Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
checkState(!stopped, "RpcService is stopped");
LOG.debug(
"Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
address,
clazz.getName());
// 從遠(yuǎn)程地址獲取ActorRef旅挤,超時時間在AkkaRpcServiceConfiguration中
final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
// 獲取到ActorRef后執(zhí)行
final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
actorRefFuture.thenCompose(
(ActorRef actorRef) ->
FutureUtils.toJava(
// 發(fā)送RemoteHandshakeMessage給遠(yuǎn)端
// 這是握手步驟
Patterns.ask(
actorRef,
new RemoteHandshakeMessage(
clazz, getVersion()),
configuration.getTimeout().toMilliseconds())
.<HandshakeSuccessMessage>mapTo(
ClassTag$.MODULE$
.<HandshakeSuccessMessage>apply(
HandshakeSuccessMessage
.class))));
// 返回一個創(chuàng)建RpcGateway的CompletableFuture
return actorRefFuture.thenCombineAsync(
handshakeFuture,
(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
// 在握手操作完畢后執(zhí)行
// 使用前面所述的方法參數(shù)傳入的invocationHandlerFactory踢关,使用actorRef創(chuàng)建出一個AkkaInvocationHandler
InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class. That works better in cases where Flink runs embedded and all
// Flink code is loaded dynamically (for example from an OSGI bundle) through a
// custom ClassLoader
ClassLoader classLoader = getClass().getClassLoader();
// Create the proxy class with a JDK dynamic proxy
// and cast it to type C
@SuppressWarnings("unchecked")
C proxy =
(C)
Proxy.newProxyInstance(
classLoader, new Class<?>[] {clazz}, invocationHandler);
return proxy;
},
actorSystem.dispatcher());
}
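The connect/connectInternal flow above has three steps: resolve the reference, perform the handshake, and only after both futures complete wrap an InvocationHandler in a JDK proxy. The sketch below reproduces that flow with plain JDK types, under stated assumptions. It is not Flink's code. A String address stands in for the ActorRef, the handshake always succeeds, and EchoGateway and ConnectSketch are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical gateway interface standing in for an RpcGateway subtype.
interface EchoGateway {
    String echo(String msg);
}

final class ConnectSketch {
    private ConnectSketch() {}

    // Mirrors connectInternal: resolve -> handshake -> build proxy.
    // The String "reference" stands in for an ActorRef (assumption).
    static <C> CompletableFuture<C> connect(
            String address,
            Class<C> clazz,
            Function<String, InvocationHandler> invocationHandlerFactory,
            Executor executor) {
        // Stand-in for resolveActorAddress: completes immediately.
        CompletableFuture<String> refFuture = CompletableFuture.completedFuture(address);

        // Stand-in for the RemoteHandshakeMessage round trip: always succeeds here.
        CompletableFuture<Boolean> handshakeFuture =
                refFuture.thenCompose(ref -> CompletableFuture.completedFuture(Boolean.TRUE));

        // Only after both futures complete is the handler created and wrapped in a JDK proxy.
        return refFuture.thenCombineAsync(
                handshakeFuture,
                (ref, ignored) -> {
                    InvocationHandler handler = invocationHandlerFactory.apply(ref);
                    @SuppressWarnings("unchecked")
                    C proxy =
                            (C) Proxy.newProxyInstance(
                                    ConnectSketch.class.getClassLoader(),
                                    new Class<?>[] {clazz},
                                    handler);
                    return proxy;
                },
                executor);
    }
}
```

The proxy is built only inside thenCombineAsync. As a result, a failure of either future fails the returned future instead of producing a gateway whose handshake never happened.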
From the analysis above we can see that whatever is ultimately created, whether an RpcServer or a C extends RpcGateway, is a JDK dynamic proxy. The actual execution logic lives in the invocationHandler. In the next section we analyze AkkaInvocationHandler.
AkkaInvocationHandler
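Continuing from the previous section: the remote-calling side created for an RpcServer is actually a JDK proxy class, and its real execution logic lives in AkkaInvocationHandler. For a detailed introduction to Java dynamic proxies, see: Java Dynamic Proxy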
Initiating a remote RPC method call
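The invoke method looks at which class or interface declares the called method. Based on that, it either calls the method on AkkaInvocationHandler itself or performs a remote call. The detailed analysis follows: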
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// Get the class or interface that declares this method
Class<?> declaringClass = method.getDeclaringClass();
Object result;
// If the method is declared in one of these interfaces or classes, call the same-named method of AkkaInvocationHandler locally
if (declaringClass.equals(AkkaBasedEndpoint.class)
|| declaringClass.equals(Object.class)
|| declaringClass.equals(RpcGateway.class)
|| declaringClass.equals(StartStoppable.class)
|| declaringClass.equals(MainThreadExecutable.class)
|| declaringClass.equals(RpcServer.class)) {
result = method.invoke(this, args);
} else if (declaringClass.equals(FencedRpcGateway.class)) {
// Methods of FencedRpcGateway are not supported here
throw new UnsupportedOperationException(
"AkkaInvocationHandler does not support the call FencedRpcGateway#"
+ method.getName()
+ ". This indicates that you retrieved a FencedRpcGateway without specifying a "
+ "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to "
+ "retrieve a properly FencedRpcGateway.");
} else {
// If the method belongs to none of the interfaces above, perform a remote RPC call
result = invokeRpc(method, args);
}
return result;
}
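The routing rule in invoke can be reproduced in isolation. Methods declared on a fixed set of local types are answered by the handler itself, and everything else would become an RPC. This is a minimal sketch. LocalControl, TaskGateway and DispatchingHandler are hypothetical names. The "remote" branch only records the call and returns a canned value instead of sending a message to an actor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RpcGateway/StartStoppable: answered locally.
interface LocalControl {
    String getAddress();
}

// Hypothetical business interface: its own methods would be sent as RPCs.
interface TaskGateway extends LocalControl {
    int submit(String task);
}

final class DispatchingHandler implements InvocationHandler, LocalControl {
    final List<String> remoteCalls = new ArrayList<>();

    @Override
    public String getAddress() {
        return "local://handler";
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();
        if (declaringClass.equals(Object.class) || declaringClass.equals(LocalControl.class)) {
            // Same idea as AkkaInvocationHandler: call the same-named method on the handler.
            return method.invoke(this, args);
        }
        // Placeholder for invokeRpc: record the call instead of messaging an actor.
        // The canned value 42 only fits int-returning methods in this sketch.
        remoteCalls.add(method.getName());
        return 42;
    }

    static TaskGateway proxyFor(DispatchingHandler handler) {
        return (TaskGateway)
                Proxy.newProxyInstance(
                        TaskGateway.class.getClassLoader(),
                        new Class<?>[] {TaskGateway.class},
                        handler);
    }
}
```

getAddress is inherited by TaskGateway, but getDeclaringClass still reports LocalControl. That is why checking the declaring class, rather than the proxy interface, is enough to separate local methods from remote ones.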
Next we continue with the RPC call logic. The invokeRpc method and its analysis are shown below:
private Object invokeRpc(Method method, Object[] args) throws Exception {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// Get the annotations of all parameters as a two-dimensional array
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
// Find the parameter annotated with RpcTimeout and read the timeout from it (otherwise the default timeout is used)
Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);
// Create the message to send to the actor; it contains everything needed to invoke the method
final RpcInvocation rpcInvocation =
createRpcInvocationMessage(methodName, parameterTypes, args);
// Get the method's return type
Class<?> returnType = method.getReturnType();
final Object result;
// If the method returns void, send the message to the remote actor without waiting for a reply
if (Objects.equals(returnType, Void.TYPE)) {
tell(rpcInvocation);
result = null;
} else {
// Capture the call stack. It is significantly faster to do that via an exception than
// via Thread.getStackTrace(), because exceptions lazily initialize the stack trace:
// they initially only capture a lightweight native pointer and convert it into the
// stack trace lazily when needed.
// The captured call stack is later used to assemble the stack trace cheaply
final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;
// execute an asynchronous call
// Send rpcInvocation to the remote actor and wait for the reply, with futureTimeout as the timeout
final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
// Obtain the result of the remote call and deserialize it if needed (when the value is an AkkaRpcSerializedValue)
resultFuture.whenComplete(
(resultValue, failure) -> {
if (failure != null) {
completableFuture.completeExceptionally(
resolveTimeoutException(failure, callStackCapture, method));
} else {
completableFuture.complete(
deserializeValueIfNeeded(resultValue, method));
}
});
// If the return type is CompletableFuture, return the future directly;
// otherwise block on completableFuture until the value is available, then return it
if (Objects.equals(returnType, CompletableFuture.class)) {
result = completableFuture;
} else {
try {
result =
completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
} catch (ExecutionException ee) {
throw new RpcException(
"Failure while obtaining synchronous RPC result.",
ExceptionUtils.stripExecutionException(ee));
}
}
}
return result;
}
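invokeRpc branches three ways on the return type. A void method is sent fire-and-forget. A CompletableFuture method hands the future back to the caller. Any other return type blocks until the reply arrives or the timeout expires. The sketch below shows that branching on its own and makes several assumptions. The tell and ask functions are hypothetical stand-ins for the actor messaging. SampleGateway and ReturnTypeRouter are made-up names. Failures are wrapped in RuntimeException instead of Flink's RpcException.

```java
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical gateway with one method per return-type case.
interface SampleGateway {
    void fire();

    CompletableFuture<String> later();

    String now();
}

final class ReturnTypeRouter {
    private ReturnTypeRouter() {}

    // tell/ask are stand-ins for the actor messaging used by invokeRpc (assumption).
    static Object route(
            Method method,
            Consumer<String> tell,
            Function<String, CompletableFuture<Object>> ask,
            long timeoutMillis) {
        Class<?> returnType = method.getReturnType();
        if (returnType.equals(Void.TYPE)) {
            // void: fire-and-forget, nothing to wait for.
            tell.accept(method.getName());
            return null;
        }
        CompletableFuture<Object> reply = ask.apply(method.getName());
        if (returnType.equals(CompletableFuture.class)) {
            // Asynchronous gateway method: hand the future to the caller.
            return reply;
        }
        try {
            // Any other return type: block until the reply arrives or the timeout expires.
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new RuntimeException("Failure while obtaining synchronous RPC result.", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for the RPC result.", e);
        } catch (TimeoutException e) {
            throw new RuntimeException("RPC timed out after " + timeoutMillis + " ms.", e);
        }
    }

    static Method methodOf(Class<?> type, String name) {
        try {
            return type.getMethod(name);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```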
The next question is how the callee learns which method we want to call and with which arguments. This key information is assembled in the createRpcInvocationMessage method, shown below:
protected RpcInvocation createRpcInvocationMessage(
final String methodName, final Class<?>[] parameterTypes, final Object[] args)
throws IOException {
final RpcInvocation rpcInvocation;
// For a local call,
// create a LocalRpcInvocation
if (isLocal) {
rpcInvocation = new LocalRpcInvocation(methodName, parameterTypes, args);
} else {
try {
// Otherwise create a RemoteRpcInvocation
RemoteRpcInvocation remoteRpcInvocation =
new RemoteRpcInvocation(methodName, parameterTypes, args);
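// Check whether the serialized remote method invocation exceeds the maximum frame size.
// RemoteRpcInvocation serializes its content so that it can be sent through Akka;
// if the limit is exceeded, the message is not sent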
if (remoteRpcInvocation.getSize() > maximumFramesize) {
throw new IOException(
String.format(
"The rpc invocation size %d exceeds the maximum akka framesize.",
remoteRpcInvocation.getSize()));
} else {
rpcInvocation = remoteRpcInvocation;
}
} catch (IOException e) {
LOG.warn(
"Could not create remote rpc invocation message. Failing rpc invocation because...",
e);
throw e;
}
}
return rpcInvocation;
}
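The size guard in createRpcInvocationMessage boils down to serializing the invocation and comparing the byte count with the configured frame size. This is a minimal sketch under some assumptions. It measures a Java-serialized byte array, whereas Flink uses RemoteRpcInvocation.getSize(). It wraps the IOException in an UncheckedIOException for convenience. FrameSizeCheck is a hypothetical class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class FrameSizeCheck {
    private FrameSizeCheck() {}

    // Java-serializes a value into a byte array.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Refuses payloads larger than the frame size, like createRpcInvocationMessage.
    static byte[] toFrame(Serializable value, long maximumFramesize) {
        byte[] payload = serialize(value);
        if (payload.length > maximumFramesize) {
            throw new UncheckedIOException(
                    new IOException(
                            String.format(
                                    "The rpc invocation size %d exceeds the maximum akka framesize.",
                                    payload.length)));
        }
        return payload;
    }
}
```

Checking on the sender side means an oversized call fails fast with a clear message on the caller. Otherwise the message could be dropped by the transport.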
RpcInvocation
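RpcInvocation is the message that carries an RPC call to the remote actor; LocalRpcInvocation and RemoteRpcInvocation above are its two implementations. It encapsulates the parameters needed for a remote method call: the method name, the list of parameter types and the list of arguments. The interface code is as follows: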
public interface RpcInvocation {
/**
* Returns the method's name.
*
* @return Method name
* @throws IOException if the rpc invocation message is a remote message and could not be
* deserialized
* @throws ClassNotFoundException if the rpc invocation message is a remote message and contains
* serialized classes which cannot be found on the receiving side
*/
String getMethodName() throws IOException, ClassNotFoundException;
/**
* Returns the method's parameter types
*
* @return Method's parameter types
* @throws IOException if the rpc invocation message is a remote message and could not be
* deserialized
* @throws ClassNotFoundException if the rpc invocation message is a remote message and contains
* serialized classes which cannot be found on the receiving side
*/
Class<?>[] getParameterTypes() throws IOException, ClassNotFoundException;
/**
* Returns the arguments of the remote procedure call
*
* @return Arguments of the remote procedure call
* @throws IOException if the rpc invocation message is a remote message and could not be
* deserialized
* @throws ClassNotFoundException if the rpc invocation message is a remote message and contains
* serialized classes which cannot be found on the receiving side
*/
Object[] getArgs() throws IOException, ClassNotFoundException;
}
LocalRpcInvocation is a simple implementation that adds no further logic, so its code is not analyzed here.
The class that deserves attention is RemoteRpcInvocation. RemoteRpcInvocation supports serialization, which is what allows the message to be sent to a remote actor. Its payload is the inner class MethodInvocation, which likewise holds the three elements of a reflective Method call:
private String methodName;
private Class<?>[] parameterTypes;
private Object[] args;
Next, let's see how it serializes and deserializes these three fields.
private void writeObject(ObjectOutputStream oos) throws IOException {
// First write the method name
oos.writeUTF(methodName);
// Write the number of parameters
oos.writeInt(parameterTypes.length);
// Write each parameter type in turn
for (Class<?> parameterType : parameterTypes) {
oos.writeObject(parameterType);
}
if (args != null) {
// If there is an argument list, write true first
oos.writeBoolean(true);
// Write each argument object in turn
for (int i = 0; i < args.length; i++) {
try {
oos.writeObject(args[i]);
} catch (IOException e) {
throw new IOException(
"Could not serialize "
+ i
+ "th argument of method "
+ methodName
+ ". This indicates that the argument type "
+ args[i].getClass().getName()
+ " is not serializable. Arguments have to "
+ "be serializable for remote rpc calls.",
e);
}
}
} else {
// If there is no argument list, write false
oos.writeBoolean(false);
}
}
// Mirrors the serialization logic
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
// Read the method name
methodName = ois.readUTF();
// Read the length of the parameter list
int length = ois.readInt();
parameterTypes = new Class<?>[length];
// Read the parameter types
for (int i = 0; i < length; i++) {
try {
parameterTypes[i] = (Class<?>) ois.readObject();
} catch (IOException e) {
StringBuilder incompleteMethod = getIncompleteMethodString(i, 0);
throw new IOException(
"Could not deserialize "
+ i
+ "th parameter type of method "
+ incompleteMethod
+ '.',
e);
} catch (ClassNotFoundException e) {
// note: wrapping this CNFE into another CNFE does not overwrite the Exception
// stored in the ObjectInputStream (see ObjectInputStream#readSerialData)
// -> add a suppressed exception that adds a more specific message
StringBuilder incompleteMethod = getIncompleteMethodString(i, 0);
e.addSuppressed(
new ClassNotFoundException(
"Could not deserialize "
+ i
+ "th "
+ "parameter type of method "
+ incompleteMethod
+ ". This indicates that the parameter "
+ "type is not part of the system class loader."));
throw e;
}
}
// Read the boolean that tells whether arguments were passed in the call
boolean hasArgs = ois.readBoolean();
if (hasArgs) {
args = new Object[length];
// If there are arguments, read them one by one
for (int i = 0; i < length; i++) {
try {
args[i] = ois.readObject();
} catch (IOException e) {
StringBuilder incompleteMethod = getIncompleteMethodString(length, i);
throw new IOException(
"Could not deserialize "
+ i
+ "th argument of method "
+ incompleteMethod
+ '.',
e);
} catch (ClassNotFoundException e) {
// note: wrapping this CNFE into another CNFE does not overwrite the
// Exception
// stored in the ObjectInputStream (see
// ObjectInputStream#readSerialData)
// -> add a suppressed exception that adds a more specific message
StringBuilder incompleteMethod = getIncompleteMethodString(length, i);
e.addSuppressed(
new ClassNotFoundException(
"Could not deserialize "
+ i
+ "th "
+ "argument of method "
+ incompleteMethod
+ ". This indicates that the argument "
+ "type is not part of the system class loader."));
throw e;
}
}
} else {
// If there are no arguments, set args to null
args = null;
}
}
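The writeObject/readObject pair above uses a symmetric custom encoding: method name, parameter count, parameter types, a has-args flag, then the arguments. Below is a reduced sketch of the same protocol under stated assumptions. MiniMethodInvocation is a hypothetical class, and the detailed error messages are omitted. Unlike the original, the fields are marked transient and defaultWriteObject/defaultReadObject are called first, as the Java serialization specification asks of custom writeObject methods. The byte layout that follows is the same.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical reduced version of RemoteRpcInvocation.MethodInvocation.
final class MiniMethodInvocation implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: written and read manually by writeObject/readObject below.
    private transient String methodName;
    private transient Class<?>[] parameterTypes;
    private transient Object[] args;

    MiniMethodInvocation(String methodName, Class<?>[] parameterTypes, Object[] args) {
        if (args != null && args.length != parameterTypes.length) {
            throw new IllegalArgumentException("args must match parameterTypes");
        }
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.args = args;
    }

    String methodName() { return methodName; }

    Class<?>[] parameterTypes() { return parameterTypes; }

    Object[] args() { return args; }

    private void writeObject(ObjectOutputStream oos) throws IOException {
        oos.defaultWriteObject(); // all fields are transient, so nothing extra is written
        oos.writeUTF(methodName);
        oos.writeInt(parameterTypes.length);
        for (Class<?> parameterType : parameterTypes) {
            oos.writeObject(parameterType);
        }
        oos.writeBoolean(args != null); // the has-args flag
        if (args != null) {
            for (Object arg : args) {
                oos.writeObject(arg);
            }
        }
    }

    private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
        ois.defaultReadObject();
        methodName = ois.readUTF();
        int length = ois.readInt();
        parameterTypes = new Class<?>[length];
        for (int i = 0; i < length; i++) {
            parameterTypes[i] = (Class<?>) ois.readObject();
        }
        if (ois.readBoolean()) {
            // As in the original, the argument count equals the parameter count.
            args = new Object[length];
            for (int i = 0; i < length; i++) {
                args[i] = ois.readObject();
            }
        } else {
            args = null;
        }
    }

    // Serializes and deserializes a value through Java serialization.
    static MiniMethodInvocation roundTrip(MiniMethodInvocation value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MiniMethodInvocation) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The boolean flag is what lets the reader distinguish "no argument array" (null) from an empty array without guessing from the parameter count.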
When a RemoteRpcInvocation is sent to the remote end, what it actually carries is serializedMethodInvocation, which wraps a byte array. When serializedMethodInvocation is created, the MethodInvocation is serialized using InstantiationUtil.serializeObject.
// Serialized invocation data
private SerializedValue<RemoteRpcInvocation.MethodInvocation> serializedMethodInvocation;
// Transient field which is lazily initialized upon first access to the invocation data
private transient RemoteRpcInvocation.MethodInvocation methodInvocation;
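These two fields follow a common pattern. A serialized byte array travels over the wire, and a transient field caches the decoded object, rebuilt at most once on first access. The sketch below shows that pattern on its own, under stated assumptions. LazyPayload is a hypothetical class that uses plain Java serialization in place of Flink's SerializedValue and InstantiationUtil. It is not thread-safe.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical holder: bytes on the wire, decoded object cached locally.
final class LazyPayload<T extends Serializable> implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serialized form: the only state written when LazyPayload itself is serialized.
    private final byte[] serialized;

    // Not serialized; rebuilt lazily on first access after crossing the wire.
    private transient T decoded;
    private transient int decodeCount;

    LazyPayload(T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.serialized = bytes.toByteArray();
    }

    @SuppressWarnings("unchecked")
    T get() {
        if (decoded == null) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                decoded = (T) in.readObject();
                decodeCount++;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
        return decoded;
    }

    int size() { return serialized.length; }

    int decodeCount() { return decodeCount; }
}
```

Keeping the bytes around also makes the size check cheap: the size is simply the array length, with no need to serialize again.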
AkkaRpcActor
In this chapter we start analyzing the actor. As described above, in Flink every AkkaRpcActor is a child actor of the SupervisorActor, and an AkkaRpcActor is created by the startServer method of RpcService.
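There is also FencedAkkaRpcActor, which is designed to prevent split-brain. It uses a FencingToken to mark whose messages are accepted.

Suppose the JobManager is configured for HA and there are two JobManagers, JobManager1 and JobManager2. Initially JobManager1 is the leader, the FencingToken is set to JobManager1's ID, and the remote ends only accept messages from JobManager1. Then JobManager1 suddenly crashes and JobManager2 becomes the leader. The FencingToken is set to JobManager2's ID, so from now on the remote ends only accept messages from JobManager2.

Some time later JobManager1 recovers. Until it learns that it has lost leadership, it may still send messages. Because the FencingToken has changed, validation fails and the remote ends reject JobManager1's messages, which avoids the split-brain problem.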
Apart from the fencing mechanism, FencedAkkaRpcActor has the same logic as AkkaRpcActor. The analysis below is based on AkkaRpcActor.
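The fencing scenario above can be reduced to one rule: a receiver accepts a message only if the message carries the token the receiver currently expects. This is a minimal sketch of that rule, not Flink's FencedAkkaRpcActor. FencedMessage and FencedReceiver are hypothetical classes, and UUID is used only as an example token type.

```java
import java.util.UUID;

// Hypothetical message carrying the sender's fencing token.
final class FencedMessage {
    final UUID fencingToken;
    final String payload;

    FencedMessage(UUID fencingToken, String payload) {
        this.fencingToken = fencingToken;
        this.payload = payload;
    }
}

// Hypothetical receiver that only accepts messages carrying the current token.
final class FencedReceiver {
    private volatile UUID expectedToken; // null until a leader is known

    // Called when leadership changes, e.g. from JobManager1 to JobManager2.
    void setFencingToken(UUID newToken) {
        this.expectedToken = newToken;
    }

    boolean accept(FencedMessage message) {
        UUID current = expectedToken;
        return current != null && current.equals(message.fencingToken);
    }
}
```

After the token switches to JobManager2's ID, a late message from the recovered JobManager1 still carries the old token and is rejected. That is exactly the stale-leader case described above.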
Receiving RPC calls
AkkaRpcActor handles incoming messages as follows:
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
.match(ControlMessages.class, this::handleControlMessage)
.matchAny(this::handleMessage)
.build();
}
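This method is easy to understand. A message of type RemoteHandshakeMessage is handled by handleHandshakeMessage, which processes the handshake sent when a connection is established. A message of type ControlMessages is handled by handleControlMessage, which for example changes the actor's started/stopped state. All other message types are handled by handleMessage. Next we focus on handleMessage, which processes the messages.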
private void handleMessage(final Object message) {
if (state.isRunning()) {
// Make sure handleRpcMessage is not called from multiple threads
mainThreadValidator.enterMainThread();
try {
// Handle the RPC message (analyzed below)
handleRpcMessage(message);
} finally {
mainThreadValidator.exitMainThread();
}
} else {
log.info(
"The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
rpcEndpoint.getClass().getName(),
message.getClass().getName());
sendErrorIfSender(
new AkkaRpcException(
String.format(
"Discard message, because the rpc endpoint %s has not been started yet.",
rpcEndpoint.getAddress())));
}
}
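The enterMainThread/exitMainThread pair around handleRpcMessage marks the current thread as the endpoint's main thread for the duration of one message. Code can then check that it is running single-threaded. The sketch below shows such a guard built on an AtomicReference. MainThreadGuard is a hypothetical class, not Flink's MainThreadValidatorUtil, whose exact checks may differ.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical guard in the spirit of enterMainThread/exitMainThread.
final class MainThreadGuard {
    private final AtomicReference<Thread> mainThread = new AtomicReference<>();

    void enter() {
        // Fails if another message is already being processed.
        if (!mainThread.compareAndSet(null, Thread.currentThread())) {
            throw new IllegalStateException("Main thread already entered by " + mainThread.get());
        }
    }

    void exit() {
        if (!mainThread.compareAndSet(Thread.currentThread(), null)) {
            throw new IllegalStateException("exit() called outside of the main thread");
        }
    }

    boolean isMainThread() {
        return mainThread.get() == Thread.currentThread();
    }

    // Mirrors handleMessage: enter, process one message, always exit in finally.
    <T> T runInMainThread(Supplier<T> body) {
        enter();
        try {
            return body.get();
        } finally {
            exit();
        }
    }
}
```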
handleRpcMessage
The handleRpcMessage method is shown below:
protected void handleRpcMessage(Object message) {
if (message instanceof RunAsync) {
handleRunAsync((RunAsync) message);
} else if (message instanceof CallAsync) {
handleCallAsync((CallAsync) message);
} else if (message instanceof RpcInvocation) {
handleRpcInvocation((RpcInvocation) message);
} else {
log.warn(
"Received message of unknown type {} with value {}. Dropping this message!",
message.getClass().getName(),
message);
sendErrorIfSender(
new AkkaUnknownMessageException(
"Received unknown message "
+ message
+ " of type "
+ message.getClass().getSimpleName()
+ '.'));
}
}
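This method further dispatches on the message type and calls the corresponding handler. The message can be one of three types:
- RunAsync: an asynchronous call without a return value.
- CallAsync: an asynchronous call with a return value.
- RpcInvocation: an RPC method call.

We focus on the RPC method call.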
handleRpcInvocation
private void handleRpcInvocation(RpcInvocation rpcInvocation) {
Method rpcMethod = null;
try {
// Get the method name and the parameter types
String methodName = rpcInvocation.getMethodName();
Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
// Look up the matching method on rpcEndpoint
// Here rpcEndpoint is of the generic type T, not the RpcEndpoint class itself
rpcMethod = lookupRpcMethod(methodName, parameterTypes);
// On failure, send the error back to the caller
} catch (ClassNotFoundException e) {
log.error("Could not load method arguments.", e);
RpcConnectionException rpcException =
new RpcConnectionException("Could not load method arguments.", e);
getSender().tell(new Status.Failure(rpcException), getSelf());
} catch (IOException e) {
log.error("Could not deserialize rpc invocation message.", e);
RpcConnectionException rpcException =
new RpcConnectionException("Could not deserialize rpc invocation message.", e);
getSender().tell(new Status.Failure(rpcException), getSelf());
} catch (final NoSuchMethodException e) {
log.error("Could not find rpc method for rpc invocation.", e);
RpcConnectionException rpcException =
new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
getSender().tell(new Status.Failure(rpcException), getSelf());
}
if (rpcMethod != null) {
try {
// this supports declaration of anonymous classes
// Make the method accessible
rpcMethod.setAccessible(true);
// If the method has no return value, just invoke it
if (rpcMethod.getReturnType().equals(Void.TYPE)) {
// No return value to send back
rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
} else {
// If the method has a return value, invoke it and capture the result
final Object result;
try {
result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
} catch (InvocationTargetException e) {
log.debug(
"Reporting back error thrown in remote procedure {}", rpcMethod, e);
// tell the sender about the failure
getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
return;
}
final String methodName = rpcMethod.getName();
// If the result is a CompletableFuture, it can be returned asynchronously
if (result instanceof CompletableFuture) {
final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
sendAsyncResponse(responseFuture, methodName);
} else {
// Otherwise return the result synchronously
sendSyncResponse(result, methodName);
}
}
} catch (Throwable e) {
log.error("Error while executing remote procedure call {}.", rpcMethod, e);
// tell the sender about the failure
getSender().tell(new Status.Failure(e), getSelf());
}
}
}
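The receiving side of handleRpcInvocation is plain reflection. It looks up a public method by name and exact parameter types, invokes it on the endpoint, and reports the endpoint's own exception rather than the reflection wrapper. The sketch below shows that flow under a few assumptions. Reply is a hypothetical stand-in for Akka's Status.Success/Status.Failure, and CalculatorEndpoint and ReflectiveDispatcher are made-up names. The lookup uses getClass().getMethod in the same spirit as lookupRpcMethod.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical endpoint with one normal and one failing method.
class CalculatorEndpoint {
    public int add(int a, int b) {
        return a + b;
    }

    public int fail() {
        throw new IllegalStateException("boom");
    }
}

final class ReflectiveDispatcher {
    // Hypothetical reply type standing in for Status.Success / Status.Failure.
    static final class Reply {
        final boolean success;
        final Object value; // result on success, Throwable on failure

        Reply(boolean success, Object value) {
            this.success = success;
            this.value = value;
        }
    }

    private ReflectiveDispatcher() {}

    static Reply dispatch(Object endpoint, String methodName, Class<?>[] parameterTypes, Object[] args) {
        final Method rpcMethod;
        try {
            // Public method matched by name and exact parameter types.
            rpcMethod = endpoint.getClass().getMethod(methodName, parameterTypes);
        } catch (NoSuchMethodException e) {
            return new Reply(false, e);
        }
        try {
            rpcMethod.setAccessible(true); // allows public methods of non-public or anonymous classes
            return new Reply(true, rpcMethod.invoke(endpoint, args));
        } catch (InvocationTargetException e) {
            // Report the exception thrown by the endpoint method, not the reflection wrapper.
            return new Reply(false, e.getTargetException());
        } catch (IllegalAccessException | RuntimeException e) {
            return new Reply(false, e);
        }
    }
}
```

Unwrapping InvocationTargetException matters because the caller should see the business exception (here IllegalStateException) rather than a reflection artifact.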
The sendAsyncResponse method returns the call result asynchronously, as shown below:
private void sendAsyncResponse(CompletableFuture<?> asyncResponse, String methodName) {
final ActorRef sender = getSender();
Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>();
FutureUtils.assertNoException(
asyncResponse.handle(
(value, throwable) -> {
// Check whether the asynchronous result completed with an exception
if (throwable != null) {
promise.failure(throwable);
} else {
if (isRemoteSender(sender)) {
// If the call came from a remote sender,
// serialize the result
Either<AkkaRpcSerializedValue, AkkaRpcException>
serializedResult =
serializeRemoteResultAndVerifySize(
value, methodName);
// If serialization succeeds, call promise.success; otherwise call promise.failure
if (serializedResult.isLeft()) {
promise.success(serializedResult.left());
} else {
promise.failure(serializedResult.right());
}
} else {
// For a local call, no serialization is needed
promise.success(new Status.Success(value));
}
}
// consume the provided throwable
return null;
}));
// Pipe the future's result to the caller
Patterns.pipe(promise.future(), getContext().dispatcher()).to(sender);
}
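sendAsyncResponse bridges the endpoint's CompletableFuture to a reply. A failure is propagated, a value is serialized only for remote senders, and a local value is passed through unchanged. The sketch below makes several substitutions. A CompletableFuture replaces the Scala Promise and Patterns.pipe. The remoteSender flag stands in for isRemoteSender. The serializer function and the AsyncReply class are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class AsyncReply {
    private AsyncReply() {}

    static CompletableFuture<Object> toReply(
            CompletableFuture<?> asyncResponse,
            boolean remoteSender,
            Function<Object, byte[]> serializer) {
        CompletableFuture<Object> reply = new CompletableFuture<>(); // plays the role of the Promise
        asyncResponse.handle(
                (value, throwable) -> {
                    if (throwable != null) {
                        // The endpoint's future failed: propagate the failure.
                        reply.completeExceptionally(throwable);
                    } else if (remoteSender) {
                        // Remote caller: the value must be serialized before it is sent back.
                        try {
                            reply.complete(serializer.apply(value));
                        } catch (RuntimeException e) {
                            reply.completeExceptionally(e);
                        }
                    } else {
                        // Local caller: pass the object through unchanged.
                        reply.complete(value);
                    }
                    return null; // the handled stage itself carries no value
                });
        return reply;
    }
}
```

Serializing inside the callback also means a serialization error becomes a failed reply instead of an exception thrown on the actor thread.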
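The sendSyncResponse method returns the result synchronously. It is similar to the method above, except that there is no future to pipe back; the result is sent to the caller directly with tell.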
private void sendSyncResponse(Object response, String methodName) {
if (isRemoteSender(getSender())) {
Either<AkkaRpcSerializedValue, AkkaRpcException> serializedResult =
serializeRemoteResultAndVerifySize(response, methodName);
if (serializedResult.isLeft()) {
getSender().tell(new Status.Success(serializedResult.left()), getSelf());
} else {
getSender().tell(new Status.Failure(serializedResult.right()), getSelf());
}
} else {
getSender().tell(new Status.Success(response), getSelf());
}
}
RpcEndpoint
Finally we arrive at RpcEndpoint, the callee side that provides RPC services. Classes that need to receive remote calls, such as JobMaster, TaskExecutor and Dispatcher, all extend RpcEndpoint.
In addition, RpcEndpoint can schedule a Runnable to run on the actor's main thread after a delay. The methods for scheduling such tasks are:
protected void scheduleRunAsync(Runnable runnable, Time delay) {
scheduleRunAsync(runnable, delay.getSize(), delay.getUnit());
}
protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay));
}
The scheduleRunAsync method of rpcServer is implemented in AkkaInvocationHandler:
@Override
public void scheduleRunAsync(Runnable runnable, long delayMillis) {
checkNotNull(runnable, "runnable");
checkArgument(delayMillis >= 0, "delay must be zero or greater");
if (isLocal) {
long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
tell(new RunAsync(runnable, atTimeNanos));
} else {
throw new RuntimeException(
"Trying to send a Runnable to a remote actor at "
+ rpcEndpoint.path()
+ ". This is not supported.");
}
}
Finally, the tell method sends the message to the actor behind rpcEndpoint.
protected void tell(Object message) {
rpcEndpoint.tell(message, ActorRef.noSender());
}
The actor handles RunAsync messages in handleRunAsync, shown below:
private void handleRunAsync(RunAsync runAsync) {
final long timeToRun = runAsync.getTimeNanos();
final long delayNanos;
// If no execution time is set, or the execution time has already passed
if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) {
// run immediately
try {
runAsync.getRunnable().run();
} catch (Throwable t) {
log.error("Caught exception while executing runnable in main thread.", t);
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
}
} else {
// Otherwise schedule for later: send a new message after the delay, which will then be
// executed immediately
FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);
final Object envelopedSelfMessage = envelopeSelfMessage(message);
// Use the ActorSystem's scheduler to schedule a one-off delayed message
getContext()
.system()
.scheduler()
.scheduleOnce(
delay,
getSelf(),
envelopedSelfMessage,
getContext().dispatcher(),
ActorRef.noSender());
}
}
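Delay handling works in two steps. When scheduling, an absolute deadline is computed, with 0 meaning "now". When the message is processed, the deadline is re-checked and the message is re-delivered if it is not yet due. The sketch below shows this under stated assumptions. A single-threaded ScheduledExecutorService stands in for both the actor's mailbox and Akka's scheduler. DelayedRun and MiniActor are hypothetical names, and errors are only printed.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical counterpart of RunAsync: a runnable plus an absolute deadline (0 = now).
final class DelayedRun {
    final Runnable runnable;
    final long atTimeNanos;

    private DelayedRun(Runnable runnable, long atTimeNanos) {
        this.runnable = runnable;
        this.atTimeNanos = atTimeNanos;
    }

    // Same rule as scheduleRunAsync: a zero delay means "run as soon as possible".
    static DelayedRun of(Runnable runnable, long delayMillis) {
        long atTimeNanos =
                delayMillis == 0 ? 0 : System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        return new DelayedRun(runnable, atTimeNanos);
    }
}

// Hypothetical single-threaded "actor": the executor is both mailbox and scheduler.
final class MiniActor {
    private final ScheduledExecutorService mailbox =
            Executors.newSingleThreadScheduledExecutor(
                    r -> {
                        Thread t = new Thread(r, "mini-actor");
                        t.setDaemon(true);
                        return t;
                    });

    void tell(DelayedRun message) {
        mailbox.execute(() -> handle(message));
    }

    private void handle(DelayedRun message) {
        final long delayNanos;
        if (message.atTimeNanos == 0 || (delayNanos = message.atTimeNanos - System.nanoTime()) <= 0) {
            // Due (or no deadline): run now on the actor thread.
            try {
                message.runnable.run();
            } catch (RuntimeException e) {
                System.err.println("Caught exception while executing runnable: " + e);
            }
        } else {
            // Not yet due: re-deliver the same message to ourselves after the remaining delay.
            mailbox.schedule(() -> handle(message), delayNanos, TimeUnit.NANOSECONDS);
        }
    }

    void shutdown() {
        mailbox.shutdown();
    }
}
```

The deadline is absolute, so time already spent waiting in the mailbox counts toward the delay. A re-delivered message therefore only waits for the remainder.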
This blog post is the author's original work; discussion and corrections are welcome. Please credit the source if you repost it.