Rescaling Stateful Applications in Production

I. Overview

Data locality is a key principle in Flink and strongly affects how state is stored and accessed. All state in Flink is local state. Why local state is a fundamental primitive in stream processing:

  • 1. Apache Flink is a massively parallel distributed system that allows stateful stream processing at large scale. For scalability, a Flink job is logically decomposed into a graph of operators, and the execution of each operator is physically decomposed into multiple parallel operator instances. Conceptually, each parallel operator instance in Flink is an independent task that can be scheduled on its own machine in a network-connected cluster of shared-nothing machines.

  • 2. For high throughput and low latency in this setting, network communications among tasks must be minimized. In Flink, network communication for stream processing only happens along the logical edges in the job’s operator graph (vertically), so that the stream data can be transferred from upstream to downstream operators.

  • 3. However, there is no communication between the parallel instances of an operator (horizontally). To avoid such network communication, data locality is a key principle in Flink and strongly affects how state is stored and accessed.

II. Rescaling Stateful Stream Processing Jobs

image.png

III. Reassigning Operator State When Rescaling

image.png

Operator state can be redistributed flexibly when a job is rescaled. Flink currently provides three redistribution schemes, described below:

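  • 1. ListState: when the parallelism changes, the list held by each parallel instance is collected, the lists are merged into one new list, and its elements are then distributed evenly across the new tasks by element count.
  • 2. UnionListState: more flexible than ListState because the partitioning is left to the user. When the parallelism changes, the original lists are concatenated and the complete list is handed to the user code without being split.
  • 3. BroadcastState: for example, when joining a large table with a small table, the small table can be broadcast to every partition of the large table, so the data on each parallel instance is identical and receives the same updates. When the parallelism changes, this data is simply copied to the new tasks.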
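
The three schemes above can be sketched in plain Java. This is an illustration only, not Flink's implementation: the class and method names (`OperatorStateRedistributionSketch`, `evenSplit`, `union`, `broadcast`) are hypothetical, and the order in which Flink hands out elements during an even split may differ from the simple contiguous split assumed here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the three redistribution schemes; not Flink's actual code. */
final class OperatorStateRedistributionSketch {

    /** ListState: merge all old lists, then split the elements evenly over the new subtasks. */
    static <T> List<List<T>> evenSplit(List<List<T>> oldLists, int newParallelism) {
        List<T> merged = concat(oldLists);
        int base = merged.size() / newParallelism;
        int remainder = merged.size() % newParallelism;
        List<List<T>> result = new ArrayList<>();
        int offset = 0;
        for (int i = 0; i < newParallelism; i++) {
            // The first `remainder` subtasks receive one extra element.
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(merged.subList(offset, offset + size)));
            offset += size;
        }
        return result;
    }

    /** UnionListState: every new subtask receives the complete concatenated list. */
    static <T> List<List<T>> union(List<List<T>> oldLists, int newParallelism) {
        List<T> merged = concat(oldLists);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(merged));
        }
        return result;
    }

    /** BroadcastState: all old copies are identical, so one of them is handed to each new subtask. */
    static <S> List<S> broadcast(List<S> oldCopies, int newParallelism) {
        return new ArrayList<>(Collections.nCopies(newParallelism, oldCopies.get(0)));
    }

    private static <T> List<T> concat(List<List<T>> lists) {
        List<T> merged = new ArrayList<>();
        for (List<T> list : lists) {
            merged.addAll(list);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Two old subtasks, rescaled to three new subtasks.
        List<List<Integer>> old = List.of(List.of(1, 2, 3), List.of(4, 5));
        System.out.println(evenSplit(old, 3));    // [[1, 2], [3, 4], [5]]
        System.out.println(union(old, 3).get(2)); // [1, 2, 3, 4, 5]
        System.out.println(broadcast(List.of("rules-v1", "rules-v1"), 3));
    }
}
```

With union redistribution each subtask sees everything and must itself decide which items to keep, which is where the extra flexibility comes from.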
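1. Redistributable operator state is exposed through a single list-style data structure, ListState<T>, and it is global: each subtask contributes some of the T items to the ListState<T>. It is precisely because the state is a list that it can be redistributed when the operator is rescaled. A single T would be a black box to Flink, and Flink would have no way to split it.
2. Operator state offers only the ListState<T> data structure precisely because of the rescaling problem:
  • 1. Operator state cannot behave like keyed state. Keyed state is one state per key (and transformations also run independently per key): however the parallelism changes, all records with the same key land on the same task. The records for a key may move to a different task, but a key is only ever handled by one parallel instance.
  • 2. For operator state, when the parallelism changes, the records arriving from upstream are redistributed across the subtasks, so the set of records each subtask sees changes. This is why each item T in the ListState should be an atomic, independently re-distributable part of the operator state.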

Illustration

  • As a generalized approach to solve this black box problem, we slightly modified the checkpointing interface, called ListCheckpointed. Figure 2B shows the new checkpointing interface, which returns and receives a list of state partitions. Introducing a list instead of a single object makes the meaningful partitioning of state explicit: each item in the list still remains a black box to Flink, but is considered an atomic, independently re-distributable part of the operator state.
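
The idea of returning and receiving a list of state partitions can be sketched as follows. This is a simplified stand-in, not Flink's interface: `ListSnapshot`, `BufferingSink`, and their method signatures are assumptions made for the example (they omit details such as checkpoint IDs and serialization).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a list-based checkpointing contract; not Flink's actual types. */
final class BufferingSinkSketch {

    /** Stand-in interface: state is returned and received as a list of atomic items. */
    interface ListSnapshot<T> {
        List<T> snapshotState();

        void restoreState(List<T> items);
    }

    /** A sink that buffers elements; each buffered element is one re-distributable item. */
    static final class BufferingSink implements ListSnapshot<String> {
        private final List<String> pending = new ArrayList<>();

        public void buffer(String element) {
            pending.add(element);
        }

        @Override
        public List<String> snapshotState() {
            // One list item per buffered element instead of one opaque blob.
            return new ArrayList<>(pending);
        }

        @Override
        public void restoreState(List<String> items) {
            // After rescaling, the items may originate from several former subtasks.
            pending.clear();
            pending.addAll(items);
        }
    }

    public static void main(String[] args) {
        BufferingSink first = new BufferingSink();
        first.buffer("e1");
        first.buffer("e2");
        BufferingSink second = new BufferingSink();
        second.buffer("e3");

        // Scale in from two instances to one: the survivor receives both lists.
        List<String> merged = new ArrayList<>(first.snapshotState());
        merged.addAll(second.snapshotState());
        BufferingSink restored = new BufferingSink();
        restored.restoreState(merged);
        System.out.println(restored.snapshotState()); // [e1, e2, e3]
    }
}
```

Because every item is self-contained, the runtime can move items between instances without understanding what is inside them.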

image.png

IV. Reassigning Keyed State When Rescaling

1. Question:

  • 1. While partitioning state by key automatically solves the problem of logically remapping the state to subtasks after rescaling (presumably because the owning subtask is derived from the key itself), there is one more practical problem left to solve: how can we efficiently transfer the state to the subtasks’ local backends?

2. Answer

  • 1. Naive Approach
    A naive approach might be to read all the previous subtask state from the checkpoint in all subtasks and keep only the keys that match each subtask. While this approach can benefit from a sequential read pattern, each subtask potentially reads a large fraction of irrelevant state data, and the distributed file system receives a huge number of parallel read requests.

  • 2. Index
    Another approach could be to build an index that tracks the location of the state for each key in the checkpoint. With this approach, all subtasks could locate and read the matching keys very selectively. This approach would avoid reading irrelevant data, but it has two major downsides: (1) a materialized index for all keys, i.e. a key-to-read-offset mapping, can potentially grow very large; (2) it can also introduce a huge amount of random I/O (when seeking to the data for individual keys, see Figure 3A), which typically entails very bad performance in distributed file systems.

image.png
  • 3. Key-groups (the atomic unit of state assignment)

However, the new parallelism can be at most the previously configured max-parallelism. Once a job has been started, the max-parallelism is baked into the savepoints and can no longer be changed.
The only way around this is to discard all state and start over as a new job.

  • 1. Flink’s approach sits in between those two extremes by introducing key-groups as the atomic unit of state assignment. How does this work? The number of key-groups must be determined before the job is started and (currently) cannot be changed after the fact. As key-groups are the atomic unit of state assignment, this also means that the number of key-groups is the upper limit for parallelism. In a nutshell, key-groups give us a way to trade between flexibility in rescaling (by setting an upper limit for parallelism) and the maximum overhead involved in indexing and restoring the state.

  • 2. We assign key-groups to subtasks as ranges. This makes the reads on restore not only sequential within each key-group, but often also across multiple key-groups. An additional benefit: this also keeps the metadata of key-group-to-subtask assignments very small. We do not maintain explicit lists of key-groups because it is sufficient to track the range boundaries.

  • 3. We have illustrated rescaling from parallelism 3 to 4 with 10 key-groups in Figure 3B. As we can see, introducing key-groups and assigning them as ranges greatly improves the access pattern over the naive approach. Equations 2 and 3 in Figure 3B also detail how we compute key-groups and the range assignment.
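
Since the figure is not reproduced here, the following is a minimal sketch of how key-groups and contiguous ranges could be computed for the 10 key-group example. It rests on assumptions: the key-group is taken as a hash of the key modulo the number of key-groups, with plain `hashCode()` standing in for whatever hash function the runtime really uses, and the names `KeyGroupSketch`, `keyGroupFor`, `subtaskFor`, and `rangeFor` are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical sketch of key-group assignment; the hash function is a stand-in. */
final class KeyGroupSketch {

    /** Maps a key to one of maxParallelism key-groups; independent of the current parallelism. */
    static int keyGroupFor(Object key, int maxParallelism) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key-group to the subtask that owns it at the given parallelism. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        checkParallelism(maxParallelism, parallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    /** Inclusive [first, last] range of key-groups owned by a subtask. */
    static int[] rangeFor(int subtask, int maxParallelism, int parallelism) {
        checkParallelism(maxParallelism, parallelism);
        int first = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int last = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new int[] {first, last};
    }

    /** The number of key-groups is the upper limit for parallelism. */
    private static void checkParallelism(int maxParallelism, int parallelism) {
        if (parallelism < 1 || parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                    "parallelism must be between 1 and " + maxParallelism);
        }
    }

    public static void main(String[] args) {
        int keyGroups = 10;
        for (int parallelism : new int[] {3, 4}) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                System.out.println("parallelism " + parallelism + ", subtask " + subtask + " -> "
                        + Arrays.toString(rangeFor(subtask, keyGroups, parallelism)));
            }
        }
        // parallelism 3: [0, 3], [4, 6], [7, 9]
        // parallelism 4: [0, 2], [3, 4], [5, 7], [8, 9]
    }
}
```

Under these assumptions a key never changes its key-group when the job is rescaled; only the range boundaries move, so a restoring subtask reads a contiguous run of key-groups and needs to remember only the two ends of its range.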

image.png