Flink Streaming廣播狀態(tài)模式(The Broadcast State Pattern)

Working with State描述了運(yùn)算符狀態(tài)资昧,該狀態(tài)在恢復(fù)時(shí)均勻分布于運(yùn)算符的并行任務(wù)之間酬土,或unioned,使用整個(gè)狀態(tài)初始化恢復(fù)的并行任務(wù)格带。

Flink支持的第三種操作符狀態(tài)是廣播狀態(tài)(Broadcast State)撤缴。廣播狀態(tài)(Broadcast State)的引入是為了支持一些來自一個(gè)流的數(shù)據(jù)需要廣播到所有下游任務(wù)的情況,它存儲(chǔ)在本地叽唱,用于處理其他流上的所有傳入元素屈呕。例如,廣播狀態(tài)可以作為一種自然匹配出現(xiàn)棺亭,您可以想象一個(gè)低吞吐量流虎眨,其中包含一組規(guī)則,我們希望對(duì)來自另一個(gè)流的所有元素進(jìn)行評(píng)估镶摘∷宰考慮到上述類型的情況,broadcast狀態(tài)與其他運(yùn)算符狀態(tài)的不同之處在于:

  1. 它有一種Map格式凄敢。
  2. 它僅對(duì)具有廣播流和非廣播流作為輸入的特定操作符可用碌冶。
  3. 并且這樣的操作符可以有不同名稱的多個(gè)廣播狀態(tài)。

Provided APIs

為了展示所提供的api涝缝,在展示其全部功能之前扑庞,我們將從一個(gè)示例開始。作為我們的運(yùn)行示例拒逮,我們會(huì)用到這樣的例子罐氨,有一個(gè)不同顏色和形狀的對(duì)象流,我們想要遵循一定的模式找到一對(duì)相同顏色的對(duì)象消恍,
長(zhǎng)方形后面跟一個(gè)三角形岂昭,我們假設(shè)這組有趣的模式會(huì)隨著時(shí)間而發(fā)展。

在本例中狠怨,第一個(gè)流將包含具有顏色和形狀屬性的Item類型的元素。另一個(gè)流將包含規(guī)則邑遏。

從Items流開始佣赖,因?yàn)槲覀兿胍嗤伾膶?duì),只需要按顏色key分組记盒,這將確保相同顏色的元素最終出現(xiàn)在相同的物理機(jī)器上憎蛤。

// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
                        .keyBy(new KeySelector<Shape, Color>(){...});

接下來看規(guī)則,包含規(guī)則的流應(yīng)該廣播給所有下游任務(wù),這些任務(wù)應(yīng)該在本地存儲(chǔ)俩檬,這樣就可以針對(duì)所有輸入項(xiàng)對(duì)它們進(jìn)行評(píng)估萎胰。下面的代碼片段將i)廣播規(guī)則流,ii)使用提供的MapStateDescriptor棚辽,它將創(chuàng)建規(guī)則存儲(chǔ)的廣播狀態(tài)技竟。

// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));
        
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);

最后,為了對(duì)來自項(xiàng)目流的輸入元素評(píng)估規(guī)則屈藐,我們需要:

  1. 連接兩個(gè)流榔组。
  2. 并且指定匹配檢測(cè)邏輯。

將流(鍵或非鍵)與廣播流連接联逻,可在非廣播流上調(diào)用connect()搓扯,以BroadcastStream為參數(shù)。將返回一個(gè)BroadcastConnectedStream包归,在它上面我們可以使用一種特殊類型的CoProcessFunction調(diào)用process()锨推。該函數(shù)將包含我們的匹配邏輯。函數(shù)的確切類型取決于非廣播流的類型:

  • 如果是鍵的公壤,那么這個(gè)函數(shù)就是一個(gè)KeyedBroadcastProcessFunction换可。
  • 如果是非鍵的,函數(shù)就是BroadcastProcessFunction境钟。

假設(shè)我們的非廣播流是鍵的锦担,以下片段包括上述調(diào)用:
應(yīng)該在非廣播流上調(diào)用連接,并將廣播流作為參數(shù)慨削。

DataStream<Match> output = colorPartitionedStream
                 .connect(ruleBroadcastStream)
                 .process(
                     
                     // type arguments in our KeyedBroadcastProcessFunction represent: 
                     //   1. the key of the keyed stream
                     //   2. the type of elements in the non-broadcast side
                     //   3. the type of elements in the broadcast side
                     //   4. the type of the result, here a string
                     
                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                         // my matching logic
                     }
                 )

BroadcastProcessFunction和KeyedBroadcastProcessFunction

與CoProcessFunction函數(shù)一樣洞渔,這些函數(shù)有兩個(gè)要實(shí)現(xiàn)的過程方法;processBroadcastElement()負(fù)責(zé)處理廣播流中的傳入元素,processElement()負(fù)責(zé)處理非廣播流中的傳入元素缚态。方法的完整簽名如下:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

