(5)Flink CEP SQL四種匹配模式效果演示

Flinkcep封面.png

Flink CEP SQL中提供了四種匹配策略:
(1)skip to next row
從匹配成功的事件序列中的第一個(gè)事件的下一個(gè)事件開始進(jìn)行下一次匹配
(2)skip past last row
從匹配成功的事件序列中的最后一個(gè)事件的下一個(gè)事件開始進(jìn)行下一次匹配
(3)skip to first pattern Item
從匹配成功的事件序列中第一個(gè)對應(yīng)于patternItem的事件開始進(jìn)行下一次匹配
(4)skip to last pattern Item
從匹配成功的事件序列中最后一個(gè)對應(yīng)于patternItem的事件開始進(jìn)行下一次匹配

接下來我們代碼來演示一下每種策略模式表達(dá)的效果:
(1)skip to next row

package com.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Created by lj on 2022-08-08.
 */
public class CEPSQLExampleAfterMatch {
    private static final Logger LOG = LoggerFactory.getLogger(CEPSQLExampleAfterMatch.class);

    public static void main(String[] args) {
        EnvironmentSettings settings = null;
        StreamTableEnvironment tEnv = null;
        try {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            tEnv = StreamTableEnvironment.create(env, settings);
            final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            DataStream<Ticker> dataStream =
                    env.fromElements(
                            new Ticker(2, "Apple", 11, 2, LocalDateTime.parse("2021-12-10 10:00:01", dateTimeFormatter)),
                            new Ticker(3, "Apple", 16, 2, LocalDateTime.parse("2021-12-10 10:00:02", dateTimeFormatter)),
                            new Ticker(4, "Apple", 13, 2, LocalDateTime.parse("2021-12-10 10:00:03", dateTimeFormatter)),
                            new Ticker(5, "Apple", 15, 2, LocalDateTime.parse("2021-12-10 10:00:04", dateTimeFormatter)),
                            new Ticker(6, "Apple", 14, 1, LocalDateTime.parse("2021-12-10 10:00:05", dateTimeFormatter)),
                            new Ticker(7, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:06", dateTimeFormatter)),
                            new Ticker(8, "Apple", 23, 2, LocalDateTime.parse("2021-12-10 10:00:07", dateTimeFormatter)),
                            new Ticker(9, "Apple", 22, 2, LocalDateTime.parse("2021-12-10 10:00:08", dateTimeFormatter)),
                            new Ticker(10, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:09", dateTimeFormatter)),
                            new Ticker(11, "Apple", 11, 1, LocalDateTime.parse("2021-12-10 10:00:11", dateTimeFormatter)),
                            new Ticker(12, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:12", dateTimeFormatter)),
                            new Ticker(13, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:13", dateTimeFormatter)),
                            new Ticker(14, "Apple", 25, 1, LocalDateTime.parse("2021-12-10 10:00:14", dateTimeFormatter)),
                            new Ticker(15, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:15", dateTimeFormatter)),
                            new Ticker(16, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:16", dateTimeFormatter)),
                            new Ticker(17, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:17", dateTimeFormatter)),
                            new Ticker(18, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:18", dateTimeFormatter)));
            
            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                    .column("id", DataTypes.BIGINT())
                    .column("symbol", DataTypes.STRING())
                    .column("price", DataTypes.BIGINT())
                    .column("tax", DataTypes.BIGINT())
                    .column("rowtime", DataTypes.TIMESTAMP(3))
                    .watermark("rowtime", "rowtime - INTERVAL '1' SECOND")
                    .build());
            tEnv.createTemporaryView("CEP_SQL_1", table);
            
            String sql = "SELECT * " +
                    "FROM CEP_SQL_1 " +
                    "    MATCH_RECOGNIZE ( " +
                    "        PARTITION BY symbol " +       //按symbol分區(qū),將相同卡號(hào)的數(shù)據(jù)分到同一個(gè)計(jì)算節(jié)點(diǎn)上容燕。
                    "        ORDER BY rowtime " +          //在窗口內(nèi)拄轻,對事件時(shí)間進(jìn)行排序。
                    "        MEASURES " +                   //定義如何根據(jù)匹配成功的輸入事件構(gòu)造輸出事件
                    "            FIRST(e1.id) as id,"+
                    "            AVG(e1.price) as avgPrice,"+
                    "            FIRST(e1.rowtime) AS e1_start_tstamp, " +
                    "            LAST(e2.rowtime) AS e2_fast_tstamp " +
                    "        ONE ROW PER MATCH " +                                        //匹配成功輸出一條
                    "        AFTER MATCH skip to next row " +
                    "        PATTERN ( e1+ e2) WITHIN INTERVAL '2' MINUTE" +
                    "        DEFINE " +                                                     //定義各事件的匹配條件
                    "            e1 AS " +
                    "                e1.price < 19 , " +
                    "            e2 AS " +
                    "                e2.price >= 19 " +
                    "    ) MR";
            
            
            TableResult res = tEnv.executeSql(sql);
            res.print();
            tEnv.dropTemporaryView("CEP_SQL_1");
                
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }


    public static class Ticker {
        public long id;
        public String symbol;
        public long price;
        public long tax;
        public LocalDateTime rowtime;

        public Ticker() {
        }

        public Ticker(long id, String symbol, long price, long item, LocalDateTime rowtime) {
            this.id = id;
            this.symbol = symbol;
            this.price = price;
            this.tax = tax;
            this.rowtime = rowtime;
        }
    }
}
image.png

(2)skip past last row

package com.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Created by lj on 2022-08-08.
 */
public class CEPSQLExampleAfterMatch {
    private static final Logger LOG = LoggerFactory.getLogger(CEPSQLExampleAfterMatch.class);

    public static void main(String[] args) {
        EnvironmentSettings settings = null;
        StreamTableEnvironment tEnv = null;
        try {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            tEnv = StreamTableEnvironment.create(env, settings);
            final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            DataStream<Ticker> dataStream =
                    env.fromElements(
                            new Ticker(2, "Apple", 11, 2, LocalDateTime.parse("2021-12-10 10:00:01", dateTimeFormatter)),
                            new Ticker(3, "Apple", 16, 2, LocalDateTime.parse("2021-12-10 10:00:02", dateTimeFormatter)),
                            new Ticker(4, "Apple", 13, 2, LocalDateTime.parse("2021-12-10 10:00:03", dateTimeFormatter)),
                            new Ticker(5, "Apple", 15, 2, LocalDateTime.parse("2021-12-10 10:00:04", dateTimeFormatter)),
                            new Ticker(6, "Apple", 14, 1, LocalDateTime.parse("2021-12-10 10:00:05", dateTimeFormatter)),
                            new Ticker(7, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:06", dateTimeFormatter)),
                            new Ticker(8, "Apple", 23, 2, LocalDateTime.parse("2021-12-10 10:00:07", dateTimeFormatter)),
                            new Ticker(9, "Apple", 22, 2, LocalDateTime.parse("2021-12-10 10:00:08", dateTimeFormatter)),
                            new Ticker(10, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:09", dateTimeFormatter)),
                            new Ticker(11, "Apple", 11, 1, LocalDateTime.parse("2021-12-10 10:00:11", dateTimeFormatter)),
                            new Ticker(12, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:12", dateTimeFormatter)),
                            new Ticker(13, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:13", dateTimeFormatter)),
                            new Ticker(14, "Apple", 25, 1, LocalDateTime.parse("2021-12-10 10:00:14", dateTimeFormatter)),
                            new Ticker(15, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:15", dateTimeFormatter)),
                            new Ticker(16, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:16", dateTimeFormatter)),
                            new Ticker(17, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:17", dateTimeFormatter)),
                            new Ticker(18, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:18", dateTimeFormatter)));
            
            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                    .column("id", DataTypes.BIGINT())
                    .column("symbol", DataTypes.STRING())
                    .column("price", DataTypes.BIGINT())
                    .column("tax", DataTypes.BIGINT())
                    .column("rowtime", DataTypes.TIMESTAMP(3))
                    .watermark("rowtime", "rowtime - INTERVAL '1' SECOND")
                    .build());
            tEnv.createTemporaryView("CEP_SQL_2", table);
            
            String sql = "SELECT * " +
                    "FROM CEP_SQL_2 " +
                    "    MATCH_RECOGNIZE ( " +
                    "        PARTITION BY symbol " +       //按symbol分區(qū)皱卓,將相同卡號(hào)的數(shù)據(jù)分到同一個(gè)計(jì)算節(jié)點(diǎn)上裹芝。
                    "        ORDER BY rowtime " +          //在窗口內(nèi),對事件時(shí)間進(jìn)行排序好爬。
                    "        MEASURES " +                   //定義如何根據(jù)匹配成功的輸入事件構(gòu)造輸出事件
                    "            e1.id as id,"+
                    "            AVG(e1.price) as avgPrice,"+
                    "            FIRST(e1.rowtime) AS e1_start_tstamp, " +
                    "            LAST(e1.rowtime) AS e1_fast_tstamp, " +
                    "            e2.rowtime AS end_tstamp " +
                    "        ONE ROW PER MATCH " +                                        //匹配成功輸出一條
                    "        AFTER MATCH skip past last row " +
                    "        PATTERN (e1+ e2) WITHIN INTERVAL '2' MINUTE" +
                    "        DEFINE " +                                                     //定義各事件的匹配條件
                    "            e1 AS " +
                    "                e1.price < 19 , " +
                    "            e2 AS " +
                    "                e2.price >= 19 " +
                    "    ) MR";
            
            
            TableResult res = tEnv.executeSql(sql);
            res.print();
            tEnv.dropTemporaryView("CEP_SQL_2");
                
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }


    public static class Ticker {
        public long id;
        public String symbol;
        public long price;
        public long tax;
        public LocalDateTime rowtime;

        public Ticker() {
        }

        public Ticker(long id, String symbol, long price, long item, LocalDateTime rowtime) {
            this.id = id;
            this.symbol = symbol;
            this.price = price;
            this.tax = tax;
            this.rowtime = rowtime;
        }
    }
}
image.png

(3)skip to first pattern Item

package com.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Created by lj on 2022-08-08.
 */
public class CEPSQLExampleAfterMatch {
    private static final Logger LOG = LoggerFactory.getLogger(CEPSQLExampleAfterMatch.class);

    public static void main(String[] args) {
        EnvironmentSettings settings = null;
        StreamTableEnvironment tEnv = null;
        try {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            tEnv = StreamTableEnvironment.create(env, settings);
            final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            DataStream<Ticker> dataStream =
                    env.fromElements(
                            new Ticker(2, "Apple", 11, 2, LocalDateTime.parse("2021-12-10 10:00:01", dateTimeFormatter)),
                            new Ticker(3, "Apple", 16, 2, LocalDateTime.parse("2021-12-10 10:00:02", dateTimeFormatter)),
                            new Ticker(4, "Apple", 13, 2, LocalDateTime.parse("2021-12-10 10:00:03", dateTimeFormatter)),
                            new Ticker(5, "Apple", 15, 2, LocalDateTime.parse("2021-12-10 10:00:04", dateTimeFormatter)),
                            new Ticker(6, "Apple", 14, 1, LocalDateTime.parse("2021-12-10 10:00:05", dateTimeFormatter)),
                            new Ticker(7, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:06", dateTimeFormatter)),
                            new Ticker(8, "Apple", 23, 2, LocalDateTime.parse("2021-12-10 10:00:07", dateTimeFormatter)),
                            new Ticker(9, "Apple", 22, 2, LocalDateTime.parse("2021-12-10 10:00:08", dateTimeFormatter)),
                            new Ticker(10, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:09", dateTimeFormatter)),
                            new Ticker(11, "Apple", 11, 1, LocalDateTime.parse("2021-12-10 10:00:11", dateTimeFormatter)),
                            new Ticker(12, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:12", dateTimeFormatter)),
                            new Ticker(13, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:13", dateTimeFormatter)),
                            new Ticker(14, "Apple", 25, 1, LocalDateTime.parse("2021-12-10 10:00:14", dateTimeFormatter)),
                            new Ticker(15, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:15", dateTimeFormatter)),
                            new Ticker(16, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:16", dateTimeFormatter)),
                            new Ticker(17, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:17", dateTimeFormatter)),
                            new Ticker(18, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:18", dateTimeFormatter)));
            
            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                    .column("id", DataTypes.BIGINT())
                    .column("symbol", DataTypes.STRING())
                    .column("price", DataTypes.BIGINT())
                    .column("tax", DataTypes.BIGINT())
                    .column("rowtime", DataTypes.TIMESTAMP(3))
                    .watermark("rowtime", "rowtime - INTERVAL '1' SECOND")
                    .build());
            tEnv.createTemporaryView("CEP_SQL_3", table);
            
            String sql = "SELECT * " +
                    "FROM CEP_SQL_3 " +
                    "    MATCH_RECOGNIZE ( " +
                    "        PARTITION BY symbol " +       //按symbol分區(qū)局雄,將相同卡號(hào)的數(shù)據(jù)分到同一個(gè)計(jì)算節(jié)點(diǎn)上。
                    "        ORDER BY rowtime " +          //在窗口內(nèi)存炮,對事件時(shí)間進(jìn)行排序炬搭。
                    "        MEASURES " +                   //定義如何根據(jù)匹配成功的輸入事件構(gòu)造輸出事件
                    "            FIRST(e1.id) as id,"+
                    "            AVG(e1.price) as avgPrice,"+
                    "            FIRST(e1.rowtime) AS e1_start_tstamp, " +
                    "            LAST(e1.rowtime) AS e1_fast_tstamp, " +
                    "            e2.rowtime AS end_tstamp " +
                    "        ONE ROW PER MATCH " +                                        //匹配成功輸出一條
                    "        AFTER MATCH skip to first e1 " +
                    "        PATTERN (e0 e1+ e2) WITHIN INTERVAL '2' MINUTE" +
                    "        DEFINE " +                                                     //定義各事件的匹配條件
                    "            e1 AS " +
                    "                e1.price < 19 , " +
                    "            e2 AS " +
                    "                e2.price >= 19 " +
                    "    ) MR";
            
            
            TableResult res = tEnv.executeSql(sql);
            res.print();
            tEnv.dropTemporaryView("CEP_SQL_3");
                
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }


    public static class Ticker {
        public long id;
        public String symbol;
        public long price;
        public long tax;
        public LocalDateTime rowtime;

        public Ticker() {
        }

        public Ticker(long id, String symbol, long price, long item, LocalDateTime rowtime) {
            this.id = id;
            this.symbol = symbol;
            this.price = price;
            this.tax = tax;
            this.rowtime = rowtime;
        }
    }
}
image.png

(4)skip to last pattern Item

package com.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Created by lj on 2022-08-08.
 */
public class CEPSQLExampleAfterMatch {
    private static final Logger LOG = LoggerFactory.getLogger(CEPSQLExampleAfterMatch.class);

    public static void main(String[] args) {
        EnvironmentSettings settings = null;
        StreamTableEnvironment tEnv = null;
        try {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            tEnv = StreamTableEnvironment.create(env, settings);
            final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            DataStream<Ticker> dataStream =
                    env.fromElements(
                            new Ticker(2, "Apple", 11, 2, LocalDateTime.parse("2021-12-10 10:00:01", dateTimeFormatter)),
                            new Ticker(3, "Apple", 16, 2, LocalDateTime.parse("2021-12-10 10:00:02", dateTimeFormatter)),
                            new Ticker(4, "Apple", 13, 2, LocalDateTime.parse("2021-12-10 10:00:03", dateTimeFormatter)),
                            new Ticker(5, "Apple", 15, 2, LocalDateTime.parse("2021-12-10 10:00:04", dateTimeFormatter)),
                            new Ticker(6, "Apple", 14, 1, LocalDateTime.parse("2021-12-10 10:00:05", dateTimeFormatter)),
                            new Ticker(7, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:06", dateTimeFormatter)),
                            new Ticker(8, "Apple", 23, 2, LocalDateTime.parse("2021-12-10 10:00:07", dateTimeFormatter)),
                            new Ticker(9, "Apple", 22, 2, LocalDateTime.parse("2021-12-10 10:00:08", dateTimeFormatter)),
                            new Ticker(10, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:09", dateTimeFormatter)),
                            new Ticker(11, "Apple", 11, 1, LocalDateTime.parse("2021-12-10 10:00:11", dateTimeFormatter)),
                            new Ticker(12, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:12", dateTimeFormatter)),
                            new Ticker(13, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:13", dateTimeFormatter)),
                            new Ticker(14, "Apple", 25, 1, LocalDateTime.parse("2021-12-10 10:00:14", dateTimeFormatter)),
                            new Ticker(15, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:15", dateTimeFormatter)),
                            new Ticker(16, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:16", dateTimeFormatter)),
                            new Ticker(17, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:17", dateTimeFormatter)),
                            new Ticker(18, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:18", dateTimeFormatter)));
            
            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                    .column("id", DataTypes.BIGINT())
                    .column("symbol", DataTypes.STRING())
                    .column("price", DataTypes.BIGINT())
                    .column("tax", DataTypes.BIGINT())
                    .column("rowtime", DataTypes.TIMESTAMP(3))
                    .watermark("rowtime", "rowtime - INTERVAL '1' SECOND")
                    .build());
            tEnv.createTemporaryView("CEP_SQL_4", table);
            
            String sql = "SELECT * " +
                    "FROM CEP_SQL_4 " +
                    "    MATCH_RECOGNIZE ( " +
                    "        PARTITION BY symbol " +       //按symbol分區(qū)蜈漓,將相同卡號(hào)的數(shù)據(jù)分到同一個(gè)計(jì)算節(jié)點(diǎn)上。
                    "        ORDER BY rowtime " +          //在窗口內(nèi)宫盔,對事件時(shí)間進(jìn)行排序融虽。
                    "        MEASURES " +                   //定義如何根據(jù)匹配成功的輸入事件構(gòu)造輸出事件
                    "            e1.id as id,"+
                    "            AVG(e1.price) as avgPrice,"+
                    "            FIRST(e1.rowtime) AS e1_start_tstamp, " +
                    "            LAST(e1.rowtime) AS e1_fast_tstamp, " +
                    "            e2.rowtime AS end_tstamp " +
                    "        ONE ROW PER MATCH " +                                        //匹配成功輸出一條
                    "        AFTER MATCH skip to last e1 " +
                    "        PATTERN (e0 e1+ e2) WITHIN INTERVAL '2' MINUTE" +
                    "        DEFINE " +                                                     //定義各事件的匹配條件
                    "            e1 AS " +
                    "                e1.price < 19 , " +
                    "            e2 AS " +
                    "                e2.price >= 19 " +
                    "    ) MR";
            
            
            TableResult res = tEnv.executeSql(sql);
            res.print();
            tEnv.dropTemporaryView("CEP_SQL_4");
                
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }


    public static class Ticker {
        public long id;
        public String symbol;
        public long price;
        public long tax;
        public LocalDateTime rowtime;

        public Ticker() {
        }

        public Ticker(long id, String symbol, long price, long item, LocalDateTime rowtime) {
            this.id = id;
            this.symbol = symbol;
            this.price = price;
            this.tax = tax;
            this.rowtime = rowtime;
        }
    }
}
image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市灼芭,隨后出現(xiàn)的幾起案子有额,更是在濱河造成了極大的恐慌,老刑警劉巖彼绷,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件巍佑,死亡現(xiàn)場離奇詭異,居然都是意外死亡寄悯,警方通過查閱死者的電腦和手機(jī)萤衰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來猜旬,“玉大人脆栋,你說我怎么就攤上這事∪鞑粒” “怎么了椿争?”我有些...
    開封第一講書人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長熟嫩。 經(jīng)常有香客問我秦踪,道長,這世上最難降的妖魔是什么邦危? 我笑而不...
    開封第一講書人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任洋侨,我火速辦了婚禮,結(jié)果婚禮上倦蚪,老公的妹妹穿的比我還像新娘希坚。我一直安慰自己,他們只是感情好陵且,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開白布裁僧。 她就那樣靜靜地躺著,像睡著了一般慕购。 火紅的嫁衣襯著肌膚如雪聊疲。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,031評(píng)論 1 285
  • 那天沪悲,我揣著相機(jī)與錄音获洲,去河邊找鬼隘竭。 笑死帆精,一個(gè)胖子當(dāng)著我的面吹牛君仆,可吹牛的內(nèi)容都是我干的铃岔。 我是一名探鬼主播,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼门岔,長吁一口氣:“原來是場噩夢啊……” “哼爱致!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起寒随,我...
    開封第一講書人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬榮一對情侶失蹤糠悯,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后妻往,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體互艾,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年蒲讯,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了忘朝。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片灰署。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡判帮,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出溉箕,到底是詐尸還是另有隱情晦墙,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布肴茄,位于F島的核電站晌畅,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏寡痰。R本人自食惡果不足惜抗楔,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望拦坠。 院中可真熱鬧连躏,春花似錦、人聲如沸贞滨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽晓铆。三九已至勺良,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間骄噪,已是汗流浹背尚困。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留链蕊,地道東北人事甜。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓忙芒,卻偏偏與公主長得像,于是被迫代替她去往敵國和親讳侨。 傳聞我的和親對象是個(gè)殘疾皇子呵萨,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容