These are simply the author's study notes; if anything here infringes, contact me and it will be removed.
Original article: https://mp.weixin.qq.com/s/MWBoBPVhiB4VgpchtR6_nQ
Flink 1.10 Job Submission Process Analysis (Part 1) covered the flow from `flink run` up to the point where the job is handed off to the cluster. Flink uses a different PipelineExecutor for each submission mode; this article analyzes how a job is submitted to a YARN cluster in yarn-per-job mode. (Note: the analysis is based on Flink 1.10.1.)
YarnJobClusterExecutor
Continuing from the previous article: a job is ultimately handed to a PipelineExecutor to execute. Which PipelineExecutor is chosen depends on the submission mode, i.e. the execution.target parameter; for yarn-per-job an executor of type YarnJobClusterExecutor is selected.
public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {

    public static final String NAME = "yarn-per-job";

    public YarnJobClusterExecutor() {
        super(new YarnClusterClientFactory());
    }
}
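As a hedged illustration of how this executor gets picked: the executor is matched by its NAME against the execution.target value, so a programmatic submission only needs to put that value into the Configuration. A minimal sketch (DeploymentOptions.TARGET is the "execution.target" key; the wrapping class exists only to make the example self-contained):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

public class TargetSelectionSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The executor service loader matches this value against executor
        // factory names such as YarnJobClusterExecutor.NAME ("yarn-per-job").
        conf.setString(DeploymentOptions.TARGET, "yarn-per-job");
        System.out.println(conf.getString(DeploymentOptions.TARGET, "(none)"));
    }
}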
The executor implementation itself is fairly simple. The important part is the YarnClusterClientFactory passed to its constructor, which is used to create the YarnClusterDescriptor; the descriptor holds the information needed to talk to YARN, such as the YarnClient, the YARN configuration, and the target queue. YarnJobClusterExecutor extends AbstractJobClusterExecutor, an abstract job-submission executor, and execute is implemented by AbstractJobClusterExecutor:
public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements PipelineExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);

    // here: the YarnClusterClientFactory
    private final ClientFactory clusterClientFactory;

    public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
        this.clusterClientFactory = checkNotNull(clusterClientFactory);
    }

    // performs the job submission; pipeline is the StreamGraph
    public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
        // translate the StreamGraph into a JobGraph
        final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);

        // create the submission-side descriptor: YarnClusterDescriptor
        try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
            // wrap the configuration in an ExecutionConfigAccessor
            final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);

            // describes the resources the job needs: memory sizes, parallelism
            final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);

            // submit the job
            final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
                    .deployJobCluster(clusterSpecification, jobGraph,
                            // whether detached mode is used
                            configAccessor.getDetachedMode());
            LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

            return CompletableFuture.completedFuture(
                    new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
        }
    }
}
ClusterSpecification describes the resource sizes the job needs on the cluster. To understand how those sizes are derived, it is worth reading the Flink 1.10 memory management section of the official documentation. The job is finally handed to YarnClusterDescriptor to deploy.
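For a feel of what this object carries, here is a minimal hand-built sketch. In the real flow it is derived from the configuration by clusterClientFactory.getClusterSpecification; the builder methods below follow the 1.10 ClusterSpecificationBuilder, and the concrete numbers are invented for the example:

import org.apache.flink.client.deployment.ClusterSpecification;

public class ClusterSpecificationSketch {
    public static void main(String[] args) {
        ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(1024)       // memory for the JobMaster container
                .setTaskManagerMemoryMB(1728)  // memory for each TaskManager container
                .setSlotsPerTaskManager(2)     // slots per TaskManager
                .createClusterSpecification();
        System.out.println(spec);
    }
}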
The deploy process
The deploy step is where the interaction with YARN happens; clusterDescriptor.deployJobCluster calls the internal deployInternal method:
private ClusterClientProvider<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {

    // ... performs some sanity checks here: does the YARN queue exist,
    // configuration checks, resource-size validation, and so on
    ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            validClusterSpecification);
    // ...
}
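The checks are elided in the excerpt above; conceptually they compare the requested container sizes against what the YARN scheduler is able to allocate. A hypothetical sketch of such a check, not the actual Flink code (all names here are invented):

// hypothetical validation along the lines of what deployInternal performs
static void checkMemory(int requestedMB, int yarnMaxAllocationMB) {
    if (requestedMB > yarnMaxAllocationMB) {
        throw new IllegalArgumentException(
                "Requested " + requestedMB + " MB exceeds the maximum "
                        + "YARN container size of " + yarnMaxAllocationMB + " MB");
    }
}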
The most important step is startAppMaster, which starts an AppMaster process on YARN. yarnClusterEntrypoint is the entry class of that process, i.e. the startup entry class of the JobMaster: org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint. You can see this class in the process list on the cluster machines; whenever you see it, you know that process is a JobMaster. startAppMaster is fairly long, so it is broken down piece by piece here:
private ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification) throws Exception {

    // ------------------ Initialize the file systems -------------------------
    org.apache.flink.core.fs.FileSystem.initialize(
            configuration,
            PluginUtils.createPluginManagerFromRootFolder(configuration));

    // get homeDir, the path (usually on HDFS) that jars and log configs are uploaded to;
    // typically /user/hadoop ("hadoop" being the current user)
    final FileSystem fs = FileSystem.get(yarnConfiguration);
    final Path homeDir = fs.getHomeDirectory();

    // the context describing the application submitted to YARN
    ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

    // files that will be uploaded to HDFS and added to the classpath
    Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
    // files that will only be uploaded to HDFS, but not added to the classpath
    Set<File> shipOnlyFiles = new HashSet<>();
    for (File file : shipFiles) {
        systemShipFiles.add(file.getAbsoluteFile());
    }

    final String logConfigFilePath = configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
    if (logConfigFilePath != null) {
        systemShipFiles.add(new File(logConfigFilePath));
    }

    // add the files under flink_home/lib to systemShipFiles; files specified via -yt are in there too
    addLibFoldersToShipFiles(systemShipFiles);
    // add the files under flink_home/plugins to shipOnlyFiles
    addPluginsFoldersToShipFiles(shipOnlyFiles);

    final ApplicationId appId = appContext.getApplicationId();

    // ZooKeeper HA related configuration
    String zkNamespace = getZookeeperNamespace();
    // no user specified cli argument for namespace?
    if (zkNamespace == null || zkNamespace.isEmpty()) {
        // namespace defined in config? else use applicationId as default.
        zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
        setZookeeperNamespace(zkNamespace);
    }
    configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);

    if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
        // activate re-execution of failed applications
        appContext.setMaxAppAttempts(
                configuration.getInteger(
                        YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
        activateHighAvailabilitySupport(appContext);
    } else {
        // set number of application retries to 1 in the default case
        appContext.setMaxAppAttempts(
                configuration.getInteger(
                        YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                        1));
    }

    // userJarFiles holds the user jars
    final Set<File> userJarFiles = (jobGraph == null)
            // not per-job submission
            ? Collections.emptySet()
            // add user code jars from the provided JobGraph
            : jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());

    // cache files that need to be uploaded to HDFS, typically used for file sharing
    // (see the registerCachedFile sketch after this listing)
    if (jobGraph != null) {
        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : jobGraph.getUserArtifacts().entrySet()) {
            org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(entry.getValue().filePath);
            // only upload local files
            if (!path.getFileSystem().isDistributedFS()) {
                Path localPath = new Path(path.getPath());
                Tuple2<Path, Long> remoteFileInfo =
                        Utils.uploadLocalFileToRemote(fs, appId.toString(), localPath, homeDir, entry.getKey());
                jobGraph.setUserArtifactRemotePath(entry.getKey(), remoteFileInfo.f0.toString());
            }
        }
        jobGraph.writeUserArtifactEntriesToConfiguration();
    }

    // the resource files the AppMaster needs at startup; they will be downloaded from HDFS
    final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());
    // paths used for the HDFS security setup (delegation tokens, see below)
    final List<Path> paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size());
    // the resource files needed to start the TaskExecutors
    StringBuilder envShipFileList = new StringBuilder();

    // several uploadAndRegisterFiles calls (elided) upload systemShipFiles, shipOnlyFiles and the user jars to HDFS

    if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
        systemClassPaths.addAll(userClassPaths);
    }

    // normalize classpath by sorting
    Collections.sort(systemClassPaths); // sort the system classpath entries
    Collections.sort(userClassPaths);   // sort the user classpath entries

    // classPathBuilder holds the classpath information
    StringBuilder classPathBuilder = new StringBuilder();
    /*
     * build the classpath: ship-file jars, user jars, log4j and yaml configuration files
     */

    final Path yarnFilesDir = getYarnFilesDir(appId);
    FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
    fs.setPermission(yarnFilesDir, permission); // set permission for path.

    /*
     * a batch of security-related settings elided here
     */

    // the java command to execute, which starts YarnJobClusterEntrypoint
    final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
            yarnClusterEntrypoint,
            hasLogback,
            hasLog4j,
            hasKrb5,
            clusterSpecification.getMasterMemoryMB());

    if (UserGroupInformation.isSecurityEnabled()) {
        // set HDFS delegation tokens when security is enabled
        LOG.info("Adding delegation token to the AM container.");
        Utils.setTokensFor(amContainer, paths, yarnConfiguration);
    }

    amContainer.setLocalResources(localResources);
    fs.close();

    // Setup CLASSPATH and environment variables for ApplicationMaster
    final Map<String, String> appMasterEnv = new HashMap<>();
    /*
     * environment variables are collected into appMasterEnv and used when starting
     * YarnJobClusterEntrypoint, e.g. the classpath, the hadoop user, the appId, ...
     */
    amContainer.setEnvironment(appMasterEnv);

    // plus a number of settings elided here: the target queue, the YARN application name, etc.

    // add a hook to clean up in case deployment fails
    Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, yarnFilesDir);
    Runtime.getRuntime().addShutdownHook(deploymentFailureHook);

    LOG.info("Submitting application master " + appId);
    // submit the application
    yarnClient.submitApplication(appContext);

    /*
     * poll the application status
     */
}
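The user-artifact loop in the listing above handles files registered through Flink's distributed cache. As a hedged sketch of where such entries come from on the user side (the file path and cache name below are made up for the example), a job registers a cached file and reads it back inside a rich function:

import java.io.File;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CachedFileSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the local file is uploaded to HDFS during startAppMaster and
        // ends up as an entry in jobGraph.getUserArtifacts()
        env.registerCachedFile("/path/to/dictionary.txt", "dictionary"); // hypothetical path and name

        env.fromElements("a", "b")
                .map(new RichMapFunction<String, String>() {
                    private File dict;

                    @Override
                    public void open(Configuration parameters) {
                        // retrieved from the distributed cache on the TaskManager side
                        dict = getRuntimeContext().getDistributedCache().getFile("dictionary");
                    }

                    @Override
                    public String map(String value) {
                        return value + "@" + dict.getName();
                    }
                })
                .print();

        env.execute("cached-file-sketch");
    }
}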
This part of the flow is long; to summarize, it mainly does the following:
Uploads shipFiles, plugins, the user jar, the log config files, flink-conf.yaml, job.graph and other files to HDFS
Builds the classpath needed at startup, the ZooKeeper HA settings, the security settings, the JobMaster startup command, and so on
Submits the application to YARN
After the application starts successfully on YARN, you can find a file named launch_container.sh in the JobMaster's working directory; it contains all the environment variable settings and the startup command that were assembled in startAppMaster.
Summary
This article walked through the yarn-per-job submission flow. Combined with the previous two articles, you should now know how to implement job submission through the API. In my view two things matter most: first, parsing and assembling the configuration correctly; second, choosing an appropriate PipelineExecutor to submit the job.
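To tie the pieces together, here is a hedged end-to-end sketch of a programmatic yarn-per-job submission along the path this article traced. It is not a complete recipe: error handling, building the JobGraph, and the additional settings a real deployment needs (HADOOP_CONF_DIR, the flink-dist jar location, lib/plugins ship directories, memory options) are omitted, and the method names follow the 1.10 client API as shown in the excerpts above:

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;

public class YarnPerJobSubmitSketch {

    // jobGraph would be built from the StreamGraph, as covered in part one
    public static void submit(JobGraph jobGraph) throws Exception {
        Configuration conf = new Configuration();
        // the same selection key the executor loader uses
        conf.setString(DeploymentOptions.TARGET, "yarn-per-job");

        YarnClusterClientFactory factory = new YarnClusterClientFactory();
        try (YarnClusterDescriptor descriptor = factory.createClusterDescriptor(conf)) {
            // container resources, derived from the configuration
            ClusterSpecification spec = factory.getClusterSpecification(conf);
            // deploy: this goes through deployInternal/startAppMaster as shown above
            ClusterClientProvider<ApplicationId> provider =
                    descriptor.deployJobCluster(spec, jobGraph, /* detached */ true);
            System.out.println("Submitted job " + jobGraph.getJobID());
        }
    }
}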