首先要注意的是磁椒,這兩個(gè)函數(shù)都需要實(shí)現(xiàn)processBroadcastElement()方法來處理廣播端中的元素,以及processElement()方法來處理非廣播端中的元素玫芦。

這兩種方法提供的上下文中有所不同浆熔。非廣播端有一個(gè)ReadOnlyContext,而廣播端有一個(gè)Context桥帆。

這兩種情況(以下列舉的ctx):

  1. 訪問廣播狀態(tài):ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)医增。
  2. 允許查詢?cè)氐臅r(shí)間戳:ctx.timestamp()
  3. 獲取當(dāng)前水印:ctx.currentWatermark()老虫。
  4. 獲取當(dāng)前處理時(shí)間:ctx.currentProcessingTime()叶骨,
  5. 向側(cè)輸出發(fā)送元素:ctx.output(OutputTag<X> outputTag, X value)

getBroadcastState()中的狀態(tài)描述符(stateDescriptor)應(yīng)該與上面.broadcast(ruleStateDescriptor)中的狀態(tài)描述符相同祈匙。

不同之處在于它們對(duì)廣播狀態(tài)的訪問類型忽刽。廣播端具有讀寫訪問權(quán)天揖,而非廣播端具有只讀訪問權(quán)(即名稱)。原因是在Flink中沒有跨任務(wù)通信跪帝。為了保證廣播狀態(tài)下的內(nèi)容在運(yùn)算符的所有并行實(shí)例中都是相同的今膊,我們只對(duì)廣播端提供讀寫訪問,它在所有任務(wù)中看到相同的元素伞剑,我們要求在這一側(cè)的每個(gè)傳入元素上的計(jì)算在所有任務(wù)中都是相同的斑唬。忽略此規(guī)則將破壞狀態(tài)的一致性保證,導(dǎo)致不一致且通常難以調(diào)試結(jié)果纸泄。

注意:在“processBroadcast()”中實(shí)現(xiàn)的邏輯必須在所有并行實(shí)例中具有相同的確定性行為!

最后赖钞,由于KeyedBroadcastProcessFunction是在一個(gè)鍵流上操作的,它公開了一些BroadcastProcessFunction不可用的功能聘裁。那就是:

  1. processElement()方法中的ReadOnlyContext允許訪問Flink的底層計(jì)時(shí)器服務(wù)雪营,允許注冊(cè)事件和/或處理時(shí)間計(jì)時(shí)器。當(dāng)計(jì)時(shí)器觸發(fā)時(shí)衡便,onTimer()(如上所示)通過OnTimerContext調(diào)用献起,OnTimerContext公開了與ReadOnlyContext plus相同的功能
  • 能夠詢問觸發(fā)的計(jì)時(shí)器是否是事件或處理時(shí)間1并查詢與計(jì)時(shí)器關(guān)聯(lián)的鍵。
  1. processBroadcastElement()方法中的上下文包含applyToKeyedState(StateDescriptor<S, VS> StateDescriptor, KeyedStateFunction<KS, S> function)镣陕。允許注冊(cè)一個(gè)KeyedStateFunction谴餐,將其應(yīng)用于與提供的stateDescriptor關(guān)聯(lián)的所有鍵的所有狀態(tài)。

注意:注冊(cè)計(jì)時(shí)器只能在' KeyedBroadcastProcessFunction '的' processElement() '上進(jìn)行呆抑,而且只能在那里進(jìn)行岂嗓。在“processBroadcastElement()”方法中是不可能的,因?yàn)闆]有與廣播元素相關(guān)聯(lián)的鍵鹊碍。

回到我們?cè)瓉淼睦友嵫常覀兊腒eyedBroadcastProcessFunction可以如下所示:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

    // store partial matches, i.e. first elements of the pair waiting for their second element
    // we keep a list as we may have many first elements waiting
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
        new MapStateDescriptor<>(
            "items",
            BasicTypeInfo.STRING_TYPE_INFO,
            new ListTypeInfo<>(Item.class));

    // identical to our ruleStateDescriptor above
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {

        final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
        final Shape shape = value.getShape();
    
        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            final String ruleName = entry.getKey();
            final Rule rule = entry.getValue();
    
            List<Item> stored = state.get(ruleName);
            if (stored == null) {
                stored = new ArrayList<>();
            }
    
            if (shape == rule.second && !stored.isEmpty()) {
                for (Item i : stored) {
                    out.collect("MATCH: " + i + " - " + value);
                }
                stored.clear();
            }
    
            // there is no else{} to cover if rule.first == rule.second
            if (shape.equals(rule.first)) {
                stored.add(value);
            }
    
            if (stored.isEmpty()) {
                state.remove(ruleName);
            } else {
                state.put(ruleName, stored);
            }
        }
    }
}

重要的注意事項(xiàng)

