Flink-1.10 源碼筆記 process && 調(diào)用過程

我們知道flink已經(jīng)封裝了很多高級的api供用戶訪問使用,但是有時候我們可能根據(jù)不同的需求凛忿,發(fā)現(xiàn)提供的高級api不能滿足我們的需求,這個時候flink也為我們提供了low-level層面的api,比如processFunction,通過processFunction函數(shù),我們可以訪問state,進行注冊process ,event time定時器來幫助我們完成一項復(fù)雜的操作褐奴。在我們使用process 函數(shù)的時候,有一個前提就是要求我們必須使用在keyedStream上于毙,有兩個原因敦冬,一個是getRuntimeContext 得到的StreamingRuntimeContext 只提供了KeyedStateStore的訪問權(quán)限,所以只能訪問keyd state唯沮,另外一個是我們在注冊定時器的時候脖旱,需要有三個維度,namespace介蛉,key,time萌庆,所以要求我們有key,這就是在ProcessFunction中只能在keyedStream做定時器注冊,在flink1.8.0版本中币旧,有ProcessFunction 和KeyedProcessFunction 這個類面向用戶的api,但是在ProcessFunction 類我們無法注冊定時器踊兜,在ProcessOperator源碼中我們發(fā)現(xiàn)注冊是拋出異常

為什么KeyedProcessFunction可以調(diào)用RuntimeContext對象,通過源碼看一下
KeyedProcessFunction是一個抽象類,繼承了AbstractRichFunction抽象類

@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction

進入AbstractRichFunction類,可以看到,該類實現(xiàn)了實現(xiàn)了RichFunction,和Serializable接口
RichFunction中定義了getRuntimeContext方法,在AbstractRichFunction中實現(xiàn)了該方法

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable

我們調(diào)用getRuntimeContext方法時,便可以獲取RuntimeContext對象,對狀態(tài)等進行操作

    private transient RuntimeContext runtimeContext;

    @Override
    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }

    @Override
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }

現(xiàn)在開始看process算子的實現(xiàn)

process算子需要傳入的值,傳入值分為兩種processFunc和KeyedProcessFunc,但不建議使用ProcessFunction了,建議使用KeyedProcessFunction,所以主要看KeyedProcessFunction


image.png

數(shù)據(jù)流在經(jīng)過keyBy之后會轉(zhuǎn)換成KeyedStream,先看一下KeyStream中的procss方法
KeyedStream是DataStream的實現(xiàn)

public class KeyedStream<T, KEY> extends DataStream<T>

可以看到process需要傳入一個keyedProcessFunction (用編寫的),如果用戶不指定輸出類型,會獲取默認(rèn)類型

    @Internal
    public <R> SingleOutputStreamOperator<R> process(
            KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
            TypeInformation<R> outputType) {

        KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
        return transform("KeyedProcess", outputType, operator);
    }

可以看出將函數(shù)封裝成了一個KeyedProcessOperator類型,這個類繼承了AbstractUdfStreamOperator類和實現(xiàn)了OneInputStreamOperato接口和Triggerable接口

public class KeyedProcessOperator<K, IN, OUT>
    extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace>

該類重寫了 父類的open方法,實現(xiàn)了AbstractUdfStreamOperator的processElement方法和Triggerable的onEventTime和onProcessingTime方法, 現(xiàn)在看一下實現(xiàn)的邏輯

open方法
在方法中,首先調(diào)用父類Open方法進行初始化操作, 然后初始化本類服務(wù),

    @Override
    public void open() throws Exception {
        //調(diào)用父類open方法 進行初始化
        super.open();
        //創(chuàng)建一個 timestampedCollector 來給定Flink output             ----英翻  時間戳收集器
        collector = new TimestampedCollector<>(output);

        //定義內(nèi)部定時服務(wù)      
        InternalTimerService<VoidNamespace> internalTimerService =
            getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
            //internalTimerService 封裝到 TimeService中
                //獲取timerSErvice    SimpleTimerService 內(nèi)部使用了 internalTimerService
        TimerService timerService = new SimpleTimerService(internalTimerService);

        //傳入 userFun 和 定時器  返回context對象
        context = new ContextImpl(userFunction, timerService);
        //同上 返回定時器onTimerContext對象
        onTimerContext = new OnTimerContextImpl(userFunction, timerService);
    }

這里重要的是這行代碼context = new ContextImpl(userFunction, timerService);
現(xiàn)在看下他的內(nèi)部實現(xiàn), 這個是內(nèi)部類,他繼承了KeyedProcessFunction的Context類
在該類中實現(xiàn)了Countext對象,對我們提供上下文服務(wù)

    private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {

        private final TimerService timerService;

        private StreamRecord<IN> element;

        ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
            function.super();
            this.timerService = checkNotNull(timerService);
        }

        @Override
        public Long timestamp() {
            checkState(element != null);

            if (element.hasTimestamp()) {
                return element.getTimestamp();
            } else {
                return null;
            }
        }
            
        @Override
        public TimerService timerService() {
            return timerService;
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }

            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }

        @Override
        @SuppressWarnings("unchecked")
        public K getCurrentKey() {
            return (K) KeyedProcessOperator.this.getCurrentKey();
        }
    }

