基于Apache Flink的流處理 第五章DataStream API

1廓推、一個簡單的開始

package BaseOperationTest;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamContextEnvironment.getExecutionEnvironment();

        // 從文件中讀取數(shù)據(jù)
//        String inputPath = "/Users/kaiker/Documents/projects/flink_study/src/main/resources/hello.txt";
//        DataStream<String> inputDataStream = env.readTextFile(inputPath);

        DataStream<String> inputDataStream = env.socketTextStream("localhost", 7777);

        // 對數(shù)據(jù)集進行處理辅辩,按空格分詞展開难礼,轉(zhuǎn)換成(word, 1)二元組進行統(tǒng)計
        // 按照第一個位置的word分組
        // 按照第二個位置上的數(shù)據(jù)求和
        DataStream<Tuple2<String, Integer>> resultSet = inputDataStream.flatMap(new MyFlatMapper())
                .keyBy(0)
                .sum(1);

        resultSet.print();

        env.execute();
    }

    // 自定義類,實現(xiàn)FlatMapFunction接口
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
            // 按空格分詞
            String[] words = s.split(" ");
            // 遍歷所有word玫锋,包成二元組輸出
            for (String str : words) {
                out.collect(new Tuple2<>(str, 1));
            }
        }
    }

}

2蛾茉、轉(zhuǎn)換操作

2.2 基于KeyedStream的轉(zhuǎn)換

keyBy

通過指定鍵值的方式將一個DataStream轉(zhuǎn)化為KeyedStream。流中的事件會根據(jù)各自鍵值被分到不同的分區(qū)

KeyBy分區(qū)操作
DataStream<SensorReading> inputDataStream = env.socketTextStream("localhost", 7777);
keyStream = inputDataStream.keyBy(SensorReading::getId);

滾動聚合

sum() min() max() 不斷獲取數(shù)據(jù)景醇,更新獲取到的聚合值
minBy() maxBy() sumBy() 不斷獲取數(shù)據(jù)臀稚,更新獲取到的聚合值吝岭,非聚合的值會使用最新的那一條更新

Reduce

每個到來的事件都會和reduce結(jié)果進行一次組合三痰,reduce不會改變數(shù)據(jù)類型
接收兩個參數(shù)吧寺,一個記錄現(xiàn)有(或者說已經(jīng)聚合的)記錄,一個記錄新來的記錄散劫,然后對兩者進行聚合稚机。

DataStream<SensorReading> resultStream = keyedStream.reduce(
                (curSensor,newSensor)->new SensorReading(curSensor.getId(),newSensor.getTimestamp(), Math.max(curSensor.getTemperature(), newSensor.getTemperature()))
        );

2.3 多流轉(zhuǎn)換

Union

DataStream.union可以合并多條類型相同的DataStream,生成一個新的類型相同的DataStream


union

Connect

  • connect會連接起兩條流获搏,默認情況下赖条,不會使兩條輸入流的事件之間產(chǎn)生任何關(guān)聯(lián)。但是可以結(jié)合keyBy等方法使用常熙,讓兩條流有一定關(guān)聯(lián)
  • 有coMap\coFlatMap等方法可以分別對兩條流進行處理

3纬乍、類型

  • 原始類型
  • Java和Scala Tuple,F(xiàn)link提供了Java元組的高效實現(xiàn)裸卫,最多可包含25個字段
  • Scala case類
  • POJO類仿贬,如果一個類有公共類、有一個無參構(gòu)造器墓贿、每個字段都有g(shù)etter和setter茧泪、字段類型可以支持,則可以被分析為POJO
  • 其他特殊類型

為數(shù)據(jù)類型創(chuàng)建類型信息

TypeInformation<Integer> intType = Types.INT

顯式提供類型信息

有時候一些必要的信息可能無法提取聋袋,比如Java的類型擦除
可以通過實現(xiàn)ResultTypeQueryable接口來提供類型队伟,也可以使用returns()方法來顯示指定某算子的返回類型

DataStream<Person> persons = tuples
.map(t -> new Person(t.f0, t.f1)).returns(Types..POJO(Person.class));

4、富函數(shù)

  • DataStreamAPI中所有轉(zhuǎn)換函數(shù)都有對應(yīng)的富函數(shù)
  • 富函數(shù)可以在處理第一條數(shù)據(jù)之前進行初始化操作幽勒,獲取到一些上下文信息
public static class MyMapFunction extends RichMapFunction<SensorReading, Tuple2<Integer, String>> { 

  @Override public Tuple2<Integer, String> map(SensorReading value) throws Exception {
    return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value.getId()); 
  } 

  @Override public void open(Configuration parameters) throws Exception { 
    System.out.println(getRuntimeContext.getIndexOfThisSubtask); // 以下可以做一些初始化工作嗜侮,例如建立一個和HDFS的連接 
  } 

  @Override public void close() throws Exception { 
    System.out.println("my map close"); // 以下做一些清理工作,例如斷開和HDFS的連接 
  } 
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末代嗤,一起剝皮案震驚了整個濱河市棘钞,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌干毅,老刑警劉巖宜猜,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異硝逢,居然都是意外死亡姨拥,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門渠鸽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來叫乌,“玉大人,你說我怎么就攤上這事徽缚『┘椋” “怎么了?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵凿试,是天一觀的道長排宰。 經(jīng)常有香客問我似芝,道長,這世上最難降的妖魔是什么板甘? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任党瓮,我火速辦了婚禮,結(jié)果婚禮上盐类,老公的妹妹穿的比我還像新娘寞奸。我一直安慰自己,他們只是感情好在跳,可當(dāng)我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布枪萄。 她就那樣靜靜地躺著,像睡著了一般猫妙。 火紅的嫁衣襯著肌膚如雪呻引。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天吐咳,我揣著相機與錄音逻悠,去河邊找鬼。 笑死韭脊,一個胖子當(dāng)著我的面吹牛童谒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播沪羔,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼饥伊,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蔫饰?” 一聲冷哼從身側(cè)響起琅豆,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎篓吁,沒想到半個月后茫因,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡杖剪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年冻押,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盛嘿。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡洛巢,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出次兆,到底是詐尸還是另有隱情稿茉,我是刑警寧澤,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站漓库,受9級特大地震影響城须,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜米苹,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望砰琢。 院中可真熱鬧蘸嘶,春花似錦、人聲如沸陪汽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽挚冤。三九已至况增,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間训挡,已是汗流浹背澳骤。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留澜薄,地道東北人为肮。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像肤京,于是被迫代替她去往敵國和親颊艳。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容