Apache Flink——數(shù)據(jù)源算子(Source)

前言

Flink 可以從各種來源獲取數(shù)據(jù)箫老,然后構(gòu)建 DataStream 進(jìn)行轉(zhuǎn)換處理析桥。一般將數(shù)據(jù)的輸入來源稱為數(shù)據(jù)源(data source)涕蚤,而讀取數(shù)據(jù)的算子就是源算子(source operator)地来。所以,source就是我們整個處理程序的輸入端幌蚊。

Flink 代碼中通用的添加 source 的方式,是調(diào)用執(zhí)行環(huán)境的 addSource()方法:

DataStream<String> stream = env.addSource(...);

方法傳入一個對象參數(shù)溃卡,需要實現(xiàn) SourceFunction 接口溢豆;返回 DataStreamSource。這里的 DataStreamSource 類繼承自 SingleOutputStreamOperator 類瘸羡,又進(jìn)一步繼承自 DataStream漩仙。所以很明顯,讀取數(shù)據(jù)的 source 操作是一個算子犹赖,得到的是一個數(shù)據(jù)流(DataStream)队他。

傳入的參數(shù)是一個“源函數(shù)”(source function),需要實現(xiàn)SourceFunction 接口峻村。

Flink 直接提供了很多預(yù)實現(xiàn)的接口麸折,此外還有很多外部連接工具也幫我們實現(xiàn)了對應(yīng)的 source function,通常情況下足以應(yīng)對我們的實際需求粘昨。

Flink 已實現(xiàn)的Source:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/overview/

準(zhǔn)備工作

為了更好地理解垢啼,我們先構(gòu)建一個實際應(yīng)用場景。比如網(wǎng)站的訪問操作张肾,可以抽象成一個三元組(用戶名芭析,用戶訪問的 urrl,用戶訪問 url 的時間戳)吞瞪,所以在這里馁启,我們可以創(chuàng)建一個類 Event,將用戶行為包裝成它的一個對象芍秆。Event 包含了以下一些字段惯疙。

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Event {

    private String user;

    private String url;

    private Long timestamp;
    
}

導(dǎo)入相關(guān)maven依賴

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.15.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.15.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.15.0</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.36</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.36</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.17.2</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
    </dependency>
</dependencies>

一、從集合中讀取數(shù)據(jù)

public class SourceTest {

    public static void main(String[] args) throws Exception {
        readCollection();
    }

    /**
     * 從集合中讀取數(shù)據(jù)
     * @throws Exception
     */
    private static void readCollection() throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        List<Event> list = new ArrayList<>();
        list.add(new Event("Mary","./home",1000L));
        list.add(new Event("Bob","./cart",2000L));
        //從集合中讀取數(shù)據(jù)
        DataStreamSource<Event> dataStream = env.fromCollection(list);

        dataStream.print();

        env.execute();
    }
}

二浪听、從元素中讀取數(shù)據(jù)

public class SourceTest {

    public static void main(String[] args) throws Exception {
        readElement();
    }

    /**
     * 從元素中讀取數(shù)據(jù)
     * @throws Exception
     */
    private static void readElement() throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //從元素中讀取數(shù)據(jù)
        DataStreamSource<Event> dataStream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        dataStream.print();

        env.execute();
    }
}

三螟碎、從文件中讀取數(shù)據(jù)

真正的實際應(yīng)用中,自然不會直接將數(shù)據(jù)寫在代碼中迹栓。通常情況下掉分,我們會從存儲介質(zhì)中獲取數(shù)據(jù)俭缓,一個比較常見的方式就是讀取日志文件。這也是批處理中最常見的讀取方式酥郭。

說明:

  • 參數(shù)可以是目錄华坦,也可以是文件;
  • 路徑可以是相對路徑不从,也可以是絕對路徑惜姐;
  • 相對路徑是從系統(tǒng)屬性 user.dir 獲取路徑: idea 下是 project 的根目錄, standalone 模式
    下是集群節(jié)點根目錄;
  • 也可以從 hdfs 目錄下讀取, 使用路徑 hdfs://..., 由于 Flink 沒有提供 hadoop 相關(guān)依賴,
    需要 pom 中添加相關(guān)依賴:
