Flink-Streaming-State & Fault Tolerance-The Broadcast State Pattern

上篇講了operator state在恢復(fù)(重啟/故障恢復(fù))時要么使用evenly distributed策略鸵闪,要么使用union策略突那,來重啟operator的并發(fā)實例独旷。
operator state支持的第三種類型是 Broadcast State。引入該類型state是為了支持一個流中的一些數(shù)據(jù)需要廣播到所有流中的場景宿稀,這些數(shù)據(jù)會被存儲在本地趁舀,并應(yīng)用在另一些流的所有數(shù)據(jù)上以便進行處理。例如一個很自然的例子祝沸,我們有一個包含一系列規(guī)則的很緩慢的流矮烹,我們想要將這些規(guī)則應(yīng)用到其他流的所有數(shù)據(jù)上。把這個例子記在腦子里罩锐,broadcast state與其他operator state不同點在于:

  • 它有一個map結(jié)構(gòu)
  • 它僅能在某些特定的operator中使用奉狈,operator需要既可以將 broadcasted stream作為輸入,也可以將 non-broadcasted 流作為輸入
  • 這些特定的操作符可以有多個不同的 broadcast state涩惑,每個state都有自己的名稱仁期。

Provided APIs


在展示完整功能前,我們先從一個示例開始講解flink提供的api。我們假設(shè)我們的數(shù)據(jù)流數(shù)據(jù)含有color與shape兩個屬性跛蛋。我們想要找到一些特性模式的數(shù)據(jù)對熬的,如:color屬性相同,且shape符合某些規(guī)則赊级,如先出現(xiàn)矩形再出現(xiàn)三角形押框。我們假設(shè)這些規(guī)則是隨時間變化的。
在這個示例中理逊,第一個流包含 Item類型的數(shù)據(jù)橡伞,他有 Color 與 Shape兩個屬性。另一個流包含Rule類型數(shù)據(jù)晋被。
我們從包含Item的流入手骑歹,我們需要將其根據(jù)Color進行key操作,因為我們要找的數(shù)據(jù)對的color都是一樣的墨微。這樣做之后道媚,會保證具有相同color的數(shù)據(jù)會分配到相同的物理機器上。

// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
                        .keyBy(new KeySelector<Shape, Color>(){...});

再來看 Rule 數(shù)據(jù)流翘县,包含Rule的數(shù)據(jù)流需要被廣播broadcast到所有的下游任務(wù)中去最域,并且這些任務(wù)需要將其存儲到本地,這樣就可以使用本地數(shù)據(jù)與Item數(shù)據(jù)做計算
下面的小段程序會:1)廣播rule數(shù)據(jù)流 2)使用提供的MapStateDescriptor 來創(chuàng)建rule需要存儲到的broadcast state對象锈麸。

// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));
        
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);

最后镀脂,為了在Item流上的每個Item中都應(yīng)用Rule進行計算,我們需要:
1.連接這兩個流
2.定義我們模式匹配的邏輯

可以使用 connect() 方法來連接一個(keyed/non-keyed)流與 Broadcast 流忘伞。在非 broadcast流上調(diào)用connect() 方法薄翅,broadcast流作為參數(shù)傳入。這個方法會返回 BroadcastConnectedStream 類氓奈,在這個類上翘魄,我們可以調(diào)用 process() 方法傳入 CoProcessFunction的實現(xiàn)類。我們可以在這個函數(shù)內(nèi)實現(xiàn)我們的邏輯舀奶。函數(shù)的類型取決于non-broadcast 流的情況:

  • 如果它是 keyed 流暑竟,那么函數(shù)類型就是 KeyedBroadcastProcessFunction
  • 如果它是non-keyed 流,那么函數(shù)類型是 BroadcastProcessFunction

假設(shè)育勺,我們的non-broadcast流是 non-keyed 類型但荤。
注意:connect方法需要由 non-broadcast 流來調(diào)用,broadcast流作為參數(shù)

DataStream<String> output = colorPartitionedStream
                 .connect(ruleBroadcastStream)
                 .process(
                     
                     // type arguments in our KeyedBroadcastProcessFunction represent: 
                     //   1. the key of the keyed stream
                     //   2. the type of elements in the non-broadcast side
                     //   3. the type of elements in the broadcast side
                     //   4. the type of the result, here a string
                     
                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                         // my matching logic
                     }
                 );
BroadcastProcessFunction 與 KeyedBroadcastProcessFunction

