Flink Application Mode Explained in Detail


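Purpose of this article:

In Application mode, the user program's main() method is executed inside the ApplicationClusterEntryPoint entry class on the cluster. This article walks through the source code to show how that happens.

First, a comparison with other cluster modes

Take session mode (StandaloneSessionClusterEntrypoint) as an example: main() is executed on the client. When we submit a job with the following command: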

$ ./bin/flink run examples/streaming/WordCount.jar

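Running the flink command with the run argument ends up in the main() method of CliFrontend.java, which dispatches to run(). run() parses the user program into a PackagedProgram object, which encapsulates what is needed to launch it, such as the JAR file, the entry-point class, and the program arguments. The core logic is as follows: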

 /**
     * Executes the run action.
     *
     * @param args Command line arguments for the run action.
     */
    protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");

        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        // evaluate help flag
        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRun(customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        final ProgramOptions programOptions = ProgramOptions.create(commandLine);

        final List<URL> jobJars = getJobJarAndDependencies(programOptions);

        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            /**
             * han_pf
             * Execute the user program: invoke the submitted job's main() via reflection,
             * convert the program into a StreamGraph, then generate the JobGraph and
             * submit it to the cluster.
             */
            executeProgram(effectiveConfiguration, program);
        }


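    }

CliFrontend.executeProgram() delegates to ClientUtils.executeProgram(), whose core is:

        try {
            // ... (context class loader and execution environment setup omitted)
            try {
                /**
                 * han_pf
                 * Invoke the submitted job's main() method via reflection.
                 */
                program.invokeInteractiveModeForExecution();
            } finally {
                ContextEnvironment.unsetAsContext();
                StreamContextEnvironment.unsetAsContext();
            }
        } finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }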
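
The reflective call at the heart of this path can be illustrated without Flink. The following is a minimal sketch, not Flink's implementation: ReflectiveMainSketch, DemoJob, and invokeMain are hypothetical names, and only the standard java.lang.reflect API is used. It shows the same two steps as the excerpt: switch the thread's context class loader, invoke a static main(String[]) by name, and restore the class loader in a finally block.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class ReflectiveMainSketch {

    /** Hypothetical stand-in for a user job; it only records the arguments it received. */
    public static class DemoJob {
        public static volatile String lastArgs = null;

        public static void main(String[] args) {
            lastArgs = String.join(",", args);
        }
    }

    /** Invokes the static main(String[]) of entryClassName under the given class loader. */
    public static void invokeMain(
            ClassLoader userClassLoader, String entryClassName, String[] args)
            throws ReflectiveOperationException {
        final Thread current = Thread.currentThread();
        final ClassLoader previous = current.getContextClassLoader();
        try {
            current.setContextClassLoader(userClassLoader);
            Class<?> entryClass = Class.forName(entryClassName, true, userClassLoader);
            Method main = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(main.getModifiers())) {
                throw new NoSuchMethodException("main() must be static");
            }
            // Cast to Object so the array is passed as one argument, not spread as varargs.
            main.invoke(null, (Object) args);
        } finally {
            // Always restore the caller's class loader, mirroring the excerpt above.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        invokeMain(
                ReflectiveMainSketch.class.getClassLoader(),
                DemoJob.class.getName(),
                new String[] {"--input", "words.txt"});
        System.out.println("main() ran with args: " + DemoJob.lastArgs);
    }
}
```

Whichever process runs this code is the process that runs the user's main(). In session mode that process is the client; the rest of the article shows where the same call happens in Application mode.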

Application mode on YARN

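In Application mode on YARN, by contrast, the client only uploads the JAR. The executeProgram() call shown above runs on the cluster side (more precisely, during Dispatcher startup). The analysis follows.
First, submit the job, which also starts the cluster. Only YARN and Kubernetes support the command below; on a Standalone cluster the job has to be submitted directly to the JobManager.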

$ ./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

CliFrontend:

 protected void runApplication(String[] args) throws Exception {
        LOG.info("Running 'run-application' command.");

        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRunApplication(customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        final ApplicationDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);

        final ProgramOptions programOptions;
        final Configuration effectiveConfiguration;

        // ...

        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(
                        programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
        // Call ApplicationClusterDeployer.run() to deploy the program to the cluster
        deployer.run(effectiveConfiguration, applicationConfiguration);
    }

ApplicationClusterDeployer:

 public <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception {
        checkNotNull(configuration);
        checkNotNull(applicationConfiguration);

        LOG.info("Submitting application in 'Application Mode'.");

        final ClusterClientFactory<ClusterID> clientFactory =
                clientServiceLoader.getClusterClientFactory(configuration);
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clientFactory.createClusterDescriptor(configuration)) {
            final ClusterSpecification clusterSpecification =
                    clientFactory.getClusterSpecification(configuration);

            clusterDescriptor.deployApplicationCluster(
                    clusterSpecification, applicationConfiguration);
        }
    }

Up to this point, the client has not converted the program into a StreamGraph or JobGraph; it has only uploaded the user program's JAR.
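
The hand-off can be pictured as the client writing down what to run and the cluster reading it back later. The sketch below is an illustration only: ApplicationHandOffSketch and the two configuration keys are hypothetical and are not Flink's ApplicationConfiguration or its real option names. It shows that describing an application (entry class plus arguments) never invokes it.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ApplicationHandOffSketch {

    // Hypothetical keys, chosen for this sketch only.
    static final String KEY_MAIN = "sketch.application.main";
    static final String KEY_ARGS = "sketch.application.args";

    /** Client side: record what should run, without running it. */
    public static Map<String, String> describe(String entryClass, String[] programArgs) {
        Map<String, String> conf = new HashMap<>();
        conf.put(KEY_MAIN, entryClass);
        // Naive encoding: assumes no argument contains ';'.
        conf.put(KEY_ARGS, String.join(";", programArgs));
        return conf;
    }

    /** Cluster side: recover the entry class name from the shipped configuration. */
    public static String entryClass(Map<String, String> conf) {
        return conf.get(KEY_MAIN);
    }

    /** Cluster side: recover the program arguments from the shipped configuration. */
    public static List<String> programArgs(Map<String, String> conf) {
        String raw = conf.getOrDefault(KEY_ARGS, "");
        return raw.isEmpty() ? Collections.emptyList() : Arrays.asList(raw.split(";"));
    }

    public static void main(String[] args) {
        Map<String, String> shipped =
                describe("com.example.MyJob", new String[] {"--input", "in.txt"});
        System.out.println(entryClass(shipped) + " " + programArgs(shipped));
    }
}
```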

Application mode on Standalone

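  1. Step one: prepare the JAR (it has to be on the JobManager's classpath, e.g. in lib/) and submit the job to the JobManager
$ ./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.wordcount.WordCount

  2. Step two: start a TaskManager
$ ./bin/taskmanager.sh start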

standalone-job.sh:

USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]"

STARTSTOP=$1
ENTRY_POINT_NAME="standalonejob"

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# Startup parameters
ARGS=("${@:2}")

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # Add cluster entry point specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
    parseJmArgsAndExportLogs "${ARGS[@]}"

    if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
        ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
    fi
fi

ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}"
fi

The script finally calls flink-daemon.sh, passing standalonejob as the entry-point name:
flink-daemon.sh:

(standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
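
The case branch above is a plain name-to-class lookup. As a rough sketch, the same dispatch could be written in Java as below. DaemonDispatchSketch and classToRun are hypothetical names. Only the standalonejob mapping comes from the excerpt; the other two entries are quoted from memory of the Flink scripts and may differ between versions.

```java
public class DaemonDispatchSketch {

    /** Maps a daemon name, as passed to flink-daemon.sh, to the JVM main class to launch. */
    public static String classToRun(String daemon) {
        switch (daemon) {
            case "standalonejob":
                // Taken from the flink-daemon.sh excerpt above.
                return "org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint";
            case "standalonesession":
                // Assumption: session-cluster entry point as named in recent Flink versions.
                return "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint";
            case "taskexecutor":
                // Assumption: TaskManager entry point as named in recent Flink versions.
                return "org.apache.flink.runtime.taskexecutor.TaskManagerRunner";
            default:
                throw new IllegalArgumentException("Unknown daemon '" + daemon + "'");
        }
    }

    public static void main(String[] args) {
        System.out.println(classToRun("standalonejob"));
    }
}
```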

StandaloneApplicationClusterEntryPoint is analyzed below.

[Figure: source-analysis flowchart for StandaloneApplicationClusterEntryPoint]

StandaloneApplicationClusterEntryPoint:

public static void main(String[] args) {
       // startup checks and logging
       EnvironmentInformation.logEnvironmentInfo(
               LOG, StandaloneApplicationClusterEntryPoint.class.getSimpleName(), args);
       SignalHandler.register(LOG);
       JvmShutdownSafeguard.installAsShutdownHook(LOG);

       final StandaloneApplicationClusterConfiguration clusterConfiguration =
               ClusterEntrypointUtils.parseParametersOrExit(
                       args,
                       new StandaloneApplicationClusterConfigurationParserFactory(),
                       StandaloneApplicationClusterEntryPoint.class);

       Configuration configuration = loadConfigurationFromClusterConfig(clusterConfiguration);
      /***************************/
       PackagedProgram program = null;
       try {
           /**
            * han_pf
            * In session mode this object is created on the client instead.
            */
           program = getPackagedProgram(clusterConfiguration, configuration);
       } catch (Exception e) {
           LOG.error("Could not create application program.", e);
           System.exit(1);
       }

       try {
           configureExecution(configuration, program);
       } catch (Exception e) {
           LOG.error("Could not apply application configuration.", e);
           System.exit(1);
       }
       /***************************/

       StandaloneApplicationClusterEntryPoint entrypoint =
               new StandaloneApplicationClusterEntryPoint(configuration, program);

       ClusterEntrypoint.runClusterEntrypoint(entrypoint);
   }

As the flowchart above shows, the user program's main() has not been called by the time ClusterEntrypoint.runClusterEntrypoint() is invoked. main() is actually called while the JobManager's Dispatcher component starts up.
After the DefaultDispatcherRunner object is created, leader election runs. Once the election succeeds, DefaultDispatcherRunner's grantLeadership() method is called back:

 @Override
    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(
                () -> {
                    LOG.info(
                            "{} was granted leadership with leader id {}. Creating new {}.",
                            getClass().getSimpleName(),
                            leaderSessionID,
                            DispatcherLeaderProcess.class.getSimpleName());
                    /**
                     * han_pf
                     * Start the dispatcher.
                     */
                    startNewDispatcherLeaderProcess(leaderSessionID);
                });
    }

startNewDispatcherLeaderProcess() then calls start() on AbstractDispatcherLeaderProcess, which in turn invokes onStart():

   public final void start() {
        runIfStateIs(State.CREATED, this::startInternal);
    }

    private void startInternal() {
        log.info("Start {}.", getClass().getSimpleName());
        state = State.RUNNING;
        /**
         * han_pf
         * Run the implementing class's onStart(). There are two implementations,
         * SessionDispatcherLeaderProcess and JobDispatcherLeaderProcess; session mode
         * uses SessionDispatcherLeaderProcess.
         */
        onStart();
    }
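
The start()/onStart() pair is a template method guarded by a state check. The following is a minimal sketch of that pattern, assuming nothing about Flink beyond what the excerpt shows: LeaderProcessSketch, AbstractProcess, and CountingProcess are hypothetical names. The final start() runs the subclass hook only while the process is still in the CREATED state, so a second start() is a no-op.

```java
public class LeaderProcessSketch {

    public enum State { CREATED, RUNNING, STOPPED }

    /** Base class: owns the state machine and exposes a single hook to subclasses. */
    public abstract static class AbstractProcess {
        private final Object lock = new Object();
        private State state = State.CREATED;

        public final void start() {
            runIfStateIs(State.CREATED, this::startInternal);
        }

        private void startInternal() {
            state = State.RUNNING;
            onStart(); // subclass-specific startup logic
        }

        private void runIfStateIs(State expected, Runnable action) {
            synchronized (lock) {
                if (state == expected) {
                    action.run();
                }
            }
        }

        public State getState() {
            synchronized (lock) {
                return state;
            }
        }

        protected abstract void onStart();
    }

    /** Example subclass that counts how often its hook was invoked. */
    public static class CountingProcess extends AbstractProcess {
        public int starts = 0;

        @Override
        protected void onStart() {
            starts++;
        }
    }

    public static void main(String[] args) {
        CountingProcess process = new CountingProcess();
        process.start();
        process.start(); // ignored: the state is already RUNNING
        System.out.println(process.starts + " " + process.getState());
    }
}
```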

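onStart() then calls dispatcherGatewayServiceFactory.create(); in Application mode that factory is an ApplicationDispatcherGatewayServiceFactory. The onStart() method of JobDispatcherLeaderProcess shows the call: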

    protected void onStart() {
        final DispatcherGatewayService dispatcherService =
                /**
                 * han_pf
                 * Application mode takes a different branch here: the factory is
                 * ApplicationDispatcherGatewayServiceFactory instead of
                 * DefaultDispatcherGatewayServiceFactory.
                 */
                dispatcherGatewayServiceFactory.create(
                        DispatcherId.fromUuid(getLeaderSessionId()),
                        Collections.singleton(jobGraph),
                        ThrowingJobGraphWriter.INSTANCE);

        completeDispatcherSetup(dispatcherService);
    }

ApplicationDispatcherGatewayServiceFactory:

  public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            JobGraphWriter jobGraphWriter) {

        final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);

        final Dispatcher dispatcher;
        try {
            dispatcher =
                    dispatcherFactory.createDispatcher(
                            rpcService,
                            fencingToken,
                            recoveredJobs,
                            /**
                             * han_pf
                             * Entry point for executing main() in Application mode.
                             */
                            (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                    new ApplicationDispatcherBootstrap(
                                            application,
                                            recoveredJobIds,
                                            configuration,
                                            dispatcherGateway,
                                            scheduledExecutor,
                                            errorHandler),
                            PartialDispatcherServicesWithJobGraphStore.from(
                                    partialDispatcherServices, jobGraphWriter));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
        }

        dispatcher.start();

        return DefaultDispatcherGatewayService.from(dispatcher);
    }
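
The key design point in create() is that the dispatcher receives a factory for the bootstrap rather than a finished bootstrap, so the user code cannot run before the dispatcher exists and has started. Below is a simplified sketch of that idea. Every type in it (Gateway, ApplicationBootstrap, Dispatcher) is a hypothetical stand-in, and the application runs synchronously here for brevity, whereas the real bootstrap runs it asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class BootstrapFactorySketch {

    /** Hypothetical, minimal view of the dispatcher that user code submits jobs through. */
    public interface Gateway {
        void submitJob(String jobName);
    }

    /** Runs the application as soon as it is constructed (synchronously in this sketch). */
    public static class ApplicationBootstrap {
        public ApplicationBootstrap(Consumer<Gateway> application, Gateway gateway) {
            application.accept(gateway);
        }
    }

    /** The dispatcher holds only a factory; the bootstrap is created inside start(). */
    public static class Dispatcher implements Gateway {
        public final List<String> submittedJobs = new ArrayList<>();
        private final Function<Gateway, ApplicationBootstrap> bootstrapFactory;
        private ApplicationBootstrap bootstrap;

        public Dispatcher(Function<Gateway, ApplicationBootstrap> bootstrapFactory) {
            this.bootstrapFactory = bootstrapFactory;
        }

        public void start() {
            // The user application runs here, on the cluster side, with this dispatcher as gateway.
            bootstrap = bootstrapFactory.apply(this);
        }

        @Override
        public void submitJob(String jobName) {
            submittedJobs.add(jobName);
        }
    }

    /** Mirrors the shape of create(): wire the application into a factory lambda. */
    public static Dispatcher createDispatcher(Consumer<Gateway> application) {
        return new Dispatcher(gateway -> new ApplicationBootstrap(application, gateway));
    }

    public static void main(String[] args) {
        Dispatcher dispatcher = createDispatcher(gateway -> gateway.submitJob("WordCount"));
        System.out.println("before start: " + dispatcher.submittedJobs);
        dispatcher.start();
        System.out.println("after start:  " + dispatcher.submittedJobs);
    }
}
```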

The lambda calls new ApplicationDispatcherBootstrap() to create the bootstrap object:

public ApplicationDispatcherBootstrap(
            final PackagedProgram application,
            final Collection<JobID> recoveredJobIds,
            final Configuration configuration,
            final DispatcherGateway dispatcherGateway,
            final ScheduledExecutor scheduledExecutor,
            final FatalErrorHandler errorHandler) {
        this.configuration = checkNotNull(configuration);
        this.recoveredJobIds = checkNotNull(recoveredJobIds);
        this.application = checkNotNull(application);
        this.errorHandler = checkNotNull(errorHandler);
        /**
         * han_pf
         * Execute the user program.
         */
        this.applicationCompletionFuture =
                fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);

        this.clusterShutdownFuture = runApplicationAndShutdownClusterAsync(dispatcherGateway);
    }

fixJobIdAndRunApplicationAsync() eventually reaches runApplicationEntryPoint():


private void runApplicationEntryPoint(
            final CompletableFuture<List<JobID>> jobIdsFuture,
            final Set<JobID> tolerateMissingResult,
            final DispatcherGateway dispatcherGateway,
            final ScheduledExecutor scheduledExecutor,
            final boolean enforceSingleJobExecution) {
        try {
            final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds);

            final PipelineExecutorServiceLoader executorServiceLoader =
                    new EmbeddedExecutorServiceLoader(
                            applicationJobIds, dispatcherGateway, scheduledExecutor);
            /**
             * han_pf
             * The same method that the client-side CliFrontend.executeProgram() calls.
             */
            ClientUtils.executeProgram(
                    executorServiceLoader,
                    configuration,
                    application,
                    enforceSingleJobExecution,
                    true /* suppress sysout */);

            if (applicationJobIds.isEmpty()) {
                jobIdsFuture.completeExceptionally(
                        new ApplicationExecutionException(
                                "The application contains no execute() calls."));
            } else {
                jobIdsFuture.complete(applicationJobIds);
            }
        } catch (Throwable t) {
            // ... (exception handling elided in this excerpt)
        }
    }
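
The job-ID bookkeeping in runApplicationEntryPoint() can be reproduced in isolation. In the sketch below, RecordingExecutor is a hypothetical stand-in for the embedded executor: it appends an ID to a shared list each time the user code calls execute(). After the user code returns, an empty list fails the future and a non-empty list completes it. Propagating any other failure through the future is an assumption of this sketch, since the excerpt elides its catch block.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class EntryPointSketch {

    /** Hypothetical stand-in for an embedded executor: records every execute() call. */
    public static class RecordingExecutor {
        private final List<String> jobIds;

        RecordingExecutor(List<String> jobIds) {
            this.jobIds = jobIds;
        }

        public void execute(String jobName) {
            jobIds.add("job-" + jobIds.size() + "-" + jobName);
        }
    }

    /** Runs the user code and reports the IDs of the jobs it submitted. */
    public static CompletableFuture<List<String>> runEntryPoint(
            Consumer<RecordingExecutor> userMain) {
        final CompletableFuture<List<String>> jobIdsFuture = new CompletableFuture<>();
        try {
            final List<String> applicationJobIds = new ArrayList<>();
            userMain.accept(new RecordingExecutor(applicationJobIds));

            if (applicationJobIds.isEmpty()) {
                jobIdsFuture.completeExceptionally(
                        new IllegalStateException("The application contains no execute() calls."));
            } else {
                jobIdsFuture.complete(applicationJobIds);
            }
        } catch (Throwable t) {
            // Assumption of this sketch: surface the failure through the future.
            jobIdsFuture.completeExceptionally(t);
        }
        return jobIdsFuture;
    }

    public static void main(String[] args) {
        System.out.println(runEntryPoint(executor -> executor.execute("wordcount")).join());
        System.out.println(runEntryPoint(executor -> { }).isCompletedExceptionally());
    }
}
```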

At this point the user program's main() method runs on the cluster side, where the StreamGraph and JobGraph are generated.
