Apache Flink 學(xué)習(xí)筆記(二)

上一篇 Apache Flink 學(xué)習(xí)筆記(一) 簡單示范了批處理的使用,本篇展示流式處理的使用方法。

流處理也叫無界處理,因為數(shù)據(jù)是源源不斷的被加載進來的腿时,流處理需要用到DataStream類。本篇demo 將結(jié)合kafka(公司有現(xiàn)成的消息生產(chǎn)者)來演示饭宾。

kafka 消息體如下(json):

{
    "appId":"xxxx",
    "module":"xxxx"
    //其余省略
}

現(xiàn)在我想每10s統(tǒng)計一次批糟,按照appid分組計數(shù)(需求簡單一點),Event TimeProcessingTime看铆,Windows滾動窗口跃赚。

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class Demo3 {
    public static void main(String[] args) {
        //生成流式執(zhí)行環(huán)境對象 StreamExecutionEnvironment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableSysoutLogging();//開啟Sysout打日志
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //設(shè)置窗口的時間單位為process time
        env.setParallelism(2);//全局并發(fā)數(shù)
        //配置kafka bootstrap.servers
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "kafka bootstrap.servers");
        //配置消息主題和應(yīng)用名(自定義工具類FlinkKafkaManager,源碼在后面)
        FlinkKafkaManager manager = new FlinkKafkaManager("kafka.topic", "app.name", properties);
        //用JsonObject 反序列化接收kafka
        FlinkKafkaConsumer09<JSONObject> consumer = manager.build(JSONObject.class);
        //從最新的消息開始接收
        consumer.setStartFromLatest();
        //獲得DataStream
        DataStream<JSONObject> messageStream = env.addSource(consumer);
        //轉(zhuǎn)化為pojo
        DataStream<Bean3> bean3DataStream = messageStream.map(new FlatMap());
        bean3DataStream
                .keyBy(Bean3::getAppId) //也可以用“appId”替換
                .timeWindow(Time.seconds(10))//等價于下面這一行性湿,因為上面設(shè)置了TimeCharacteristic.ProcessingTime
               // .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))//基于process time的窗口
                .aggregate(new Agg()) //聚合函數(shù)满败,這里也可以參照demo2用reduce函數(shù)
                .addSink(new Sink()); //輸出函數(shù)
        try {
            env.execute("app.name");//流式處理需要調(diào)用觸發(fā)
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class FlatMap implements MapFunction<JSONObject, Bean3> {
        @Override
        public Bean3 map(JSONObject jsonObject) throws Exception {
            return new Bean3(jsonObject.getString("appId"), jsonObject.getString("module"));
        }
    }

    public static class Agg implements AggregateFunction<Bean3, Tuple2<Bean3, Long>, Tuple2<Bean3, Long>> {
        @Override
        public Tuple2<Bean3, Long> createAccumulator() {
            return new Tuple2<Bean3, Long>();
        }

        @Override
        public Tuple2<Bean3, Long> add(Bean3 bean3, Tuple2<Bean3, Long> bean3LongTuple2) {
            Bean3 bean = bean3LongTuple2.f0;
            Long count = bean3LongTuple2.f1;
            if (bean == null) {
                bean = bean3;
            }
            if (count == null) {
                count = 1L;
            } else {
                count++;
            }
            return new Tuple2<>(bean, count);
        }

        @Override
        public Tuple2<Bean3, Long> getResult(Tuple2<Bean3, Long> bean3LongTuple2) {
            return bean3LongTuple2;
        }

        @Override
        public Tuple2<Bean3, Long> merge(Tuple2<Bean3, Long> bean3LongTuple2, Tuple2<Bean3, Long> acc1) {
            Bean3 bean = bean3LongTuple2.f0;
            Long count = bean3LongTuple2.f1;
            Long acc = acc1.f1;
            return new Tuple2<>(bean, count + acc);
        }
    }

    public static class Sink implements SinkFunction<Tuple2<Bean3, Long>> {
        @Override
        public void invoke(Tuple2<Bean3, Long> value, Context context) throws Exception {
            System.out.println(value.f0.toString() + "," + value.f1);
        }
    }

    public static class Bean3 {
        public String appId;
        public String module;

        public Bean3() {
        }

        public Bean3(String appId, String module) {
            this.appId = appId;
            this.module = module;
        }

        public String getAppId() {
            return appId;
        }

        public void setAppId(String appId) {
            this.appId = appId;
        }

        public String getModule() {
            return module;
        }

        public void setModule(String module) {
            this.module = module;
        }

        @Override
        public String toString() {
            return "Bean3{" +
                    "appId='" + appId + '\'' +
                    ", module='" + module + '\'' +
                    '}';
        }
    }
}

與上一篇批處理的demo相比肤频,流處理顯得復(fù)雜了許多。實際上二者有很多想通的地方算墨,比如批處理中的groupBy和流處理的keyBy宵荒,都是按照指定維度分組的。

而流處理中會引入窗口的概念净嘀,正如前面所說报咳,流式數(shù)據(jù)是無界數(shù)據(jù),Flink 借助窗口將無界數(shù)據(jù)轉(zhuǎn)化成一個個“批處理”再做計算挖藏。窗口分為滾動窗口暑刃,滑動窗口會話窗口等等膜眠,具體可參見官網(wǎng)介紹岩臣。而每個窗口的時間劃分則是由event time 決定的溜嗜,本例采用的是ProcessingTime即處理時間。

下面我將demo3改造架谎,使其變成使用EventTime炸宵,也就是說窗口的時間由數(shù)據(jù)源的時間戳(事件發(fā)生)決定。

改動1
//為pojo Bean3 添加時間戳字段
public static class Bean3 {
    public Long timestamp;//add event time
    public String appId;
    public String module;

    public Bean3() {
    }

    public Bean3(Long timestamp, String appId, String module) {
        this.timestamp = timestamp;
        this.appId = appId;
        this.module = module;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }
//省略其他
}
改動2
//設(shè)置窗口的時間單位為event time  
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
改動3
//新增
//指定數(shù)據(jù)源的時間戳谷扣,Time.seconds(int)是指允許多長時間消息延遲
DataStream<Bean3> bean3DataStreamWithAssignTime = 
bean3DataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Bean3>(Time.seconds(0)) {
    @Override
    public long extractTimestamp(Bean3 element) {
        return element.getTimestamp();
    }
});
改動4
bean3DataStreamWithAssignTime
                .keyBy(Bean3::getAppId)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))//基于event time的窗口
                .allowedLateness(Time.seconds(5)) //允許數(shù)據(jù)延遲多長時間,謹慎使用,遲到的數(shù)據(jù)會導(dǎo)致出現(xiàn)重復(fù)
