Flink-2.Flink Table API

Flink 1.14.3

package com.ctgu.flink.sql;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;


public class Flink_Table_Window {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String createSql = "CREATE TABLE windowTable " +
                "    (" +
                "    `id` STRING," +
                "    `timestamp` BIGINT," +
                "    `value` DOUBLE," +
                "    `time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3)," +
                "    `pt` AS PROCTIME()," +
                "    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND" +
                "    )" +
                "    WITH (" +
                "       'connector'='filesystem'," +
                "       'format'='csv'," +
                "       'csv.field-delimiter'=' '," +
                "       'path'='data/dataInfo.txt'" +
                "    )";

        tableEnv.executeSql(createSql);
        Table table = tableEnv.from("windowTable");

        table.printSchema();

        Table tumble = table.window(Tumble.over(lit(4).seconds()).on($("time_ltz")).as("tw"))
                .groupBy($("tw"), $("id"))
                .select($("id"), $("value").count(), $("tw").end().toTime());

        Table slide = table.window(Slide.over(lit(20).seconds()).every(lit(4).seconds()).on($("time_ltz")).as("tw"))
                .groupBy($("tw"), $("id"))
                .select($("id"), $("value").count(), $("tw").end().toTime());


        Table result = table.select($("id"), $("value"), $("time_ltz").toTime());

        tableEnv.toDataStream(result, Row.class).print("result");
        tableEnv.toDataStream(tumble, Row.class).print("tumble");
        tableEnv.toDataStream(slide, Row.class).print("slide");

        Table rank = table.window(
                Over.partitionBy($("id"))
//                        .orderBy($("pt")).preceding(rowInterval(10L)).as("ow"))
                        .orderBy($("time_ltz")).preceding(lit(4).seconds()).as("ow"))
                .select($("id"),
                        $("value"),
                        $("value").max().over($("ow")).as("value_max"),
                        $("time_ltz"));

        rank.printSchema();

        tableEnv.toChangelogStream(rank, Schema.newBuilder()
                .column("id", "STRING")
                .column("score", "DOUBLE")
                .column("value_max", "DOUBLE")
                .column("time_ltz", "TIMESTAMP_LTZ(3)")
                .build()).print("rank");

        result.printSchema();

        env.execute("Table SQL");
    }

}


maven 配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ctgu</groupId>
    <artifactId>flink_class</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink-version>1.14.3</flink-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.2-test3</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
        </dependency>
    </dependencies>
</project>
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末奸汇,一起剝皮案震驚了整個濱河市拢军,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌山上,老刑警劉巖手报,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蚯舱,死亡現(xiàn)場離奇詭異改化,居然都是意外死亡,警方通過查閱死者的電腦和手機枉昏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門陈肛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人兄裂,你說我怎么就攤上這事句旱。” “怎么了晰奖?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵谈撒,是天一觀的道長。 經(jīng)常有香客問我匾南,道長啃匿,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任午衰,我火速辦了婚禮立宜,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘臊岸。我一直安慰自己橙数,他們只是感情好,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布帅戒。 她就那樣靜靜地躺著灯帮,像睡著了一般。 火紅的嫁衣襯著肌膚如雪逻住。 梳的紋絲不亂的頭發(fā)上钟哥,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天,我揣著相機與錄音瞎访,去河邊找鬼腻贰。 笑死,一個胖子當著我的面吹牛扒秸,可吹牛的內(nèi)容都是我干的播演。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼伴奥,長吁一口氣:“原來是場噩夢啊……” “哼写烤!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起拾徙,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤洲炊,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體暂衡,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡询微,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了狂巢。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片拓提。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖隧膘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情寺惫,我是刑警寧澤疹吃,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站西雀,受9級特大地震影響萨驶,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜艇肴,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一腔呜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧再悼,春花似錦核畴、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至莺奸,卻和暖如春丑孩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背灭贷。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工温学, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人甚疟。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓仗岖,卻偏偏與公主長得像,于是被迫代替她去往敵國和親古拴。 傳聞我的和親對象是個殘疾皇子箩帚,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容