【Flink on k8s】Native Kubernetes Application 部署模式詳解

本文對 Flink 的 Application年局、Per-Job 和 Session 部署模式進行了對比分析。詳細介紹了 Native Kubernetes 場景下的 Application 部署模式从绘,并且對整個啟動流程進行了源碼分析爪瓜。

1.Native Kubernetes Application 簡介

1.1 Flink 部署模式簡介

Flink 的部署模式有 Application桐汤、Per-Job 和 Session 模式

Application掰派、Per-Job 和 Session 部署模式的主要區(qū)別:
● 集群與作業(yè)的生命周期是否一致
● 資源的隔離程度
● 作業(yè)的 mian() 運行在 client 還是集群上

Application 模式的特點:① 作業(yè)與 Flink 集群打包在一起,在 JobManager 的啟動時候會執(zhí)行作業(yè)的 main 函數直接啟動作業(yè)左痢,而不需要通過 Flink Client 提交作業(yè)靡羡。② 作業(yè)的生命周期與 Flink 集群的一致,即作業(yè)關閉后 Flink 集群也會關閉

說明:Application 模式對比 Per-Job 模式最大的區(qū)別是前者使用 executeAsync() 提交作業(yè)(不阻塞)俊性,而后者使用 execute() 提交作業(yè)(阻塞)略步,因此 Application 模式可以運行多個作業(yè)

Per-Job 模式的特點:作業(yè)與 Flink 集群不是打包在一起,在 JobManager 啟動后需要通過 Flink Client 提交作業(yè)定页,即增加了網絡傳輸的壓力和客戶端的 CPU 資源趟薄。

Session 模式的特點:常駐的 JobManager,多個作業(yè)共享同一個集群典徊。如果其中一個作業(yè)異常導致 TaskManager 關閉杭煎,則該 TM 上的全部作業(yè)都會重新調度。

部署模式匯總.PNG

1.2 Flink Native Kubernetes Application 架構圖

資源調度方面:Flink 支持 Kubernetes宫峦、Yarn 和 Mesos 資源調度器

Native 是指可以通過底層的資源調度管理器岔帽,實現彈性擴縮容。Native Kubernetes Application 是指 Flink 采用 Application 的部署模式导绷,并使用 Kubernetes 進行資源管理犀勒。

用戶只需要通過 Flink Client/CLI 啟動作業(yè)。首先通過 K8s 啟動 JobManager(deployment)的同時啟動作業(yè),然后通過 JobManager 內部的 K8sResourceManager 模塊向 K8s 直接申請 TaskManager 的資源并啟動贾费,最后當 TM 注冊到 JM 后作業(yè)就提交到 TM钦购。用戶在整個過程無需指定 TaskManager 資源的數量,而是由 JobManager 向 K8s 按需申請的褂萧。

flink native kubernetes application 架構圖.png

Flink Application on Native Kubernetes 的實踐案例
《Flink on K8s 在阿里巴巴的實踐》
《Native Flink on K8s 在小紅書的實踐》
《Flink on K8s 在京東的持續(xù)優(yōu)化實踐》

2.啟動流程詳解

2.1 啟動流程總覽

image.png

2.2 啟動腳本及其配置

$ ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=top-speed-windowing-application \
    -Dkubernetes.container.image=172.1.45.167:5000/flink:1.13.6-scala_2.11 \
    local:///opt/flink/examples/streaming/TopSpeedWindowing.jar

Native Kubernetes Application 模式下押桃,啟動腳本 ./bin/flink 的必要參數有 --target kubernetes-application-Dkubernetes.cluster-id=***导犹、-Dkubernetes.container.image=*** 和 作業(yè) jar 路徑 local:///***

2.3 啟動 JobManager 和作業(yè)