在 CoProcessFunction 接口中涧至,有兩個處理方法需要實現(xiàn):processBroadcastElement() 方法腹躁,處理broadcast 流中的數(shù)據(jù);processElement() 方法處理 non-broadcast流的數(shù)據(jù)南蓬。兩個方法的詳細方法簽名如下:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

首先需要注意的是:兩個接口都需要實現(xiàn) processBroadcastElement() 方法來處理 broadcast流中的數(shù)據(jù)纺非,以及 processElement() 方法來處理 non-broadcast流中的數(shù)據(jù)哑了。
兩個方法的區(qū)別在于他們?nèi)雲(yún)⒌腸ontext 的不同。處理non-broadcast 的方法的入?yún)?ReadOnlyContext铐炫,處理 broadcast 的方法的入?yún)?Context垒手。
兩個context都有如下特點

  • 獲取 broadcast.state 的能力:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
  • 允許查詢元素的時間戳:ctx.timestamp(
  • 獲取當(dāng)前的watermark:ctx.currentWatermark()
  • 獲取當(dāng)前processing time:ctx.currentProcessingTime()
  • 發(fā)射數(shù)據(jù)到 side-output:ctx.output(OutputTag<X> outputTag, X value)

getBroadcastState()方法中的 stateDescriptor 需要與 .broadcast(ruleStateDescriptor) 中的相同蒜焊。
兩個context的不同點在于兩者對 broadcast state 的訪問級別倒信。 處理broadcast 流的方法的context對broadcast state 是讀寫權(quán)限,而處理 non-broadcast流的方法的context對broadcast state是只讀權(quán)限泳梆。這樣做的原因是:在Flink中鳖悠,是沒有跨任務(wù)數(shù)據(jù)交換(no cross-task communication)的。因此优妙,為了保證在操作符的所有并發(fā)實例中的broadcast state都是相同的乘综,我們僅給broadcast流讀寫權(quán)限,這樣所有任務(wù)中的broadcast的數(shù)據(jù)都會相同套硼,我們也能保證應(yīng)用于non-broadcast數(shù)據(jù)上的計算在所有task中都是相同的卡辰。不這么做,就無法達到一致性的保證邪意,導(dǎo)致不一致且很難調(diào)試的結(jié)果九妈。
注意:processBroadcast() 方法中實現(xiàn)的邏輯也需要在所有并發(fā)操作實例中保持一致性。

最終雾鬼,由于 KeyedBroadcastProcessFunction 是在 keyed 流上進行操作萌朱,它提供了 BroadcastProcessFunction 沒有的一些功能:

  1. processElement()中的 ReadOnlyContext可以訪問Flink提供的 timer server服務(wù),它允許注冊一個 event time/processing time的時間回調(diào)函數(shù)策菜。當(dāng)觸發(fā)回調(diào)時晶疼,會調(diào)用onTime()方法,該方法的 OnTimeContext 參數(shù)包含了 ReadOnlyContext的所有功能又憨,再加上:
  • 判斷該timer是基于event time還是processing time的
  • 查詢timer關(guān)聯(lián)的key

2.processBroadcastElement()方法中的 Context 有 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function) 方法翠霍。該方法可以注冊一個
KeyedStateFunction 應(yīng)用于stateDescriptor所指代的state,所有key上的符合要求的state都會應(yīng)用此functino蠢莺。

注意:只可以在 KeyedBroadcastProcessFunction 中的 processElement() 中注冊timer壶运。不可以在 processBroadcastElement()中注冊,因為這個方法中處理的是broadcast element浪秘,沒有關(guān)聯(lián)的key蒋情。

回到我們一開始的例子,我們的 KeyedBroadcastProcessFunction 可以這樣寫:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

    // store partial matches, i.e. first elements of the pair waiting for their second element
    // we keep a list as we may have many first elements waiting
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
        new MapStateDescriptor<>(
            "items",
            BasicTypeInfo.STRING_TYPE_INFO,
            new ListTypeInfo<>(Item.class));

    // identical to our ruleStateDescriptor above
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {

        final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
        final Shape shape = value.getShape();
    
        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            final String ruleName = entry.getKey();
            final Rule rule = entry.getValue();
    
            List<Item> stored = state.get(ruleName);
            if (stored == null) {
                stored = new ArrayList<>();
            }
    
            if (shape == rule.second && !stored.isEmpty()) {
                for (Item i : stored) {
                    out.collect("MATCH: " + i + " - " + value);
                }
                stored.clear();
            }
    
            // there is no else{} to cover if rule.first == rule.second
            if (shape.equals(rule.first)) {
                stored.add(value);
            }
    
            if (stored.isEmpty()) {
                state.remove(ruleName);
            } else {
                state.put(ruleName, stored);
            }
        }
    }
}

