(2)FlinkSQL滾動窗口demo演示

滾動窗口(Tumbling Windows) 滾動窗口有固定的大小,是一種對數據進行均勻切片的劃分方式。窗口之間沒有重疊棚辽,也不會有間隔,是“首尾相接”的狀態(tài)冰肴。滾動窗口可以基于時間定義屈藐,也可以基于數據個數定義;需要的參數只有一個熙尉,就是窗口的大辛摺(window size)。


1.png

demo演示:
場景:接收通過socket發(fā)送過來的數據检痰,每30秒觸發(fā)一次窗口計算邏輯
(1)準備一個實體對象包归,消息對象

package com.pojo;

import java.io.Serializable;

/**
 * Created by lj on 2022-07-05.
 */
public class WaterSensor implements Serializable {
    private String id;
    private long ts;
    private int vc;

    public WaterSensor(){

    }

    public WaterSensor(String id,long ts,int vc){
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public int getVc() {
        return vc;
    }

    public void setVc(int vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }
}

(2)編寫socket代碼,模擬數據發(fā)送

package com.producers;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

/**
 * Created by lj on 2022-07-05.
 */
public class Socket_Producer {
    public static void main(String[] args) throws IOException {

        try {
            ServerSocket ss = new ServerSocket(9999);
            System.out.println("啟動 server ....");
            Socket s = ss.accept();
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
            String response = "java,1,2";

            //每 2s 發(fā)送一次消息
            int i = 0;
            Random r=new Random();   
            String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};

            while(true){
                Thread.sleep(2000);
                response= lang[r.nextInt(lang.length)] + "," + i + "," + i+"\n";
                System.out.println(response);
                try{
                    bw.write(response);
                    bw.flush();
                    i++;
                }catch (Exception ex){
                    System.out.println(ex.getMessage());
                }

            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

(3)從socket端接收數據铅歼,并設置30秒觸發(fā)執(zhí)行一次窗口運算

package com.examples;

import com.pojo.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

/**
 * Created by lj on 2022-07-06.
 *
 * 滾動窗口(Tumbling Windows) 滾動窗口有固定的大小公壤,是一種對數據進行均勻切片的劃分方式。窗口之間沒有重疊椎椰,也不會有間隔厦幅,
 * 是“首尾相接”的狀態(tài)。滾動窗口可以基于時間定義慨飘,也可以基于數據個數定義确憨;需要的參數只有一個译荞,
 * 就是窗口的大小(window size)休弃。
 */
public class Flink_Group_Window_Tumble {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999,"\n");
        SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
            }
        });

        // 將流轉化為表
        Table table = tableEnv.fromDataStream(waterDS,
                $("id"),
                $("ts"),
                $("vc"),
                $("pt").proctime());

        tableEnv.createTemporaryView("EventTable", table);

        Table result = tableEnv.sqlQuery(
                "SELECT " +
                        "id, " +                //window_start, window_end,
                        "COUNT(ts) ,SUM(ts)" +
                        "FROM TABLE( " +
                        "TUMBLE( TABLE EventTable , " +
                        "DESCRIPTOR(pt), " +
                        "INTERVAL '30' SECOND)) " +
                        "GROUP BY id , window_start, window_end"
        );

//        tableEnv.toChangelogStream(result).print("count");
//        tableEnv.toDataStream(result).print("toDataStream");
//        tableEnv.toAppendStream(result, Row.class).print("toAppendStream");           //追加模式
        tableEnv.toRetractStream(result, Row.class).print("toRetractStream");       //縮進模式
        env.execute();
    }
}

(4)效果演示


2.png
3.png
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末吞歼,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子塔猾,更是在濱河造成了極大的恐慌篙骡,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,113評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件丈甸,死亡現場離奇詭異糯俗,居然都是意外死亡,警方通過查閱死者的電腦和手機老虫,發(fā)現死者居然都...
    沈念sama閱讀 88,644評論 2 381
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來茫多,“玉大人祈匙,你說我怎么就攤上這事√煲荆” “怎么了夺欲?”我有些...
    開封第一講書人閱讀 153,340評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長今膊。 經常有香客問我些阅,道長,這世上最難降的妖魔是什么斑唬? 我笑而不...
    開封第一講書人閱讀 55,449評論 1 279
  • 正文 為了忘掉前任市埋,我火速辦了婚禮,結果婚禮上恕刘,老公的妹妹穿的比我還像新娘缤谎。我一直安慰自己,他們只是感情好褐着,可當我...
    茶點故事閱讀 64,445評論 5 374
  • 文/花漫 我一把揭開白布坷澡。 她就那樣靜靜地躺著,像睡著了一般含蓉。 火紅的嫁衣襯著肌膚如雪频敛。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,166評論 1 284
  • 那天馅扣,我揣著相機與錄音斟赚,去河邊找鬼。 笑死差油,一個胖子當著我的面吹牛汁展,可吹牛的內容都是我干的。 我是一名探鬼主播,決...
    沈念sama閱讀 38,442評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼食绿,長吁一口氣:“原來是場噩夢啊……” “哼侈咕!你這毒婦竟也來了?” 一聲冷哼從身側響起器紧,我...
    開封第一講書人閱讀 37,105評論 0 261
  • 序言:老撾萬榮一對情侶失蹤耀销,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后铲汪,有當地人在樹林里發(fā)現了一具尸體熊尉,經...
    沈念sama閱讀 43,601評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,066評論 2 325
  • 正文 我和宋清朗相戀三年掌腰,在試婚紗的時候發(fā)現自己被綠了狰住。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,161評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡齿梁,死狀恐怖催植,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情勺择,我是刑警寧澤创南,帶...
    沈念sama閱讀 33,792評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站省核,受9級特大地震影響稿辙,放射性物質發(fā)生泄漏。R本人自食惡果不足惜气忠,卻給世界環(huán)境...
    茶點故事閱讀 39,351評論 3 307
  • 文/蒙蒙 一邻储、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧旧噪,春花似錦芥备、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至日月,卻和暖如春袱瓮,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背爱咬。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評論 1 261
  • 我被黑心中介騙來泰國打工尺借, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人精拟。 一個月前我還...
    沈念sama閱讀 45,618評論 2 355
  • 正文 我出身青樓燎斩,卻偏偏與公主長得像虱歪,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子栅表,可洞房花燭夜當晚...
    茶點故事閱讀 42,916評論 2 344

推薦閱讀更多精彩內容