在看一下 processElement 方法,主要調(diào)用用戶邏輯
這里userFunc調(diào)用processElement方法,該方法為用戶定義的內(nèi)容

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);

        //賦值element
        context.element = element;
        //將context對象 和collector 傳入 userFunc中
        //為用戶層級提供了訪問時間和注冊定時器的入口
        userFunction.processElement(element.getValue(), context, collector);

        //賦值調(diào)用完后 清空
        context.element = null;
    }

當(dāng)用戶通過ctx.timerService().registerProcessingTimeTimer(); 設(shè)置定時器后,定時器觸發(fā)會走KeyedProcessOperator的onEventTime或onProcessingTime方法 這里看下onEventTime的實現(xiàn)
在EventTime計時器觸發(fā)時調(diào)用,在方法中 調(diào)用了 invokeUserFunction方法

  @Override
  public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
      collector.setAbsoluteTimestamp(timer.getTimestamp());
      invokeUserFunction(TimeDomain.EVENT_TIME, timer);
  }

我們跟隨invokeUserFunction進入方法 看下實現(xiàn),這個方法會調(diào)用 用戶的onTime方法,執(zhí)行里面邏輯

    private void invokeUserFunction(
        TimeDomain timeDomain,
        InternalTimer<K, VoidNamespace> timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        //這個時候也就是調(diào)用了我們自定義類K\eyedProcessFunction中的onTimer,
        //調(diào)用時傳入了OnTimerContextImpl對象佳恬,其持有IntervalTimeService服務(wù)捏境,也可以注冊定時器操作。
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
    }

最終 將用戶的Func 包裝成KeyedProcessOperator對象 調(diào)用transform方法,最終返回轉(zhuǎn)換后的DataStream

    @Internal
    public <R> SingleOutputStreamOperator<R> process(
            KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
            TypeInformation<R> outputType) {

        KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
        return transform("KeyedProcess", outputType, operator);
    }

現(xiàn)在我們追蹤進去看,最終調(diào)用了doTransform方法,經(jīng)過一系列的轉(zhuǎn)換,將將operator添加到拓補圖中,最終將operator轉(zhuǎn)換成SingleOutputStreamOperator對象,該類繼承DataStream,進行返回

    protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        // 檢查輸出類型是否為MissingTypeInfo,如果是拋出異常,
        transformation.getOutputType();

        //創(chuàng)建OneInputTransformation
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            transformation,          //input   --上游的 transformation
                operatorName,
                operatorFactory,     //需要進行轉(zhuǎn)換操作的
                outTypeInfo,
                environment.getParallelism());

        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        //多個級聯(lián)的map和filter操作會被transform成為一連串的OneInputTransformation毁葱。
        // 后一個transformation的input指向前一個transformation
        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }

到此整個process算子調(diào)用完成

如有錯誤,歡迎指正!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
禁止轉(zhuǎn)載垫言,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。
  • 序言:七十年代末倾剿,一起剝皮案震驚了整個濱河市筷频,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌前痘,老刑警劉巖凛捏,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異芹缔,居然都是意外死亡坯癣,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門最欠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來示罗,“玉大人惩猫,你說我怎么就攤上這事⊙恋悖” “怎么了轧房?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長绍绘。 經(jīng)常有香客問我奶镶,道長,這世上最難降的妖魔是什么陪拘? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任厂镇,我火速辦了婚禮,結(jié)果婚禮上藻丢,老公的妹妹穿的比我還像新娘剪撬。我一直安慰自己,他們只是感情好悠反,可當(dāng)我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布残黑。 她就那樣靜靜地躺著,像睡著了一般斋否。 火紅的嫁衣襯著肌膚如雪梨水。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天茵臭,我揣著相機與錄音疫诽,去河邊找鬼。 笑死旦委,一個胖子當(dāng)著我的面吹牛奇徒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播缨硝,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼摩钙,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了查辩?” 一聲冷哼從身側(cè)響起胖笛,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎宜岛,沒想到半個月后长踊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡萍倡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年身弊,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡佑刷,死狀恐怖莉擒,靈堂內(nèi)的尸體忽然破棺而出酿炸,到底是詐尸還是另有隱情瘫絮,我是刑警寧澤,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布填硕,位于F島的核電站麦萤,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏扁眯。R本人自食惡果不足惜壮莹,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望姻檀。 院中可真熱鬧命满,春花似錦、人聲如沸绣版。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽杂抽。三九已至诈唬,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間缩麸,已是汗流浹背铸磅。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留杭朱,地道東北人阅仔。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像弧械,于是被迫代替她去往敵國和親八酒。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內(nèi)容