Flink Source Code Reading (3) --- Generation of the ExecutionGraph

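This article is based on Flink 1.9. Running a Flink job involves three graphs: StreamGraph, JobGraph and ExecutionGraph. The StreamGraph and JobGraph are generated on the client side, while the ExecutionGraph is built in the JobMaster.

  • StreamGraph is the most basic graph generated from the user code, i.e. a direct translation of the user's program logic
  • JobGraph is an optimized version of the StreamGraph, e.g. it decides which operators can be chained together to reduce network overhead
  • ExecutionGraph is the graph used for job scheduling; it adds the notion of parallelism to the JobGraph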

This article builds on Flink Source Code Reading (2) --- Generation of the JobGraph and explains how the ExecutionGraph is generated.

1. ExecutionJobVertex

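ExecutionJobVertex corresponds one-to-one with a JobVertex in the JobGraph and is uniquely identified within the graph by its JobVertexID.
Let's look at the member variables of the ExecutionJobVertex class: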

    private final ExecutionGraph graph;

    private final JobVertex jobVertex;

    /**
     * The IDs of all operators contained in this execution job vertex.
     *
     * <p>The ID's are stored depth-first post-order; for the forking chain below the ID's would be stored as [D, E, B, C, A].
     *  A - B - D
     *   \    \
     *    C    E
     * This is the same order that operators are stored in the {@code StreamTask}.
     */
    private final List<OperatorID> operatorIDs;

    /**
     * The alternative IDs of all operators contained in this execution job vertex.
     *
     * <p>The ID's are in the same order as {@link ExecutionJobVertex#operatorIDs}.
     */
    private final List<OperatorID> userDefinedOperatorIds;

    private final ExecutionVertex[] taskVertices;

    private final IntermediateResult[] producedDataSets;

    private final List<IntermediateResult> inputs;

    private final int parallelism;

    private final SlotSharingGroup slotSharingGroup;

    private final CoLocationGroup coLocationGroup;

    private final InputSplit[] inputSplits;

    private final boolean maxParallelismConfigured;

    private int maxParallelism;

As the source shows, an ExecutionJobVertex is converted from a JobVertex and holds an array of ExecutionVertex (taskVertices) and an array of IntermediateResult (producedDataSets).
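The following Java sketch shows how one JobVertex expands into an ExecutionJobVertex. The classes are hypothetical, heavily simplified stand-ins, not Flink's real constructors. The sketch assumes a JobVertex carries only a name, a parallelism and the number of IntermediateDataSets it produces. It shows that the ExecutionJobVertex gets one ExecutionVertex per subtask and one IntermediateResult per output, and that each result has `parallelism` partitions.

```java
// Hypothetical, heavily simplified stand-ins for Flink's JobVertex / ExecutionJobVertex.
final class EjvSketch {

    static final class JobVertex {
        final String name;
        final int parallelism;
        final int numProducedDataSets; // how many IntermediateDataSets this vertex outputs

        JobVertex(String name, int parallelism, int numProducedDataSets) {
            this.name = name;
            this.parallelism = parallelism;
            this.numProducedDataSets = numProducedDataSets;
        }
    }

    static final class ExecutionVertex {
        final ExecutionJobVertex jobVertex;
        final int subTaskIndex;

        ExecutionVertex(ExecutionJobVertex jobVertex, int subTaskIndex) {
            this.jobVertex = jobVertex;
            this.subTaskIndex = subTaskIndex;
        }
    }

    static final class IntermediateResult {
        final ExecutionJobVertex producer;
        final int numPartitions; // one partition per producing subtask

        IntermediateResult(ExecutionJobVertex producer, int numPartitions) {
            this.producer = producer;
            this.numPartitions = numPartitions;
        }
    }

    static final class ExecutionJobVertex {
        final JobVertex jobVertex;
        final ExecutionVertex[] taskVertices;
        final IntermediateResult[] producedDataSets;

        ExecutionJobVertex(JobVertex jobVertex) {
            this.jobVertex = jobVertex;
            int parallelism = jobVertex.parallelism;

            // one IntermediateResult per IntermediateDataSet, each with `parallelism` partitions
            this.producedDataSets = new IntermediateResult[jobVertex.numProducedDataSets];
            for (int i = 0; i < producedDataSets.length; i++) {
                producedDataSets[i] = new IntermediateResult(this, parallelism);
            }

            // one ExecutionVertex per parallel subtask
            this.taskVertices = new ExecutionVertex[parallelism];
            for (int i = 0; i < parallelism; i++) {
                taskVertices[i] = new ExecutionVertex(this, i);
            }
        }
    }
}
```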

2. ExecutionVertex

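Each ExecutionJobVertex has as many ExecutionVertex instances as its parallelism. An ExecutionVertex represents a single parallel instance (subtask). It can be identified by its ExecutionJobVertex plus the subtask index.
Let's look at the member variables of the ExecutionVertex class: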

    private final ExecutionJobVertex jobVertex;

    private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;

    private final ExecutionEdge[][] inputEdges;

    private final int subTaskIndex;

    private final ExecutionVertexID executionVertexId;

    private final EvictingBoundedList<ArchivedExecution> priorExecutions;

    private final Time timeout;

    /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */
    private final String taskNameWithSubtask;

    private volatile CoLocationConstraint locationConstraint;

    /** The current or latest execution attempt of this vertex's task. */
    private volatile Execution currentExecution;    // this field must never be null

    private final ArrayList<InputSplit> inputSplits;

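From the source we can see that:

  • An ExecutionVertex holds a reference to its ExecutionJobVertex and its subTaskIndex
  • It is uniquely identified by an ExecutionVertexID
  • It holds one current Execution (currentExecution); earlier attempts are archived in priorExecutions
  • It holds a resultPartitions map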
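Here is a small sketch of the identification scheme. It assumes ExecutionVertexID behaves like a value object made of the JobVertexID and the subtask index; a plain String stands in for JobVertexID. The name helper follows the "myTask (2/7)" format from the javadoc of taskNameWithSubtask above. The stored subTaskIndex is 0-based, but the displayed number is 1-based. The class and method names are hypothetical.

```java
import java.util.Objects;

final class VertexIdSketch {

    /** Hypothetical stand-in for ExecutionVertexID: a (JobVertexID, subtask index) pair. */
    static final class ExecutionVertexId {
        final String jobVertexId;
        final int subtaskIndex;

        ExecutionVertexId(String jobVertexId, int subtaskIndex) {
            this.jobVertexId = jobVertexId;
            this.subtaskIndex = subtaskIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ExecutionVertexId)) return false;
            ExecutionVertexId other = (ExecutionVertexId) o;
            return subtaskIndex == other.subtaskIndex && jobVertexId.equals(other.jobVertexId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobVertexId, subtaskIndex);
        }
    }

    /** Builds a "myTask (2/7)"-style name: the stored index is 0-based, the display is 1-based. */
    static String taskNameWithSubtask(String taskName, int subTaskIndex, int parallelism) {
        return String.format("%s (%d/%d)", taskName, subTaskIndex + 1, parallelism);
    }
}
```

Caching this string, as the original field comment says, avoids rebuilding it every time the name is logged.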

3. Execution

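An Execution is one run (attempt) of an ExecutionVertex. An ExecutionVertex may need to run more than once, e.g. for recovery or recomputation, and in that case it has multiple Executions. Each Execution is uniquely identified by an ExecutionAttemptID.
Next, let's look at the member variables of the Execution class: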

    /** The executor which is used to execute futures. */
    private final Executor executor;

    /** The execution vertex whose task this execution executes. */
    private final ExecutionVertex vertex;

    /** The unique ID marking the specific execution instant of the task. */
    private final ExecutionAttemptID attemptId;

    /** Gets the global modification version of the execution graph when this execution was created.
     * This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
     * to resolve conflicts between concurrent modification by global and local failover actions. */
    private final long globalModVersion;

    /** The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()}. */
    private final long[] stateTimestamps;

    private final int attemptNumber;
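The relationship between an ExecutionVertex and its Executions can be sketched as follows. This is a simplified, hypothetical model:

  • a UUID stands in for ExecutionAttemptID
  • a bounded ArrayDeque stands in for the EvictingBoundedList that holds priorExecutions
  • resetForNewExecution here is a no-argument simplification

Each reset archives the current attempt and creates a new one with attemptNumber + 1.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.UUID;

final class AttemptSketch {

    /** Hypothetical, minimal Execution: one attempt of an ExecutionVertex. */
    static final class Execution {
        final UUID attemptId = UUID.randomUUID(); // stand-in for ExecutionAttemptID
        final int attemptNumber;

        Execution(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    }

    /** Hypothetical vertex that keeps the current attempt plus a bounded history of prior ones. */
    static final class ExecutionVertex {
        private final int maxPriorAttempts;
        private final Deque<Execution> priorExecutions = new ArrayDeque<>();
        private Execution currentExecution = new Execution(0); // never null, like the real field

        ExecutionVertex(int maxPriorAttempts) {
            this.maxPriorAttempts = maxPriorAttempts;
        }

        /** Archives the current attempt and creates the next one (e.g. on failover). */
        Execution resetForNewExecution() {
            priorExecutions.addLast(currentExecution);
            while (priorExecutions.size() > maxPriorAttempts) {
                priorExecutions.removeFirst(); // evict the oldest archived attempt
            }
            currentExecution = new Execution(currentExecution.attemptNumber + 1);
            return currentExecution;
        }

        Execution getCurrentExecution() {
            return currentExecution;
        }

        int numArchivedExecutions() {
            return priorExecutions.size();
        }
    }
}
```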

4. IntermediateResult

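The JobGraph uses IntermediateDataSet to represent the output of a JobVertex; a JobVertex may have zero or more outputs. The ExecutionGraph counterpart is IntermediateResult, which is uniquely identified by an IntermediateDataSetID.
Next, let's look at the member variables of the IntermediateResult class: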

    private final IntermediateDataSetID id;

    private final ExecutionJobVertex producer;

    private final IntermediateResultPartition[] partitions;

    /**
     * Maps intermediate result partition IDs to a partition index. This is
     * used for ID lookups of intermediate results. I didn't dare to change the
     * partition connect logic in other places that is tightly coupled to the
     * partitions being held as an array.
     */
    private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();

    // parallelism of the producing ExecutionJobVertex
    private final int numParallelProducers;

    private final AtomicInteger numberOfRunningProducers;

    private int partitionsAssigned;

    private int numConsumers;

    private final int connectionIndex;

  • An ExecutionJobVertex may have zero or more IntermediateResults. Each parallel subtask (ExecutionVertex) may have zero or more IntermediateResultPartitions.
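The comment on partitionLookupHelper explains the design: the partitions stay in an array, and a separate map provides lookups by ID. The sketch below shows that pattern, together with a countdown of running producers via AtomicInteger. It is hypothetical: Strings stand in for the partition objects and IDs, and the method names are modeled on Flink's but not guaranteed to match them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class ResultSketch {

    /** Hypothetical IntermediateResult: partitions kept in an array, plus an ID -> index lookup. */
    static final class IntermediateResult {
        private final String[] partitionIds; // stand-in for IntermediateResultPartition[]
        private final Map<String, Integer> partitionLookupHelper = new HashMap<>();
        private final AtomicInteger numberOfRunningProducers;
        private int partitionsAssigned;

        IntermediateResult(int numParallelProducers) {
            this.partitionIds = new String[numParallelProducers];
            this.numberOfRunningProducers = new AtomicInteger(numParallelProducers);
        }

        /** Registers the partition produced by subtask `partitionNumber`. */
        void setPartition(int partitionNumber, String partitionId) {
            if (partitionIds[partitionNumber] != null) {
                throw new IllegalStateException("Partition #" + partitionNumber + " has already been assigned.");
            }
            partitionIds[partitionNumber] = partitionId;
            partitionLookupHelper.put(partitionId, partitionNumber);
            partitionsAssigned++;
        }

        /** Map lookup from partition ID to array index, instead of scanning the array. */
        int getPartitionIndex(String partitionId) {
            Integer index = partitionLookupHelper.get(partitionId);
            if (index == null) {
                throw new IllegalArgumentException("Unknown partition " + partitionId);
            }
            return index;
        }

        /** Called when a producer finishes; 0 means all producers are done. */
        int decrementNumberOfRunningProducersAndGetRemaining() {
            return numberOfRunningProducers.decrementAndGet();
        }

        int getPartitionsAssigned() {
            return partitionsAssigned;
        }
    }
}
```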

5. IntermediateResultPartition

An IntermediateResult can contain zero or more IntermediateResultPartitions. An IntermediateResultPartition is one output partition of an ExecutionVertex.
Next, let's look at the member variables of the IntermediateResultPartition class:

    private final IntermediateResult totalResult;

    private final ExecutionVertex producer;

    private final int partitionNumber;

    private final IntermediateResultPartitionID partitionId;

    private List<List<ExecutionEdge>> consumers;

As the source shows, the producer of an IntermediateResultPartition is an ExecutionVertex. Its consumers are one or more ExecutionEdges, grouped into the nested lists of consumers.
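The nested consumers list can be read as "consumer groups". In this sketch the outer list has one entry per downstream consumer; this is an assumption, matching how each consuming JobEdge registers itself. The inner list holds the ExecutionEdges of that group. The ExecutionEdge here is a hypothetical placeholder that only records the target task name.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionSketch {

    /** Hypothetical edge: only records which consumer subtask reads this partition. */
    static final class ExecutionEdge {
        final String targetTask;

        ExecutionEdge(String targetTask) {
            this.targetTask = targetTask;
        }
    }

    static final class IntermediateResultPartition {
        final int partitionNumber;
        // outer list: one consumer group per downstream consumer; inner list: edges in that group
        private final List<List<ExecutionEdge>> consumers = new ArrayList<>();

        IntermediateResultPartition(int partitionNumber) {
            this.partitionNumber = partitionNumber;
        }

        /** Opens a new consumer group and returns its index. */
        int addConsumerGroup() {
            consumers.add(new ArrayList<>());
            return consumers.size() - 1;
        }

        /** Adds an edge to an existing consumer group. */
        void addConsumer(ExecutionEdge edge, int consumerNumber) {
            consumers.get(consumerNumber).add(edge);
        }

        List<List<ExecutionEdge>> getConsumers() {
            return consumers;
        }
    }
}
```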

6. Generating the ExecutionGraph

The entry point is the ExecutionGraphBuilder#buildGraph() method. Its main steps are:
6.1 Construct the ExecutionGraph, mainly initializing jobInformation, restartStrategy, slotProvider and so on.
6.2 Initialize the JobVertices on the master

        // initialize the vertices that have a master initialization hook
        // file output formats create directories here, input formats create splits

        final long initMasterStart = System.nanoTime();
        log.info("Running initialization on master for job {} ({}).", jobName, jobId);

        for (JobVertex vertex : jobGraph.getVertices()) {
            String executableClass = vertex.getInvokableClassName();
            if (executableClass == null || executableClass.isEmpty()) {
                throw new JobSubmissionException(jobId,
                        "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
            }

            try {
                vertex.initializeOnMaster(classLoader);
            }
            catch (Throwable t) {
                throw new JobExecutionException(jobId,
                        "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
            }
        }

        log.info("Successfully ran initialization on master in {} ms.",
                (System.nanoTime() - initMasterStart) / 1_000_000);
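
Depending on the vertex type, initializeOnMaster mainly does the following:

  • Sets the input format and the input splits
  • Sets the output format and calls its initializeGlobal method. Two notes on initializeGlobal:
      - initializeGlobal is called on the JM before the distributed program runs
      - initializeGlobal initializes the output directory in the distributed file system according to the specified write mode

6.3 Topologically sort all JobVertices in the JobGraph, starting from the sources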
        // topologically sort the job vertices and attach the graph to the existing one
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
        if (log.isDebugEnabled()) {
            log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
        }
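getVerticesSortedTopologicallyFromSources() guarantees that every vertex appears after all of its inputs. Because of this, when attachJobGraph connects an ExecutionJobVertex to its predecessors, their IntermediateResults already exist. The sketch below uses Kahn's algorithm over a hypothetical adjacency map. It illustrates the ordering contract and is not necessarily how Flink implements the method. It throws if the graph contains a cycle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TopoSortSketch {

    /**
     * Kahn's algorithm: repeatedly emit a vertex with no unprocessed inputs.
     * `downstream` maps a vertex ID to the IDs of the vertices that consume its output.
     */
    static List<String> sortFromSources(Collection<String> vertices, Map<String, List<String>> downstream) {
        // count incoming edges per vertex (LinkedHashMap keeps the input order for ties)
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String v : vertices) {
            inDegree.put(v, 0);
        }
        for (List<String> targets : downstream.values()) {
            for (String t : targets) {
                inDegree.merge(t, 1, Integer::sum);
            }
        }

        // sources (no inputs) are ready first
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }

        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            sorted.add(v);
            for (String t : downstream.getOrDefault(v, List.of())) {
                // a consumer becomes ready once all of its producers have been emitted
                if (inDegree.merge(t, -1, Integer::sum) == 0) {
                    ready.add(t);
                }
            }
        }

        if (sorted.size() != inDegree.size()) {
            throw new IllegalStateException("The job graph is cyclic.");
        }
        return sorted;
    }
}
```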

6.4 Attach the JobVertex list sorted in 6.3 to the ExecutionGraph

    public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

        assertRunningInJobMasterMainThread();

        LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
                "vertices and {} intermediate results.",
            topologiallySorted.size(),
            tasks.size(),
            intermediateResults.size());

        final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
        final long createTimestamp = System.currentTimeMillis();

        for (JobVertex jobVertex : topologiallySorted) {

            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }

            // create the execution job vertex and attach it to the graph
            ExecutionJobVertex ejv = new ExecutionJobVertex(
                    this,
                    jobVertex,
                    1,
                    maxPriorAttemptsHistoryLength,
                    rpcTimeout,
                    globalModVersion,
                    createTimestamp);

            ejv.connectToPredecessors(this.intermediateResults);

            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                    jobVertex.getID(), ejv, previousTask));
            }

            for (IntermediateResult res : ejv.getProducedDataSets()) {
                IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
                if (previousDataSet != null) {
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                        res.getId(), res, previousDataSet));
                }
            }

            this.verticesInCreationOrder.add(ejv);
            this.numVerticesTotal += ejv.getParallelism();
            newExecJobVertices.add(ejv);
        }

        failoverStrategy.notifyNewVertices(newExecJobVertices);

        schedulingTopology = new ExecutionGraphToSchedulingTopologyAdapter(this);
        partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(
            schedulingTopology,
            new DefaultFailoverTopology(this));
    }

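What it mainly does:

  • Creates a corresponding ExecutionJobVertex for each JobVertex
  • Connects each ExecutionJobVertex to its upstream IntermediateResults via connectToPredecessors
  • Adds each ExecutionJobVertex to the tasks map, keyed by JobVertexID
  • Adds the IntermediateResults produced by each ExecutionJobVertex to the intermediateResults map, keyed by IntermediateDataSetID
  • Accumulates the parallelism of every ExecutionJobVertex into the member variable numVerticesTotal. This is the total number of subtasks to schedule; with slot sharing, the number of slots actually requested can be smaller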
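The bookkeeping in the loop can be reduced to a few lines. In this hypothetical sketch, Map#putIfAbsent returns the previous value, so a non-null result signals a duplicate ID; attachJobGraph uses the same trick. numVerticesTotal is simply the sum of the parallelisms.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AttachSketch {

    /** Hypothetical, minimal vertex: ID, parallelism and IDs of the data sets it produces. */
    static final class Vertex {
        final String id;
        final int parallelism;
        final List<String> producedDataSetIds;

        Vertex(String id, int parallelism, List<String> producedDataSetIds) {
            this.id = id;
            this.parallelism = parallelism;
            this.producedDataSetIds = producedDataSetIds;
        }
    }

    private final Map<String, Vertex> tasks = new HashMap<>();                // keyed by vertex ID
    private final Map<String, String> intermediateResults = new HashMap<>(); // data set ID -> producer ID
    private int numVerticesTotal;

    void attach(List<Vertex> topologicallySorted) {
        for (Vertex v : topologicallySorted) {
            // putIfAbsent returns the previous value, so non-null means a duplicate ID
            if (tasks.putIfAbsent(v.id, v) != null) {
                throw new IllegalStateException("Encountered two job vertices with ID " + v.id);
            }
            for (String dataSetId : v.producedDataSetIds) {
                if (intermediateResults.putIfAbsent(dataSetId, v.id) != null) {
                    throw new IllegalStateException("Encountered two intermediate data sets with ID " + dataSetId);
                }
            }
            numVerticesTotal += v.parallelism; // total number of subtasks
        }
    }

    int getNumVerticesTotal() {
        return numVerticesTotal;
    }
}
```

For example, attaching a source with parallelism 2, a map with 4 and a sink with 1 gives numVerticesTotal = 7.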

6.5 Configure checkpointing

  • Create a CompletedCheckpointStore instance for the job, mainly to set the checkpoint path
  • Create a CheckpointIDCounter instance for the job, mainly to set checkpointIdCounterPath
  • Create a CheckpointFailureManager, which registers a callback invoked when a checkpoint fails, e.g. to fail the job
  • Create the CheckpointCoordinator instance; its member variables are:
    /** The job whose checkpoint this coordinator coordinates. */
    private final JobID job;

    /** Default checkpoint properties. **/
    private final CheckpointProperties checkpointProperties;

    /** The executor used for asynchronous calls, like potentially blocking I/O. */
    private final Executor executor;

    /** Tasks who need to be sent a message when a checkpoint is started. */
    private final ExecutionVertex[] tasksToTrigger;

    /** Tasks who need to acknowledge a checkpoint before it succeeds. */
    private final ExecutionVertex[] tasksToWaitFor;

    /** Tasks who need to be sent a message when a checkpoint is confirmed. */
    private final ExecutionVertex[] tasksToCommitTo;

    /** Map from checkpoint ID to the pending checkpoint. */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
     * accessing this don't block the job manager actor and run asynchronously. */
    private final CompletedCheckpointStore completedCheckpointStore;

    /** The root checkpoint state backend, which is responsible for initializing the
     * checkpoint, storing the metadata, and cleaning up the checkpoint. */
    private final CheckpointStorageCoordinatorView checkpointStorage;

    /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones). */
    private final ArrayDeque<Long> recentPendingCheckpoints;

    /** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
     * need to be ascending across job managers. */
    private final CheckpointIDCounter checkpointIdCounter;

    /** The base checkpoint interval. Actual trigger time may be affected by the
     * max concurrent checkpoints and minimum-pause values */
    private final long baseInterval;

    /** The max time (in ms) that a checkpoint may take. */
    private final long checkpointTimeout;

    /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
     * enforce minimum processing time between checkpoint attempts */
    private final long minPauseBetweenCheckpointsNanos;

    /** The maximum number of checkpoints that may be in progress at the same time. */
    private final int maxConcurrentCheckpointAttempts;

    /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints. */
    private final ScheduledThreadPoolExecutor timer;

    /** The master checkpoint hooks executed by this checkpoint coordinator. */
    private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;

    /** Actor that receives status updates from the execution graph this coordinator works for. */
    private JobStatusListener jobStatusListener;

    /** The number of consecutive failed trigger attempts. */
    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);

    /** A handle to the current periodic trigger, to cancel it when necessary. */
    private ScheduledFuture<?> currentPeriodicTrigger;

    /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
    private long lastCheckpointCompletionNanos;

    /** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
     * Non-volatile, because only accessed in synchronized scope */
    private boolean periodicScheduling;

    /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
     * accessed in synchronized scope */
    private boolean triggerRequestQueued;

    /** Flag marking the coordinator as shut down (not accepting any messages any more). */
    private volatile boolean shutdown;

    /** Optional tracker for checkpoint statistics. */
    @Nullable
    private CheckpointStatsTracker statsTracker;

    /** A factory for SharedStateRegistry objects. */
    private final SharedStateRegistryFactory sharedStateRegistryFactory;

    /** Registry that tracks state which is shared across (incremental) checkpoints. */
    private SharedStateRegistry sharedStateRegistry;

    private boolean isPreferCheckpointForRecovery;

    private final CheckpointFailureManager failureManager;

  • It holds the checkpoint interval, all of the job's tasks, the checkpointIdCounter and other information
  • Start the checkpoint ID counter: checkpointIDCounter.start();
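Below is a minimal in-memory checkpoint ID counter. It assumes the semantics described above:

  • it must be started before use
  • it hands out strictly ascending IDs
  • it can be reset to a given value, for example when restoring

The class name is hypothetical, and the starting value of 1 is an assumption. In HA setups Flink can back its CheckpointIDCounter with ZooKeeper instead of memory, so that IDs keep ascending across JobManager failovers.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory counter; not Flink's actual class. */
final class CheckpointIdCounterSketch {
    private final AtomicLong next = new AtomicLong(1); // assumed first checkpoint ID
    private volatile boolean started;

    void start() {
        started = true;
    }

    /** Returns the current ID and advances the counter, so IDs are strictly ascending. */
    long getAndIncrement() {
        if (!started) {
            throw new IllegalStateException("Checkpoint ID counter has not been started.");
        }
        return next.getAndIncrement();
    }

    /** Resets the next ID to hand out, e.g. after restoring from a checkpoint. */
    void setCount(long newCount) {
        next.set(newCount);
    }
}
```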

6.6 Register metrics for the ExecutionGraph

Summary

The ExecutionGraph is the core data structure for executing a job in a distributed fashion; it adds the notion of parallelism on top of the JobGraph.
The nodes involved in the ExecutionGraph are summarized below:

  • Each JobVertex in the JobGraph corresponds one-to-one to an ExecutionJobVertex in the ExecutionGraph.
  • Each ExecutionJobVertex has parallelism ExecutionVertex instances.
  • Each JobVertex may have n (n >= 0) IntermediateDataSets. In the ExecutionJobVertex, each IntermediateDataSet corresponds to one IntermediateResult. Each IntermediateResult has parallelism producers and therefore parallelism IntermediateResultPartitions.
  • Each ExecutionJobVertex connects to its upstream IntermediateResults. Each ExecutionVertex connects to IntermediateResultPartitions of those results, and every such connection is an ExecutionEdge.
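How an ExecutionVertex connects to the upstream IntermediateResultPartitions depends on the distribution pattern of the JobEdge. The sketch below returns, for consumer subtask subTaskIndex, the indices of the upstream partitions it would connect to, one ExecutionEdge each.

  • ALL_TO_ALL connects to every partition.
  • The POINTWISE branch is modeled loosely on the index arithmetic of the pointwise connection logic in ExecutionVertex in Flink 1.9. It is a simplified illustration, not a copy of that code.

```java
final class ConnectSketch {

    /** ALL_TO_ALL: every consumer subtask reads every producer partition. */
    static int[] allToAll(int numSources) {
        int[] partitions = new int[numSources];
        for (int i = 0; i < numSources; i++) {
            partitions[i] = i;
        }
        return partitions;
    }

    /** POINTWISE: each consumer subtask reads one partition or a contiguous range of partitions. */
    static int[] pointwise(int subTaskIndex, int numSources, int parallelism) {
        if (numSources == parallelism) {
            // one-to-one
            return new int[] {subTaskIndex};
        }
        if (numSources < parallelism) {
            // fan-out: several consumer subtasks share one source partition
            int source = (parallelism % numSources == 0)
                    ? subTaskIndex / (parallelism / numSources)
                    : (int) (subTaskIndex / ((float) parallelism / numSources));
            return new int[] {source};
        }
        // fan-in: each consumer subtask reads a contiguous range; the last one takes the remainder
        float factor = (float) numSources / parallelism;
        int start = (int) (subTaskIndex * factor);
        int end = (subTaskIndex == parallelism - 1) ? numSources : (int) ((subTaskIndex + 1) * factor);
        int[] partitions = new int[end - start];
        for (int i = 0; i < partitions.length; i++) {
            partitions[i] = start + i;
        }
        return partitions;
    }
}
```

For example, with 4 producers and 2 consumers:

  • POINTWISE gives consumer 0 partitions {0, 1} and consumer 1 partitions {2, 3}.
  • ALL_TO_ALL gives each of the 2 consumers all 4 partitions, i.e. 8 ExecutionEdges in total.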