2.3.1 CliFrontend 入口

    public int parseAndRun(String[] args) {
        // 省略...
        try {
            // do action
            switch (action) {
                case ACTION_RUN:
                    run(params);
                    return 0;
                 // 匹配參數 run-application
                case ACTION_RUN_APPLICATION:
                    runApplication(params);
                    return 0;
                case ACTION_LIST:
                    list(params);
                    return 0;
                // 省略...
    }


    protected void runApplication(String[] args) throws Exception {
        //  省略...

        //  創(chuàng)建 ApplicationDeployer 用于創(chuàng)建 Kubernetes ClusterDescriptor
        final ApplicationDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);

        if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {
            programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.emptyList());
        } else {
            //  作業(yè)參數唱凯,例如 jar 路徑、main 函數入口谎痢、args 入參等等
            programOptions = new ProgramOptions(commandLine);
            programOptions.validate();
            final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());
            effectiveConfiguration =
                    getEffectiveConfiguration(
                            activeCommandLine,
                            commandLine,
                            programOptions,
                            Collections.singletonList(uri.toString()));
        }

        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(
                        programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
        //  提交用戶的作業(yè)并在集群中運行其 main 函數
        deployer.run(effectiveConfiguration, applicationConfiguration);
    }

2.3.2 Flink Client 通過 K8s Client 創(chuàng)建集群

public class ApplicationClusterDeployer implements ApplicationDeployer {
    // 省略...
    public <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception {
        // 省略...
        // 通過 ClusterClientServiceLoader 創(chuàng)建 KubernetesClusterClientFactory
        final ClusterClientFactory<ClusterID> clientFactory =
                clientServiceLoader.getClusterClientFactory(configuration);
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clientFactory.createClusterDescriptor(configuration)) {
            // 通過 KubernetesClusterClientFactory 創(chuàng)建 KubernetesClusterDescriptor
            final ClusterSpecification clusterSpecification =
                    clientFactory.getClusterSpecification(configuration);
            // KubernetesClusterDescriptor 創(chuàng)建 application 集群
            clusterDescriptor.deployApplicationCluster(
                    clusterSpecification, applicationConfiguration);
        }
    }
}

public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
    // 省略...
    @Override
    public ClusterClientProvider<String> deployApplicationCluster(
            final ClusterSpecification clusterSpecification,
            final ApplicationConfiguration applicationConfiguration)
            throws ClusterDeploymentException {
        // 省略...
        // 指定集群入口 KubernetesApplicationClusterEntrypoint 部署/啟動集群
        final ClusterClientProvider<String> clusterClientProvider =
                deployClusterInternal(
                        KubernetesApplicationClusterEntrypoint.class.getName(),
                        clusterSpecification,
                        false);
        // 省略...
    }


    private ClusterClientProvider<String> deployClusterInternal(
            String entryPoint, ClusterSpecification clusterSpecification, boolean detached)
            throws ClusterDeploymentException {
        // 省略...
        // 設置集群配置磕昼,例如啟動入口entry、blobserver端口节猿、taskmanager rpc端口票从、rest端口等等
        flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);

        // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
        KubernetesUtils.checkAndUpdatePortConfigOption(
                flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
        // 省略...

        // 配置 JobManager 的 PodTemplate
        try {
            final KubernetesJobManagerParameters kubernetesJobManagerParameters =
                    new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);

            final FlinkPod podTemplate =
                    kubernetesJobManagerParameters
                            .getPodTemplateFilePath()
                            .map(
                                    file ->
                                            KubernetesUtils.loadPodFromTemplateFile(
                                                    client, file, Constants.MAIN_CONTAINER_NAME))
                            .orElse(new FlinkPod.Builder().build());
            // 配置 JobManager 的 Deployment
            // 配置 Deployment 的過程中,利用 CmdJobManagerDecorator 設置 JobManager main container 的啟動命令滨嘱,即 kubernetes-jobmanager.sh kubernetes-application
            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
                    KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
                            podTemplate, kubernetesJobManagerParameters);

            client.createJobManagerComponent(kubernetesJobManagerSpec);

            return createClusterClientProvider(clusterId);
            // 省略...
        } 
    }
}

public class Fabric8FlinkKubeClient implements FlinkKubeClient {
    @Override
    public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
        final Deployment deployment = kubernetesJMSpec.getDeployment();
        // 省略...
        // 利用  Fabric8 Kubernetes Client 創(chuàng)建 JobManager 的 deployment
        this.internalClient.resourceList(accompanyingResources).createOrReplace();
    }
}

2.3.3 容器內啟動集群

