Preface
This article takes a look at Flink's PartitionableListState.
PartitionableListState
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
/**
 * Implementation of operator list state.
 *
 * @param <S> the type of an operator state partition.
 */
static final class PartitionableListState<S> implements ListState<S> {

    /**
     * Meta information of the state, including state name, assignment mode, and serializer
     */
    private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;

    /**
     * The internal list the holds the elements of the state
     */
    private final ArrayList<S> internalList;

    /**
     * A serializer that allows to perform deep copies of internalList
     */
    private final ArrayListSerializer<S> internalListCopySerializer;

    PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
        this(stateMetaInfo, new ArrayList<S>());
    }

    private PartitionableListState(
            RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo,
            ArrayList<S> internalList) {
        this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
        this.internalList = Preconditions.checkNotNull(internalList);
        this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
    }

    private PartitionableListState(PartitionableListState<S> toCopy) {
        this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
    }

    public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
        this.stateMetaInfo = stateMetaInfo;
    }

    public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
        return stateMetaInfo;
    }

    public PartitionableListState<S> deepCopy() {
        return new PartitionableListState<>(this);
    }

    @Override
    public void clear() {
        internalList.clear();
    }

    @Override
    public Iterable<S> get() {
        return internalList;
    }

    @Override
    public void add(S value) {
        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
        internalList.add(value);
    }

    @Override
    public String toString() {
        return "PartitionableListState{" +
            "stateMetaInfo=" + stateMetaInfo +
            ", internalList=" + internalList +
            '}';
    }

    public long[] write(FSDataOutputStream out) throws IOException {
        long[] partitionOffsets = new long[internalList.size()];
        DataOutputView dov = new DataOutputViewStreamWrapper(out);
        for (int i = 0; i < internalList.size(); ++i) {
            S element = internalList.get(i);
            partitionOffsets[i] = out.getPos();
            getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
        }
        return partitionOffsets;
    }

    @Override
    public void update(List<S> values) {
        internalList.clear();
        addAll(values);
    }

    @Override
    public void addAll(List<S> values) {
        if (values != null && !values.isEmpty()) {
            internalList.addAll(values);
        }
    }
}
- PartitionableListState is the ListState implementation used by DefaultOperatorStateBackend. Internally it stores the state in an ArrayList (internalList), and its stateMetaInfo is a RegisteredOperatorStateBackendMetaInfo. Its write method serializes the elements of internalList to an FSDataOutputStream and returns an array holding the offset of each record (partitionOffsets).
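To make the offset bookkeeping in write more concrete, here is a minimal sketch in plain Java. It does not use Flink: OffsetWriteSketch is a hypothetical class, DataOutputStream.size() stands in for FSDataOutputStream.getPos(), and writeUTF stands in for the partition state serializer.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the offset logic of PartitionableListState#write; not Flink code.
public class OffsetWriteSketch {

    // Serializes each element in order and records the stream position where it starts.
    public static long[] write(List<String> elements, DataOutputStream out) throws IOException {
        long[] partitionOffsets = new long[elements.size()];
        for (int i = 0; i < elements.size(); i++) {
            // Assumption: size() plays the role of FSDataOutputStream#getPos().
            partitionOffsets[i] = out.size();
            // Assumption: writeUTF plays the role of the state's TypeSerializer.
            out.writeUTF(elements.get(i));
        }
        return partitionOffsets;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        long[] offsets = write(Arrays.asList("a", "bcd", "ef"), out);
        // writeUTF emits a 2-byte length prefix plus the encoded characters,
        // so the three elements start at bytes 0, 3 and 8.
        System.out.println(Arrays.toString(offsets)); // [0, 3, 8]
    }
}
```

Each offset marks where one element begins in the stream, so a reader holding the offset array can locate individual elements without deserializing the ones before them.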
ListState
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ListState.java
/**
 * {@link State} interface for partitioned list state in Operations.
 * The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <T> Type of values that this list state keeps.
 */
@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

    /**
     * Updates the operator state accessible by {@link #get()} by updating existing values to
     * to the given list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value will be null.
     *
     * @param values The new values for the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void update(List<T> values) throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given values
     * to existing list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value remains unchanged.
     *
     * @param values The new values to be added to the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void addAll(List<T> values) throws Exception;
}
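- ListState is mainly used by operations to store partitioned list state. It extends the MergingState interface (fixing the OUT type parameter to Iterable<T>) and declares two methods. update replaces the state wholesale: if the argument is null or empty, the state is cleared. addAll updates the state incrementally: if the argument is null or empty, the state stays unchanged; otherwise the given values are appended.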
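The difference between update and addAll can be tried out with a small in-memory stand-in. This is only a sketch that mirrors the contract described above; ListStateSemanticsSketch and its snapshot helper are hypothetical names and not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory stand-in mirroring the update/addAll contract; not Flink code.
public class ListStateSemanticsSketch<T> {

    private final ArrayList<T> internalList = new ArrayList<>();

    // Full replacement: the previous contents are always dropped first.
    public void update(List<T> values) {
        internalList.clear();
        addAll(values);
    }

    // Incremental update: null or empty input leaves the contents untouched.
    public void addAll(List<T> values) {
        if (values != null && !values.isEmpty()) {
            internalList.addAll(values);
        }
    }

    // Helper for inspection only (an assumption of this sketch): returns a copy of the contents.
    public List<T> snapshot() {
        return new ArrayList<>(internalList);
    }

    public static void main(String[] args) {
        ListStateSemanticsSketch<String> state = new ListStateSemanticsSketch<>();
        state.addAll(Arrays.asList("a", "b"));
        state.addAll(null);                    // no-op
        System.out.println(state.snapshot());  // [a, b]
        state.update(Arrays.asList("c"));      // replaces everything
        System.out.println(state.snapshot());  // [c]
        state.update(null);                    // clears
        System.out.println(state.snapshot());  // []
    }
}
```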
MergingState
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MergingState.java
/**
 * Extension of {@link AppendingState} that allows merging of state. That is, two instances
 * of {@link MergingState} can be combined into a single instance that contains all the
 * information of the two merged states.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> { }
- MergingState merely extends AppendingState and declares nothing of its own; the interface name signals that the state supports merging.
AppendingState
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AppendingState.java
/**
 * Base interface for partitioned state that supports adding elements and inspecting the current
 * state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
 *
 * <p>The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
     * should return {@code null}.
     *
     * @return The operator state value corresponding to the current input or {@code null}
     * if the state is empty.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    OUT get() throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given value
     * to the list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null is passed in, the state value will remain unchanged.
     *
     * @param value The new value for the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(IN value) throws Exception;
}
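- AppendingState is the base interface for partitioned state that supports adding elements. It extends the State interface and declares two methods, get and add: get returns the current value of the state, or null if the state is empty; add adds a value to the state.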
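The Javadoc asks implementers to return null from get when the state is empty, while the PartitionableListState listing earlier simply returns its internalList, which may be empty but is never null. A caller that wants to be safe against either behavior can treat both cases alike. The helper below is a sketch of that idea; StateReadSketch and toList are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for consuming the Iterable returned by a get()-style call.
public class StateReadSketch {

    // Copies the elements into a list, treating null the same as "empty".
    public static <T> List<T> toList(Iterable<T> current) {
        List<T> result = new ArrayList<>();
        if (current != null) {
            for (T element : current) {
                result.add(element);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Iterable<String> missing = null;
        System.out.println(toList(missing));                 // []
        System.out.println(toList(Arrays.asList("a", "b"))); // [a, b]
    }
}
```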
State
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/State.java
/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 */
@PublicEvolving
public interface State {

    /**
     * Removes the value mapped under the current key.
     */
    void clear();
}
- The State interface defines what every kind of partitioned state must implement. It declares a single method, clear, which removes all values of the current state.
RegisteredOperatorStateBackendMetaInfo
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
/**
 * Compound meta information for a registered state in an operator state backend.
 * This contains the state name, assignment mode, and state partition serializer.
 *
 * @param <S> Type of the state.
 */
public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMetaInfoBase {

    /**
     * The mode how elements in this state are assigned to tasks during restore
     */
    @Nonnull
    private final OperatorStateHandle.Mode assignmentMode;

    /**
     * The type serializer for the elements in the state list
     */
    @Nonnull
    private final TypeSerializer<S> partitionStateSerializer;

    public RegisteredOperatorStateBackendMetaInfo(
            @Nonnull String name,
            @Nonnull TypeSerializer<S> partitionStateSerializer,
            @Nonnull OperatorStateHandle.Mode assignmentMode) {
        super(name);
        this.partitionStateSerializer = partitionStateSerializer;
        this.assignmentMode = assignmentMode;
    }

    private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) {
        this(
            Preconditions.checkNotNull(copy).name,
            copy.partitionStateSerializer.duplicate(),
            copy.assignmentMode);
    }

    @SuppressWarnings("unchecked")
    public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
        this(
            snapshot.getName(),
            (TypeSerializer<S>) Preconditions.checkNotNull(
                snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
            OperatorStateHandle.Mode.valueOf(
                snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
        Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
    }

    /**
     * Creates a deep copy of the itself.
     */
    @Nonnull
    public RegisteredOperatorStateBackendMetaInfo<S> deepCopy() {
        return new RegisteredOperatorStateBackendMetaInfo<>(this);
    }

    @Nonnull
    @Override
    public StateMetaInfoSnapshot snapshot() {
        return computeSnapshot();
    }

    //......

    @Nonnull
    private StateMetaInfoSnapshot computeSnapshot() {
        Map<String, String> optionsMap = Collections.singletonMap(
            StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
            assignmentMode.toString());
        String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
        Map<String, TypeSerializer<?>> serializerMap =
            Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
        Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap =
            Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());
        return new StateMetaInfoSnapshot(
            name,
            StateMetaInfoSnapshot.BackendStateType.OPERATOR,
            optionsMap,
            serializerConfigSnapshotsMap,
            serializerMap);
    }
}
- RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and implements its abstract snapshot method by delegating to computeSnapshot. computeSnapshot builds the optionsMap, serializerConfigSnapshotsMap and serializerMap that StateMetaInfoSnapshot requires.
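The options part of this round trip (the assignment mode is written out as a string in computeSnapshot and parsed back with valueOf in the snapshot-based constructor) can be sketched without Flink. Everything below is a stand-in: MetaInfoSnapshotSketch, its Mode enum and the MODE_KEY string are assumptions made for illustration, not the real Flink types.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch of the options round trip; not Flink code.
public class MetaInfoSnapshotSketch {

    // Stand-in for OperatorStateHandle.Mode (constant names are illustrative).
    public enum Mode { SPLIT_DISTRIBUTE, UNION, BROADCAST }

    // Stand-in for CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString().
    public static final String MODE_KEY = "OPERATOR_STATE_DISTRIBUTION_MODE";

    // Snapshot side: store the mode as a string in an immutable single-entry map.
    public static Map<String, String> snapshotOptions(Mode assignmentMode) {
        return Collections.singletonMap(MODE_KEY, assignmentMode.toString());
    }

    // Restore side: parse the stored string back into the enum constant.
    public static Mode restoreMode(Map<String, String> options) {
        return Mode.valueOf(options.get(MODE_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> options = snapshotOptions(Mode.UNION);
        System.out.println(options);              // {OPERATOR_STATE_DISTRIBUTION_MODE=UNION}
        System.out.println(restoreMode(options)); // UNION
    }
}
```

Collections.singletonMap fits here because each map holds exactly one entry and never needs to be modified after the snapshot is built.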
Summary
- Flink's managed operator state is list-style, exposed as ListState. The ListState implementation used by DefaultOperatorStateBackend is PartitionableListState, which stores its state in an ArrayList (internalList) and keeps its stateMetaInfo as a RegisteredOperatorStateBackendMetaInfo.
- PartitionableListState implements the ListState interface (the update and addAll methods). ListState extends MergingState (fixing the OUT type parameter to Iterable<T>); MergingState declares no methods of its own and extends AppendingState; AppendingState extends State and declares get and add; State declares clear.
- RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and implements its abstract snapshot method by delegating to computeSnapshot. computeSnapshot builds the optionsMap, serializerConfigSnapshotsMap and serializerMap that StateMetaInfoSnapshot requires.
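As a recap of the interface chain, the sketch below mirrors it with stand-in types. All names (StateHierarchySketch, the Sketch* interfaces, InMemoryListState) are hypothetical; they only imitate the shape of State, AppendingState, MergingState and ListState and are not the Flink interfaces themselves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical mirror of the State -> AppendingState -> MergingState -> ListState chain.
public class StateHierarchySketch {

    public interface SketchState {
        void clear();
    }

    public interface SketchAppendingState<IN, OUT> extends SketchState {
        OUT get() throws Exception;
        void add(IN value) throws Exception;
    }

    // Declares nothing; the name alone marks the state as mergeable.
    public interface SketchMergingState<IN, OUT> extends SketchAppendingState<IN, OUT> { }

    // Fixes OUT to Iterable<T> and adds the two bulk operations.
    public interface SketchListState<T> extends SketchMergingState<T, Iterable<T>> {
        void update(List<T> values) throws Exception;
        void addAll(List<T> values) throws Exception;
    }

    // ArrayList-backed implementation, in the spirit of PartitionableListState.
    public static final class InMemoryListState<T> implements SketchListState<T> {
        private final ArrayList<T> internalList = new ArrayList<>();

        @Override public void clear() { internalList.clear(); }
        @Override public Iterable<T> get() { return internalList; }
        @Override public void add(T value) { internalList.add(Objects.requireNonNull(value)); }
        @Override public void update(List<T> values) { internalList.clear(); addAll(values); }
        @Override public void addAll(List<T> values) {
            if (values != null && !values.isEmpty()) {
                internalList.addAll(values);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryListState<String> state = new InMemoryListState<>();
        state.add("x");
        SketchState base = state;        // the same object viewed through the root interface
        System.out.println(state.get()); // [x]
        base.clear();
        System.out.println(state.get()); // []
    }
}
```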