Flink源碼分析系列文檔目錄
請點擊:Flink 源碼分析系列文檔目錄
前言
本篇我們一起分析下Flink中流處理作業(yè)的初始化和執(zhí)行邏輯淤刃。
AbstractInvokable
AbstractInvokable
是TaskManager
中運行的所有任務的父類晒他。所有的讀取上游數(shù)據(jù),用戶數(shù)據(jù)處理邏輯(map逸贾,filter算子以及用戶自己編寫的processFunction等等)和發(fā)送處理過的數(shù)據(jù)到下游相關邏輯都在該類的invoke
方法中得到執(zhí)行陨仅。
AbstractInvokable
中與任務執(zhí)行相關的2個方法為:
- invoke方法:啟動任務執(zhí)行的入口方法。實現(xiàn)類必須重寫這個方法耕陷。
- cancel方法:任務被取消或者是用戶終止任務的時候被調用
它有兩個實現(xiàn)類:
- BatchTask:所有批處理類型Task的基類掂名。
- StreamTask:所有流處理類型Task的基類。
我們以流處理為重點哟沫,下面詳細介紹下StreamTask
這個類饺蔑。
AbstractInvokable的創(chuàng)建
在開始分析StreamTask
之前我們需要了解下它是在何處,如何被創(chuàng)建出來的嗜诀。
翻閱Task線程的處理邏輯猾警,不難發(fā)現(xiàn)它的invoke
變量初始化位于Task
的doRun
方法。
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
這一行代碼使用用戶代碼類加載器(userCodeClassLoader)隆敢,調用目標類唯一參數(shù)為Environment
類型的構造方法发皿,創(chuàng)建出invokable對象。
private static AbstractInvokable loadAndInstantiateInvokable(
ClassLoader classLoader, String className, Environment environment) throws Throwable {
final Class<? extends AbstractInvokable> invokableClass;
try {
// 使用指定的classloader加載className對應的class拂蝎,并轉換為AbstractInvokable類型
invokableClass =
Class.forName(className, true, classLoader).asSubclass(AbstractInvokable.class);
} catch (Throwable t) {
throw new Exception("Could not load the task's invokable class.", t);
}
Constructor<? extends AbstractInvokable> statelessCtor;
try {
// 獲取構造函數(shù)
statelessCtor = invokableClass.getConstructor(Environment.class);
} catch (NoSuchMethodException ee) {
throw new FlinkException("Task misses proper constructor", ee);
}
// instantiate the class
try {
//noinspection ConstantConditions --> cannot happen
// 傳入environment變量穴墅,創(chuàng)建出新的對象
return statelessCtor.newInstance(environment);
} catch (InvocationTargetException e) {
// directly forward exceptions from the eager initialization
throw e.getTargetException();
} catch (Exception e) {
throw new FlinkException("Could not instantiate the task's invokable class.", e);
}
}
StreamTask
StreamTask
類是所有流處理任務的基類。Task由TaskManager部署和執(zhí)行。Task是本地運行單元玄货。每一個Task包含了一個或多個operator皇钞。這些operator在同一個OperatorChain中。
StreamTask任務執(zhí)行生命周期包含:
- setInitialState:設置各個operator的初始狀態(tài)松捉。對應
initializeState
方法夹界。 - 調用
invoke
方法。
其中invoke
方法包含的邏輯可細分為:
- 創(chuàng)建出task相關配置隘世,創(chuàng)建OperatorChain可柿。
- 執(zhí)行operator的setup邏輯。
- 執(zhí)行task相關的初始化邏輯丙者。
- 加載并初始化operator的狀態(tài)复斥。
- 調用各個operator的open方法。
- 執(zhí)行各個operator內的數(shù)據(jù)處理邏輯械媒。
- 關閉operator永票。
- 銷毀operator。
- 任務清理操作滥沫。
下面我們從代碼層面詳細分析下invoke
方法的處理流程侣集。
invoke方法
本節(jié)我們分析StreamTask
核心執(zhí)行邏輯invoke
方法。invoke
方法如下所示:
@Override
public final void invoke() throws Exception {
try {
// 調用作業(yè)執(zhí)行前相關準備邏輯
beforeInvoke();
// final check to exit early before starting to run
// 如果任務被取消兰绣,拋出異常退出
if (canceled) {
throw new CancelTaskException();
}
// let the task do its work
// 執(zhí)行用戶編寫的task邏輯
runMailboxLoop();
// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
// 再次檢查如果任務被取消世分,拋出異常退出
if (canceled) {
throw new CancelTaskException();
}
// 執(zhí)行調用后相關邏輯
afterInvoke();
} catch (Throwable invokeException) {
failing = !canceled;
try {
cleanUpInvoke();
}
// TODO: investigate why Throwable instead of Exception is used here.
catch (Throwable cleanUpException) {
Throwable throwable =
ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
ExceptionUtils.rethrowException(throwable);
}
ExceptionUtils.rethrowException(invokeException);
}
// 執(zhí)行invoke后清理操作
cleanUpInvoke();
}
beforeInvoke方法
beforeInvoke
方法主要為task的初始化操作,包含創(chuàng)建OperatorChain
缀辩,讀取上游數(shù)據(jù)和下游數(shù)據(jù)輸出配置等臭埋。詳細內容如下:
protected void beforeInvoke() throws Exception {
disposedOperators = false;
LOG.debug("Initializing {}.", getName());
// 創(chuàng)建出OperatorChain
// OperatorChain是JobGraph生成時的一箱優(yōu)化措施
// 將復合條件的多個StreamNode(對應數(shù)據(jù)變換操作)合并到一個chain中
// 他們會被調度到同一個StreamTask中執(zhí)行
operatorChain = new OperatorChain<>(this, recordWriter);
// 獲取OperatorChain中第一個operator
mainOperator = operatorChain.getMainOperator();
// task specific initialization
// 執(zhí)行task專屬的初始化工作
// 這個是抽象方法
// 具體邏輯需要在子類中實現(xiàn)
init();
// save the work of reloading state, etc, if the task is already canceled
if (canceled) {
throw new CancelTaskException();
}
// -------- Invoke --------
LOG.debug("Invoking {}", getName());
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
// task動作必須在StreamTaskActionExecutor中執(zhí)行,防止出現(xiàn)并發(fā)執(zhí)行問題臀玄,影響checkpoint
// 該executor實際為StreamTaskActionExecutor.IMMEDIATE瓢阴,即在當前線程直接運行
actionExecutor.runThrowing(
() -> {
// 創(chuàng)建SequentialChannelStateReader,用于讀取checkpoint時保存的channel狀態(tài)
SequentialChannelStateReader reader =
getEnvironment()
.getTaskStateManager()
.getSequentialChannelStateReader();
// 獲取ResultPartitionWriter狀態(tài)
reader.readOutputData(
getEnvironment().getAllWriters(),
!configuration.isGraphContainingLoops());
// 初始化OperatorChain中所有的operator
// 調用他們的initializeState(初始化狀態(tài))和open(包含初始化動作)方法
operatorChain.initializeStateAndOpenOperators(
createStreamTaskStateInitializer());
channelIOExecutor.execute(
() -> {
try {
// 獲取InputGate狀態(tài)
reader.readInputData(getEnvironment().getAllInputGates());
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException(
"Unable to read channel state", e);
}
});
for (InputGate inputGate : getEnvironment().getAllInputGates()) {
// 在inputGate狀態(tài)被讀取之后執(zhí)行
inputGate
.getStateConsumedFuture()
.thenRun(
() ->
// 在task線程中執(zhí)行
mainMailboxExecutor.execute(
// 執(zhí)行請求partition方法
inputGate::requestPartitions,
"Input gate request partitions"));
}
});
// 水池狀態(tài)為正在執(zhí)行
isRunning = true;
}
runMailboxLoop方法
runMailboxLoop
方法啟動task的數(shù)據(jù)輸入和處理邏輯:
public void runMailboxLoop() throws Exception {
mailboxProcessor.runMailboxLoop();
}
MailBoxProcessor
在StreamTask
的構造函數(shù)中創(chuàng)建出來:
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
mailboxProcessor.runMailboxLoop()
方法可以理解為在actionExecutor
線程池執(zhí)行processInput
方法荣恐。
processInput
方法從上游(StreamTaskNetworkInput
叠穆,InputGate
)讀取數(shù)據(jù)。這部分邏輯參見Flink 源碼之節(jié)點間通信硼被。
afterInvoke
afterInvoke
方法內容如下渗磅,概括起來為task執(zhí)行完畢后的清理工作检访,關閉operator等仔掸。
protected void afterInvoke() throws Exception {
LOG.debug("Finished task {}", getName());
getCompletionFuture().exceptionally(unused -> null).join();
final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();
// close all operators in a chain effect way
// 關閉OperatorChain中所有的operator
// 從前向后依次調用各個operator的close方法
operatorChain.closeOperators(actionExecutor);
// make sure no further checkpoint and notification actions happen.
// at the same time, this makes sure that during any "regular" exit where still
actionExecutor.runThrowing(
() -> {
// make sure no new timers can come
// 停止timer服務
FutureUtils.forward(timerService.quiesce(), timersFinishedFuture);
// let mailbox execution reject all new letters from this point
// 準備關閉mailboxProcessor,不再接受新的事件
mailboxProcessor.prepareClose();
// only set the StreamTask to not running after all operators have been closed!
// See FLINK-7430
// 設置task狀態(tài)為停止
isRunning = false;
});
// processes the remaining mails; no new mails can be enqueued
// 處理積壓的事件
mailboxProcessor.drain();
// make sure all timers finish
// 等待所有的time都停止
timersFinishedFuture.get();
LOG.debug("Closed operators for task {}", getName());
// make sure all buffered data is flushed
// 處理掉buffer中的所有數(shù)據(jù)
operatorChain.flushOutputs();
// make an attempt to dispose the operators such that failures in the dispose call
// still let the computation fail
// 依次廢棄掉OperatorChain中的所有operator(順序為從頭到尾)
disposeAllOperators();
}
StreamTask的子類
StreamTask
是所有流處理計算任務的父類嘉汰,它本身是一個抽象類。為了處理不同類型的StreamOperator
状勤,StreamTask
有多種不同的實現(xiàn)鞋怀。幾個典型的實現(xiàn)如下:
- OneInputStreamTask:處理
OneInputStreamOperator
,即只有一個輸入流的StreamOperator
持搜。 - TwoInputStreamTask:處理
TwoInputStreamOperator
密似,具有2個輸入流。 - MultipleInputStreamTask:處理
MultipleInputStreamOperator
葫盼,具有多個輸入流残腌。 - SourceStreamTask:處理
StreamSource
,即數(shù)據(jù)源贫导。
接下來我們重點關注這些類實現(xiàn)的抽象方法抛猫。
OneInputStreamTask的init方法
它的init
方法主要流程為創(chuàng)建網(wǎng)絡輸入與輸出,創(chuàng)建inputProcessor
用于從網(wǎng)絡輸入讀取數(shù)據(jù)孩灯,反序列化之后傳遞給網(wǎng)絡輸出闺金。最后初始化數(shù)據(jù)流監(jiān)控。代碼和分析如下:
@Override
public void init() throws Exception {
// 獲取流作業(yè)配置
StreamConfig configuration = getConfiguration();
// 獲取網(wǎng)絡輸入流數(shù)量
int numberOfInputs = configuration.getNumberOfNetworkInputs();
if (numberOfInputs > 0) {
// 創(chuàng)建一個CheckpointedInputGate
// 該類型InputGate擁有一個CheckpointBarrierHandler峰档,用來處理接收到的CheckpointBarrier
CheckpointedInputGate inputGate = createCheckpointedInputGate();
// 監(jiān)控相關败匹,設置流入數(shù)據(jù)條數(shù)計數(shù)器
Counter numRecordsIn = setupNumRecordsInCounter(mainOperator);
// 創(chuàng)建StreamTaskNetworkOutput
// 發(fā)送反序列化后的數(shù)據(jù)給task處理流程
DataOutput<IN> output = createDataOutput(numRecordsIn);
// 創(chuàng)建StreamTaskNetworkInput
// 包裝了CheckpointedInputGate,從中讀取網(wǎng)絡接收到的原始數(shù)據(jù)并發(fā)給反序列化器
StreamTaskInput<IN> input = createTaskInput(inputGate);
// 讀取輸入流配置
StreamConfig.InputConfig[] inputConfigs =
configuration.getInputs(getUserCodeClassLoader());
StreamConfig.InputConfig inputConfig = inputConfigs[0];
// 如果要求對數(shù)據(jù)排序
// 含義為數(shù)據(jù)按照key字段分組
// 在一段時間內只會給task提供同一分組的數(shù)據(jù)
// 不同組的數(shù)據(jù)不會頻繁交替出現(xiàn)
if (requiresSorting(inputConfig)) {
checkState(
!configuration.isCheckpointingEnabled(),
"Checkpointing is not allowed with sorted inputs.");
input = wrapWithSorted(input);
}
// 注冊流入數(shù)據(jù)條數(shù)計數(shù)器監(jiān)控
getEnvironment()
.getMetricGroup()
.getIOMetricGroup()
.reuseRecordsInputCounter(numRecordsIn);
// 創(chuàng)建inputProcessor
// 從網(wǎng)絡讀取數(shù)據(jù)讥巡,反序列化后給output掀亩,然后把反序列化后的數(shù)據(jù)交給OperatorChain
inputProcessor = new StreamOneInputProcessor<>(input, output, operatorChain);
}
// 創(chuàng)建watermark監(jiān)控
mainOperator
.getMetricGroup()
.gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge);
// wrap watermark gauge since registered metrics must be unique
getEnvironment()
.getMetricGroup()
.gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, inputWatermarkGauge::getValue);
}
其中創(chuàng)建CheckpointedInputGate
的過程在 Flink 源碼之分布式快照 有介紹,請大家查閱欢顷。
TwoInputStreamTask的init方法
它的初始化方法和OneInputStreamTask
的類似槽棍,只不過需要創(chuàng)建兩個InputGate
。TwoInputStreamTask
對應CoOperator
抬驴,即有兩個輸入流的operator(比如CoFlatmap)刹泄。
@Override
public void init() throws Exception {
StreamConfig configuration = getConfiguration();
ClassLoader userClassLoader = getUserCodeClassLoader();
int numberOfInputs = configuration.getNumberOfNetworkInputs();
ArrayList<IndexedInputGate> inputList1 = new ArrayList<>();
ArrayList<IndexedInputGate> inputList2 = new ArrayList<>();
List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
for (int i = 0; i < numberOfInputs; i++) {
int inputType = inEdges.get(i).getTypeNumber();
IndexedInputGate reader = getEnvironment().getInputGate(i);
switch (inputType) {
case 1:
// 如果是輸入流1,加入到inputList1中
inputList1.add(reader);
break;
case 2:
// 如果是輸入流2怎爵,加入到inputList2中
inputList2.add(reader);
break;
default:
throw new RuntimeException("Invalid input type number: " + inputType);
}
}
// 創(chuàng)建CheckpointedInputGate特石,包裝了UnionInputGate
// 包裝了多個InputGate姆蘸,ID相同的channel會被合并
// 這里創(chuàng)建出兩個UnionInputGate逞敷,每個UnionInputGate合并了多個inputType相同的InputGate
// 最后根據(jù)這個InputGate推捐,創(chuàng)建出StreamTwoInputProcessor
createInputProcessor(
inputList1, inputList2, gateIndex -> inEdges.get(gateIndex).getPartitioner());
// 監(jiān)控相關部分堪簿,這里省略
// ...
}
MultipleInputStreamTask
和上面的邏輯類似椭更,不再贅述虑瀑。
SourceStreamTask的init方法
@Override
protected void init() {
// we check if the source is actually inducing the checkpoints, rather
// than the trigger
// 獲取數(shù)據(jù)源數(shù)據(jù)產(chǎn)生邏輯SourceFunction
SourceFunction<?> source = mainOperator.getUserFunction();
// 如果source實現(xiàn)了這個接口舌狗,說明接收到CheckpointCoordinator發(fā)來的觸發(fā)checkpoint消息之時source不觸發(fā)checkpoint
// checkpoint的觸發(fā)由輸入數(shù)據(jù)控制
if (source instanceof ExternallyInducedSource) {
externallyInducedCheckpoints = true;
// 創(chuàng)建checkpoint觸發(fā)鉤子
ExternallyInducedSource.CheckpointTrigger triggerHook =
new ExternallyInducedSource.CheckpointTrigger() {
@Override
public void triggerCheckpoint(long checkpointId) throws FlinkException {
// TODO - we need to see how to derive those. We should probably not
// encode this in the
// TODO - source's trigger message, but do a handshake in this task
// between the trigger
// TODO - message from the master, and the source's trigger
// notification
final CheckpointOptions checkpointOptions =
CheckpointOptions.forConfig(
CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference.getDefault(),
configuration.isExactlyOnceCheckpointMode(),
configuration.isUnalignedCheckpointsEnabled(),
configuration.getAlignmentTimeout());
final long timestamp = System.currentTimeMillis();
final CheckpointMetaData checkpointMetaData =
new CheckpointMetaData(checkpointId, timestamp);
try {
// 調用StreamTask的異步觸發(fā)checkpoint方法
SourceStreamTask.super
.triggerCheckpointAsync(
checkpointMetaData, checkpointOptions)
.get();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new FlinkException(e.getMessage(), e);
}
}
};
((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
}
// 配置checkpoint啟動延遲時間監(jiān)控
getEnvironment()
.getMetricGroup()
.getIOMetricGroup()
.gauge(
MetricNames.CHECKPOINT_START_DELAY_TIME,
this::getAsyncCheckpointStartDelayNanos);
}
StreamTask從上游獲取數(shù)據(jù)
StreamTask從上游獲取數(shù)據(jù)的調用鏈為:
- StreamTask.processInput
- inputProcessor.processInput
- StreamTaskNetworkInput.emitNext
- inputGate.pollNext
- inputChannel.getNextBuffer
StreamTask
通過InputGate
從上游其他Task獲取到數(shù)據(jù)铭污。每個InputGate
包含一個或多個InputChannel
嘹狞,根據(jù)數(shù)據(jù)是否走網(wǎng)絡通信磅网,這些InputChannel
分為RemoteInputChannel
和LocalInputChannel
涧偷。其中RemoteInputChannel
使用Netty通過網(wǎng)絡從上游task的ResultSubPartition
獲取數(shù)據(jù)燎潮,適用與本task和上游task運行在不同集群節(jié)點的情況确封。和它相反的是LocalInputChannel
,適用于本task和上游task運行在同一節(jié)點的情況颜曾,從上游task獲取數(shù)據(jù)不需要走網(wǎng)絡通信稠诲。
這部分邏輯的詳細分析诡曙,參見 Flink 源碼之節(jié)點間通信。
數(shù)據(jù)傳遞給OperatorChain
這一段邏輯我們從StreamTaskNetworkInput
的processElement
方法開始分析聚请。
StreamTask
的processInput
方法為處理數(shù)據(jù)邏輯的入口驶赏。這個方法調用了StreamOneInputProcessor
的同名方法煤傍,命令StreamTaskNetworkInput
一直循環(huán)不停的從InputGate
中獲取數(shù)據(jù)蚯姆。對于獲取到的數(shù)據(jù)龄恋,需要先交給反序列化器郭毕,將二進制數(shù)據(jù)反序列化為StreamRecord
對象显押。接著交給processElement
方法處理乘碑。
上面邏輯的分析請參見 Flink 源碼之節(jié)點間通信 讀取數(shù)據(jù)章節(jié)兽肤。
下面是processElement
方法。該方法位于AbstractStreamTaskNetworkInput
睦疫。參數(shù)中的output
實際上就是StreamTaskNetworkOutput`對象蛤育。
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
// 首先判斷元素的類型瓦糕,可能是數(shù)據(jù)咕娄,watermark圣勒,延遲標記或者是流狀態(tài)
if (recordOrMark.isRecord()) {
output.emitRecord(recordOrMark.asRecord());
} else if (recordOrMark.isWatermark()) {
statusWatermarkValve.inputWatermark(
recordOrMark.asWatermark(), flattenedChannelIndices.get(lastChannel), output);
} else if (recordOrMark.isLatencyMarker()) {
output.emitLatencyMarker(recordOrMark.asLatencyMarker());
} else if (recordOrMark.isStreamStatus()) {
statusWatermarkValve.inputStreamStatus(
recordOrMark.asStreamStatus(),
flattenedChannelIndices.get(lastChannel),
output);
} else {
throw new UnsupportedOperationException("Unknown type of StreamElement");
}
}
StreamTaskNetworkOutput
接收反序列化處理過的數(shù)據(jù)圣贸,發(fā)送給OperatorChain
的第一個operator吁峻。
private static class StreamTaskNetworkOutput<IN> extends AbstractDataOutput<IN> {
// 創(chuàng)建的時候傳入的是OperatorChain的mainOperator用含,即第一個operator
private final OneInputStreamOperator<IN, ?> operator;
private final WatermarkGauge watermarkGauge;
private final Counter numRecordsIn;
private StreamTaskNetworkOutput(
OneInputStreamOperator<IN, ?> operator,
StreamStatusMaintainer streamStatusMaintainer,
WatermarkGauge watermarkGauge,
Counter numRecordsIn) {
super(streamStatusMaintainer);
this.operator = checkNotNull(operator);
this.watermarkGauge = checkNotNull(watermarkGauge);
this.numRecordsIn = checkNotNull(numRecordsIn);
}
// 發(fā)送數(shù)據(jù)
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
numRecordsIn.inc();
operator.setKeyContextElement1(record);
// 調用processElement方法,處理數(shù)據(jù)
operator.processElement(record);
}
// 發(fā)送watermark
@Override
public void emitWatermark(Watermark watermark) throws Exception {
watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
// 調用processWatermark方法肠缔,處理watermark
operator.processWatermark(watermark);
}
// 發(fā)送延遲標記明未,被用于統(tǒng)計數(shù)據(jù)在整個Flink處理流程中的耗時
@Override
public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
operator.processLatencyMarker(latencyMarker);
}
}
OperatorChain
的邏輯在后續(xù)博客中單獨分析趟妥。
本博客為作者原創(chuàng)披摄,歡迎大家參與討論和批評指正。如需轉載請注明出處义辕。