[Flink BroadcastStream]Flink實(shí)戰(zhàn)廣播流之BroadcastStream

ApacheFlink.jpg

廣播狀態(tài)被引入以支持這樣的用例:來自一個流的一些數(shù)據(jù)需要廣播到所有下游任務(wù)董虱,在那里它被本地存儲,并用于處理另一個流上的所有傳入元素。

作為廣播狀態(tài)自然適合出現(xiàn)的一個例子芒涡,我們可以想象一個低吞吐量流摩梧,其中包含一組規(guī)則物延,我們希望根據(jù)來自另一個流的所有元素對這些規(guī)則進(jìn)行評估〗龈福考慮到上述類型的用例叛薯,廣播狀態(tài)與其他操作符狀態(tài)的區(qū)別在于:
(1)它是一個map格式
(2)它只對輸入有廣播流和無廣播流的特定操作符可用
(3)這樣的操作符可以具有具有不同名稱的多個廣播狀態(tài)。

一條流需要根據(jù)規(guī)則或配置處理數(shù)據(jù)笙纤,而規(guī)則或配置又是隨時(shí)變化的耗溜。此時(shí),就可將規(guī)則或配置作為廣播流廣播出去省容,并以Broadcast State的形式存儲在下游Task中抖拴。下游Task根據(jù)Broadcast State中的規(guī)則或配置來處理常規(guī)流中的數(shù)據(jù)。

場景舉例:

動態(tài)更新計(jì)算規(guī)則: 如事件流需要根據(jù)最新的規(guī)則進(jìn)行計(jì)算腥椒,則可將規(guī)則作為廣播狀態(tài)廣播到下游Task中阿宅。
實(shí)時(shí)增加額外字段: 如事件流需要實(shí)時(shí)增加用戶的基礎(chǔ)信息候衍,則可將用戶的基礎(chǔ)信息作為廣播狀態(tài)廣播到下游Task中。

注意:

Broadcast State是Map類型家夺,即K-V類型脱柱。
Broadcast State只有在廣播的一側(cè),即在BroadcastProcessFunction或KeyedBroadcastProcessFunction的processBroadcastElement方法中可以修改。在非廣播的一側(cè)拉馋,即在BroadcastProcessFunction或KeyedBroadcastProcessFunction的processElement方法中只讀榨为。

Broadcast State中元素的順序,在各Task中可能不同煌茴∷婀耄基于順序的處理,需要注意蔓腐。

Broadcast State在Checkpoint時(shí)矩乐,每個Task都會Checkpoint廣播狀態(tài)。

Broadcast State在運(yùn)行時(shí)保存在內(nèi)存中回论,目前還不能保存在RocksDB State Backend中散罕。

使用場景:

在處理數(shù)據(jù)的時(shí)候,有些配置是要實(shí)時(shí)動態(tài)改變的,比如說我要過濾一些關(guān)鍵字傀蓉,這些關(guān)鍵字呢是在MYSQL里隨時(shí)配置修改的欧漱,那我們在高吞吐計(jì)算的Function中動態(tài)查詢配置文件有可能使整個計(jì)算阻塞,甚至任務(wù)停止葬燎。
廣播流可以通過查詢配置文件误甚,廣播到某個 operator 的所有并發(fā)實(shí)例中,然后與另一條流數(shù)據(jù)連接進(jìn)行計(jì)算谱净。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.istudy</groupId>
    <artifactId>HaiFlink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- redis -->
        <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

        <!-- ElasticSearch7 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
            <version>1.12.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.19</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>


        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
    </dependencies>
</project>
package com.istudy.broadcast;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Author: Wang Pei
 * Summary:
 *   基于Broadcast State 動態(tài)更新配置以實(shí)現(xiàn)實(shí)時(shí)過濾數(shù)據(jù)并增加字段
 */
