flink學(xué)習(xí)之五-數(shù)據(jù)持久化to-mysql

flink中數(shù)據(jù)的落地粹污,是使用sink來(lái)處理的蜕着。

上面例子中已經(jīng)可以看到可以使用DataStream.addSink()方法來(lái)添加數(shù)據(jù)落地的目標(biāo)僵蛛,表示將數(shù)據(jù)輸出到對(duì)應(yīng)目的地互纯。

RichSinkFunction及它的爸爸們:

flink中的sink可以自定義實(shí)現(xiàn)瑟幕,一般需要繼承抽象類(lèi)RichSinkFunction,與數(shù)據(jù)源RichSourceFunction非常類(lèi)似,看下實(shí)現(xiàn)代碼:

package org.apache.flink.streaming.api.functions.sink;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.AbstractRichFunction;

@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
    private static final long serialVersionUID = 1L;

    public RichSinkFunction() {
    }
}

可以看下只盹,依然是繼承AbstractRichFunction鼻忠,跟RichSourceFunction一樣旺聚,這里在之前文章中已經(jīng)看過(guò)榨了。

而主要的方法則是來(lái)自SinkFunction<IN>倒源,IN是一個(gè)輸入泛型,代表需要sink的數(shù)據(jù)類(lèi)型懦鼠∽炅ǎ看下SinkFunction的定義:

package org.apache.flink.streaming.api.functions.sink;

import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.Function;

@Public
public interface SinkFunction<IN> extends Function, Serializable {
    /** @deprecated */
    @Deprecated
    default void invoke(IN value) throws Exception {
    }

    default void invoke(IN value, SinkFunction.Context context) throws Exception {
        this.invoke(value);
    }

    @Public
    public interface Context<T> {
        long currentProcessingTime();

        long currentWatermark();

        Long timestamp();
    }
}

可以看到屹堰,這里主要需要關(guān)注的是invoke方法肛冶。

RichSinkFunction的兒子

這里看下自己實(shí)現(xiàn)的mysql數(shù)據(jù)源:

package myflink.sinks;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * sink,用于將數(shù)據(jù)沉淀存儲(chǔ)在不同的位置
 * 這里存儲(chǔ)在mysql中的url_info表
 */
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class UrlMysqlSink extends RichSinkFunction<UrlInfo> implements ApplicationContextAware {

    private UrlInfoManager urlInfoManager;

    private ApplicationContext applicationContext;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        log.info("applicationContext=" + applicationContext);
        if (applicationContext == null) {
            init();
        }
    }

    @Override
    public void invoke(UrlInfo value, Context context) throws Exception {
        if (urlInfoManager == null) {
            init();
        }
        urlInfoManager.insert(value);
        log.info("---insert url info:", JSON.toJSONString(value));
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    private void init() {
        applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        urlInfoManager = (UrlInfoManager) applicationContext.getBean("urlInfoManager");
    }
}

可以看到扯键,這里主要是重寫(xiě)其中的兩個(gè)方法睦袖,open、invoke荣刑;在open中初始化spring容器馅笙、數(shù)據(jù)庫(kù)鏈接,在invoke中執(zhí)行具體的持久化邏輯厉亏。

使用UrlMySqlSink

數(shù)據(jù)來(lái)源依然來(lái)自kafka董习,復(fù)用之前的kafkaSender。

import java.util.Properties;

@Slf4j
public class KafkaUrlSinkJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "metric-group");
        properties.put("auto.offset.reset", "latest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        SingleOutputStreamOperator<UrlInfo> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer010<String>(
                        "testjin",// topic
                        new SimpleStringSchema(),
                        properties
                )
        ).setParallelism(1)
                // map操作爱只,轉(zhuǎn)換皿淋,從一個(gè)數(shù)據(jù)流轉(zhuǎn)換成另一個(gè)數(shù)據(jù)流,這里是從string-->UrlInfo
                .map(string -> JSON.parseObject(string, UrlInfo.class))恬试;
            
       dataStreamSource.addSink(new UrlMysqlSink());
       dataStreamSource.addSink(new PrintSinkFunction<>());

        env.execute("save url to db");

直接在datasource中addSink即可窝趣。一個(gè)datasource可以同時(shí)添加多個(gè)sink。

注意训柴,dataSource是添加到StreamExecutionEnvironment實(shí)例上的哑舒,而sink則是直接添加到dataStreamSource上的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末幻馁,一起剝皮案震驚了整個(gè)濱河市洗鸵,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌仗嗦,老刑警劉巖膘滨,帶你破解...
    沈念sama閱讀 212,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異儒将,居然都是意外死亡吏祸,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)贡翘,“玉大人蹈矮,你說(shuō)我怎么就攤上這事∶” “怎么了泛鸟?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,369評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)踊东。 經(jīng)常有香客問(wèn)我北滥,道長(zhǎng),這世上最難降的妖魔是什么闸翅? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,799評(píng)論 1 285
  • 正文 為了忘掉前任再芋,我火速辦了婚禮,結(jié)果婚禮上坚冀,老公的妹妹穿的比我還像新娘济赎。我一直安慰自己,他們只是感情好记某,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,910評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布司训。 她就那樣靜靜地躺著,像睡著了一般液南。 火紅的嫁衣襯著肌膚如雪壳猜。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 50,096評(píng)論 1 291
  • 那天滑凉,我揣著相機(jī)與錄音统扳,去河邊找鬼。 笑死譬涡,一個(gè)胖子當(dāng)著我的面吹牛闪幽,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播涡匀,決...
    沈念sama閱讀 39,159評(píng)論 3 411
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼盯腌,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了陨瘩?” 一聲冷哼從身側(cè)響起腕够,我...
    開(kāi)封第一講書(shū)人閱讀 37,917評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎舌劳,沒(méi)想到半個(gè)月后帚湘,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,360評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡甚淡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,673評(píng)論 2 327
  • 正文 我和宋清朗相戀三年大诸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,814評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡资柔,死狀恐怖焙贷,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情贿堰,我是刑警寧澤辙芍,帶...
    沈念sama閱讀 34,509評(píng)論 4 334
  • 正文 年R本政府宣布,位于F島的核電站羹与,受9級(jí)特大地震影響故硅,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜纵搁,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,156評(píng)論 3 317
  • 文/蒙蒙 一吃衅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧诡渴,春花似錦捐晶、人聲如沸菲语。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)山上。三九已至眼耀,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間佩憾,已是汗流浹背哮伟。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,123評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留妄帘,地道東北人楞黄。 一個(gè)月前我還...
    沈念sama閱讀 46,641評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像抡驼,于是被迫代替她去往敵國(guó)和親鬼廓。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,728評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容