//后面省略
FlinkKafkaManager 源碼
package flink.test.manager;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import java.util.Properties;

public class FlinkKafkaManager<T> {
    private String topic;
    private String groupId;
    private Properties properties;

    public FlinkKafkaManager(String topic, String groupId, Properties properties) {
        this.topic = topic;
        this.groupId = groupId;
        this.properties = properties;
        this.properties.setProperty("group.id", this.groupId);
        //為使用默認kafka的用戶配置基礎(chǔ)配置
        this.setDefaultKafkaProperties();
    }

    private void setDefaultKafkaProperties() {
        //啟用auto commit offset, 每5s commit一次
        this.properties.setProperty("enable.auto.commit", "true");
        this.properties.setProperty("auto.commit.interval.ms", "5000");
    }

    public FlinkKafkaConsumer09<T> build(Class<T> clazz) {
        if (checkProperties()) {
            return new FlinkKafkaConsumer09<T>(topic, new ConsumerDeserializationSchema(clazz), properties);
        } else {
            return null;
        }
    }

    private boolean checkProperties() {
        boolean isValued = true;

        if (!properties.containsKey("bootstrap.servers")) {
            isValued = false;
        } else {
            String brokers = properties.getProperty("bootstrap.servers");
            if (brokers == null || brokers.isEmpty()) {
                isValued = false;
            }
        }

        if (this.topic == null || this.topic.isEmpty()) {
            isValued = false;
        }

        if (!properties.containsKey("group.id")) {
            isValued = false;
        } else {
            String groupId = properties.getProperty("group.id");
            if (groupId == null || groupId.isEmpty()) {
                isValued = false;
            }
        }

        return isValued;
    }
}
ConsumerDeserializationSchema 源碼
package flink.test.manager;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import java.io.IOException;

