本文對 Flink 的 Application年局、Per-Job 和 Session 部署模式進行了對比分析。詳細介紹了 Native Kubernetes 場景下的 Application 部署模式从绘,并且對整個啟動流程進行了源碼分析爪瓜。
1.Native Kubernetes Application 簡介
1.1 Flink 部署模式簡介
Flink 的部署模式有 Application桐汤、Per-Job 和 Session 模式。
Application掰派、Per-Job 和 Session 部署模式的主要區(qū)別:
● 集群與作業(yè)的生命周期是否一致
● 資源的隔離程度
● 作業(yè)的mian()
運行在 client 還是集群上
Application 模式的特點:① 作業(yè)與 Flink 集群打包在一起,在 JobManager 的啟動時候會執(zhí)行作業(yè)的 main 函數直接啟動作業(yè)左痢,而不需要通過 Flink Client 提交作業(yè)靡羡。② 作業(yè)的生命周期與 Flink 集群的一致,即作業(yè)關閉后 Flink 集群也會關閉
說明:Application 模式對比 Per-Job 模式最大的區(qū)別是前者使用
executeAsync()
提交作業(yè)(不阻塞)俊性,而后者使用execute()
提交作業(yè)(阻塞)略步,因此 Application 模式可以運行多個作業(yè)
Per-Job 模式的特點:作業(yè)與 Flink 集群不是打包在一起,在 JobManager 啟動后需要通過 Flink Client 提交作業(yè)定页,即增加了網絡傳輸的壓力和客戶端的 CPU 資源趟薄。
Session 模式的特點:常駐的 JobManager,多個作業(yè)共享同一個集群典徊。如果其中一個作業(yè)異常導致 TaskManager 關閉杭煎,則該 TM 上的全部作業(yè)都會重新調度。
1.2 Flink Native Kubernetes Application 架構圖
資源調度方面:Flink 支持 Kubernetes宫峦、Yarn 和 Mesos 資源調度器
Native 是指可以通過底層的資源調度管理器岔帽,實現彈性擴縮容。Native Kubernetes Application 是指 Flink 采用 Application 的部署模式导绷,并使用 Kubernetes 進行資源管理犀勒。
用戶只需要通過 Flink Client/CLI 啟動作業(yè)。首先通過 K8s 啟動 JobManager(deployment)的同時啟動作業(yè),然后通過 JobManager 內部的 K8sResourceManager 模塊向 K8s 直接申請 TaskManager 的資源并啟動贾费,最后當 TM 注冊到 JM 后作業(yè)就提交到 TM钦购。用戶在整個過程無需指定 TaskManager 資源的數量,而是由 JobManager 向 K8s 按需申請的褂萧。
Flink Application on Native Kubernetes 的實踐案例:
《Flink on K8s 在阿里巴巴的實踐》
《Native Flink on K8s 在小紅書的實踐》
《Flink on K8s 在京東的持續(xù)優(yōu)化實踐》
2.啟動流程詳解
2.1 啟動流程總覽
2.2 啟動腳本及其配置
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=top-speed-windowing-application \
-Dkubernetes.container.image=172.1.45.167:5000/flink:1.13.6-scala_2.11 \
local:///opt/flink/examples/streaming/TopSpeedWindowing.jar
Native Kubernetes Application 模式下押桃,啟動腳本 ./bin/flink
的必要參數有 --target kubernetes-application
、-Dkubernetes.cluster-id=***
导犹、-Dkubernetes.container.image=***
和 作業(yè) jar 路徑 local:///***
2.3 啟動 JobManager 和作業(yè)
2.3.1 CliFrontend 入口
public int parseAndRun(String[] args) {
// 省略...
try {
// do action
switch (action) {
case ACTION_RUN:
run(params);
return 0;
// 匹配參數 run-application
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
case ACTION_LIST:
list(params);
return 0;
// 省略...
}
protected void runApplication(String[] args) throws Exception {
// 省略...
// 創(chuàng)建 ApplicationDeployer 用于創(chuàng)建 Kubernetes ClusterDescriptor
final ApplicationDeployer deployer =
new ApplicationClusterDeployer(clusterClientServiceLoader);
if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.emptyList());
} else {
// 作業(yè)參數唱凯,例如 jar 路徑、main 函數入口谎痢、args 入參等等
programOptions = new ProgramOptions(commandLine);
programOptions.validate();
final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
effectiveConfiguration =
getEffectiveConfiguration(
activeCommandLine,
commandLine,
programOptions,
Collections.singletonList(uri.toString()));
}
final ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
// 提交用戶的作業(yè)并在集群中運行其 main 函數
deployer.run(effectiveConfiguration, applicationConfiguration);
}
2.3.2 Flink Client 通過 K8s Client 創(chuàng)建集群
public class ApplicationClusterDeployer implements ApplicationDeployer {
// 省略...
public <ClusterID> void run(
final Configuration configuration,
final ApplicationConfiguration applicationConfiguration)
throws Exception {
// 省略...
// 通過 ClusterClientServiceLoader 創(chuàng)建 KubernetesClusterClientFactory
final ClusterClientFactory<ClusterID> clientFactory =
clientServiceLoader.getClusterClientFactory(configuration);
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clientFactory.createClusterDescriptor(configuration)) {
// 通過 KubernetesClusterClientFactory 創(chuàng)建 KubernetesClusterDescriptor
final ClusterSpecification clusterSpecification =
clientFactory.getClusterSpecification(configuration);
// KubernetesClusterDescriptor 創(chuàng)建 application 集群
clusterDescriptor.deployApplicationCluster(
clusterSpecification, applicationConfiguration);
}
}
}
public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
// 省略...
@Override
public ClusterClientProvider<String> deployApplicationCluster(
final ClusterSpecification clusterSpecification,
final ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException {
// 省略...
// 指定集群入口 KubernetesApplicationClusterEntrypoint 部署/啟動集群
final ClusterClientProvider<String> clusterClientProvider =
deployClusterInternal(
KubernetesApplicationClusterEntrypoint.class.getName(),
clusterSpecification,
false);
// 省略...
}
private ClusterClientProvider<String> deployClusterInternal(
String entryPoint, ClusterSpecification clusterSpecification, boolean detached)
throws ClusterDeploymentException {
// 省略...
// 設置集群配置磕昼,例如啟動入口entry、blobserver端口节猿、taskmanager rpc端口票从、rest端口等等
flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);
// Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
// 省略...
// 配置 JobManager 的 PodTemplate
try {
final KubernetesJobManagerParameters kubernetesJobManagerParameters =
new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
final FlinkPod podTemplate =
kubernetesJobManagerParameters
.getPodTemplateFilePath()
.map(
file ->
KubernetesUtils.loadPodFromTemplateFile(
client, file, Constants.MAIN_CONTAINER_NAME))
.orElse(new FlinkPod.Builder().build());
// 配置 JobManager 的 Deployment
// 配置 Deployment 的過程中,利用 CmdJobManagerDecorator 設置 JobManager main container 的啟動命令滨嘱,即 kubernetes-jobmanager.sh kubernetes-application
final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
podTemplate, kubernetesJobManagerParameters);
client.createJobManagerComponent(kubernetesJobManagerSpec);
return createClusterClientProvider(clusterId);
// 省略...
}
}
}
public class Fabric8FlinkKubeClient implements FlinkKubeClient {
@Override
public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
final Deployment deployment = kubernetesJMSpec.getDeployment();
// 省略...
// 利用 Fabric8 Kubernetes Client 創(chuàng)建 JobManager 的 deployment
this.internalClient.resourceList(accompanyingResources).createOrReplace();
}
}
2.3.3 容器內啟動集群
public final class KubernetesApplicationClusterEntrypoint extends ApplicationClusterEntryPoint {
// 省略...
public static void main(final String[] args) {
// 省略...
// 設置作業(yè)配置
PackagedProgram program = null;
try {
program = getPackagedProgram(configuration);
} catch (Exception e) {
LOG.error("Could not create application program.", e);
System.exit(1);
}
try {
configureExecution(configuration, program);
} catch (Exception e) {
LOG.error("Could not apply application configuration.", e);
System.exit(1);
}
final KubernetesApplicationClusterEntrypoint kubernetesApplicationClusterEntrypoint =
new KubernetesApplicationClusterEntrypoint(configuration, program);
// 利用 helper 啟動集群
ClusterEntrypoint.runClusterEntrypoint(kubernetesApplicationClusterEntrypoint);
}
}
private void runCluster(Configuration configuration, PluginManager pluginManager)
throws Exception {
synchronized (lock) {
// 初始化 rpcserver峰鄙、haservice、blobserver等
initializeServices(configuration, pluginManager);
// 省略...
// DispatcherResourceManagerComponent太雨,其封裝Dispatcher吟榴、ResourceManager和WebMonitorEndpoint
final DispatcherResourceManagerComponentFactory
dispatcherResourceManagerComponentFactory =
createDispatcherResourceManagerComponentFactory(configuration);
// 內部使用DispatcherRunnerFactory創(chuàng)建DispatcherRunner
// 接著Dispatcher選主的時候,DefaultDispatcherRunner.grantLeadership() 啟動新 dispatcher leader即startNewDispatcherLeaderProcess()躺彬,DispatcherLeaderProcess.start()會利用JobDispatcherLeaderProcess.create()創(chuàng)建ApplicationDispatcherBootstrap煤墙,最終調用ApplicationDispatcherBootstrap.runApplicationAsync()執(zhí)行用戶作業(yè)的main函數
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
executionGraphInfoStore,
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
this);
// 省略...
}
}
當 Dispatcher 選擇主節(jié)點的時候,DefaultDispatcherRunner.grantLeadership() -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess() -> DispatcherLeaderProcess.start() -> JobDispatcherLeaderProcess.create()創(chuàng)建ApplicationDispatcherBootstrap -> ApplicationDispatcherBootstrap.runApplicationAsync() -> ... -> ClientUtils.executeProgram() 調用作業(yè)的 main函數
說明:Dispatcher 選主是利用了 Kubernetes Client 的
LeaderElector
宪拥,通過KubernetesLeaderElector
封裝 LeaderElector仿野,最終利用LeaderElectionEventHandler
處理選主的回調任務,其樣例如下所示她君。
public class LeaderElectionExample {
public static void main(String[] args) throws Exception {
ApiClient client = Config.defaultClient();
Configuration.setDefaultApiClient(client);
String lockHolderIdentityName = InetAddress.getLocalHost().getHostAddress();
// 創(chuàng)建 ConfigMap 鎖
ConfigMapLock lock = new ConfigMapLock( "default", "leader-election-ip", lockHolderIdentityName);
// Leader 選舉的配置
LeaderElectionConfig leaderElectionConfig =
new LeaderElectionConfig(lock,
Duration.ofMillis(10000),
Duration.ofMillis(8000),
Duration.ofMillis(2000));
// 初始化 LeaderElector
LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
// 選舉 Leader
leaderElector.run(
() -> {
System.out.println("Do something when getting leadership.");
},
() -> {
System.out.println("Do something when losing leadership.");
});
}
}
2.3.4 ApplicationDispatcherBootstrap 啟動作業(yè)
Dispatcher 通過 ApplicationDispatcherBootstrap
利用異步線程和反射機制脚作,執(zhí)行作業(yè)的 mian 函數,并且使用輪訓的方式不斷查詢作業(yè)的狀態(tài)缔刹,執(zhí)行步驟如下:
步驟 1:通過 ThreadLocal
控制 Context 對象球涛,在外部創(chuàng)建好 applicationJobIds
的引用列表并且層層傳入,然后利用反射執(zhí)行用戶 main 函數校镐;
步驟 2:在 main 函數中通過執(zhí)行 execute 或 executeAysnc 生成流圖并提交作業(yè)亿扁,接著把作業(yè) ID 保存到 submitJobIds
即 applicationJobIds
,因此 ApplicationDispatcherBootstrap
可以獲取提交的 jobId
步驟 3:循環(huán)每個作業(yè) ID 查詢其狀態(tài)是否為結束狀態(tài)鸟廓。如果沒有結束从祝,則一直輪訓狀態(tài)襟己;如果全部結束,則退出并關閉集群牍陌。
2.3.5 申請資源啟動 TaskManager
說明:
KubernetesResourceManagerDriver.requestResource
通過 Kubernetes 申請資源啟動 TaskManager擎浴。