前言
抱歉起這種爛大街的日本輕小說(shuō)風(fēng)格標(biāo)題來(lái)吸引注意力于游。原本我認(rèn)為這是常識(shí),不需要專(zhuān)門(mén)寫(xiě)一篇文章來(lái)講解如此細(xì)碎的點(diǎn)孵稽。但是在最近工作巡檢中發(fā)現(xiàn)了越來(lái)越多如同ValueState<Map>
的狀態(tài)用法(當(dāng)然大部分是歷史遺留)咙好,部分Flink作業(yè)深受性能問(wèn)題困擾,所以還是抽出點(diǎn)時(shí)間快速聊一聊术吝,順便給出不算優(yōu)雅但還算有效的挽救方案。
基于RocksDB的狀態(tài)序列化
我們已經(jīng)知道茸苇,RocksDB是基于二進(jìn)制流的內(nèi)嵌K-V存儲(chǔ)排苍,所以Flink任務(wù)使用RocksDB狀態(tài)后端時(shí),寫(xiě)/讀操作的狀態(tài)數(shù)據(jù)都需要經(jīng)過(guò)序列化和反序列化学密,從而利用TaskManager本地磁盤(pán)實(shí)現(xiàn)海量的狀態(tài)存儲(chǔ)淘衙。
舉個(gè)栗子,RocksDBValueState的取值和更新方法如下:
class RocksDBValueState<K, N, V> extends AbstractRocksDBState<K, N, V>
implements InternalValueState<K, N, V> {
@Override
public TypeSerializer<K> getKeySerializer() {
return backend.getKeySerializer();
}
@Override
public TypeSerializer<N> getNamespaceSerializer() {
return namespaceSerializer;
}
@Override
public TypeSerializer<V> getValueSerializer() {
return valueSerializer;
}
@Override
public V value() {
try {
byte[] valueBytes =
backend.db.get(columnFamily, serializeCurrentKeyWithGroupAndNamespace());
if (valueBytes == null) {
return getDefaultValue();
}
dataInputView.setBuffer(valueBytes);
return valueSerializer.deserialize(dataInputView);
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
}
}
@Override
public void update(V value) {
if (value == null) {
clear();
return;
}
try {
backend.db.put(
columnFamily,
writeOptions,
serializeCurrentKeyWithGroupAndNamespace(),
serializeValue(value));
} catch (Exception e) {
throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
}
}
}
可見(jiàn)key和value都需要經(jīng)過(guò)對(duì)應(yīng)類(lèi)型的TypeSerializer
的處理腻暮,即如果將狀態(tài)聲明為ValueState<Map<K, V>>
彤守,那么將由MapSerializer<K, V>
負(fù)責(zé)值的正反序列化毯侦。特別注意,serializeCurrentKeyWithGroupAndNamespace()
方法中具垫,key需要加上它所對(duì)應(yīng)的KeyGroup編號(hào)和對(duì)應(yīng)的Namespace(Namespace是窗口信息)侈离,形成一個(gè)復(fù)合key,即:CompositeKey(KG, K, NS)
筝蚕,RocksDB實(shí)際存儲(chǔ)的狀態(tài)數(shù)據(jù)的key都類(lèi)似如此卦碾。具體可參看SerializedCompositeKeyBuilder
類(lèi),不再贅述起宽。
接下來(lái)再看一下RocksDBMapState的部分實(shí)現(xiàn)洲胖。
class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>>
implements InternalMapState<K, N, UK, UV> {
@Override
public TypeSerializer<K> getKeySerializer() {
return backend.getKeySerializer();
}
@Override
public TypeSerializer<N> getNamespaceSerializer() {
return namespaceSerializer;
}
@Override
public TypeSerializer<Map<UK, UV>> getValueSerializer() {
return valueSerializer;
}
@Override
public UV get(UK userKey) throws IOException, RocksDBException {
byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
return (rawValueBytes == null
? null
: deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
}
@Override
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);
backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
}
@Override
public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
if (map == null) {
return;
}
try (RocksDBWriteBatchWrapper writeBatchWrapper =
new RocksDBWriteBatchWrapper(
backend.db, writeOptions, backend.getWriteBatchSize())) {
for (Map.Entry<UK, UV> entry : map.entrySet()) {
byte[] rawKeyBytes =
serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
entry.getKey(), userKeySerializer);
byte[] rawValueBytes =
serializeValueNullSensitive(entry.getValue(), userValueSerializer);
writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
}
}
}
@Override
public Iterable<Map.Entry<UK, UV>> entries() {
return this::iterator;
}
@Override
public Iterator<Map.Entry<UK, UV>> iterator() {
final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
return new RocksDBMapIterator<Map.Entry<UK, UV>>(
backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
@Override
public Map.Entry<UK, UV> next() {
return nextEntry();
}
};
}
由于MapState的本身有用戶(hù)定義的key UK
,所以RocksDB存儲(chǔ)它時(shí)坯沪,會(huì)在上文所述的復(fù)合key后面绿映,再加上UK
的值,即:CompositeKey(KG, K, NS) :: UK
屏箍。這樣绘梦,同屬于一個(gè)KeyContext的所有用戶(hù)鍵值對(duì)就存在一個(gè)連續(xù)的存儲(chǔ)空間內(nèi),可以通過(guò)RocksDB WriteBatch機(jī)制攢批赴魁,實(shí)現(xiàn)批量寫(xiě)(putAll()
方法),也可以通過(guò)RocksDB Iterator機(jī)制做前綴掃描钝诚,實(shí)現(xiàn)批量讀(entries()
方法)颖御。
問(wèn)題的癥結(jié)
代碼讀完了。假設(shè)我們?cè)谀硞€(gè)key下有5條數(shù)據(jù)的狀態(tài)凝颇,若使用ValueState<Map<String, String>>
來(lái)存儲(chǔ)潘拱,按照MapSerializer
的序列化方式,其存儲(chǔ)可以記為:
(1, k, VoidNamespace) -> [5, k1, false, v1, k2, false, v2, k3, true, k4, false, v4, k5, false, v5]
注意對(duì)于無(wú)窗口上下文的狀態(tài)拧略,NS為VoidNamespace
芦岂。且序列化Map時(shí),會(huì)加上Map的大小垫蛆,以及表示每個(gè)value是否為NULL的標(biāo)記禽最。
如果使用MapState<String, String>
存儲(chǔ),可以記為:
(1, k, VoidNamespace) :: k1 -> v1
(1, k, VoidNamespace) :: k2 -> v2
(1, k, VoidNamespace) :: k3 -> NULL
(1, k, VoidNamespace) :: k4 -> v4
(1, k, VoidNamespace) :: k5 -> v5
如果我們獲取或修改一條狀態(tài)數(shù)據(jù)袱饭,前者需要將所有數(shù)據(jù)做一遍序列化和反序列化川无,而后者只需要處理一條。在Map比較小的情況下可能沒(méi)有明顯的性能差異虑乖,但是如果Map有幾十個(gè)甚至上百個(gè)鍵值對(duì)懦趋,或者某些value的長(zhǎng)度很長(zhǎng)(如各類(lèi)打標(biāo)標(biāo)記串等),ValueState<Map>
的性能退化就會(huì)非常嚴(yán)重疹味,造成反壓仅叫。
有的同學(xué)可能會(huì)問(wèn):我對(duì)狀態(tài)數(shù)據(jù)的操作基本都是“整存整取”(即讀/寫(xiě)整個(gè)Map)帜篇,也不建議使用ValueState<Map>
嗎?答案仍然是不建議诫咱。除了前面提到的WriteBatch和Iterator為MapState
帶來(lái)的優(yōu)化之外坠狡,RocksDB更可以利用多線(xiàn)程進(jìn)行讀寫(xiě),而單個(gè)大value不僅不能享受這個(gè)便利遂跟,還會(huì)擠占Block Cache空間逃沿,在出現(xiàn)數(shù)據(jù)傾斜等場(chǎng)景時(shí),磁盤(pán)I/O可能會(huì)打到瓶頸幻锁。所以凯亮,我們?cè)陂_(kāi)始編寫(xiě)作業(yè)時(shí)就應(yīng)該正確使用MapState
。
平滑遷移
為了消除此類(lèi)狀態(tài)誤用的影響哄尔,常見(jiàn)的重構(gòu)方式是將ValueState<Map>
修改為MapState
假消,重置位點(diǎn)后消費(fèi)歷史數(shù)據(jù),積攢狀態(tài)岭接,并替換掉舊任務(wù)富拗。但是對(duì)于狀態(tài)TTL較長(zhǎng)、size較大的場(chǎng)景(例如物流監(jiān)控場(chǎng)景經(jīng)常有30天TTL鸣戴、十幾TB大的State)啃沪,這樣顯然非常不方便,下面提供一種簡(jiǎn)單的平滑遷移方式窄锅。
假設(shè)原本誤用的狀態(tài)為mainState
创千,我們聲明兩個(gè)新的狀態(tài),一個(gè)是新的MapState newMainState
入偷,一個(gè)是布爾型ValueState isMigratedState
追驴,表示該key對(duì)應(yīng)的狀態(tài)是否已經(jīng)遷移成了新的,即:
private transient ValueState<Map<String, String>> mainState;
private transient ValueState<Boolean> isMigratedState;
private transient MapState<String, String> newMainState;
當(dāng)然疏之,它們的TTL等參數(shù)要完全相同殿雪。
寫(xiě)兩個(gè)新的方法,負(fù)責(zé)在讀寫(xiě)mainState
時(shí)將其遷移成newMainState
锋爪,并做上相應(yīng)的標(biāo)記丙曙。不存在歷史狀態(tài)的,直接以新格式存儲(chǔ)几缭。再?gòu)?qiáng)調(diào)一遍河泳,newMainState.entries()
和newMainState.putAll()
的性能很不錯(cuò),不必過(guò)于擔(dān)心年栓。
private Map<String, String> wrapGetMainState() throws Exception {
Boolean isMigrated = isMigratedState.value();
if (isMigrated == null || !isMigrated) {
Map<String, String> oldStateData = mainState.value();
if (oldStateData != null) {
newMainState.putAll(mainState.value());
}
isMigratedState.update(true);
mainState.clear();
}
Map<String, String> result = new HashMap<>();
for (Entry<String, String> e : newMainState.entries()) {
result.put(e.getKey(), e.getValue());
}
return result;
}
private void wrapUpdateMainState(Map<String, String> data) throws Exception {
Boolean isMigrated = isMigratedState.value();
if (isMigrated == null || !isMigrated) {
Map<String, String> oldStateData = mainState.value();
if (oldStateData != null) {
newMainState.putAll(mainState.value());
}
isMigratedState.update(true);
mainState.clear();
}
newMainState.putAll(data);
}
再將歷史代碼中的狀態(tài)訪問(wèn)全部替換成wrapGetMainState()
和wrapUpdateMainState()
方法的調(diào)用即可拆挥。表面上看是由一個(gè)狀態(tài)句柄變成了兩個(gè)狀態(tài)句柄,但是標(biāo)記狀態(tài)的訪問(wèn)十分輕量級(jí),且隨著程序的運(yùn)行纸兔,舊狀態(tài)的數(shù)據(jù)漸進(jìn)式地替換完畢之后惰瓜,就可以安全地刪除mainState
和isMigratedState
了。當(dāng)然汉矿,托管內(nèi)存的設(shè)置要科學(xué)崎坊,并添加一些有利于RocksDB狀態(tài)吞吐量的參數(shù),如:
state.backend.rocksdb.predefined-options SPINNING_DISK_OPTIMIZED_HIGH_MEM
state.backend.rocksdb.memory.partitioned-index-filters true
基于堆的狀態(tài)呢洲拇?
與RocksDB相反奈揍,基于堆的JobManager和FileSystem狀態(tài)后端無(wú)需序列化和反序列化,當(dāng)然狀態(tài)的大小就要受制于TaskManager內(nèi)存赋续。不過(guò)男翰,如果我們采用這兩種狀態(tài)后端,ValueState<Map>
和MapState
也就沒(méi)有明顯的性能差別了纽乱,因?yàn)?code>HeapValueState和HeapMapState
的底層都是相同的蛾绎,即CopyOnWriteStateTable
,本質(zhì)上是內(nèi)存中的狀態(tài)映射表鸦列。讀者有興趣可以自行參考對(duì)應(yīng)的Flink源碼租冠,這里不再啰嗦了。
ValueState薯嗤、ListState顽爹、MapState三者在RocksDB狀態(tài)后端和基于堆的狀態(tài)后端中的異同點(diǎn)可以概括成下表。