Flink Data Exchange: Source Code and Principles
Preface
Parts of this material assume some knowledge of Netty: all network interaction here is built on Netty, so without that background the logic may be hard to follow.
Let's first introduce a few components that will come up often later:
ResultPartition: an operator's output data is written into a ResultPartition, which contains multiple ResultSubPartitions.
ResultSubPartition: each ResultSubPartition is consumed by a single downstream channel;
a task, however, may consume more than one ResultSubPartition.
InputGate: the wrapper around data input. The name Gate makes the idea easy to grasp; an InputGate is essentially a wrapper around InputChannels and contains several of them.
InputChannel: where the actual work happens, in local and remote variants; each InputChannel receives the output of one ResultSubPartition.
How a Task Is Built: a Quick Walkthrough
Starting from the point where the Task has been constructed, let's talk about how a Task passes data along: the components that must be built before any data exchange can happen, and the exchange process itself.
Within a chain, the upstream simply calls the downstream operator's processElement method. Between tasks of a TaskManager, or across the network, records are serialized and deserialized into a buffer (a buffer wraps a MemorySegment); a BufferBuilder writes the data into the MemorySegment, and its counterpart BufferConsumer, on the consuming side, reads the data back out of the MemorySegment. One BufferBuilder is paired with exactly one BufferConsumer.
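The BufferBuilder/BufferConsumer pairing described above can be sketched with plain JDK types. The class names and the segment size below are illustrative stand-ins, not Flink's real implementations: the builder appends serialized bytes into a shared "segment" and commits a write position, and the consumer may read only up to that committed position.

```java
import java.nio.ByteBuffer;

// Toy model of the BufferBuilder/BufferConsumer pair sharing one MemorySegment
// (here a plain ByteBuffer). Names and sizes are assumptions for illustration.
public class BufferPairDemo {
    static final int SEGMENT_SIZE = 16;

    static class BufferBuilder {
        final ByteBuffer segment = ByteBuffer.allocate(SEGMENT_SIZE);
        volatile int committed = 0;

        /** Appends as much of the record as fits, then commits; returns bytes written. */
        int appendAndCommit(ByteBuffer record) {
            int n = Math.min(record.remaining(), segment.remaining());
            for (int i = 0; i < n; i++) {
                segment.put(record.get());
            }
            committed = segment.position();
            return n;
        }
    }

    static class BufferConsumer {
        final BufferBuilder builder;
        int readPos = 0;

        BufferConsumer(BufferBuilder b) { this.builder = b; }

        /** Reads everything committed so far but not yet consumed. */
        byte[] build() {
            int end = builder.committed;
            byte[] out = new byte[end - readPos];
            for (int i = 0; i < out.length; i++) {
                out[i] = builder.segment.get(readPos + i);
            }
            readPos = end;
            return out;
        }
    }

    /** Writes a record on the producing side and reads it back on the consuming side. */
    public static String roundTrip(String record) {
        BufferBuilder builder = new BufferBuilder();
        BufferConsumer consumer = new BufferConsumer(builder);
        builder.appendAndCommit(ByteBuffer.wrap(record.getBytes()));
        return new String(consumer.build());
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```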
// Task is the concrete execution class for work scheduled by Flink. It implements
// Runnable, meaning each task is executed by one thread.
public class Task implements
Runnable, TaskSlotPayload, TaskActions,
PartitionProducerStateProvider,
CheckpointListener, BackPressureSampleableTask { }
// Task.run simply delegates to doRun, which is where the main logic lives.
public void run() {
try {
doRun();
} finally {
terminationFuture.complete(executionState);
}
}
// Only the most important lines are kept here; the rest was removed. The task is
// actually started by calling invoke() on the invokable.
// Some AbstractInvokable implementations: StreamTask, DataSourceTask, etc.
// With the DataStream API a source produces a SourceStreamTask; other operators
// produce different StreamTasks depending on their inputs.
private void doRun() {
AbstractInvokable invokable = null;
// Build the AbstractInvokable -- in this example a StreamTask. Different modes may
// create different AbstractInvokable implementations; the concrete object is built by
// reflection from the fully qualified class name in nameOfInvokableClass. Here it is
// a OneInputStreamTask.
invokable = loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
this.invokable = invokable;
// call invoke() to start execution
invokable.invoke();
}
// Before entering invoke(), let's look at how the AbstractInvokable is constructed,
// taking OneInputStreamTask as the example.
// The Environment passed in here was built via reflection.
public OneInputStreamTask(Environment env) throws Exception {
// call the superclass constructor
super(env);
}
// superclass constructor
protected StreamTask(Environment env) throws Exception {
this(env, null);
}
// the constructors delegate down level by level; note one thing here
protected StreamTask(
Environment environment,
@Nullable TimerService timerService,
Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
StreamTaskActionExecutor actionExecutor)
throws Exception {
this(
environment,
timerService,
uncaughtExceptionHandler,
actionExecutor,
// Create a Mailbox object; the task's later data receiving and sending rely on it.
// The constructing thread is passed in as a parameter, meaning only the current
// thread may read the mail -- after all, this is your mailbox.
new TaskMailboxImpl(Thread.currentThread()));
}
// Execution finally ends up in this constructor.
// Only the important parts are annotated.
protected StreamTask(
Environment environment,
@Nullable TimerService timerService,
Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
StreamTaskActionExecutor actionExecutor,
TaskMailbox mailbox)
throws Exception {
// call the superclass constructor, i.e. AbstractInvokable
super(environment);
this.configuration = new StreamConfig(getTaskConfiguration());
// TODO important -- needed later
// private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
// Build the recordWriter. What is actually built is a RecordWriterDelegate; the
// concrete recordWriter is obtained through getRecordWriter.
// How the recordWriter is built is described below.
this.recordWriter = createRecordWriterDelegate(configuration, environment);
// handles actions outside the mailbox, e.g. sending events
this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
// TODO important
// Build the mailbox processor, which handles the mail received by the mailbox.
// Param 1: a MailboxDefaultAction implementation, written as a lambda that calls this
//          task's processInput method (here, OneInputStreamTask's)
// Param 2: the mailbox -- to process mail you first need a mailbox, after all
// Param 3: the executor that processes the mailbox
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
this.mailboxProcessor.initMetric(environment.getMetricGroup());
// the executor used to run mail, similar to a Java thread pool but not identical
this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
// the thread pool that handles snapshots asynchronously
this.asyncOperationsThreadPool =
Executors.newCachedThreadPool(
new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
this.stateBackend = createStateBackend();
// the coordinator through which this task handles checkpoints
this.subtaskCheckpointCoordinator =
new SubtaskCheckpointCoordinatorImpl(
stateBackend.createCheckpointStorage(getEnvironment().getJobID()),
getName(),
actionExecutor,
getCancelables(),
getAsyncOperationsThreadPool(),
getEnvironment(),
this,
configuration.isUnalignedCheckpointsEnabled(),
this::prepareInputSnapshot);
if (timerService == null) {
ThreadFactory timerThreadFactory =
new DispatcherThreadFactory(
TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
this.timerService =
new SystemProcessingTimeService(this::handleTimerException, timerThreadFactory);
} else {
this.timerService = timerService;
}
// the thread used for I/O work, e.g. restoring state
this.channelIOExecutor =
Executors.newSingleThreadExecutor(
new ExecutorThreadFactory("channel-state-unspilling"));
injectChannelStateWriterIntoChannels();
}
// Now that we have walked through StreamTask's constructor, let's look at how the
// recordWriter is built.
// StreamTask.createRecordWriterDelegate
public static <OUT>
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>>
createRecordWriterDelegate(
StreamConfig configuration, Environment environment) {
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites =
// build the task's writers; usually there is just one
createRecordWriters(configuration, environment);
// Check the number of writers; usually it is 1, so a SingleRecordWriter is built.
if (recordWrites.size() == 1) {
// just fetch the single writer and wrap it
return new SingleRecordWriter<>(recordWrites.get(0));
} else if (recordWrites.size() == 0) {
return new NonRecordWriter<>();
} else {
// For example, in the topology below the source has two outgoing edges, so a
// MultipleRecordWriters is created for it:
// DataStreamSource<Object> source = ...
// SingleOutputStreamOperator<U> map = source.map(...);
// source.connect(map);
return new MultipleRecordWriters<>(recordWrites);
}
}
// Build the recordWriters for the task.
// StreamTask.createRecordWriters
private static <OUT>
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
StreamConfig configuration, Environment environment) {
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters =
new ArrayList<>();
// find all of the operator's outgoing edges
List<StreamEdge> outEdgesInOrder =
configuration.getOutEdgesInOrder(
environment.getUserCodeClassLoader().asClassLoader());
// iterate over the out edges and build one recordWriter per edge
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge edge = outEdgesInOrder.get(i);
recordWriters.add(
// Build the recordWriter. We won't step through the code; in short:
// get the task's partitioner and the corresponding ResultPartitionWriter, then build
// the RecordWriter through a builder. Internally it checks whether the output is a
// broadcast: if so a BroadcastRecordWriter is built, otherwise (the common case) a
// ChannelSelectorRecordWriter. The two most important constructor parameters of
// ChannelSelectorRecordWriter are the ResultPartitionWriter and the ChannelSelector.
createRecordWriter(
edge,
i,
environment,
environment.getTaskInfo().getTaskName(),
edge.getBufferTimeout()));
}
return recordWriters;
}
Now that we have seen how the Task and its recordWriter are built, let's enter the invoke method and see how execution works. When invoke is called, the Task does some preparatory work and then truly starts calling the userFunction to read and emit data. Let's look at how StreamTask implements this internally.
// StreamTask is an abstract class
// StreamTask.invoke
@Override
public final void invoke() throws Exception {
try {
// The two most important calls.
// beforeInvoke initializes the input and output, which determine how data is
// exchanged -- within a thread, between tasks of one TaskManager, or over the network.
beforeInvoke();
// run the mailbox loop: continuously read data and send it downstream; covered later
runMailboxLoop();
}
// unimportant code removed
protected void beforeInvoke() throws Exception {
// Important:
// build the operatorChain -- we need to dive into it; the recordWriter was built in
// the constructor, as covered earlier
operatorChain = new OperatorChain<>(this, recordWriter);
// i.e. the concrete operator that runs inside this task
mainOperator = operatorChain.getMainOperator();
// do some task initialization work
actionExecutor.runThrowing(() -> {
// the reader for restoring state
SequentialChannelStateReader reader =
getEnvironment().getTaskStateManager()
.getSequentialChannelStateReader();
reader.readOutputData(getEnvironment().getAllWriters(), false);
// initialize the operators; this calls each operator's open method
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
// read the state data
channelIOExecutor.execute(() -> {
try {
reader.readInputData(getEnvironment().getAllInputGates());
} catch (Exception e) {
asyncExceptionHandler.handleAsyncException(
"Unable to read channel state", e);
}
});
});
isRunning = true;
}
// The most important part of beforeInvoke is constructing the OperatorChain, so let's
// dive into its constructor together.
// There is a lot of code; since we only care about data exchange, the rest is not covered.
public OperatorChain(
StreamTask<OUT, OP> containingTask,
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
this.operatorEventDispatcher =
new OperatorEventDispatcherImpl(
containingTask.getEnvironment().getUserCodeClassLoader().asClassLoader(),
containingTask.getEnvironment().getOperatorCoordinatorEventGateway());
final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
final StreamConfig configuration = containingTask.getConfiguration();
StreamOperatorFactory<OUT> operatorFactory =
configuration.getStreamOperatorFactory(userCodeClassloader);
// we read the chained configs, and the order of record writer registrations by output name
Map<Integer, StreamConfig> chainedConfigs =
configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);
// create the final output stream writers
// we iterate through all the out edges from this job vertex and create a stream output
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
new HashMap<>(outEdgesInOrder.size());
this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
// from here on, we need to make sure that the output writers are shut down again on failure
boolean success = false;
try {
createChainOutputs(
outEdgesInOrder,
recordWriterDelegate,
chainedConfigs,
containingTask,
streamOutputMap);
// we create the chain of operators and grab the collector that leads into the chain
List<StreamOperatorWrapper<?, ?>> allOpWrappers =
new ArrayList<>(chainedConfigs.size());
this.mainOperatorOutput =
// Important:
// build the output, i.e. where the data is emitted
createOutputCollector(
containingTask,
configuration,
chainedConfigs,
userCodeClassloader,
streamOutputMap,
allOpWrappers,
containingTask.getMailboxExecutorFactory());
if (operatorFactory != null) {
Tuple2<OP, Optional<ProcessingTimeService>> mainOperatorAndTimeService =
StreamOperatorFactoryUtil.createOperator(
operatorFactory,
containingTask,
configuration,
mainOperatorOutput,
operatorEventDispatcher);
OP mainOperator = mainOperatorAndTimeService.f0;
mainOperator
.getMetricGroup()
.gauge(
MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
mainOperatorOutput.getWatermarkGauge());
this.mainOperatorWrapper =
createOperatorWrapper(
mainOperator,
containingTask,
configuration,
mainOperatorAndTimeService.f1,
true);
// add main operator to end of chain
allOpWrappers.add(mainOperatorWrapper);
this.tailOperatorWrapper = allOpWrappers.get(0);
} else {
checkState(allOpWrappers.size() == 0);
this.mainOperatorWrapper = null;
this.tailOperatorWrapper = null;
}
this.chainedSources =
createChainedSources(
containingTask,
configuration.getInputs(userCodeClassloader),
chainedConfigs,
userCodeClassloader,
allOpWrappers);
this.numOperators = allOpWrappers.size();
firstOperatorWrapper = linkOperatorWrappers(allOpWrappers);
success = true;
} finally {
// make sure we clean up after ourselves in case of a failure after acquiring
// the first resources
if (!success) {
for (RecordWriterOutput<?> output : this.streamOutputs) {
if (output != null) {
output.close();
}
}
}
}
}
// The output is built in here.
// OperatorChain.createOutputCollector
private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(
StreamTask<?, ?> containingTask,
StreamConfig operatorConfig,
Map<Integer, StreamConfig> chainedConfigs,
ClassLoader userCodeClassloader,
Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
List<StreamOperatorWrapper<?, ?>> allOperatorWrappers,
MailboxExecutorFactory mailboxExecutorFactory) {
List<Tuple2<WatermarkGaugeExposingOutput<StreamRecord<T>>, StreamEdge>> allOutputs =
new ArrayList<>(4);
// Entered when startNewChain() was called or chaining is otherwise broken: builds the
// non-chained outputs, which cross the network or pass between threads.
for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
allOutputs.add(new Tuple2<>(output, outputEdge));
}
// With chaining enabled (the default) this loop is entered and the corresponding
// outputs are generated. All outputs built in this loop stay within one task,
// i.e. they execute in a single thread.
for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
int outputId = outputEdge.getTargetId();
StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
/*
A quick note on the recursion here.
Assume: chain1 => source->map1->map2 | chain2 => flatMap->sink
Let's follow the construction of chain1.
When the operatorChain's outputs are built, the source's output is built first; it
falls into the chained loop and recurses into map1. map1 also falls into the chained
loop and recurses again, now building map2's output. Since what follows map2 is not
chained (it starts a new chain), map2 falls into the nonChained loop and an output
requiring network transfer is built. With map2's output complete, the stack unwinds
and the pending chained calls finish constructing their outputs recursively.
Result:
calls: input.processElement() -> input.processElement() -> (wrapped RecordWriterOutput) serialize and send the buffer
chain1: source(CopyingChainingOutput)->map1(CopyingChainingOutput)->map2(CountingOutput(RecordWriterOutput))
*/
WatermarkGaugeExposingOutput<StreamRecord<T>> output =
// This method recursively calls createOutputCollector (the current method),
// because each operator chained together needs its own output.
// We won't step through the construction code; in short:
// an internal check decides based on object reuse: if reuse is enabled a
// ChainingOutput is created (reusing the StreamRecord), otherwise a
// CopyingChainingOutput (which creates a new record via StreamRecord.copy).
createOperatorChain(
containingTask,
chainedOpConfig,
chainedConfigs,
userCodeClassloader,
streamOutputs,
allOperatorWrappers,
outputEdge.getOutputTag(),
mailboxExecutorFactory);
allOutputs.add(new Tuple2<>(output, outputEdge));
}
// The return below: if outputs were built above, the single-output branch is usually
// taken; broadcast/sink cases fall through to the logic underneath.
if (allOutputs.size() == 1) {
return allOutputs.get(0).f0;
} else {
Output<StreamRecord<T>>[] asArray = new Output[allOutputs.size()];
for (int i = 0; i < allOutputs.size(); i++) {
asArray[i] = allOutputs.get(i).f0;
}
// For a sink, asArray is always empty, so the collector below never emits anything.
if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
return closer.register(new CopyingBroadcastingOutputCollector<>(asArray, this));
} else {
return closer.register(new BroadcastingOutputCollector<>(asArray, this));
}
}
}
With the components needed for data transfer now in place, let's trace the data flow for each kind of exchange.
一. Data transfer within a thread
We saw above how outputs are built, and sending data downstream happens through those outputs. With object reuse enabled the generated output is a ChainingOutput, otherwise a CopyingChainingOutput. Let's look at CopyingChainingOutput, the one that does not reuse the StreamRecord.
// the output used when reuse is disabled; its contents are fairly simple
final class CopyingChainingOutput<T> extends ChainingOutput<T> {
private final TypeSerializer<T> serializer;
public CopyingChainingOutput(
OneInputStreamOperator<T, ?> operator,
TypeSerializer<T> serializer, // used for serialization
OutputTag<T> outputTag,
StreamStatusProvider streamStatusProvider) {
// Call the superclass constructor, i.e. ChainingOutput; operator is assigned to the
// member field input.
// Note: input is the downstream operator.
// Think of it as: op1 -> output -> input -> op2 -> output
super(operator, streamStatusProvider, outputTag);
this.serializer = serializer;
}
// This is the collector's collect method -- it is a subclass, after all. Recall that
// inside an operator, e.g. process, we also emit downstream via out.collect.
@Override
public void collect(StreamRecord<T> record) {
if (this.outputTag != null) {
// we are not responsible for emitting to the main output.
return;
}
// the method that actually sends the record downstream
pushToOperator(record);
}
@Override
public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
if (this.outputTag == null || !this.outputTag.equals(outputTag)) {
// we are not responsible for emitting to the side-output specified by this
// OutputTag.
return;
}
pushToOperator(record);
}
@Override
protected <X> void pushToOperator(StreamRecord<X> record) {
try {
StreamRecord<T> castRecord = (StreamRecord<T>) record;
// metrics
numRecordsIn.inc();
// Call record.copy to get a new record (internally a new record is created).
// The serializer's copy method is invoked here too.
StreamRecord<T> copy = castRecord.copy(serializer.copy(castRecord.getValue()));
input.setKeyContextElement(copy);
// Directly call the downstream operator's processElement, so no serialization or
// deserialization is needed.
input.processElement(copy);
} catch (ClassCastException e) {
if (outputTag != null) {
// Enrich error message
ClassCastException replace =
new ClassCastException(
String.format("%s. Failed to push OutputTag with id '%s' to operator. "+ "This can occur when multiple OutputTags with different types "+ "but identical names are being used.",
e.getMessage(), outputTag.getId()));
throw new ExceptionInChainedOperatorException(replace);
} else {
throw new ExceptionInChainedOperatorException(e);
}
} catch (Exception e) {
throw new ExceptionInChainedOperatorException(e);
}
}
}
// The wrapper class for the map operator; within the same chain, the operator's
// processElement is called directly.
// StreamMap.processElement
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// On receiving a record, call userFunction.map directly and emit the result downstream.
output.collect(element.replace(userFunction.map(element.getValue())));
}
As we can see, passing data within an operatorChain is just a direct call to the downstream operator's processElement. The last operator of an operatorChain holds a RecordWriterOutput, which sends data to the next task (chain/operator). For the data exchange itself, we will cover the output side and the input side separately.
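The call pattern described above — each chained output simply invoking the next operator's processElement in the same thread, with no serialization — can be sketched as follows. MapOperator and Output here are simplified stand-ins, not Flink's real interfaces:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Two chained "map" operators: map1's output is literally a call to
// map2.processElement, and map2's output writes to a sink list (a stand-in
// for RecordWriterOutput at the end of the chain).
public class ChainDemo {
    interface Output<T> { void collect(T record); }

    static class MapOperator<IN, OUT> {
        final Function<IN, OUT> userFunction;
        final Output<OUT> output;

        MapOperator(Function<IN, OUT> f, Output<OUT> out) {
            userFunction = f;
            output = out;
        }

        // mirrors StreamMap.processElement: apply the user function, emit downstream
        void processElement(IN record) {
            output.collect(userFunction.apply(record));
        }
    }

    public static List<Integer> run(List<Integer> input) {
        List<Integer> sink = new ArrayList<>();
        // tail operator: its output is the sink
        MapOperator<Integer, Integer> map2 = new MapOperator<>(x -> x + 1, sink::add);
        // head operator: its output is a direct call into map2
        MapOperator<Integer, Integer> map1 = new MapOperator<>(x -> x * 2, map2::processElement);
        for (int x : input) {
            map1.processElement(x);
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3)));
    }
}
```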
二. Data transfer between Tasks within a TaskManager
Although the Tasks live in the same JVM process, data still needs to be serialized and deserialized on the way.
Within the same TaskManager, data transfer between Tasks is done mainly through these components: ResultSubPartition, InputGate, InputChannel, and BufferBuilder.
Flow:
The upstream operator keeps writing data into the ResultSubPartition. When the ResultSubPartition receives data it notifies the InputGate, and the InputGate adds the InputChannel to the queue of available channels. The downstream operator reads the data via pollNext(), consumes it, and hands it to the user code. We will see the implementation of each step of this outline below.
1. Data output
Continuing from above: within a chain, processElement is called level by level until the last operator of the chain, whose output is a RecordWriterOutput; that operator sends data downstream through it (not literally to the downstream task, but into the subpartition).
// Straight into the source: let's see how the data is actually sent.
public class RecordWriterOutput<OUT> implements WatermarkGaugeExposingOutput<StreamRecord<OUT>> {
public RecordWriterOutput(
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
TypeSerializer<OUT> outSerializer,
OutputTag outputTag,
StreamStatusProvider streamStatusProvider,
boolean supportsUnalignedCheckpoints) {
// This is where the real work happens.
// Flink represents data flowing through a stream as records; this writer serializes
// them and writes them into the subpartition.
this.recordWriter =(RecordWriter<SerializationDelegate<StreamElement>>) (RecordWriter<?>) recordWriter;
// the serialization wrapper, which wraps the serializer for the data type
TypeSerializer<StreamElement> outRecordSerializer = new StreamElementSerializer<>(outSerializer);
if (outSerializer != null) {
serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
}
// non-essentials omitted ...
}
@Override
public void collect(StreamRecord<OUT> record) {
if (this.outputTag != null) {
return;
}
// the method that does the real work
pushToRecordWriter(record);
}
private <X> void pushToRecordWriter(StreamRecord<X> record) {
// store the record as the serialization wrapper's instance
serializationDelegate.setInstance(record);
try {
// send the wrapper via the recordWriter;
// the implementation class of recordWriter is ChannelSelectorRecordWriter
recordWriter.emit(serializationDelegate);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
// event-related sending logic is omitted here
}
// --------------------------------------------------------------------------------------------
// First, a quick look at ChannelSelector: it returns an int, the index of the
// concrete downstream channel the record will be sent to.
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
// a round-robin algorithm
nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
return nextChannelToSendTo;
}
}
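Since the real RebalancePartitioner initializes nextChannelToSendTo to a random channel, here is a deterministic re-implementation of the same round-robin rule (starting from -1, purely for illustration) that makes the wraparound visible:

```java
// Minimal re-implementation of the round-robin channel selection shown above.
// Starting point -1 is an assumption for determinism; Flink starts at a random
// channel so that parallel senders do not all hit channel 0 first.
public class RoundRobinDemo {
    private final int numberOfChannels;
    private int nextChannelToSendTo = -1;

    public RoundRobinDemo(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    // same arithmetic as RebalancePartitioner.selectChannel
    public int selectChannel() {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

    /** Returns the first n selected channel indices as a string, e.g. "01201". */
    public static String firstSelections(int channels, int n) {
        RoundRobinDemo selector = new RoundRobinDemo(channels);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append(selector.selectChannel());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // 3 channels, 5 records: channels cycle 0, 1, 2, 0, 1
        System.out.println(firstSelections(3, 5));
    }
}
```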
// As the logic above shows, collect ultimately calls recordWriter.emit() to send the
// data, so let's look at the details in the concrete implementation.
// As the name suggests, sending a record means choosing the channel to send it to.
public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
@Override
public void emit(T record) throws IOException {
// method overload: delegates to the superclass emit method.
// selectChannel computes the index of the concrete target channel.
// This is the data distribution strategy -- shuffle, keyBy, and so on -- i.e. which
// channel the data is sent to.
emit(record, channelSelector.selectChannel(record));
}
// RecordWriter.emit()
protected void emit(T record, int targetSubpartition) throws IOException {
// targetPartition = PipelinedResultPartition
// serializeRecord takes the serializer and the record, serializes the record with the
// given serializer, and returns a Java ByteBuffer; emitRecord then sends the data.
targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);
// Sending in batches is of course faster than one record at a time, so Flink sends
// data this way too. The superclass RecordWriter has an inner class (OutputFlusher)
// that extends Thread and overrides run; it is created in the superclass constructor
// if the configured timeout > 0 (otherwise it is not created and only the check below
// applies). Its run method loops, calling Thread.sleep(timeout) and then flushAll()
// after each sleep.
// flushAlways == (timeout == 0): every received record is sent immediately.
// The timeout is configured via DataStream.setBufferTimeout().
if (flushAlways) {
targetPartition.flush(targetSubpartition);
}
}
}
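The OutputFlusher behavior described in the comments above — a thread that sleeps for the buffer timeout and then flushes, trading a little latency for throughput — can be sketched like this. The names and the latch-based shutdown are illustrative, not Flink's actual code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// A toy OutputFlusher: sleep(timeout), then "flushAll", in a loop, on a daemon
// thread. The CountDownLatch is only here so the demo terminates deterministically.
public class FlusherDemo {
    /** Runs a flusher with the given timeout until it has flushed `flushes` times. */
    public static int runFlusher(long timeoutMillis, int flushes) {
        AtomicInteger flushCount = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(flushes);
        Thread outputFlusher = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(timeoutMillis);  // wait for the buffer timeout
                    flushCount.incrementAndGet(); // stand-in for flushAll()
                    done.countDown();
                }
            } catch (InterruptedException ignored) {
                // interrupted on shutdown: just exit the loop
            }
        });
        outputFlusher.setDaemon(true);
        outputFlusher.start();
        try {
            done.await(); // wait until the flusher has fired `flushes` times
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        outputFlusher.interrupt();
        return flushCount.get();
    }

    public static void main(String[] args) {
        System.out.println(runFlusher(5, 3));
    }
}
```

With timeout == 0 there is no flusher thread at all; instead flushAlways makes the writer flush after every single record, which is the low-latency end of the trade-off.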
// Enter BufferWritingResultPartition.emitRecord().
// It writes the data into a buffer inside the ResultPartition. A LocalBufferPool
// caches the available buffers; when no buffer is available, it requests buffers from
// the global NetworkBufferPool.
@Override
public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
// appendUnicastDataForNewRecord requests a buffer; we won't step through the code
// (explore it yourself if you are interested), but briefly:
// internally a LocalBufferPool is used to request buffers; when the local ones are
// used up, network buffers are requested. If the network buffer quota is exceeded or
// no network buffer is available, the call blocks until one frees up -- think
// `backpressure`.
// Once a buffer is obtained, a BufferBuilder is constructed and the target
// subpartition's add method is called with a BufferConsumer (built from the buffer);
// add calls notifyDataAvailable(), which eventually reaches the channel's
// notifyDataAvailable() and triggers consumption.
// After all of the above, the record is written into the buffer.
// We cover this method in the data-input section.
BufferBuilder buffer = appendUnicastDataForNewRecord(record, targetSubpartition);
// After the record is written into the buffer, one of three cases applies, each
// handled differently.
// Case 1: the buffer is full, but only part of the record was stored.
while (record.hasRemaining()) {
// full buffer, partial record
// finish the buffer, marking it as no longer writable
finishUnicastBufferBuilder(targetSubpartition);
// request a new buffer and write the remaining data, until the loop exits
buffer = appendUnicastDataForRecordContinuation(record, targetSubpartition);
}
// Case 2: the buffer is full and the whole record was stored.
if (buffer.isFull()) {
// full buffer, full record
// finish the buffer, marking it as no longer writable
finishUnicastBufferBuilder(targetSubpartition);
}
// Case 3: the buffer is not full and the whole record was stored.
// Nothing to do -- all that matters is that the record was stored completely.
// partial buffer, full record
}
// At this point the record is in the buffer, waiting to be consumed downstream.
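The three cases emitRecord distinguishes can be reproduced with a toy version that splits one serialized record across fixed-size buffers. BUFFER_SIZE and all names here are assumptions for illustration, not Flink's actual classes:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Toy emitRecord: append the record to the current buffer; while bytes remain
// (full buffer, partial record), finish the buffer and request a fresh one for
// the continuation; the last buffer may be partial (partial buffer, full record).
public class EmitRecordDemo {
    static final int BUFFER_SIZE = 4;

    /** Splits a serialized record into as many fixed-size buffers as needed. */
    public static List<byte[]> emitRecord(ByteBuffer record) {
        List<byte[]> buffers = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        append(buffer, record);
        while (record.hasRemaining()) {                // full buffer, partial record
            buffers.add(finish(buffer));               // buffer can no longer be written
            buffer = ByteBuffer.allocate(BUFFER_SIZE); // request a new buffer
            append(buffer, record);                    // write the remaining data
        }
        buffers.add(finish(buffer));                   // partial or full buffer, full record
        return buffers;
    }

    // copy bytes until either the buffer is full or the record is exhausted
    static void append(ByteBuffer buffer, ByteBuffer record) {
        while (buffer.hasRemaining() && record.hasRemaining()) {
            buffer.put(record.get());
        }
    }

    // snapshot the written bytes of a buffer
    static byte[] finish(ByteBuffer buffer) {
        byte[] out = new byte[buffer.position()];
        buffer.flip();
        buffer.get(out);
        return out;
    }

    public static void main(String[] args) {
        // a 10-byte record with 4-byte buffers ends up in 3 buffers (4 + 4 + 2 bytes)
        System.out.println(emitRecord(ByteBuffer.wrap(new byte[10])).size());
    }
}
```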
2. Data input
// Above we saw how data is written into a buffer (`inside the SubPartition`);
// let's continue with how the downstream is notified so that consumption is triggered.
// This method is called from emitRecord().
// BufferWritingResultPartition.appendUnicastDataForNewRecord()
private BufferBuilder appendUnicastDataForNewRecord(
final ByteBuffer record, final int targetSubpartition) throws IOException {
// Get the bufferBuilder left over from a previous write that did not fill the buffer.
BufferBuilder buffer = unicastBufferBuilders[targetSubpartition];
// null means the buffer was filled and sent, or we have just been initialized
if (buffer == null) {
buffer = requestNewUnicastBufferBuilder(targetSubpartition);
// The data-available notification is triggered inside add().
// Add a buffer consumer built from the buffer; 0 is the length of data to skip.
subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(), 0);
}
// append the data to the buffer
buffer.appendAndCommit(record);
return buffer;
}
// PipelinedSubpartition.add()
@Override
public boolean add(BufferConsumer bufferConsumer, int partialRecordLength) {
return add(bufferConsumer, partialRecordLength, false);
}
// moving on
private boolean add(BufferConsumer bufferConsumer, int partialRecordLength, boolean finish) {
checkNotNull(bufferConsumer);
synchronized (buffers) {
// build a BufferConsumerWithPartialRecordLength and add it to buffers; used later
if (addBuffer(bufferConsumer, partialRecordLength)) {
prioritySequenceNumber = sequenceNumber;
}
}
// Other details omitted -- not the point here. notifyDataAvailable() signals that
// data is available and can be consumed.
if (notifyDataAvailable) {
notifyDataAvailable();
}
return true;
}
// What follows is a long chain of calls.
private void notifyDataAvailable() {
final PipelinedSubpartitionView readView = this.readView;
if (readView != null) {
readView.notifyDataAvailable();
}
}
// PipelinedSubpartitionView
@Override
public void notifyDataAvailable() {
availabilityListener.notifyDataAvailable();
}
// LocalInputChannel
@Override
public void notifyDataAvailable() {
notifyChannelNonEmpty();
}
// InputChannel
protected void notifyChannelNonEmpty() {
// `this` is passed in, meaning this channel now has data and can be consumed
inputGate.notifyChannelNonEmpty(this);
}
// SingleInputGate
void notifyChannelNonEmpty(InputChannel channel) {
queueChannel(checkNotNull(channel), null);
}
// This ultimately reaches SingleInputGate.queueChannel.
// From here on the code is usually (though not always) executed by the OutputFlusher
// thread; if you forgot, look back at the flush thread inside RecordWriter above.
// Once this code finishes, the channel with data has been put into the queue, and the
// thread executing the current task starts reading data via pollNext.
private void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber) {
try (GateNotificationHelper notification =
new GateNotificationHelper(this, inputChannelsWithData)) {
synchronized (inputChannelsWithData) {
boolean priority = prioritySequenceNumber != null;
if (priority
&& isOutdated(
prioritySequenceNumber,
lastPrioritySequenceNumber[channel.getChannelIndex()])) {
// priority event at the given offset already polled (notification is not atomic
// in respect to
// buffer enqueuing), so just ignore the notification
return;
}
// Try to put the channel into the inputChannelsWithData queue;
// if it could not be queued (e.g. it is already there), return.
if (!queueChannelUnsafe(channel, priority)) {
return;
}
if (priority && inputChannelsWithData.getNumPriorityElements() == 1) {
notification.notifyPriority();
}
// If the queue just gained an available channel, call this method; internally it
// calls inputChannelsWithData.notifyAll to wake up waiting threads.
// inputChannelsWithData holds the available channels, i.e. those that have data.
if (inputChannelsWithData.size() == 1) {
notification.notifyDataAvailable();
}
}
}
}
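The essence of queueChannel — enqueue a channel at most once, and wake the consuming thread only when the queue transitions from empty to non-empty — can be sketched with plain synchronized/wait/notify. This is a simplified model, not the real SingleInputGate:

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

// Toy SingleInputGate: producers enqueue channels that received data, the task
// thread blocks in pollNext until some channel is available. Duplicates are
// suppressed, and waiters are woken only on the empty -> non-empty transition.
public class ChannelQueueDemo {
    private final ArrayDeque<Integer> inputChannelsWithData = new ArrayDeque<>();
    private final Set<Integer> enqueued = new HashSet<>();

    /** Called from the producing side when a channel becomes non-empty. */
    public synchronized void queueChannel(int channel) {
        if (!enqueued.add(channel)) {
            return; // already queued -- nothing to do
        }
        inputChannelsWithData.add(channel);
        if (inputChannelsWithData.size() == 1) {
            notifyAll(); // wake up a task thread blocked in pollNext
        }
    }

    /** Called from the task thread; blocks until some channel has data. */
    public synchronized int pollNext() {
        while (inputChannelsWithData.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        int channel = inputChannelsWithData.poll();
        enqueued.remove(channel);
        return channel;
    }

    public static void main(String[] args) {
        ChannelQueueDemo gate = new ChannelQueueDemo();
        gate.queueChannel(2);
        gate.queueChannel(2); // duplicate notification is ignored
        gate.queueChannel(0);
        System.out.println(gate.pollNext() + " " + gate.pollNext());
    }
}
```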
// The channels with data have now been added to the queue; the current task just
// needs to pull channels from the queue and consume their data.
// When the task was built above, we created the mailbox and its handlers.
// So how does the task actually get running? Let's talk about that.
// After invoke() is called on the invokable, beforeInvoke builds the operatorChain
// and runs initialization such as open(); then runMailboxLoop() is called and the
// task truly starts executing.
// Let's go straight into that method.
public void runMailboxLoop() throws Exception {
// loop the mailbox processor
mailboxProcessor.runMailboxLoop();
}
//MailboxProcessor.runMailboxLoop()
public void runMailboxLoop() throws Exception {
final TaskMailbox localMailbox = mailbox;
Preconditions.checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");
assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";
final MailboxController defaultActionContext = new MailboxController(this);
// consume data with the mailbox-based threading model; not expanded here
while (isMailboxLoopRunning()) {
// blocks until the default action is available
processMail(localMailbox, false);
if (isMailboxLoopRunning()) {
// Run the default action. The work it executes was passed in as the lambda
// this::processInput when the task was built,
// i.e. StreamTask.processInput() is executed.
mailboxDefaultAction.runDefaultAction(defaultActionContext);
}
}
}
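The mailbox loop above boils down to: drain pending mails, then run the default action (processInput) once, and repeat. A minimal single-threaded model — illustrative names, not Flink's MailboxProcessor:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy mailbox loop: each iteration first handles every queued mail (control
// actions such as checkpoint triggers), then runs the default action, which
// here "processes" one input record by appending it to a log.
public class MailboxLoopDemo {
    public interface Mail { void run(); }

    /** Runs the loop until the input is exhausted; returns records processed. */
    public static int runMailboxLoop(Queue<Mail> mailbox, Queue<Integer> input, StringBuilder log) {
        int processed = 0;
        while (!input.isEmpty()) {
            // processMail: handle all pending mail before touching the input
            Mail mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            // default action: StreamTask.processInput -> emit one record
            log.append(input.poll()).append(' ');
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        Queue<Mail> mailbox = new ArrayDeque<>();
        mailbox.add(() -> log.append("mail "));
        Queue<Integer> input = new ArrayDeque<>(java.util.List.of(1, 2));
        System.out.println(runMailboxLoop(mailbox, input, log) + ": " + log);
    }
}
```

The point of this single-threaded design is that control messages and record processing never race: both run on the one mailbox thread, interleaved by the loop.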
// StreamTask.processInput()
protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
// Call the input processor's processInput method to read data; inputProcessor was
// created during initialization. Let's step into the method.
InputStatus status = inputProcessor.processInput();
if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
return;
}
if (status == InputStatus.END_OF_INPUT) {
controller.allActionsCompleted();
return;
}
CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
}
//StreamOneInputProcessor.processInput()
public InputStatus processInput() throws Exception {
// Here input and output appear again; if that is confusing, picture Flink's physical
// execution graph.
// input = the inputGate: the upstream operator sends data into the subPartition and
// the inputGate's inputChannels consume it; the data is a serialized buffer.
// output = hands the deserialized record to the operator; the output holds a
// reference to the operator.
// Data flow:
// op1(map) --> subPartition --> inputGate --> output --> op2(flatMap) --> subPartition
// pass the input's data along through the output
InputStatus status = input.emitNext(output);
if (status == InputStatus.END_OF_INPUT) {
endOfInputAware.endInput(input.getInputIndex() + 1);
}
return status;
}
// StreamTaskNetworkInput.emitNext()
public InputStatus emitNext(DataOutput<T> output) throws Exception {
// Note the loop here: a normal read takes two iterations.
// The first iteration enters processBuffer, which assigns currentRecordDeserializer;
// the second deserializes the record and emits it downstream.
while (true) {
// get the stream element from the deserializer
//
if (currentRecordDeserializer != null) {
DeserializationResult result;
try {
// Deserialize the data into deserializationDelegate: it deserializes the buffer's
// bytes and assigns the result to its instance field.
result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
} catch (IOException e) {
throw new IOException(
String.format("Can't get next record for channel %s", lastChannel), e);
}
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
// If the result is a complete record, emit it downstream directly -- no need to wait
// for a new buffer.
if (result.isFullRecord()) {
// Send the data downstream.
// Effectively output.emitRecord(record) -> operator.processElement (say op = StreamMap).
// At this point the data has been delivered.
processElement(deserializationDelegate.getInstance(), output);
return InputStatus.MORE_AVAILABLE;
}
}
// fetch a buffer from the inputGate
Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
if (bufferOrEvent.isPresent()) {
// return to the mailbox after receiving a checkpoint barrier to avoid processing of
// data after the barrier before checkpoint is performed for unaligned checkpoint
// mode
if (bufferOrEvent.get().isBuffer()) {
// handle a record buffer:
// assign the matching deserializer to currentRecordDeserializer
processBuffer(bufferOrEvent.get());
} else {
// handle an event buffer
processEvent(bufferOrEvent.get());
return InputStatus.MORE_AVAILABLE;
}
} else {
if (checkpointedInputGate.isFinished()) {
checkState(
checkpointedInputGate.getAvailableFuture().isDone(),
"Finished BarrierHandler should be available");
return InputStatus.END_OF_INPUT;
}
return InputStatus.NOTHING_AVAILABLE;
}
}
}
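The two-phase loop in emitNext (drain full records from the current buffer, and only then poll a new buffer in which records may continue) can be modeled with a small sketch. This is a simplified assumption, not Flink's actual classes: records here are newline-terminated strings instead of length-framed serialized objects, and the queue stands in for the inputGate.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified model of StreamTaskNetworkInput.emitNext: records are newline-terminated
// strings that may be split across buffers; we keep draining full records from the
// current buffer and keep partial bytes as deserializer state for the next buffer.
public class EmitNextSketch {
    private final Queue<byte[]> buffers = new ArrayDeque<>(); // stands in for the inputGate
    private final List<Byte> pending = new ArrayList<>();     // partial-record state, like currentRecordDeserializer

    public void addBuffer(byte[] b) { buffers.add(b); }

    /** Drains all complete records (terminated by '\n') in arrival order. */
    public List<String> drain() {
        List<String> out = new ArrayList<>();
        while (!buffers.isEmpty()) {
            for (byte b : buffers.poll()) {          // "processBuffer": consume the next buffer
                if (b == '\n') {                     // a full record: emit downstream
                    byte[] rec = new byte[pending.size()];
                    for (int i = 0; i < rec.length; i++) rec[i] = pending.get(i);
                    out.add(new String(rec));
                    pending.clear();
                } else {
                    pending.add(b);                  // record spans buffers: keep the partial bytes
                }
            }
        }
        return out;
    }
}
```

A record split as "hel" + "lo\nwo" + "rld\n" still comes out as the two records "hello" and "world", which is exactly why currentRecordDeserializer must survive across buffers.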
//CheckpointedInputGate.pollNext()
//CheckpointedInputGate[InputGateWithMetrics[SingleInputGate]] -- think of the decorator pattern: every wrapper adds behavior
// we need to see how the buffer is obtained
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
// go straight into SingleInputGate.pollNext
Optional<BufferOrEvent> next = inputGate.pollNext();
// irrelevant code omitted
return next;
}
//SingleInputGate.pollNext()
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
// false = do not block
return getNextBufferOrEvent(false);
}
private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking)
throws IOException, InterruptedException {
// irrelevant code omitted
Optional<InputWithData<InputChannel, BufferAndAvailability>> next =
// the method that actually fetches the buffer
waitAndGetNextData(blocking);
InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
return Optional.of( transformToBufferOrEvent( // build the BufferOrEvent object
inputWithData.data.buffer(),
inputWithData.moreAvailable,
inputWithData.input,
inputWithData.morePriorityEvents));
}
// with blocking == true, the call waits when no channel is available
// once the buffer is obtained it gets wrapped layer by layer; the end result is a BufferOrEvent
// with the buffer in hand, parsing and deserialization can begin
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(
boolean blocking) throws IOException, InterruptedException {
while (true) {
synchronized (inputChannelsWithData) {
// take an available channel from the inputChannelsWithData queue (mentioned earlier); with blocking=true
// the call waits and releases the lock when no channel is available
Optional<InputChannel> inputChannelOpt = getChannel(blocking);
if (!inputChannelOpt.isPresent()) {
return Optional.empty();
}
final InputChannel inputChannel = inputChannelOpt.get();
// inputChannel = LocalInputChannel
// a LocalInputChannel holds a subpartitionView reference and reads straight from the view's buffers queue
// (filled by the addBuffer call shown earlier)
// getNextBuffer rewraps the buffer as a BufferAndAvailability
Optional<BufferAndAvailability> bufferAndAvailabilityOpt =
inputChannel.getNextBuffer();
if (!bufferAndAvailabilityOpt.isPresent()) {
checkUnavailability();
continue;
}
final BufferAndAvailability bufferAndAvailability = bufferAndAvailabilityOpt.get();
if (bufferAndAvailability.moreAvailable()) {
// enqueue the inputChannel at the end to avoid starvation
queueChannelUnsafe(inputChannel, bufferAndAvailability.morePriorityEvents());
}
final boolean morePriorityEvents =
inputChannelsWithData.getNumPriorityElements() > 0;
if (bufferAndAvailability.hasPriority()) {
lastPrioritySequenceNumber[inputChannel.getChannelIndex()] =
bufferAndAvailability.getSequenceNumber();
if (!morePriorityEvents) {
priorityAvailabilityHelper.resetUnavailable();
}
}
checkUnavailability();
return Optional.of(
new InputWithData<>(
inputChannel,
bufferAndAvailability,
!inputChannelsWithData.isEmpty(),
morePriorityEvents));
}
}
}
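The synchronized block plus the optional blocking in waitAndGetNextData is, at its core, a classic guarded queue. The minimal sketch below (all names invented here, not Flink's) shows the two behaviors of getChannel: return empty immediately when non-blocking, or release the lock and wait until notifyChannelNonEmpty enqueues a channel.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Minimal model of SingleInputGate's inputChannelsWithData queue: pollers either
// return empty immediately (blocking=false) or wait until a channel is enqueued.
public class ChannelQueueSketch {
    private final Queue<String> channelsWithData = new ArrayDeque<>();

    public synchronized void notifyChannelNonEmpty(String channel) {
        channelsWithData.add(channel);
        notifyAll(); // wake any poller blocked in getChannel(true)
    }

    public synchronized Optional<String> getChannel(boolean blocking) throws InterruptedException {
        while (channelsWithData.isEmpty()) {
            if (!blocking) return Optional.empty(); // non-blocking poll: nothing available right now
            wait();                                 // blocking poll: release the lock and wait
        }
        return Optional.of(channelsWithData.poll());
    }
}
```

The wait() sits in a loop over the emptiness check, the same guard pattern Flink uses: a spurious wakeup or a competing poller re-tests the condition before polling.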
In StreamTaskNetworkInput.emitNext() above, processElement finally hands the record to the current operator, which then starts processing it.
3. Data transfer between TaskManagers
TaskManagerRunner.runTaskManager() builds a TaskManagerRunner. Construction is deeply layered, wrapper upon wrapper; eventually start() launches the TaskManager. During construction a ShuffleEnvironment (NettyShuffleEnvironment) is created. The ShuffleEnvironment holds a ConnectionManager, which in turn owns the Netty client and server; whenever a network request is needed, the ConnectionManager establishes the connection and issues the request.
// ----------------- the code is long, so only the key places are excerpted ----------------------------
// the creation flow:
TaskManagerRunner.runTaskManager() // the constructor calls createTaskExecutorService()
TaskManagerRunner.createTaskExecutorService() // builds the TaskExecutor
TaskManagerRunner.startTaskManager() // starts the TaskManager services
TaskManagerServices.fromConfiguration() // builds the TaskManager's internal components
TaskManagerServices.createShuffleEnvironment // builds the ShuffleEnvironment and calls start() -- internally builds the connectionManager
NettyShuffleEnvironment.start() // ends up calling connectionManager.start(), initializing the Netty client and server
// let's look at how NettyShuffleEnvironment and its connectionManager are built
static NettyShuffleEnvironment createNettyShuffleEnvironment(
NettyShuffleEnvironmentConfiguration config,
ResourceID taskExecutorResourceId,
TaskEventPublisher taskEventPublisher,
ResultPartitionManager resultPartitionManager,
MetricGroup metricGroup,
Executor ioExecutor) {
checkNotNull(config);
checkNotNull(taskExecutorResourceId);
checkNotNull(taskEventPublisher);
checkNotNull(resultPartitionManager);
checkNotNull(metricGroup);
NettyConfig nettyConfig = config.nettyConfig();
FileChannelManager fileChannelManager =
new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
// a LocalConnectionManager is built for local tests, or whenever a single TM
// can run the whole job; otherwise the Netty-based manager is used
// we will look at its contents shortly
ConnectionManager connectionManager =
nettyConfig != null? new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig)
: new LocalConnectionManager();
// the global buffer pool, backed by Netty
// recall that each task has its own LocalBufferPool; when a task runs out of buffers,
// it requests more segments from exactly this pool
NetworkBufferPool networkBufferPool =
new NetworkBufferPool(
config.numNetworkBuffers(),
config.networkBufferSize(),
config.getRequestSegmentsTimeout());
registerShuffleMetrics(metricGroup, networkBufferPool);
ResultPartitionFactory resultPartitionFactory =
new ResultPartitionFactory(
resultPartitionManager,
fileChannelManager,
networkBufferPool,
config.getBlockingSubpartitionType(),
config.networkBuffersPerChannel(),
config.floatingNetworkBuffersPerGate(),
config.networkBufferSize(),
config.isBlockingShuffleCompressionEnabled(),
config.getCompressionCodec(),
config.getMaxBuffersPerChannel(),
config.sortShuffleMinBuffers(),
config.sortShuffleMinParallelism(),
config.isSSLEnabled());
// a factory for SingleInputGate: during initialization a task must build an inputGate
// for every partition it consumes, and the factory simplifies that construction
SingleInputGateFactory singleInputGateFactory =
new SingleInputGateFactory(
taskExecutorResourceId,
config,
connectionManager,
resultPartitionManager,
taskEventPublisher,
networkBufferPool);
return new NettyShuffleEnvironment(taskExecutorResourceId,config,networkBufferPool,connectionManager,
resultPartitionManager, fileChannelManager,resultPartitionFactory,singleInputGateFactory,ioExecutor);
}
Above we saw how the NettyShuffleEnvironment is built. The NettyConnectionManager manages the Netty client and server that act as the TaskManager's TCP endpoints, shared by all tasks.
// manages the Netty client and server
public class NettyConnectionManager implements ConnectionManager {
public NettyConnectionManager(
ResultPartitionProvider partitionProvider,
TaskEventPublisher taskEventPublisher,
NettyConfig nettyConfig) {
// server: lets other clients connect to us; client: connects to other servers
// both are thin wrappers around Netty: build a bootstrap, then configure options, handlers, etc.
this.server = new NettyServer(nettyConfig);
this.client = new NettyClient(nettyConfig);
this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());
this.partitionRequestClientFactory =
new PartitionRequestClientFactory(client, nettyConfig.getNetworkRetries());
// exposes two methods, returning the server-side and client-side handlers respectively
this.nettyProtocol =
new NettyProtocol(
checkNotNull(partitionProvider), checkNotNull(taskEventPublisher));
}
@Override
public int start() throws IOException {
// initialize the client and the server
client.init(nettyProtocol, bufferPool);
return server.init(nettyProtocol, bufferPool);
}
// requests a partition and creates a client for it
// it mainly holds two things:
// CreditBasedPartitionRequestClientHandler channelHandler
// Channel tcpChannel -- used by the task to issue TCP requests
@Override
public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
throws IOException, InterruptedException {
return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
}
// other code omitted ......
}
At this point the TaskManager has everything network-related in place. So what does the exchange flow look like?
It closely mirrors the interaction between tasks inside one TaskManager, adding four components on top of the intra-TM transfer: PartitionRequestQueue, PartitionRequestServerHandler (the server-side handler), PartitionRequestClient, and CreditBasedPartitionRequestClientHandler (the client-side handler).
Flow:
The upstream operator writes data into a subPartition; the PartitionRequestQueue reads the buffer and writes it to the Netty channel, where the server-side handlers such as PartitionRequestServerHandler take part in processing; the PartitionRequestClient receives the data sent by the server, and CreditBasedPartitionRequestClientHandler writes it into the InputChannel.
Note: Flink's data exchange follows the producer-consumer model.
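On top of that producer-consumer model, Flink throttles the producer with credits announced by the consumer. The toy sketch below is a deliberately simplified assumption (CreditExchangeSketch and its methods are invented names): the producer may only push a buffer while the consumer still has credit, i.e. a free receive buffer.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

// Toy credit-based producer-consumer exchange: the producer sends a buffer only
// while the consumer has announced credit (a free buffer to receive into).
public class CreditExchangeSketch {
    private int credit;                                  // buffers the consumer can still accept
    private final Queue<String> inFlight = new ArrayDeque<>();

    public void addCredit(int n) { credit += n; }        // consumer -> producer: the AddCredit message

    /** Producer side: send only while credit remains; returns the number actually sent. */
    public int send(Queue<String> subpartition) {
        int sent = 0;
        while (credit > 0 && !subpartition.isEmpty()) {
            inFlight.add(subpartition.poll());           // "writeAndFlush" one buffer
            credit--;                                    // each sent buffer consumes one credit
            sent++;
        }
        return sent;                                     // leftover data waits for more credit
    }

    public int backlog(Queue<String> subpartition) { return subpartition.size(); }
}
```

With 3 buffers queued and only 2 credits, the producer sends 2 and leaves a backlog of 1; this is the mechanism that keeps a slow consumer from back-pressuring the shared TCP connection.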
// for network transfer, writing data into buffers matches the intra-TM case: addBuffer puts the produced buffer into the buffers queue; what differs is who consumes the data, as we'll see below
// the network hops can make this look messy, but once the flow is laid out it becomes simple
// the transport is Netty-based, so the following assumes some Netty knowledge; if Netty is unfamiliar, read up on it first
// extends Netty's inbound handler; see Netty for when it is invoked
class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMessage> {
// other methods omitted
// this inbound-handler method is called whenever data arrives
// timing: the downstream PartitionRequestClient issues a request; when this task receives it, this method handles it
// note that before reaching this handler the bytes pass through a decoder handler, which produces the NettyMessage object
protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
try {
Class<?> msgClazz = msg.getClass();
// dispatch on the request type; we only need to care about the credit events (AddCredit / ResumeConsumption)
if (msgClazz == PartitionRequest.class) {
PartitionRequest request = (PartitionRequest) msg;
try {
NetworkSequenceViewReader reader;
reader =
new CreditBasedSequenceNumberingViewReader(
request.receiverId, request.credit, outboundQueue);
reader.requestSubpartitionView(
partitionProvider, request.partitionId, request.queueIndex);
outboundQueue.notifyReaderCreated(reader);
} catch (PartitionNotFoundException notFound) {
respondWithError(ctx, notFound, request.receiverId);
}
}else if (msgClazz == TaskEventRequest.class) {
TaskEventRequest request = (TaskEventRequest) msg;
if (!taskEventPublisher.publish(request.partitionId, request.event)) {
respondWithError(
ctx,
new IllegalArgumentException("Task event receiver not found."),
request.receiverId);
}
} else if (msgClazz == CancelPartitionRequest.class) {
CancelPartitionRequest request = (CancelPartitionRequest) msg;
outboundQueue.cancel(request.receiverId);
} else if (msgClazz == CloseRequest.class) {
outboundQueue.close();
}
// a credit arrived from the client; Flink's flow control is credit-based
// the credit tells us how much data may be sent at once; if the receiver's credit is low,
// only that much data is sent, otherwise the whole TCP connection would back-pressure
// credit = the number of buffers available at the downstream consumer
else if (msgClazz == AddCredit.class) {
AddCredit request = (AddCredit) msg;
// after credit is added (or consumption resumed) the consumer can be served: a request comes in, we respond
// resume belongs to the checkpoint-barrier alignment phase, where data after the current barrier must not be consumed
outboundQueue.addCreditOrResumeConsumption(
request.receiverId,
// accumulate this credit onto the historical credit (lambda expression)
reader -> reader.addCredit(request.credit));
} else if (msgClazz == ResumeConsumption.class) {
ResumeConsumption request = (ResumeConsumption) msg;
outboundQueue.addCreditOrResumeConsumption(
request.receiverId, NetworkSequenceViewReader::resumeConsumption);
} else {
LOG.warn("Received unexpected client request: {}", msg);
}
} catch (Throwable t) {
respondWithError(ctx, t);
}
}
}
//PartitionRequestQueue.addCreditOrResumeConsumption()
// Netty received the credit request above; now we respond to the consumer with data
void addCreditOrResumeConsumption(
InputChannelID receiverId, Consumer<NetworkSequenceViewReader> operation)
throws Exception {
if (fatalError) {
return;
}
// look up the reader for this receiverId to read the data
// the receiverId identifies the inputChannel that issued the request / should receive the data
NetworkSequenceViewReader reader = allReaders.get(receiverId);
if (reader != null) {
// the lambda passed in accumulates the credit
operation.accept(reader);
// try to enqueue the reader; once queued it is polled in a loop to read data
// deeper down, availableReaders.add(reader) puts it into the queue
enqueueAvailableReader(reader);
} else {
throw new IllegalStateException(
"No reader for receiverId = " + receiverId + " exists.");
}
}
private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
return;
}
boolean triggerWrite = availableReaders.isEmpty();
registerAvailableReader(reader);
// if the queue was empty, kick off a write
if (triggerWrite) {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
}
// a close look shows this is very similar to intra-TM transfer
// the difference: here we first fetch the matching reader and then call subpartitionView.pollNext,
// whereas intra-TM transfer calls the subpartitionView directly
// note the classes differ too; this one is PartitionRequestQueue
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
if (fatalError || !channel.isWritable()) {
return;
}
BufferAndAvailability next = null;
try {
while (true) {
// take a reader from the available queue; if one exists, process it
NetworkSequenceViewReader reader = pollAvailableReader();
if (reader == null) {
return;
}
// the reader calls subpartitionView.pollNext internally, similar to intra-TM task interaction
next = reader.getNextBuffer();
if (next == null) {
if (!reader.isReleased()) {
continue;
}
Throwable cause = reader.getFailureCause();
if (cause != null) {
ErrorResponse msg =
new ErrorResponse(
new ProducerFailedException(cause), reader.getReceiverId());
ctx.writeAndFlush(msg);
}
} else {
// if more data remains, re-enqueue the reader for the next round; note we are in a loop
if (next.moreAvailable()) {
registerAvailableReader(reader);
}
// build a response
// the response carries a buffer whose payload is the record, still serialized
BufferResponse msg =
new BufferResponse(
next.buffer(),
next.getSequenceNumber(),
reader.getReceiverId(),
next.buffersInBacklog());
// send the data to the client side; we are on the server side here
// keep in mind we are inside Netty now: this goes over the network
channel.writeAndFlush(msg)
// when the flush completes, the next buffer is attempted:
// the listener added here performs the same work as above
.addListener(writeListener);
return;
}
}
} catch (Throwable t) {
if (next != null) {
next.buffer().recycleBuffer();
}
throw new IOException(t.getMessage(), t);
}
}
The above covers the downstream operator's request to the upstream side: Netty receives the request, decodes the message into a NettyMessage, and PartitionRequestServerHandler dispatches on the event type. Since our topic is data exchange, we focus on the AddCredit event: on receiving it, the server finds the reader for the requesting inputChannel, the reader calls pollNext to read data, and the result is fed back to the client as the response.
Next we analyze how the client handles that response.
The Netty server side, i.e. the producer, responds after processing the request sent by the client side, i.e. the consumer; the data travels over TCP. Netty abstracts data handling as a pipeline of handlers: inbound handlers fire when data arrives, outbound handlers fire when data is sent out. So when the Netty client receives data it first goes through the decoder, and the decoded response is passed down to the next handler, CreditBasedPartitionRequestClientHandler, which is therefore our entry point.
Note: with network connections involved the call relationships can get confusing; crossing the network always raises the code's complexity. Later I will explain the principle with diagrams.
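The enqueueAvailableReader / writeAndFlushNextMessageIfPossible pair forms a self-driving write loop: a write is only kicked off when availableReaders transitions from empty, a reader with more data re-enqueues itself, and each completed flush triggers the next write. The sketch below is a simplified model with invented names, using a synchronous loop in place of Netty's asynchronous flush listener.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of PartitionRequestQueue's write loop: enqueueing only triggers a write
// when the reader queue was empty; readers with remaining data re-enqueue themselves
// (avoiding starvation), and the loop keeps flushing until the queue drains.
public class WriteLoopSketch {
    private final Queue<Queue<String>> availableReaders = new ArrayDeque<>();
    public final List<String> wire = new ArrayList<>();   // stands in for channel.writeAndFlush

    public void enqueueAvailableReader(Queue<String> reader) {
        if (reader.isEmpty()) return;                     // not available: nothing to read
        boolean triggerWrite = availableReaders.isEmpty();
        availableReaders.add(reader);
        if (triggerWrite) writeAndFlushNextMessageIfPossible();
    }

    private void writeAndFlushNextMessageIfPossible() {
        while (!availableReaders.isEmpty()) {             // the loop the flush listener would drive
            Queue<String> reader = availableReaders.poll();
            wire.add(reader.poll());                      // one buffer per round
            if (!reader.isEmpty()) availableReaders.add(reader); // re-enqueue at the end
        }
    }
}
```

Re-enqueueing at the tail instead of looping on one reader is what the "avoid starvation" comment in the real code is about: every available reader gets a turn per round.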
class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdapter
implements NetworkClientHandler {
// called when the Netty client receives data
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
// decode the message
decodeMsg(msg);
} catch (Throwable t) {
notifyAllChannelsOfErrorAndClose(t);
}
}
private void decodeMsg(Object msg) throws Throwable {
final Class<?> msgClazz = msg.getClass();
// the actual decoding already happened on ingress; here msg is merely cast
if (msgClazz == NettyMessage.BufferResponse.class) {
NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
if (inputChannel == null || inputChannel.isReleased()) {
bufferOrEvent.releaseBuffer();
cancelRequestFor(bufferOrEvent.receiverId);
return;
}
try {
// just another layer of delegation
decodeBufferOrEvent(inputChannel, bufferOrEvent);
} catch (Throwable t) {
inputChannel.onError(t);
}
} else if (msgClazz == NettyMessage.ErrorResponse.class) {
// omitted .....
} else {
throw new IllegalStateException( "Received unknown message from producer: " + msg.getClass());
}
}
private void decodeBufferOrEvent(
RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent)
throws Throwable {
if (bufferOrEvent.isBuffer() && bufferOrEvent.bufferSize == 0) {
inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
} else if (bufferOrEvent.getBuffer() != null) {
// if there is a buffer, call the inputChannel's onBuffer method
// inputChannel here is a RemoteInputChannel
// the main job is to add the buffer to the queue
inputChannel.onBuffer(
bufferOrEvent.getBuffer(), bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
} else {
throw new IllegalStateException(
"The read buffer is null in credit-based input channel.");
}
}
}
// RemoteInputChannel.onBuffer
public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
boolean recycleBuffer = true;
try {
if (expectedSequenceNumber != sequenceNumber) {
onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
return;
}
final boolean wasEmpty;
boolean firstPriorityEvent = false;
synchronized (receivedBuffers) {
NetworkActionsLogger.traceInput(
"RemoteInputChannel#onBuffer",
buffer,
inputGate.getOwningTaskName(),
channelInfo,
channelStatePersister,
sequenceNumber);
if (isReleased.get()) {
return;
}
wasEmpty = receivedBuffers.isEmpty();
SequenceBuffer sequenceBuffer = new SequenceBuffer(buffer, sequenceNumber);
DataType dataType = buffer.getDataType();
if (dataType.hasPriority()) {
firstPriorityEvent = addPriorityBuffer(sequenceBuffer);
recycleBuffer = false;
} else {
receivedBuffers.add(sequenceBuffer);
recycleBuffer = false;
channelStatePersister.maybePersist(buffer);
if (dataType.requiresAnnouncement()) {
firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
}
}
++expectedSequenceNumber;
}
if (firstPriorityEvent) {
notifyPriorityEvent(sequenceNumber);
}
// we have run into this code several times already
if (wasEmpty) {
// notify that the channel has data and add it to the available channels, same as before:
// this ends up calling the inputGate's notifyChannelNonEmpty method
// and wakes threads waiting on inputChannelsWithData
notifyChannelNonEmpty();
}
if (backlog >= 0) {
onSenderBacklog(backlog);
}
} finally {
if (recycleBuffer) {
buffer.recycleBuffer();
}
}
}
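The ordering guard at the top of onBuffer (reject any buffer whose sequenceNumber is not the expected one, otherwise enqueue it and advance the counter) can be isolated into a few lines. OnBufferSketch is an invented name for illustration; Flink raises a BufferReorderingException where this sketch returns false.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Minimal model of RemoteInputChannel.onBuffer's ordering guard: buffers carry a
// sequence number and must arrive strictly in order.
public class OnBufferSketch {
    private int expectedSequenceNumber = 0;
    private final Queue<String> receivedBuffers = new ArrayDeque<>();

    /** Returns true if the buffer is accepted, false on a reordering error. */
    public boolean onBuffer(String buffer, int sequenceNumber) {
        if (sequenceNumber != expectedSequenceNumber) {
            return false; // Flink throws BufferReorderingException here
        }
        receivedBuffers.add(buffer); // corresponds to receivedBuffers.add(sequenceBuffer)
        expectedSequenceNumber++;
        return true;
    }

    public int queuedBuffers() { return receivedBuffers.size(); }
}
```

The guard is what makes the credit-based protocol safe over a shared TCP connection: a dropped or duplicated response is detected immediately instead of silently corrupting the record stream.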
Data has now been enqueued and the available channel has been enqueued; from here on the flow is essentially identical to intra-TM data exchange, except that the requested inputChannel is a RemoteInputChannel instead of a LocalInputChannel. The rest of the logic is the same.
That concludes the walkthrough of data exchange. A diagram follows; everything in it is reflected in the code above.