Flink Source Code: RPC Calls


Introduction

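This article analyzes how Flink's underlying RPC calls are executed and how they work. Flink RPC is built on the Akka actor framework, so some familiarity with Akka actors is recommended before reading.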

To make actors easier to use, Flink wraps them in a set of abstractions. The key interfaces and classes are:

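  • RpcService: defines the behavior of an RPC service, on both the calling and the called side. Remote calls require an address and a port. It covers starting and stopping the service, and connecting to other remote RpcServices.
  • AkkaRpcService: the implementation of RpcService.
  • RpcServer: the RPC server. It is itself also an RpcGateway.
  • AkkaInvocationHandler: the implementation of RpcServer. What an RpcService returns when it connects to a remote RpcService is actually a proxy object, and the real logic of that proxy lives in AkkaInvocationHandler.
  • RpcGateway: the RPC gateway, the entry point for calling another RpcService. The object an RpcService returns when it connects to a remote RpcService is of type RpcGateway. Usually the gateway also implements another interface, the same one implemented by the remote endpoint being called. As noted above, the returned object is really a proxy: when we call a method on it, Flink uses Akka underneath to invoke the method of the same name on the remote side.
  • RpcEndpoint: the called side of an RPC. It holds an RpcService and implements its own business-logic interfaces, whose methods are executed when an RpcGateway calls it remotely.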
Flink RPC

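Flink's Akka wrapper is very easy to use. As an example, take the canRespondWithSerializedValueRemotely method of the Flink unit test RemoteAkkaRpcActorTest. Through remoteGateway, it remotely calls getSerializedValueSynchronously (and the asynchronous getSerializedValue) on AkkaRpcActorTest.SerializedValueRespondingEndpoint. The code is as follows: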

@Test
public void canRespondWithSerializedValueRemotely() throws Exception {
    try (final AkkaRpcActorTest.SerializedValueRespondingEndpoint endpoint =
            new AkkaRpcActorTest.SerializedValueRespondingEndpoint(rpcService)) {
        endpoint.start();

        final AkkaRpcActorTest.SerializedValueRespondingGateway remoteGateway =
                otherRpcService
                        .connect(
                                endpoint.getAddress(),
                                AkkaRpcActorTest.SerializedValueRespondingGateway.class)
                        .join();

        assertThat(
                remoteGateway.getSerializedValueSynchronously(),
                equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE));

        final CompletableFuture<SerializedValue<String>> responseFuture =
                remoteGateway.getSerializedValue();

        assertThat(
                responseFuture.get(),
                equalTo(AkkaRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE));
    }
}

The code that creates rpcService and otherRpcService in the unit test is not shown here.

Starting from how TaskManagerRunner creates its RpcService, this article analyzes how Flink wraps Akka actors and walks through the whole call flow.

RpcService

How the TaskManager creates its RpcService

The TaskManager creates its RpcService in createRpcService, based on the Flink configuration and the high-availability services.

TaskManagerRunner's createRpcService method is shown below:

@VisibleForTesting
static RpcService createRpcService(
        final Configuration configuration, final HighAvailabilityServices haServices)
        throws Exception {

    checkNotNull(configuration);
    checkNotNull(haServices);

    return AkkaRpcServiceUtils.createRemoteRpcService(
            configuration,
            determineTaskManagerBindAddress(configuration, haServices),
            configuration.getString(TaskManagerOptions.RPC_PORT),
            configuration.getString(TaskManagerOptions.BIND_HOST),
            configuration.getOptional(TaskManagerOptions.RPC_BIND_PORT));
}

The actual creation logic lives in the AkkaRpcServiceUtils utility class. Next, we look at the RpcService interface and at how an RpcService is created.

RpcService definition

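RpcService executes RPCs using Akka actors; its implementation is AkkaRpcService. Typically a process has one RpcService, which coordinates RPC in two directions: it can start a server for a local endpoint so that remote parties can call it (the RpcEndpoint is the called side), or it can connect to a remote RPC service and create an RpcGateway (the calling side) through which the remote side is invoked.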

The RpcService interface is shown below:

public interface RpcService {

    /**
     * Return the hostname or host address under which the rpc service can be reached. If the rpc
     * service cannot be contacted remotely, then it will return an empty string.
     *
     * @return Address of the rpc service or empty string if local rpc service
     */
    // Returns the address of the RPC service,
    // or an empty string if it is a local RPC service
    String getAddress();

    /**
     * Return the port under which the rpc service is reachable. If the rpc service cannot be
     * contacted remotely, then it will return -1.
     *
     * @return Port of the rpc service or -1 if local rpc service
     */
    // Returns the port of the RPC service,
    // or -1 if it is a local RPC service
    int getPort();

    /**
     * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can be
     * used to communicate with the rpc server. If the connection failed, then the returned future
     * is failed with a {@link RpcConnectionException}.
     *
     * @param address Address of the remote rpc server
     * @param clazz Class of the rpc gateway to return
     * @param <C> Type of the rpc gateway to return
     * @return Future containing the rpc gateway or an {@link RpcConnectionException} if the
     *     connection attempt failed
     */
    // Connects to the remote RPC service at the given address and
    // returns an RPC gateway of type C for communicating with it
    <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);

    /**
     * Connect to a remote fenced rpc server under the provided address. Returns a fenced rpc
     * gateway which can be used to communicate with the rpc server. If the connection failed, then
     * the returned future is failed with a {@link RpcConnectionException}.
     *
     * @param address Address of the remote rpc server
     * @param fencingToken Fencing token to be used when communicating with the server
     * @param clazz Class of the rpc gateway to return
     * @param <F> Type of the fencing token
     * @param <C> Type of the rpc gateway to return
     * @return Future containing the fenced rpc gateway or an {@link RpcConnectionException} if the
     *     connection attempt failed
     */
    // Creates an RPC gateway with fencing support.
    // Fencing prevents split-brain; the mechanism is analyzed in the AkkaRpcActor section
    <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
            String address, F fencingToken, Class<C> clazz);

    /**
     * Start a rpc server which forwards the remote procedure calls to the provided rpc endpoint.
     *
     * @param rpcEndpoint Rpc protocol to dispatch the rpcs to
     * @param <C> Type of the rpc endpoint
     * @return Self gateway to dispatch remote procedure calls to oneself
     */
    // Starts an RPC server that forwards incoming remote calls to rpcEndpoint
    <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint);

    /**
     * Fence the given RpcServer with the given fencing token.
     *
     * <p>Fencing the RpcServer means that we fix the fencing token to the provided value. All RPCs
     * will then be enriched with this fencing token. This expects that the receiving RPC endpoint
     * extends {@link FencedRpcEndpoint}.
     *
     * @param rpcServer to fence with the given fencing token
     * @param fencingToken to fence the RpcServer with
     * @param <F> type of the fencing token
     * @return Fenced RpcServer
     */
    // Fences an existing RpcServer with the given token (split-brain protection)
    <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F fencingToken);

    /**
     * Stop the underlying rpc server of the provided self gateway.
     *
     * @param selfGateway Self gateway describing the underlying rpc server
     */
    // Stops the RPC server
    void stopServer(RpcServer selfGateway);

    /**
     * Trigger the asynchronous stopping of the {@link RpcService}.
     *
     * @return Future which is completed once the {@link RpcService} has been fully stopped.
     */
    // Stops the RPC service asynchronously
    CompletableFuture<Void> stopService();

    /**
     * Returns a future indicating when the RPC service has been shut down.
     *
     * @return Termination future
     */
    // Returns a CompletableFuture that completes once the RPC service has fully shut down
    CompletableFuture<Void> getTerminationFuture();

    /**
     * Gets the executor, provided by this RPC service. This executor can be used for example for
     * the {@code handleAsync(...)} or {@code thenAcceptAsync(...)} methods of futures.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @return The execution context provided by the RPC service
     */
    // Gets the RPC service's executor, usable for async callbacks such as handleAsync
    Executor getExecutor();

    /**
     * Gets a scheduled executor from the RPC service. This executor can be used to schedule tasks
     * to be executed in the future.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @return The RPC service provided scheduled executor
     */
    // Gets the scheduled executor for timed tasks
    ScheduledExecutor getScheduledExecutor();

    /**
     * Execute the runnable in the execution context of this RPC Service, as returned by {@link
     * #getExecutor()}, after a scheduled delay.
     *
     * @param runnable Runnable to be executed
     * @param delay The delay after which the runnable will be executed
     */
    // Runs the runnable after the given delay (in the getExecutor() context, per the Javadoc)
    ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);

    /**
     * Execute the given runnable in the executor of the RPC service. This method can be used to run
     * code outside of the main thread of a {@link RpcEndpoint}.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @param runnable to execute
     */
    // Runs the runnable in the RPC service's executor
    void execute(Runnable runnable);

    /**
     * Execute the given callable and return its result as a {@link CompletableFuture}. This method
     * can be used to run code outside of the main thread of a {@link RpcEndpoint}.
     *
     * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against any
     * concurrent invocations and is therefore not suitable to run completion methods of futures
     * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the {@link
     * RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that {@code RpcEndpoint}.
     *
     * @param callable to execute
     * @param <T> is the return value type
     * @return Future containing the callable's future result
     */
    // Runs the callable asynchronously in the RPC service's executor; the result is returned as a CompletableFuture
    <T> CompletableFuture<T> execute(Callable<T> callable);
}
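The IMPORTANT notes above say that getExecutor() does not isolate calls, so code that changes an RpcEndpoint's state must run on that endpoint's main-thread executor. The sketch below illustrates the pattern. It is a hypothetical, stdlib-only sketch, not Flink code. A single-thread executor stands in for the endpoint's main thread, and a fixed pool stands in for RpcService#getExecutor(). Work that does not touch state runs on the pool, then control hops back to the "main thread" before the counter is mutated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: state is only mutated on a single "main thread",
// mimicking the role of RpcEndpoint#getMainThreadExecutor(). Not Flink API.
class MainThreadSketch {
    // Stand-in for the endpoint's main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Stand-in for RpcService#getExecutor(): shared, no isolation.
    private final ExecutorService sharedPool = Executors.newFixedThreadPool(4);
    // Only ever read or written on mainThread.
    private int counter = 0;

    private CompletableFuture<Void> incrementAsync() {
        return CompletableFuture
                .supplyAsync(() -> 1, sharedPool)                   // work that does not touch state
                .thenAcceptAsync(delta -> counter += delta, mainThread); // state change, serialized
    }

    int runIncrements(int n) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            pending.add(incrementAsync());
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        // Read the state on the main thread as well.
        int result = CompletableFuture.supplyAsync(() -> counter, mainThread).join();
        mainThread.shutdown();
        sharedPool.shutdown();
        return result;
    }
}
```

If the thenAcceptAsync call used sharedPool instead, the unsynchronized counter += delta could lose updates. Routing the mutation through one thread is what makes it safe.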

AkkaRpcServiceUtils

A utility class responsible for creating AkkaRpcService instances.

Continuing the RpcService creation from the first section, here is AkkaRpcServiceUtils's createRemoteRpcService method:

public static AkkaRpcService createRemoteRpcService(
        Configuration configuration,
        @Nullable String externalAddress,
        String externalPortRange,
        @Nullable String bindAddress,
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType") Optional<Integer> bindPort)
        throws Exception {
    // Create a service builder
    final AkkaRpcServiceBuilder akkaRpcServiceBuilder =
            AkkaRpcServiceUtils.remoteServiceBuilder(
                    configuration, externalAddress, externalPortRange);

    // Pass in the bind address and bind port settings
    if (bindAddress != null) {
        akkaRpcServiceBuilder.withBindAddress(bindAddress);
    }

    bindPort.ifPresent(akkaRpcServiceBuilder::withBindPort);

    // Create and start the RpcService
    return akkaRpcServiceBuilder.createAndStart();
}

AkkaRpcService is constructed with the builder pattern. Once akkaRpcServiceBuilder has been given enough configuration, its createAndStart method creates the AkkaRpcService:

public AkkaRpcService createAndStart() throws Exception {
    // Get the thread pool parallelism configuration
    if (actorSystemExecutorConfiguration == null) {
        actorSystemExecutorConfiguration =
                BootstrapTools.ForkJoinExecutorConfiguration.fromConfiguration(
                        configuration);
    }

    final ActorSystem actorSystem;

    // If no external address is configured, create a local ActorSystem
    if (externalAddress == null) {
        // create local actor system
        actorSystem =
                BootstrapTools.startLocalActorSystem(
                        configuration,
                        actorSystemName,
                        logger,
                        actorSystemExecutorConfiguration,
                        customConfig);
    } else {
        // otherwise, create a remote actor system
        actorSystem =
                BootstrapTools.startRemoteActorSystem(
                        configuration,
                        actorSystemName,
                        externalAddress,
                        externalPortRange,
                        bindAddress,
                        Optional.ofNullable(bindPort),
                        logger,
                        actorSystemExecutorConfiguration,
                        customConfig);
    }

    // Return the AkkaRpcService instance
    // (analyzed in a later section)
    return new AkkaRpcService(
            actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
}
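The branch in createAndStart can be modeled without Akka. The following is a minimal sketch under stated assumptions: ToyRpcServiceBuilder and its methods are hypothetical names, not Flink's builder API, and createAndStart returns a description string instead of starting an ActorSystem. It keeps the decision shown above. No external address means a local-only service. Otherwise a remote service is built, with the optional bind host and port left as "defaults" when not set.

```java
// Hypothetical builder mirroring the shape of AkkaRpcServiceBuilder#createAndStart.
// It returns a description of what would be started instead of a real ActorSystem.
class ToyRpcServiceBuilder {
    private String externalAddress;         // null => local-only service
    private String externalPortRange = "0";
    private String bindAddress;             // optional bind-side override
    private Integer bindPort;               // optional bind-side override

    ToyRpcServiceBuilder withExternalAddress(String address, String portRange) {
        this.externalAddress = address;
        this.externalPortRange = portRange;
        return this;
    }

    ToyRpcServiceBuilder withBindAddress(String bindAddress) {
        this.bindAddress = bindAddress;
        return this;
    }

    ToyRpcServiceBuilder withBindPort(int bindPort) {
        this.bindPort = bindPort;
        return this;
    }

    String createAndStart() {
        if (externalAddress == null) {
            return "local"; // corresponds to startLocalActorSystem
        }
        // corresponds to startRemoteActorSystem; unset bind values fall back to defaults
        String bindHost = bindAddress != null ? bindAddress : "<default>";
        String bindPortText = bindPort != null ? String.valueOf(bindPort) : "<external port>";
        return "remote external=" + externalAddress + ":" + externalPortRange
                + " bind=" + bindHost + ":" + bindPortText;
    }
}
```

The fluent withXxx methods return this, so optional settings can be chained and then finished with a single createAndStart call. This matches how createRemoteRpcService only calls withBindAddress or withBindPort when a value is present.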

Next, we analyze how the ActorSystem is created.

BootstrapTools

startLocalActorSystem

This method creates an ActorSystem for local calls only.

BootstrapTools's startLocalActorSystem method is as follows:

public static ActorSystem startLocalActorSystem(
        Configuration configuration,
        String actorSystemName,
        Logger logger,
        ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {

    logger.info("Trying to start local actor system");

    try {
        // Get the Akka config; externalAddress and bindAddress are empty,
        // which corresponds to a local ActorSystem configuration
        Config akkaConfig =
                AkkaUtils.getAkkaConfig(
                        configuration,
                        scala.Option.empty(),
                        scala.Option.empty(),
                        actorSystemExecutorConfiguration.getAkkaConfig());

        // If a custom config is given, it takes precedence; the base config is the fallback for missing keys
        if (customConfig != null) {
            akkaConfig = customConfig.withFallback(akkaConfig);
        }

        // Start the ActorSystem
        return startActorSystem(akkaConfig, actorSystemName, logger);
    } catch (Throwable t) {
        throw new Exception("Could not create actor system", t);
    }
}
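The expression customConfig.withFallback(akkaConfig) means that keys set in customConfig win, and akkaConfig only fills in what is missing. The following stdlib-only sketch models that precedence on flat key/value maps. It is a simplification and not the Typesafe Config API: the real Config.withFallback also merges nested objects recursively.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of Typesafe Config's withFallback on flat maps:
// keys in `primary` win; keys only present in `fallback` are added.
class FallbackSketch {
    static Map<String, String> withFallback(Map<String, String> primary, Map<String, String> fallback) {
        Map<String, String> merged = new LinkedHashMap<>(fallback);
        merged.putAll(primary); // primary overrides fallback on duplicate keys
        return merged;
    }
}
```

With a custom config that sets only akka.loglevel, the result keeps the custom log level and inherits every other key from the base Akka config.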

startRemoteActorSystem

Creates an ActorSystem that can be called remotely.

The startRemoteActorSystem method is as follows:

public static ActorSystem startRemoteActorSystem(
        Configuration configuration,
        String actorSystemName,
        String externalAddress,
        String externalPortRange,
        String bindAddress,
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType") Optional<Integer> bindPort,
        Logger logger,
        ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {

    // parse port range definition and create port iterator
    Iterator<Integer> portsIterator;
    try {
        // Parse the port range string into an iterator of integer ports
        portsIterator = NetUtils.getPortRangeFromString(externalPortRange);
    } catch (Exception e) {
        throw new IllegalArgumentException(
                "Invalid port range definition: " + externalPortRange);
    }

    while (portsIterator.hasNext()) {
        final int externalPort = portsIterator.next();

        // Try each port in the range until the ActorSystem starts successfully
        try {
            return startRemoteActorSystem(
                    configuration,
                    actorSystemName,
                    externalAddress,
                    externalPort,
                    bindAddress,
                    bindPort.orElse(externalPort),
                    logger,
                    actorSystemExecutorConfiguration,
                    customConfig);
        } catch (Exception e) {
            // we can continue to try if this contains a netty channel exception
            Throwable cause = e.getCause();
            if (!(cause instanceof org.jboss.netty.channel.ChannelException
                    || cause instanceof java.net.BindException)) {
                throw e;
            } // else fall through the loop and try the next port
        }
    }

    // If we get here, every port in the range failed: the port range is exhausted
    throw new BindException(
            "Could not start actor system on any port in port range " + externalPortRange);
}
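The "try each port until one binds" loop can be sketched on its own. In this sketch, parsePortRange is a hypothetical, simplified stand-in for NetUtils.getPortRangeFromString: it only accepts single ports and "from-to" ranges separated by commas. Starter is a made-up callback that plays the role of starting the ActorSystem on one port. For brevity, the sketch retries when the starter throws BindException directly, whereas Flink inspects the exception's cause.

```java
import java.net.BindException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class PortRangeSketch {

    // Hypothetical callback: tries to start something on the given port.
    interface Starter<T> {
        T start(int port) throws Exception;
    }

    // Simplified parser: "50100-50200,50300" -> 50100..50200, 50300
    static Iterator<Integer> parsePortRange(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            int dash = p.indexOf('-');
            if (dash < 0) {
                ports.add(Integer.parseInt(p));
            } else {
                int from = Integer.parseInt(p.substring(0, dash).trim());
                int to = Integer.parseInt(p.substring(dash + 1).trim());
                if (from > to) {
                    throw new IllegalArgumentException("Invalid range: " + p);
                }
                for (int port = from; port <= to; port++) {
                    ports.add(port);
                }
            }
        }
        return ports.iterator();
    }

    static <T> T startOnFirstFreePort(String portRange, Starter<T> starter) throws Exception {
        Iterator<Integer> ports;
        try {
            ports = parsePortRange(portRange);
        } catch (RuntimeException e) {
            throw new IllegalArgumentException("Invalid port range definition: " + portRange);
        }
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                return starter.start(port);
            } catch (BindException e) {
                // port already in use: fall through and try the next one
            }
        }
        throw new BindException("Could not start on any port in port range " + portRange);
    }
}
```

Only bind failures are retried; any other exception propagates immediately. This mirrors the rethrow in the Flink loop above.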

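Note:
Since Flink 1.11, the TaskManager and JobManager can use different addresses and ports for local binding and for external access, which supports Docker and NAT port mapping. For details, see FLINK-15911 and FLINK-15154.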

Options for the externally visible (advertised) address and ports:

  • jobmanager.rpc.address
  • jobmanager.rpc.port
  • taskmanager.host
  • taskmanager.rpc.port
  • taskmanager.data.port

Options for the local bind address and ports:

  • jobmanager.bind-host
  • jobmanager.rpc.bind-port
  • taskmanager.bind-host
  • taskmanager.rpc.bind-port
  • taskmanager.data.bind-port

The method above ultimately calls an overloaded startRemoteActorSystem, whose body is similar to startLocalActorSystem:

private static ActorSystem startRemoteActorSystem(
    Configuration configuration,
    String actorSystemName,
    String externalAddress,
    int externalPort,
    String bindAddress,
    int bindPort,
    Logger logger,
    ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
    Config customConfig)
    throws Exception {

    // Normalize the addresses and ports into host:port strings
    String externalHostPortUrl =
        NetUtils.unresolvedHostAndPortToNormalizedString(externalAddress, externalPort);
    String bindHostPortUrl =
        NetUtils.unresolvedHostAndPortToNormalizedString(bindAddress, bindPort);
    logger.info(
        "Trying to start actor system, external address {}, bind address {}.",
        externalHostPortUrl,
        bindHostPortUrl);

    try {
        // Same as startLocalActorSystem, except that the external address/port
        // and the bind address/port are passed in
        Config akkaConfig =
            AkkaUtils.getAkkaConfig(
            configuration,
            new Some<>(new Tuple2<>(externalAddress, externalPort)),
            new Some<>(new Tuple2<>(bindAddress, bindPort)),
            actorSystemExecutorConfiguration.getAkkaConfig());

        if (customConfig != null) {
            akkaConfig = customConfig.withFallback(akkaConfig);
        }

        return startActorSystem(akkaConfig, actorSystemName, logger);
    } catch (Throwable t) {
        if (t instanceof ChannelException) {
            Throwable cause = t.getCause();
            if (cause != null && t.getCause() instanceof BindException) {
                throw new IOException(
                    "Unable to create ActorSystem at address "
                    + bindHostPortUrl
                    + " : "
                    + cause.getMessage(),
                    t);
            }
        }
        throw new Exception("Could not create actor system", t);
    }
}

startActorSystem

Finally, the ActorSystem is created through AkkaUtils:

private static ActorSystem startActorSystem(
    Config akkaConfig, String actorSystemName, Logger logger) {
    logger.debug("Using akka configuration\n {}", akkaConfig);
    ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig);

    logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem));
    return actorSystem;
}

AkkaUtils

AkkaUtils generates the Akka configuration and creates the ActorSystem. Here we focus on how it generates the configuration.

Below is the getAkkaConfig method. Note that AkkaUtils is written in Scala.

@throws(classOf[UnknownHostException])
def getAkkaConfig(configuration: Configuration,
                  externalAddress: Option[(String, Int)],
                  bindAddress: Option[(String, Int)],
                  executorConfig: Config): Config = {
    // Basic Akka config, with the executor config as fallback
    val defaultConfig = getBasicAkkaConfig(configuration).withFallback(executorConfig)

    // Build a different config depending on whether the external and bind host/port are given
    externalAddress match {

        case Some((externalHostname, externalPort)) =>

        bindAddress match {

            case Some((bindHostname, bindPort)) =>

            val remoteConfig = getRemoteAkkaConfig(
                configuration, bindHostname, bindPort, externalHostname, externalPort)

            remoteConfig.withFallback(defaultConfig)

            case None =>
            val remoteConfig = getRemoteAkkaConfig(configuration,
                                                   // the wildcard IP lets us bind to all network interfaces
                                                   NetUtils.getWildcardIPAddress, externalPort, externalHostname, externalPort)

            remoteConfig.withFallback(defaultConfig)
        }

        case None =>
        defaultConfig
    }
}
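The nested match in getAkkaConfig reduces to a small decision table, rendered here in Java as a sketch with java.util.Optional standing in for scala.Option. HostPort is a made-up helper, and describe returns a string rather than a Config. The constant WILDCARD_IP is an assumption that stands in for NetUtils.getWildcardIPAddress, whose actual value may differ by JVM and network settings.

```java
import java.util.Optional;

class AkkaConfigChoice {
    // Assumption: placeholder for NetUtils.getWildcardIPAddress().
    static final String WILDCARD_IP = "0.0.0.0";

    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    static String describe(Optional<HostPort> external, Optional<HostPort> bind) {
        if (!external.isPresent()) {
            return "local"; // case None => defaultConfig only
        }
        HostPort ext = external.get();
        // case None for bindAddress => bind to the wildcard IP on the external port
        HostPort effectiveBind = bind.orElse(new HostPort(WILDCARD_IP, ext.port));
        return "remote external=" + ext + " bind=" + effectiveBind;
    }
}
```

Binding to the wildcard IP lets the process listen on all network interfaces, while the external host/port is what gets advertised to other processes.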

Akka configuration

AkkaUtils contains templates for the basic Akka configuration and for the remote configuration. The templates are quite detailed and use Scala string interpolation to fill configuration values into them.

Basic configuration

The basic Akka configuration is built in getBasicAkkaConfig. The assembled configuration looks like this:

val config =
  s"""
    |akka {
    | daemonic = off
    |
    | loggers = ["akka.event.slf4j.Slf4jLogger"]
    | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
    | log-config-on-start = off
    | logger-startup-timeout = 30s
    |
    | jvm-exit-on-fatal-error = $jvmExitOnFatalError
    |
    | serialize-messages = off
    |
    | loglevel = $logLevel
    | stdout-loglevel = OFF
    |
    | log-dead-letters = $logLifecycleEvents
    | log-dead-letters-during-shutdown = $logLifecycleEvents
    |
    | actor {
    |   guardian-supervisor-strategy = $supervisorStrategy
    |
    |   warn-about-java-serializer-usage = off
    |
    |   default-dispatcher {
    |     throughput = $akkaThroughput
    |   }
    |
    |   supervisor-dispatcher {
    |     type = Dispatcher
    |     executor = "thread-pool-executor"
    |     thread-pool-executor {
    |       core-pool-size-min = 1
    |       core-pool-size-max = 1
    |     }
    |   }
    | }
    |}
  """.stripMargin

Remote configuration

The remote Akka configuration is assembled in getRemoteAkkaConfig; remote calls are implemented with the akka-remote module.

val configString =
s"""
         |akka {
         |  actor {
         |    provider = "akka.remote.RemoteActorRefProvider"
         |  }
         |
         |  remote {
         |    startup-timeout = $startupTimeout
         |
         |    transport-failure-detector{
         |      acceptable-heartbeat-pause = $transportHeartbeatPause
         |      heartbeat-interval = $transportHeartbeatInterval
         |      threshold = $transportThreshold
         |    }
         |
         |    netty {
         |      tcp {
         |        transport-class = "akka.remote.transport.netty.NettyTransport"
         |        port = $externalPort
         |        bind-port = $port
         |        connection-timeout = $akkaTCPTimeout
         |        maximum-frame-size = $akkaFramesize
         |        tcp-nodelay = on
         |
         |        client-socket-worker-pool {
         |          pool-size-min = $clientSocketWorkerPoolPoolSizeMin
         |          pool-size-max = $clientSocketWorkerPoolPoolSizeMax
         |          pool-size-factor = $clientSocketWorkerPoolPoolSizeFactor
         |        }
         |
         |        server-socket-worker-pool {
         |          pool-size-min = $serverSocketWorkerPoolPoolSizeMin
         |          pool-size-max = $serverSocketWorkerPoolPoolSizeMax
         |          pool-size-factor = $serverSocketWorkerPoolPoolSizeFactor
         |        }
         |      }
         |    }
         |
         |    log-remote-lifecycle-events = $logLifecycleEvents
         |
         |    retry-gate-closed-for = ${retryGateClosedFor + " ms"}
         |  }
         |}
       """.stripMargin

val hostnameConfigString =
s"""
         |akka {
         |  remote {
         |    netty {
         |      tcp {
         |        hostname = "$effectiveHostname"
         |        bind-hostname = "$bindAddress"
         |      }
         |    }
         |  }
         |}
       """.stripMargin

val sslConfigString = if (akkaEnableSSLConfig) {
    s"""
         |akka {
         |  remote {
         |
         |    enabled-transports = ["akka.remote.netty.ssl"]
         |
         |    netty {
         |
         |      ssl = $${akka.remote.netty.tcp}
         |
         |      ssl {
         |
         |        enable-ssl = $akkaEnableSSL
         |        ssl-engine-provider = org.apache.flink.runtime.akka.CustomSSLEngineProvider
         |        security {
         |          key-store = "$akkaSSLKeyStore"
         |          key-store-password = "$akkaSSLKeyStorePassword"
         |          key-password = "$akkaSSLKeyPassword"
         |          trust-store = "$akkaSSLTrustStore"
         |          trust-store-password = "$akkaSSLTrustStorePassword"
         |          protocol = $akkaSSLProtocol
         |          enabled-algorithms = $akkaSSLAlgorithms
         |          random-number-generator = ""
         |          require-mutual-authentication = on
         |          cert-fingerprints = $akkaSSLCertFingerprints
         |        }
         |      }
         |    }
         |  }
         |}
       """.stripMargin
}else{
    ""
}

Finally, the call

ConfigFactory.parseString(configString + hostnameConfigString + sslConfigString).resolve()

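concatenates the three fragments and resolves them into the complete configuration. Because $$ is the escape for a literal $ in Scala string interpolation, the SSL template emits the HOCON substitution ${akka.remote.netty.tcp}, and resolve() fills it in so that the ssl transport inherits the TCP settings.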
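The behavior of parsing the concatenated string and then calling resolve() can be illustrated with a toy model. The sketch below is a hypothetical, stdlib-only model: ToyHocon handles only flat "key = value" lines, not real HOCON with nested objects or includes. It shows two properties of the real pipeline. First, a later definition of a key overrides an earlier one. Second, ${key} substitutions are resolved only after all fragments are merged, so they see the final value.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy model of ConfigFactory.parseString(a + b + c).resolve() on flat lines.
class ToyHocon {
    private static final Pattern SUBST = Pattern.compile("\\$\\{([^}]+)\\}");

    static Map<String, String> parseAndResolve(String text) {
        Map<String, String> raw = new LinkedHashMap<>();
        for (String line : text.split("\n")) {
            String l = line.trim();
            if (l.isEmpty()) {
                continue;
            }
            int eq = l.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("Not a key = value line: " + l);
            }
            raw.put(l.substring(0, eq).trim(), l.substring(eq + 1).trim()); // later wins
        }
        Map<String, String> resolved = new LinkedHashMap<>();
        for (String key : raw.keySet()) {
            resolved.put(key, resolve(key, raw, 0));
        }
        return resolved;
    }

    private static String resolve(String key, Map<String, String> raw, int depth) {
        if (depth > raw.size()) {
            throw new IllegalStateException("Substitution cycle involving " + key);
        }
        String value = raw.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Unresolved substitution ${" + key + "}");
        }
        Matcher m = SUBST.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String replacement = resolve(m.group(1), raw, depth + 1);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Because resolution happens after merging, a substitution that appears in an earlier fragment still picks up a value overridden in a later one.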

AkkaRpcService

The previous sections described how the RpcService is created. Next, we look at AkkaRpcService in detail.

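AkkaRpcService is the only implementation of RpcService. Besides holding a reference to the ActorSystem, it keeps references to all registered RpcEndpoints. It assigns an ActorRef to each RpcEndpoint and stores the mapping between them: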

@GuardedBy("lock")
private final Map<ActorRef, RpcEndpoint> actors = new HashMap<>(4);

We start with the constructor:

@VisibleForTesting
public AkkaRpcService(
        final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) {
    this.actorSystem = checkNotNull(actorSystem, "actor system");
    this.configuration = checkNotNull(configuration, "akka rpc service configuration");

    Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);

    // Get the host address of the ActorSystem
    if (actorSystemAddress.host().isDefined()) {
        address = actorSystemAddress.host().get();
    } else {
        address = "";
    }

    // Get the port; -1 if none is configured
    if (actorSystemAddress.port().isDefined()) {
        port = (Integer) actorSystemAddress.port().get();
    } else {
        port = -1;
    }

    // Whether to capture the call stack of ask operations
    captureAskCallstacks = configuration.captureAskCallStack();

    // Adapter that wraps the ActorSystem scheduler
    internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);

    // Completed once the RPC service has terminated
    terminationFuture = new CompletableFuture<>();

    // Mark the RPC service as running (not stopped)
    stopped = false;

    // Start the SupervisorActor
    supervisor = startSupervisorActor();
}
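The address and port logic in the constructor is a simple fallback. The sketch below assumes java.util.Optional as a stand-in for scala.Option, and ServiceAddress is a hypothetical name. A missing host becomes the empty string and a missing port becomes -1, which are the values the RpcService Javadoc documents for a local RPC service.

```java
import java.util.Optional;

// Hypothetical sketch of how the constructor derives address/port from the ActorSystem address.
class ServiceAddress {
    final String address;
    final int port;

    ServiceAddress(Optional<String> host, Optional<Integer> port) {
        this.address = host.orElse(""); // local service: no reachable host
        this.port = port.orElse(-1);    // local service: no reachable port
    }
}
```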

Note that the last line of the constructor starts the SupervisorActor. A single AkkaRpcService can serve multiple RpcEndpoints. It creates one actor per RpcEndpoint, and all of these actors are children of the SupervisorActor.

private Supervisor startSupervisorActor() {
    final ExecutorService terminationFutureExecutor =
        Executors.newSingleThreadExecutor(
        new ExecutorThreadFactory(
            "AkkaRpcService-Supervisor-Termination-Future-Executor"));
    // Create the SupervisorActor
    final ActorRef actorRef =
        SupervisorActor.startSupervisorActor(actorSystem, terminationFutureExecutor);

    // Wrap the SupervisorActor and terminationFutureExecutor together and return them
    return Supervisor.create(actorRef, terminationFutureExecutor);
}
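The parent/child relationship can be pictured with a toy supervisor. The sketch below is hypothetical and is not Flink's SupervisorActor. It tracks child "actors" by endpoint id and hands out one termination future per child. Mirroring the dedicated "AkkaRpcService-Supervisor-Termination-Future-Executor" above, it completes those futures on its own single-thread executor rather than on the caller's thread.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy supervisor: one termination future per child, completed on a dedicated executor.
class ToySupervisor implements AutoCloseable {
    private final ExecutorService terminationFutureExecutor = Executors.newSingleThreadExecutor();
    private final Map<String, CompletableFuture<Void>> children = new ConcurrentHashMap<>();

    CompletableFuture<Void> startChild(String endpointId) {
        CompletableFuture<Void> termination = new CompletableFuture<>();
        if (children.putIfAbsent(endpointId, termination) != null) {
            throw new IllegalStateException("Child already exists: " + endpointId);
        }
        return termination;
    }

    void stopChild(String endpointId) {
        CompletableFuture<Void> termination = children.remove(endpointId);
        if (termination != null) {
            // complete the child's termination future on the dedicated executor
            terminationFutureExecutor.execute(() -> termination.complete(null));
        }
    }

    int childCount() {
        return children.size();
    }

    @Override
    public void close() {
        terminationFutureExecutor.shutdown(); // already-queued completions still run
    }
}
```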

startServer

startServer is called when an RpcEndpoint is constructed:

protected RpcEndpoint(final RpcService rpcService) {
    this(rpcService, UUID.randomUUID().toString());
}
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    this.rpcService = checkNotNull(rpcService, "rpcService");
    this.endpointId = checkNotNull(endpointId, "endpointId");
    // The RpcService is passed in when the RpcEndpoint is created, and its startServer method is called
    this.rpcServer = rpcService.startServer(this);

    this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}

Next, let's look at AkkaRpcService's startServer method:

@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
    checkNotNull(rpcEndpoint, "rpc endpoint");

    // Register with the SupervisorActor, which creates a new actor
    final SupervisorActor.ActorRegistration actorRegistration =
            registerAkkaRpcActor(rpcEndpoint);
    // Get the new actor's ActorRef
    final ActorRef actorRef = actorRegistration.getActorRef();
    // Get the CompletableFuture that completes when this actor terminates
    final CompletableFuture<Void> actorTerminationFuture =
            actorRegistration.getTerminationFuture();

    LOG.info(
            "Starting RPC endpoint for {} at {} .",
            rpcEndpoint.getClass().getName(),
            actorRef.path());

    // Get the Akka address and the hostname
    final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
    final String hostname;
    Option<String> host = actorRef.path().address().host();
    if (host.isEmpty()) {
        hostname = "localhost";
    } else {
        hostname = host.get();
    }

    // Collect all RpcGateway interfaces implemented by the rpcEndpoint class into a HashSet
    Set<Class<?>> implementedRpcGateways =
            new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

    // Also add the RpcServer and AkkaBasedEndpoint interfaces
    implementedRpcGateways.add(RpcServer.class);
    implementedRpcGateways.add(AkkaBasedEndpoint.class);

    final InvocationHandler akkaInvocationHandler;

    // The following creates the RpcServer proxy.

    // Use a different InvocationHandler depending on the RpcEndpoint type
    if (rpcEndpoint instanceof FencedRpcEndpoint) {
        // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
        akkaInvocationHandler =
                new FencedAkkaInvocationHandler<>(
                        akkaAddress,
                        hostname,
                        actorRef,
                        configuration.getTimeout(),
                        configuration.getMaximumFramesize(),
                        actorTerminationFuture,
                        ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
                        captureAskCallstacks);

        implementedRpcGateways.add(FencedMainThreadExecutable.class);
    } else {
        akkaInvocationHandler =
                new AkkaInvocationHandler(
                        akkaAddress,
                        hostname,
                        actorRef,
                        configuration.getTimeout(),
                        configuration.getMaximumFramesize(),
                        actorTerminationFuture,
                        captureAskCallstacks);
    }

    // Rather than using the System ClassLoader directly, we derive the ClassLoader
    // from this class . That works better in cases where Flink runs embedded and all Flink
    // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
    ClassLoader classLoader = getClass().getClassLoader();

    // Create the RpcServer as a JDK dynamic proxy;
    // calls on the RpcServer are actually handled by akkaInvocationHandler
    // (analyzed in a dedicated section later)
    @SuppressWarnings("unchecked")
    RpcServer server =
            (RpcServer)
                    Proxy.newProxyInstance(
                            classLoader,
                            implementedRpcGateways.toArray(
                                    new Class<?>[implementedRpcGateways.size()]),
                            akkaInvocationHandler);

    return server;
}
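The key trick in startServer is Proxy.newProxyInstance: the caller gets an object implementing the gateway interfaces, and every call lands in an InvocationHandler. The sketch below is a stdlib-only toy, not AkkaInvocationHandler. Instead of sending a message through Akka, it hands each call to an Executor that stands in for the endpoint's actor mailbox. Methods returning CompletableFuture get a future back (ask-style), and other methods block for the result. ToyGateway, Greeter, GreeterEndpoint and ToyRpc are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Marker interface playing the role of RpcGateway.
interface ToyGateway {}

// Hypothetical gateway with one asynchronous and one synchronous RPC.
interface Greeter extends ToyGateway {
    CompletableFuture<String> greet(String name);

    int callCount();
}

// The "endpoint": its state is only touched on the mailbox thread.
class GreeterEndpoint implements Greeter {
    private int calls = 0;

    @Override
    public CompletableFuture<String> greet(String name) {
        calls++;
        return CompletableFuture.completedFuture("Hello, " + name);
    }

    @Override
    public int callCount() {
        return calls;
    }
}

final class ToyRpc {
    private ToyRpc() {}

    static <C extends ToyGateway> C connect(Object endpoint, Class<C> gatewayClass, Executor mailbox) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(endpoint, args); // toString/equals/hashCode answered locally
            }
            // "Send" the call to the mailbox; the endpoint method runs there.
            CompletableFuture<Object> reply =
                    CompletableFuture.supplyAsync(() -> invokeOnEndpoint(method, endpoint, args), mailbox);
            if (CompletableFuture.class.isAssignableFrom(method.getReturnType())) {
                return reply.thenCompose(ToyRpc::asFuture); // async RPC: return a future
            }
            return reply.join(); // sync RPC: block until the answer arrives
        };
        Object instance = Proxy.newProxyInstance(
                gatewayClass.getClassLoader(), new Class<?>[] {gatewayClass}, handler);
        return gatewayClass.cast(instance);
    }

    private static Object invokeOnEndpoint(Method method, Object endpoint, Object[] args) {
        try {
            return method.invoke(endpoint, args);
        } catch (InvocationTargetException e) {
            throw new CompletionException(e.getCause());
        } catch (IllegalAccessException e) {
            throw new CompletionException(e);
        }
    }

    @SuppressWarnings("unchecked")
    private static CompletableFuture<Object> asFuture(Object value) {
        return (CompletableFuture<Object>) value;
    }
}
```

Usage: with a single-thread executor as the mailbox, ToyRpc.connect(new GreeterEndpoint(), Greeter.class, mailbox).greet("Flink") returns a future. Because every call is funneled through one thread, the endpoint's unsynchronized counter stays consistent.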

registerAkkaRpcActor creates a child actor of the SupervisorActor, of type FencedAkkaRpcActor or AkkaRpcActor:

private <C extends RpcEndpoint & RpcGateway>
    SupervisorActor.ActorRegistration registerAkkaRpcActor(C rpcEndpoint) {
    final Class<? extends AbstractActor> akkaRpcActorType;

    // Choose the actor class that matches the RpcEndpoint type
    if (rpcEndpoint instanceof FencedRpcEndpoint) {
        akkaRpcActorType = FencedAkkaRpcActor.class;
    } else {
        akkaRpcActorType = AkkaRpcActor.class;
    }

    synchronized (lock) {
        // First make sure the RpcService has not been stopped
        checkState(!stopped, "RpcService is stopped");

        // Ask the SupervisorActor to create a new actor
        final SupervisorActor.StartAkkaRpcActorResponse startAkkaRpcActorResponse =
                SupervisorActor.startAkkaRpcActor(
                        // the SupervisorActor
                        supervisor.getActor(),
                        // factory that builds the Props of the new actor
                        actorTerminationFuture ->
                                Props.create(
                                        akkaRpcActorType,
                                        rpcEndpoint,
                                        actorTerminationFuture,
                                        getVersion(),
                                        configuration.getMaximumFramesize()),
                        // and finally the endpoint id
                        rpcEndpoint.getEndpointId());

        // Unwrap the registration, or throw if the actor could not be created
        final SupervisorActor.ActorRegistration actorRegistration =
                startAkkaRpcActorResponse.orElseThrow(
                        cause ->
                                new AkkaRpcRuntimeException(
                                        String.format(
                                                "Could not create the %s for %s.",
                                                AkkaRpcActor.class.getSimpleName(),
                                                rpcEndpoint.getEndpointId()),
                                        cause));

        // Remember which rpcEndpoint the newly created actor belongs to
        actors.put(actorRegistration.getActorRef(), rpcEndpoint);

        return actorRegistration;
    }
}

Next, let's look at SupervisorActor.startAkkaRpcActor, the method that actually creates the actor.

public static StartAkkaRpcActorResponse startAkkaRpcActor(
    ActorRef supervisor, StartAkkaRpcActor.PropsFactory propsFactory, String endpointId) {
    // Ask the SupervisorActor and wait synchronously for the answer;
    // the message sent to the SupervisorActor is built by createStartAkkaRpcActorMessage
    return Patterns.ask(
        supervisor,
        createStartAkkaRpcActorMessage(propsFactory, endpointId),
        RpcUtils.INF_DURATION)
        .toCompletableFuture()
        .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
        .join();
}

Next we need to follow how the SupervisorActor handles the request to create an actor.

@Override
public Receive createReceive() {
    return receiveBuilder()
        .match(StartAkkaRpcActor.class, this::createStartAkkaRpcActorMessage)
        .matchAny(this::handleUnknownMessage)
        .build();
}

When the SupervisorActor receives a StartAkkaRpcActor message, it handles it in createStartAkkaRpcActorMessage:

private void createStartAkkaRpcActorMessage(StartAkkaRpcActor startAkkaRpcActor) {
    // Get the endpoint id from the received message
    final String endpointId = startAkkaRpcActor.getEndpointId();
    // Create the AkkaRpcActorRegistration
    final AkkaRpcActorRegistration akkaRpcActorRegistration =
        new AkkaRpcActorRegistration(endpointId);

    // Take the Props factory from the message and build the Props.
    // Passing a factory lets the SupervisorActor hand in the InternalTerminationFuture:
    // the Props are defined by the RpcService, but the InternalTerminationFuture is never exposed to it
    final Props akkaRpcActorProps =
        startAkkaRpcActor
        .getPropsFactory()
        .create(akkaRpcActorRegistration.getInternalTerminationFuture());

    LOG.debug(
        "Starting {} with name {}.",
        akkaRpcActorProps.actorClass().getSimpleName(),
        endpointId);

    try {
        // Create the child actor, named after the endpoint id
        final ActorRef actorRef = getContext().actorOf(akkaRpcActorProps, endpointId);

        // Keep track of the new actor
        registeredAkkaRpcActors.put(actorRef, akkaRpcActorRegistration);

        // Tell the caller that the actor was created successfully,
        // sending back the registration of the new actor as well
        getSender()
            .tell(
            StartAkkaRpcActorResponse.success(
                ActorRegistration.create(
                    actorRef,
                    akkaRpcActorRegistration
                    .getExternalTerminationFuture())),
            getSelf());
    } catch (AkkaException akkaException) {
        getSender().tell(StartAkkaRpcActorResponse.failure(akkaException), getSelf());
    }
}

connect method

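This section analyzes the connect method, which connects to a remote RPC service at the given address. The second parameter, clazz, is the RpcGateway type to return; the RpcGateway is the channel for communicating with the remote ActorSystem. Once the connection is established, the caller can invoke the remote actor through the returned object of type C.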

@Override
public <C extends RpcGateway> CompletableFuture<C> connect(
    final String address, final Class<C> clazz) {

    return connectInternal(
        address,
        clazz,
        // This lambda is a factory that creates
        // an InvocationHandler from an ActorRef;
        // connectInternal calls it once the ActorRef is known
        (ActorRef actorRef) -> {
            // Extract the actor's address and hostname
            Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);

            // As in startServer, create an AkkaInvocationHandler
            return new AkkaInvocationHandler(
                addressHostname.f0,
                addressHostname.f1,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                null,
                captureAskCallstacks);
        });
}

The connectInternal method looks like this:

private <C extends RpcGateway> CompletableFuture<C> connectInternal(
    final String address,
    final Class<C> clazz,
    Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
    checkState(!stopped, "RpcService is stopped");

    LOG.debug(
        "Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.",
        address,
        clazz.getName());

    // Resolve the ActorRef of the remote address; the timeout comes from AkkaRpcServiceConfiguration
    final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);

    // Runs once the ActorRef has been resolved
    final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
            actorRefFuture.thenCompose(
                    (ActorRef actorRef) ->
                            FutureUtils.toJava(
                                    // Send a RemoteHandshakeMessage to the remote side:
                                    // this is the handshake step
                                    Patterns.ask(
                                                    actorRef,
                                                    new RemoteHandshakeMessage(
                                                            clazz, getVersion()),
                                                    configuration.getTimeout().toMilliseconds())
                                            .<HandshakeSuccessMessage>mapTo(
                                                    ClassTag$.MODULE$
                                                            .<HandshakeSuccessMessage>apply(
                                                                    HandshakeSuccessMessage
                                                                            .class))));

    // Return a CompletableFuture that produces the RpcGateway
    return actorRefFuture.thenCombineAsync(
            handshakeFuture,
            (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
                // Runs after the handshake has completed:
                // use the invocationHandlerFactory passed in as a parameter
                // to create an AkkaInvocationHandler from the actorRef
                InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);

                // Rather than using the System ClassLoader directly, we derive the ClassLoader
                // from this class. That works better in cases where Flink runs embedded and
                // all Flink code is loaded dynamically (for example from an OSGI bundle)
                // through a custom ClassLoader
                ClassLoader classLoader = getClass().getClassLoader();

                // Create a JDK dynamic proxy and cast it to type C
                @SuppressWarnings("unchecked")
                C proxy =
                        (C)
                                Proxy.newProxyInstance(
                                        classLoader, new Class<?>[] {clazz}, invocationHandler);

                return proxy;
            },
            actorSystem.dispatcher());
}
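
The control flow of connectInternal can be reproduced with plain CompletableFuture: first resolve the ActorRef, then run the handshake on it, and build the gateway only once both have completed. The sketch below is not Flink code. resolve and handshake are hypothetical stand-ins for resolveActorAddress and the RemoteHandshakeMessage round trip, an "actor ref" is just a String, and the remote side is assumed to accept only version 1.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

class ConnectSketch {
    // Hypothetical stand-in for resolveActorAddress: turns an address into an "actor ref".
    static CompletableFuture<String> resolve(String address) {
        return CompletableFuture.supplyAsync(() -> "ref:" + address);
    }

    // Hypothetical stand-in for the RemoteHandshakeMessage round trip;
    // the remote side is assumed to accept only version 1.
    static CompletableFuture<Boolean> handshake(String actorRef, int version) {
        CompletableFuture<Boolean> reply = new CompletableFuture<>();
        if (version == 1) {
            reply.complete(Boolean.TRUE);
        } else {
            reply.completeExceptionally(
                    new IllegalStateException("version mismatch for " + actorRef));
        }
        return reply;
    }

    // Same shape as connectInternal: the handshake starts once the ref is resolved,
    // and the gateway is created only after both futures have completed.
    static <C> CompletableFuture<C> connect(
            String address, int version, Function<String, C> gatewayFactory, Executor executor) {
        CompletableFuture<String> refFuture = resolve(address);
        CompletableFuture<Boolean> handshakeFuture =
                refFuture.thenCompose(ref -> handshake(ref, version));
        return refFuture.thenCombineAsync(
                handshakeFuture, (ref, ignored) -> gatewayFactory.apply(ref), executor);
    }
}
```

If the handshake fails, thenCombineAsync never calls the factory. The returned future completes exceptionally instead, so a version mismatch surfaces as a failed connect future rather than as a half-built gateway.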

The analysis above shows that both the RpcServer and the C extends RpcGateway gateway are ultimately JDK dynamic proxies. In both cases the real execution logic lives in the invocationHandler. The next section analyzes AkkaInvocationHandler.
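
To see why "the real logic lives in the invocationHandler", here is a minimal, self-contained JDK dynamic proxy. GreetingGateway, RecordingHandler and ProxyDemo are hypothetical names. Where AkkaInvocationHandler would send a message to an actor, RecordingHandler only records the call and fakes a reply.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical gateway interface; plays the role of an RpcGateway sub-interface.
interface GreetingGateway {
    String greet(String name);
}

// Plays the role of AkkaInvocationHandler: every call on the proxy ends up here.
class RecordingHandler implements InvocationHandler {
    final List<String> calls = new ArrayList<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        if (method.getName().equals("greet")) {
            // A real handler would build an RpcInvocation and send it to an actor.
            calls.add(method.getName() + Arrays.toString(args));
            return "hello " + args[0];
        }
        throw new UnsupportedOperationException(method.getName());
    }
}

class ProxyDemo {
    // Same three ingredients as in startServer/connectInternal:
    // a class loader, the interfaces to implement and the handler.
    static GreetingGateway createGateway(RecordingHandler handler) {
        return (GreetingGateway)
                Proxy.newProxyInstance(
                        ProxyDemo.class.getClassLoader(),
                        new Class<?>[] {GreetingGateway.class},
                        handler);
    }
}
```

Calling createGateway(handler).greet("flink") returns "hello flink" and leaves "greet[flink]" in handler.calls. The proxy object itself contains no logic at all.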

AkkaInvocationHandler

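Continuing from the previous section: the RpcServer and the gateways created by connect are JDK proxy objects, and their real execution logic lives in AkkaInvocationHandler. For a detailed introduction to Java dynamic proxies, see: Java 動態代理 (Java Dynamic Proxy)

Initiating a remote RPC method call

The invoke method looks at the class that declares the called method. Based on that, it decides whether to run the method on AkkaInvocationHandler itself or to make a remote call: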

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
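    // Get the class or interface that declares the method
    Class<?> declaringClass = method.getDeclaringClass();

    Object result;
    // If the method is declared by one of these classes or interfaces,
    // invoke the same method locally on this AkkaInvocationHandler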
    if (declaringClass.equals(AkkaBasedEndpoint.class)
        || declaringClass.equals(Object.class)
        || declaringClass.equals(RpcGateway.class)
        || declaringClass.equals(StartStoppable.class)
        || declaringClass.equals(MainThreadExecutable.class)
        || declaringClass.equals(RpcServer.class)) {
        result = method.invoke(this, args);
    } else if (declaringClass.equals(FencedRpcGateway.class)) {
        // Methods declared in FencedRpcGateway are not supported here
        throw new UnsupportedOperationException(
            "AkkaInvocationHandler does not support the call FencedRpcGateway#"
            + method.getName()
            + ". This indicates that you retrieved a FencedRpcGateway without specifying a "
            + "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to "
            + "retrieve a properly FencedRpcGateway.");
    } else {
        // Any other method becomes a remote RPC call
        result = invokeRpc(method, args);
    }

    return result;
}
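
The dispatch rule in invoke can be tried out in isolation. In this sketch all names are hypothetical. LocalControl plays the role of interfaces such as RpcGateway or StartStoppable, which the handler implements itself. Methods declared anywhere else (here EchoGateway.echo) go down the "remote" path, which only fakes a reply.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical "local" interface, implemented by the handler itself.
interface LocalControl {
    String getAddress();
}

// Hypothetical business interface: its own methods become "remote" calls.
interface EchoGateway extends LocalControl {
    String echo(String message);
}

class DispatchingHandler implements InvocationHandler, LocalControl {
    int remoteCalls = 0;

    @Override
    public String getAddress() {
        return "local://endpoint";
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();
        if (declaringClass.equals(Object.class) || declaringClass.equals(LocalControl.class)) {
            // Declared by Object or a local interface: run the same method on this handler.
            return method.invoke(this, args);
        }
        // Anything else would be turned into an RPC message; here the reply is faked.
        remoteCalls++;
        return "remote " + method.getName() + "(" + args[0] + ")";
    }

    static EchoGateway createGateway(DispatchingHandler handler) {
        return (EchoGateway)
                Proxy.newProxyInstance(
                        EchoGateway.class.getClassLoader(),
                        new Class<?>[] {EchoGateway.class},
                        handler);
    }
}
```

toString, hashCode and equals also arrive at invoke, with Object as the declaring class. That is why Object.class appears in the local list in AkkaInvocationHandler as well.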

Now let's continue with the RPC call logic. Here is invokeRpc, with annotations:

private Object invokeRpc(Method method, Object[] args) throws Exception {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    // Get the annotations of every parameter, as a two-dimensional array
    Annotation[][] parameterAnnotations = method.getParameterAnnotations();

    // Take the timeout from the parameter annotated with @RpcTimeout
    // (falls back to the default timeout if there is none)
    Time futureTimeout = extractRpcTimeout(parameterAnnotations, args, timeout);

    // Build the message for the actor; it carries everything needed to invoke the method
    final RpcInvocation rpcInvocation =
        createRpcInvocationMessage(methodName, parameterTypes, args);

    // Get the method's return type
    Class<?> returnType = method.getReturnType();

    final Object result;

    // If the method returns void, just tell the message to the remote actor
    if (Objects.equals(returnType, Void.TYPE)) {
        tell(rpcInvocation);

        result = null;
    } else {
        // Capture the call stack. It is significantly faster to do that via an exception than
        // via Thread.getStackTrace(), because exceptions lazily initialize the stack trace,
        // initially only capture a lightweight native pointer, and convert that into the
        // stack trace lazily when needed.
        // The captured stack is used to enrich a timeout exception later.
        final Throwable callStackCapture = captureAskCallStack ? new Throwable() : null;

        // execute an asynchronous call:
        // send rpcInvocation to the remote actor and wait for the reply for at most futureTimeout
        final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);

        final CompletableFuture<Object> completableFuture = new CompletableFuture<>();
        // Complete completableFuture with the remote result, deserializing it if needed
        // (i.e. when the reply is an AkkaRpcSerializedValue)
        resultFuture.whenComplete(
            (resultValue, failure) -> {
                if (failure != null) {
                    completableFuture.completeExceptionally(
                        resolveTimeoutException(failure, callStackCapture, method));
                } else {
                    completableFuture.complete(
                        deserializeValueIfNeeded(resultValue, method));
                }
            });

        // If the declared return type is CompletableFuture, return the future as-is;
        // otherwise block on completableFuture until the value arrives or futureTimeout expires
        if (Objects.equals(returnType, CompletableFuture.class)) {
            result = completableFuture;
        } else {
            try {
                result =
                    completableFuture.get(futureTimeout.getSize(), futureTimeout.getUnit());
            } catch (ExecutionException ee) {
                throw new RpcException(
                    "Failure while obtaining synchronous RPC result.",
                    ExceptionUtils.stripExecutionException(ee));
            }
        }
    }

    return result;
}
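
invokeRpc picks one of three paths purely from the declared return type. Below is a stripped-down sketch of that decision, not Flink code. The ask function is a hypothetical transport that returns a future for the reply, and the message is an arbitrary object instead of an RpcInvocation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class ReturnTypeDispatch {
    // Hypothetical transport: sends a message and returns a future for the reply.
    private final Function<Object, CompletableFuture<Object>> ask;
    int tells = 0;

    ReturnTypeDispatch(Function<Object, CompletableFuture<Object>> ask) {
        this.ask = ask;
    }

    Object invoke(Class<?> returnType, Object message, long timeoutMillis) {
        if (returnType == Void.TYPE) {
            // void: fire and forget (the real code calls tell(rpcInvocation)); here we only count it
            tells++;
            return null;
        }
        CompletableFuture<Object> reply = ask.apply(message);
        if (returnType == CompletableFuture.class) {
            // asynchronous RPC: hand the future straight back to the caller
            return reply;
        }
        try {
            // synchronous RPC: block until the reply arrives or the timeout expires
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            throw new IllegalStateException(
                    "Failure while obtaining synchronous RPC result.", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("No RPC result within " + timeoutMillis + " ms.", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the RPC result.", e);
        }
    }
}
```

Declaring a gateway method as returning CompletableFuture therefore keeps the calling thread free, while any other non-void return type blocks it for up to the timeout.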

The next question is how the callee knows which method we want to call and with which arguments. That information is packaged by createRpcInvocationMessage:

protected RpcInvocation createRpcInvocationMessage(
    final String methodName, final Class<?>[] parameterTypes, final Object[] args)
    throws IOException {
    final RpcInvocation rpcInvocation;

    // For a local call, create a LocalRpcInvocation
    if (isLocal) {
        rpcInvocation = new LocalRpcInvocation(methodName, parameterTypes, args);
    } else {
        try {
            // Otherwise create a RemoteRpcInvocation
            RemoteRpcInvocation remoteRpcInvocation =
                new RemoteRpcInvocation(methodName, parameterTypes, args);

            // RemoteRpcInvocation serializes its contents so that it can be sent through Akka.
            // Check that the serialized invocation does not exceed the maximum frame size;
            // if it does, the call is rejected
            if (remoteRpcInvocation.getSize() > maximumFramesize) {
                throw new IOException(
                    String.format(
                        "The rpc invocation size %d exceeds the maximum akka framesize.",
                        remoteRpcInvocation.getSize()));
            } else {
                rpcInvocation = remoteRpcInvocation;
            }
        } catch (IOException e) {
            LOG.warn(
                "Could not create remote rpc invocation message. Failing rpc invocation because...",
                e);
            throw e;
        }
    }

    return rpcInvocation;
}
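
The size check only makes sense once the invocation has been turned into bytes. Here is a minimal sketch using plain Java serialization, with the same error message as above. FrameSizeCheck and serializeChecked are hypothetical names, and the real RemoteRpcInvocation computes its size from its own serialized form rather than through this helper.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class FrameSizeCheck {
    // Serializes the payload and rejects it if the bytes would not fit into one frame.
    static byte[] serializeChecked(Serializable payload, long maximumFramesize) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(payload);
        }
        byte[] serialized = bytes.toByteArray();
        if (serialized.length > maximumFramesize) {
            throw new IOException(
                    String.format(
                            "The rpc invocation size %d exceeds the maximum akka framesize.",
                            serialized.length));
        }
        return serialized;
    }
}
```

Checking on the sender side turns an oversized invocation into an immediate IOException for the caller, rather than leaving the problem to the transport layer.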

RpcInvocation

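RpcInvocation is the message an RPC call sends to the remote actor. The LocalRpcInvocation and RemoteRpcInvocation above both implement it.

It carries the information needed to invoke a method remotely: the method name, the parameter types and the arguments. The interface is: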

public interface RpcInvocation {

    /**
     * Returns the method's name.
     *
     * @return Method name
     * @throws IOException if the rpc invocation message is a remote message and could not be
     *     deserialized
     * @throws ClassNotFoundException if the rpc invocation message is a remote message and contains
     *     serialized classes which cannot be found on the receiving side
     */
    String getMethodName() throws IOException, ClassNotFoundException;

    /**
     * Returns the method's parameter types
     *
     * @return Method's parameter types
     * @throws IOException if the rpc invocation message is a remote message and could not be
     *     deserialized
     * @throws ClassNotFoundException if the rpc invocation message is a remote message and contains
     *     serialized classes which cannot be found on the receiving side
     */
    Class<?>[] getParameterTypes() throws IOException, ClassNotFoundException;

    /**
     * Returns the arguments of the remote procedure call
     *
     * @return Arguments of the remote procedure call
     * @throws IOException if the rpc invocation message is a remote message and could not be
     *     deserialized
     * @throws ClassNotFoundException if the rpc invocation message is a remote message and contains
     *     serialized classes which cannot be found on the receiving side
     */
    Object[] getArgs() throws IOException, ClassNotFoundException;
}

LocalRpcInvocation is a simple implementation that adds no further logic, so we won't go through its code here.

The class to focus on is RemoteRpcInvocation. It is serializable, which is what makes it possible to send the message to a remote actor. Its payload is the inner class MethodInvocation, which holds the same three ingredients of a reflective Method call:

private String methodName;
private Class<?>[] parameterTypes;
private Object[] args;

Next, let's see how these three fields are serialized and deserialized.

private void writeObject(ObjectOutputStream oos) throws IOException {
    // Write the method name first
    oos.writeUTF(methodName);

    // Write the number of parameters
    oos.writeInt(parameterTypes.length);

    // Write the parameter types one by one
    for (Class<?> parameterType : parameterTypes) {
        oos.writeObject(parameterType);
    }

    if (args != null) {
        // There is an argument list: write true first
        oos.writeBoolean(true);

        // Then write the arguments one by one
        for (int i = 0; i < args.length; i++) {
            try {
                oos.writeObject(args[i]);
            } catch (IOException e) {
                throw new IOException(
                    "Could not serialize "
                    + i
                    + "th argument of method "
                    + methodName
                    + ". This indicates that the argument type "
                    + args[i].getClass().getName()
                    + " is not serializable. Arguments have to "
                    + "be serializable for remote rpc calls.",
                    e);
            }
        }
    } else {
        // No argument list: write false
        oos.writeBoolean(false);
    }
}

// Mirrors the serialization logic
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
    // Read the method name
    methodName = ois.readUTF();

    // Read the number of parameters
    int length = ois.readInt();

    parameterTypes = new Class<?>[length];

    // Read the parameter types
    for (int i = 0; i < length; i++) {
        try {
            parameterTypes[i] = (Class<?>) ois.readObject();
        } catch (IOException e) {
            StringBuilder incompleteMethod = getIncompleteMethodString(i, 0);
            throw new IOException(
                "Could not deserialize "
                + i
                + "th parameter type of method "
                + incompleteMethod
                + '.',
                e);
        } catch (ClassNotFoundException e) {
            // note: wrapping this CNFE into another CNFE does not overwrite the Exception
            //       stored in the ObjectInputStream (see ObjectInputStream#readSerialData)
            // -> add a suppressed exception that adds a more specific message
            StringBuilder incompleteMethod = getIncompleteMethodString(i, 0);
            e.addSuppressed(
                new ClassNotFoundException(
                    "Could not deserialize "
                    + i
                    + "th "
                    + "parameter type of method "
                    + incompleteMethod
                    + ". This indicates that the parameter "
                    + "type is not part of the system class loader."));
            throw e;
        }
    }

    // Read the boolean that says whether arguments were passed
    boolean hasArgs = ois.readBoolean();

    if (hasArgs) {
        args = new Object[length];

        // There are arguments: read them one by one
        for (int i = 0; i < length; i++) {
            try {
                args[i] = ois.readObject();
            } catch (IOException e) {
                StringBuilder incompleteMethod = getIncompleteMethodString(length, i);
                throw new IOException(
                    "Could not deserialize "
                    + i
                    + "th argument of method "
                    + incompleteMethod
                    + '.',
                    e);
            } catch (ClassNotFoundException e) {
                // note: wrapping this CNFE into another CNFE does not overwrite the Exception
                //       stored in the ObjectInputStream (see ObjectInputStream#readSerialData)
                // -> add a suppressed exception that adds a more specific message
                StringBuilder incompleteMethod = getIncompleteMethodString(length, i);
                e.addSuppressed(
                    new ClassNotFoundException(
                        "Could not deserialize "
                        + i
                        + "th "
                        + "argument of method "
                        + incompleteMethod
                        + ". This indicates that the argument "
                        + "type is not part of the system class loader."));
                throw e;
            }
        }
    } else {
        // No arguments: set args to null
        args = null;
    }
}
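
A round trip through an in-memory stream is a quick way to check the wire layout written by writeObject and consumed by readObject. MiniMethodInvocation below is a hypothetical, trimmed-down copy of MethodInvocation. It keeps the same field order (name, parameter count, parameter types, hasArgs flag, arguments) but drops the detailed error messages.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical, trimmed-down MethodInvocation with the same wire layout.
class MiniMethodInvocation implements Serializable {
    private static final long serialVersionUID = 1L;

    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] args;

    MiniMethodInvocation(String methodName, Class<?>[] parameterTypes, Object[] args) {
        this.methodName = methodName;
        this.parameterTypes = parameterTypes;
        this.args = args;
    }

    String getMethodName() { return methodName; }
    Class<?>[] getParameterTypes() { return parameterTypes; }
    Object[] getArgs() { return args; }

    private void writeObject(ObjectOutputStream oos) throws IOException {
        oos.writeUTF(methodName);                 // 1. method name
        oos.writeInt(parameterTypes.length);      // 2. parameter count
        for (Class<?> type : parameterTypes) {
            oos.writeObject(type);                // 3. parameter types
        }
        oos.writeBoolean(args != null);           // 4. "has arguments" flag
        if (args != null) {
            for (Object arg : args) {
                oos.writeObject(arg);             // 5. arguments
            }
        }
    }

    private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
        methodName = ois.readUTF();
        int length = ois.readInt();
        parameterTypes = new Class<?>[length];
        for (int i = 0; i < length; i++) {
            parameterTypes[i] = (Class<?>) ois.readObject();
        }
        // Like the original, assumes exactly one argument per parameter type.
        if (ois.readBoolean()) {
            args = new Object[length];
            for (int i = 0; i < length; i++) {
                args[i] = ois.readObject();
            }
        } else {
            args = null;
        }
    }

    // Serializes and immediately deserializes an invocation through an in-memory buffer.
    static MiniMethodInvocation roundTrip(MiniMethodInvocation invocation) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(invocation);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (MiniMethodInvocation) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

Because readObject reads exactly what writeObject wrote, in the same order, any change to one side must be mirrored on the other. The hasArgs flag is what lets a null argument array survive the trip.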

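What actually travels to the remote side inside a RemoteRpcInvocation is serializedMethodInvocation, which wraps a byte array. When serializedMethodInvocation is created, the MethodInvocation is serialized into those bytes (SerializedValue does this with InstantiationUtil.serializeObject). The transient methodInvocation field is only filled in when the invocation data is first accessed.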

// Serialized invocation data
private SerializedValue<RemoteRpcInvocation.MethodInvocation> serializedMethodInvocation;

// Transient field which is lazily initialized upon first access to the invocation data
private transient RemoteRpcInvocation.MethodInvocation methodInvocation;
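
The pair of fields above follows a common pattern: only the bytes are serialized, and the decoded object is kept in a transient field that is filled on first access. Here is a minimal sketch of that pattern with plain Java serialization. LazyValue is a hypothetical name; it is not Flink's SerializedValue class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class LazyValue<T extends Serializable> implements Serializable {
    private static final long serialVersionUID = 1L;

    // The only state that travels over the wire.
    private final byte[] serialized;

    // Decoded value, never serialized; rebuilt on first access.
    private transient T value;

    // Counts how often the bytes were actually decoded (for illustration only).
    transient int decodeCount = 0;

    LazyValue(T value) {
        this.serialized = toBytes(value);
    }

    @SuppressWarnings("unchecked")
    T get() {
        if (value == null) {
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                value = (T) ois.readObject();
                decodeCount++;
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("Could not deserialize value", e);
            }
        }
        return value;
    }

    int getSize() {
        return serialized.length;
    }

    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize value", e);
        }
        return bytes.toByteArray();
    }
}
```

Keeping the bytes around makes the size known up front (useful for the frame-size check), and the receiver pays the deserialization cost only once, when it first reads the method name, types or arguments.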

AkkaRpcActor

In this chapter we turn to the actor. As described above, every AkkaRpcActor in Flink is a child actor of the SupervisorActor and is created by the RpcService's startServer method.

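There is also FencedAkkaRpcActor, which is designed to prevent split-brain. It uses a FencingToken to mark whose messages may be accepted. For example, suppose the JobManager is configured for HA and there are two JobManagers, JobManager1 and JobManager2. Initially JobManager1 is the leader, the FencingToken is set to JobManager1's ID, and the remote side only accepts messages from JobManager1. Then JobManager1 suddenly crashes. JobManager2 becomes the leader and the FencingToken is set to JobManager2's ID, so from now on the remote side only accepts messages from JobManager2. A while later JobManager1 recovers, and until it learns that it has lost leadership it keeps sending messages. Because the FencingToken has changed, the token check fails and the remote side rejects the messages from JobManager1, which avoids the split-brain problem.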
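
The scenario above boils down to one comparison per message. The sketch follows the description literally: a receiver that only accepts messages whose token matches the current leader's. FencedMessage and FencedReceiver are hypothetical. Flink's FencedAkkaRpcActor roughly compares the token carried by a fenced message with the endpoint's own fencing token, and it handles more cases than this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical message envelope: the payload plus the fencing token of its sender.
final class FencedMessage {
    final UUID fencingToken;
    final String payload;

    FencedMessage(UUID fencingToken, String payload) {
        this.fencingToken = fencingToken;
        this.payload = payload;
    }
}

class FencedReceiver {
    // Token of the current leader; null while no leader is known.
    private UUID expectedToken;
    final List<String> accepted = new ArrayList<>();

    void setFencingToken(UUID newLeaderToken) {
        expectedToken = newLeaderToken;
    }

    // Accepts a message only if it carries the current leader's token.
    boolean receive(FencedMessage message) {
        if (expectedToken == null || !expectedToken.equals(message.fencingToken)) {
            return false; // stale or unknown leader: drop the message
        }
        accepted.add(message.payload);
        return true;
    }
}
```

Replaying the JobManager example, a message tagged with JobManager1's token is accepted before the leadership change and rejected after setFencingToken switches to JobManager2's token.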

Apart from the fencing mechanism, FencedAkkaRpcActor and AkkaRpcActor share the same logic, so the analysis below is based on AkkaRpcActor.

Receiving RPC calls

AkkaRpcActor handles incoming messages as follows:

@Override
public Receive createReceive() {
    return ReceiveBuilder.create()
        .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
        .match(ControlMessages.class, this::handleControlMessage)
        .matchAny(this::handleMessage)
        .build();
}

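This method is easy to follow. A RemoteHandshakeMessage (the handshake sent when a connection is established) is handled by handleHandshakeMessage. A ControlMessages message (which changes the actor's state, for example starting or stopping it) is handled by handleControlMessage. Every other message type is handled by handleMessage. Next we focus on handleMessage, which processes the actual messages.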

private void handleMessage(final Object message) {
    if (state.isRunning()) {
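        // Mark the current thread as the endpoint's main thread;
        // this detects handleRpcMessage being run by several threads at once
        mainThreadValidator.enterMainThread();

        try {
            // Handle the RPC message (analyzed below)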
            handleRpcMessage(message);
        } finally {
            mainThreadValidator.exitMainThread();
        }
    } else {
        log.info(
            "The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.",
            rpcEndpoint.getClass().getName(),
            message.getClass().getName());

        sendErrorIfSender(
            new AkkaRpcException(
                String.format(
                    "Discard message, because the rpc endpoint %s has not been started yet.",
                    rpcEndpoint.getAddress())));
    }
}
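
enterMainThread() and exitMainThread() bracket every message so that concurrent access to the endpoint can be detected. Below is a hypothetical sketch of such a guard built on AtomicReference. MainThreadGuard is an invented name, and Flink's own validator differs in detail.

```java
import java.util.concurrent.atomic.AtomicReference;

class MainThreadGuard {
    // Thread that is currently processing a message, or null if none.
    private final AtomicReference<Thread> current = new AtomicReference<>();

    void enterMainThread() {
        if (!current.compareAndSet(null, Thread.currentThread())) {
            // nested or concurrent entry
            throw new IllegalStateException("Main thread already entered by " + current.get());
        }
    }

    void exitMainThread() {
        if (!current.compareAndSet(Thread.currentThread(), null)) {
            throw new IllegalStateException("exitMainThread() without matching enterMainThread()");
        }
    }

    boolean isRunningInMainThread() {
        return current.get() == Thread.currentThread();
    }

    // Same bracket as handleMessage: enter, process, and always exit.
    void process(Runnable handler) {
        enterMainThread();
        try {
            handler.run();
        } finally {
            exitMainThread();
        }
    }
}
```

The try/finally matters. If the handler throws, the guard is still released, so the next message is not mistaken for concurrent access.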

handleRpcMessage

The handleRpcMessage method:

protected void handleRpcMessage(Object message) {
    if (message instanceof RunAsync) {
        handleRunAsync((RunAsync) message);
    } else if (message instanceof CallAsync) {
        handleCallAsync((CallAsync) message);
    } else if (message instanceof RpcInvocation) {
        handleRpcInvocation((RpcInvocation) message);
    } else {
        log.warn(
            "Received message of unknown type {} with value {}. Dropping this message!",
            message.getClass().getName(),
            message);

        sendErrorIfSender(
            new AkkaUnknownMessageException(
                "Received unknown message "
                + message
                + " of type "
                + message.getClass().getSimpleName()
                + '.'));
    }
}

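This method further splits the logic by message type and calls the matching handler. A message can be a RunAsync (an asynchronous call without a return value), a CallAsync (an asynchronous call with a return value) or an RpcInvocation (an RPC method call). We focus on the RPC method call.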

handleRpcInvocation

private void handleRpcInvocation(RpcInvocation rpcInvocation) {
    Method rpcMethod = null;

    try {
        // Get the method name and the parameter types
        String methodName = rpcInvocation.getMethodName();
        Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();

        // Look up the matching method on rpcEndpoint
        // (rpcEndpoint has the generic type T here, not the RpcEndpoint class itself)
        rpcMethod = lookupRpcMethod(methodName, parameterTypes);
        // On failure, send the error back to the caller
    } catch (ClassNotFoundException e) {
        log.error("Could not load method arguments.", e);

        RpcConnectionException rpcException =
            new RpcConnectionException("Could not load method arguments.", e);
        getSender().tell(new Status.Failure(rpcException), getSelf());
    } catch (IOException e) {
        log.error("Could not deserialize rpc invocation message.", e);

        RpcConnectionException rpcException =
            new RpcConnectionException("Could not deserialize rpc invocation message.", e);
        getSender().tell(new Status.Failure(rpcException), getSelf());
    } catch (final NoSuchMethodException e) {
        log.error("Could not find rpc method for rpc invocation.", e);

        RpcConnectionException rpcException =
            new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
        getSender().tell(new Status.Failure(rpcException), getSelf());
    }

    if (rpcMethod != null) {
        try {
            // this supports declaration of anonymous classes
            // Make the method accessible
            rpcMethod.setAccessible(true);

            // If the method returns void, just invoke it
            if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                // No return value to send back
                rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
            } else {
                // Otherwise invoke it and keep the return value
                final Object result;
                try {
                    result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
                } catch (InvocationTargetException e) {
                    log.debug(
                        "Reporting back error thrown in remote procedure {}", rpcMethod, e);

                    // tell the sender about the failure
                    getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
                    return;
                }

                final String methodName = rpcMethod.getName();

                // If the result is a CompletableFuture, the response can be sent asynchronously
                if (result instanceof CompletableFuture) {
                    final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
                    sendAsyncResponse(responseFuture, methodName);
                } else {
                    // Otherwise send the result back synchronously
                    sendSyncResponse(result, methodName);
                }
            }
        } catch (Throwable e) {
            log.error("Error while executing remote procedure call {}.", rpcMethod, e);
            // tell the sender about the failure
            getSender().tell(new Status.Failure(e), getSelf());
        }
    }
}
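
At its core, handleRpcInvocation is ordinary reflection. It looks the method up by name and parameter types, invokes it on the endpoint, and reports the target exception (not the InvocationTargetException wrapper) when the method throws. The sketch below is hypothetical: Reply stands in for Status.Success/Status.Failure, CalculatorEndpoint for a real RpcEndpoint, and using Class#getMethod for the lookup is an assumption about how lookupRpcMethod behaves.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical reply envelope, playing the role of Akka's Status.Success / Status.Failure.
final class Reply {
    final Object value;
    final Throwable failure;

    private Reply(Object value, Throwable failure) {
        this.value = value;
        this.failure = failure;
    }

    static Reply success(Object value) { return new Reply(value, null); }

    static Reply failure(Throwable failure) { return new Reply(null, failure); }
}

// Hypothetical endpoint with two RPC methods.
class CalculatorEndpoint {
    public int add(int a, int b) { return a + b; }

    public int divide(int a, int b) { return a / b; }
}

class ReflectiveDispatcher {
    static Reply dispatch(
            Object endpoint, String methodName, Class<?>[] parameterTypes, Object[] args) {
        final Method method;
        try {
            // Find the public method with exactly this name and these parameter types.
            method = endpoint.getClass().getMethod(methodName, parameterTypes);
        } catch (NoSuchMethodException e) {
            return Reply.failure(e);
        }
        try {
            method.setAccessible(true);
            return Reply.success(method.invoke(endpoint, args));
        } catch (InvocationTargetException e) {
            // The RPC method itself threw: report its original exception, not the wrapper.
            return Reply.failure(e.getTargetException());
        } catch (IllegalAccessException e) {
            return Reply.failure(e);
        }
    }
}
```

Unwrapping with getTargetException is what lets the caller see, for example, the ArithmeticException thrown by divide(1, 0) instead of a generic reflection error.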

sendAsyncResponse sends the call result back asynchronously:

private void sendAsyncResponse(CompletableFuture<?> asyncResponse, String methodName) {
    final ActorRef sender = getSender();
    Promise.DefaultPromise<Object> promise = new Promise.DefaultPromise<>();

    FutureUtils.assertNoException(
        asyncResponse.handle(
            (value, throwable) -> {
                // Check whether the asynchronous result failed
                if (throwable != null) {
                    promise.failure(throwable);
                } else {
                    if (isRemoteSender(sender)) {
                        // The call came from a remote sender,
                        // so the result has to be serialized
                        Either<AkkaRpcSerializedValue, AkkaRpcException>
                            serializedResult =
                            serializeRemoteResultAndVerifySize(
                            value, methodName);

                        // Complete the promise with success if serialization worked, otherwise with failure
                        if (serializedResult.isLeft()) {
                            promise.success(serializedResult.left());
                        } else {
                            promise.failure(serializedResult.right());
                        }
                    } else {
                        // Local call: no serialization needed
                        promise.success(new Status.Success(value));
                    }
                }

                // consume the provided throwable
                return null;
            }));

    // Pipe the future's result to the caller
    Patterns.pipe(promise.future(), getContext().dispatcher()).to(sender);
}
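
sendAsyncResponse converts the endpoint's future into the reply that is piped back to the sender. The same flow can be sketched with a CompletableFuture in place of the Scala Promise. AsyncReply, toReply and the serialize function are hypothetical: serialize stands for serializeRemoteResultAndVerifySize, and a thrown RuntimeException plays the role of its failure branch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class AsyncReply {
    static CompletableFuture<Object> toReply(
            CompletableFuture<?> result, boolean remoteSender, Function<Object, byte[]> serialize) {
        CompletableFuture<Object> promise = new CompletableFuture<>();
        result.handle(
                (value, throwable) -> {
                    if (throwable != null) {
                        // the RPC method's future failed: fail the reply as well
                        promise.completeExceptionally(throwable);
                    } else if (remoteSender) {
                        // remote caller: the value has to be serialized first
                        try {
                            promise.complete(serialize.apply(value));
                        } catch (RuntimeException e) {
                            promise.completeExceptionally(e);
                        }
                    } else {
                        // local caller: pass the value through unchanged
                        promise.complete(value);
                    }
                    // the throwable has been consumed
                    return null;
                });
        return promise;
    }
}
```

The returned future is what would then be piped to the sender. Whichever way it completes, the caller receives exactly one answer.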

sendSyncResponse sends the result back synchronously. It is similar to the method above, except that there is no future to pipe back to the caller.

private void sendSyncResponse(Object response, String methodName) {
    if (isRemoteSender(getSender())) {
        Either<AkkaRpcSerializedValue, AkkaRpcException> serializedResult =
            serializeRemoteResultAndVerifySize(response, methodName);

        if (serializedResult.isLeft()) {
            getSender().tell(new Status.Success(serializedResult.left()), getSelf());
        } else {
            getSender().tell(new Status.Failure(serializedResult.right()), getSelf());
        }
    } else {
        getSender().tell(new Status.Success(response), getSelf());
    }
}

RpcEndpoint

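Finally we arrive at RpcEndpoint, the callee side that provides an RPC service. Classes that must accept remote calls, such as JobMaster, TaskExecutor and Dispatcher, all extend RpcEndpoint.

In addition, an RpcEndpoint can schedule a Runnable to run later in the actor's main thread. The scheduling methods are: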

protected void scheduleRunAsync(Runnable runnable, Time delay) {
    scheduleRunAsync(runnable, delay.getSize(), delay.getUnit());
}

protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
    rpcServer.scheduleRunAsync(runnable, unit.toMillis(delay));
}

rpcServerscheduleRunAsync方法位于AkkaInvocationHandler中:

@Override
public void scheduleRunAsync(Runnable runnable, long delayMillis) {
    checkNotNull(runnable, "runnable");
    checkArgument(delayMillis >= 0, "delay must be zero or greater");

    if (isLocal) {
        long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
        tell(new RunAsync(runnable, atTimeNanos));
    } else {
        throw new RuntimeException(
            "Trying to send a Runnable to a remote actor at "
            + rpcEndpoint.path()
            + ". This is not supported.");
    }
}

Finally, tell sends the message to the actor behind rpcEndpoint. In AkkaInvocationHandler, rpcEndpoint is the target ActorRef.

protected void tell(Object message) {
    rpcEndpoint.tell(message, ActorRef.noSender());
}

The actor processes RunAsync messages in handleRunAsync:

private void handleRunAsync(RunAsync runAsync) {
    final long timeToRun = runAsync.getTimeNanos();
    final long delayNanos;

    // If no execution time was set, or it has already passed
    if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) {
        // run immediately
        try {
            runAsync.getRunnable().run();
        } catch (Throwable t) {
            log.error("Caught exception while executing runnable in main thread.", t);
            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
        }
    } else {
        // schedule for later: send a new message to ourselves after the delay,
        // which will then be executed immediately
        FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
        RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);

        final Object envelopedSelfMessage = envelopeSelfMessage(message);

        // Use the ActorSystem's scheduler to deliver the message after the delay
        getContext()
            .system()
            .scheduler()
            .scheduleOnce(
            delay,
            getSelf(),
            envelopedSelfMessage,
            getContext().dispatcher(),
            ActorRef.noSender());
    }
}
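
scheduleRunAsync and handleRunAsync together implement a small trick. The caller converts the delay into an absolute deadline. The actor then either runs the task (deadline 0 or already passed) or sends the same message to itself once the remaining delay is over, so the task still runs in the actor's main thread. Below is a hypothetical sketch in which a single-threaded ScheduledExecutorService plays the role of both the actor's mailbox and the Akka scheduler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical counterpart of RunAsync: a task plus an absolute deadline (0 = run now).
final class RunAsyncMessage {
    final Runnable runnable;
    final long atTimeNanos;

    RunAsyncMessage(Runnable runnable, long atTimeNanos) {
        this.runnable = runnable;
        this.atTimeNanos = atTimeNanos;
    }
}

class MainThreadMailbox {
    // One thread, so every message is handled sequentially, like an actor.
    private final ScheduledExecutorService mainThread =
            Executors.newSingleThreadScheduledExecutor();

    // Caller side: turn the relative delay into an absolute deadline, then "tell".
    void scheduleRunAsync(Runnable runnable, long delayMillis) {
        long atTimeNanos =
                delayMillis == 0
                        ? 0
                        : System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        RunAsyncMessage message = new RunAsyncMessage(runnable, atTimeNanos);
        mainThread.execute(() -> handleRunAsync(message));
    }

    // Actor side: run if due, otherwise re-deliver the same message after the remaining delay.
    private void handleRunAsync(RunAsyncMessage message) {
        final long delayNanos;
        if (message.atTimeNanos == 0
                || (delayNanos = message.atTimeNanos - System.nanoTime()) <= 0) {
            try {
                message.runnable.run();
            } catch (RuntimeException e) {
                System.err.println("Caught exception while executing runnable: " + e);
            }
        } else {
            mainThread.schedule(() -> handleRunAsync(message), delayNanos, TimeUnit.NANOSECONDS);
        }
    }

    void shutdown() {
        mainThread.shutdown();
    }
}
```

Re-delivering the message instead of sleeping keeps the mailbox free for other messages while the delay runs. A task scheduled with a delay can therefore be overtaken by one submitted later with no delay.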

This post is the author's original work. Discussion and corrections are welcome; please credit the source when reposting.
