我們知道flink已經(jīng)封裝了很多高級的api供用戶訪問使用,但是有時候我們可能根據(jù)不同的需求凛忿,發(fā)現(xiàn)提供的高級api不能滿足我們的需求,這個時候flink也為我們提供了low-level層面的api,比如processFunction,通過processFunction函數(shù),我們可以訪問state,進行注冊process ,event time定時器來幫助我們完成一項復(fù)雜的操作褐奴。在我們使用process 函數(shù)的時候,有一個前提就是要求我們必須使用在keyedStream上于毙,有兩個原因敦冬,一個是getRuntimeContext 得到的StreamingRuntimeContext 只提供了KeyedStateStore的訪問權(quán)限,所以只能訪問keyd state唯沮,另外一個是我們在注冊定時器的時候脖旱,需要有三個維度,namespace介蛉,key,time萌庆,所以要求我們有key,這就是在ProcessFunction中只能在keyedStream做定時器注冊,在flink1.8.0版本中币旧,有ProcessFunction 和KeyedProcessFunction 這個類面向用戶的api,但是在ProcessFunction 類我們無法注冊定時器踊兜,在ProcessOperator源碼中我們發(fā)現(xiàn)注冊是拋出異常
為什么KeyedProcessFunction可以調(diào)用RuntimeContext對象,通過源碼看一下
KeyedProcessFunction是一個抽象類,繼承了AbstractRichFunction抽象類
@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
進入AbstractRichFunction類,可以看到,該類實現(xiàn)了實現(xiàn)了RichFunction,和Serializable接口
RichFunction中定義了getRuntimeContext方法,在AbstractRichFunction中實現(xiàn)了該方法
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable
我們調(diào)用getRuntimeContext方法時,便可以獲取RuntimeContext對象,對狀態(tài)等進行操作
private transient RuntimeContext runtimeContext;
@Override
public void setRuntimeContext(RuntimeContext t) {
this.runtimeContext = t;
}
@Override
public RuntimeContext getRuntimeContext() {
if (this.runtimeContext != null) {
return this.runtimeContext;
} else {
throw new IllegalStateException("The runtime context has not been initialized.");
}
}
現(xiàn)在開始看process算子的實現(xiàn)
process算子需要傳入的值,傳入值分為兩種processFunc和KeyedProcessFunc,但不建議使用ProcessFunction了,建議使用KeyedProcessFunction,所以主要看KeyedProcessFunction
數(shù)據(jù)流在經(jīng)過keyBy之后會轉(zhuǎn)換成KeyedStream,先看一下KeyStream中的procss方法
KeyedStream是DataStream的實現(xiàn)
public class KeyedStream<T, KEY> extends DataStream<T>
可以看到process需要傳入一個keyedProcessFunction (用編寫的),如果用戶不指定輸出類型,會獲取默認(rèn)類型
@Internal
public <R> SingleOutputStreamOperator<R> process(
KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
TypeInformation<R> outputType) {
KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
return transform("KeyedProcess", outputType, operator);
}
可以看出將函數(shù)封裝成了一個KeyedProcessOperator類型,這個類繼承了AbstractUdfStreamOperator類和實現(xiàn)了OneInputStreamOperato接口和Triggerable接口
public class KeyedProcessOperator<K, IN, OUT>
extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace>
該類重寫了 父類的open方法,實現(xiàn)了AbstractUdfStreamOperator的processElement方法和Triggerable的onEventTime和onProcessingTime方法, 現(xiàn)在看一下實現(xiàn)的邏輯
open方法
在方法中,首先調(diào)用父類Open方法進行初始化操作, 然后初始化本類服務(wù),
@Override
public void open() throws Exception {
//調(diào)用父類open方法 進行初始化
super.open();
//創(chuàng)建一個 timestampedCollector 來給定Flink output ----英翻 時間戳收集器
collector = new TimestampedCollector<>(output);
//定義內(nèi)部定時服務(wù)
InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
//internalTimerService 封裝到 TimeService中
//獲取timerSErvice SimpleTimerService 內(nèi)部使用了 internalTimerService
TimerService timerService = new SimpleTimerService(internalTimerService);
//傳入 userFun 和 定時器 返回context對象
context = new ContextImpl(userFunction, timerService);
//同上 返回定時器onTimerContext對象
onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}
這里重要的是這行代碼context = new ContextImpl(userFunction, timerService);
現(xiàn)在看下他的內(nèi)部實現(xiàn), 這個是內(nèi)部類,他繼承了KeyedProcessFunction的Context類
在該類中實現(xiàn)了Countext對象,對我們提供上下文服務(wù)
private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {
private final TimerService timerService;
private StreamRecord<IN> element;
ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
function.super();
this.timerService = checkNotNull(timerService);
}
@Override
public Long timestamp() {
checkState(element != null);
if (element.hasTimestamp()) {
return element.getTimestamp();
} else {
return null;
}
}
@Override
public TimerService timerService() {
return timerService;
}
@Override
public <X> void output(OutputTag<X> outputTag, X value) {
if (outputTag == null) {
throw new IllegalArgumentException("OutputTag must not be null.");
}
output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
}
@Override
@SuppressWarnings("unchecked")
public K getCurrentKey() {
return (K) KeyedProcessOperator.this.getCurrentKey();
}
}
在看一下 processElement 方法,主要調(diào)用用戶邏輯
這里userFunc調(diào)用processElement方法,該方法為用戶定義的內(nèi)容
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
//賦值element
context.element = element;
//將context對象 和collector 傳入 userFunc中
//為用戶層級提供了訪問時間和注冊定時器的入口
userFunction.processElement(element.getValue(), context, collector);
//賦值調(diào)用完后 清空
context.element = null;
}
當(dāng)用戶通過ctx.timerService().registerProcessingTimeTimer(); 設(shè)置定時器后,定時器觸發(fā)會走KeyedProcessOperator的onEventTime或onProcessingTime方法 這里看下onEventTime的實現(xiàn)
在EventTime計時器觸發(fā)時調(diào)用,在方法中 調(diào)用了 invokeUserFunction方法
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
invokeUserFunction(TimeDomain.EVENT_TIME, timer);
}
我們跟隨invokeUserFunction進入方法 看下實現(xiàn),這個方法會調(diào)用 用戶的onTime方法,執(zhí)行里面邏輯
private void invokeUserFunction(
TimeDomain timeDomain,
InternalTimer<K, VoidNamespace> timer) throws Exception {
onTimerContext.timeDomain = timeDomain;
onTimerContext.timer = timer;
//這個時候也就是調(diào)用了我們自定義類K\eyedProcessFunction中的onTimer,
//調(diào)用時傳入了OnTimerContextImpl對象佳恬,其持有IntervalTimeService服務(wù)捏境,也可以注冊定時器操作。
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
最終 將用戶的Func 包裝成KeyedProcessOperator對象 調(diào)用transform方法,最終返回轉(zhuǎn)換后的DataStream
@Internal
public <R> SingleOutputStreamOperator<R> process(
KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
TypeInformation<R> outputType) {
KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
return transform("KeyedProcess", outputType, operator);
}
現(xiàn)在我們追蹤進去看,最終調(diào)用了doTransform方法,經(jīng)過一系列的轉(zhuǎn)換,將將operator添加到拓補圖中,最終將operator轉(zhuǎn)換成SingleOutputStreamOperator對象,該類繼承DataStream,進行返回
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
// 檢查輸出類型是否為MissingTypeInfo,如果是拋出異常,
transformation.getOutputType();
//創(chuàng)建OneInputTransformation
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
transformation, //input --上游的 transformation
operatorName,
operatorFactory, //需要進行轉(zhuǎn)換操作的
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
//多個級聯(lián)的map和filter操作會被transform成為一連串的OneInputTransformation毁葱。
// 后一個transformation的input指向前一個transformation
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
到此整個process算子調(diào)用完成
如有錯誤,歡迎指正!