public class SourceTest {

    public static void main(String[] args) throws Exception {
        readFile();
    }

    /**
     * 從文件中讀取數(shù)據(jù)
     * @throws Exception
     */
    private static void readFile() throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //從文件中讀取數(shù)據(jù)
        DataStreamSource<String> dataStream = env.readTextFile("src/main/resources/clicks.txt");

        dataStream.print();

        env.execute();
    }
}

四椿息、從hdfs中讀取數(shù)據(jù)

引入依賴

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.3</version>
</dependency>
public class SourceTest {

    public static void main(String[] args) throws Exception {
        readHdfs();
    }
    
    /**
     * 從hdfs中讀取數(shù)據(jù)
     * @throws Exception
     */
    private static void readHdfs() throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //讀取hdfs文件路徑
        DataStreamSource<String> hdfsSource = env.readTextFile("hdfs://192.168.111.188:8020/input/README.txt");

        //將hdfs文件路徑打印輸出
        hdfsSource.print();

        env.execute();
    }   
}

五歹袁、從socket中讀取數(shù)據(jù)

public class SourceTest {

    public static void main(String[] args) throws Exception {
        readSocket();
    }

    /**
     * 從socket中讀取數(shù)據(jù)
     * @throws Exception
     */
    private static void readSocket() throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //讀取文本流, 監(jiān)聽linux主機(jī)端口, linux通過nc -lk 7777發(fā)送文本
        DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.111.188",7777);

        dataStreamSource.print();

        env.execute();
    }
}

六、從kafka中讀取數(shù)據(jù)

Kafka 作為分布式消息傳輸隊列寝优,是一個高吞吐条舔、易于擴(kuò)展的消息系統(tǒng)。而消息隊列的傳輸方式乏矾,恰恰和流處理是完全一致的孟抗。所以可以說 Kafka 和 Flink 天生一對,是當(dāng)前處理流式數(shù)據(jù)的雙子星钻心。在如今的實時流處理應(yīng)用中凄硼,由 Kafka 進(jìn)行數(shù)據(jù)的收集和傳輸,F(xiàn)link 進(jìn)行分析計算捷沸,這樣的架構(gòu)已經(jīng)成為眾多企業(yè)的首選摊沉。

Flink官方提供了連接工具flink-connector-kafka,直接幫我們實現(xiàn)了一個消費者FlinkKafkaConsumer亿胸,它就是用來讀取 Kafka 數(shù)據(jù)的SourceFunction坯钦。

所以想要以 Kafka 作為數(shù)據(jù)源獲取數(shù)據(jù),我們只需要引入 Kafka 連接器的依賴侈玄。Flink 官方提供的是一個通用的 Kafka 連接器婉刀,它會自動跟蹤最新版本的 Kafka 客戶端。目前最新版本只支持 0.10.0 版本以上的 Kafka序仙,讀者使用時可以根據(jù)自己安裝的 Kafka 版本選定連接器的依賴版本突颊。這里我們需要導(dǎo)入的依賴如下。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.15.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.12</artifactId>
    <version>1.15.0</version>
</dependency>
public class SourceTest {

    public static void main(String[] args) throws Exception {
        readKafka();
    }

    /**
     * 從kafka中讀取數(shù)據(jù)
     * @throws Exception
     */
    private static void readKafka() throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.111.188:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        DataStreamSource<String> kafkaDataStream = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));

        kafkaDataStream.print();

        env.execute();
    }
}

創(chuàng)建 FlinkKafkaConsumer 時需要傳入三個參數(shù):

  • 第一個參數(shù) topic潘悼,定義了從哪些主題中讀取數(shù)據(jù)律秃。可以是一個 topic治唤,也可以是 topic
    列表棒动,還可以是匹配所有想要讀取的 topic 的正則表達(dá)式。當(dāng)從多個 topic 中讀取數(shù)據(jù)
    時宾添,Kafka 連接器將會處理所有 topic 的分區(qū)船惨,將這些分區(qū)的數(shù)據(jù)放到一條流中去柜裸。
  • 第二個參數(shù)是一個 DeserializationSchema 或者 KeyedDeserializationSchema。Kafka 消
    息被存儲為原始的字節(jié)數(shù)據(jù)粱锐,所以需要反序列化成 Java 或者 Scala 對象疙挺。上面代碼中
    使用的 SimpleStringSchema,是一個內(nèi)置的 DeserializationSchema怜浅,它只是將字節(jié)數(shù)
    組簡單地反序列化成字符串铐然。DeserializationSchema 和 KeyedDeserializationSchema 是公共接口,所以我們也可以自定義反序列化邏輯恶座。
  • 第三個參數(shù)是一個 Properties 對象搀暑,設(shè)置了 Kafka 客戶端的一些屬性。

