在Flink中吁伺,同一個算子可能存在若干個不同的并行實例矩肩,計算過程可能不在同一個Slot中進行秸谢,不同算子之間更是如此靠抑,因此不同算子的計算數(shù)據(jù)之間不能像Java數(shù)組之間一樣互相訪問量九,而廣播變量Broadcast
便是解決這種情況的. 在 flink 中, 針對某一個算子需要使用公共變量的情況下, 就可以把對應(yīng)的數(shù)據(jù)給廣播出去, 這樣在所有的節(jié)點中都可以使用了. 典型的代碼結(jié)構(gòu)如下所示:
在一個算子中使用廣播變量主要有兩個步驟:
-
廣播變量 (一般寫在算子的后面即可)
使用 withBroadcastSet(data, "name") 這個方法即可, name變量代表了獲取該廣播變量的名稱
-
使用廣播變量
使用方法主要是通過 RichFunction, 在 對應(yīng)的 open( )方法中, 可以根據(jù)名稱來獲取對應(yīng)的廣播變量, 只需要一次獲取, 就可以一直使用了, 具體方法如下:
dataSet.map(new RichMapFunction<String, String>() {
List<Integer> bc;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 2. 獲取廣播變量
this.bc = getRuntimeContext().getBroadcastVariable("broadcastData");
}
@Override
public String map(String s) throws Exception {
return s;
}
// 1. 將需要用的變量廣播出去 (這一步可以寫在后面)
}).withBroadcastSet(broadcastData, "broadcastData").print();
下面以一個獲取用戶年齡的例子來演示一個常見的使用案例:
broadcastData 是一個包含用戶 (姓名, 年齡) 的數(shù)據(jù)表
需要在另外一個算子中通過姓名查找年齡, 那么就需要把上表廣播
public class BroadcastExample {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// 創(chuàng)建需要廣播的 數(shù)據(jù)集 (name, age)
Tuple2<String, Integer> john = new Tuple2<>("john", 23);
Tuple2<String, Integer> tom = new Tuple2<>("tom", 24);
Tuple2<String, Integer> shiny = new Tuple2<>("shiny", 22);
DataSource<Tuple2<String, Integer>> broadcastData = env.fromElements(john, tom, shiny);
// 新建一個dataset -> d1, 設(shè)置并行度為4
// 此時 d1 是無法訪問 broadcastData 的數(shù)據(jù)的, 因為兩個dataset可能不在一個節(jié)點或者slot中, 所以 flink 是不允許去訪問的
DataSet<String> d1 = env.fromElements("john", "tom", "shiny").setParallelism(4);
// 使用 RichMapFunction, 在open() 方法中拿到廣播變量
d1.map(new RichMapFunction<String, String>() {
List<Tuple2<String, Integer>> bc;
HashMap<String, Integer> map = new HashMap<>();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.bc = getRuntimeContext().getBroadcastVariable("broadcastData");
for (Tuple2<String, Integer> tp : bc) {
this.map.put(tp.f0, tp.f1);
}
}
@Override
public String map(String s) throws Exception {
Integer age = this.map.get(s);
return s + "->" + age;
}
}).withBroadcastSet(broadcastData, "broadcastData").print();
}
}