Working with State描述了運(yùn)算符狀態(tài)资昧,該狀態(tài)在恢復(fù)時(shí)均勻分布于運(yùn)算符的并行任務(wù)之間酬土,或unioned,使用整個(gè)狀態(tài)初始化恢復(fù)的并行任務(wù)格带。
Flink支持的第三種操作符狀態(tài)是廣播狀態(tài)(Broadcast State)撤缴。廣播狀態(tài)(Broadcast State)的引入是為了支持一些來自一個(gè)流的數(shù)據(jù)需要廣播到所有下游任務(wù)的情況,它存儲(chǔ)在本地叽唱,用于處理其他流上的所有傳入元素屈呕。例如,廣播狀態(tài)可以作為一種自然匹配出現(xiàn)棺亭,您可以想象一個(gè)低吞吐量流虎眨,其中包含一組規(guī)則,我們希望對(duì)來自另一個(gè)流的所有元素進(jìn)行評(píng)估镶摘∷宰考慮到上述類型的情況,broadcast狀態(tài)與其他運(yùn)算符狀態(tài)的不同之處在于:
- 它有一種Map格式凄敢。
- 它僅對(duì)具有廣播流和非廣播流作為輸入的特定操作符可用碌冶。
- 并且這樣的操作符可以有不同名稱的多個(gè)廣播狀態(tài)。
Provided APIs
為了展示所提供的api涝缝,在展示其全部功能之前扑庞,我們將從一個(gè)示例開始。作為我們的運(yùn)行示例拒逮,我們會(huì)用到這樣的例子罐氨,有一個(gè)不同顏色和形狀的對(duì)象流,我們想要遵循一定的模式找到一對(duì)相同顏色的對(duì)象消恍,
長(zhǎng)方形后面跟一個(gè)三角形岂昭,我們假設(shè)這組有趣的模式會(huì)隨著時(shí)間而發(fā)展。
在本例中狠怨,第一個(gè)流將包含具有顏色和形狀屬性的Item類型的元素。另一個(gè)流將包含規(guī)則邑遏。
從Items流開始佣赖,因?yàn)槲覀兿胍嗤伾膶?duì),只需要按顏色key分組记盒,這將確保相同顏色的元素最終出現(xiàn)在相同的物理機(jī)器上憎蛤。
// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
.keyBy(new KeySelector<Shape, Color>(){...});
接下來看規(guī)則,包含規(guī)則的流應(yīng)該廣播給所有下游任務(wù),這些任務(wù)應(yīng)該在本地存儲(chǔ)俩檬,這樣就可以針對(duì)所有輸入項(xiàng)對(duì)它們進(jìn)行評(píng)估萎胰。下面的代碼片段將i)廣播規(guī)則流,ii)使用提供的MapStateDescriptor棚辽,它將創(chuàng)建規(guī)則存儲(chǔ)的廣播狀態(tài)技竟。
// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
最后,為了對(duì)來自項(xiàng)目流的輸入元素評(píng)估規(guī)則屈藐,我們需要:
- 連接兩個(gè)流榔组。
- 并且指定匹配檢測(cè)邏輯。
將流(鍵或非鍵)與廣播流連接联逻,可在非廣播流上調(diào)用connect()
搓扯,以BroadcastStream
為參數(shù)。將返回一個(gè)BroadcastConnectedStream
包归,在它上面我們可以使用一種特殊類型的CoProcessFunction
調(diào)用process()
锨推。該函數(shù)將包含我們的匹配邏輯。函數(shù)的確切類型取決于非廣播流的類型:
- 如果是鍵的公壤,那么這個(gè)函數(shù)就是一個(gè)
KeyedBroadcastProcessFunction
换可。 - 如果是非鍵的,函數(shù)就是
BroadcastProcessFunction
境钟。
假設(shè)我們的非廣播流是鍵的锦担,以下片段包括上述調(diào)用:
應(yīng)該在非廣播流上調(diào)用連接,并將廣播流作為參數(shù)慨削。
DataStream<Match> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
// type arguments in our KeyedBroadcastProcessFunction represent:
// 1. the key of the keyed stream
// 2. the type of elements in the non-broadcast side
// 3. the type of elements in the broadcast side
// 4. the type of the result, here a string
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// my matching logic
}
)
BroadcastProcessFunction和KeyedBroadcastProcessFunction
與CoProcessFunction函數(shù)一樣洞渔,這些函數(shù)有兩個(gè)要實(shí)現(xiàn)的過程方法;processBroadcastElement()
負(fù)責(zé)處理廣播流中的傳入元素,processElement()
負(fù)責(zé)處理非廣播流中的傳入元素缚态。方法的完整簽名如下:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
首先要注意的是磁椒,這兩個(gè)函數(shù)都需要實(shí)現(xiàn)processBroadcastElement()
方法來處理廣播端中的元素,以及processElement()
方法來處理非廣播端中的元素玫芦。
這兩種方法提供的上下文中有所不同浆熔。非廣播端有一個(gè)ReadOnlyContext
,而廣播端有一個(gè)Context
桥帆。
這兩種情況(以下列舉的ctx):
- 訪問廣播狀態(tài):
ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
医增。 - 允許查詢?cè)氐臅r(shí)間戳:
ctx.timestamp()
。 - 獲取當(dāng)前水印:
ctx.currentWatermark()
老虫。 - 獲取當(dāng)前處理時(shí)間:
ctx.currentProcessingTime()
叶骨, - 向側(cè)輸出發(fā)送元素:
ctx.output(OutputTag<X> outputTag, X value)
。
getBroadcastState()
中的狀態(tài)描述符(stateDescriptor)應(yīng)該與上面.broadcast(ruleStateDescriptor)
中的狀態(tài)描述符相同祈匙。
不同之處在于它們對(duì)廣播狀態(tài)的訪問類型忽刽。廣播端具有讀寫訪問權(quán)天揖,而非廣播端具有只讀訪問權(quán)(即名稱)。原因是在Flink中沒有跨任務(wù)通信跪帝。為了保證廣播狀態(tài)下的內(nèi)容在運(yùn)算符的所有并行實(shí)例中都是相同的今膊,我們只對(duì)廣播端提供讀寫訪問,它在所有任務(wù)中看到相同的元素伞剑,我們要求在這一側(cè)的每個(gè)傳入元素上的計(jì)算在所有任務(wù)中都是相同的斑唬。忽略此規(guī)則將破壞狀態(tài)的一致性保證,導(dǎo)致不一致且通常難以調(diào)試結(jié)果纸泄。
注意:在“processBroadcast()”中實(shí)現(xiàn)的邏輯必須在所有并行實(shí)例中具有相同的確定性行為!
最后赖钞,由于KeyedBroadcastProcessFunction
是在一個(gè)鍵流上操作的,它公開了一些BroadcastProcessFunction
不可用的功能聘裁。那就是:
-
processElement()
方法中的ReadOnlyContext
允許訪問Flink的底層計(jì)時(shí)器服務(wù)雪营,允許注冊(cè)事件和/或處理時(shí)間計(jì)時(shí)器。當(dāng)計(jì)時(shí)器觸發(fā)時(shí)衡便,onTimer()
(如上所示)通過OnTimerContext
調(diào)用献起,OnTimerContext
公開了與ReadOnlyContext
plus相同的功能
- 能夠詢問觸發(fā)的計(jì)時(shí)器是否是事件或處理時(shí)間1并查詢與計(jì)時(shí)器關(guān)聯(lián)的鍵。
-
processBroadcastElement()
方法中的上下文包含applyToKeyedState(StateDescriptor<S, VS> StateDescriptor, KeyedStateFunction<KS, S> function)
镣陕。允許注冊(cè)一個(gè)KeyedStateFunctio
n谴餐,將其應(yīng)用于與提供的stateDescriptor
關(guān)聯(lián)的所有鍵的所有狀態(tài)。
注意:注冊(cè)計(jì)時(shí)器只能在' KeyedBroadcastProcessFunction '的' processElement() '上進(jìn)行呆抑,而且只能在那里進(jìn)行岂嗓。在“processBroadcastElement()”方法中是不可能的,因?yàn)闆]有與廣播元素相關(guān)聯(lián)的鍵鹊碍。
回到我們?cè)瓉淼睦友嵫常覀兊腒eyedBroadcastProcessFunction可以如下所示:
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// store partial matches, i.e. first elements of the pair waiting for their second element
// we keep a list as we may have many first elements waiting
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
// identical to our ruleStateDescriptor above
private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
@Override
public void processBroadcastElement(Rule value,
Context ctx,
Collector<String> out) throws Exception {
ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
}
@Override
public void processElement(Item value,
ReadOnlyContext ctx,
Collector<String> out) throws Exception {
final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
final Shape shape = value.getShape();
for (Map.Entry<String, Rule> entry :
ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
final String ruleName = entry.getKey();
final Rule rule = entry.getValue();
List<Item> stored = state.get(ruleName);
if (stored == null) {
stored = new ArrayList<>();
}
if (shape == rule.second && !stored.isEmpty()) {
for (Item i : stored) {
out.collect("MATCH: " + i + " - " + value);
}
stored.clear();
}
// there is no else{} to cover if rule.first == rule.second
if (shape.equals(rule.first)) {
stored.add(value);
}
if (stored.isEmpty()) {
state.remove(ruleName);
} else {
state.put(ruleName, stored);
}
}
}
}
重要的注意事項(xiàng)
在描述了提供的api之后,本節(jié)將重點(diǎn)介紹在使用broadcast state時(shí)需要記住的重要內(nèi)容侈咕。
-
沒有跨任務(wù)通信(There is no cross-task communication): 如前所述公罕,這就是為什么只有(鍵的)
broadcastprocessfunction
的廣播端可以修改廣播狀態(tài)的內(nèi)容。此外耀销,用戶必須確保所有任務(wù)以相同的方式為每個(gè)傳入元素修改broadcast狀態(tài)的內(nèi)容楼眷。
否則,不同的任務(wù)可能具有不同的內(nèi)容熊尉,從而導(dǎo)致不一致的結(jié)果罐柳。 - 廣播狀態(tài)下的事件順序可能會(huì)因任務(wù)的不同而不同(Order of events in Broadcast State may differ across tasks):盡管廣播流的元素可以保證所有元素(最終)都將進(jìn)入所有下游任務(wù),元素可能以不同的順序到達(dá)每個(gè)任務(wù)狰住。因此硝清,每個(gè)傳入元素的狀態(tài)更新不能依賴于傳入事件的順序。
- 所有任務(wù)檢查它們的廣播狀態(tài)(All tasks checkpoint their broadcast state):盡管在檢查點(diǎn)發(fā)生時(shí)转晰,所有任務(wù)的廣播狀態(tài)中都有相同的元素(檢查點(diǎn)屏障不會(huì)跨越元素),所有任務(wù)檢查它們的廣播狀態(tài),而且不止一個(gè)查邢。這是一個(gè)設(shè)計(jì)決策蔗崎,避免在恢復(fù)期間從同一個(gè)文件讀取所有任務(wù)(從而避免熱點(diǎn)),盡管這樣做的代價(jià)是將檢查點(diǎn)狀態(tài)的大小增加p(=并行度)扰藕。Flink保證在恢復(fù)/重新掃描時(shí)不會(huì)有重復(fù)和丟失數(shù)據(jù)缓苛。在并行度相同或更小的恢復(fù)情況下,每個(gè)任務(wù)讀取其檢查點(diǎn)狀態(tài)邓深。在擴(kuò)展時(shí)未桥,每個(gè)任務(wù)讀取自己的狀態(tài),其余任務(wù)(p_new-p_old)以循環(huán)方式讀取以前任務(wù)的檢查點(diǎn)芥备。
- 沒有RocksDB狀態(tài)后端(No RocksDB state backend):廣播狀態(tài)在運(yùn)行時(shí)保存在內(nèi)存中冬耿,應(yīng)該相應(yīng)地進(jìn)行內(nèi)存供應(yīng)。這適用于所有的運(yùn)算符狀態(tài)萌壳。