Flink Streaming Operators Study - 01

Overview

  • Systematically learn Flink's streaming operators, pick up some new technology, and add to my personal tech stack; both the Java and Scala APIs are covered.
  • Official link for the Flink streaming operator API: DataStream Transformations
  • It is recommended to read the official documentation frequently. With Alibaba's backing, Flink is developing rapidly, and the docs now offer a Chinese version; try reading it and summarize as you go.
  • The official descriptions quoted below are from the Flink 1.9.0 documentation.

Data Preparation

  • First prepare some test data; Flink consumes data from Kafka 0.10 rather than working in batch mode.

  • See GitHub for the Scala API exercises.

  • github:flink_way (https://github.com/yahuili1128/flink_way)

  • In the Java code Flink consumes a single Kafka topic, while the Scala code consumes two topics.

  • For the Java code, first run the method under the mockup package to generate test data; then the operator demos can be run.

Streaming Operators

Map

  • Processes each element in the DataStream.
  • Official description:
DataStream → DataStream 
Takes one element and produces one element. A map function that doubles the values of the input stream:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
  • demo
/**
     * Returns the whole POJO, e.g.
     *  MockUpModel(name=yahui, gender=female, timestamp=1563090399305, age=34)
     * @param kafkaData
     * @return
     */
    private static SingleOutputStreamOperator<MockUpModel> getAddAgePojo(SingleOutputStreamOperator<MockUpModel> kafkaData) {
        //      returns the MockUpModel POJO, with the age field of every record increased by 5
        return kafkaData.map(new MapFunction<MockUpModel, MockUpModel>() {
            @Override
            public MockUpModel map(MockUpModel value) throws Exception {
                MockUpModel mockUpModel = new MockUpModel();
                mockUpModel.name = value.name;
                mockUpModel.gender = value.gender;
                mockUpModel.age = value.age + 5;
                mockUpModel.timestamp = value.timestamp;
                return mockUpModel;

            }
        });
    }

FlatMap

  • Takes each element of the DataStream and produces zero, one, or more elements, emitted through a Collector.
  • Official description:
DataStream → DataStream 
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
  • demo
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<MockUpModel> kafkaData = getKafka010Data(env);

        //      This does not really demonstrate flatMap; it effectively acts as a filter. Sample output: MockUpModel(name=liyahui-0, gender=male, timestamp=1561516105296, age=0)
        kafkaData.flatMap(new FlatMapFunction<MockUpModel, MockUpModel>() {
            @Override
            public void flatMap(MockUpModel value, Collector<MockUpModel> out) throws Exception {
                if (value.age % 2 == 0) {
                    out.collect(value);
                }
            }
        }).print().setParallelism(1);
        //   flatMap flattens a nested collection into a non-nested one; the official snippet (repeated below) is the clearest illustration.
        /*
        dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out)
                    throws Exception {
                for(String word: value.split(" ")){
                    out.collect(word);
                }
            }
        });
        */


        env.execute("flink kafka010 demo");
    }
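
  • Since the repo demo above effectively behaves like a filter, here is a minimal sketch (not from the repo) of a genuine 0-to-many flatMap on the same stream: it splits the mock name field (e.g. "liyahui-0") on "-" and emits one String per token. It assumes the same kafkaData stream and the FlatMapFunction/Collector classes used in the snippets above.

// Minimal sketch: one input element can produce zero, one, or many output elements.
kafkaData.flatMap(new FlatMapFunction<MockUpModel, String>() {
    @Override
    public void flatMap(MockUpModel value, Collector<String> out) throws Exception {
        for (String token : value.name.split("-")) {   // "liyahui-0" -> "liyahui", "0"
            out.collect(token);
        }
    }
}).print().setParallelism(1);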

Filter

  • Filtering: evaluates each element of the DataStream and returns true or false; only elements for which it returns true are kept.
DataStream → DataStream 
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
  • demo
// method 1
/**
     * Keep only the people whose age is an even number
     *
     * @param kafkaData
     * @return
     */
    private static SingleOutputStreamOperator<MockUpModel> getFilterDS2(SingleOutputStreamOperator<MockUpModel> kafkaData) {
        return kafkaData.filter(new FilterFunction<MockUpModel>() {
            @Override
            public boolean filter(MockUpModel value) throws Exception {
                if (value.age % 2 == 0) {
                    return true;
                }
                return false;
            }
        });
    }
// method-2
/**
     * Lambda style
     *
     * @param kafkaData
     * @return
     */
    private static SingleOutputStreamOperator<MockUpModel> getFilterDS(SingleOutputStreamOperator<MockUpModel> kafkaData) {
        return kafkaData.filter(line -> line.age % 2 == 0);
    }

KeyBy

  • Partitions the DataStream into disjoint partitions by the specified key.
  • Note: a POJO can be used as a key only if it overrides hashCode(); arrays cannot be used as keys.
DataStream → KeyedStream 

Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, *keyBy()* is implemented with hash partitioning. There are different ways to [specify keys](https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#specifying-keys).

This transformation returns a *KeyedStream*, which is, among other things, required to use [keyed state](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#keyed-state).


dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple


Attention A type **cannot be a key** if:

1.  it is a POJO type but does not override the *hashCode()* method and relies on the *Object.hashCode()* implementation.
2.  it is an array of any type.
  • demo
/**
     * keyBy using age as the grouping key
     *
     * @param kafkaData
     * @return
     */
    private static KeyedStream<MockUpModel, Integer> getKeyedDS(SingleOutputStreamOperator<MockUpModel> kafkaData) {
        //  lambda expression version:
        //kafkaData.keyBy(line -> line.age).print().setParallelism(1);
        return kafkaData.keyBy(new KeySelector<MockUpModel, Integer>() {
            @Override
            public Integer getKey(MockUpModel value) throws Exception {
                return value.age;
            }
        });
    }

Reduce

  • A rolling aggregation on a keyed stream: combines the current element with the last reduced value and emits the new value.
KeyedStream → DataStream    
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. 

A reduce function that creates a stream of partial sums:

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
  • demo
public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator<MockUpModel> kafka010Data = getKafka010Data(env);
        //      lambda keyBy + anonymous ReduceFunction in Java
        kafka010Data.keyBy(line -> line.gender).reduce(new ReduceFunction<MockUpModel>() {
            @Override
            public MockUpModel reduce(MockUpModel value1, MockUpModel value2) throws Exception {
                MockUpModel mockUpModel = new MockUpModel();
                mockUpModel.name = value1.name + "--" + value2.name;
                mockUpModel.gender = value1.gender;
                mockUpModel.age = (value1.age + value2.age) / 2;
                return mockUpModel;

            }
        }).print().setParallelism(1);

        env.execute("flink kafka010 demo");
    }

Fold

  • A rolling fold with an initial value that turns the KeyedStream back into a DataStream; deprecated as of Flink 1.9.
  • Official description:
KeyedStream → DataStream    
A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. 

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });
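
  • The repo has no demo for fold since it is deprecated; purely as an illustration, a minimal sketch (assuming the same kafka010Data stream keyed by gender as in the Reduce demo) could concatenate the names seen per gender:

// Illustrative sketch only -- fold is deprecated in Flink 1.9; prefer reduce or windowed aggregations.
DataStream<String> namesPerGender = kafka010Data
        .keyBy(line -> line.gender)
        .fold("start", new FoldFunction<MockUpModel, String>() {
            @Override
            public String fold(String current, MockUpModel value) throws Exception {
                return current + "-" + value.name;   // e.g. "start-liyahui-0-liyahui-2"
            }
        });
namesPerGender.print().setParallelism(1);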

Aggregations

  • Aggregation functions compute rolling max/min/sum values on a KeyedStream. The difference between max and maxBy is that max returns only the maximum value of the field (the other fields of the emitted record are not meaningful), whereas maxBy returns the whole element that carries the maximum value; min and minBy behave analogously.
KeyedStream → DataStream    
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
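
  • A minimal sketch (not from the repo) showing the max/maxBy difference on (gender, name, age) tuples keyed by gender; env is the same StreamExecutionEnvironment as in the demos above:

// max(2) keeps the first record's other fields and only tracks the maximum age;
// maxBy(2) emits the whole element that carries the maximum age.
DataStream<Tuple3<String, String, Integer>> people = env.fromElements(
        Tuple3.of("male", "liyahui-0", 20),
        Tuple3.of("male", "liyahui-1", 35),
        Tuple3.of("female", "liyahui-2", 30));

KeyedStream<Tuple3<String, String, Integer>, Tuple> keyed = people.keyBy(0);

keyed.max(2).print();    // for "male": (male,liyahui-0,20) then (male,liyahui-0,35)
keyed.maxBy(2).print();  // for "male": (male,liyahui-0,20) then (male,liyahui-1,35)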

union

  • A combining operator: unions two or more DataStreams of the same type into one stream (see the sketch after the official snippet below).
DataStream* → DataStream    
Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

dataStream.union(otherStream1, otherStream2, ...);
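
  • A minimal sketch, assuming a second MockUpModel stream of the same type (for example read from another Kafka topic, as in the two-topic Scala setup); after union the two streams are processed as one:

// The second source below is just a stand-in -- any DataStream<MockUpModel> works.
DataStream<MockUpModel> topicAStream = getKafka010Data(env);
DataStream<MockUpModel> topicBStream = getKafka010Data(env);   // placeholder for a second topic

DataStream<MockUpModel> allEvents = topicAStream.union(topicBStream);
allEvents.filter(line -> line.age % 2 == 0).print().setParallelism(1);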

connect

  • Assembles two DataStreams into a single ConnectedStreams while each side keeps its original type; this lets us later map data of different types into one common structure (see the CoMapFunction sketch after the official snippet).
DataStream,DataStream → ConnectedStreams    
"Connects" two data streams retaining their types. Connect allowing for shared state between the two streams.

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
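
  • A minimal sketch of how the two retained types can then be mapped into one common output type with a CoMapFunction; the control stream here is hypothetical:

// map1 handles elements of the first stream, map2 handles elements of the second stream.
DataStream<MockUpModel> modelStream = getKafka010Data(env);
DataStream<String> controlStream = env.fromElements("pause", "resume");

ConnectedStreams<MockUpModel, String> connected = modelStream.connect(controlStream);

DataStream<String> merged = connected.map(new CoMapFunction<MockUpModel, String, String>() {
    @Override
    public String map1(MockUpModel value) throws Exception {
        return "data:" + value.name;
    }

    @Override
    public String map2(String value) throws Exception {
        return "control:" + value;
    }
});
merged.print().setParallelism(1);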

split

  • Split divides one DataStream into two or more streams according to some criterion (a combined split/select sketch follows the select section below).
DataStream → SplitStream    
Split the stream into two or more streams according to some criterion.

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

select

  • Select retrieves the matching sub-stream(s) from a SplitStream after a split.
SplitStream → DataStream    
Select one or more streams from a split stream.

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
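
  • A minimal sketch (not from the repo) combining split and select on the same kafkaData stream: records are tagged "even" or "odd" by age and each side is selected back out. Note that newer Flink versions favor side outputs over split/select.

// Tag each record by age parity, then pull the tagged sub-streams out with select().
SplitStream<MockUpModel> splitStream = kafkaData.split(new OutputSelector<MockUpModel>() {
    @Override
    public Iterable<String> select(MockUpModel value) {
        List<String> tags = new ArrayList<>();
        tags.add(value.age % 2 == 0 ? "even" : "odd");
        return tags;
    }
});

DataStream<MockUpModel> evenAges = splitStream.select("even");
DataStream<MockUpModel> oddAges  = splitStream.select("odd");
evenAges.print().setParallelism(1);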

iterate

DataStream → IterativeStream → DataStream 

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See [iterations](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/index.html#iterations) for a complete description.

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});

Summary

  • Part one of studying Flink's streaming operators, finished in a simple form; to be refined later.

Li Xiaoli must not fall behind!