七跨琳、從Pulsar中讀取數(shù)據(jù)

隨著數(shù)據(jù)日益膨脹险掀,采用事件流處理數(shù)據(jù)至關(guān)重要。Apache Flink 將批流處理統(tǒng)一到計算引擎中湾宙,提供了一致化的編程接口。Apache Pulsar(與 Apache BookKeeper 一起)以 "流 "的方式統(tǒng)一數(shù)據(jù)冈绊。在 Pulsar 中侠鳄,數(shù)據(jù)存儲成一個副本,以流(streaming)(通過 pub-sub 接口)和 segment(用于批處理)的方式進(jìn)行訪問死宣。Pulsar 解決了企業(yè)在使用不同的存儲和消息技術(shù)解決方案時遇到的數(shù)據(jù)孤島問題伟恶。

Flink 可以直接與 Pulsar broker 進(jìn)行實時的流式讀寫,同時 Flink 也可以批量讀取 Pulsar 底層離線存儲毅该,與 BookKeeper 的內(nèi)容進(jìn)行批次讀寫博秫。同時支持批流,使得 Pulsar 和 Flink 先天就是契合的伙伴眶掌。把 Flink 和 Pulsar 結(jié)合使用挡育,這兩種開源技術(shù)可以創(chuàng)建一個統(tǒng)一的數(shù)據(jù)架構(gòu),為實時數(shù)據(jù)驅(qū)動企業(yè)提供最佳解決方案朴爬。

為了將 Pulsar 與 Flink 的功能進(jìn)行整合即寒,為用戶提供更強(qiáng)大的開發(fā)能力,StreamNative 開發(fā)并開源了 Pulsar Flink Connector召噩。經(jīng)過多次的打磨母赵,Pulsar Flink Connector 已合并進(jìn) Flink 代碼倉庫,并在 Flink 1.14.0 及其之后版本中發(fā)布具滴!

Pulsar Flink Connector 基于 Apache Pulsar 和 Apache Flink 提供彈性數(shù)據(jù)處理凹嘲,允許 Apache Flink 讀寫 Apache Pulsar 中的數(shù)據(jù)。使用 Pulsar Flink Connector构韵,企業(yè)能夠更專注于業(yè)務(wù)邏輯周蹭,無需關(guān)注存儲問題趋艘。

引入依賴

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar</artifactId>
    <version>1.15.0</version>
</dependency>
public class SourceTest {

    public static void main(String[] args) throws Exception {
        readPulsar();
    }

    /**
     * 從Pulsar中讀取數(shù)據(jù)
     * @throws Exception
     */
    private static void readPulsar() throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. 添加source源, 用于讀取數(shù)據(jù) pulsar
        String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        String adminUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080";
        String topic = "persistent://my-tenant/my-ns/my-partitioned-topic";

        PulsarSource<String> pulsarSource = PulsarSource.builder()
                .setServiceUrl(serviceUrl)
                .setAdminUrl(adminUrl)
                .setStartCursor(StartCursor.earliest())
                .setTopics(topic)
                .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
                .setSubscriptionName("my-subscription")
                .setSubscriptionType(SubscriptionType.Exclusive)
                .build();
        DataStreamSource<String> streamSource = env.fromSource(pulsarSource, WatermarkStrategy.noWatermarks(),"Pulsar Source");

        streamSource.print();

        env.execute();
    }
}

八、自定義 Source

