聊聊flink的Managed Keyed State

本文主要研究一下flink的Managed Keyed State

State

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/State.java

/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 */
@PublicEvolving
public interface State {

    /**
     * Removes the value mapped under the current key.
     */
    void clear();
}
  • State是所有不同類型的State必須實(shí)現(xiàn)的接口胸完,它定義了clear方法

ValueState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ValueState.java

@PublicEvolving
public interface ValueState<T> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p>If you didn't specify a default value when creating the {@link ValueStateDescriptor}
     * this will return {@code null} when to value was previously set using {@link #update(Object)}.
     *
     * @return The state value corresponding to the current input.
     *
     * @throws IOException Thrown if the system cannot access the state.
     */
    T value() throws IOException;

    /**
     * Updates the operator state accessible by {@link #value()} to the given
     * value. The next time {@link #value()} is called (for the same state
     * partition) the returned state will represent the updated value. When a
     * partitioned state is updated with null, the state for the current key
     * will be removed and the default value is returned on the next access.
     *
     * @param value The new value for the state.
     *
     * @throws IOException Thrown if the system cannot access the state.
     */
    void update(T value) throws IOException;

}
  • ValueState繼承了State接口赛不,它定義了value鸥滨、update兩個(gè)方法,一個(gè)用于取值品追,一個(gè)用于更新值

AppendingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AppendingState.java

@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
     * should return {@code null}.
     *
     * @return The operator state value corresponding to the current input or {@code null}
     * if the state is empty.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    OUT get() throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given value
     * to the list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null is passed in, the state value will remain unchanged.
     *
     * @param value The new value for the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(IN value) throws Exception;

}
  • AppendingState繼承了State接口,它定義了get、add方法液荸,該State接收IN、OUT兩個(gè)泛型

FoldingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/FoldingState.java

@PublicEvolving
@Deprecated
public interface FoldingState<T, ACC> extends AppendingState<T, ACC> {}
  • FoldingState繼承了AppendingState脱篙,其中OUT泛型表示ACC娇钱,即累積值;FoldingState在Flink 1.4版本被標(biāo)記為廢棄绊困,后續(xù)會(huì)被移除掉文搂,可使用AggregatingState替代

MergingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MergingState.java

