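This article is based on Flink 1.9. Running a Flink job involves three graphs: StreamGraph, JobGraph, and ExecutionGraph. StreamGraph and JobGraph are generated on the client side, while ExecutionGraph is built and driven in the JobMaster.
- StreamGraph is the most primitive execution graph, generated from user code; it is a direct translation of the user's logic.
- JobGraph is an optimized StreamGraph; for example, it decides which operators can be chained, which reduces network overhead.
- ExecutionGraph is the execution graph used for job scheduling; it adds the notion of parallelism to JobGraph.
This article builds on "Flink Source Code Reading (2) --- JobGraph Generation" and describes how the ExecutionGraph is generated.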
1. ExecutionJobVertex
An ExecutionJobVertex corresponds one-to-one to a JobVertex in the JobGraph and is uniquely identified within the graph by its JobVertexID.
Here are the member variables of the ExecutionJobVertex class:
private final ExecutionGraph graph;
private final JobVertex jobVertex;
/**
* The IDs of all operators contained in this execution job vertex.
*
* <p>The ID's are stored depth-first post-order; for the forking chain below the ID's would be stored as [D, E, B, C, A].
*  A - B - D
*   \    \
*    C    E
* This is the same order that operators are stored in the {@code StreamTask}.
*/
private final List<OperatorID> operatorIDs;
/**
* The alternative IDs of all operators contained in this execution job vertex.
*
* <p>The ID's are in the same order as {@link ExecutionJobVertex#operatorIDs}.
*/
private final List<OperatorID> userDefinedOperatorIds;
private final ExecutionVertex[] taskVertices;
private final IntermediateResult[] producedDataSets;
private final List<IntermediateResult> inputs;
private final int parallelism;
private final SlotSharingGroup slotSharingGroup;
private final CoLocationGroup coLocationGroup;
private final InputSplit[] inputSplits;
private final boolean maxParallelismConfigured;
private int maxParallelism;
From the source code we can see that an ExecutionJobVertex is converted from a JobVertex and holds an array of ExecutionVertex (taskVertices) and an array of IntermediateResult (producedDataSets).
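The Javadoc on operatorIDs says the IDs are stored in depth-first post-order. As a minimal sketch of what that ordering means, the following toy code walks the forking chain from the comment and collects the IDs. The class names `PostOrderSketch` and `ChainNode` are hypothetical and are not part of Flink; the sketch only illustrates the traversal order, not how Flink builds the list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PostOrderSketch {
    /** Toy operator node (hypothetical, not a Flink class). */
    static final class ChainNode {
        final String id;
        final List<ChainNode> children;

        ChainNode(String id, ChainNode... children) {
            this.id = id;
            this.children = Arrays.asList(children);
        }
    }

    /** Visits all children first, then the node itself (depth-first post-order). */
    static void collect(ChainNode node, List<String> out) {
        for (ChainNode child : node.children) {
            collect(child, out);
        }
        out.add(node.id);
    }

    static List<String> postOrder(ChainNode root) {
        List<String> out = new ArrayList<>();
        collect(root, out);
        return out;
    }

    /** Builds the chain from the Javadoc: A feeds B and C, B feeds D and E. */
    static ChainNode javadocChain() {
        ChainNode b = new ChainNode("B", new ChainNode("D"), new ChainNode("E"));
        return new ChainNode("A", b, new ChainNode("C"));
    }

    public static void main(String[] args) {
        System.out.println(postOrder(javadocChain())); // prints [D, E, B, C, A]
    }
}
```

The result matches the order [D, E, B, C, A] given in the Javadoc.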
2. ExecutionVertex
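Each ExecutionJobVertex has as many ExecutionVertex instances as its parallelism. An ExecutionVertex represents one parallel subtask and can be identified by its ExecutionJobVertex together with the subtask index.
Here are the member variables of the ExecutionVertex class: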
private final ExecutionJobVertex jobVertex;
private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
private final ExecutionEdge[][] inputEdges;
private final int subTaskIndex;
private final ExecutionVertexID executionVertexId;
private final EvictingBoundedList<ArchivedExecution> priorExecutions;
private final Time timeout;
/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */
private final String taskNameWithSubtask;
private volatile CoLocationConstraint locationConstraint;
/** The current or latest execution attempt of this vertex's task. */
private volatile Execution currentExecution; // this field must never be null
private final ArrayList<InputSplit> inputSplits;
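From the source code we can see that:
- An ExecutionVertex references its ExecutionJobVertex and holds a subTaskIndex.
- It is uniquely identified by an ExecutionVertexID.
- It holds one current Execution (currentExecution); earlier attempts are kept in priorExecutions.
- It holds a resultPartitions map.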
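To make the relationship between parallelism and subtasks concrete, here is a minimal sketch that expands one task into its parallel subtasks and builds a name in the "myTask (2/7)" format mentioned in the field comment. `SubtaskExpansionSketch` and `ToyVertex` are hypothetical names, and the one-based index in the display name is an assumption inferred from that comment; this is not Flink's ExecutionVertex.

```java
import java.util.ArrayList;
import java.util.List;

class SubtaskExpansionSketch {
    /** Toy stand-in for one parallel subtask (hypothetical, not Flink's ExecutionVertex). */
    static final class ToyVertex {
        final String taskName;
        final int subTaskIndex; // zero-based position among the parallel subtasks
        final int parallelism;

        ToyVertex(String taskName, int subTaskIndex, int parallelism) {
            this.taskName = taskName;
            this.subTaskIndex = subTaskIndex;
            this.parallelism = parallelism;
        }

        /** Display name with a one-based index (assumed), e.g. "myTask (2/7)". */
        String taskNameWithSubtask() {
            return String.format("%s (%d/%d)", taskName, subTaskIndex + 1, parallelism);
        }
    }

    /** Creates one toy vertex per parallel subtask. */
    static List<ToyVertex> expand(String taskName, int parallelism) {
        List<ToyVertex> vertices = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            vertices.add(new ToyVertex(taskName, i, parallelism));
        }
        return vertices;
    }

    public static void main(String[] args) {
        List<ToyVertex> vertices = expand("myTask", 7);
        System.out.println(vertices.size());
        System.out.println(vertices.get(1).taskNameWithSubtask());
    }
}
```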
3. Execution
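An Execution is one execution attempt of an ExecutionVertex. An ExecutionVertex may have to run more than once, for example during recovery or recomputation, and each run gets its own Execution. An Execution is uniquely identified by an ExecutionAttemptID.
Next, the member variables of the Execution class: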
/** The executor which is used to execute futures. */
private final Executor executor;
/** The execution vertex whose task this execution executes. */
private final ExecutionVertex vertex;
/** The unique ID marking the specific execution instant of the task. */
private final ExecutionAttemptID attemptId;
/** Gets the global modification version of the execution graph when this execution was created.
* This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
* to resolve conflicts between concurrent modification by global and local failover actions. */
private final long globalModVersion;
/** The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()}. */
private final long[] stateTimestamps;
private final int attemptNumber;
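The one-vertex, many-attempts relationship can be sketched as follows. This is a toy model under stated assumptions: the names `AttemptSketch`, `ToyExecution`, and `startNewAttempt` are hypothetical, and it assumes attempt numbers start at 0 and grow by one per retry. It does not reproduce Flink's actual reset logic.

```java
import java.util.ArrayList;
import java.util.List;

class AttemptSketch {
    /** Toy stand-in for a single execution attempt (hypothetical). */
    static final class ToyExecution {
        final int attemptNumber;

        ToyExecution(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    }

    /** Toy vertex that always has a current attempt and remembers earlier ones. */
    static final class ToyVertex {
        private final List<ToyExecution> priorExecutions = new ArrayList<>();
        private ToyExecution currentExecution = new ToyExecution(0); // assumed first attempt number

        /** Archives the current attempt and starts a new one, e.g. after a failure. */
        ToyExecution startNewAttempt() {
            priorExecutions.add(currentExecution);
            currentExecution = new ToyExecution(currentExecution.attemptNumber + 1);
            return currentExecution;
        }

        ToyExecution getCurrentExecution() {
            return currentExecution;
        }

        int getPriorExecutionCount() {
            return priorExecutions.size();
        }
    }

    public static void main(String[] args) {
        ToyVertex vertex = new ToyVertex();
        vertex.startNewAttempt();
        vertex.startNewAttempt();
        System.out.println(vertex.getCurrentExecution().attemptNumber);
        System.out.println(vertex.getPriorExecutionCount());
    }
}
```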
4. IntermediateResult
JobGraph uses IntermediateDataSet to represent the output of a JobVertex; a JobVertex may have zero or more outputs. The ExecutionGraph counterpart is IntermediateResult, which is uniquely identified by an IntermediateDataSetID.
Next, the member variables of the IntermediateResult class:
private final IntermediateDataSetID id;
private final ExecutionJobVertex producer;
private final IntermediateResultPartition[] partitions;
/**
* Maps intermediate result partition IDs to a partition index. This is
* used for ID lookups of intermediate results. I didn't dare to change the
* partition connect logic in other places that is tightly coupled to the
* partitions being held as an array.
*/
private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();
// parallelism of the producing ExecutionJobVertex
private final int numParallelProducers;
private final AtomicInteger numberOfRunningProducers;
private int partitionsAssigned;
private int numConsumers;
private final int connectionIndex;
- An ExecutionJobVertex may produce zero or more IntermediateResults, and each parallel subtask (ExecutionVertex) may produce zero or more IntermediateResultPartitions.
5. IntermediateResultPartition
An IntermediateResult can contain zero or more IntermediateResultPartitions; an IntermediateResultPartition is one output partition of an ExecutionVertex.
Next, the member variables of the IntermediateResultPartition class:
private final IntermediateResult totalResult;
private final ExecutionVertex producer;
private final int partitionNumber;
private final IntermediateResultPartitionID partitionId;
private List<List<ExecutionEdge>> consumers;
From the source code we can see that the producer of an IntermediateResultPartition is an ExecutionVertex, and its consumers are one or more ExecutionEdges, held as a list of lists.
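The partitions array and the partitionLookupHelper map shown for IntermediateResult work together: the array is indexed by partition number, and the map translates a partition ID back to that index. A minimal sketch of this idea, using plain strings as IDs, might look like this. `PartitionLookupSketch` and `ToyResult` are hypothetical names, and the error handling is an assumption rather than Flink's behavior.

```java
import java.util.HashMap;
import java.util.Map;

class PartitionLookupSketch {
    /** Toy stand-in for an intermediate result (hypothetical, not Flink's class). */
    static final class ToyResult {
        private final String[] partitionIds; // one entry per producer subtask
        private final Map<String, Integer> partitionLookupHelper = new HashMap<>();
        private int partitionsAssigned;

        ToyResult(int numParallelProducers) {
            this.partitionIds = new String[numParallelProducers];
        }

        /** Registers the partition produced by the subtask with the given number. */
        void setPartition(int partitionNumber, String partitionId) {
            if (partitionIds[partitionNumber] != null) {
                throw new IllegalStateException("Partition " + partitionNumber + " already assigned");
            }
            partitionIds[partitionNumber] = partitionId;
            partitionLookupHelper.put(partitionId, partitionNumber);
            partitionsAssigned++;
        }

        /** Translates a partition ID back to its index in the array. */
        int indexOf(String partitionId) {
            Integer index = partitionLookupHelper.get(partitionId);
            if (index == null) {
                throw new IllegalArgumentException("Unknown partition " + partitionId);
            }
            return index;
        }

        int getPartitionsAssigned() {
            return partitionsAssigned;
        }
    }

    public static void main(String[] args) {
        ToyResult result = new ToyResult(3);
        for (int i = 0; i < 3; i++) {
            result.setPartition(i, "p-" + i);
        }
        System.out.println(result.indexOf("p-2"));
    }
}
```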
6. ExecutionGraph generation
The entry point is the ExecutionGraphBuilder#buildGraph() method.
Its main steps are:
6.1 Construct the ExecutionGraph, which mainly initializes jobInformation, restartStrategy, slotProvider, and so on.
6.2 Initialize each JobVertex on the master
// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits
final long initMasterStart = System.nanoTime();
log.info("Running initialization on master for job {} ({}).", jobName, jobId);
for (JobVertex vertex : jobGraph.getVertices()) {
    String executableClass = vertex.getInvokableClassName();
    if (executableClass == null || executableClass.isEmpty()) {
        throw new JobSubmissionException(jobId,
            "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
    }
    try {
        vertex.initializeOnMaster(classLoader);
    }
    catch (Throwable t) {
        throw new JobExecutionException(jobId,
            "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
    }
}
log.info("Successfully ran initialization on master in {} ms.",
    (System.nanoTime() - initMasterStart) / 1_000_000);
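- Set up the input format and the input splits.
- Set up the output format and call its initializeGlobal method.
Two notes on initializeGlobal:
  - initializeGlobal is called in the JM before the distributed program starts executing.
  - initializeGlobal initializes the output directory in the distributed file system according to the configured write mode.
6.3 Topologically sort all JobVertex instances in the JobGraph, starting from the sources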
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
    log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
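For readers unfamiliar with topological sorting, the sketch below shows the general idea with Kahn's algorithm on a toy graph of vertex names: a vertex is emitted only after all of its producers have been emitted. This is a stand-in technique chosen for illustration; the class `TopoSortSketch` and its method are hypothetical, and Flink's getVerticesSortedTopologicallyFromSources may be implemented differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopoSortSketch {
    /**
     * Sorts vertices so that every producer comes before its consumers (Kahn's algorithm).
     * The map goes from a producer name to the names of its direct consumers.
     */
    static List<String> sortFromSources(Map<String, List<String>> edges) {
        // Count incoming edges for every vertex that appears as producer or consumer.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : edges.entrySet()) {
            inDegree.putIfAbsent(entry.getKey(), 0);
            for (String consumer : entry.getValue()) {
                inDegree.merge(consumer, 1, Integer::sum);
            }
        }
        // Sources are the vertices without any input.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            sorted.add(vertex);
            for (String consumer : edges.getOrDefault(vertex, Collections.<String>emptyList())) {
                // A consumer becomes ready once all of its producers have been emitted.
                if (inDegree.merge(consumer, -1, Integer::sum) == 0) {
                    ready.add(consumer);
                }
            }
        }
        if (sorted.size() != inDegree.size()) {
            throw new IllegalStateException("The graph contains a cycle");
        }
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        edges.put("map", Arrays.asList("sink"));
        edges.put("source", Arrays.asList("map"));
        System.out.println(sortFromSources(edges)); // prints [source, map, sink]
    }
}
```

Sorting from the sources matters for the next step: when a vertex is attached, the results of its upstream vertices must already be registered so that it can connect to them.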
6.4 Add the sorted JobVertex list from 6.3 to the ExecutionGraph
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
    assertRunningInJobMasterMainThread();
    LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
            "vertices and {} intermediate results.",
        topologiallySorted.size(),
        tasks.size(),
        intermediateResults.size());
    final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();
    for (JobVertex jobVertex : topologiallySorted) {
        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }
        // create the execution job vertex and attach it to the graph
        ExecutionJobVertex ejv = new ExecutionJobVertex(
            this,
            jobVertex,
            1,
            maxPriorAttemptsHistoryLength,
            rpcTimeout,
            globalModVersion,
            createTimestamp);
        ejv.connectToPredecessors(this.intermediateResults);
        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                jobVertex.getID(), ejv, previousTask));
        }
        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                    res.getId(), res, previousDataSet));
            }
        }
        this.verticesInCreationOrder.add(ejv);
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }
    failoverStrategy.notifyNewVertices(newExecJobVertices);
    schedulingTopology = new ExecutionGraphToSchedulingTopologyAdapter(this);
    partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(
        schedulingTopology,
        new DefaultFailoverTopology(this));
}
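What this method does:
- It creates one ExecutionJobVertex for each JobVertex.
- It puts each ExecutionJobVertex into the tasks map, keyed by JobVertexID.
- It puts every IntermediateResult produced by the ExecutionJobVertex into the intermediateResults map, keyed by IntermediateDataSetID.
- It adds the parallelism of each ExecutionJobVertex to the ExecutionGraph member numVerticesTotal, which therefore ends up as the total number of subtasks. This is the number of slots requested later if every subtask gets its own slot; with slot sharing, fewer slots can be enough.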
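The bookkeeping in attachJobGraph relies on Map.putIfAbsent returning the previous value, so a duplicate ID is detected without overwriting the existing entry. The following toy sketch reproduces only that pattern and the parallelism sum; `AttachSketch` is a hypothetical class that stores a parallelism per vertex name instead of real ExecutionJobVertex objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class AttachSketch {
    private final Map<String, Integer> tasks = new LinkedHashMap<>(); // vertex ID -> parallelism
    private int numVerticesTotal;

    /** Registers a vertex; rejects a second vertex with the same ID. */
    void attach(String vertexId, int parallelism) {
        // putIfAbsent returns the existing value (if any) and leaves the map unchanged in that case.
        Integer previous = tasks.putIfAbsent(vertexId, parallelism);
        if (previous != null) {
            throw new IllegalStateException("Encountered two job vertices with ID " + vertexId);
        }
        numVerticesTotal += parallelism;
    }

    int getNumVerticesTotal() {
        return numVerticesTotal;
    }

    public static void main(String[] args) {
        AttachSketch graph = new AttachSketch();
        graph.attach("source", 2);
        graph.attach("map", 4);
        graph.attach("sink", 1);
        System.out.println(graph.getNumVerticesTotal()); // prints 7
    }
}
```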
6.5 Configure checkpointing
- Create a CompletedCheckpointStore instance for the job; this mainly sets the checkpoint path.
- Create a CheckpointIDCounter instance for the job; this mainly sets checkpointIdCounterPath.
- Create a CheckpointFailureManager, which registers callbacks that run when a checkpoint fails, for example to fail the job.
- Create a CheckpointCoordinator instance; its member variables are listed here:
/** The job whose checkpoint this coordinator coordinates. */
private final JobID job;
/** Default checkpoint properties. **/
private final CheckpointProperties checkpointProperties;
/** The executor used for asynchronous calls, like potentially blocking I/O. */
private final Executor executor;
/** Tasks who need to be sent a message when a checkpoint is started. */
private final ExecutionVertex[] tasksToTrigger;
/** Tasks who need to acknowledge a checkpoint before it succeeds. */
private final ExecutionVertex[] tasksToWaitFor;
/** Tasks who need to be sent a message when a checkpoint is confirmed. */
private final ExecutionVertex[] tasksToCommitTo;
/** Map from checkpoint ID to the pending checkpoint. */
private final Map<Long, PendingCheckpoint> pendingCheckpoints;
/** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
* accessing this don't block the job manager actor and run asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore;
/** The root checkpoint state backend, which is responsible for initializing the
* checkpoint, storing the metadata, and cleaning up the checkpoint. */
private final CheckpointStorageCoordinatorView checkpointStorage;
/** A list of recent checkpoint IDs, to identify late messages (vs invalid ones). */
private final ArrayDeque<Long> recentPendingCheckpoints;
/** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
* need to be ascending across job managers. */
private final CheckpointIDCounter checkpointIdCounter;
/** The base checkpoint interval. Actual trigger time may be affected by the
* max concurrent checkpoints and minimum-pause values */
private final long baseInterval;
/** The max time (in ms) that a checkpoint may take. */
private final long checkpointTimeout;
/** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
* enforce minimum processing time between checkpoint attempts */
private final long minPauseBetweenCheckpointsNanos;
/** The maximum number of checkpoints that may be in progress at the same time. */
private final int maxConcurrentCheckpointAttempts;
/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints. */
private final ScheduledThreadPoolExecutor timer;
/** The master checkpoint hooks executed by this checkpoint coordinator. */
private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;
/** Actor that receives status updates from the execution graph this coordinator works for. */
private JobStatusListener jobStatusListener;
/** The number of consecutive failed trigger attempts. */
private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
/** A handle to the current periodic trigger, to cancel it when necessary. */
private ScheduledFuture<?> currentPeriodicTrigger;
/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
private long lastCheckpointCompletionNanos;
/** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
* Non-volatile, because only accessed in synchronized scope */
private boolean periodicScheduling;
/** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
* accessed in synchronized scope */
private boolean triggerRequestQueued;
/** Flag marking the coordinator as shut down (not accepting any messages any more). */
private volatile boolean shutdown;
/** Optional tracker for checkpoint statistics. */
@Nullable
private CheckpointStatsTracker statsTracker;
/** A factory for SharedStateRegistry objects. */
private final SharedStateRegistryFactory sharedStateRegistryFactory;
/** Registry that tracks state which is shared across (incremental) checkpoints. */
private SharedStateRegistry sharedStateRegistry;
private boolean isPreferCheckpointForRecovery;
private final CheckpointFailureManager failureManager;
- It holds the checkpoint interval, all tasks of the job, the checkpointIdCounter, and related information.
- The checkpointIdCounter starts counting with checkpointIDCounter.start();
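The field comment on checkpointIdCounter requires checkpoint IDs to be ascending. A minimal in-memory sketch of such a counter is shown below. `CheckpointIdCounterSketch` and `advanceTo` are hypothetical names, the starting value of 1 is an assumption, and the sketch ignores the harder part mentioned in the comment, namely keeping IDs ascending across job manager failures.

```java
import java.util.concurrent.atomic.AtomicLong;

class CheckpointIdCounterSketch {
    private final AtomicLong next = new AtomicLong(1); // assumed starting value

    /** Returns the next checkpoint ID and advances the counter. */
    long getAndIncrement() {
        return next.getAndIncrement();
    }

    /** Moves the counter forward, e.g. after recovery; it never moves backwards. */
    void advanceTo(long newCount) {
        next.accumulateAndGet(newCount, Math::max);
    }

    public static void main(String[] args) {
        CheckpointIdCounterSketch counter = new CheckpointIdCounterSketch();
        System.out.println(counter.getAndIncrement());
        System.out.println(counter.getAndIncrement());
        counter.advanceTo(10);
        System.out.println(counter.getAndIncrement());
    }
}
```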
6.6 Register metrics for the ExecutionGraph
Summary
ExecutionGraph is the core data structure for executing a job in a distributed fashion; it adds the notion of parallelism on top of JobGraph.
The nodes involved in the ExecutionGraph are summarized below:
- Each JobVertex in the JobGraph corresponds one-to-one to an ExecutionJobVertex in the ExecutionGraph.
- Each ExecutionJobVertex has parallelism ExecutionVertex instances.
- Each JobVertex may have n (n >= 0) IntermediateDataSets. In the ExecutionJobVertex, each IntermediateDataSet corresponds to one IntermediateResult, and each IntermediateResult has parallelism producers and therefore parallelism IntermediateResultPartitions.
- Each ExecutionJobVertex is connected to the IntermediateResults of its upstream vertices; connecting an ExecutionVertex to an IntermediateResult creates ExecutionEdges.