1. Overview
Data locality is a key principle in Flink and strongly affects how state is stored and accessed: all state in Flink is local state. Why local state is a fundamental primitive in stream processing:
- 1. Apache Flink is a massively parallel distributed system that allows stateful stream processing at large scale. For scalability, a Flink job is logically decomposed into a graph of operators, and the execution of each operator is physically decomposed into multiple parallel operator instances. Conceptually, each parallel operator instance in Flink is an independent task that can be scheduled on its own machine in a network-connected cluster of shared-nothing machines.
- 2. For high throughput and low latency in this setting, network communications among tasks must be minimized. In Flink, network communication for stream processing only happens along the logical edges in the job’s operator graph (vertically), so that the stream data can be transferred from upstream to downstream operators.
- 3. However, there is no communication between the parallel instances of an operator (horizontally). To avoid such network communication, data locality is a key principle in Flink and strongly affects how state is stored and accessed.
2. Rescaling Stateful Stream Processing Jobs
三峡捡、Reassigning Operator State When Rescaling
Dynamic rescaling of operator state is very flexible. Flink currently provides three redistribution schemes, introduced below (a code sketch follows the list):
- 1们拙、ListState:并發(fā)度在改變的時候稍途,會將并發(fā)上的每個List都取出,然后把這些List合并到一個新的List,然后根據(jù)元素的個數(shù)在均勻分配給新的Task;
- 2砚婆、UnionListState:相比于ListState更加靈活械拍,把劃分的方式交給用戶去做,當(dāng)改變并發(fā)的時候装盯,會將原來的List拼接起來坷虑。然后不做劃分,直接交給用戶埂奈;
- 3迄损、BroadcastState:如大表和小表做Join時,小表可以直接廣播給大表的分區(qū)账磺,在每個并發(fā)上的數(shù)據(jù)都是完全一致的芹敌。做的更新也相同,當(dāng)改變并發(fā)的時候垮抗,把這些數(shù)據(jù)COPY到新的Task即可氏捞;
1. Operator state has only one data structure, ListState&lt;T&gt;, and it is global: each subtask of the operator contributes some elements T to the ListState&lt;T&gt;. It is precisely because the state is a list that Flink can redistribute it on rescaling; with a single T, that T is a black box to Flink, and Flink would have no way to split it up.
2猎贴、為什么Operator State只提供了一種數(shù)據(jù)結(jié)構(gòu)ListState<T>班缎,就是因為Operator State的Rescale的問題。
- 1她渴、Operator State不像Keyed State可以有一個全局的狀態(tài)(對于每一個Key來說达址,當(dāng)然Transform也是圍繞一個個key獨立進(jìn)行),因為無論怎樣改變并行度趁耗,所有具有相同key的Records都會落到一個Task上面沉唠,只是有可能這個key對應(yīng)的Records換個Task而已,一個key只有一個并行度苛败。
- 2满葛、對于Operator State來說径簿,當(dāng)并行度改變的時候,上游過來的Records會重新散列到SubTask上面嘀韧,可以理解為每個subTask上的Records變了篇亭。所以ListState中的T(item),我們希望是is considered an atomic, independently re-distributable part of the operator state.
Illustration
- As a generalized approach to solve this black box problem, we slightly modified the checkpointing interface, called ListCheckpointed. Figure 2B shows the new checkpointing interface, which returns and receives a list of state partitions. Introducing a list instead of a single object makes the meaningful partitioning of state explicit: each item in the list still remains a black box to Flink, but is considered an atomic, independently re-distributable part of the operator state.
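For reference, the ListCheckpointed interface (since deprecated in favor of CheckpointedFunction) has roughly the shape shown in the comment below, and the CountFunction that follows is a hypothetical use of it; note how restoreState must cope with receiving zero, one, or several of the redistributed partitions:

```java
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.util.Collector;

// The interface from Figure 2B: state is snapshotted and restored as a
// list whose items are atomic, independently re-distributable partitions.
//
// public interface ListCheckpointed<T extends Serializable> {
//     List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
//     void restoreState(List<T> state) throws Exception;
// }

public class CountFunction<T> extends RichFlatMapFunction<T, Long>
        implements ListCheckpointed<Long> {

    private long count; // local count of this parallel instance

    @Override
    public void flatMap(T value, Collector<Long> out) {
        out.collect(++count);
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // This subtask contributes one atomic partition to the global list.
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // After rescaling, this subtask may be assigned any number of partitions.
        count = 0;
        for (Long partial : state) {
            count += partial;
        }
    }
}
```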
4. Reassigning Keyed State When Rescaling
1碴萧、Question:
- 1乙嘀、While this automatically solves the problem of logically remapping the state to sub-tasks after rescaling, there is one more practical problem left to solve: how can we efficiently transfer the state to the subtasks’ local backends?
雖然這會自動解決重新縮放后邏輯上將狀態(tài)重新映射到子任務(wù)的問題(因為由key??),但還有一個實際問題需要解決:我們?nèi)绾尾拍苡行У貙顟B(tài)轉(zhuǎn)移到子任務(wù)的本地后端勿决?
2乒躺、Answer
- 1. Naive Approach
A naive approach might be to read all the previous subtask state from the checkpoint in all sub-tasks and filter out the matching keys for each sub-task. While this approach can benefit from a sequential read pattern, each subtask potentially reads a large fraction of irrelevant state data, and the distributed file system receives a huge number of parallel read requests.
- 2. Index
Another approach could be to build an index that tracks the location of the state for each key in the checkpoint. With this approach, all sub-tasks could locate and read the matching keys very selectively. This approach would avoid reading irrelevant data, but it has two major downsides: (i) a materialized index for all keys, i.e. a key-to-read-offset mapping, can potentially grow very large; (ii) it can also introduce a huge amount of random I/O (when seeking to the data for individual keys, see Figure 3A), which typically entails very bad performance in distributed file systems.
- 3蛉抓、key-groups(the atomic unit of state assignment)
However, the new parallelism can be at most the previously configured max-parallelism. Once a job was started, the max-parallelism is baked into the savepoints and cannot be changed anymore, unless you discard all state and start it as a new job. A minimal configuration sketch follows.
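For instance (the values are illustrative), the bound is configured on the execution environment before the job's first start:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Upper bound for all future rescaling; baked into savepoints
        // and therefore unchangeable once the job has state.
        env.setMaxParallelism(128);

        // The actual parallelism can be changed on restore from a savepoint,
        // as long as it stays at or below the max-parallelism.
        env.setParallelism(4);
    }
}
```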
1惩系、Flink’s approach sits in between those two extremes by introducing key-groups as the atomic unit of state assignment. How does this work? The number of key-groups must be determined before the job is started and (currently) cannot be changed after the fact. As key-groups are the atomic unit of state assignment, this also means that the number of key-groups is the upper limit for parallelism. In a nutshell, key-groups give us a way to trade between flexibility in rescaling (by setting an upper limit for parallelism) and the maximum overhead involved in indexing and restoring the state.
2位岔、We assign key-groups to subtasks as ranges. This makes the reads on restore not only sequential within each key-group, but often also across multiple key-groups. An additional benefit: this also keeps the metadata of key-group-to-subtask assignments very small. We do not maintain explicit lists of key-groups because it is sufficient to track the range boundaries.
3如筛、We have illustrated rescaling from parallelism 3 to 4 with 10 key-groups in Figure 3B. As we can see, introducing key-groups and assigning them as ranges greatly improves the access pattern over the naive approach. Equation 2 and 3 in Figure 3B also details how we compute key-groups and the range assignment.