大多數(shù)情況下谷醉,前面的數(shù)據(jù)源已經(jīng)能夠滿足需要致稀。但是凡事總有例外,如果遇到特殊情況俱尼,我們想要讀取的數(shù)據(jù)源來自某個外部系統(tǒng)抖单,而 flink 既沒有預(yù)實現(xiàn)的方法、也沒有提供連接器遇八,又該怎么辦呢矛绘?

那就只好自定義實現(xiàn) SourceFunction 了。

Flink還提供了數(shù)據(jù)源接口刃永,實現(xiàn)該接口就可以實現(xiàn)自定義數(shù)據(jù)源货矮,不同的接口有不同的功能,分類如下:

  • SourceFunction:非并行數(shù)據(jù)源(并行度只能=1)
  • ParallelSourceFunction:并行數(shù)據(jù)源(并行度能夠>=l)
  • RichSourceFunction:多功能非并行數(shù)據(jù)源(并行度只能=1)
  • RichParallelSourceFunction:多功能并行數(shù)據(jù)源(并行度能夠>=1)

接下來我們創(chuàng)建一個自定義的數(shù)據(jù)源斯够,實現(xiàn) SourceFunction 接口囚玫。主要重寫兩個關(guān)鍵方法:run()和 cancel()。

  • run()方法:使用運行時上下文對象(SourceContext)向下游發(fā)送數(shù)據(jù)读规;
  • cancel()方法:通過標(biāo)識位控制退出循環(huán)抓督,來達(dá)到中斷數(shù)據(jù)源的效果。

8.1 SourceFunction——單并行度Source

實現(xiàn)代碼

public class ClickSource implements SourceFunction<Event> {

    //聲明一個標(biāo)志位
    private boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        //隨機(jī)生成數(shù)據(jù)
        Random random = new Random();
        //定義字段選取的數(shù)據(jù)集
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};

        //循環(huán)生成數(shù)據(jù)
        while (running){
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            sourceContext.collect(new Event(user,url,timestamp));

            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

調(diào)用代碼

public class SourceCustomTest {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> customDataStream = env.addSource(new ClickSource());

        parallelDataStream.print();

        env.execute();
    }
}

注意:SourceFunction 接口定義的數(shù)據(jù)源束亏,并行度只能設(shè)置為 1铃在,如果數(shù)據(jù)源設(shè)置為大于 1 的并行度,則會拋出異常碍遍。

8.2 ParallelSourceFunction——多并行度Source

實現(xiàn)代碼

public class ParallelClickSource implements ParallelSourceFunction<Event> {

    //聲明一個標(biāo)志位
    private boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        //隨機(jī)生成數(shù)據(jù)
        Random random = new Random();
        //定義字段選取的數(shù)據(jù)集
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};

        //循環(huán)生成數(shù)據(jù)
        while (running){
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            sourceContext.collect(new Event(user,url,timestamp));

            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

調(diào)用代碼

public class SourceCustomTest {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> parallelDataStream = env.addSource(new ParallelClickSource()).setParallelism(2);

        parallelDataStream.print();

        env.execute();
    }
}

RichSourceFunction定铜、RichParallelSourceFunction提供了外部連接的open和close方法以及運行時的getRuntimeContext等方法

8.3 RichSourceFunction——多功能非并行Source

實現(xiàn)代碼

public class ClickRichSource extends RichSourceFunction<Event> {

    //聲明一個標(biāo)志位
    private boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();
        String taskName = runtimeContext.getTaskName();
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        System.out.println(taskName + "-" + indexOfThisSubtask);
    }

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        //隨機(jī)生成數(shù)據(jù)
        Random random = new Random();
        //定義字段選取的數(shù)據(jù)集
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};

        //循環(huán)生成數(shù)據(jù)
        while (running){
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            Event event = new Event(user, url, timestamp);
            System.out.println(event.toString());
            sourceContext.collect(event);

            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        System.out.println("關(guān)閉資源.....");
    }
}

調(diào)用代碼

public class SourceCustomTest {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> RichDataStream = env.addSource(new ClickRichSource());

        parallelDataStream.print();

        env.execute();
    }
}

8.4 RichParallelSourceFunction——多功能并行Source

實現(xiàn)代碼

public class RichParallelClickSource extends RichParallelSourceFunction<Event> {