在描述了提供的api之后,本節(jié)將重點(diǎn)介紹在使用broadcast state時(shí)需要記住的重要內(nèi)容侈咕。

  • 沒有跨任務(wù)通信(There is no cross-task communication): 如前所述公罕,這就是為什么只有(鍵的)broadcastprocessfunction的廣播端可以修改廣播狀態(tài)的內(nèi)容。此外耀销,用戶必須確保所有任務(wù)以相同的方式為每個(gè)傳入元素修改broadcast狀態(tài)的內(nèi)容楼眷。
    否則,不同的任務(wù)可能具有不同的內(nèi)容熊尉,從而導(dǎo)致不一致的結(jié)果罐柳。
  • 廣播狀態(tài)下的事件順序可能會(huì)因任務(wù)的不同而不同(Order of events in Broadcast State may differ across tasks):盡管廣播流的元素可以保證所有元素(最終)都將進(jìn)入所有下游任務(wù),元素可能以不同的順序到達(dá)每個(gè)任務(wù)狰住。因此硝清,每個(gè)傳入元素的狀態(tài)更新不能依賴于傳入事件的順序。
  • 所有任務(wù)檢查它們的廣播狀態(tài)(All tasks checkpoint their broadcast state):盡管在檢查點(diǎn)發(fā)生時(shí)转晰,所有任務(wù)的廣播狀態(tài)中都有相同的元素(檢查點(diǎn)屏障不會(huì)跨越元素),所有任務(wù)檢查它們的廣播狀態(tài),而且不止一個(gè)查邢。這是一個(gè)設(shè)計(jì)決策蔗崎,避免在恢復(fù)期間從同一個(gè)文件讀取所有任務(wù)(從而避免熱點(diǎn)),盡管這樣做的代價(jià)是將檢查點(diǎn)狀態(tài)的大小增加p(=并行度)扰藕。Flink保證在恢復(fù)/重新掃描時(shí)不會(huì)有重復(fù)和丟失數(shù)據(jù)缓苛。在并行度相同或更小的恢復(fù)情況下,每個(gè)任務(wù)讀取其檢查點(diǎn)狀態(tài)邓深。在擴(kuò)展時(shí)未桥,每個(gè)任務(wù)讀取自己的狀態(tài),其余任務(wù)(p_new-p_old)以循環(huán)方式讀取以前任務(wù)的檢查點(diǎn)芥备。
  • 沒有RocksDB狀態(tài)后端(No RocksDB state backend):廣播狀態(tài)在運(yùn)行時(shí)保存在內(nèi)存中冬耿,應(yīng)該相應(yīng)地進(jìn)行內(nèi)存供應(yīng)。這適用于所有的運(yùn)算符狀態(tài)萌壳。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末亦镶,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子袱瓮,更是在濱河造成了極大的恐慌缤骨,老刑警劉巖,帶你破解...
    沈念sama閱讀 223,002評(píng)論 6 519
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件尺借,死亡現(xiàn)場(chǎng)離奇詭異绊起,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)燎斩,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,357評(píng)論 3 400
  • 文/潘曉璐 我一進(jìn)店門虱歪,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人瘫里,你說我怎么就攤上這事实蔽。” “怎么了谨读?”我有些...
    開封第一講書人閱讀 169,787評(píng)論 0 365
  • 文/不壞的土叔 我叫張陵局装,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我劳殖,道長(zhǎng)铐尚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,237評(píng)論 1 300
  • 正文 為了忘掉前任哆姻,我火速辦了婚禮宣增,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘矛缨。我一直安慰自己爹脾,他們只是感情好帖旨,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,237評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著灵妨,像睡著了一般解阅。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上泌霍,一...
    開封第一講書人閱讀 52,821評(píng)論 1 314
  • 那天货抄,我揣著相機(jī)與錄音,去河邊找鬼朱转。 笑死蟹地,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的藤为。 我是一名探鬼主播怪与,決...
    沈念sama閱讀 41,236評(píng)論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼凉蜂!你這毒婦竟也來了琼梆?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,196評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤窿吩,失蹤者是張志新(化名)和其女友劉穎茎杂,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體纫雁,經(jīng)...
    沈念sama閱讀 46,716評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡煌往,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,794評(píng)論 3 343
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了轧邪。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片刽脖。...
    茶點(diǎn)故事閱讀 40,928評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖忌愚,靈堂內(nèi)的尸體忽然破棺而出曲管,到底是詐尸還是另有隱情,我是刑警寧澤硕糊,帶...
    沈念sama閱讀 36,583評(píng)論 5 351
  • 正文 年R本政府宣布院水,位于F島的核電站,受9級(jí)特大地震影響简十,放射性物質(zhì)發(fā)生泄漏檬某。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,264評(píng)論 3 336
  • 文/蒙蒙 一螟蝙、第九天 我趴在偏房一處隱蔽的房頂上張望恢恼。 院中可真熱鬧,春花似錦胰默、人聲如沸场斑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,755評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽和簸。三九已至彭雾,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間锁保,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,869評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工半沽, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留爽柒,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,378評(píng)論 3 379
  • 正文 我出身青樓者填,卻偏偏與公主長(zhǎng)得像浩村,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子占哟,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,937評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容