public class ConsumerDeserializationSchema<T> implements DeserializationSchema<T> {
    private Class<T> clazz;

    public ConsumerDeserializationSchema(Class<T> clazz) {
        this.clazz = clazz;
    }

    @Override
    public T deserialize(byte[] bytes) throws IOException {
        //確保 new String(bytes) 是json 格式土全,如果不是,請自行解析
        return JSON.parseObject(new String(bytes), clazz);
    }

    @Override
    public boolean isEndOfStream(T t) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(clazz);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末会涎,一起剝皮案震驚了整個濱河市裹匙,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌在塔,老刑警劉巖幻件,帶你破解...
    沈念sama閱讀 211,376評論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蛔溃,居然都是意外死亡绰沥,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評論 2 385
  • 文/潘曉璐 我一進店門贺待,熙熙樓的掌柜王于貴愁眉苦臉地迎上來徽曲,“玉大人,你說我怎么就攤上這事麸塞⊥撼迹” “怎么了?”我有些...
    開封第一講書人閱讀 156,966評論 0 347
  • 文/不壞的土叔 我叫張陵哪工,是天一觀的道長奥此。 經(jīng)常有香客問我,道長雁比,這世上最難降的妖魔是什么稚虎? 我笑而不...
    開封第一講書人閱讀 56,432評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮偎捎,結(jié)果婚禮上蠢终,老公的妹妹穿的比我還像新娘。我一直安慰自己茴她,他們只是感情好寻拂,可當(dāng)我...
    茶點故事閱讀 65,519評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著丈牢,像睡著了一般祭钉。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上己沛,一...
    開封第一講書人閱讀 49,792評論 1 290
  • 那天朴皆,我揣著相機與錄音帕识,去河邊找鬼。 笑死遂铡,一個胖子當(dāng)著我的面吹牛肮疗,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播扒接,決...
    沈念sama閱讀 38,933評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼伪货,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了钾怔?” 一聲冷哼從身側(cè)響起碱呼,我...
    開封第一講書人閱讀 37,701評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎宗侦,沒想到半個月后愚臀,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,143評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡矾利,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,488評論 2 327
  • 正文 我和宋清朗相戀三年姑裂,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片男旗。...
    茶點故事閱讀 38,626評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡舶斧,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出察皇,到底是詐尸還是另有隱情茴厉,我是刑警寧澤,帶...
    沈念sama閱讀 34,292評論 4 329
  • 正文 年R本政府宣布什荣,位于F島的核電站矾缓,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏稻爬。R本人自食惡果不足惜嗜闻,卻給世界環(huán)境...
    茶點故事閱讀 39,896評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望因篇。 院中可真熱鬧,春花似錦笔横、人聲如沸竞滓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽商佑。三九已至,卻和暖如春厢塘,著一層夾襖步出監(jiān)牢的瞬間茶没,已是汗流浹背肌幽。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留抓半,地道東北人喂急。 一個月前我還...
    沈念sama閱讀 46,324評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像笛求,于是被迫代替她去往敵國和親廊移。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,494評論 2 348

推薦閱讀更多精彩內(nèi)容