【零基礎(chǔ)學(xué)flink】flink中的轉(zhuǎn)換算子(transform operator)

轉(zhuǎn)化算子(transform operator)將一個(gè)或多個(gè)DataStream轉(zhuǎn)換為新的DataStream,如此下去可以將多個(gè)轉(zhuǎn)換組合成復(fù)雜的數(shù)據(jù)流拓?fù)洹?/p>

本節(jié)介紹了基本轉(zhuǎn)換库倘,應(yīng)用這些轉(zhuǎn)換的有效物理分區(qū)(partition)临扮,以及對(duì)Flink轉(zhuǎn)換chain的深入介紹。

目錄

  • DataStream轉(zhuǎn)換
  • 物理分區(qū)
  • 任務(wù)鏈和資源組

DataStream轉(zhuǎn)換

  • map
    DataStream→DataStream |
    讀取一個(gè)元素并生成一個(gè)元素教翩。例如杆勇,一個(gè)map函數(shù),它將輸入流的值加倍:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
  • FlatMap
    DataStream→DataStream
    讀取一個(gè)元素饱亿,并生成零個(gè)蚜退、一個(gè)或多個(gè)元素。例如:將句子分割為單詞的flatmap函數(shù):
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
  • Filter
    DataStream→DataStream
    將每個(gè)元素輸入的filter布爾函數(shù):僅保留filter函數(shù)返回true的那部分元素彪笼,filter返回false的元素會(huì)被過濾掉钻注。例如:過濾掉零值的過濾器:
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
  • KeyBy
    DataStream→KeyedStream |

在邏輯上將流分區(qū)為互不相交的分區(qū)。具有相同key的所有記錄會(huì)分配給到同一分區(qū)配猫。在內(nèi)部幅恋,keyBy()是使用hash分區(qū)實(shí)現(xiàn)。在flink中有多種指定鍵的方法
此轉(zhuǎn)換返回的是KeyedStream泵肄,其中包括key-state捆交。

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

注意 :

  1. 它是POJO類型淑翼,但不覆蓋hashCode()方法,key內(nèi)部是hash分區(qū)依賴于hashCode()方法的實(shí)現(xiàn)品追。
  2. 任何類型的數(shù)組也不能成為key玄括。
  • reduce
    KeyedStream→DataStream

注意reduce函數(shù)是將KeyedStream轉(zhuǎn)換為DataStream,也就是reduce調(diào)用前必須進(jìn)行分區(qū)肉瓦,即得先調(diào)用keyBy()函數(shù)

在分區(qū)的數(shù)據(jù)流上調(diào)用reduce函數(shù):將當(dāng)前元素與最后一個(gè)reduce的值合并生成新值遭京。

例如:一個(gè)求和的reduce函數(shù):

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
  • Fold
    KeyedStream→DataStream
    注意Fold轉(zhuǎn)換必須是基于KeyedStream(比如先執(zhí)行keyBy操作)。

在一個(gè)初始值上進(jìn)行Fold操作:將當(dāng)前值和上一次Fold產(chǎn)生的值進(jìn)行合并產(chǎn)生一個(gè)新的值:

比如:將Fold函數(shù)應(yīng)用于(1,2,3,4,5)時(shí)泞莉,結(jié)果為:“start-1”哪雕,“start-1-2”,“start-1-2-3”,. ..

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });
  • Aggregations(聚合)
    KeyedStream→DataStream
    min和minBy之間的差異是min返回最小值戒财,而minBy返回該字段中具有最小值的元素(max和maxBy相同)热监。
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
  • Window
    KeyedStream→WindowedStream

可以在已經(jīng)分區(qū)的KeyedStream上定義Windows捺弦。Windows根據(jù)某些特征(例如饮寞,在最后5秒內(nèi)到達(dá)的數(shù)據(jù))對(duì)每個(gè)key中的數(shù)據(jù)進(jìn)行分組。有關(guān)窗口的完整說明列吼,請參見windows幽崩。

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

  • WindowAll
    DataStream→AllWindowedStream

Windows可以在常規(guī)DataStream上定義。Windows根據(jù)某些特征(例如寞钥,在最后5秒內(nèi)到達(dá)的數(shù)據(jù))對(duì)所有流事件進(jìn)行分組慌申。有關(guān)窗口的完整說明,請參見windows理郑。

警告:在許多情況下蹄溉,這是非并行轉(zhuǎn)換。對(duì)于indowAll運(yùn)算算子來說所有記錄將收集在一個(gè)任務(wù)中您炉。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

  • Window Apply
    WindowedStream→DataStream
    AllWindowedStream→DataStream |

將一般性函數(shù)應(yīng)用于整個(gè)窗口柒爵。下面是一個(gè)求和窗口函數(shù)。

注意:如果您正在使用windowAll轉(zhuǎn)換赚爵,則需要使用AllWindowFunction棉胀。

windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

  • Window Reduce
    WindowedStream→DataStream |

將reduce函數(shù)應(yīng)用于窗口。返回reduce后的新結(jié)果冀膝。

windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
});

  • Window Fold
    WindowedStream→DataStream

將fold函數(shù)應(yīng)用于窗口并返回新的值唁奢。示例函數(shù)應(yīng)用于序列(1,2,3,4,5)時(shí),fold函數(shù)的輸出為字符串“start-1-2-3-4-5”:

windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
});
  • Windows上的聚合
    WindowedStream→DataStream

聚合窗口上的內(nèi)容窝剖。min和minBy之間的差異是:min返回最小值麻掸,而minBy返回該字段中具有最小值的元素(max和maxBy相同)。

windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
  • Union
    DataStream *→DataStream

兩個(gè)或多個(gè)數(shù)據(jù)流的聯(lián)合赐纱,創(chuàng)建包含來自所有流的所有元素的新流脊奋。注意:如果將數(shù)據(jù)流與其自身聯(lián)合采郎,則會(huì)在結(jié)果流中獲取兩次元素。

dataStream.union(otherStream1, otherStream2, ...);
  • Window join
    DataStream狂魔,DataStream→DataStream |

在給定key和公共窗口上連接兩個(gè)數(shù)據(jù)流蒜埋。

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});

  • Window CoGroup
    DataStream,DataStream→DataStream |

在給定key和公共窗口上對(duì)兩個(gè)數(shù)據(jù)流進(jìn)行Cogroup最楷。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});

  • Connect
    DataStream整份,DataStream→ConnectedStreams |

“連接”兩個(gè)保留其類型的數(shù)據(jù)流。連接操作允許兩個(gè)流之間的共享狀態(tài)籽孙。

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

  • CoMap烈评,CoFlatMap
    ConnectedStreams→DataStream

類似于ConnectedStreams上的map和flatMap

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});

  • Split
    DataStream→SplitStream

根據(jù)某些標(biāo)準(zhǔn)將流拆分為兩個(gè)或更多個(gè)流。

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

  • Select
    SplitStream→DataStream

從split流中選擇一個(gè)或多個(gè)流犯建。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
  • 迭代
    DataStream→IterativeStream→DataStream |

通過將一個(gè)運(yùn)算符的輸出重定向到某個(gè)先前的運(yùn)算符讲冠,在流中創(chuàng)建“反饋”循環(huán)。這對(duì)于定義不斷更新模型的算法特別有用适瓦。以下代碼以流開頭并連續(xù)應(yīng)用迭代體竿开。大于0的元素將被發(fā)送回反饋通道,其余元素將向下游轉(zhuǎn)發(fā)玻熙。有關(guān)完整說明否彩,請參閱迭代

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});

物理分區(qū)

Flink還通過以下函數(shù)對(duì)轉(zhuǎn)換后的stream進(jìn)行精確分區(qū)嗦随、進(jìn)行l(wèi)ow-level控制(如果需要)列荔。

  • 自定義分區(qū)
    DataStream→DataStream

使用用戶定義的分區(qū)程序(Partitioner )為每個(gè)元素選擇目標(biāo)分區(qū)。

dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);

  • 隨機(jī)分區(qū)
    DataStream→DataStream
    均勻分布隨機(jī)分配元素:(均勻分布)
dataStream.shuffle();
  • 重新平衡(循環(huán)分區(qū))
    DataStream→DataStream
    對(duì)元素循環(huán)分區(qū)枚尼,每個(gè)分區(qū)的負(fù)載相等贴浙。在存在數(shù)據(jù)偏斜時(shí),用于性能優(yōu)化署恍。
dataStream.rebalance();
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末崎溃,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子锭汛,更是在濱河造成了極大的恐慌笨奠,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,907評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件唤殴,死亡現(xiàn)場離奇詭異般婆,居然都是意外死亡晤硕,警方通過查閱死者的電腦和手機(jī)哪轿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,987評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來奸柬,“玉大人,你說我怎么就攤上這事啤咽〗荆” “怎么了?”我有些...
    開封第一講書人閱讀 164,298評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵宇整,是天一觀的道長瓶佳。 經(jīng)常有香客問我,道長鳞青,這世上最難降的妖魔是什么霸饲? 我笑而不...
    開封第一講書人閱讀 58,586評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮臂拓,結(jié)果婚禮上厚脉,老公的妹妹穿的比我還像新娘。我一直安慰自己胶惰,他們只是感情好傻工,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,633評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著孵滞,像睡著了一般中捆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上剃斧,一...
    開封第一講書人閱讀 51,488評(píng)論 1 302
  • 那天轨香,我揣著相機(jī)與錄音忽你,去河邊找鬼幼东。 笑死,一個(gè)胖子當(dāng)著我的面吹牛科雳,可吹牛的內(nèi)容都是我干的根蟹。 我是一名探鬼主播,決...
    沈念sama閱讀 40,275評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼糟秘,長吁一口氣:“原來是場噩夢啊……” “哼简逮!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起尿赚,我...
    開封第一講書人閱讀 39,176評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤散庶,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后凌净,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體悲龟,經(jīng)...
    沈念sama閱讀 45,619評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,819評(píng)論 3 336
  • 正文 我和宋清朗相戀三年冰寻,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了须教。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,932評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖轻腺,靈堂內(nèi)的尸體忽然破棺而出乐疆,到底是詐尸還是另有隱情,我是刑警寧澤贬养,帶...
    沈念sama閱讀 35,655評(píng)論 5 346
  • 正文 年R本政府宣布挤土,位于F島的核電站,受9級(jí)特大地震影響误算,放射性物質(zhì)發(fā)生泄漏耕挨。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,265評(píng)論 3 329
  • 文/蒙蒙 一尉桩、第九天 我趴在偏房一處隱蔽的房頂上張望筒占。 院中可真熱鬧,春花似錦蜘犁、人聲如沸翰苫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,871評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽奏窑。三九已至,卻和暖如春屈扎,著一層夾襖步出監(jiān)牢的瞬間埃唯,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,994評(píng)論 1 269
  • 我被黑心中介騙來泰國打工鹰晨, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留墨叛,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,095評(píng)論 3 370
  • 正文 我出身青樓模蜡,卻偏偏與公主長得像漠趁,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子忍疾,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,884評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容