一。 概述
在流計(jì)算場(chǎng)景中,數(shù)據(jù)沒(méi)有邊界源源不斷的流入的忠蝗,每條數(shù)據(jù)流入都可能會(huì)觸發(fā)計(jì)算增淹,比如在進(jìn)行count或sum這些操作椿访,是選擇每次觸發(fā)計(jì)算將所有流入的歷史數(shù)據(jù)重新計(jì)算一邊還是每次計(jì)算都基于上次計(jì)算結(jié)果進(jìn)行增量計(jì)算呢? 從綜合考慮角度虑润,很多人都會(huì) 選擇增量計(jì)算成玫,那么問(wèn)題就產(chǎn)生了:上一次的中間計(jì)算結(jié)果保存在哪里??jī)?nèi)存拳喻?這其中會(huì)由于本身的網(wǎng)絡(luò)哭当,硬件或軟件等問(wèn)題造成某個(gè)計(jì)算節(jié)點(diǎn)失敗,對(duì)應(yīng)的上次計(jì)算結(jié)果就會(huì)丟失冗澈,在節(jié)點(diǎn)恢復(fù)時(shí)钦勘,是需要將所有歷史數(shù)據(jù)重新計(jì)算一遍的,對(duì)于這樣的結(jié)果大家是很難接受的亚亲。
二彻采。flink中state
而在flink中提出了state用來(lái)存放計(jì)算過(guò)程的節(jié)點(diǎn)中間結(jié)果或元數(shù)據(jù)等腐缤,并提供Exactly-Once語(yǔ)義,例如:執(zhí)行aggregation時(shí)在state中記錄中間聚合結(jié)果肛响,再如從kafka中攝取記錄時(shí)岭粤,是需要記錄對(duì)應(yīng)的partition的offset,而這些state數(shù)據(jù)在計(jì)算過(guò)程中會(huì)進(jìn)行持久化的特笋。state就變成了與時(shí)間相關(guān)的是對(duì)flink任務(wù)內(nèi)部數(shù)據(jù)的快照剃浇。
由于流計(jì)算大多數(shù)場(chǎng)景下都是增量計(jì)算的,數(shù)據(jù)逐條被處理猎物,每次當(dāng)前結(jié)果都是基于上一次計(jì)算結(jié)果之上進(jìn)行處理的偿渡,這也勢(shì)必要將上一次的計(jì)算結(jié)果進(jìn)行存儲(chǔ)持久化,無(wú)論是機(jī)器霸奕,網(wǎng)絡(luò)溜宽,臟數(shù)據(jù)等原因?qū)е碌某绦蝈e(cuò)誤,都能在job進(jìn)行任務(wù)恢復(fù)時(shí)提供支持质帅∈嗜啵基于這些已被持久化的state,而非將歷史的數(shù)據(jù)重新計(jì)算一遍煤惩。
在flink內(nèi)部提供三種state存儲(chǔ)實(shí)現(xiàn)
- 內(nèi)存HeapStateBackend:存放數(shù)據(jù)量小嫉嘀,用于開(kāi)發(fā)測(cè)試使用;生產(chǎn)不建議使用
- HDFS的FsStateBackend :分布式文件持久化魄揉,每次都會(huì)產(chǎn)生網(wǎng)絡(luò)io剪侮,可用于大state,不支持增量洛退;可用于生產(chǎn)
- RocksDB的RocksDBStateBackend:本地文件 + 異步hdfs持久化瓣俯,也可用于大state數(shù)據(jù)量,唯一支持增量兵怯,可用于生產(chǎn)彩匕;
比如使用RocksDB + HDFS進(jìn)行state存儲(chǔ):首先state先在本地存儲(chǔ)到RocksDB,然后異步寫入到HDFS中媒区,這樣可以突破HeapStateBackend受單節(jié)點(diǎn)資源限制(物理內(nèi)存驼仪,機(jī)器故障數(shù)據(jù)丟失等),也減少了分布式過(guò)程寫入帶來(lái)的網(wǎng)絡(luò)io開(kāi)銷袜漩。
(state詳情)https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html
state
三绪爸。operator state與keyedstate相關(guān)
3.1 state 分類
從operator和data兩個(gè)角度可將state劃分為2類
- KeyedState:一般groupby或PartitionBy組成的內(nèi)容,即為key宙攻,每個(gè)key都有自己的state奠货,并且key與key間的state是不可見(jiàn)的
- OperatorState:Source Connector的實(shí)現(xiàn)中就會(huì)用OperatorState來(lái)記錄source數(shù)據(jù)讀取的offset。
3.2 state擴(kuò)容重新分配
在flink中每一個(gè)并行運(yùn)算操作實(shí)例就是一個(gè)獨(dú)立的任務(wù)粘优,可以在機(jī)器上調(diào)度到網(wǎng)絡(luò)中其他的機(jī)器仇味;并且flink能夠進(jìn)行大規(guī)模的有狀態(tài)流處理呻顽,在邏輯上將這些分割成不同operator graph,同時(shí)operator也將被物理分解成多個(gè)操作實(shí)例丹墨。在flink的DAG圖中隨著data流向廊遍,垂直方向存在網(wǎng)絡(luò)io,而水平方向的stateful節(jié)點(diǎn)間是沒(méi)有網(wǎng)絡(luò)通信的贩挣,這樣每個(gè)operator維護(hù)一份自己本地的state喉前,并保存在本地磁盤。
比如source有5個(gè)partition王财,將source并行度1->2,中間stateful operator并行度2->3,結(jié)果如下圖:
分析結(jié)果如下:在flink中不同的state有不同的擴(kuò)容方法
- 關(guān)于operatorstate的擴(kuò)容
這種state分配結(jié)合operator對(duì)應(yīng)并行度parallelism有關(guān)聯(lián)卵迂,比如FlinkKafkaConsumerBase的定義
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
CheckpointListener,
ResultTypeQueryable<T>,
CheckpointedFunction
看到在代碼內(nèi)部使用ListState:ListState<Tuple2<KafkaTopicPartition, Long>>
@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {
void update(List<T> values) throws Exception;
void addAll(List<T> values) throws Exception;
}
通過(guò)源碼可以看到ListState具體定義,T是Tuple2<KafkaTopicPartition,Long>說(shuō)明了state存儲(chǔ)了當(dāng)前partition及其offset信息的列表绒净,KafkaTopicPartition代表一個(gè)partition见咒,Long代表當(dāng)前partition的offset,
當(dāng)source并行度=1挂疆,代表所有的partition都在同一個(gè)線程中讀取改览,對(duì)應(yīng)所有的partition的state也在同一個(gè)state維護(hù):如下圖
當(dāng)把source并行度=2,對(duì)應(yīng)的operator并行度=3缤言,先看下parition與subtask之間的映射方法:
首先根據(jù)topic的hash值得到當(dāng)前的index開(kāi)始點(diǎn)宝当,進(jìn)行對(duì)齊,接著對(duì)當(dāng)前operator的subtasks進(jìn)行取模胆萧,得到的結(jié)果即為當(dāng)前partition分配的subtask的index
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
// here, the assumption is that the id of Kafka partitions are always ascending
// starting from 0, and therefore can be used directly as the offset clockwise from the start index
return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
可以發(fā)現(xiàn)通過(guò)Operator使用List<T>作為state的存儲(chǔ)結(jié)構(gòu)庆揩,是很容易解決這類state擴(kuò)容的,不過(guò)有一點(diǎn)source擴(kuò)容后的parallelism是否可以超過(guò)Source物理存儲(chǔ)上的partition個(gè)數(shù)跌穗?這樣會(huì)造成資源的浪費(fèi)订晌,超過(guò)partition的并發(fā)永遠(yuǎn)分配不到待處理的partition。
- KeyedState擴(kuò)容處理
在flink相對(duì)OperatorState的大小來(lái)說(shuō)瞻离,KeyedState還是比較大的腾仅,如果采用OperatorState進(jìn)行取模方式可能會(huì)帶來(lái)網(wǎng)絡(luò)拉取的成本較大,flink直接采用key-Groups(類似range的方式分配)
1.Key-Groups:flink對(duì)keyedstate按照key進(jìn)行分組的方式套利,每個(gè)key-group會(huì)包含N>0個(gè)key,是keystate分配的原子單元鹤耍,在flink使用KeyGroupRange代表一個(gè)key-group肉迫。
public class KeyGroupRange implements KeyGroupsList, Serializable {
...
...
private final int startKeyGroup;
private final int endKeyGroup;
...
...
}
在KeyGroupRange中:startKeyGroup和endKeyGroup用來(lái)定義Operator關(guān)聯(lián)的Key-Group個(gè)數(shù)。
不過(guò)參考flink源碼可看到key-group在job啟動(dòng)之前對(duì)應(yīng)的數(shù)量是需要確定并且運(yùn)行中是不可變的稿黄。本身Operator的最大并行度<= key-group個(gè)數(shù)喊衫,每個(gè)Operator實(shí)例都會(huì)有自己的state,每個(gè)state關(guān)聯(lián)至少一個(gè)key-group
...
public static int assignToKeyGroup(Object key, int maxParallelism) {
return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
}
...
...
public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
return MathUtils.murmurHash(keyHash) % maxParallelism;
}
...
其實(shí)從分配key到key-group利用key的hash值與maxParallelism進(jìn)行取模來(lái)完成的杆怕。比如parallelism=2 maxParallelism=10
如上圖比如key=a對(duì)應(yīng)hash(a)=97族购, hash(a) % 10 = 7則分配到KG-7壳贪,其他的以此類推。
flink源碼中針對(duì)分配到key-group的操作如下:
public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
int maxParallelism,
int parallelism,
int operatorIndex) {
checkParallelismPreconditions(parallelism);
checkParallelismPreconditions(maxParallelism);
Preconditions.checkArgument(maxParallelism >= parallelism,
"Maximum parallelism must not be smaller than parallelism.");
// 當(dāng)前operator實(shí)例
int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
// 當(dāng)前operator下一個(gè)實(shí)例的位置
int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
return new KeyGroupRange(start, end);
}
如上代碼就是用于將operator(已指定parallelism和maxparallelism)對(duì)應(yīng)的key-groups進(jìn)行分配寝杖,計(jì)算當(dāng)前keygroup的start违施,end:先計(jì)算每個(gè)Operator實(shí)例至少分配的Key-Group個(gè)數(shù),將不能整除的部分N個(gè)瑟幕,平均分給前N個(gè)實(shí)例磕蒲。最終每個(gè)Operator實(shí)例管理的Key-Groups會(huì)在GroupRange中表示,本質(zhì)是一個(gè)區(qū)間值只盹;實(shí)例圖如下:
1.當(dāng)parallelism=2時(shí)可得到KeyGroupRange:
operatorIndex=0辣往,則得到start=0, end=4:如圖kg-keys:0殖卑,1站削,2,3孵稽,4
operatorIndex=1许起,則得到start=5,end=9:如圖kg-keys:5肛冶,6街氢,7,8睦袖,9
2.當(dāng)parallelism=3時(shí)可得到KeyGroupRange:
operatorIndex=0珊肃,則得到start=0, end=3:如圖kg-keys:0馅笙,1伦乔,2,3
operatorIndex=1董习,則得到start=4烈和,end=6:如圖kg-keys:4,5皿淋,6
operatorIndex=2招刹,則得到start=7, end=9:如圖kg-keys:7窝趣,8疯暑,9
一旦當(dāng)job修改了maxParallelism的值那么會(huì)直接影響到Key-Groups的數(shù)量和key的分配,也會(huì)打亂所有的Key-Group的分配哑舒,目前在Apache Flink系統(tǒng)中統(tǒng)一將maxParallelism的默認(rèn)值調(diào)整到4096妇拯,最大程度的避免無(wú)法擴(kuò)容的情況發(fā)生。