重要考量


介紹完提供的API后耸携,接下來的這部分用于提醒你棵癣,在使用broadcast state時,需要記住這些重要的事情:

  • 沒有跨任務(wù)(實例)間的數(shù)據(jù)交換(no cross-task communication):之前已經(jīng)提過沒什么在 (Keyed)-BroadcastProcessFunction 中僅有處理 broadcast數(shù)據(jù)的方法可以修改 broadcast state 的內(nèi)容夺衍。除此之外狈谊,使用者需要保證所有并發(fā)實例中修改broadcast state時的邏輯都是一樣的。否則,不同的實例就會有不同的state內(nèi)容河劝,導(dǎo)致計算結(jié)果的不一致壁榕。
  • 不同任務(wù)(實例)中broadcast state的內(nèi)容的順序可能會不一致:盡管廣播一個流的數(shù)據(jù)會保證所有數(shù)據(jù)將會(最終)到達所有下游任務(wù),但是數(shù)據(jù)到達每一個任務(wù)的順序可能不一樣赎瞎。因此牌里,state更新時,絕對不可以根據(jù)數(shù)據(jù)的順序來更新(而是根據(jù)數(shù)據(jù)的屬性)
  • 所有的任務(wù)都會checkpoint它們的 broadcast state:盡管所有的任務(wù)(實例)的broadcast state都保存著相同的數(shù)據(jù)务甥,但是在checkpoint時牡辽,所有的任務(wù)(實例)都會對它們所維持的broadcast state進行備份,而不是只選擇其中的一個實例的broadcast state進行備份敞临。這樣的設(shè)計是為了避免在故障恢復(fù)時态辛,所有的任務(wù)都讀取同一個文件,盡管這樣做會帶來增加state快照的消耗問題挺尿,這個消耗與并發(fā)度p相關(guān)奏黑。Flink會保證,在重啟/伸縮 應(yīng)用時编矾,沒有副本并且不會丟失數(shù)據(jù)熟史。為了以相同的并發(fā)度或更少的并發(fā)度來重啟,每個task都會讀取它自己狀態(tài)快照洽沟。當(dāng)以更多的并發(fā)度來重啟時以故,每一個task讀取自己的狀態(tài)快照,剩下的任務(wù)(p_new - p_old)以輪詢的方式依次讀取舊任務(wù)的checkpoint裆操。
  • No RocksDB state backend:broadcast state會保存在內(nèi)存中怒详,需要完成相應(yīng)的內(nèi)存配置。這適用于所有的 operator state踪区。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末昆烁,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子缎岗,更是在濱河造成了極大的恐慌静尼,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件传泊,死亡現(xiàn)場離奇詭異鼠渺,居然都是意外死亡,警方通過查閱死者的電腦和手機眷细,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門拦盹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人溪椎,你說我怎么就攤上這事普舆√窨冢” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵沼侣,是天一觀的道長祖能。 經(jīng)常有香客問我,道長蛾洛,這世上最難降的妖魔是什么养铸? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮雅潭,結(jié)果婚禮上揭厚,老公的妹妹穿的比我還像新娘却特。我一直安慰自己扶供,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布裂明。 她就那樣靜靜地躺著椿浓,像睡著了一般。 火紅的嫁衣襯著肌膚如雪闽晦。 梳的紋絲不亂的頭發(fā)上扳碍,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天,我揣著相機與錄音仙蛉,去河邊找鬼笋敞。 笑死,一個胖子當(dāng)著我的面吹牛荠瘪,可吹牛的內(nèi)容都是我干的夯巷。 我是一名探鬼主播,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼哀墓,長吁一口氣:“原來是場噩夢啊……” “哼趁餐!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起篮绰,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤后雷,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后吠各,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體臀突,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年贾漏,在試婚紗的時候發(fā)現(xiàn)自己被綠了候学。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡磕瓷,死狀恐怖盒齿,靈堂內(nèi)的尸體忽然破棺而出念逞,到底是詐尸還是另有隱情,我是刑警寧澤边翁,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布翎承,位于F島的核電站,受9級特大地震影響符匾,放射性物質(zhì)發(fā)生泄漏叨咖。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一啊胶、第九天 我趴在偏房一處隱蔽的房頂上張望甸各。 院中可真熱鬧,春花似錦焰坪、人聲如沸趣倾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽儒恋。三九已至,卻和暖如春黔漂,著一層夾襖步出監(jiān)牢的瞬間诫尽,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工炬守, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留牧嫉,地道東北人。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓减途,卻偏偏與公主長得像酣藻,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子观蜗,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容