public final class KubernetesApplicationClusterEntrypoint extends ApplicationClusterEntryPoint {
    // 省略...
    public static void main(final String[] args) {
        // 省略...
        // 設置作業(yè)配置
        PackagedProgram program = null;
        try {
            program = getPackagedProgram(configuration);
        } catch (Exception e) {
            LOG.error("Could not create application program.", e);
            System.exit(1);
        }

        try {
            configureExecution(configuration, program);
        } catch (Exception e) {
            LOG.error("Could not apply application configuration.", e);
            System.exit(1);
        }

        final KubernetesApplicationClusterEntrypoint kubernetesApplicationClusterEntrypoint =
                new KubernetesApplicationClusterEntrypoint(configuration, program);
        // 利用 helper 啟動集群
        ClusterEntrypoint.runClusterEntrypoint(kubernetesApplicationClusterEntrypoint);
    }
}

    private void runCluster(Configuration configuration, PluginManager pluginManager)
            throws Exception {
        synchronized (lock) {
            //  初始化 rpcserver峰鄙、haservice、blobserver等
            initializeServices(configuration, pluginManager);
            //  省略...
            //  DispatcherResourceManagerComponent太雨,其封裝Dispatcher吟榴、ResourceManager和WebMonitorEndpoint
            final DispatcherResourceManagerComponentFactory
                    dispatcherResourceManagerComponentFactory =
                            createDispatcherResourceManagerComponentFactory(configuration);
            //  內部使用DispatcherRunnerFactory創(chuàng)建DispatcherRunner
            // 接著Dispatcher選主的時候,DefaultDispatcherRunner.grantLeadership() 啟動新 dispatcher leader即startNewDispatcherLeaderProcess()躺彬,DispatcherLeaderProcess.start()會利用JobDispatcherLeaderProcess.create()創(chuàng)建ApplicationDispatcherBootstrap煤墙,最終調用ApplicationDispatcherBootstrap.runApplicationAsync()執(zhí)行用戶作業(yè)的main函數
            clusterComponent =
                    dispatcherResourceManagerComponentFactory.create(
                            configuration,
                            ioExecutor,
                            commonRpcService,
                            haServices,
                            blobServer,
                            heartbeatServices,
                            metricRegistry,
                            executionGraphInfoStore,
                            new RpcMetricQueryServiceRetriever(
                                    metricRegistry.getMetricQueryServiceRpcService()),
                            this);
            //  省略...
        }
    }

當 Dispatcher 選擇主節(jié)點的時候,DefaultDispatcherRunner.grantLeadership() -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess() -> DispatcherLeaderProcess.start() -> JobDispatcherLeaderProcess.create()創(chuàng)建ApplicationDispatcherBootstrap -> ApplicationDispatcherBootstrap.runApplicationAsync() -> ... -> ClientUtils.executeProgram() 調用作業(yè)的 main函數

說明:Dispatcher 選主是利用了 Kubernetes Client 的 LeaderElector宪拥,通過 KubernetesLeaderElector 封裝 LeaderElector仿野,最終利用 LeaderElectionEventHandler 處理選主的回調任務,其樣例如下所示她君。

public class LeaderElectionExample {
    public static void main(String[] args) throws Exception {
        ApiClient client = Config.defaultClient();
        Configuration.setDefaultApiClient(client);
        String lockHolderIdentityName = InetAddress.getLocalHost().getHostAddress();
        // 創(chuàng)建 ConfigMap 鎖
        ConfigMapLock lock = new ConfigMapLock( "default", "leader-election-ip", lockHolderIdentityName);
        // Leader 選舉的配置
        LeaderElectionConfig leaderElectionConfig =
                new LeaderElectionConfig(lock,
                        Duration.ofMillis(10000),
                        Duration.ofMillis(8000),
                        Duration.ofMillis(2000));

        // 初始化 LeaderElector
        LeaderElector leaderElector = new LeaderElector(leaderElectionConfig);
        // 選舉 Leader
        leaderElector.run(
                () -> {
                    System.out.println("Do something when getting leadership.");
                },
                () -> {
                    System.out.println("Do something when losing leadership.");
                });
    }
}

2.3.4 ApplicationDispatcherBootstrap 啟動作業(yè)

