A look at the connect operation on Flink's DataStream

This article examines the connect operation on Flink's DataStream.

DataStream.connect

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {

    //......

    public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
        return new ConnectedStreams<>(environment, this, dataStream);
    }

    @PublicEvolving
    public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
        return new BroadcastConnectedStream<>(
                environment,
                this,
                Preconditions.checkNotNull(broadcastStream),
                broadcastStream.getBroadcastStateDescriptor());
    }

    //......
}
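  • Calling connect on a DataStream creates a ConnectedStreams or, when the argument is a BroadcastStream, a BroadcastConnectedStream. Both take two type parameters, so the two connected streams are not required to have the same element type.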

ConnectedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/ConnectedStreams.java

@Public
public class ConnectedStreams<IN1, IN2> {

    protected final StreamExecutionEnvironment environment;
    protected final DataStream<IN1> inputStream1;
    protected final DataStream<IN2> inputStream2;

    protected ConnectedStreams(StreamExecutionEnvironment env, DataStream<IN1> input1, DataStream<IN2> input2) {
        this.environment = requireNonNull(env);
        this.inputStream1 = requireNonNull(input1);
        this.inputStream2 = requireNonNull(input2);
    }

    public StreamExecutionEnvironment getExecutionEnvironment() {
        return environment;
    }

    public DataStream<IN1> getFirstInput() {
        return inputStream1;
    }

    public DataStream<IN2> getSecondInput() {
        return inputStream2;
    }

    public TypeInformation<IN1> getType1() {
        return inputStream1.getType();
    }

    public TypeInformation<IN2> getType2() {
        return inputStream2.getType();
    }

    public ConnectedStreams<IN1, IN2> keyBy(int keyPosition1, int keyPosition2) {
        return new ConnectedStreams<>(this.environment, inputStream1.keyBy(keyPosition1),
                inputStream2.keyBy(keyPosition2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(int[] keyPositions1, int[] keyPositions2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(keyPositions1),
                inputStream2.keyBy(keyPositions2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(String field1, String field2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(field1),
                inputStream2.keyBy(field2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(String[] fields1, String[] fields2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(fields1),
                inputStream2.keyBy(fields2));
    }

    public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
        return new ConnectedStreams<>(environment, inputStream1.keyBy(keySelector1),
                inputStream2.keyBy(keySelector2));
    }

    public <KEY> ConnectedStreams<IN1, IN2> keyBy(
            KeySelector<IN1, KEY> keySelector1,
            KeySelector<IN2, KEY> keySelector2,
            TypeInformation<KEY> keyType) {
        return new ConnectedStreams<>(
            environment,
            inputStream1.keyBy(keySelector1, keyType),
            inputStream2.keyBy(keySelector2, keyType));
    }

    public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) {

        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
            coMapper,
            CoMapFunction.class,
            0,
            1,
            2,
            TypeExtractor.NO_INDEX,
            getType1(),
            getType2(),
            Utils.getCallLocationName(),
            true);

        return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper)));

    }

    public <R> SingleOutputStreamOperator<R> flatMap(
            CoFlatMapFunction<IN1, IN2, R> coFlatMapper) {

        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
            coFlatMapper,
            CoFlatMapFunction.class,
            0,
            1,
            2,
            TypeExtractor.NO_INDEX,
            getType1(),
            getType2(),
            Utils.getCallLocationName(),
            true);

        return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> process(
            CoProcessFunction<IN1, IN2, R> coProcessFunction) {

        TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
            coProcessFunction,
            CoProcessFunction.class,
            0,
            1,
            2,
            TypeExtractor.NO_INDEX,
            getType1(),
            getType2(),
            Utils.getCallLocationName(),
            true);

        return process(coProcessFunction, outTypeInfo);
    }

    @Internal
    public <R> SingleOutputStreamOperator<R> process(
            CoProcessFunction<IN1, IN2, R> coProcessFunction,
            TypeInformation<R> outputType) {

        TwoInputStreamOperator<IN1, IN2, R> operator;

        if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
            operator = new KeyedCoProcessOperator<>(inputStream1.clean(coProcessFunction));
        } else {
            operator = new CoProcessOperator<>(inputStream1.clean(coProcessFunction));
        }

        return transform("Co-Process", outputType, operator);
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(String functionName,
            TypeInformation<R> outTypeInfo,
            TwoInputStreamOperator<IN1, IN2, R> operator) {

        // read the output type of the input Transforms to coax out errors about MissingTypeInfo
        inputStream1.getType();
        inputStream2.getType();

        TwoInputTransformation<IN1, IN2, R> transform = new TwoInputTransformation<>(
                inputStream1.getTransformation(),
                inputStream2.getTransformation(),
                functionName,
                operator,
                outTypeInfo,
                environment.getParallelism());

        if (inputStream1 instanceof KeyedStream && inputStream2 instanceof KeyedStream) {
            KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;
            KeyedStream<IN2, ?> keyedInput2 = (KeyedStream<IN2, ?>) inputStream2;

            TypeInformation<?> keyType1 = keyedInput1.getKeyType();
            TypeInformation<?> keyType2 = keyedInput2.getKeyType();
            if (!(keyType1.canEqual(keyType2) && keyType1.equals(keyType2))) {
                throw new UnsupportedOperationException("Key types if input KeyedStreams " +
                        "don't match: " + keyType1 + " and " + keyType2 + ".");
            }

            transform.setStateKeySelectors(keyedInput1.getKeySelector(), keyedInput2.getKeySelector());
            transform.setStateKeyType(keyType1);
        }

        @SuppressWarnings({ "unchecked", "rawtypes" })
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, transform);

        getExecutionEnvironment().addOperator(transform);

        return returnStream;
    }
}
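  • ConnectedStreams provides keyBy methods for specifying the key selectors of the two streams, along with the map, flatMap, process, and transform operations; the first three all end up calling transform.
  • transform accepts an operator of type TwoInputStreamOperator, wraps it in a TwoInputTransformation, and returns a SingleOutputStreamOperator. When both inputs are KeyedStreams, it also checks that their key types match and throws an UnsupportedOperationException if they do not.
  • map accepts a CoMapFunction, flatMap accepts a CoFlatMapFunction, and process accepts a CoProcessFunction.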
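
The delegation described above can be modelled without Flink on the classpath. The sketch below is only an illustration: Connected and TwoInputOperator are hypothetical stand-ins for ConnectedStreams and TwoInputStreamOperator, the inputs are plain lists, and the model drains input 1 before input 2, an ordering that real Flink does not guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Dependency-free model; none of these types are Flink classes.
class ConnectedStreamsSketch {

    // Stand-in for TwoInputStreamOperator: one callback per input.
    interface TwoInputOperator<IN1, IN2, OUT> {
        void processElement1(IN1 value, Consumer<OUT> out);
        void processElement2(IN2 value, Consumer<OUT> out);
    }

    // Stand-in for ConnectedStreams: two inputs with independent element types.
    static final class Connected<IN1, IN2> {
        private final List<IN1> input1;
        private final List<IN2> input2;

        Connected(List<IN1> input1, List<IN2> input2) {
            this.input1 = input1;
            this.input2 = input2;
        }

        // map(...) only adapts the user functions into an operator, then delegates.
        <R> List<R> map(Function<IN1, R> map1, Function<IN2, R> map2) {
            return transform(new TwoInputOperator<IN1, IN2, R>() {
                @Override
                public void processElement1(IN1 value, Consumer<R> out) {
                    out.accept(map1.apply(value));
                }

                @Override
                public void processElement2(IN2 value, Consumer<R> out) {
                    out.accept(map2.apply(value));
                }
            });
        }

        // transform(...) is the single place where the operator meets both inputs.
        // This model drains input 1 first; Flink makes no such ordering promise.
        <R> List<R> transform(TwoInputOperator<IN1, IN2, R> operator) {
            List<R> output = new ArrayList<>();
            for (IN1 value : input1) {
                operator.processElement1(value, output::add);
            }
            for (IN2 value : input2) {
                operator.processElement2(value, output::add);
            }
            return output;
        }
    }

    static List<String> demo() {
        Connected<Integer, String> connected =
                new Connected<>(Arrays.asList(1, 2), Arrays.asList("a", "b"));
        return connected.map(i -> "int:" + i, s -> "str:" + s);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model, flatMap and process would be two more thin adapters over the same transform method, which mirrors how the three public operations share one code path in the source above.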

CoMapFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoMapFunction.java

@Public
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {

    OUT map1(IN1 value) throws Exception;

    OUT map2(IN2 value) throws Exception;
}
  • CoMapFunction extends Function and defines two methods, map1 and map2, one for each input; both return the same output type.
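
A minimal sketch of what an implementation might look like, assuming an Integer stream and a String stream that should be unified into String. The interface is redeclared locally so the example runs without Flink; in a real job you would implement org.apache.flink.streaming.api.functions.co.CoMapFunction instead and pass the instance to ConnectedStreams.map. The class name ToText and the sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class CoMapSketch {

    // Local copy of the two-method shape shown above; NOT the Flink interface itself.
    interface CoMapFunction<IN1, IN2, OUT> {
        OUT map1(IN1 value) throws Exception;
        OUT map2(IN2 value) throws Exception;
    }

    // Unifies Integer readings (input 1) and String labels (input 2) into String.
    static final class ToText implements CoMapFunction<Integer, String, String> {
        @Override
        public String map1(Integer value) {
            return "reading=" + value;
        }

        @Override
        public String map2(String value) {
            return "label=" + value.toUpperCase();
        }
    }

    static List<String> demo() {
        ToText fn = new ToText();
        List<String> out = new ArrayList<>();
        // Elements arrive interleaved; each is routed by the input it came from.
        out.add(fn.map1(42));
        out.add(fn.map2("ok"));
        out.add(fn.map1(7));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [reading=42, label=OK, reading=7]
    }
}
```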

CoFlatMapFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java

@Public
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {

    void flatMap1(IN1 value, Collector<OUT> out) throws Exception;

    void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}
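  • CoFlatMapFunction extends Function and defines the flatMap1 and flatMap2 methods. Unlike map1 and map2 of CoMapFunction, they return void and take an additional Collector parameter, so each input element can produce zero, one, or many output elements.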
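
The extra Collector parameter is what makes patterns such as "one input controls how the other is filtered" possible. The sketch below assumes a hypothetical BlocklistFilter: input 1 carries words to block and emits nothing, while input 2 carries sentences that are split into words. Collector and CoFlatMapFunction are local stand-ins that mirror the shapes above rather than the Flink types, and the blocklist lives in a plain field for brevity; in a real job it would more likely belong in Flink-managed state.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class CoFlatMapSketch {

    // Local stand-ins mirroring the shapes above; NOT the Flink types themselves.
    interface Collector<T> {
        void collect(T record);
    }

    interface CoFlatMapFunction<IN1, IN2, OUT> {
        void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
        void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
    }

    // Input 1: words to block. Input 2: sentences to split and filter.
    static final class BlocklistFilter implements CoFlatMapFunction<String, String, String> {
        // Plain field for brevity (an assumption of this sketch, not a recommendation).
        private final Set<String> blocked = new HashSet<>();

        @Override
        public void flatMap1(String word, Collector<String> out) {
            blocked.add(word); // zero outputs for this element
        }

        @Override
        public void flatMap2(String sentence, Collector<String> out) {
            for (String word : sentence.split(" ")) {
                if (!blocked.contains(word)) {
                    out.collect(word); // possibly many outputs for one element
                }
            }
        }
    }

    static List<String> demo() {
        BlocklistFilter fn = new BlocklistFilter();
        List<String> out = new ArrayList<>();
        fn.flatMap1("spam", out::add);
        fn.flatMap2("buy spam now", out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [buy, now]
    }
}
```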

CoProcessFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java

@PublicEvolving
public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

    public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}

    public abstract class Context {

        public abstract Long timestamp();

        public abstract TimerService timerService();

        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    public abstract class OnTimerContext extends Context {
        /**
         * The {@link TimeDomain} of the firing timer.
         */
        public abstract TimeDomain timeDomain();
    }
}
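  • CoProcessFunction extends AbstractRichFunction and defines the processElement1 and processElement2 methods. Compared with the methods of CoFlatMapFunction, these two take an additional Context parameter.
  • CoProcessFunction defines the inner classes Context and OnTimerContext. The Context is available inside processElement1 and processElement2 and provides the timestamp, timerService, and output methods.
  • Another difference from CoFlatMapFunction is that a CoProcessFunction can register timers through the TimerService and implement the corresponding logic in the onTimer method.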
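
The register-then-fire flow can be illustrated with a reduced model. Everything below is a hypothetical stand-in: the Context is collapsed into a timestamp plus a one-method TimerService, the Harness plays the role of the runtime by firing timers when a simulated watermark advances, and OrderTimeout tracks a single pending order in a plain field, which roughly corresponds to the state of one key. As far as I know, real Flink only supports timers when the inputs are keyed, so treat this as a sketch of the callback order rather than of the actual API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class CoProcessSketch {

    // Heavily reduced stand-ins for TimerService / Collector; NOT Flink types.
    interface TimerService {
        void registerEventTimeTimer(long time);
    }

    interface Collector<T> {
        void collect(T record);
    }

    // Same three callbacks as above, with Context reduced to timestamp + timers.
    abstract static class CoProcessFunction<IN1, IN2, OUT> {
        abstract void processElement1(IN1 value, long timestamp, TimerService timers, Collector<OUT> out);
        abstract void processElement2(IN2 value, long timestamp, TimerService timers, Collector<OUT> out);
        void onTimer(long timestamp, Collector<OUT> out) {}
    }

    // Input 1: order ids. Input 2: payment ids. Reports orders that stay unpaid.
    static final class OrderTimeout extends CoProcessFunction<String, String, String> {
        private static final long TIMEOUT = 10;
        private String pendingOrder;
        private long deadline;

        @Override
        void processElement1(String order, long timestamp, TimerService timers, Collector<String> out) {
            pendingOrder = order;
            deadline = timestamp + TIMEOUT;
            timers.registerEventTimeTimer(deadline);
        }

        @Override
        void processElement2(String payment, long timestamp, TimerService timers, Collector<String> out) {
            if (payment.equals(pendingOrder)) {
                out.collect("paid:" + payment);
                pendingOrder = null;
            }
        }

        @Override
        void onTimer(long timestamp, Collector<String> out) {
            // Ignore stale timers that belong to an order already resolved.
            if (pendingOrder != null && timestamp == deadline) {
                out.collect("timeout:" + pendingOrder + "@" + timestamp);
                pendingOrder = null;
            }
        }
    }

    // Tiny driver: keeps timers sorted and fires those at or before the watermark.
    static final class Harness {
        final TreeSet<Long> timers = new TreeSet<>();
        final List<String> output = new ArrayList<>();
        final OrderTimeout fn = new OrderTimeout();

        void order(String id, long ts) {
            fn.processElement1(id, ts, timers::add, output::add);
        }

        void payment(String id, long ts) {
            fn.processElement2(id, ts, timers::add, output::add);
        }

        void advanceWatermark(long watermark) {
            while (!timers.isEmpty() && timers.first() <= watermark) {
                fn.onTimer(timers.pollFirst(), output::add);
            }
        }
    }

    static List<String> demo() {
        Harness h = new Harness();
        h.order("o1", 0);       // registers a timer for time 10
        h.payment("o1", 3);     // matched before the timer fires
        h.order("o2", 5);       // registers a timer for time 15
        h.advanceWatermark(20); // timer 10 is stale, timer 15 reports o2
        return h.output;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [paid:o1, timeout:o2@15]
    }
}
```

The stale-timer check in onTimer is a design choice of this sketch: the timer for o1 still fires after the payment arrived, so the function compares the firing timestamp with the stored deadline instead of trying to cancel the timer.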

Summary

  • Calling connect on a DataStream creates a ConnectedStreams or a BroadcastConnectedStream. Both take two type parameters, so the two connected streams are not required to have the same element type.
  • ConnectedStreams provides keyBy methods for specifying the key selectors of the two streams, along with the map, flatMap, process, and transform operations; the first three all end up calling transform. transform accepts an operator of type TwoInputStreamOperator and returns a SingleOutputStreamOperator. map accepts a CoMapFunction, flatMap accepts a CoFlatMapFunction, and process accepts a CoProcessFunction.
  • CoFlatMapFunction differs from CoMapFunction in that its flatMap1 and flatMap2 methods take an additional Collector parameter. CoProcessFunction defines processElement1 and processElement2, which in turn take an additional Context parameter compared with the CoFlatMapFunction methods. CoProcessFunction can also register timers through the TimerService and implement the corresponding logic in the onTimer method.
