Flink turns the user program into a JobGraph and submits it. The ProgramDeployer thread class is responsible for deploying the user program to a cluster: depending on whether the ExecutionContext contains a ClusterId, it either starts a new cluster for the job or runs the job on an existing one. Deployment also needs a ClusterDescriptor, i.e. the cluster-related configuration; Flink derives from the launch command line whether the job goes to a YARN cluster or a standalone cluster. If we do not submit jobs by running the flink script, we can construct the ClusterDescriptor by hand according to the deployment mode. Our jobs run on YARN, so the descriptor used is YarnClusterDescriptor.
Creating the ClusterDescriptor
private AbstractYarnClusterDescriptor getClusterDescriptor(
Configuration configuration,
YarnConfiguration yarnConfiguration,
String configurationDirectory) {
final YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConfiguration);
yarnClient.start();
return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
configurationDirectory,
yarnClient,
false);
}
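If we submit programmatically instead of via the flink script, the descriptor above can be driven by hand. A minimal sketch, assuming Flink 1.9-era APIs and an already-built jobGraph; the paths and sizes are illustrative placeholders, not recommended values:
// build the descriptor and a cluster specification by hand, then deploy the job
Configuration flinkConfig = GlobalConfiguration.loadConfiguration("/path/to/flink/conf");
YarnConfiguration yarnConfig = new YarnConfiguration();
AbstractYarnClusterDescriptor descriptor =
getClusterDescriptor(flinkConfig, yarnConfig, "/path/to/flink/conf");
ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(1024) // JobManager container memory
.setTaskManagerMemoryMB(1024) // memory per TaskManager container
.setNumberTaskManagers(2)
.setSlotsPerTaskManager(2)
.createClusterSpecification();
// detached = true: return once the application is accepted instead of waiting for the job result
ClusterClient<ApplicationId> client = descriptor.deployJobCluster(spec, jobGraph, true);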
Cluster startup entry point
private <T> void deployJob(ExecutionContext<T> context, JobGraph jobGraph, Result<T> result) {
// create or retrieve cluster and deploy job
try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
try {
// create a new cluster
if (context.getClusterId() == null) {
deployJobOnNewCluster(clusterDescriptor, jobGraph, result, context.getClassLoader());
}
// submit the job to an existing cluster
else {
deployJobOnExistingCluster(context.getClusterId(), clusterDescriptor, jobGraph, result);
}
} catch (Exception e) {
throw new SqlExecutionException("Could not retrieve or create a cluster.", e);
}
} catch (SqlExecutionException e) {
throw e;
} catch (Exception e) {
throw new SqlExecutionException("Could not locate a cluster.", e);
}
}
Creating a new cluster
- Deploy a Flink cluster via YARN and execute the jobGraph.
- Depending on whether a query or an update statement is being executed, decide whether to block waiting for the execution result, and store the result in executionResultBucket.
private <T> void deployJobOnNewCluster(
ClusterDescriptor<T> clusterDescriptor,
JobGraph jobGraph,
Result<T> result,
ClassLoader classLoader) throws Exception {
ClusterClient<T> clusterClient = null;
try {
// deploy the job on a new cluster; detached = false means we wait for the job result
clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
// store the cluster id and the web interface URL
result.setClusterInformation(clusterClient.getClusterId(), clusterClient.getWebInterfaceURL());
// query or update: decides whether we wait for the job result
if (awaitJobResult) {
// we need to hard cast for now
final JobExecutionResult jobResult = ((RestClusterClient<T>) clusterClient)
.requestJobResult(jobGraph.getJobID())
.get()
.toJobExecutionResult(context.getClassLoader()); // throws exception if job fails
executionResultBucket.add(jobResult);
}
} finally {
try {
if (clusterClient != null) {
clusterClient.shutdown();
}
} catch (Exception e) {
// ignore
}
}
}
YARN application deployment
public ClusterClient<ApplicationId> deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) throws ClusterDeploymentException {
// this is required because the slots are allocated lazily
jobGraph.setAllowQueuedScheduling(true);
try {
return deployInternal(
clusterSpecification, // the cluster configuration used at startup
"Flink per-job cluster", // application name
getYarnJobClusterEntrypoint(), // ApplicationMaster entry class
jobGraph,
detached);
} catch (Exception e) {
throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
}
}
Flink applications are started by calling AbstractYarnClusterDescriptor#deployInternal. Its main steps are:
- Memory configuration validation.
- Check the cutoff ratio, i.e. the fraction (0-1) of container memory set aside for JVM usage other than the Flink heap.
- Check the minCutoff parameter (600 MB by default); it must be smaller than taskManagerMemoryMB.
- Validate the off-heap memory size; it must be smaller than taskManagerMemoryMB - cutoff - networkReservedMemory.
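To make the arithmetic concrete: with taskManagerMemoryMB = 1024, the cutoff is max(1024 × 0.25, 600) = 600 MB, leaving 424 MB for the TaskManager JVM; after the 64 MB network reservation, 360 MB remain for heap plus managed memory. validateClusterSpecification runs exactly this calculation: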
private void validateClusterSpecification(ClusterSpecification clusterSpecification) {
// the taskManager memory size from the Flink configuration
final long taskManagerMemorySize = clusterSpecification.getTaskManagerMemoryMB();
// memory cut off from the container for non-Flink JVM usage: max(taskManagerMemory * 0.25, 600 MB);
// e.g. with 1024 MB configured for the TM, 600 MB is set aside
final long cutoff = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, taskManagerMemorySize);
// compute the application heap size from the remaining memory
TaskManagerServices.calculateHeapSizeMB(taskManagerMemorySize - cutoff, flinkConfiguration);
}
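calculateCutoffMB itself is not shown above; a condensed sketch of its rule, assuming the 1.9-era ResourceManagerOptions keys (defaults 0.25 and 600 MB):
// cutoff = a configurable ratio of the container memory, but never below the configured minimum
public static long calculateCutoffMB(Configuration config, long containerMemoryMB) {
final float ratio = config.getFloat(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_RATIO);
final long minCutoff = config.getInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN);
long cutoff = (long) (containerMemoryMB * ratio);
if (cutoff < minCutoff) {
cutoff = minCutoff; // e.g. 1024 MB * 0.25 = 256 MB, raised to 600 MB
}
return cutoff;
}
The remaining memory is then handed to TaskManagerServices.calculateHeapSizeMB: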
public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
// megabytes to bytes
final long totalProcessMemory = megabytesToBytes(totalJavaMemorySizeMB);
// memory reserved for network communication, 64 MB here
final long networkReservedMemory = getReservedNetworkMemory(config, totalProcessMemory);
// continuing the example: 424 - 64 = 360 MB
final long heapAndManagedMemory = totalProcessMemory - networkReservedMemory;
// if off-heap (managed) memory is configured, validate its size
if (config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
final long managedMemorySize = getManagedMemoryFromHeapAndManaged(config, heapAndManagedMemory);
// check the managed memory size
ConfigurationParserUtils.checkConfigParameter(managedMemorySize < heapAndManagedMemory, managedMemorySize,
TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
"Managed memory size too large for " + (networkReservedMemory >> 20) +
" MB network buffer memory and a total of " + totalJavaMemorySizeMB +
" MB JVM memory");
return bytesToMegabytes(heapAndManagedMemory - managedMemorySize);
}
else {
return bytesToMegabytes(heapAndManagedMemory);
}
}
- Flink dependency and requested-core validation.
- Check the Flink lib and config paths.
- Check the YARN configuration file path.
- The number of vcores requested for the JM must not exceed the maximum vcores available on any running NodeManager.
- The number of slots per TM (used as the vcore request) must not exceed that maximum either.
private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws YarnDeploymentException {
if (clusterSpecification.getNumberTaskManagers() <= 0) {
throw new YarnDeploymentException("Taskmanager count must be positive");
}
if (this.flinkJarPath == null) {
throw new YarnDeploymentException("The Flink jar path is null");
}
if (this.configurationDirectory == null) {
throw new YarnDeploymentException("Configuration directory not set");
}
if (this.flinkConfiguration == null) {
throw new YarnDeploymentException("Flink configuration object has not been set");
}
// check whether the requested JM vcores exceed the maximum YARN can provide
// Check if we don't exceed YARN's maximum virtual cores.
// Fetch numYarnMaxVcores from all the RUNNING nodes via yarnClient
final int numYarnMaxVcores; // yarn max vcores
try {
numYarnMaxVcores = yarnClient.getNodeReports(NodeState.RUNNING)
.stream()
.mapToInt(report -> report.getCapability().getVirtualCores())
.max()
.orElse(0);
} catch (Exception e) {
throw new YarnDeploymentException("Couldn't get cluster description, please check on the YarnConfiguration", e);
}
// AM vcores check
int configuredAmVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
if (configuredAmVcores > numYarnMaxVcores) {
throw new IllegalConfigurationException(
String.format("The number of requested virtual cores for application master %d" +
" exceeds the maximum number of virtual cores %d available in the Yarn Cluster.",
configuredAmVcores, numYarnMaxVcores));
}
// yarn.containers.vcores / slots-per-TM check
int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
// don't configure more than the maximum configured number of vcores
if (configuredVcores > numYarnMaxVcores) {
throw new IllegalConfigurationException(
String.format("The number of requested virtual cores per node %d" +
" exceeds the maximum number of virtual cores %d available in the Yarn Cluster." +
" Please note that the number of virtual cores is set to the number of task slots by default" +
" unless configured in the Flink config with '%s.'",
configuredVcores, numYarnMaxVcores, YarnConfigOptions.VCORES.key()));
}
// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null &&
System.getenv("YARN_CONF_DIR") == null) {
LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
"The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
"configuration for accessing YARN.");
}
}
- Check that the queue the job is bound to exists.
private void checkYarnQueues(YarnClient yarnClient) {
try {
List<QueueInfo> queues = yarnClient.getAllQueues();
if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
boolean queueFound = false;
for (QueueInfo queue : queues) {
if (queue.getQueueName().equals(this.yarnQueue)) {
queueFound = true;
break;
}
}
if (!queueFound) {
String queueNames = "";
for (QueueInfo queue : queues) {
queueNames += queue.getQueueName() + ", ";
}
LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
"Available queues: " + queueNames);
}
} else {
LOG.debug("The YARN cluster does not have any queues configured");
}
} catch (Throwable e) {
LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("Error details", e);
}
}
}
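As an aside, the lookup and the name concatenation above could be written more compactly with streams; a behavior-preserving sketch (requires java.util.stream.Collectors):
boolean queueFound = queues.stream()
.anyMatch(q -> q.getQueueName().equals(this.yarnQueue));
if (!queueFound) {
// join all queue names for the warning message
String queueNames = queues.stream()
.map(QueueInfo::getQueueName)
.collect(Collectors.joining(", "));
LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
"Available queues: " + queueNames);
}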
- Dynamic property binding.
- Dynamic properties passed at launch are merged into flinkConfiguration, e.g. -D env.java.opts=-DappName=foobar; a decoder sketch follows the snippet below.
// ------------------ Add dynamic properties to local flinkConfiguration ------
Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
}
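getDynamicProperties is not shown here; conceptually it just decodes the -D key=value pairs collected by the CLI into a map. A hypothetical equivalent, where the "@@" pair separator is an assumption about the encoding (check FlinkYarnSessionCli for the real one):
// decode "k1=v1@@k2=v2" into a map; a sketch, not the actual Flink helper
static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
final Map<String, String> properties = new HashMap<>();
if (dynamicPropertiesEncoded == null || dynamicPropertiesEncoded.isEmpty()) {
return properties;
}
for (String pair : dynamicPropertiesEncoded.split("@@")) {
final int eq = pair.indexOf('=');
if (eq > 0) {
properties.put(pair.substring(0, eq), pair.substring(eq + 1));
}
}
return properties;
}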
- YARN memory resource check
- Obtain the maximum and minimum resources the scheduler may allocate (<memory:..., vCores:...>).
- Via the YarnClient, collect the RUNNING nodes and compute the cluster's total free memory (totalFreeMemory), each NodeManager's free memory (nodeManagersFree), and the largest free amount on a single NM (containerLimit).
- Raise the JM and TM memory to at least YARN's minimum allocation; it must not exceed YARN's maximum allocation, should fit within containerLimit, and the total requested memory should stay below totalFreeMemory (the last two only produce warnings).
- Return the validated Flink runtime configuration, a ClusterSpecification.
// ------------------ Check if the YARN ClusterClient has the requested resources --------------
// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
Resource maxRes = appResponse.getMaximumResourceCapability();
// gather statistics on free cluster memory
final ClusterResourceDescription freeClusterMem;
try {
freeClusterMem = getCurrentFreeClusterResources(yarnClient);
} catch (YarnException | IOException e) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
}
// the minimum memory YARN allocates per container
final int yarnMinAllocationMB = yarnConfiguration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
final ClusterSpecification validClusterSpecification;
try {
validClusterSpecification = validateClusterResources(
clusterSpecification,
yarnMinAllocationMB,
maxRes,
freeClusterMem);
} catch (YarnDeploymentException yde) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw yde;
}
protected ClusterSpecification validateClusterResources(
ClusterSpecification clusterSpecification,
int yarnMinAllocationMB,
Resource maximumResourceCapability,
ClusterResourceDescription freeClusterResources) throws YarnDeploymentException {
int taskManagerCount = clusterSpecification.getNumberTaskManagers();
int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();
if (jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
"YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
"you requested will start.");
}
// set the memory to minAllocationMB to do the next checks correctly
if (jobManagerMemoryMb < yarnMinAllocationMB) {
jobManagerMemoryMb = yarnMinAllocationMB;
}
if (taskManagerMemoryMb < yarnMinAllocationMB) {
taskManagerMemoryMb = yarnMinAllocationMB;
}
final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
+ "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
}
if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
+ "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
}
final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
"connecting from the beginning because the resources are currently not available in the cluster. " +
"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
if (freeClusterResources.totalFreeMemory < totalMemoryRequired) {
LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+ "There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
}
if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
}
if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
}
// ----------------- check if the requested containers fit into the cluster.
int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length);
// first, allocate the jobManager somewhere.
if (!allocateResource(nmFree, jobManagerMemoryMb)) {
LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
}
// allocate TaskManagers
for (int i = 0; i < taskManagerCount; i++) {
if (!allocateResource(nmFree, taskManagerMemoryMb)) {
LOG.warn("There is not enough memory available in the YARN cluster. " +
"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
"NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
"the following NodeManagers are available: " + Arrays.toString(nmFree) + noteRsc);
}
}
return new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMb)
.setTaskManagerMemoryMB(taskManagerMemoryMb)
.setNumberTaskManagers(clusterSpecification.getNumberTaskManagers())
.setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager())
.createClusterSpecification();
}
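allocateResource is not shown above; judging by how it is used, it is a first-fit probe over the per-NodeManager free-memory array. A minimal sketch consistent with that usage:
// first-fit: find one NodeManager with enough free memory, deduct the container
// from it, and report whether the placement succeeded
private static boolean allocateResource(int[] nodeManagersFree, int toAllocate) {
for (int i = 0; i < nodeManagersFree.length; i++) {
if (nodeManagersFree[i] >= toAllocate) {
nodeManagersFree[i] -= toAllocate;
return true;
}
}
return false;
}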
- Starting the AM
Starting the ApplicationMaster on YARN, i.e. starting the Flink JobManager, is the most central step. It consists of:
- Initialize the file system from the configuration and create the temporary directory /user/admin/.flink/application_xx/ on HDFS; it is deleted when the job stops normally.
- Upload the files configured through environment variables, the files specified on the command line, the log configuration files, and the jars the jobGraph depends on.
- Create and populate the ContainerLaunchContext, chiefly the command line that starts the JM. The entry class is org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint, and the heap size is the post-cutoff size: with 1024 MB configured for the JM, -Xmx is set to 424 MB.
- Create and populate the ApplicationSubmissionContext, including the ContainerLaunchContext's LocalResources and the startup environment.
- Block until the application reports a successfully deployed state.
The command line that starts the JM:
"$JAVA_HOME/bin/java -Xmx424m -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+CMSIncrementalMode -XX:+CMSIncrementalPacing org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint 1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"
Worth noting:
tmpConfigurationFile.deleteOnExit(); // delete the temp file when the JVM exits
// register a shutdown callback thread with the JVM
// add a hook to clean up in case deployment fails (deletes yarnFilesDir)
Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
// since deployment was successful, remove the hook
ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
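The register-then-remove hook pattern is reusable in our own tooling: install a cleanup hook before a risky deployment, drop it once the deployment succeeds. A self-contained sketch (names are illustrative):
import java.nio.file.Files;
import java.nio.file.Path;

public class CleanupHookDemo {
public static void main(String[] args) throws Exception {
final Path stagingDir = Files.createTempDirectory("staging");
// if the JVM exits before deployment finishes, remove the staging directory
final Thread cleanupHook = new Thread(() -> {
try {
Files.deleteIfExists(stagingDir);
} catch (Exception ignored) {
}
});
Runtime.getRuntime().addShutdownHook(cleanupHook);

// ... risky deployment work would happen here ...

// deployment succeeded: the cluster now owns the staging directory,
// so stop the JVM from deleting it on exit
Runtime.getRuntime().removeShutdownHook(cleanupHook);
}
}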
ApplicationReport report = startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
public ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification) throws Exception {
// ------------------ Initialize the file systems -------------------------
org.apache.flink.core.fs.FileSystem.initialize(
configuration,
PluginUtils.createPluginManagerFromRootFolder(configuration));
// initialize file system
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
final FileSystem fs = FileSystem.get(yarnConfiguration);
final Path homeDir = fs.getHomeDirectory(); // target directory on the remote FS for local files, e.g. /user/admin/
// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
fs.getScheme().startsWith("file")) {
LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
}
//===========ApplicationSubmissionContext
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
for (File file : shipFiles) {
systemShipFiles.add(file.getAbsoluteFile()); // jars attached on the command line
}
// ship logback.xml from the conf directory
//check if there is a logback or log4j file
File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
final boolean hasLogback = logbackFile.exists();
if (hasLogback) {
systemShipFiles.add(logbackFile);
}
// ship log4j.properties from the conf directory
File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
final boolean hasLog4j = log4jFile.exists();
if (hasLog4j) {
systemShipFiles.add(log4jFile);
if (hasLogback) {
// this means there is already a logback configuration file --> fail
LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " +
"Logback configuration files. Please delete or rename one of them.");
}
}
// ship the jars under the FLINK_PLUGINS_DIR and FLINK_LIB_DIR environment folders
addEnvironmentFoldersToShipFiles(systemShipFiles);
// Set up the ApplicationSubmissionContext for the application
final ApplicationId appId = appContext.getApplicationId();
// ------------------ Add Zookeeper namespace to local flinkConfiguration ------
// if the launch command did not set a ZK namespace, fall back to the Flink configuration
String zkNamespace = getZookeeperNamespace();
// no user specified cli argument for namespace?
if (zkNamespace == null || zkNamespace.isEmpty()) {
// namespace defined in config? else use applicationId as default.
// path under the HA ZK root, '/default' by default
zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
setZookeeperNamespace(zkNamespace);
}
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
// activate re-execution of failed applications
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
// also sets the attempt failure validity interval
activateHighAvailabilitySupport(appContext);
} else {
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
1));
}
// upload the user program's jars
final Set<File> userJarFiles = (jobGraph == null)
// not per-job submission
? Collections.emptySet()
// add user code jars from the provided JobGraph
: jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());
// local resource map for Yarn
final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());
// list of remote paths (after upload)
final List<Path> paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size());
// ship list that enables reuse of resources for task manager containers
StringBuilder envShipFileList = new StringBuilder();
// collect the remote paths of the uploaded files into 'paths'
// upload and register ship files
List<String> systemClassPaths = uploadAndRegisterFiles(
systemShipFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);
final List<String> userClassPaths = uploadAndRegisterFiles(
userJarFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
}
// normalize classpath by sorting
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);
// classpath assembler
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
for (String classPath : systemClassPaths) {
classPathBuilder.append(classPath).append(File.pathSeparator);
}
// Setup jar for ApplicationMaster
Path remotePathJar = setupSingleLocalResource(
"flink.jar",
fs,
appId,
flinkJarPath,
localResources,
homeDir,
"");
// set the right configuration values for the TaskManager
configuration.setInteger(
TaskManagerOptions.NUM_TASK_SLOTS,
clusterSpecification.getSlotsPerTaskManager());
configuration.setString(
TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY,
clusterSpecification.getTaskManagerMemoryMB() + "m");
// create a local temp file <appId>-flink-conf.yaml<random> in the JVM's default temporary directory
// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
// deleted when the JVM exits
tmpConfigurationFile.deleteOnExit();
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
// upload the temp file as the flink-conf.yaml local resource
Path remotePathConf = setupSingleLocalResource(
"flink-conf.yaml",
fs,
appId,
new Path(tmpConfigurationFile.getAbsolutePath()),
localResources,
homeDir,
"");
paths.add(remotePathJar);
classPathBuilder.append("flink.jar").append(File.pathSeparator);
paths.add(remotePathConf);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
// serialize the job graph and upload it; the local resource key is job.graph
// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
if (jobGraph != null) {
try {
File fp = File.createTempFile(appId.toString(), null);
fp.deleteOnExit();
try (FileOutputStream output = new FileOutputStream(fp);
ObjectOutputStream obOutput = new ObjectOutputStream(output);){
obOutput.writeObject(jobGraph);
}
final String jobGraphFilename = "job.graph";
flinkConfiguration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);
Path pathFromYarnURL = setupSingleLocalResource(
jobGraphFilename,
fs,
appId,
new Path(fp.toURI()),
localResources,
homeDir,
"");
paths.add(pathFromYarnURL);
classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail");
throw e;
}
}
final Path yarnFilesDir = getYarnFilesDir(appId);
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
fs.setPermission(yarnFilesDir, permission); // set permission for path.
//To support Yarn Secure Integration Test Scenario
//In Integration test setup, the Yarn containers created by YarnMiniCluster does not have the Yarn site XML
//and KRB5 configuration files. We are adding these files as container local resources for the container
//applications (JM/TMs) to have proper secure cluster setup
Path remoteKrb5Path = null;
Path remoteYarnSiteXmlPath = null;
boolean hasKrb5 = false;
if (System.getenv("IN_TESTS") != null) {
File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
Path yarnSitePath = new Path(f.getAbsolutePath());
remoteYarnSiteXmlPath = setupSingleLocalResource(
Utils.YARN_SITE_FILE_NAME,
fs,
appId,
yarnSitePath,
localResources,
homeDir,
"");
String krb5Config = System.getProperty("java.security.krb5.conf");
if (krb5Config != null && krb5Config.length() != 0) {
File krb5 = new File(krb5Config);
LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
Path krb5ConfPath = new Path(krb5.getAbsolutePath());
remoteKrb5Path = setupSingleLocalResource(
Utils.KRB5_FILE_NAME,
fs,
appId,
krb5ConfPath,
localResources,
homeDir,
"");
hasKrb5 = true;
}
}
// setup security tokens
Path remotePathKeytab = null;
String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
remotePathKeytab = setupSingleLocalResource(
Utils.KEYTAB_FILE_NAME,
fs,
appId,
new Path(keytab),
localResources,
homeDir,
"");
}
// create the amContainer and fill in the java command that starts the AM; with jm = 1024 MB: 1024 - 600 = 424 MB for -Xmx
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
yarnClusterEntrypoint,
hasLogback,
hasLog4j,
hasKrb5,
clusterSpecification.getMasterMemoryMB());
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container..");
Utils.setTokensFor(amContainer, paths, yarnConfiguration);
}
// attach the local resources to the container
amContainer.setLocalResources(localResources);
fs.close();
// set up the launch classpath
// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = new HashMap<>();
// set user-specified AM environment variables (keys carrying the containerized-master env prefix)
appMasterEnv.putAll(Utils.getEnvironmentVariables(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
// set Flink app class path
appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());
// set Flink on YARN internal configuration values
appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(clusterSpecification.getNumberTaskManagers()));
appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(clusterSpecification.getTaskManagerMemoryMB()));
appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager()));
appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, yarnFilesDir.toUri().toString());
// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
if (remotePathKeytab != null) {
appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString());
String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
}
//To support Yarn Secure Integration Test Scenario
if (remoteYarnSiteXmlPath != null) {
appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
}
if (remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
if (dynamicPropertiesEncoded != null) {
appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
}
// add the YARN classpath from the YARN configuration to appMasterEnv
// set classpath from YARN configuration
Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
amContainer.setEnvironment(appMasterEnv);
// Set up resource type requirements for the ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(clusterSpecification.getMasterMemoryMB());
capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES)); // yarn.appmaster.vcores, default 1
final String customApplicationName = customName != null ? customName : applicationName;
// application name, application type, AM container spec, resources, and queue
appContext.setApplicationName(customApplicationName);
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
// application node label, for label-based scheduling
setApplicationNodeLabel(appContext);
// set application tags
setApplicationTags(appContext);
// add a shutdown hook that deletes yarnFilesDir if deployment fails
Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);
LOG.info("Waiting for the cluster to be allocated");
final long startTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
// block until the application has been deployed successfully
loop: while (true) {
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
throw new YarnDeploymentException("Failed to deploy the cluster.", e);
}
YarnApplicationState appState = report.getYarnApplicationState();
LOG.debug("Application State: {}", appState);
switch(appState) {
case FAILED:
case FINISHED:
case KILLED:
throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
+ appState + " during deployment. \n" +
"Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
"If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n" +
"yarn logs -applicationId " + appId);
//break ..
case RUNNING:
LOG.info("YARN application has been deployed successfully.");
break loop;
default:
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " + appState);
}
if (System.currentTimeMillis() - startTime > 60000) {
LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
}
}
lastAppState = appState;
Thread.sleep(250);
}
// print the application id for user to cancel themselves.
if (isDetachedMode()) {
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop " +
"it:\nyarn application -kill " + appId + "\nPlease also note that the " +
"temporary files of the YARN session in the home directory will not be removed.");
}
// since deployment was successful, remove the shutdown hook
ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
return report;
}
Among the files uploaded to HDFS (shown in a screenshot in the original post), the serialized JobGraph is the one registered under the local resource key job.graph.
- Fill in the runtime configuration and return the cluster client.
String host = report.getHost();
int port = report.getRpcPort();
// Correctly initialize the Flink config
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, port);
// the Flink cluster is deployed on YARN; return a client that represents it
return createYarnClusterClient(
this,
validClusterSpecification.getNumberTaskManagers(),
validClusterSpecification.getSlotsPerTaskManager(),
report,
flinkConfiguration,
true);
protected ClusterClient<ApplicationId> createYarnClusterClient(
AbstractYarnClusterDescriptor descriptor,
int numberTaskManagers,
int slotsPerTaskManager,
ApplicationReport report,
Configuration flinkConfiguration,
boolean perJobCluster) throws Exception {
return new RestClusterClient<>(
flinkConfiguration,
report.getApplicationId());
}
How the AM obtains the JobGraph at runtime
In production we use YARN per-job mode, i.e. one YARN application executes exactly one jobGraph, so let's look at the per-job execution flow.
Execution of the jobGraph is initiated by the Dispatcher. When the Dispatcher is created, the JobGraph is first extracted from the LocalResource and handed to SingleJobSubmittedJobGraphStore, a SubmittedJobGraphStore implementation; once the Dispatcher starts and becomes leader, it recovers the JobGraph from that store and schedules it. The concrete flow:
- At cluster startup, create the factory for the Dispatcher and ResourceManager. It is handed FileJobGraphRetriever, a JobGraphRetriever implementation that extracts the JobGraph from the LocalResource file (sketched after the factory code below).
ClusterEntrypoint#runCluster
final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
YarnResourceManagerFactory.INSTANCE,
// extract the JobGraph from the uploaded file
FileJobGraphRetriever.createFrom(configuration));
}
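FileJobGraphRetriever reverses the serialization done at submission time: it reads the job.graph local resource from the container's working directory and deserializes it. A sketch of the core of retrieveJobGraph; treat the details as approximate:
// read the file name stored in the configuration at submission time
// (JOB_GRAPH_FILE_PATH, defaulting to "job.graph") and deserialize the JobGraph
public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException {
final File jobGraphFile = new File(configuration.getString(JOB_GRAPH_FILE_PATH, "job.graph"));
try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(jobGraphFile))) {
return (JobGraph) in.readObject();
} catch (Exception e) {
throw new FlinkException("Could not retrieve the JobGraph from " + jobGraphFile + '.', e);
}
}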
- Create the MiniDispatcher, deserializing the JobGraph.
@Override
public MiniDispatcher createDispatcher(
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
FatalErrorHandler fatalErrorHandler,
HistoryServerArchivist historyServerArchivist) throws Exception {
// retrieve the jobGraph
final JobGraph jobGraph = jobGraphRetriever.retrieveJobGraph(configuration);
final String executionModeValue = configuration.getString(EXECUTION_MODE);
final ClusterEntrypoint.ExecutionMode executionMode = ClusterEntrypoint.ExecutionMode.valueOf(executionModeValue);
return new MiniDispatcher(
rpcService,
getEndpointId(),
configuration,
highAvailabilityServices,
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricQueryServicePath,
archivedExecutionGraphStore,
DefaultJobManagerRunnerFactory.INSTANCE,
fatalErrorHandler,
historyServerArchivist,
jobGraph,
executionMode);
}
- Store the jobGraph in a SingleJobSubmittedJobGraphStore.
public MiniDispatcher(
RpcService rpcService,
String endpointId,
Configuration configuration,
HighAvailabilityServices highAvailabilityServices,
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler,
HistoryServerArchivist historyServerArchivist,
JobGraph jobGraph,
JobClusterEntrypoint.ExecutionMode executionMode) throws Exception {
super(
rpcService,
endpointId,
configuration,
highAvailabilityServices,
new SingleJobSubmittedJobGraphStore(jobGraph),
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricQueryServicePath,
archivedExecutionGraphStore,
jobManagerRunnerFactory,
fatalErrorHandler,
historyServerArchivist);
this.executionMode = checkNotNull(executionMode);
this.jobTerminationFuture = new CompletableFuture<>();
}
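SingleJobSubmittedJobGraphStore is intentionally trivial: it holds the one JobGraph passed in and can only ever "recover" that graph. A condensed sketch, assuming the 1.9-era SubmittedJobGraphStore interface (the remaining no-op methods are omitted):
// the store knows exactly one job, so recovery is a lookup-by-equality
public class SingleJobSubmittedJobGraphStore implements SubmittedJobGraphStore {
private final JobGraph jobGraph;

public SingleJobSubmittedJobGraphStore(JobGraph jobGraph) {
this.jobGraph = checkNotNull(jobGraph);
}

@Override
public Collection<JobID> getJobIds() {
return Collections.singleton(jobGraph.getJobID());
}

@Override
public SubmittedJobGraph recoverJobGraph(JobID jobId) throws FlinkException {
if (!jobGraph.getJobID().equals(jobId)) {
throw new FlinkException("Could not recover job graph " + jobId + '.');
}
return new SubmittedJobGraph(jobGraph);
}
}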
- Once the Dispatcher becomes leader, it performs job recovery: it fetches the JobGraph from the submittedJobGraphStore and starts deploying it.
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
runAsyncWithoutFencing(
() -> {
log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);
final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoveryOperation.thenApplyAsync(
// recoverJobs: recover the persisted jobs
FunctionUtils.uncheckedFunction(ignored -> recoverJobs()),
getRpcService().getExecutor());
final CompletableFuture<Boolean> fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
(Collection<JobGraph> recoveredJobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
getUnfencedMainThreadExecutor());
final CompletableFuture<Void> confirmationFuture = fencingTokenFuture.thenCombineAsync(
recoveredJobsFuture,
BiFunctionWithException.unchecked((Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
if (confirmLeadership) {
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
} else {
for (JobGraph recoveredJob : recoveredJobs) {
submittedJobGraphStore.releaseJobGraph(recoveredJob.getJobID());
}
}
return null;
}),
getRpcService().getExecutor());
confirmationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
onFatalError(
new DispatcherException(
String.format("Failed to take leadership with session id %s.", newLeaderSessionID),
(ExceptionUtils.stripCompletionException(throwable))));
}
});
recoveryOperation = confirmationFuture;
});
}
/**
* Recovers all jobs persisted via the submitted job graph store.
*/
@VisibleForTesting
Collection<JobGraph> recoverJobs() throws Exception {
log.info("Recovering all persisted jobs.");
final Collection<JobID> jobIds = submittedJobGraphStore.getJobIds();
try {
return recoverJobGraphs(jobIds);
} catch (Exception e) {
// release all recovered job graphs
for (JobID jobId : jobIds) {
try {
submittedJobGraphStore.releaseJobGraph(jobId);
} catch (Exception ie) {
e.addSuppressed(ie);
}
}
throw e;
}
}
@Nonnull
private Collection<JobGraph> recoverJobGraphs(Collection<JobID> jobIds) throws Exception {
final List<JobGraph> jobGraphs = new ArrayList<>(jobIds.size());
for (JobID jobId : jobIds) {
final JobGraph jobGraph = recoverJob(jobId);
if (jobGraph == null) {
throw new FlinkJobNotFoundException(jobId);
}
jobGraphs.add(jobGraph);
}
return jobGraphs;
}