Dispatcher 通過 ApplicationDispatcherBootstrap 利用異步線程和反射機制脚作,執(zhí)行作業(yè)的 mian 函數,并且使用輪訓的方式不斷查詢作業(yè)的狀態(tài)缔刹,執(zhí)行步驟如下:

步驟 1:通過 ThreadLocal 控制 Context 對象球涛,在外部創(chuàng)建好 applicationJobIds 的引用列表并且層層傳入,然后利用反射執(zhí)行用戶 main 函數校镐;

步驟 2:在 main 函數中通過執(zhí)行 execute 或 executeAysnc 生成流圖并提交作業(yè)亿扁,接著把作業(yè) ID 保存到 submitJobIdsapplicationJobIds,因此 ApplicationDispatcherBootstrap 可以獲取提交的 jobId

步驟 3:循環(huán)每個作業(yè) ID 查詢其狀態(tài)是否為結束狀態(tài)鸟廓。如果沒有結束从祝,則一直輪訓狀態(tài)襟己;如果全部結束,則退出并關閉集群牍陌。

image.png

2.3.5 申請資源啟動 TaskManager

說明KubernetesResourceManagerDriver.requestResource 通過 Kubernetes 申請資源啟動 TaskManager擎浴。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市毒涧,隨后出現的幾起案子贮预,更是在濱河造成了極大的恐慌,老刑警劉巖契讲,帶你破解...
    沈念sama閱讀 221,576評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仿吞,死亡現場離奇詭異,居然都是意外死亡捡偏,警方通過查閱死者的電腦和手機茫藏,發(fā)現死者居然都...
    沈念sama閱讀 94,515評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來霹琼,“玉大人,你說我怎么就攤上這事凉当≡嫔辏” “怎么了?”我有些...
    開封第一講書人閱讀 168,017評論 0 360
  • 文/不壞的土叔 我叫張陵看杭,是天一觀的道長忠藤。 經常有香客問我,道長楼雹,這世上最難降的妖魔是什么模孩? 我笑而不...
    開封第一講書人閱讀 59,626評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮贮缅,結果婚禮上榨咐,老公的妹妹穿的比我還像新娘。我一直安慰自己谴供,他們只是感情好块茁,可當我...
    茶點故事閱讀 68,625評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著桂肌,像睡著了一般数焊。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上崎场,一...
    開封第一講書人閱讀 52,255評論 1 308
  • 那天佩耳,我揣著相機與錄音,去河邊找鬼谭跨。 笑死干厚,一個胖子當著我的面吹牛李滴,可吹牛的內容都是我干的。 我是一名探鬼主播萍诱,決...
    沈念sama閱讀 40,825評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼悬嗓,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了裕坊?” 一聲冷哼從身側響起包竹,我...
    開封第一講書人閱讀 39,729評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎籍凝,沒想到半個月后周瞎,有當地人在樹林里發(fā)現了一具尸體,經...
    沈念sama閱讀 46,271評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡饵蒂,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 38,363評論 3 340
  • 正文 我和宋清朗相戀三年声诸,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片退盯。...
    茶點故事閱讀 40,498評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡彼乌,死狀恐怖,靈堂內的尸體忽然破棺而出渊迁,到底是詐尸還是另有隱情慰照,我是刑警寧澤,帶...
    沈念sama閱讀 36,183評論 5 350
  • 正文 年R本政府宣布琉朽,位于F島的核電站毒租,受9級特大地震影響,放射性物質發(fā)生泄漏箱叁。R本人自食惡果不足惜墅垮,卻給世界環(huán)境...
    茶點故事閱讀 41,867評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望耕漱。 院中可真熱鬧算色,春花似錦、人聲如沸孤个。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽齐鲤。三九已至斥废,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間给郊,已是汗流浹背牡肉。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留淆九,地道東北人统锤。 一個月前我還...
    沈念sama閱讀 48,906評論 3 376
  • 正文 我出身青樓毛俏,卻偏偏與公主長得像,于是被迫代替她去往敵國和親饲窿。 傳聞我的和親對象是個殘疾皇子煌寇,可洞房花燭夜當晚...
    茶點故事閱讀 45,507評論 2 359

推薦閱讀更多精彩內容