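Purpose of this article:
This article traces, at the source-code level, how the user's main() method ends up being executed inside the ApplicationClusterEntryPoint entry class.
First, a comparison with the other cluster modes
Take StandaloneSessionClusterEntryPoint (session) mode: there, main() runs on the client. When we submit a job with the following command: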
$ ./bin/flink run examples/streaming/WordCount.jar
Running the flink command with the run argument ends up in the main() method of CliFrontend.java, which dispatches to run(). The user program is parsed into an object of type PackagedProgram, which wraps what is needed to invoke it. The core logic is as follows:
/**
* Executes the run action.
*
* @param args Command line arguments for the run action.
*/
protected void run(String[] args) throws Exception {
LOG.info("Running 'run' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
final List<URL> jobJars = getJobJarAndDependencies(programOptions);
final Configuration effectiveConfiguration =
getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
/**
* han_pf
* Execute the user program: invoke the submitted job's main() method via reflection, translate the user program into a StreamGraph, then generate the JobGraph and submit it to the cluster.
*/
executeProgram(effectiveConfiguration, program);
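}
}
executeProgram() delegates to ClientUtils.executeProgram(), which ends as follows (the start of the method is omitted):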
try {
/**
* han_pf
* Invoke the submitted job's main() method via reflection.
*/
program.invokeInteractiveModeForExecution();
} finally {
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
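The tail of the excerpt above boils down to two steps: switch the thread's context class loader, then call main() reflectively and restore the loader afterwards. The following is a minimal sketch of that pattern in plain Java. ReflectiveMainInvoker and DemoJob are hypothetical names made up for this illustration, not Flink classes, and the real PackagedProgram does considerably more (JAR handling, environment setup).

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flink class) that mimics the shape of the reflective call. */
class ReflectiveMainInvoker {

    /** Finds public static void main(String[]) on entryClass and invokes it. */
    static void invokeMain(Class<?> entryClass, ClassLoader userCodeClassLoader, String... args) {
        ClassLoader previous = Thread.currentThread().getContextClassLoader();
        try {
            // Run the user code with its own class loader as the context class loader.
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalArgumentException("main() must be static");
            }
            // Cast to Object so the array is passed as one argument, not spread as varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Could not invoke main() of " + entryClass.getName(), e);
        } finally {
            // Always restore the caller's class loader, as the excerpt does.
            Thread.currentThread().setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        invokeMain(DemoJob.class, DemoJob.class.getClassLoader(), "--input", "words.txt");
        System.out.println(DemoJob.seenArgs);
    }
}

/** Hypothetical stand-in for a user job; it only records the arguments it receives. */
class DemoJob {
    static final List<String> seenArgs = new ArrayList<>();

    public static void main(String[] args) {
        seenArgs.addAll(List.of(args));
    }
}
```

The point of the sketch is where the call happens: whichever JVM runs invokeMain() is the JVM that executes the user's main(). In session mode that is the client process.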
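Application mode on YARN
In Application mode on YARN, by contrast, the client only uploads the JAR. The executeProgram() call shown above runs on the cluster side (more precisely, during Dispatcher startup). The analysis follows.
First, submit the job to start the cluster (only YARN and Kubernetes support the command below; on a Standalone cluster the job has to be submitted directly to the JobManager):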
$ ./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
protected void runApplication(String[] args) throws Exception {
LOG.info("Running 'run-application' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRunApplication(customCommandLines);
return;
}
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);
final ProgramOptions programOptions;
final Configuration effectiveConfiguration;
// ...
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
// Call ApplicationClusterDeployer.run() to deploy the application to the cluster
deployer.run(effectiveConfiguration, applicationConfiguration);
}
ApplicationClusterDeployer:
public <ClusterID> void run(
final Configuration configuration,
final ApplicationConfiguration applicationConfiguration)
throws Exception {
checkNotNull(configuration);
checkNotNull(applicationConfiguration);
LOG.info("Submitting application in 'Application Mode'.");
final ClusterClientFactory<ClusterID> clientFactory =
clientServiceLoader.getClusterClientFactory(configuration);
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clientFactory.createClusterDescriptor(configuration)) {
final ClusterSpecification clusterSpecification =
clientFactory.getClusterSpecification(configuration);
clusterDescriptor.deployApplicationCluster(
clusterSpecification, applicationConfiguration);
}
}
At this point the client has not performed the StreamGraph or JobGraph translation; it has only uploaded the user program's JAR.
Application mode on Standalone
- Step 1: prepare the JAR and submit the job to the JobManager
$ ./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.wordcount.WordCount
- Step 2: start the TaskManager
$ ./bin/taskmanager.sh start
standalone-job.sh:
USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]"
STARTSTOP=$1
ENTRY_POINT_NAME="standalonejob"
if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then
echo $USAGE
exit 1
fi
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# Startup parameters
ARGS=("${@:2}")
if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
# Add cluster entry point specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
parseJmArgsAndExportLogs "${ARGS[@]}"
if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
fi
fi
ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
if [[ $STARTSTOP == "start-foreground" ]]; then
exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
else
"${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}"
fi
The script ends by calling flink-daemon.sh with the standalonejob argument:
flink-daemon.sh:
(standalonejob)
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
StandaloneApplicationClusterEntryPoint is analyzed below.
StandaloneApplicationClusterEntryPoint:
public static void main(String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(
LOG, StandaloneApplicationClusterEntryPoint.class.getSimpleName(), args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
final StandaloneApplicationClusterConfiguration clusterConfiguration =
ClusterEntrypointUtils.parseParametersOrExit(
args,
new StandaloneApplicationClusterConfigurationParserFactory(),
StandaloneApplicationClusterEntryPoint.class);
Configuration configuration = loadConfigurationFromClusterConfig(clusterConfiguration);
/***************************/
PackagedProgram program = null;
try {
/**
* han_pf
* In session mode this object is created on the client
*/
program = getPackagedProgram(clusterConfiguration, configuration);
} catch (Exception e) {
LOG.error("Could not create application program.", e);
System.exit(1);
}
try {
configureExecution(configuration, program);
} catch (Exception e) {
LOG.error("Could not apply application configuration.", e);
System.exit(1);
}
/***************************/
StandaloneApplicationClusterEntryPoint entrypoint =
new StandaloneApplicationClusterEntryPoint(configuration, program);
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
As the source above shows, the user program's main() has not been invoked by the time ClusterEntrypoint.runClusterEntrypoint() is called. main() is ultimately invoked while the JobManager's Dispatcher component is starting.
After the DefaultDispatcherRunner object is created, leader election runs. Once the election succeeds, the grantLeadership() method of DefaultDispatcherRunner is called back:
@Override
public void grantLeadership(UUID leaderSessionID) {
runActionIfRunning(
() -> {
LOG.info(
"{} was granted leadership with leader id {}. Creating new {}.",
getClass().getSimpleName(),
leaderSessionID,
DispatcherLeaderProcess.class.getSimpleName());
/**
* han_pf
* Start the dispatcher
*/
startNewDispatcherLeaderProcess(leaderSessionID);
});
}
startNewDispatcherLeaderProcess() goes on to call start() on AbstractDispatcherLeaderProcess, which in turn reaches onStart():
public final void start() {
runIfStateIs(State.CREATED, this::startInternal);
}
private void startInternal() {
log.info("Start {}.", getClass().getSimpleName());
state = State.RUNNING;
/**
* han_pf
* Call the implementing class's onStart(). There are two implementations, SessionDispatcherLeaderProcess and JobDispatcherLeaderProcess.
* For session mode, look at SessionDispatcherLeaderProcess.
*/
onStart();
}
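The start() / startInternal() / onStart() trio above is a state-guarded template method: start() only does something while the process is still in the CREATED state, and the mode-specific work lives in the subclass's onStart(). Below is a small standalone sketch of that shape. MiniLeaderProcess, RecordingLeaderProcess, and LeaderProcessDemo are hypothetical names for this illustration, and the locking is an assumption about how such a guard is typically written, not a copy of the Flink class.

```java
import java.util.ArrayList;
import java.util.List;

/** Demo entry point for the sketch below. */
class LeaderProcessDemo {
    public static void main(String[] args) {
        RecordingLeaderProcess process = new RecordingLeaderProcess();
        process.start();
        process.start(); // ignored: the state is already RUNNING
        System.out.println(process.events);
    }
}

/** Hypothetical miniature of a leader process; names mirror the excerpt only loosely. */
abstract class MiniLeaderProcess {
    enum State { CREATED, RUNNING, STOPPED }

    private final Object lock = new Object();
    private State state = State.CREATED;

    /** Public entry point: only a call made in the CREATED state has any effect. */
    final void start() {
        runIfStateIs(State.CREATED, this::startInternal);
    }

    /** Moves the process to STOPPED so that a later start() becomes a no-op. */
    final void close() {
        synchronized (lock) {
            state = State.STOPPED;
        }
    }

    final State getState() {
        synchronized (lock) {
            return state;
        }
    }

    private void startInternal() {
        state = State.RUNNING;
        onStart();
    }

    /** Runs the action under the lock, and only if the current state matches. */
    private void runIfStateIs(State expected, Runnable action) {
        synchronized (lock) {
            if (state == expected) {
                action.run();
            }
        }
    }

    /** Mode-specific startup logic, supplied by the subclass. */
    protected abstract void onStart();
}

/** Hypothetical subclass that just records that onStart() ran. */
class RecordingLeaderProcess extends MiniLeaderProcess {
    final List<String> events = new ArrayList<>();

    @Override
    protected void onStart() {
        events.add("onStart");
    }
}
```

Because the guard sits in the base class, every subclass gets the "start at most once" behavior for free and only has to say what starting means for its mode.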
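In Application mode the leader process is configured with an ApplicationDispatcherGatewayServiceFactory, and its onStart() path ends in a call to dispatcherGatewayServiceFactory.create(). The onStart() method of JobDispatcherLeaderProcess shows that call in its simplest form: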
protected void onStart() {
final DispatcherGatewayService dispatcherService =
/**
* han_pf
* Application mode takes a different branch here: ApplicationDispatcherGatewayServiceFactory rather than DefaultDispatcherGatewayServiceFactory
*/
dispatcherGatewayServiceFactory.create(
DispatcherId.fromUuid(getLeaderSessionId()),
Collections.singleton(jobGraph),
ThrowingJobGraphWriter.INSTANCE);
completeDispatcherSetup(dispatcherService);
}
ApplicationDispatcherGatewayServiceFactory:
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
JobGraphWriter jobGraphWriter) {
final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);
final Dispatcher dispatcher;
try {
dispatcher =
dispatcherFactory.createDispatcher(
rpcService,
fencingToken,
recoveredJobs,
/**
* han_pf
* Entry point for executing main() in Application mode
*/
(dispatcherGateway, scheduledExecutor, errorHandler) ->
new ApplicationDispatcherBootstrap(
application,
recoveredJobIds,
configuration,
dispatcherGateway,
scheduledExecutor,
errorHandler),
PartialDispatcherServicesWithJobGraphStore.from(
partialDispatcherServices, jobGraphWriter));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
}
dispatcher.start();
return DefaultDispatcherGatewayService.from(dispatcher);
}
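Note that create() does not build the bootstrap itself. It hands the dispatcher a lambda, so the bootstrap (and with it the user program) can only come into existence once the dispatcher has a gateway to offer. The sketch below shows that deferral in isolation. BootstrapFactory, Bootstrap, and MiniDispatcher are hypothetical types for this illustration, and the assumption that the factory is invoked inside start() is a simplification of the real dispatcher lifecycle.

```java
import java.util.ArrayList;
import java.util.List;

class BootstrapFactoryDemo {

    /** Hypothetical factory type: builds a bootstrap once a gateway is available. */
    @FunctionalInterface
    interface BootstrapFactory {
        Bootstrap create(String dispatcherGateway);
    }

    /** Hypothetical bootstrap whose constructor kicks off the work, as in the excerpt. */
    static final class Bootstrap {
        Bootstrap(String dispatcherGateway, List<String> log) {
            log.add("user program started via " + dispatcherGateway);
        }
    }

    /** Hypothetical dispatcher that holds only the factory until start() is called. */
    static final class MiniDispatcher {
        private final BootstrapFactory factory;
        private Bootstrap bootstrap;

        MiniDispatcher(BootstrapFactory factory) {
            this.factory = factory;
        }

        /** The bootstrap is created here, not in the constructor. */
        void start() {
            bootstrap = factory.create("dispatcher-gateway");
        }

        boolean isBootstrapped() {
            return bootstrap != null;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniDispatcher dispatcher = new MiniDispatcher(gateway -> new Bootstrap(gateway, log));
        System.out.println("before start: " + log);
        dispatcher.start();
        System.out.println("after start: " + log);
    }
}
```

Passing a factory instead of an instance is what lets the same dispatcher code serve different modes: each mode supplies its own lambda and the dispatcher decides when to call it.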
Inside that lambda, new ApplicationDispatcherBootstrap() creates the object:
public ApplicationDispatcherBootstrap(
final PackagedProgram application,
final Collection<JobID> recoveredJobIds,
final Configuration configuration,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor,
final FatalErrorHandler errorHandler) {
this.configuration = checkNotNull(configuration);
this.recoveredJobIds = checkNotNull(recoveredJobIds);
this.application = checkNotNull(application);
this.errorHandler = checkNotNull(errorHandler);
/**
* han_pf
* Execute the user program
*/
this.applicationCompletionFuture =
fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);
this.clusterShutdownFuture = runApplicationAndShutdownClusterAsync(dispatcherGateway);
}
fixJobIdAndRunApplicationAsync() eventually reaches runApplicationEntryPoint():
private void runApplicationEntryPoint(
final CompletableFuture<List<JobID>> jobIdsFuture,
final Set<JobID> tolerateMissingResult,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor scheduledExecutor,
final boolean enforceSingleJobExecution) {
try {
final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds);
final PipelineExecutorServiceLoader executorServiceLoader =
new EmbeddedExecutorServiceLoader(
applicationJobIds, dispatcherGateway, scheduledExecutor);
/**
* han_pf
* The same method that the client-side CliFrontend.executeProgram() calls.
*/
ClientUtils.executeProgram(
executorServiceLoader,
configuration,
application,
enforceSingleJobExecution,
true /* suppress sysout */);
if (applicationJobIds.isEmpty()) {
jobIdsFuture.completeExceptionally(
new ApplicationExecutionException(
"The application contains no execute() calls."));
} else {
jobIdsFuture.complete(applicationJobIds);
}
} catch (Throwable t) {
// ... (error handling omitted)
}
}
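The contract of runApplicationEntryPoint() can be read off the excerpt: run the user program, collect the IDs of the jobs it submitted, and complete a future either with those IDs or exceptionally when the program never called execute(). Here is a self-contained sketch of that contract. ApplicationRunSketch and runEntryPoint() are hypothetical, job IDs are plain strings, the user program is modeled as a callback that receives a stand-in for execute(), and the catch branch is an assumption, since the excerpt does not show how errors are handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class ApplicationRunSketch {

    /**
     * Runs userMain, handing it a callback that stands in for execute().
     * The returned future completes with the collected job IDs, or
     * exceptionally if the program threw or never called the callback.
     */
    static CompletableFuture<List<String>> runEntryPoint(Consumer<Consumer<String>> userMain) {
        CompletableFuture<List<String>> jobIdsFuture = new CompletableFuture<>();
        List<String> jobIds = new ArrayList<>();
        try {
            // Each call to the callback records one submitted job.
            userMain.accept(jobIds::add);
            if (jobIds.isEmpty()) {
                jobIdsFuture.completeExceptionally(
                        new IllegalStateException("The application contains no execute() calls."));
            } else {
                jobIdsFuture.complete(List.copyOf(jobIds));
            }
        } catch (Throwable t) {
            // Assumption for this sketch: any failure of the user program fails the future.
            jobIdsFuture.completeExceptionally(t);
        }
        return jobIdsFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> ok = runEntryPoint(execute -> execute.accept("job-1"));
        System.out.println(ok.join());

        CompletableFuture<List<String>> empty = runEntryPoint(execute -> { });
        System.out.println(empty.isCompletedExceptionally());
    }
}
```

Reporting the outcome through a future rather than a return value lets the caller chain follow-up steps, such as shutting the cluster down, without blocking the thread that started the application.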
At this point the user program's main() method runs on the cluster side, and the StreamGraph and JobGraph translation takes place there.