A look at Flink's PartitionableListState

This article takes a look at Flink's PartitionableListState, based on the Flink 1.7.0 sources.

PartitionableListState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

    /**
     * Implementation of operator list state.
     *
     * @param <S> the type of an operator state partition.
     */
    static final class PartitionableListState<S> implements ListState<S> {

        /**
         * Meta information of the state, including state name, assignment mode, and serializer
         */
        private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;

        /**
         * The internal list the holds the elements of the state
         */
        private final ArrayList<S> internalList;

        /**
         * A serializer that allows to perform deep copies of internalList
         */
        private final ArrayListSerializer<S> internalListCopySerializer;

        PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
            this(stateMetaInfo, new ArrayList<S>());
        }

        private PartitionableListState(
                RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo,
                ArrayList<S> internalList) {

            this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
            this.internalList = Preconditions.checkNotNull(internalList);
            this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
        }

        private PartitionableListState(PartitionableListState<S> toCopy) {

            this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
        }

        public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
            this.stateMetaInfo = stateMetaInfo;
        }

        public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
            return stateMetaInfo;
        }

        public PartitionableListState<S> deepCopy() {
            return new PartitionableListState<>(this);
        }

        @Override
        public void clear() {
            internalList.clear();
        }

        @Override
        public Iterable<S> get() {
            return internalList;
        }

        @Override
        public void add(S value) {
            Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
            internalList.add(value);
        }

        @Override
        public String toString() {
            return "PartitionableListState{" +
                    "stateMetaInfo=" + stateMetaInfo +
                    ", internalList=" + internalList +
                    '}';
        }

        public long[] write(FSDataOutputStream out) throws IOException {

            long[] partitionOffsets = new long[internalList.size()];

            DataOutputView dov = new DataOutputViewStreamWrapper(out);

            for (int i = 0; i < internalList.size(); ++i) {
                S element = internalList.get(i);
                partitionOffsets[i] = out.getPos();
                getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
            }

            return partitionOffsets;
        }

        @Override
        public void update(List<S> values) {
            internalList.clear();

            addAll(values);
        }

        @Override
        public void addAll(List<S> values) {
            if (values != null && !values.isEmpty()) {
                internalList.addAll(values);
            }
        }
    }
  • PartitionableListState is the ListState implementation used by DefaultOperatorStateBackend. It stores the state in an ArrayList (internalList) and keeps its meta information in a RegisteredOperatorStateBackendMetaInfo (stateMetaInfo). Its write method serializes the elements of internalList to an FSDataOutputStream and returns an array (partitionOffsets) holding the stream offset at which each element starts.
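The offset bookkeeping in write can be tried out without Flink. The following is a minimal sketch that uses only the JDK: OffsetWriteSketch and writeWithOffsets are hypothetical names, DataOutputStream stands in for FSDataOutputStream, and writeUTF stands in for the state's TypeSerializer.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for PartitionableListState#write; not Flink code.
final class OffsetWriteSketch {

    // Serializes each element and records the stream position at which it starts.
    static long[] writeWithOffsets(List<String> elements) {
        long[] partitionOffsets = new long[elements.size()];
        try (DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream())) {
            for (int i = 0; i < elements.size(); i++) {
                // size() is the number of bytes written so far, i.e. the current position.
                partitionOffsets[i] = out.size();
                // writeUTF emits a 2-byte length followed by the encoded characters.
                out.writeUTF(elements.get(i));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return partitionOffsets;
    }

    public static void main(String[] args) {
        long[] offsets = writeWithOffsets(Arrays.asList("a", "bcd", "ef"));
        System.out.println(Arrays.toString(offsets)); // prints [0, 3, 8]
    }
}
```

Because each offset marks where one element begins, a reader of the stream can locate individual list entries without deserializing the entries before them.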

ListState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ListState.java

/**
 * {@link State} interface for partitioned list state in Operations.
 * The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <T> Type of values that this list state keeps.
 */
@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

    /**
     * Updates the operator state accessible by {@link #get()} by updating existing values to
     * to the given list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value will be null.
     *
     * @param values The new values for the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void update(List<T> values) throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given values
     * to existing list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value remains unchanged.
     *
     * @param values The new values to be added to the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void addAll(List<T> values) throws Exception;
}
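  • ListState is mainly used by operations to store partitioned list state. It extends the MergingState interface (binding the OUT type parameter to Iterable<T>) and declares two methods. update replaces the whole state: passing null or an empty list clears it. addAll appends to the state: passing null or an empty list leaves it unchanged, otherwise the given values are added.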
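The difference between update and addAll is easiest to see on a small in-memory list. This is a sketch only: SimpleListState is a hypothetical class that copies the list-handling logic of PartitionableListState shown earlier, with Objects.requireNonNull standing in for Flink's Preconditions.checkNotNull.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical, JDK-only imitation of the list handling in PartitionableListState.
final class SimpleListState<T> {

    private final ArrayList<T> internalList = new ArrayList<>();

    // Single-element append; null elements are rejected.
    void add(T value) {
        Objects.requireNonNull(value, "You cannot add null to a ListState.");
        internalList.add(value);
    }

    // Incremental update: null or an empty list leaves the state unchanged.
    void addAll(List<T> values) {
        if (values != null && !values.isEmpty()) {
            internalList.addAll(values);
        }
    }

    // Full replacement: the old content is always dropped first.
    void update(List<T> values) {
        internalList.clear();
        addAll(values);
    }

    List<T> get() {
        return internalList;
    }

    public static void main(String[] args) {
        SimpleListState<String> state = new SimpleListState<>();
        state.add("a");
        state.addAll(Arrays.asList("b", "c"));
        state.addAll(null);                       // no effect
        System.out.println(state.get());          // prints [a, b, c]
        state.update(Collections.singletonList("x"));
        System.out.println(state.get());          // prints [x]
        state.update(Collections.emptyList());    // clears the state
        System.out.println(state.get());          // prints []
    }
}
```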

MergingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MergingState.java

/**
 * Extension of {@link AppendingState} that allows merging of state. That is, two instances
 * of {@link MergingState} can be combined into a single instance that contains all the
 * information of the two merged states.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> { }
  • MergingState simply extends AppendingState and declares no methods of its own; the interface name signals that the state supports merging.

AppendingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AppendingState.java

/**
 * Base interface for partitioned state that supports adding elements and inspecting the current
 * state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
 *
 * <p>The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
     * should return {@code null}.
     *
     * @return The operator state value corresponding to the current input or {@code null}
     * if the state is empty.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    OUT get() throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given value
     * to the list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null is passed in, the state value will remain unchanged.
     *
     * @param value The new value for the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(IN value) throws Exception;

}
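  • AppendingState is the base interface for partitioned state. It extends State and declares two methods, get and add. get returns the current value of the state, and the contract asks implementers to return null when the state is empty; add appends a value to the state. Note that PartitionableListState#get, shown earlier, returns internalList directly, so an empty operator list state yields an empty list rather than null.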

State

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/State.java

/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 */
@PublicEvolving
public interface State {

    /**
     * Removes the value mapped under the current key.
     */
    void clear();
}
  • State defines the method that every partitioned state implementation must provide: clear, which removes all values of the current state.
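The four interfaces walked through so far form a single inheritance chain. The sketch below rebuilds that chain with local stand-ins (the Mini* names are hypothetical and carry none of Flink's exceptions or annotations) to show which methods each level contributes.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins mirroring the shape of Flink's interfaces; not the Flink types.
interface MiniState {
    void clear();
}

interface MiniAppendingState<IN, OUT> extends MiniState {
    OUT get();
    void add(IN value);
}

// Marker interface: adds no methods, only signals that merging is supported.
interface MiniMergingState<IN, OUT> extends MiniAppendingState<IN, OUT> { }

// Binds OUT to Iterable<T> and adds the two bulk operations.
interface MiniListState<T> extends MiniMergingState<T, Iterable<T>> {
    void update(List<T> values);
    void addAll(List<T> values);
}

final class MiniArrayListState<T> implements MiniListState<T> {

    private final ArrayList<T> internalList = new ArrayList<>();

    @Override public void clear() { internalList.clear(); }
    @Override public Iterable<T> get() { return internalList; }
    @Override public void add(T value) { internalList.add(value); }

    @Override public void addAll(List<T> values) {
        if (values != null && !values.isEmpty()) {
            internalList.addAll(values);
        }
    }

    @Override public void update(List<T> values) {
        internalList.clear();
        addAll(values);
    }

    public static void main(String[] args) {
        MiniArrayListState<String> state = new MiniArrayListState<>();
        state.add("x");
        // Code written against the narrowest interface only sees clear().
        MiniState asState = state;
        asState.clear();
        System.out.println(state.get().iterator().hasNext()); // prints false
    }
}
```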

RegisteredOperatorStateBackendMetaInfo

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java

/**
 * Compound meta information for a registered state in an operator state backend.
 * This contains the state name, assignment mode, and state partition serializer.
 *
 * @param <S> Type of the state.
 */
public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMetaInfoBase {

    /**
     * The mode how elements in this state are assigned to tasks during restore
     */
    @Nonnull
    private final OperatorStateHandle.Mode assignmentMode;

    /**
     * The type serializer for the elements in the state list
     */
    @Nonnull
    private final TypeSerializer<S> partitionStateSerializer;

    public RegisteredOperatorStateBackendMetaInfo(
            @Nonnull String name,
            @Nonnull TypeSerializer<S> partitionStateSerializer,
            @Nonnull OperatorStateHandle.Mode assignmentMode) {
        super(name);
        this.partitionStateSerializer = partitionStateSerializer;
        this.assignmentMode = assignmentMode;
    }

    private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) {
        this(
            Preconditions.checkNotNull(copy).name,
            copy.partitionStateSerializer.duplicate(),
            copy.assignmentMode);
    }

    @SuppressWarnings("unchecked")
    public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
        this(
            snapshot.getName(),
            (TypeSerializer<S>) Preconditions.checkNotNull(
                snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
            OperatorStateHandle.Mode.valueOf(
                snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
        Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
    }

    /**
     * Creates a deep copy of the itself.
     */
    @Nonnull
    public RegisteredOperatorStateBackendMetaInfo<S> deepCopy() {
        return new RegisteredOperatorStateBackendMetaInfo<>(this);
    }

    @Nonnull
    @Override
    public StateMetaInfoSnapshot snapshot() {
        return computeSnapshot();
    }

    //......

    @Nonnull
    private StateMetaInfoSnapshot computeSnapshot() {
        Map<String, String> optionsMap = Collections.singletonMap(
            StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
            assignmentMode.toString());
        String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
        Map<String, TypeSerializer<?>> serializerMap =
            Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
        Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap =
            Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());

        return new StateMetaInfoSnapshot(
            name,
            StateMetaInfoSnapshot.BackendStateType.OPERATOR,
            optionsMap,
            serializerConfigSnapshotsMap,
            serializerMap);
    }
}
  • RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and implements its abstract snapshot method by delegating to computeSnapshot. computeSnapshot builds the optionsMap, serializerConfigSnapshotsMap and serializerMap that the StateMetaInfoSnapshot constructor requires.
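The assignment mode makes a round trip through the snapshot: computeSnapshot stores it as a string in the options map, and the snapshot-based constructor parses it back with valueOf. The sketch below imitates only that round trip. MetaInfoSnapshotSketch, its Mode enum and the plain-string key are assumptions made for illustration, and the serializer maps are left out.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical, JDK-only imitation of the options round trip; not Flink code.
final class MetaInfoSnapshotSketch {

    // Local stand-in for OperatorStateHandle.Mode.
    enum Mode { SPLIT_DISTRIBUTE, UNION, BROADCAST }

    // Plain-string key, named after the CommonOptionsKeys constant seen in the source.
    static final String MODE_KEY = "OPERATOR_STATE_DISTRIBUTION_MODE";

    final String name;
    final Mode assignmentMode;

    MetaInfoSnapshotSketch(String name, Mode assignmentMode) {
        this.name = name;
        this.assignmentMode = assignmentMode;
    }

    // Like computeSnapshot(): an immutable single-entry options map.
    Map<String, String> snapshotOptions() {
        return Collections.singletonMap(MODE_KEY, assignmentMode.toString());
    }

    // Like the snapshot-based constructor: the mode is parsed back with valueOf.
    static MetaInfoSnapshotSketch restore(String name, Map<String, String> options) {
        return new MetaInfoSnapshotSketch(name, Mode.valueOf(options.get(MODE_KEY)));
    }

    public static void main(String[] args) {
        MetaInfoSnapshotSketch original = new MetaInfoSnapshotSketch("offsets", Mode.UNION);
        MetaInfoSnapshotSketch restored = restore(original.name, original.snapshotOptions());
        System.out.println(restored.assignmentMode); // prints UNION
    }
}
```

Collections.singletonMap returns an immutable map, so a snapshot built this way cannot be altered after it has been handed out.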

Summary

  • Flink's managed operator state only supports list-style state (ListState). The ListState implementation used by DefaultOperatorStateBackend is PartitionableListState, which stores the state in an ArrayList (internalList) and keeps its meta information in a RegisteredOperatorStateBackendMetaInfo (stateMetaInfo).
  • PartitionableListState implements the ListState interface (update and addAll). ListState extends MergingState (binding the OUT type parameter to Iterable<T>); MergingState declares no methods of its own and extends AppendingState; AppendingState extends State and declares get and add; State defines clear.
  • RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and implements its abstract snapshot method by delegating to computeSnapshot, which builds the optionsMap, serializerConfigSnapshotsMap and serializerMap that StateMetaInfoSnapshot requires.