    //聲明一個標(biāo)志位
    private boolean running = true;


    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();
        String taskName = runtimeContext.getTaskName();
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        System.out.println(taskName + "-" + indexOfThisSubtask);
    }

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        //隨機(jī)生成數(shù)據(jù)
        Random random = new Random();
        //定義字段選取的數(shù)據(jù)集
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};

        //循環(huán)生成數(shù)據(jù)
        while (running){
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            sourceContext.collect(new Event(user,url,timestamp));

            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        System.out.println("關(guān)閉資源.....");
    }
}

調(diào)用代碼

public class SourceCustomTest {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> RichDataStream = env.addSource(new RichParallelClickSource()).setParallelism(2);

        parallelDataStream.print();

        env.execute();
    }
}

8.5 從MySQL實時加載數(shù)據(jù)

實現(xiàn)代碼

public class MySQLSource extends RichParallelSourceFunction<Student> {

    private boolean flag = true;
    private Connection conn;
    private PreparedStatement statement;
    private ResultSet resultSet;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456");
        String sql = "select id, name, age from t_student";
        statement = conn.prepareStatement(sql);
        super.open(parameters);
    }

    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        while (flag) {
            resultSet = statement.executeQuery();
            while (resultSet.next()){
                String id = resultSet.getString("id");
                String name = resultSet.getString("name");
                Integer age = resultSet.getInt("age");
                ctx.collect(new Student(id, name, age));
            }
            Thread.sleep(3000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        if(conn != null) conn.close();
        if(statement != null) statement.close();
        if(resultSet != null) resultSet.close();
    }
}

調(diào)用代碼

public class SourceCustomTest {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStreamSource<Student> parallelDataStream = env.addSource(new MySQLSource()).setParallelism(4);

        parallelDataStream.print();

        env.execute();
    }
}

參考:
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/datastream/pulsar/

https://blog.csdn.net/weixin_47491957/article/details/124317150

https://blog.csdn.net/weixin_45417821/article/details/124143407

https://blog.csdn.net/weixin_45417821/article/details/124145083

https://blog.csdn.net/weixin_45417821/article/details/124146085

https://blog.csdn.net/weixin_45417821/article/details/124147285

https://segmentfault.com/a/1190000041048040

https://blog.csdn.net/qq_41924766/article/details/130681921

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市怕敬,隨后出現(xiàn)的幾起案子揣炕,更是在濱河造成了極大的恐慌,老刑警劉巖东跪,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件祝沸,死亡現(xiàn)場離奇詭異,居然都是意外死亡越庇,警方通過查閱死者的電腦和手機(jī)罩锐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來卤唉,“玉大人涩惑,你說我怎么就攤上這事∩G” “怎么了竭恬?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵跛蛋,是天一觀的道長。 經(jīng)常有香客問我痊硕,道長赊级,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任岔绸,我火速辦了婚禮理逊,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘盒揉。我一直安慰自己晋被,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布刚盈。 她就那樣靜靜地躺著羡洛,像睡著了一般。 火紅的嫁衣襯著肌膚如雪藕漱。 梳的紋絲不亂的頭發(fā)上欲侮,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天,我揣著相機(jī)與錄音肋联,去河邊找鬼锈麸。 笑死,一個胖子當(dāng)著我的面吹牛牺蹄,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播薄翅,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼沙兰,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了翘魄?” 一聲冷哼從身側(cè)響起鼎天,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎暑竟,沒想到半個月后斋射,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡但荤,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年罗岖,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片腹躁。...
    茶點故事閱讀 40,505評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡桑包,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出纺非,到底是詐尸還是另有隱情哑了,我是刑警寧澤赘方,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站弱左,受9級特大地震影響窄陡,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜拆火,卻給世界環(huán)境...
    茶點故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一跳夭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧榜掌,春花似錦优妙、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至胞皱,卻和暖如春邪意,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背反砌。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工雾鬼, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人宴树。 一個月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓策菜,卻偏偏與公主長得像,于是被迫代替她去往敵國和親酒贬。 傳聞我的和親對象是個殘疾皇子又憨,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,515評論 2 359

推薦閱讀更多精彩內(nèi)容