一 、 task執(zhí)行入口 0:15
接 上期 回顧
★ ——7 》 TaskExecutor#submitTask()
第一個入口:Task 構(gòu)造函數(shù)
——》Task 構(gòu)造函數(shù)()
▼
* 注釋: 當(dāng)前任務(wù)的 Task 信息
*/
this.taskInfo = new TaskInfo()
......
* 注釋: 初始化 ResultPartition 和 ResultSubpartition 關(guān)于輸出的抽象
* ResultSubpartition具體實(shí)現(xiàn)為 PipelinedSubpartition
*/
final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment
.createResultPartitionWriters()
this.consumableNotifyingPartitionWriters =
ConsumableNotifyingResultPartitionWriterDecorator
......
* 注釋: 初始化 InputGate 輸入的 對象,
inputchanle 從上游一個task節(jié)點(diǎn)拉取數(shù)據(jù)
*/
final IndexedInputGate[] gates = shuffleEnvironment.createInputGates()
* 注釋:創(chuàng)建 Task 的線程 但是不執(zhí)行 run() 方法
*/
executingThread = new Thread(TASK_THREADS_GROUP,
第二個入口:task.startTaskThread(); 通過一個線程來運(yùn)行 Task 0:45
—— 》Task #run()
—— ★》Task #dorun()
▼
★ 重要步驟 反射實(shí)例化StreamTask實(shí)例
dorun:1 if(transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING))
dorun:2 setupPartitionsAndGates(consumableNotifyingPartitionWriters,
dorun:3 Environment env = new RuntimeEnvironment(jobId, vertexId,
dorun:4 invokable = loadAndInstantiateInvokable(userCodeClassLoader,
dorun: 5 invokable.invoke()豺旬;
—— dorun:4》Task #loadAndInstantiateInvokable()
▼
反射調(diào)用構(gòu)造函數(shù)
statelessCtor = invokableClass.getConstructor(Environment.class);
//#1 SourceStreamTask 帶RuntimeEnvvironment的構(gòu)造函數(shù)
//#2 OneInputStreamTask 帶RuntimeEnvvironment的構(gòu)造函數(shù)
Constructor<? extends AbstractInvokable> statelessCtor;
try {
statelessCtor = invokableClass.getConstructor(Environment.class);
}
——#1 SourceStreamTask(Environment env) 構(gòu)造函數(shù)
——#2 OneInputStreamTask (Environment env) 構(gòu)造函數(shù)
//1,2最后都到 父類構(gòu)造函數(shù) StreamTask()
——★ 》 父類構(gòu)造函數(shù) StreamTask()
【streamtask 截圖或者筆記】
4件大事
StreamTask:1 this.recordWriter = createRecordWriterDelegate(configuration,
StreamTask:2 this.mailboxProcessor = new MailboxProcessor(this::processInput,
StreamTask:3 this.stateBackend = createStateBackend();
StreamTask:4 this.subtaskCheckpointCoordinator = new
—— StreamTask:1 》 StreamTask. createRecordWriterDelegate 1:04
——》1》StreamTask.createRecordWriters()
——》1》StreamTask.createRecordWriter()
——》1》StreamTask.createRecordWriters
▼
// TODO_MA 注釋: 初始化一個 ArrayList 容器用來存放創(chuàng)建出來的 RecordWriter
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>
// TODO_MA 注釋: 獲取該 StreamTask 的輸出 StreamEdge 集合
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder
// TODO_MA 注釋: 一個 out StreamEdge 來構(gòu)建 一個 RecordWriter
// TODO_MA 注釋: 大概率 createRecordWriter() 方法的返回值是:
ChannelSelectorRecordWriter
recordWriters.add(createRecordWriter(edge, i,
——》1》StreamTask.createRecordWriters()
▼
- 1轻专、如果上游 StreamNode 和 下游 StreamNode 的并行度一樣魔眨,則使用: ForwardPartitioner 數(shù)據(jù)分發(fā)策略
-
// TODO_MA 注釋: 其實(shí)這個 output 就是負(fù)責(zé)幫您完成這個 StrewamTask 的所有數(shù)據(jù)的輸出2滴劲、如果上游 StreamNode 和 下游 StreamNode 的并行度不一樣苏章,則使用: RebalancePartitioner 數(shù)據(jù)分發(fā)策略 StreamPartitioner<OUT> outputPartitioner = null; try { outputPartitioner = InstantiationUtil.clone(
// TODO_MA 注釋: 輸出到 ResultPartition
// TODO_MA 注釋: 初始化輸出 ChannelSelectorRecordWriter
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output
= new RecordWriterBuilder<SerializationDelegate<StreamRecord<OUT>>>() .setChannelSelector(outputPartitioner)
.setTimeout(bufferTimeout)
.setTaskName(taskName) // TODO_MA 注釋: 構(gòu)建一個 RecordWriter 返回
.build(bufferWriter);
▲回到父類構(gòu)造函數(shù) StreamTask ▲
StreamTask:2——》 this.mailboxProcessor = new
MailboxProcessor(this::processInput,
//#1 SourceStreamTask 帶RuntimeEnvvironment的構(gòu)造函數(shù)
//#2 OneInputStreamTask 帶RuntimeEnvvironment的構(gòu)造函數(shù)
▲回到 父類構(gòu)造函數(shù) StreamTask ▲
StreamTask:3 ——》 this.stateBackend = createStateBackend(); 1:26 ~
state簡介 部分
——》 StreamTask.createStateBackend() 1:36
▼
* 注釋: 根據(jù)配置獲取 StateBackend
* 一般情況下寂嘉,我們在生產(chǎn)環(huán)境中奏瞬, 在 flink-conf.yaml 文件中進(jìn)行配置:
* 1、state.backend: filesystem
* 2泉孩、state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
* 一般有三種方式:
* 1硼端、state.backend: filesystem = FsStateBackend
* 2、state.backend: jobmanager = MemoryStateBackend
* 3寓搬、state.backend: rocksdb = RocksDBStateBackend
* 也可以在程序中泽疆,進(jìn)行設(shè)置 這種方式會覆蓋配置文件中的配置:
* StreamExecutionEnvironment.setStateBackend(StateBackend backend)
*/
return StateBackendLoader.fromApplicationOrConfigOrDefault
▲ 回 父類構(gòu)造函數(shù) StreamTask▲
******創(chuàng)建 CheckpointStorage
* 1恭应、FsStateBackend = FsCheckpointStorage
*/
stateBackend.createCheckpointStorage(getEnvironment()
↓
FsStateBackendcreateCheckpointStorage
****** 注釋: 創(chuàng)建 Channel 的 IO 線程池
*/
this.channelIOExecutor = Executors.newSingleThreadExecutor
▲ 回 父類構(gòu)造函數(shù) StreamTask▲
▲回 ★》Task #dorun()下 的 dorun: 5 ▲
*** dorun: 5 invokable.invoke()埂陆;
* 注釋: 運(yùn)行任務(wù)钢坦, 在流式應(yīng)用程序中虏劲,都是 StreamTask 的子類
* AbstractInvokable 是 Task 執(zhí)行的主要邏輯颓影,也是所有被執(zhí)行的任務(wù)的基類赁炎,包括 Streaming 模式和 Batch 模式擎场。
* 在 Streaming 模式下瞒御,所有任務(wù)都繼承自 StreamTask父叙,
* 包括 StreamTask 的子類包括 SourceStreamTask, OneInputStreamTask, TwoInputStreamTask,
* 以及用于迭代模式下的 StreamIterationHead 和 StreamIterationTail。
* 每一個 StreamNode 在添加到 StreamGraph 的時候都會有一個關(guān)聯(lián)的 jobVertexClass 屬性肴裙,
* 這個屬性就是該 StreamNode 對應(yīng)的 StreamTask 類型趾唱;對于一個 OperatorChain 而言,它所對應(yīng)的
* StreamTask 就是其 head operator 對應(yīng)的 StreamTask蜻懦。
*/
// run the invokable
invokable.invoke();
↓
——》StreamTask#invoke()
▼
inv 1: beforeInvoke()
inv 2: runMailboxLoop();
inv 3: afterInvoke();
inv 4: cleanUpInvoke();
——inv 1 》StreamTask#beforeInvoke()
▼
-
注釋: 構(gòu)建 OperatorChain 對象甜癞,里面會做很多事情
* 初始化 output 輸出對象
* 主要做三件事情:
* 1、調(diào)用createStreamOutput()創(chuàng)建對應(yīng)的下游輸出RecordWriterOutput
* 2宛乃、調(diào)用createOutputCollector()將優(yōu)化邏輯計劃當(dāng)中Chain中的StreamConfig(也就是數(shù)據(jù))寫入到第三步創(chuàng)建的RecordWriterOutput中
* 3悠咱、通過調(diào)用getChainedOutputs()輸出結(jié)果RecordWriterOutput
*/
operatorChain = new OperatorChain<>(this, recordWriter);——inv 1 》OperatorChain 構(gòu)造函數(shù) ▼ // TODO_MA 注釋: 遍歷每個輸出邊,給每個 outEdge 構(gòu)造一個 RecordWriterOutput 實(shí)例
for(int i = 0; i < outEdgesInOrder.size(); i++) {
* 為每一個 Operator 構(gòu)造 RecordWriterOutput
RecordWriterOutput<?> streamOutput =
createStreamOutput(recordWriterDelegate
//放到map 里面
streamOutputMap.put(outEdge, streamOutput);
} 注釋: 為每一個 Operator 創(chuàng)建 OutputCollector
*/
this.chainEntryPoint = createOutputCollector(-
注釋: 創(chuàng)建 Operator
Tuple2<OP, Optional<ProcessingTimeService>> headOperatorAndTimeService =- 注釋: 創(chuàng)建 OperatorWrapper
this.headOperatorWrapper = createOperatorWrapper(
// TODO_MA 注釋: 所有 OperatorWrapper 對象集合征炼,把 headOperatorWrapper 放入到最后
// TODO_MA 注釋: 其實(shí)一個 OperatorChain 中析既,包含了多個 Operator,最終都被封裝成 OperatorWrapper 放入這個集合中
// add head operator to end of chain
allOpWrappers.add(headOperatorWrapper);
- 注釋: 創(chuàng)建 OperatorWrapper
// TODO_MA 注釋: tailOperatorWrapper 在 ArrayList 的最前面
this.tailOperatorWrapper = allOpWrappers.get(0);
-
注釋: 以 forward topological order 鏈接全部的 operator wrappers為一張圖
*/
linkOperatorWrappers(allOpWrappers);
operatorchain.png
▲OperatorChain 構(gòu)造函數(shù)結(jié)束▲
▲回inv 1 》StreamTask#beforeInvoke()方法 里面
// TODO_MA 注釋: 獲取 OperatorChain 的第一個 Operator
// TODO_MA 注釋: 可以認(rèn)為 接收數(shù)據(jù)線程中谆奥,用到的 headOpeartor 終于被初始化
// TODO_MA 注釋: 其實(shí)到此為止眼坏,可以認(rèn)為,在當(dāng)前 OperatorChain 中要用到的各種組件都已經(jīng)創(chuàng)建好了酸些,可以接收數(shù)據(jù)宰译,然后開始流式處理了。
headOperator = operatorChain.getHeadOperator();
- 注釋: 執(zhí)行 StreamTask 的初始化
- 1魄懂、可能是 SourceStreamTask沿侈, 對于 SourceStreamTask 來說,只是注冊一個 savepoint 鉤子
- 2市栗、也可能是 OneInputStreamTask
*/
init(); - 注釋: 狀態(tài)恢復(fù)入口
operatorChain.initializeStateAndOpenOperators( - 注釋: 初始化 Mail
* 這個地方主要是初始化 InputGate 等輸入相關(guān)的細(xì)節(jié)
*/
readRecoveredChannelState();
----------------------至此 所有準(zhǔn)備工作完成缀拭, 準(zhǔn)備真正執(zhí)行task數(shù)據(jù)流處理到此為止,Task 初始化和預(yù)執(zhí)行相關(guān)的,都基本到位了智厌,然后就開始從我們的 SourceStreamTask 的HeadOperator 的數(shù)據(jù)接收線程诲泌,開始流式處理。-------------
中場休息
▲回到 inv 2: runMailboxLoop(); 開始 2:17
inv 2: runMailboxLoop();
——inv 2》MailboxProcessor.runMailboxLoop()
—— 跳轉(zhuǎn)到父類入口 》SourceStreamTask#LegacySourceFunctionThread.run()
headOperator.run(lock,
——★ 》StreamSource#run()
▼
注釋: 獲取 Operator 的執(zhí)行上下文對象
*/
this.ctx = StreamSourceContexts.getSourceContext(timeCharacteristic注釋: 真正運(yùn)行用戶的 Operator
* 1铣鹏、如果你使用:env.socketTextStream() 則調(diào)用:
SocketTextStreamFunction
* 2敷扫、如果你使用:Kafka數(shù)據(jù)源, 則調(diào)用: FlinkKafkaConsumerBase
* ......
* function --> transformation ---> streamOperator
* headOperator.run();
*/
userFunction.run(ctx);
↓ 有很多source 诚卸,選擇SocketTextStreamFunction 為例
—— 》SocketTextStreamFunction.run()
// TODO_MA 注釋: 沒有數(shù)據(jù)葵第,則阻塞在這兒
// TODO_MA 注釋: 在 SourceStreamTask 初始化的時候,SourceThread 的代碼能執(zhí)行到這兒
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
// TODO_MA 注釋: 一直讀到有分隔符
while (buffer.length() >= delimiter.length()
.......
***** 把讀取到的數(shù)據(jù)傳給ctx 上下文處理****
ctx.collect(record);
↓ 選擇NonTimestamp - 時間戳為 processtime
StreamSourceContexts#NonTimestampContext#collect()
▼
* output = CountingOutput
* -
* element 收到的 一條數(shù)據(jù)
* reuse 裝一條數(shù)據(jù)合溺,待序列化的容器
* collect 執(zhí)行 collect 會對 reuse 對象執(zhí)行序列化
*/
output.collect(reuse.replace(element));
↓
—— 》 CountingOutput.collect()
* output = ChainingOutput
*/
output.collect(record);
↓
—— 》OperatorChain#ChainingOutput.collect()
—— 》 pushToOperator(record);
▼
-
注釋: 調(diào)用 Operator 的 processElement 來處理 castRecord 數(shù)據(jù)記錄
* 假設(shè)下一個算子是 keyBy卒密, 則跳轉(zhuǎn)到 : KeyedProcessOperator
* 因?yàn)橹笠?shuffle 了,所以之后就沒有其他的 Operator 了
* map() = StreamOperator = StreamMap = operator
*/
operator.processElement(castRecord);
↓
—— 》StreamMap.processElement()
▼- 注釋: 通過 Output 收集處理之后的結(jié)果數(shù)據(jù)
- 1棠赛、userFunction.map(element.getValue()) 這是用戶自定義的 map 邏輯
- 2哮奇、然后計算完的結(jié)果替換掉當(dāng)前 Opeartor 中的成員變量
- 3、然后被 StreamMap 這個 StreamOperator 繼續(xù)收集
- 記拙υ肌:因?yàn)楫?dāng)前是 map 操作鼎俘,所以下一步肯定不是 shuffle
- 但是,如果是 shuffle 算子辩涝,則會執(zhí)行輸出了贸伐。
*/
output.collect(element.replace(userFunction.map(element.getValue())));
↓
—— 》▲回到 OperatorChain#ChainingOutput..collect() 遞歸式鏈接調(diào)用
// TODO_MA 注釋: 跳轉(zhuǎn)到下一個 Operator 來處理元素
// TODO_MA 注釋: StreamMap
// TODO_MA 注釋: keyBy
pushToOperator(record);
▼
operator.processElement(castRecord);
↓
—— 》 KeyedProcessOperator.processElement(castRecord);
▼
- 注釋: 通過 Output 收集處理之后的結(jié)果數(shù)據(jù)
注釋: 處理元素
userFunction.processElement(element.getValue(), context, collector);
↓ keyby就是聚合
—— 》 GroupAggFunction.processElement()
▼-
注釋: 狀態(tài)初始化 拿到初始值
*/
RowData accumulators = accState.value();
// 設(shè)置累加器到第一個
function.setAccumulators(accumulators);
// 得到上一個值
RowData prevAggValue = function.getValue();
// TODO_MA 注釋: 累加 或者 縮回,其實(shí)都是聚合
// update aggregate result and set to the newRow
if (isAccumulateMsg(input)) {
// accumulate input
function.accumulate(input);
} else {
// retract input
function.retract(input);
}
// TODO_MA 注釋: 計算完畢怔揩,得到新的結(jié)果
// get current aggregate result
RowData newAggValue = function.getValue();
// get accumulator
accumulators = function.getAccumulators();
// TODO_MA 注釋: 講計算得到的結(jié)果捉邢,替換到當(dāng)前的 resultRow 成員變量中
// prepare UPDATE_BEFORE message for previous row
resultRow.replace(currentKey, prevAggValue)
.setRowKind(RowKind.UPDATE_BEFORE);
// TODO_MA 注釋: 寫出
out.collect(resultRow);
↓
RecordWriterOutput.collect(resultRow)
—— 》 pushToRecordWriter(StreamRecord<X> record)
* 注釋: 發(fā)送
*/
recordWriter.emit(serializationDelegate);
↓ 一般不是廣播
—— 》 ChannelSelectorRecordWriter.emit()
▼
* 注釋: channelSelector確定目標(biāo)channel- channelSelector 的作用,就和 mapreduce 框架中的 Partitioner 是一樣的作用:
- 用來決定 record 到底被分發(fā)到那個一個分區(qū)
- channelSelector.selectChannel(record) = partitioner.getPartition()
*/
emit(record, channelSelector.selectChannel(record));
▼
// TODO_MA 注釋: 序列化
serializer.serializeRecord(record);
// TODO_MA 注釋: 將序列化器中的序列化結(jié)果寫入目標(biāo) channel if (copyFromSerializerToTargetChannel(targetChannel)) {
- channelSelector 的作用,就和 mapreduce 框架中的 Partitioner 是一樣的作用:
—— 》 ChannelSelectorRecordWriter.copyFromSerializerToTargetChannel()
flushTargetPartition(targetChannel);
↓ 一般不是廣播
—— 》ResultPartitionWriter#flushTargetPartition()
* 注釋: flush 到對應(yīng)的 ResultPartition 中
* targetChannel = InputChannel
* targetPartition = ResultPartition
*/
targetPartition.flush(targetChannel);
↓
ResultPartition.flush()
↓
PipelinedSubpartition.flush()
—— 》 PipelinedSubpartition# notifyDataAvailable();
▼
// TODO_MA 注釋: readView 是 ResultSubPartition 的消費(fèi)者視圖 對象
// TODO_MA 注釋: 下游的一個Task 可能會消費(fèi)上游的多個Task的某一個分區(qū)的數(shù)據(jù)商膊。
/ TODO_MA 注釋: 上游個任意一個Task的任意一個分區(qū)叫做: ResultSubPartition伏伐,
// TODO_MA 注釋: 這個 ResultSubPartition 對應(yīng)一個消費(fèi)者:
PipelinedSubpartitionView
readView.notifyDataAvailable();
—— 》PipelinedSubpartitionView#notifyDataAvailable()
↓
—— 》LocalInputChannel#notifyDataAvailable()
* 注釋:
*/
inputGate.notifyChannelNonEmpty(this);
—— 》SingleInputGate.notifyChannelNonEmpty()
* 注釋: 某個 channel 有可寫入數(shù)了,該干活了翘狱。
*/
queueChannel(checkNotNull(channel));
—— 》SingleInputGate.queueChannel()
▼
注釋: 加入隊(duì)列中
* 既然將 有數(shù)據(jù)可用的channel 加入到 inputChannelsWithData秘案,
* 那就證明,一定有其他的什么角色來從這個隊(duì)列中獲取 可用的channel 來消費(fèi)數(shù)據(jù)
*/
inputChannelsWithData.add(channel);注釋: 發(fā)送信號潦匈!
*/
if(availableChannels == 0) {
// TODO_MA 注釋: 如果之前隊(duì)列中沒有channel
//阱高,這個channel加入后,通知等待的線程
inputChannelsWithData.notifyAll(); 2:50
以上線程會接到 steamTask啟動時wait的 線程傳到這個方法*
—— 》SingleInputGate.getChannel()
——主數(shù)據(jù)處理執(zhí)行這個 》 StreamTaskNetworkInput.emitNext()
—— 輸出》 OneInputStreamTask#StreamTaskNetworkOutput.emitRecord()