前言
在之前那篇講解Flink Timer的文章里蛤签,我曾經(jīng)用三言兩語簡單解釋了Key Group和KeyGroupRange的概念瘟芝。實際上性宏,Key Group是Flink狀態(tài)機制中的一個重要設(shè)計踩验,值得專門探究一下涉茧。本文先介紹Flink狀態(tài)的理念赴恨,再經(jīng)由狀態(tài)——主要是Keyed State——的縮放(rescale)引出KeyGroup的細節(jié)。
再認識Flink狀態(tài)
自從開始寫關(guān)于Flink的東西以來伴栓,“狀態(tài)”這個詞被提過不下百次伦连,卻從來沒有統(tǒng)一的定義。Flink官方博客中給出的一種定義如下:
When it comes to stateful stream processing, state comprises of the information that an application or stream processing engine will remember across events and streams as more realtime (unbounded) and/or offline (bounded) data flow through the system.
根據(jù)這句話钳垮,狀態(tài)就是流處理過程中需要“記住”的那些數(shù)據(jù)的快照惑淳。而這些數(shù)據(jù)既可以包括業(yè)務(wù)數(shù)據(jù),也可以包括元數(shù)據(jù)(例如Kafka Consumer的offset)饺窿。以最常用也是最可靠的RocksDB狀態(tài)后端為例歧焦,狀態(tài)數(shù)據(jù)的流動可以抽象為3層,如下圖所示肚医。
用戶代碼產(chǎn)生的狀態(tài)實時地存儲在本地文件中绢馍,并且隨著Checkpoint的周期異步地同步到遠端的可靠分布式文件系統(tǒng)(如HDFS)向瓷。這樣就保證了100%本地性,各個Sub-Task只需要負責自己所屬的那部分狀態(tài)舰涌,不需要通過網(wǎng)絡(luò)互相傳輸狀態(tài)數(shù)據(jù)猖任,也不需要頻繁地讀寫HDFS,減少了開銷舵稠。在Flink作業(yè)重啟時超升,從HDFS取回狀態(tài)數(shù)據(jù)到本地,即可恢復現(xiàn)場哺徊。
我們已經(jīng)知道Flink的狀態(tài)分為兩類:Keyed State和Operator State室琢。前者與每個鍵相關(guān)聯(lián),后者與每個算子的并行實例(即Sub-Task)相關(guān)聯(lián)落追。下面來看看Keyed State的縮放盈滴。
Keyed State的縮放
所謂縮放,在Flink中就是指改變算子的并行度轿钠。Flink是不支持動態(tài)改變并行度的巢钓,必須先停止作業(yè),修改并行度之后再從Savepoint恢復疗垛。如果沒有狀態(tài)症汹,那么不管scale-in還是scale-out都非常簡單,只要做好數(shù)據(jù)流的重新分配就行贷腕,如下圖的例子所示背镇。
可是如果考慮狀態(tài)的話,就沒有那么簡單了:并行度改變之后泽裳,HDFS里的狀態(tài)數(shù)據(jù)該按何種規(guī)則取回給新作業(yè)里的各個Sub-Task瞒斩?下圖示出了這種困局。
按照最naive的思路考慮涮总,F(xiàn)link中的key是按照hash(key) % parallelism
的規(guī)則分配到各個Sub-Task上去的胸囱,那么我們可以在縮放完成后,根據(jù)新分配的key集合從HDFS直接取回對應(yīng)的Keyed State數(shù)據(jù)瀑梗。下圖示出并行度從3增加到4后烹笔,Keyed State中各個key的重新分配。
在Checkpoint發(fā)生時抛丽,狀態(tài)數(shù)據(jù)是順序?qū)懭胛募到y(tǒng)的箕宙。但從上圖可以看出,從狀態(tài)恢復時是隨機讀的铺纽,效率非常低下柬帕。并且縮放之后各Sub-Task處理的key有可能大多都不是縮放之前的那些key,無形中降低了本地性。為了解決這兩個問題陷寝,在FLINK-3755對Keyed State專門引入了Key Group锅很,下面具體看看。
引入Key Group
如果看官有仔細讀Flink官方文檔的話凤跑,可能對這個概念已經(jīng)不陌生了爆安,原話抄錄如下:
Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.
翻譯一下,Key Group是Keyed State分配的原子單位仔引,且Flink作業(yè)內(nèi)Key Group的數(shù)量與最大并行度相同扔仓,也就是說Key Group的索引位于[0, maxParallelism - 1]的區(qū)間內(nèi)。每個Sub-Task都會處理一個到多個Key Group咖耘,在源碼中翘簇,以KeyGroupRange數(shù)據(jù)結(jié)構(gòu)來表示。
KeyGroupRange的邏輯相對簡單儿倒,部分源碼如下版保。注意startKeyGroup和endKeyGroup實際上指的是Key Group的索引,并且是閉區(qū)間夫否。
public class KeyGroupRange implements KeyGroupsList, Serializable {
private static final long serialVersionUID = 4869121477592070607L;
public static final KeyGroupRange EMPTY_KEY_GROUP_RANGE = new KeyGroupRange();
private final int startKeyGroup;
private final int endKeyGroup;
private KeyGroupRange() {
this.startKeyGroup = 0;
this.endKeyGroup = -1;
}
public KeyGroupRange(int startKeyGroup, int endKeyGroup) {
this.startKeyGroup = startKeyGroup;
this.endKeyGroup = endKeyGroup;
}
@Override
public boolean contains(int keyGroup) {
return keyGroup >= startKeyGroup && keyGroup <= endKeyGroup;
}
public KeyGroupRange getIntersection(KeyGroupRange other) {
int start = Math.max(startKeyGroup, other.startKeyGroup);
int end = Math.min(endKeyGroup, other.endKeyGroup);
return start <= end ? new KeyGroupRange(start, end) : EMPTY_KEY_GROUP_RANGE;
}
public int getNumberOfKeyGroups() {
return 1 + endKeyGroup - startKeyGroup;
}
public int getStartKeyGroup() {
return startKeyGroup;
}
public int getEndKeyGroup() {
return endKeyGroup;
}
@Override
public int getKeyGroupId(int idx) {
if (idx < 0 || idx > getNumberOfKeyGroups()) {
throw new IndexOutOfBoundsException("Key group index out of bounds: " + idx);
}
return startKeyGroup + idx;
}
public static KeyGroupRange of(int startKeyGroup, int endKeyGroup) {
return startKeyGroup <= endKeyGroup ? new KeyGroupRange(startKeyGroup, endKeyGroup) : EMPTY_KEY_GROUP_RANGE;
}
}
我們還有兩個問題需要解決:
- 如何決定一個key該分配到哪個Key Group中彻犁?
- 如何決定一個Sub-Task該處理哪些Key Group(即對應(yīng)的KeyGroupRange)?
第一個問題凰慈,相關(guān)方法位于KeyGroupRangeAssignment類:
public static int assignToKeyGroup(Object key, int maxParallelism) {
return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
return MathUtils.murmurHash(keyHash) % maxParallelism;
}
可見是對key進行兩重哈希(一次取hashCode汞幢,一次做MurmurHash)之后,再對最大并行度取余微谓,得到Key Group的索引急鳄。
第二個問題,仍然在上述類中的computeKeyGroupRangeForOperatorIndex()方法堰酿,源碼如下。
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
int maxParallelism,
int parallelism,
int operatorIndex) {
checkParallelismPreconditions(parallelism);
checkParallelismPreconditions(maxParallelism);
Preconditions.checkArgument(maxParallelism >= parallelism,
"Maximum parallelism must not be smaller than parallelism.");
int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
return new KeyGroupRange(start, end);
}
可見是由并行度张足、最大并行度和算子實例(即Sub-Task)的ID共同決定的触创。根據(jù)Key Group的邏輯,上一節(jié)中Keyed State重分配的場景就會變成下圖所示(設(shè)最大并行度為10)为牍。
很明顯哼绑,將Key Group作為Keyed State的基本分配單元之后,上文所述本地性差和隨機讀的問題都部分得到了解決碉咆。當然還要注意抖韩,最大并行度對Key Group分配的影響是顯而易見的,因此不要隨意修改最大并行度的值疫铜。Flink內(nèi)部確定默認最大并行度的邏輯如下代碼所示茂浮。
public static int computeDefaultMaxParallelism(int operatorParallelism) {
checkParallelismPreconditions(operatorParallelism);
return Math.min(
Math.max(
MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
UPPER_BOUND_MAX_PARALLELISM);
}
其中,下限值DEFAULT_LOWER_BOUND_MAX_PARALLELISM
為128,上限值UPPER_BOUND_MAX_PARALLELISM
為32768席揽。
The End
準備開啟遠程辦公模式顽馋。希望疫情快點好轉(zhuǎn)。
民那晚安幌羞。