Flink Source Code Analysis: Job Submission (Part 2)

In the previous article we walked through the overall process of submitting a job via the command line, but we glossed over some details along the way: which nodes of the cluster does the job get submitted to? What is a JobGraph, and how is it generated? How does the JobClient actually submit the job to the cluster? This article answers these questions one by one.

The Flink Runtime

The Flink runtime consists of two main types of processes:

  • JobManager: coordinates the distributed execution — it schedules tasks, coordinates checkpoints, and coordinates recovery on failures, among other things.
  • TaskManager: executes the tasks (or, more specifically, the subtasks) of a dataflow, and buffers and exchanges the data streams.

Given the division of labor implied by the names JobManager and TaskManager, it should be apparent that the JobClient submits the job to a JobManager node, which in turn distributes the subtasks to TaskManagers for execution.

Submitting a Job in Interactive Mode

When a job is submitted via the command line, the ClusterClient's run method is invoked to perform the submission, in one of two modes: interactive and non-interactive:

    public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        if (prog.isUsingProgramEntryPoint()) { // the program has an entry-point class (non-interactive submission)

            // JobWithJars is a Flink dataflow plan, holding all classes from the jar
            // and the ClassLoader used to load the user code
            final JobWithJars jobWithJars;
            if (hasUserJarsInClassPath(prog.getAllLibraries())) {
                jobWithJars = prog.getPlanWithoutJars();
            } else {
                jobWithJars = prog.getPlanWithJars();
            }

            return run(jobWithJars, parallelism, prog.getSavepointSettings());
        } else if (prog.isUsingInteractiveMode()) { // interactive-mode submission
            log.info("Starting program in interactive mode");

            final List<URL> libraries;
            if (hasUserJarsInClassPath(prog.getAllLibraries())) {
                libraries = Collections.emptyList();
            } else {
                libraries = prog.getAllLibraries();
            }

            ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
                prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
                prog.getSavepointSettings());
            ContextEnvironment.setAsContext(factory);

            try {
                // invoke the program's main method
                prog.invokeInteractiveModeForExecution();
                if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
                    throw new ProgramMissingJobException("The program didn't contain a Flink job.");
                }
                if (isDetached()) {
                    // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
                    return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
                } else {
                    // in blocking mode, we execute all Flink jobs contained in the user code and then return here
                    return this.lastJobExecutionResult;
                }
            } finally {
                ContextEnvironment.unsetContext();
            }
        } else {
            throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
        }
    }

In practice, jobs are usually submitted in interactive mode, with a mainClass included in the submitted jar. Take Flink's streaming WordCount example:

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(0).sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount");
    }

The prog.invokeInteractiveModeForExecution() call in ClusterClient simply invokes WordCount's main method. The logic of main is straightforward and has two parts: building the data flow and executing it. This section focuses on executing the data flow, i.e. the last line: env.execute("Streaming WordCount").
Taking the local stream execution environment (LocalStreamEnvironment) as an example, let's look at what the execute method does:

    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        // transform the streaming program into a JobGraph
        // build the stream graph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);

        // convert the stream graph into a job graph
        JobGraph jobGraph = streamGraph.getJobGraph();

        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());

        configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
        configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        // add (and override) the settings with what the user defined
        configuration.addAll(this.conf);

        if (LOG.isInfoEnabled()) {
            LOG.info("Running job on local embedded Flink mini cluster");
        }

        LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true);
        try {
            exec.start();
            // submit the job graph
            return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
        }
        finally {
            transformations.clear();
            exec.stop();
        }
    }

As you can see, there are three main steps: building the stream graph, building the job graph, and submitting the job. Let's look at the submission logic first:

  @throws(classOf[JobExecutionException])
  def submitJobAndWait(
      jobGraph: JobGraph,
      printUpdates: Boolean)
    : JobExecutionResult = {
    submitJobAndWait(jobGraph, printUpdates, timeout)
  }
  @throws(classOf[JobExecutionException])
  def submitJobAndWait(
      jobGraph: JobGraph,
      printUpdates: Boolean,
      timeout: FiniteDuration)
    : JobExecutionResult = {

    val clientActorSystem = startJobClientActorSystem(jobGraph.getJobID)

    val userCodeClassLoader =
      try {
        createUserCodeClassLoader(
          jobGraph.getUserJars,
          jobGraph.getClasspaths,
          Thread.currentThread().getContextClassLoader)
      } catch {
        case e: Exception => throw new JobExecutionException(
          jobGraph.getJobID,
          "Could not create the user code class loader.",
          e)
      }

    try {
      JobClient.submitJobAndWait(
       clientActorSystem,
       configuration,
       highAvailabilityServices,
       jobGraph,
       timeout,
       printUpdates,
       userCodeClassLoader)
    } finally {
       if(!useSingleActorSystem) {
         // we have to shutdown the just created actor system
         shutdownJobClientActorSystem(clientActorSystem)
       }
     }
  }

Following the call chain, the job is ultimately submitted through the JobClient.submitJobAndWait(...) method described in the previous article, so we won't repeat that here. JobClient starts an actor system; although it is not part of the Flink runtime itself, it can either disconnect after submission or stay connected to receive progress reports. The overall job submission flow is shown below:

We covered the three steps of job submission above. The first and second steps build the stream graph and the job graph respectively; let's look at each of them in turn.

Stream Graph

A StreamGraph represents the topology of the stream and contains all the information needed to generate a JobGraph.
A stream graph consists of nodes and edges, backed by the StreamNode and StreamEdge data structures respectively. A StreamGraph might look like the diagram below:
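To make the node/edge structure concrete, here is a minimal, self-contained sketch of a StreamGraph-like container. The class and method names are simplified stand-ins for illustration and omit everything Flink's real StreamGraph carries (operators, types, parallelism, partitioners):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MiniStreamGraph {
    // A node stands for one operator (source, flatMap, keyed sum, sink, ...)
    static class StreamNode {
        final int id;
        final String operatorName;
        StreamNode(int id, String operatorName) { this.id = id; this.operatorName = operatorName; }
    }

    // An edge connects an upstream node to a downstream node
    static class StreamEdge {
        final int sourceId;
        final int targetId;
        StreamEdge(int sourceId, int targetId) { this.sourceId = sourceId; this.targetId = targetId; }
    }

    final Map<Integer, StreamNode> nodes = new HashMap<>();
    final List<StreamEdge> edges = new ArrayList<>();

    void addNode(int id, String name) { nodes.put(id, new StreamNode(id, name)); }
    void addEdge(int sourceId, int targetId) { edges.add(new StreamEdge(sourceId, targetId)); }

    public static void main(String[] args) {
        // the WordCount pipeline as a stream graph: source -> flatMap -> keyed sum -> sink
        MiniStreamGraph g = new MiniStreamGraph();
        g.addNode(1, "Source");
        g.addNode(2, "FlatMap");
        g.addNode(3, "KeyedSum");
        g.addNode(4, "Sink");
        g.addEdge(1, 2);
        g.addEdge(2, 3);
        g.addEdge(3, 4);
        System.out.println(g.nodes.size() + " nodes, " + g.edges.size() + " edges");
        // -> 4 nodes, 3 edges
    }
}
```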


Let's look at how the StreamGraph is created, i.e. the logic of the getStreamGraph() method.

    @Internal
    public StreamGraph getStreamGraph() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return StreamGraphGenerator.generate(this, transformations);
    }
    public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
        return new StreamGraphGenerator(env).generateInternal(transformations);
    }
    /**
     * This starts the actual transformation, beginning from the sinks.
     */
    private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
        for (StreamTransformation<?> transformation: transformations) {
            transform(transformation);
        }
        return streamGraph;
    }
    /**
     * Transforms one {@code StreamTransformation}.
     *
     * <p>This checks whether we already transformed it and exits early in that case. If not it
     * delegates to one of the transformation specific methods.
     */
    private Collection<Integer> transform(StreamTransformation<?> transform) {

        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {

            // if the max parallelism hasn't been set, then first use the job-wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        } else if (transform instanceof SourceTransformation<?>) {
            transformedIds = transformSource((SourceTransformation<?>) transform);
        } else if (transform instanceof SinkTransformation<?>) {
            transformedIds = transformSink((SinkTransformation<?>) transform);
        } else if (transform instanceof UnionTransformation<?>) {
            transformedIds = transformUnion((UnionTransformation<?>) transform);
        } else if (transform instanceof SplitTransformation<?>) {
            transformedIds = transformSplit((SplitTransformation<?>) transform);
        } else if (transform instanceof SelectTransformation<?>) {
            transformedIds = transformSelect((SelectTransformation<?>) transform);
        } else if (transform instanceof FeedbackTransformation<?>) {
            transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
        } else if (transform instanceof CoFeedbackTransformation<?>) {
            transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
        } else if (transform instanceof PartitionTransformation<?>) {
            transformedIds = transformPartition((PartitionTransformation<?>) transform);
        } else if (transform instanceof SideOutputTransformation<?>) {
            transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
        } else {
            throw new IllegalStateException("Unknown transformation: " + transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        if (transform.getBufferTimeout() > 0) {
            streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        }
        if (transform.getUid() != null) {
            streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
            streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
        }

        return transformedIds;
    }

As you can see, the core logic lives in the transform(StreamTransformation<?> transform) method. You may be wondering what a StreamTransformation is: it describes the operation that created a DataStream. Every DataStream is backed by a StreamTransformation, which carries the stream's original information, and from these transformations the whole StreamGraph can be built. Taking OneInputTransformation as an example, let's see how the transform operation works.

    /**
     * Transforms a {@code OneInputTransformation}.
     *
     * <p>This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
     * wired the inputs to this new node.
     */
    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {

        // first transform the input StreamTransformation of this OneInputTransformation
        Collection<Integer> inputIds = transform(transform.getInput());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

        // add a StreamGraph node
        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getOperator(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        streamGraph.setParallelism(transform.getId(), transform.getParallelism());
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        // add StreamGraph edges
        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }

The logic is clear: first recursively transform the current OneInputTransformation's input StreamTransformation, then build a StreamNode from the OneInputTransformation's operator and related information, and finally build StreamEdges from the IDs of the transformed inputs.
Creating streams and generating the StreamGraph involves quite a few data structures and relationships between them. Taking the stream created by text.flatMap(new Tokenizer()) in the WordCount example above, the data structures and their hierarchy look like the diagram below:
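The recursive pattern at work here — transform the inputs first, check the alreadyTransformed cache to exit early, then add a node and wire an edge from every input ID — can be sketched independently of Flink. The Transformation class below is a simplified stand-in for StreamTransformation, and the memoization is collapsed to a single put at the end (the real generator also checks the cache again after recursing, because iterate transformations register themselves before transforming their feedback edges):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MiniGraphGenerator {
    // Simplified stand-in for StreamTransformation: an id, a name, and its inputs
    static class Transformation {
        final int id;
        final String name;
        final List<Transformation> inputs;
        Transformation(int id, String name, Transformation... inputs) {
            this.id = id; this.name = name; this.inputs = Arrays.asList(inputs);
        }
    }

    final Map<Transformation, Collection<Integer>> alreadyTransformed = new HashMap<>();
    final Set<Integer> nodes = new LinkedHashSet<>();
    final List<int[]> edges = new ArrayList<>();   // each entry is {sourceId, targetId}

    // Mirrors the shape of transformOneInputTransform: recurse into the inputs,
    // memoize, add the node, then wire an edge from every transformed input id.
    Collection<Integer> transform(Transformation t) {
        if (alreadyTransformed.containsKey(t)) {
            return alreadyTransformed.get(t);       // early exit: already transformed
        }
        List<Integer> inputIds = new ArrayList<>();
        for (Transformation input : t.inputs) {
            inputIds.addAll(transform(input));      // recursive call on the inputs
        }
        nodes.add(t.id);                            // the addOperator step
        for (int inputId : inputIds) {
            edges.add(new int[]{inputId, t.id});    // the addEdge step
        }
        Collection<Integer> result = Collections.singleton(t.id);
        alreadyTransformed.put(t, result);
        return result;
    }

    public static void main(String[] args) {
        // generation starts from the sinks, as in generateInternal
        Transformation source = new Transformation(1, "source");
        Transformation flatMap = new Transformation(2, "flatMap", source);
        Transformation sum = new Transformation(3, "sum", flatMap);
        MiniGraphGenerator gen = new MiniGraphGenerator();
        gen.transform(sum);
        System.out.println(gen.nodes + " / " + gen.edges.size() + " edges");
        // -> [1, 2, 3] / 2 edges
    }
}
```

Transforming the same node twice is a no-op thanks to the cache, which is what keeps diamond-shaped topologies from producing duplicate nodes or edges.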

Job Graph

The JobGraph is the only representation of a job that Flink's dataflow engine recognizes, and it is precisely this shared abstraction that unifies stream processing and batch processing at runtime.

Compared with the StreamGraph and the batch OptimizedPlan, the JobGraph is no longer an entirely "static" data structure, because it introduces the "dynamic" notion of an intermediate data set (IntermediateDataSet).

Job vertices (JobVertex), intermediate data sets (IntermediateDataSet), and job edges (JobEdge) are the basic elements of a JobGraph. The three depend on one another:

  • a JobVertex has a number of JobEdges as its inputs and a number of IntermediateDataSets as the results it produces;
  • an IntermediateDataSet has one JobVertex as its producer and a number of JobEdges as its consumers;
  • a JobEdge has one IntermediateDataSet as its source and one JobVertex as its target consumer.
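These mutual dependencies can be sketched with minimal stand-in classes. This is an illustration of the relationships only; Flink's real JobVertex, IntermediateDataSet, and JobEdge carry much more state (IDs, result partition types, ship strategies, and so on):

```java
import java.util.ArrayList;
import java.util.List;

public class MiniJobGraph {
    // A JobVertex has input JobEdges and produces IntermediateDataSets
    static class JobVertex {
        final String name;
        final List<JobEdge> inputs = new ArrayList<>();
        final List<IntermediateDataSet> producedDataSets = new ArrayList<>();
        JobVertex(String name) { this.name = name; }

        // wire this vertex as a consumer of a new result produced by `producer`
        void connectTo(JobVertex producer) {
            IntermediateDataSet dataSet = new IntermediateDataSet(producer);
            producer.producedDataSets.add(dataSet);
            JobEdge edge = new JobEdge(dataSet, this);
            dataSet.consumers.add(edge);
            this.inputs.add(edge);
        }
    }

    // An IntermediateDataSet has one producer vertex and several consumer edges
    static class IntermediateDataSet {
        final JobVertex producer;
        final List<JobEdge> consumers = new ArrayList<>();
        IntermediateDataSet(JobVertex producer) { this.producer = producer; }
    }

    // A JobEdge links one IntermediateDataSet (source) to one JobVertex (target)
    static class JobEdge {
        final IntermediateDataSet source;
        final JobVertex target;
        JobEdge(IntermediateDataSet source, JobVertex target) { this.source = source; this.target = target; }
    }

    public static void main(String[] args) {
        JobVertex source = new JobVertex("Source");
        JobVertex flatMapSum = new JobVertex("FlatMap -> KeyedSum");
        flatMapSum.connectTo(source);
        System.out.println(source.producedDataSets.size() + " data set(s), "
                + flatMapSum.inputs.size() + " input edge(s)");
        // -> 1 data set(s), 1 input edge(s)
    }
}
```

Note that every connection materializes a new IntermediateDataSet between producer and consumer, which is exactly the "dynamic" element the JobGraph adds over the StreamGraph.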

A JobGraph might therefore look like the diagram below:


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末淑翼,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子品追,更是在濱河造成了極大的恐慌玄括,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件肉瓦,死亡現(xiàn)場(chǎng)離奇詭異遭京,居然都是意外死亡胃惜,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門哪雕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來船殉,“玉大人,你說我怎么就攤上這事斯嚎±妫” “怎么了?”我有些...
    開封第一講書人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵堡僻,是天一觀的道長(zhǎng)糠惫。 經(jīng)常有香客問我,道長(zhǎng)钉疫,這世上最難降的妖魔是什么硼讽? 我笑而不...
    開封第一講書人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮牲阁,結(jié)果婚禮上固阁,老公的妹妹穿的比我還像新娘。我一直安慰自己城菊,他們只是感情好备燃,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著役电,像睡著了一般赚爵。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上法瑟,一...
    開封第一講書人閱讀 51,688評(píng)論 1 305
  • 那天冀膝,我揣著相機(jī)與錄音,去河邊找鬼霎挟。 笑死窝剖,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的酥夭。 我是一名探鬼主播赐纱,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼熬北!你這毒婦竟也來了疙描?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤讶隐,失蹤者是張志新(化名)和其女友劉穎起胰,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體巫延,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡效五,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年地消,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片畏妖。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡脉执,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出戒劫,到底是詐尸還是另有隱情半夷,我是刑警寧澤,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布谱仪,位于F島的核電站玻熙,受9級(jí)特大地震影響否彩,放射性物質(zhì)發(fā)生泄漏疯攒。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一列荔、第九天 我趴在偏房一處隱蔽的房頂上張望敬尺。 院中可真熱鬧,春花似錦贴浙、人聲如沸砂吞。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蜻直。三九已至,卻和暖如春袁串,著一層夾襖步出監(jiān)牢的瞬間概而,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來泰國(guó)打工囱修, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留赎瑰,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓破镰,卻偏偏與公主長(zhǎng)得像餐曼,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子鲜漩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容