Flink Source Code Reading (3) --- Generating the ExecutionGraph

This article is based on Flink 1.9. Running a Flink job involves three graphs: StreamGraph, JobGraph, and ExecutionGraph. The StreamGraph and JobGraph are generated on the client side, while the ExecutionGraph is built in the JobMaster.

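  • The StreamGraph is the most primitive execution graph, generated from the user code; it is a direct translation of the user's logic.
  • The JobGraph is an optimized StreamGraph; for example, it decides which operators can be chained, which reduces network overhead.
  • The ExecutionGraph is the execution graph used for job scheduling; it adds the notion of parallelism to the JobGraph.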

Building on Flink Source Code Reading (2) --- Generating the JobGraph, this article walks through how the ExecutionGraph is generated.

1. ExecutionJobVertex


    private final ExecutionGraph graph;

    private final JobVertex jobVertex;

    /**
     * The IDs of all operators contained in this execution job vertex.
     * <p>The ID's are stored depth-first post-order; for the forking chain below the ID's would be stored as [D, E, B, C, A].
     *  A - B - D
     *   \    \
     *    C    E
     * This is the same order that operators are stored in the {@code StreamTask}.
     */
    private final List<OperatorID> operatorIDs;

    /**
     * The alternative IDs of all operators contained in this execution job vertex.
     * <p>The ID's are in the same order as {@link ExecutionJobVertex#operatorIDs}.
     */
    private final List<OperatorID> userDefinedOperatorIds;

    private final ExecutionVertex[] taskVertices;

    private final IntermediateResult[] producedDataSets;

    private final List<IntermediateResult> inputs;

    private final int parallelism;

    private final SlotSharingGroup slotSharingGroup;

    private final CoLocationGroup coLocationGroup;

    private final InputSplit[] inputSplits;

    private final boolean maxParallelismConfigured;

    private int maxParallelism;

As the source shows, an ExecutionJobVertex is converted from a JobVertex and holds an array of ExecutionVertex instances (taskVertices) and an array of IntermediateResult instances (producedDataSets).
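
The depth-first post-order mentioned in the operatorIDs comment can be checked with a small sketch. The Node and PostOrderSketch classes below are hypothetical helpers, not Flink classes; they only reproduce the ordering for the forking chain shown in that comment, assuming children are visited left to right.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a chained operator; not a Flink class.
final class Node {
    final String id;
    final List<Node> children;

    Node(String id, Node... children) {
        this.id = id;
        this.children = Arrays.asList(children);
    }
}

final class PostOrderSketch {
    // Visits all children first (left to right), then the node itself.
    static void postOrder(Node node, List<String> out) {
        for (Node child : node.children) {
            postOrder(child, out);
        }
        out.add(node.id);
    }

    // Builds the forking chain from the comment: A -> (B -> (D, E), C).
    static List<String> example() {
        Node b = new Node("B", new Node("D"), new Node("E"));
        Node a = new Node("A", b, new Node("C"));
        List<String> ids = new ArrayList<>();
        postOrder(a, ids);
        return ids; // [D, E, B, C, A]
    }
}
```

The leaves D and E are emitted before their parent B, and the chain head A comes last, which matches the order given in the comment.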

2. ExecutionVertex


    private final ExecutionJobVertex jobVertex;

    private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;

    private final ExecutionEdge[][] inputEdges;

    private final int subTaskIndex;

    private final ExecutionVertexID executionVertexId;

    private final EvictingBoundedList<ArchivedExecution> priorExecutions;

    private final Time timeout;

    /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */
    private final String taskNameWithSubtask;

    private volatile CoLocationConstraint locationConstraint;

    /** The current or latest execution attempt of this vertex's task. */
    private volatile Execution currentExecution;    // this field must never be null

    private final ArrayList<InputSplit> inputSplits;


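  • An ExecutionVertex holds a reference to its ExecutionJobVertex and a subTaskIndex.
  • It is uniquely identified by an ExecutionVertexID.
  • An ExecutionVertex holds one Execution (currentExecution), its current or latest execution attempt.
  • An ExecutionVertex holds a resultPartitions map.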
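
The cached taskNameWithSubtask field uses the format "myTask (2/7)". A minimal sketch of how such a name can be built follows; TaskNameSketch is a hypothetical helper, and it assumes that subTaskIndex is zero-based while the displayed index is one-based.

```java
// Hypothetical helper, not part of Flink.
final class TaskNameSketch {
    // Assumption: subTaskIndex is zero-based, the displayed index is one-based.
    static String taskNameWithSubtask(String taskName, int subTaskIndex, int parallelism) {
        return String.format("%s (%d/%d)", taskName, subTaskIndex + 1, parallelism);
    }
}
```

Under that assumption, taskNameWithSubtask("myTask", 1, 7) yields "myTask (2/7)". Building the string once and caching it avoids repeating the concatenation on every log call.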

3. Execution


    /** The executor which is used to execute futures. */
    private final Executor executor;

    /** The execution vertex whose task this execution executes. */
    private final ExecutionVertex vertex;

    /** The unique ID marking the specific execution instant of the task. */
    private final ExecutionAttemptID attemptId;

    /** Gets the global modification version of the execution graph when this execution was created.
     * This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
     * to resolve conflicts between concurrent modification by global and local failover actions. */
    private final long globalModVersion;

    /** The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()}. */
    private final long[] stateTimestamps;

    private final int attemptNumber;
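
The stateTimestamps array is indexed by the ordinal of the execution state. The sketch below illustrates that indexing scheme only; the State enum is a simplified, hypothetical stand-in for Flink's ExecutionState, and the method names are chosen for illustration.

```java
// Simplified stand-in for ExecutionState (subset of states, illustration only).
enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED }

// Hypothetical helper, not part of Flink.
final class StateTimestampsSketch {
    // One slot per state; a value of 0 means the state was never entered.
    private final long[] stateTimestamps = new long[State.values().length];

    void markTimestamp(State state, long timestamp) {
        stateTimestamps[state.ordinal()] = timestamp;
    }

    long getStateTimestamp(State state) {
        return stateTimestamps[state.ordinal()];
    }
}
```

A plain long[] keyed by ordinal is compact and needs no boxing, at the cost of depending on the declaration order of the enum constants.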

4. IntermediateResult

The JobGraph uses IntermediateDataSet to represent the output of a JobVertex; a JobVertex may have zero or more outputs. The ExecutionGraph counterpart is IntermediateResult, which is uniquely identified by an IntermediateDataSetID.

    private final IntermediateDataSetID id;

    private final ExecutionJobVertex producer;

    private final IntermediateResultPartition[] partitions;

    /**
     * Maps intermediate result partition IDs to a partition index. This is
     * used for ID lookups of intermediate results. I didn't dare to change the
     * partition connect logic in other places that is tightly coupled to the
     * partitions being held as an array.
     */
    private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();

    // Parallelism of the producing ExecutionJobVertex
    private final int numParallelProducers;

    private final AtomicInteger numberOfRunningProducers;

    private int partitionsAssigned;

    private int numConsumers;

    private final int connectionIndex;

  • An ExecutionJobVertex may have zero or more IntermediateResults, and each parallel subtask (ExecutionVertex) may have zero or more IntermediateResultPartitions.
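
The combination of a partitions array and the partitionLookupHelper map can be illustrated as follows. This is a sketch under simplifying assumptions: PartitionLookupSketch is hypothetical, plain strings stand in for IntermediateResultPartition and IntermediateResultPartitionID, and the method names are illustrative rather than taken from Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink.
final class PartitionLookupSketch {
    // Stand-in for IntermediateResultPartition[]; one slot per producer subtask.
    private final String[] partitions;
    // Stand-in for partitionLookupHelper: partition ID -> array index.
    private final Map<String, Integer> lookup = new HashMap<>();
    private int partitionsAssigned;

    PartitionLookupSketch(int numParallelProducers) {
        this.partitions = new String[numParallelProducers];
    }

    void setPartition(int index, String partitionId) {
        if (partitions[index] != null) {
            throw new IllegalStateException("Partition " + index + " is already assigned");
        }
        partitions[index] = partitionId;
        lookup.put(partitionId, index);
        partitionsAssigned++;
    }

    int indexOf(String partitionId) {
        Integer index = lookup.get(partitionId);
        if (index == null) {
            throw new IllegalArgumentException("Unknown partition " + partitionId);
        }
        return index;
    }

    int getPartitionsAssigned() {
        return partitionsAssigned;
    }
}
```

The array keeps index-based access cheap for the connect logic, while the map answers lookups by ID without scanning the array.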

5. IntermediateResultPartition


    private final IntermediateResult totalResult;

    private final ExecutionVertex producer;

    private final int partitionNumber;

    private final IntermediateResultPartitionID partitionId;

    private List<List<ExecutionEdge>> consumers;


6. ExecutionGraph生成

6.1 Construct the ExecutionGraph, which mainly initializes jobInformation, restartStrategy, slotProvider, and so on.
6.2 Initialize each JobVertex on the master

        // initialize the vertices that have a master initialization hook
        // file output formats create directories here, input formats create splits

        final long initMasterStart = System.nanoTime();
        log.info("Running initialization on master for job {} ({}).", jobName, jobId);

        for (JobVertex vertex : jobGraph.getVertices()) {
            String executableClass = vertex.getInvokableClassName();
            if (executableClass == null || executableClass.isEmpty()) {
                throw new JobSubmissionException(jobId,
                        "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
            }

            try {
                vertex.initializeOnMaster(classLoader);
            }
            catch (Throwable t) {
                throw new JobExecutionException(jobId,
                        "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
            }
        }

        log.info("Successfully ran initialization on master in {} ms.",
                (System.nanoTime() - initMasterStart) / 1_000_000);

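  • Set the input format and create the input splits.
  • Set the output format and call its initializeGlobal method.
      - initializeGlobal is called on the JM before the distributed program executes.
      - initializeGlobal initializes the output directory on the distributed file system according to the configured write mode.
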
6.3 Topologically sort all JobVertex instances in the JobGraph, starting from the sources

        // topologically sort the job vertices and attach the graph to the existing one
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
        if (log.isDebugEnabled()) {
            log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
        }
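
To make "topologically sorted from the sources" concrete, here is a minimal sketch using Kahn's algorithm. It is not the implementation behind getVerticesSortedTopologicallyFromSources; TopoSortSketch and the string vertex names are hypothetical, and the graph is given as a map from each producer to its consumers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink.
final class TopoSortSketch {
    // edges maps each vertex to the vertices that consume its output.
    static List<String> sortFromSources(Map<String, List<String>> edges) {
        // Count incoming edges for every vertex that appears in the graph.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : edges.entrySet()) {
            inDegree.putIfAbsent(entry.getKey(), 0);
            for (String target : entry.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }

        // Sources are the vertices without any input.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            sorted.add(vertex);
            // A consumer becomes ready once all of its producers are sorted.
            for (String target : edges.getOrDefault(vertex, Collections.emptyList())) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }

        if (sorted.size() != inDegree.size()) {
            throw new IllegalStateException("The graph contains a cycle");
        }
        return sorted;
    }
}
```

For a chain source -> map -> sink the result is [source, map, sink], regardless of the order in which the vertices were registered. The property that matters for the next step is that every vertex appears after all of its producers.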

6.4 Add the JobVertex list sorted in 6.3 to the ExecutionGraph

    public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

        // ...

        LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
                "vertices and {} intermediate results.",
                topologiallySorted.size(), tasks.size(), intermediateResults.size());

        final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
        final long createTimestamp = System.currentTimeMillis();

        for (JobVertex jobVertex : topologiallySorted) {

            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }

            // create the execution job vertex and attach it to the graph
            ExecutionJobVertex ejv = new ExecutionJobVertex(/* constructor arguments elided */);

            // ...

            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                    jobVertex.getID(), ejv, previousTask));
            }

            for (IntermediateResult res : ejv.getProducedDataSets()) {
                IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
                if (previousDataSet != null) {
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                        res.getId(), res, previousDataSet));
                }
            }

            this.numVerticesTotal += ejv.getParallelism();

            // ...
        }

        // ...

        schedulingTopology = new ExecutionGraphToSchedulingTopologyAdapter(this);
        partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(
            new DefaultFailoverTopology(this));
    }


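  • Each JobVertex produces one corresponding ExecutionJobVertex.
  • The new ExecutionJobVertex is added to the tasks map, keyed by JobVertexID.
  • Each IntermediateResult produced by the ExecutionJobVertex is added to the intermediateResults map, keyed by IntermediateDataSetID.
  • The ExecutionGraph member numVerticesTotal accumulates the parallelism of every ExecutionJobVertex. It is therefore the total number of subtasks, and also the number of slots requested later if no slots are shared.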
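
The bookkeeping in attachJobGraph boils down to "register under a unique ID, fail on duplicates, add up the parallelism". The sketch below shows that pattern in isolation; AttachSketch is hypothetical, string IDs replace JobVertexID, and the map stores only the parallelism instead of an ExecutionJobVertex.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Flink.
final class AttachSketch {
    // Stand-in for the tasks map: vertex ID -> parallelism of that vertex.
    private final Map<String, Integer> tasks = new ConcurrentHashMap<>();
    private int numVerticesTotal;

    void attach(String jobVertexId, int parallelism) {
        // putIfAbsent returns the existing value if the key is already taken.
        Integer previous = tasks.putIfAbsent(jobVertexId, parallelism);
        if (previous != null) {
            throw new IllegalStateException("Encountered two job vertices with ID " + jobVertexId);
        }
        numVerticesTotal += parallelism;
    }

    int getNumVerticesTotal() {
        return numVerticesTotal;
    }
}
```

Attaching a source with parallelism 2 and a map with parallelism 4 gives a total of 6; attaching a second vertex with an ID that is already registered throws instead of silently replacing the first one.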

6.5 Configure checkpointing

  • Create a CompletedCheckpointStore instance for the job; its main task is to set the checkpoint path.
  • Create a CheckpointIDCounter instance for the job; its main task is to set the checkpointIdCounterPath.
  • Create a CheckpointFailureManager, which registers a callback for checkpoint failures, for example to fail the job.
  • Create a CheckpointCoordinator instance. Its member variables are listed below.

    /** The job whose checkpoint this coordinator coordinates. */
    private final JobID job;

    /** Default checkpoint properties. **/
    private final CheckpointProperties checkpointProperties;

    /** The executor used for asynchronous calls, like potentially blocking I/O. */
    private final Executor executor;

    /** Tasks who need to be sent a message when a checkpoint is started. */
    private final ExecutionVertex[] tasksToTrigger;

    /** Tasks who need to acknowledge a checkpoint before it succeeds. */
    private final ExecutionVertex[] tasksToWaitFor;

    /** Tasks who need to be sent a message when a checkpoint is confirmed. */
    private final ExecutionVertex[] tasksToCommitTo;

    /** Map from checkpoint ID to the pending checkpoint. */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
     * accessing this don't block the job manager actor and run asynchronously. */
    private final CompletedCheckpointStore completedCheckpointStore;

    /** The root checkpoint state backend, which is responsible for initializing the
     * checkpoint, storing the metadata, and cleaning up the checkpoint. */
    private final CheckpointStorageCoordinatorView checkpointStorage;

    /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones). */
    private final ArrayDeque<Long> recentPendingCheckpoints;

    /** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
     * need to be ascending across job managers. */
    private final CheckpointIDCounter checkpointIdCounter;

    /** The base checkpoint interval. Actual trigger time may be affected by the
     * max concurrent checkpoints and minimum-pause values */
    private final long baseInterval;

    /** The max time (in ms) that a checkpoint may take. */
    private final long checkpointTimeout;

    /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
     * enforce minimum processing time between checkpoint attempts */
    private final long minPauseBetweenCheckpointsNanos;

    /** The maximum number of checkpoints that may be in progress at the same time. */
    private final int maxConcurrentCheckpointAttempts;

    /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints. */
    private final ScheduledThreadPoolExecutor timer;

    /** The master checkpoint hooks executed by this checkpoint coordinator. */
    private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;

    /** Actor that receives status updates from the execution graph this coordinator works for. */
    private JobStatusListener jobStatusListener;

    /** The number of consecutive failed trigger attempts. */
    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);

    /** A handle to the current periodic trigger, to cancel it when necessary. */
    private ScheduledFuture<?> currentPeriodicTrigger;

    /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
    private long lastCheckpointCompletionNanos;

    /** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
     * Non-volatile, because only accessed in synchronized scope */
    private boolean periodicScheduling;

    /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
     * accessed in synchronized scope */
    private boolean triggerRequestQueued;

    /** Flag marking the coordinator as shut down (not accepting any messages any more). */
    private volatile boolean shutdown;

    /** Optional tracker for checkpoint statistics. */
    private CheckpointStatsTracker statsTracker;

    /** A factory for SharedStateRegistry objects. */
    private final SharedStateRegistryFactory sharedStateRegistryFactory;

    /** Registry that tracks state which is shared across (incremental) checkpoints. */
    private SharedStateRegistry sharedStateRegistry;

    private boolean isPreferCheckpointForRecovery;

    private final CheckpointFailureManager failureManager;

  • It holds, among other things, the checkpoint interval, all of the job's tasks, and the checkpointIdCounter.
  • The checkpointIdCounter starts counting with checkpointIDCounter.start();
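
Two of the fields above, checkpointIdCounter and minPauseBetweenCheckpointsNanos, can be illustrated with a small sketch. It is not how CheckpointCoordinator is implemented: CheckpointTimingSketch and its methods are hypothetical, the counter is a plain in-memory AtomicLong, and starting at ID 1 is an assumption.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of Flink.
final class CheckpointTimingSketch {
    // Assumption: IDs start at 1 and only ever increase.
    private final AtomicLong checkpointIdCounter = new AtomicLong(1);
    private final long minPauseBetweenCheckpointsNanos;
    private long lastCheckpointCompletionNanos;

    CheckpointTimingSketch(long minPauseBetweenCheckpointsNanos) {
        this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpointsNanos;
    }

    // Returns the next ID; consecutive calls yield ascending values.
    long nextCheckpointId() {
        return checkpointIdCounter.getAndIncrement();
    }

    void onCheckpointCompleted(long nowNanos) {
        lastCheckpointCompletionNanos = nowNanos;
    }

    // Nanoseconds that still have to pass before the next checkpoint may start; 0 if none.
    long remainingPauseNanos(long nowNanos) {
        long elapsed = nowNanos - lastCheckpointCompletionNanos;
        return Math.max(0L, minPauseBetweenCheckpointsNanos - elapsed);
    }
}
```

With a minimum pause of 500 ns and a checkpoint completed at t = 1000, a trigger attempt at t = 1200 still has to wait 300 ns, while one at t = 2000 can proceed immediately.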

6.6 Add metrics to the ExecutionGraph



  • Each JobVertex in the JobGraph corresponds to exactly one ExecutionJobVertex in the ExecutionGraph.
  • Each ExecutionJobVertex has parallelism ExecutionVertex instances.
  • Each JobVertex may have n (n >= 0) IntermediateDataSets. In the ExecutionJobVertex, each IntermediateDataSet corresponds to one IntermediateResult, and each IntermediateResult has parallelism producers and therefore parallelism IntermediateResultPartitions.
  • Each ExecutionJobVertex is connected to the IntermediateResults of its upstream vertices; connecting an ExecutionVertex to the partitions of an IntermediateResult produces ExecutionEdges.
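
The counting rules in this summary can be written down as a few formulas. The sketch below is a hypothetical helper, not Flink code, and the edge count assumes an all-to-all connection in which every consumer subtask reads every partition; a pointwise connection would produce fewer edges.

```java
// Hypothetical helper, not part of Flink.
final class ExecutionGraphCountsSketch {
    // One ExecutionVertex per parallel subtask.
    static int executionVertices(int parallelism) {
        return parallelism;
    }

    // One IntermediateResultPartition per producer subtask and data set.
    static int resultPartitions(int producerParallelism, int numDataSets) {
        return producerParallelism * numDataSets;
    }

    // Assumption: all-to-all, so each consumer subtask has one edge per partition.
    static int allToAllEdges(int producerParallelism, int consumerParallelism) {
        return producerParallelism * consumerParallelism;
    }
}
```

For a producer with parallelism 2 and one data set, consumed all-to-all by a vertex with parallelism 3, this gives 2 partitions and 6 edges.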
