上篇講了operator state在恢復(fù)(重啟/故障恢復(fù))時要么使用evenly distributed策略鸵闪,要么使用union策略突那,來重啟operator的并發(fā)實例独旷。
operator state支持的第三種類型是 Broadcast State。引入該類型state是為了支持一個流中的一些數(shù)據(jù)需要廣播到所有流中的場景宿稀,這些數(shù)據(jù)會被存儲在本地趁舀,并應(yīng)用在另一些流的所有數(shù)據(jù)上以便進行處理。例如一個很自然的例子祝沸,我們有一個包含一系列規(guī)則的很緩慢的流矮烹,我們想要將這些規(guī)則應(yīng)用到其他流的所有數(shù)據(jù)上。把這個例子記在腦子里罩锐,broadcast state與其他operator state不同點在于:
- 它有一個map結(jié)構(gòu)
- 它僅能在某些特定的operator中使用奉狈,operator需要既可以將 broadcasted stream作為輸入,也可以將 non-broadcasted 流作為輸入
- 這些特定的操作符可以有多個不同的 broadcast state涩惑,每個state都有自己的名稱仁期。
Provided APIs
在展示完整功能前,我們先從一個示例開始講解flink提供的api。我們假設(shè)我們的數(shù)據(jù)流數(shù)據(jù)含有color與shape兩個屬性跛蛋。我們想要找到一些特性模式的數(shù)據(jù)對熬的,如:color屬性相同,且shape符合某些規(guī)則赊级,如先出現(xiàn)矩形再出現(xiàn)三角形押框。我們假設(shè)這些規(guī)則是隨時間變化的。
在這個示例中理逊,第一個流包含 Item類型的數(shù)據(jù)橡伞,他有 Color 與 Shape兩個屬性。另一個流包含Rule類型數(shù)據(jù)晋被。
我們從包含Item的流入手骑歹,我們需要將其根據(jù)Color進行key操作,因為我們要找的數(shù)據(jù)對的color都是一樣的墨微。這樣做之后道媚,會保證具有相同color的數(shù)據(jù)會分配到相同的物理機器上。
// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
.keyBy(new KeySelector<Shape, Color>(){...});
再來看 Rule 數(shù)據(jù)流翘县,包含Rule的數(shù)據(jù)流需要被廣播broadcast到所有的下游任務(wù)中去最域,并且這些任務(wù)需要將其存儲到本地,這樣就可以使用本地數(shù)據(jù)與Item數(shù)據(jù)做計算
下面的小段程序會:1)廣播rule數(shù)據(jù)流 2)使用提供的MapStateDescriptor 來創(chuàng)建rule需要存儲到的broadcast state對象锈麸。
// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
最后镀脂,為了在Item流上的每個Item中都應(yīng)用Rule進行計算,我們需要:
1.連接這兩個流
2.定義我們模式匹配的邏輯
可以使用 connect() 方法來連接一個(keyed/non-keyed)流與 Broadcast 流忘伞。在非 broadcast流上調(diào)用connect() 方法薄翅,broadcast流作為參數(shù)傳入。這個方法會返回 BroadcastConnectedStream 類氓奈,在這個類上翘魄,我們可以調(diào)用 process() 方法傳入 CoProcessFunction的實現(xiàn)類。我們可以在這個函數(shù)內(nèi)實現(xiàn)我們的邏輯舀奶。函數(shù)的類型取決于non-broadcast 流的情況:
- 如果它是 keyed 流暑竟,那么函數(shù)類型就是 KeyedBroadcastProcessFunction
- 如果它是non-keyed 流,那么函數(shù)類型是 BroadcastProcessFunction
假設(shè)育勺,我們的non-broadcast流是 non-keyed 類型但荤。
注意:connect方法需要由 non-broadcast 流來調(diào)用,broadcast流作為參數(shù)
DataStream<String> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
// type arguments in our KeyedBroadcastProcessFunction represent:
// 1. the key of the keyed stream
// 2. the type of elements in the non-broadcast side
// 3. the type of elements in the broadcast side
// 4. the type of the result, here a string
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// my matching logic
}
);
BroadcastProcessFunction 與 KeyedBroadcastProcessFunction
在 CoProcessFunction 接口中涧至,有兩個處理方法需要實現(xiàn):processBroadcastElement() 方法腹躁,處理broadcast 流中的數(shù)據(jù);processElement() 方法處理 non-broadcast流的數(shù)據(jù)南蓬。兩個方法的詳細方法簽名如下:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
首先需要注意的是:兩個接口都需要實現(xiàn) processBroadcastElement() 方法來處理 broadcast流中的數(shù)據(jù)纺非,以及 processElement() 方法來處理 non-broadcast流中的數(shù)據(jù)哑了。
兩個方法的區(qū)別在于他們?nèi)雲(yún)⒌腸ontext 的不同。處理non-broadcast 的方法的入?yún)?ReadOnlyContext铐炫,處理 broadcast 的方法的入?yún)?Context垒手。
兩個context都有如下特點
- 獲取 broadcast.state 的能力:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
- 允許查詢元素的時間戳:ctx.timestamp(
- 獲取當(dāng)前的watermark:ctx.currentWatermark()
- 獲取當(dāng)前processing time:ctx.currentProcessingTime()
- 發(fā)射數(shù)據(jù)到 side-output:ctx.output(OutputTag<X> outputTag, X value)
getBroadcastState()方法中的 stateDescriptor 需要與 .broadcast(ruleStateDescriptor) 中的相同蒜焊。
兩個context的不同點在于兩者對 broadcast state 的訪問級別倒信。 處理broadcast 流的方法的context對broadcast state 是讀寫權(quán)限,而處理 non-broadcast流的方法的context對broadcast state是只讀權(quán)限泳梆。這樣做的原因是:在Flink中鳖悠,是沒有跨任務(wù)數(shù)據(jù)交換(no cross-task communication)的。因此优妙,為了保證在操作符的所有并發(fā)實例中的broadcast state都是相同的乘综,我們僅給broadcast流讀寫權(quán)限,這樣所有任務(wù)中的broadcast的數(shù)據(jù)都會相同套硼,我們也能保證應(yīng)用于non-broadcast數(shù)據(jù)上的計算在所有task中都是相同的卡辰。不這么做,就無法達到一致性的保證邪意,導(dǎo)致不一致且很難調(diào)試的結(jié)果九妈。
注意:processBroadcast() 方法中實現(xiàn)的邏輯也需要在所有并發(fā)操作實例中保持一致性。
最終雾鬼,由于 KeyedBroadcastProcessFunction 是在 keyed 流上進行操作萌朱,它提供了 BroadcastProcessFunction 沒有的一些功能:
- processElement()中的 ReadOnlyContext可以訪問Flink提供的 timer server服務(wù),它允許注冊一個 event time/processing time的時間回調(diào)函數(shù)策菜。當(dāng)觸發(fā)回調(diào)時晶疼,會調(diào)用onTime()方法,該方法的 OnTimeContext 參數(shù)包含了 ReadOnlyContext的所有功能又憨,再加上:
- 判斷該timer是基于event time還是processing time的
- 查詢timer關(guān)聯(lián)的key
2.processBroadcastElement()方法中的 Context 有 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function) 方法翠霍。該方法可以注冊一個
KeyedStateFunction 應(yīng)用于stateDescriptor所指代的state,所有key上的符合要求的state都會應(yīng)用此functino蠢莺。
注意:只可以在 KeyedBroadcastProcessFunction 中的 processElement() 中注冊timer壶运。不可以在 processBroadcastElement()中注冊,因為這個方法中處理的是broadcast element浪秘,沒有關(guān)聯(lián)的key蒋情。
回到我們一開始的例子,我們的 KeyedBroadcastProcessFunction 可以這樣寫:
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// store partial matches, i.e. first elements of the pair waiting for their second element
// we keep a list as we may have many first elements waiting
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
// identical to our ruleStateDescriptor above
private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
@Override
public void processBroadcastElement(Rule value,
Context ctx,
Collector<String> out) throws Exception {
ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
}
@Override
public void processElement(Item value,
ReadOnlyContext ctx,
Collector<String> out) throws Exception {
final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
final Shape shape = value.getShape();
for (Map.Entry<String, Rule> entry :
ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
final String ruleName = entry.getKey();
final Rule rule = entry.getValue();
List<Item> stored = state.get(ruleName);
if (stored == null) {
stored = new ArrayList<>();
}
if (shape == rule.second && !stored.isEmpty()) {
for (Item i : stored) {
out.collect("MATCH: " + i + " - " + value);
}
stored.clear();
}
// there is no else{} to cover if rule.first == rule.second
if (shape.equals(rule.first)) {
stored.add(value);
}
if (stored.isEmpty()) {
state.remove(ruleName);
} else {
state.put(ruleName, stored);
}
}
}
}
重要考量
介紹完提供的API后耸携,接下來的這部分用于提醒你棵癣,在使用broadcast state時,需要記住這些重要的事情:
- 沒有跨任務(wù)(實例)間的數(shù)據(jù)交換(no cross-task communication):之前已經(jīng)提過沒什么在 (Keyed)-BroadcastProcessFunction 中僅有處理 broadcast數(shù)據(jù)的方法可以修改 broadcast state 的內(nèi)容夺衍。除此之外狈谊,使用者需要保證所有并發(fā)實例中修改broadcast state時的邏輯都是一樣的。否則,不同的實例就會有不同的state內(nèi)容河劝,導(dǎo)致計算結(jié)果的不一致壁榕。
- 不同任務(wù)(實例)中broadcast state的內(nèi)容的順序可能會不一致:盡管廣播一個流的數(shù)據(jù)會保證所有數(shù)據(jù)將會(最終)到達所有下游任務(wù),但是數(shù)據(jù)到達每一個任務(wù)的順序可能不一樣赎瞎。因此牌里,state更新時,絕對不可以根據(jù)數(shù)據(jù)的順序來更新(而是根據(jù)數(shù)據(jù)的屬性)
- 所有的任務(wù)都會checkpoint它們的 broadcast state:盡管所有的任務(wù)(實例)的broadcast state都保存著相同的數(shù)據(jù)务甥,但是在checkpoint時牡辽,所有的任務(wù)(實例)都會對它們所維持的broadcast state進行備份,而不是只選擇其中的一個實例的broadcast state進行備份敞临。這樣的設(shè)計是為了避免在故障恢復(fù)時态辛,所有的任務(wù)都讀取同一個文件,盡管這樣做會帶來增加state快照的消耗問題挺尿,這個消耗與并發(fā)度p相關(guān)奏黑。Flink會保證,在重啟/伸縮 應(yīng)用時编矾,沒有副本并且不會丟失數(shù)據(jù)熟史。為了以相同的并發(fā)度或更少的并發(fā)度來重啟,每個task都會讀取它自己狀態(tài)快照洽沟。當(dāng)以更多的并發(fā)度來重啟時以故,每一個task讀取自己的狀態(tài)快照,剩下的任務(wù)(p_new - p_old)以輪詢的方式依次讀取舊任務(wù)的checkpoint裆操。
- No RocksDB state backend:broadcast state會保存在內(nèi)存中怒详,需要完成相應(yīng)的內(nèi)存配置。這適用于所有的 operator state踪区。