Broadcast state was introduced to support use cases where records from one stream need to be broadcast to all downstream tasks, where they are kept locally and used to process all incoming elements of another stream.
As an example of where broadcast state fits naturally, imagine a low-throughput stream containing a set of rules that we want to evaluate against every element of another stream. With this kind of use case in mind, broadcast state differs from other operator state in three ways (see the sketch after the list):
(1) it has a map format,
(2) it is only available to operators whose inputs are one broadcast stream and one non-broadcast stream, and
(3) such an operator can have multiple broadcast states with different names.
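A minimal sketch of points (1) and (2), assuming a job in which ruleStream and dataStream are both DataStream<String> and MyRuleFunction is a hypothetical BroadcastProcessFunction (a fuller skeleton of one appears after the notes below):

// The map-format state: keys are rule names, values the rules themselves.
MapStateDescriptor<String, String> ruleDescriptor =
        new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

// Broadcasting the low-throughput rule stream yields a BroadcastStream...
BroadcastStream<String> ruleBroadcastStream = ruleStream.broadcast(ruleDescriptor);

// ...which must be connected to the non-broadcast stream; only this
// connected operator can access the broadcast state.
DataStream<String> result = dataStream
        .connect(ruleBroadcastStream)
        .process(new MyRuleFunction());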
A common situation: one stream must process data according to rules or configuration that can change at any time. The rules or configuration can then be broadcast as a broadcast stream and stored as Broadcast State in the downstream tasks, which process the records of the regular stream according to the rules or configuration held in that state.
Example scenarios:
Dynamically updating computation rules: if an event stream must be evaluated against the latest rules, broadcast the rules as broadcast state to the downstream tasks.
Enriching events with extra fields in real time: if an event stream must be enriched with users' basic profile information in real time, broadcast that information as broadcast state to the downstream tasks.
Notes:
Broadcast State has a Map format, i.e., it stores key-value pairs.
Broadcast State can only be modified on the broadcast side, i.e., in the processBroadcastElement method of a BroadcastProcessFunction or KeyedBroadcastProcessFunction; on the non-broadcast side, i.e., in the processElement method, it is read-only (see the skeleton after these notes).
The order of elements in Broadcast State may differ across tasks, so be careful with any processing that depends on element order.
On a checkpoint, every task checkpoints its own copy of the broadcast state.
At runtime, Broadcast State is kept in memory; it cannot currently be stored in the RocksDB state backend.
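To make the read-only/writable distinction concrete, here is a minimal skeleton using the keyword-filtering scenario described below (all names and types are illustrative, not part of the full example that follows):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class KeywordFilter extends BroadcastProcessFunction<String, String, String> {

    // Must match, by name and types, the descriptor the config stream was broadcast with.
    private final MapStateDescriptor<String, String> descriptor =
            new MapStateDescriptor<>("keyword", Types.STRING, Types.STRING);

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // Non-broadcast side: the state is read-only here.
        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(descriptor);
        String keyword = state.get("keyword");
        if (keyword != null && value.contains(keyword)) {
            out.collect(value);
        }
    }

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Broadcast side: the state is writable; every parallel instance
        // receives the same element and applies the same update.
        ctx.getBroadcastState(descriptor).put("keyword", value);
    }
}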
Usage scenario:
Some configuration must change dynamically while data is being processed. For example, suppose we filter on a set of keywords that are configured and modified in MySQL at any time; querying that configuration from inside a high-throughput Function could block the whole computation or even bring the job down.
Instead, a broadcast stream can read the configuration and broadcast it to all parallel instances of an operator, which then connect it with the other data stream for processing. The complete example below implements this pattern. First, the Maven dependencies (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.istudy</groupId>
<artifactId>HaiFlink</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.12.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- redis -->
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
<!-- ElasticSearch7 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
</dependencies>
</project>
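With the dependencies in place, here is the complete job: it consumes events from Kafka, periodically loads user profiles from MySQL in a custom source, broadcasts them as configuration, and enriches matching events.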
package com.istudy.broadcast;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Author: Wang Pei
* Summary:
* Uses Broadcast State to dynamically update configuration, filtering events in real time and enriching them with extra fields
*/
@Slf4j
public class TestBroadcastState {
public static void main(String[] args) throws Exception{
//1. Parse command-line arguments
ParameterTool fromArgs = ParameterTool.fromArgs(args);
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));
//Checkpoint configuration
String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");
//Event stream (Kafka) configuration
String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");
//Config stream (MySQL) configuration
String fromMysqlHost = parameterTool.getRequired("fromMysql.host");
int fromMysqlPort = parameterTool.getInt("fromMysql.port");
String fromMysqlDB = parameterTool.getRequired("fromMysql.db");
String fromMysqlUser = parameterTool.getRequired("fromMysql.user");
String fromMysqlPasswd = parameterTool.getRequired("fromMysql.passwd");
int fromMysqlSecondInterval = parameterTool.getInt("fromMysql.secondInterval");
//2. Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Set the state backend (the second argument enables asynchronous snapshots)
env.setStateBackend(new FsStateBackend(checkpointDirectory, true));
//Configure checkpointing
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//3. Kafka event stream
//Consume event data from Kafka
//Each record describes a user browsing or clicking a product at some point in time, e.g.
//{"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers",fromKafkaBootstrapServers);
kafkaProperties.put("group.id",fromKafkaGroupID);
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
kafkaConsumer.setStartFromLatest();
DataStream<String> kafkaSource = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id-kafka-source");
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> eventStream = kafkaSource.process(new ProcessFunction<String, Tuple4<String, String, String, Integer>>() {
@Override
public void processElement(String value, Context ctx, Collector<Tuple4<String, String, String, Integer>> out){
try {
JSONObject obj = JSON.parseObject(value);
String userID = obj.getString("userID");
String eventTime = obj.getString("eventTime");
String eventType = obj.getString("eventType");
int productID = obj.getIntValue("productID");
out.collect(new Tuple4<>(userID, eventTime, eventType, productID));
}catch (Exception ex){
log.warn("Malformed record: {}", value, ex);
}
}
});
//4. MySQL config stream
//A custom MySQL source periodically fetches the configuration, which is then broadcast
//Data: user ID, user name, user age
DataStreamSource<HashMap<String, Tuple2<String, Integer>>> configStream = env.addSource(new MysqlSource(fromMysqlHost, fromMysqlPort, fromMysqlDB, fromMysqlUser, fromMysqlPasswd, fromMysqlSecondInterval));
/*
(1) First create a MapStateDescriptor.
The MapStateDescriptor defines the state's name and the types of its keys and values.
Here, the key is of type Void and the value of type Map<String, Tuple2<String, Integer>>.
*/
MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> configDescriptor = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
/*
(2) Broadcast the config stream, forming a BroadcastStream
*/
BroadcastStream<HashMap<String, Tuple2<String, Integer>>> broadcastConfigStream = configStream.broadcast(configDescriptor);
//5. Connect the event stream with the broadcast config stream, forming a BroadcastConnectedStream
BroadcastConnectedStream<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>> connectedStream = eventStream.connect(broadcastConfigStream);
//6. Apply process() to the BroadcastConnectedStream, handling events according to the configuration (rules)
SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> resultStream = connectedStream.process(new CustomBroadcastProcessFunction());
//7. Print the results
resultStream.print();
//8. Build the JobGraph and start execution
env.execute();
}
/**
* Custom BroadcastProcessFunction
* An event is processed only if its user ID appears in the configuration,
* in which case the event is enriched with the user's basic information
* Tuple4<String, String, String, Integer>: type of the first (event) stream
* HashMap<String, Tuple2<String, Integer>>: type of the second (config) stream
* Tuple6<String, String, String, Integer, String, Integer>: type of the output
*/
static class CustomBroadcastProcessFunction extends BroadcastProcessFunction<Tuple4<String, String, String, Integer>, HashMap<String, Tuple2<String, Integer>>, Tuple6<String, String, String, Integer, String, Integer>>{
/**MapStateDescriptor; must match, by name and types, the descriptor used to broadcast the config stream*/
MapStateDescriptor<Void, Map<String, Tuple2<String,Integer>>> configDescriptor = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
/**
* Read the state and, based on it, process elements of the event stream
* The state is obtained from the context here, and incoming events are handled according to it
* @param value element of the event stream
* @param ctx context
* @param out emits zero or more records
* @throws Exception
*/
@Override
public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
//User ID from the event stream
String userID = value.f0;
//Get the (read-only) broadcast state
ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(configDescriptor);
Map<String, Tuple2<String, Integer>> broadcastStateUserInfo = broadcastState.get(null);
//Before the first configuration broadcast arrives, the state may be empty; skip events until then
if(broadcastStateUserInfo == null){
return;
}
//If the user appears in the configuration, enrich the event with userName and userAge;
//otherwise, the event is discarded
Tuple2<String, Integer> userInfo = broadcastStateUserInfo.get(userID);
if(userInfo!=null){
out.collect(new Tuple6<>(value.f0,value.f1,value.f2,value.f3,userInfo.f0,userInfo.f1));
}
}
/**
* Process each element of the broadcast stream and update the state
* @param value element of the broadcast stream
* @param ctx context
* @param out emits zero or more records
* @throws Exception
*/
@Override
public void processBroadcastElement(HashMap<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
//Get the broadcast state (writable on this side)
BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(configDescriptor);
//Clear the previous state
broadcastState.clear();
//Store the new configuration under the single null key
broadcastState.put(null,value);
}
}
/**
* Custom MySQL source that fetches the configuration from MySQL every secondInterval seconds
*/
static class MysqlSource extends RichSourceFunction<HashMap<String, Tuple2<String, Integer>>> {
private String host;
private Integer port;
private String db;
private String user;
private String passwd;
private Integer secondInterval;
private volatile boolean isRunning = true;
private Connection connection;
private PreparedStatement preparedStatement;
MysqlSource(String host, Integer port, String db, String user, String passwd,Integer secondInterval) {
this.host = host;
this.port = port;
this.db = db;
this.user = user;
this.passwd = passwd;
this.secondInterval = secondInterval;
}
/**
* On startup, establish the connection in the open() method
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Class.forName("com.mysql.cj.jdbc.Driver"); //driver class of mysql-connector-java 8.x
connection= DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/"+db+"?useUnicode=true&characterEncoding=UTF-8", user, passwd);
String sql="select userID,userName,userAge from user_info";
preparedStatement=connection.prepareStatement(sql);
}
/**
* On shutdown, close() is called to close the connection and release resources
* @throws Exception
*/
@Override
public void close() throws Exception {
super.close();
if(preparedStatement !=null){
preparedStatement.close();
}
if(connection!=null){
connection.close();
}
}
/**
* run() is called to fetch the data
* @param ctx
*/
@Override
public void run(SourceContext<HashMap<String, Tuple2<String, Integer>>> ctx) {
try {
while (isRunning){
HashMap<String, Tuple2<String, Integer>> output = new HashMap<>();
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()){
String userID = resultSet.getString("userID");
String userName = resultSet.getString("userName");
int userAge = resultSet.getInt("userAge");
output.put(userID,new Tuple2<>(userName,userAge));
}
ctx.collect(output);
//Sleep until the next query
Thread.sleep(1000*secondInterval);
}
}catch (Exception ex){
log.error("Failed to fetch configuration from MySQL", ex);
}
}
/**
* Called when the job is cancelled
*/
@Override
public void cancel() {
isRunning = false;
}
}
}
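For reference, the job reads its settings from a properties file passed via --applicationProperties. Below is a sketch with placeholder values (the keys come from main() above; the values, and the column types in the user_info DDL that follows, are assumptions rather than part of the original project):

checkpointDirectory=hdfs:///flink/checkpoints/broadcast-state-demo
checkpointSecondInterval=60
fromKafka.bootstrap.servers=kafka-host:9092
fromKafka.group.id=broadcast-state-demo
fromKafka.topic=user_events
fromMysql.host=mysql-host
fromMysql.port=3306
fromMysql.db=demo
fromMysql.user=demo
fromMysql.passwd=demo
fromMysql.secondInterval=60

And a matching user_info table for MysqlSource to query:

CREATE TABLE user_info (
    userID   VARCHAR(64) PRIMARY KEY,
    userName VARCHAR(64),
    userAge  INT
);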