@Slf4j
public class TestBroadcastState {
    public static void main(String[] args) throws Exception{

        //1窑邦、解析命令行參數(shù)
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));

        //checkpoint配置
        String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
        long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");

        //事件流配置
        String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
        String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
        String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");

        //配置流配置
        String fromMysqlHost = parameterTool.getRequired("fromMysql.host");
        int fromMysqlPort = parameterTool.getInt("fromMysql.port");
        String fromMysqlDB = parameterTool.getRequired("fromMysql.db");
        String fromMysqlUser = parameterTool.getRequired("fromMysql.user");
        String fromMysqlPasswd = parameterTool.getRequired("fromMysql.passwd");
        int fromMysqlSecondInterval = parameterTool.getInt("fromMysql.secondInterval");

        //2、配置運(yùn)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設(shè)置StateBackend
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));
        //設(shè)置Checkpoint
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //3壕探、Kafka事件流
        //從Kafka中獲取事件數(shù)據(jù)
        //數(shù)據(jù):某個用戶在某個時(shí)刻瀏覽或點(diǎn)擊了某個商品,如
        //{"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers",fromKafkaBootstrapServers);
        kafkaProperties.put("group.id",fromKafkaGroupID);

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
        kafkaConsumer.setStartFromLatest();
        DataStream<String> kafkaSource = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id-kafka-source");

        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> eventStream = kafkaSource.process(new ProcessFunction<String, Tuple4<String, String, String, Integer>>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Tuple4<String, String, String, Integer>> out){
                try {
                    JSONObject obj = JSON.parseObject(value);
                    String userID = obj.getString("userID");
                    String eventTime = obj.getString("eventTime");
                    String eventType = obj.getString("eventType");
                    int productID = obj.getIntValue("productID");
                    out.collect(new Tuple4<>(userID, eventTime, eventType, productID));
                }catch (Exception ex){
//                    log.warn("異常數(shù)據(jù):{}",value,ex);
                }
            }
        });

        //4冈钦、Mysql配置流
        //自定義Mysql Source,周期性地從Mysql中獲取配置李请,并廣播出去
        //數(shù)據(jù): 用戶ID,用戶姓名瞧筛,用戶年齡
        DataStreamSource<HashMap<String, Tuple2<String, Integer>>> configStream = env.addSource(new MysqlSource(fromMysqlHost, fromMysqlPort, fromMysqlDB, fromMysqlUser, fromMysqlPasswd, fromMysqlSecondInterval));

        /*
          (1) 先建立MapStateDescriptor
          MapStateDescriptor定義了狀態(tài)的名稱、Key和Value的類型捻艳。
          這里,MapStateDescriptor中庆猫,key是Void類型认轨,value是Map<String, Tuple2<String,Int>>類型。
         */
        MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> configDescriptor = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));

        /*
          (2) 將配置流廣播月培,形成BroadcastStream
         */
        BroadcastStream<HashMap<String, Tuple2<String, Integer>>> broadcastConfigStream = configStream.broadcast(configDescriptor);

        //5嘁字、事件流和廣播的配置流連接恩急,形成BroadcastConnectedStream
        BroadcastConnectedStream<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>> connectedStream = eventStream.connect(broadcastConfigStream);

        //6、對BroadcastConnectedStream應(yīng)用process方法纪蜒,根據(jù)配置(規(guī)則)處理事件
        SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> resultStream = connectedStream.process(new CustomBroadcastProcessFunction());

        //7衷恭、輸出結(jié)果
        resultStream.print();

        //8、生成JobGraph纯续,并開始執(zhí)行
        env.execute();

    }

    /**
     * 自定義BroadcastProcessFunction
     * 當(dāng)事件流中的用戶ID在配置中出現(xiàn)時(shí)随珠,才對該事件處理, 并在事件中補(bǔ)全用戶的基礎(chǔ)信息
     * Tuple4<String, String, String, Integer>: 第一個流(事件流)的數(shù)據(jù)類型
     * HashMap<String, Tuple2<String, Integer>>: 第二個流(配置流)的數(shù)據(jù)類型
     * Tuple6<String, String, String, Integer,String, Integer>: 返回的數(shù)據(jù)類型
     */
    static class CustomBroadcastProcessFunction extends BroadcastProcessFunction<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>, Tuple6<String, String, String, Integer, String, Integer>>{

        /**定義MapStateDescriptor*/
        MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> configDescriptor = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));

        /**
         * 讀取狀態(tài),并基于狀態(tài)猬错,處理事件流中的數(shù)據(jù)
         * 在這里窗看,從上下文中獲取狀態(tài),基于獲取的狀態(tài)倦炒,對事件流中的數(shù)據(jù)進(jìn)行處理
         * @param value 事件流中的數(shù)據(jù)
         * @param ctx 上下文
         * @param out 輸出零條或多條數(shù)據(jù)
         * @throws Exception
         */
        @Override
        public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {

            //事件流中的用戶ID
            String userID = value.f0;

            //獲取狀態(tài)
            ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(configDescriptor);
            Map<String, Tuple2<String, Integer>> broadcastStateUserInfo = broadcastState.get(null);

            //配置中有此用戶显沈,則在該事件中添加用戶的userName、userAge字段逢唤。
            //配置中沒有此用戶拉讯,則丟棄
            Tuple2<String, Integer> userInfo = broadcastStateUserInfo.get(userID);
            if(userInfo!=null){
                out.collect(new Tuple6<>(value.f0,value.f1,value.f2,value.f3,userInfo.f0,userInfo.f1));
            }

        }

        /**
         * 處理廣播流中的每一條數(shù)據(jù),并更新狀態(tài)
         * @param value 廣播流中的數(shù)據(jù)
         * @param ctx 上下文
         * @param out 輸出零條或多條數(shù)據(jù)
         * @throws Exception
         */
        @Override
        public void processBroadcastElement(HashMap<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {

            //獲取狀態(tài)
            BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(configDescriptor);

            //清空狀態(tài)
            broadcastState.clear();

            //更新狀態(tài)
            broadcastState.put(null,value);

        }
    }



    /**
     * 自定義Mysql Source鳖藕,每隔 secondInterval 秒從Mysql中獲取一次配置
     */
    static class MysqlSource extends RichSourceFunction<HashMap<String, Tuple2<String, Integer>>> {

        private String host;
        private Integer port;
        private String db;
        private String user;
        private String passwd;
        private Integer secondInterval;

        private volatile boolean isRunning = true;

        private Connection connection;
        private PreparedStatement preparedStatement;

        MysqlSource(String host, Integer port, String db, String user, String passwd,Integer secondInterval) {
            this.host = host;
            this.port = port;
            this.db = db;
            this.user = user;
            this.passwd = passwd;
            this.secondInterval = secondInterval;
        }

        /**
         * 開始時(shí), 在open()方法中建立連接
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            Class.forName("com.mysql.jdbc.Driver");
            connection= DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/"+db+"?useUnicode=true&characterEncoding=UTF-8", user, passwd);
            String sql="select userID,userName,userAge from user_info";
            preparedStatement=connection.prepareStatement(sql);
        }

        /**
         * 執(zhí)行完魔慷,調(diào)用close()方法關(guān)系連接,釋放資源
         * @throws Exception
         */
        @Override
        public void close() throws Exception {
            super.close();

            if(connection!=null){
                connection.close();
            }

            if(preparedStatement !=null){
                preparedStatement.close();
            }
        }

        /**
         * 調(diào)用run()方法獲取數(shù)據(jù)
         * @param ctx
         */
        @Override
        public void run(SourceContext<HashMap<String, Tuple2<String, Integer>>> ctx) {
            try {
                while (isRunning){
                    HashMap<String, Tuple2<String, Integer>> output = new HashMap<>();
                    ResultSet resultSet = preparedStatement.executeQuery();
                    while (resultSet.next()){
                        String userID = resultSet.getString("userID");
                        String userName = resultSet.getString("userName");
                        int userAge = resultSet.getInt("userAge");
                        output.put(userID,new Tuple2<>(userName,userAge));
                    }
                    ctx.collect(output);
                    //每隔多少秒執(zhí)行一次查詢
                    Thread.sleep(1000*secondInterval);
                }
            }catch (Exception ex){
//                log.error("從Mysql獲取配置異常...",ex);
            }


        }

        /**
         * 取消時(shí)吊奢,會調(diào)用此方法
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末盖彭,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子页滚,更是在濱河造成了極大的恐慌召边,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,743評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件裹驰,死亡現(xiàn)場離奇詭異隧熙,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)幻林,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,296評論 3 385
  • 文/潘曉璐 我一進(jìn)店門贞盯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人沪饺,你說我怎么就攤上這事躏敢。” “怎么了整葡?”我有些...
    開封第一講書人閱讀 157,285評論 0 348
  • 文/不壞的土叔 我叫張陵件余,是天一觀的道長。 經(jīng)常有香客問我,道長啼器,這世上最難降的妖魔是什么旬渠? 我笑而不...
    開封第一講書人閱讀 56,485評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮端壳,結(jié)果婚禮上告丢,老公的妹妹穿的比我還像新娘。我一直安慰自己损谦,他們只是感情好岖免,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,581評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著成翩,像睡著了一般觅捆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上麻敌,一...
    開封第一講書人閱讀 49,821評論 1 290
  • 那天栅炒,我揣著相機(jī)與錄音,去河邊找鬼术羔。 笑死赢赊,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的级历。 我是一名探鬼主播释移,決...
    沈念sama閱讀 38,960評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼寥殖!你這毒婦竟也來了玩讳?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,719評論 0 266
  • 序言:老撾萬榮一對情侶失蹤嚼贡,失蹤者是張志新(化名)和其女友劉穎熏纯,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體粤策,經(jīng)...
    沈念sama閱讀 44,186評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡樟澜,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,516評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了叮盘。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片秩贰。...
    茶點(diǎn)故事閱讀 38,650評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖柔吼,靈堂內(nèi)的尸體忽然破棺而出毒费,到底是詐尸還是另有隱情,我是刑警寧澤愈魏,帶...
    沈念sama閱讀 34,329評論 4 330
  • 正文 年R本政府宣布觅玻,位于F島的核電站艇棕,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏串塑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,936評論 3 313
  • 文/蒙蒙 一北苟、第九天 我趴在偏房一處隱蔽的房頂上張望桩匪。 院中可真熱鬧,春花似錦友鼻、人聲如沸傻昙。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,757評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽妆档。三九已至,卻和暖如春虫碉,著一層夾襖步出監(jiān)牢的瞬間贾惦,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,991評論 1 266
  • 我被黑心中介騙來泰國打工敦捧, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留须板,地道東北人。 一個月前我還...
    沈念sama閱讀 46,370評論 2 360
  • 正文 我出身青樓兢卵,卻偏偏與公主長得像习瑰,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子秽荤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,527評論 2 349

推薦閱讀更多精彩內(nèi)容