/**
 * Extension of {@link AppendingState} that allows merging of state. That is, two instances
 * of {@link MergingState} can be combined into a single instance that contains all the
 * information of the two merged states.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> { }
  • MergingState繼承了AppendingState,這里用命名表達(dá)merge state的意思秤朗,它有幾個(gè)子接口煤蹭,分別是ListState、ReducingState取视、AggregatingState

ListState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ListState.java

@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

    /**
     * Updates the operator state accessible by {@link #get()} by updating existing values to
     * to the given list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value will be null.
     *
     * @param values The new values for the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void update(List<T> values) throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given values
     * to existing list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value remains unchanged.
     *
     * @param values The new values to be added to the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void addAll(List<T> values) throws Exception;
}
  • ListState繼承了MergingState硝皂,它的OUT類型為Iterable<IN>;它主要用于operation存儲(chǔ)partitioned list state作谭,它繼承了MergingState接口(指定OUT的泛型為Iterable<T>)稽物,同時(shí)聲明了兩個(gè)方法;其中update用于全量更新state丢早,如果參數(shù)為null或者empty姨裸,那么state會(huì)被清空;addAll方法用于增量更新怨酝,如果參數(shù)為null或者empty傀缩,則保持不變,否則則新增給定的values

ReducingState

flink-core/1.7.0/flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ReducingState.java

@PublicEvolving
public interface ReducingState<T> extends MergingState<T, T> {}
  • ReducingState繼承了MergingState农猬,它的IN赡艰、OUT類型相同

AggregatingState

flink-core/1.7.0/flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AggregatingState.java

@PublicEvolving
public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {}
  • AggregatingState繼承了MergingState,它與ReducingState不同斤葱,IN慷垮、OUT類型可以不同

MapState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MapState.java

@PublicEvolving
public interface MapState<UK, UV> extends State {

    /**
     * Returns the current value associated with the given key.
     *
     * @param key The key of the mapping
     * @return The value of the mapping with the given key
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    UV get(UK key) throws Exception;

    /**
     * Associates a new value with the given key.
     *
     * @param key The key of the mapping
     * @param value The new value of the mapping
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void put(UK key, UV value) throws Exception;

    /**
     * Copies all of the mappings from the given map into the state.
     *
     * @param map The mappings to be stored in this state
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void putAll(Map<UK, UV> map) throws Exception;

    /**
     * Deletes the mapping of the given key.
     *
     * @param key The key of the mapping
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void remove(UK key) throws Exception;

    /**
     * Returns whether there exists the given mapping.
     *
     * @param key The key of the mapping
     * @return True if there exists a mapping whose key equals to the given key
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    boolean contains(UK key) throws Exception;

    /**
     * Returns all the mappings in the state.
     *
     * @return An iterable view of all the key-value pairs in the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<Map.Entry<UK, UV>> entries() throws Exception;

    /**
     * Returns all the keys in the state.
     *
     * @return An iterable view of all the keys in the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<UK> keys() throws Exception;

    /**
     * Returns all the values in the state.
     *
     * @return An iterable view of all the values in the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterable<UV> values() throws Exception;

    /**
     * Iterates over all the mappings in the state.
     *
     * @return An iterator over all the mappings in the state
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
}
  • MapState直接繼承了State,它接收UK揍堕、UV兩個(gè)泛型料身,分別是map的key和value的類型

小結(jié)

  • flink提供了好幾個(gè)不同類型的Managed Keyed State,有ValueState<T>衩茸、ListState<T>芹血、ReducingState<T>、AggregatingState<IN, OUT>、FoldingState<T, ACC>幔烛、MapState<UK, UV>
  • ValueState<T>和MapState<UK, UV>是直接繼承State接口啃擦;FoldingState繼承了AppendingState<IN, OUT>(AppendingState直接繼承了State);ListState饿悬、ReducingState令蛉、AggregatingState繼承了MergingState<IN, OUT>(MergingState繼承了AppendingState)
  • FoldingState在Flink 1.4版本被標(biāo)記為廢棄,后續(xù)會(huì)被移除掉狡恬,可使用AggregatingState替代

doc

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末珠叔,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子弟劲,更是在濱河造成了極大的恐慌运杭,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,542評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件铸史,死亡現(xiàn)場(chǎng)離奇詭異翅楼,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,596評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門摆霉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人讼昆,你說(shuō)我怎么就攤上這事蒙袍。” “怎么了玄坦?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,021評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵血筑,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我煎楣,道長(zhǎng)豺总,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,682評(píng)論 1 284
  • 正文 為了忘掉前任择懂,我火速辦了婚禮喻喳,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘困曙。我一直安慰自己表伦,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,792評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布慷丽。 她就那樣靜靜地躺著蹦哼,像睡著了一般。 火紅的嫁衣襯著肌膚如雪要糊。 梳的紋絲不亂的頭發(fā)上纲熏,一...
    開(kāi)封第一講書(shū)人閱讀 49,985評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼赤套。 笑死飘痛,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的容握。 我是一名探鬼主播宣脉,決...
    沈念sama閱讀 39,107評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼剔氏!你這毒婦竟也來(lái)了塑猖?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,845評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤谈跛,失蹤者是張志新(化名)和其女友劉穎羊苟,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體感憾,經(jīng)...
    沈念sama閱讀 44,299評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蜡励,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,612評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了阻桅。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片凉倚。...
    茶點(diǎn)故事閱讀 38,747評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖嫂沉,靈堂內(nèi)的尸體忽然破棺而出稽寒,到底是詐尸還是另有隱情,我是刑警寧澤趟章,帶...
    沈念sama閱讀 34,441評(píng)論 4 333
  • 正文 年R本政府宣布杏糙,位于F島的核電站,受9級(jí)特大地震影響蚓土,放射性物質(zhì)發(fā)生泄漏宏侍。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,072評(píng)論 3 317
  • 文/蒙蒙 一北戏、第九天 我趴在偏房一處隱蔽的房頂上張望负芋。 院中可真熱鬧,春花似錦嗜愈、人聲如沸旧蛾。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,828評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)锨天。三九已至,卻和暖如春剃毒,著一層夾襖步出監(jiān)牢的瞬間病袄,已是汗流浹背搂赋。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,069評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留益缠,地道東北人脑奠。 一個(gè)月前我還...
    沈念sama閱讀 46,545評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像幅慌,于是被迫代替她去往敵國(guó)和親宋欺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,658評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容

  • mean to add the formatted="false" attribute?.[ 46% 47325/...
    ProZoom閱讀 2,694評(píng)論 0 3
  • 序 本文主要研究一下flink的OperatorStateBackend OperatorStateBackend...
    go4it閱讀 1,699評(píng)論 0 0
  • 隨著大數(shù)據(jù) 2.0 時(shí)代悄然到來(lái)胰伍,大數(shù)據(jù)從簡(jiǎn)單的批處理擴(kuò)展到了實(shí)時(shí)處理齿诞、流處理、交互式查詢和機(jī)器學(xué)習(xí)應(yīng)用骂租。近年來(lái)涌...
    Java大生閱讀 2,106評(píng)論 0 6
  • 序 本文主要研究一下flink的PartitionableListState PartitionableListS...
    go4it閱讀 1,129評(píng)論 0 0
  • 序 本文主要研究一下flink的InputFormatSourceFunction 實(shí)例 這里使用Iterator...
    go4it閱讀 2,679評(píng)論 0 0