為什么我要把這段話放在最前面呢差凹?因?yàn)椴┲饔辛舜蟀l(fā)現(xiàn)看疙,博主在總結(jié)學(xué)習(xí)的過(guò)程中豆拨,總結(jié)了除了 Flink CP、Chandy-Lamport 全局一致性快照算法之外的一種?通用全局一致性快照算法D芮臁J┖獭!搁胆。
這套?通用算法?包含?Chandy-Lamport 算法?≈?Flink 非對(duì)齊 CP 算法?包含?Flink 對(duì)齊 CP 算法弥搞。
可能這一套?通用算法?之前已經(jīng)有人提過(guò)了,但是博主是自己在總結(jié) Flink CP渠旁、Chandy-Lamport 算法的過(guò)程中攀例,逆推總結(jié)出來(lái)的,并沒(méi)有借助外力9死啊T撩!
1.前言
對(duì)于很多做離線或者實(shí)時(shí)數(shù)倉(cāng)的小伙伴來(lái)說(shuō)杂靶,我先問(wèn)幾個(gè)問(wèn)題梆惯,看看小伙伴萌能回答上來(lái)嗎?
? 你知道狀態(tài)是什么嗎吗垮?在離線數(shù)據(jù)開(kāi)發(fā)的經(jīng)歷中垛吗,你碰到過(guò)狀態(tài)的概念嗎?
? 為什么離線數(shù)倉(cāng)不需要狀態(tài)烁登,實(shí)時(shí)數(shù)據(jù)開(kāi)發(fā)中老是提到狀態(tài)的概念怯屉?
? Flink 中的狀態(tài)、狀態(tài)后端饵沧、全局一致性快照(Checkpoint\Savepoint) 的作用都是什么锨络,這三個(gè)概念的關(guān)聯(lián)又是什么?
? Flink 是通過(guò)什么機(jī)制來(lái)做 Checkpoint 的狼牺?為什么這套機(jī)制能夠做到故障恢復(fù)呢羡儿?
? Flink Checkpoint 是基于 Chandy-Lamport 算法的,但是 Flink 的實(shí)現(xiàn)相比 Chandy-Lamport 算法之間又有哪些優(yōu)點(diǎn)锁右、缺點(diǎn)?
? Flink Checkpoint 用到了 barrier讶泰,為什么用了 barrier 做的快照就能保證全局一致性快照的正確性咏瑟?barrier 到底起到了什么作用?
? Flink 對(duì)齊 Checkpoint 和非對(duì)齊 Checkpoint 的區(qū)別是什么痪署?非對(duì)齊 Checkpoint 也能保障精確一次嗎码泞?
小伙伴們思考一下,都能回答上來(lái)么狼犯,如果對(duì)于某些問(wèn)題你還有疑問(wèn)余寥,樓主會(huì)通過(guò)本篇文章幫你解答這些問(wèn)題领铐,理清這些概念!
由于本文內(nèi)容較多宋舷,所以博主將本文分為上绪撵,中,下三集祝蝠,本文是中音诈,三集內(nèi)容是有連接關(guān)系的,如果小伙伴在看本文的過(guò)程中對(duì)有些概念不清楚绎狭,可以跳轉(zhuǎn)到上文進(jìn)行查看:
本文最主要的內(nèi)容就是解釋了:
一個(gè)分布式應(yīng)用是怎么異步做一個(gè)全局一致性快照细溅?
2.名詞解釋
? Single-Token conservation:一個(gè)最常見(jiàn)的分布式應(yīng)用,單 Token 流轉(zhuǎn)分布式應(yīng)用
? Process:指分布式應(yīng)用中的進(jìn)程儡嘶,舉個(gè) Flink 中的例子就是 TaskManager
? Channel:指分布式應(yīng)用中進(jìn)程之間的傳輸通道喇聊,舉個(gè) Flink 中的例子就是 TaskManager 之間傳輸數(shù)據(jù)的網(wǎng)絡(luò)傳輸通道
3.分布式應(yīng)用全局一致性快照要記錄的狀態(tài)內(nèi)容
首先在分析一個(gè)復(fù)雜的大數(shù)據(jù)應(yīng)用的全局一致性快照之前,我們先以最簡(jiǎn)單的分布式應(yīng)用為例蹦狂。
Single-Token conservation:其有 p 和 q 兩個(gè)進(jìn)程誓篱,p 可以通過(guò) Channel pq(記為 Cpq) 向 q 發(fā)消息,q 可以通過(guò) Channel qp(記為 Cqp) 向 p 發(fā)消息鸥咖,其中有一個(gè)叫 token 的消息燕鸽,在這個(gè)系統(tǒng)中一直不停的傳輸流轉(zhuǎn),從 p 到 q啼辣,再?gòu)?q 到 p啊研。
15
? 首先我們來(lái)分析這個(gè)應(yīng)用中,全局一致性快照應(yīng)該包含哪些內(nèi)容鸥拧?
? 結(jié)論:全局一致性快照?=?Process 狀態(tài)?+?Channel 狀態(tài)党远。
? 原因:以上面的四幅圖為例,每一幅圖代表一個(gè)時(shí)刻富弦,如果我們以拍照這種方式做全局一致性快照來(lái)理解時(shí)沟娱,那么同一時(shí)刻,Process 和 Channel 同時(shí)都會(huì)存在數(shù)據(jù)腕柜,這些數(shù)據(jù)都是作為全局一致性快照的一部分內(nèi)容济似。
使用上述的這個(gè)結(jié)論,我們可以得到上圖?Single-Token conservation?示例中的全局一致性快照?S = S(p) + S(Cpq) + S(q) + S(Cqp)
其中:
S:全局一致性快照
S(p):p 進(jìn)程的狀態(tài)
S(Cpq):p 進(jìn)程到 q 進(jìn)程的 Channel 狀態(tài)
S(q):q 進(jìn)程的狀態(tài)
S(Cqp):q 進(jìn)程到 p 進(jìn)程的 Channel 狀態(tài)
這里就碰到了我們要分析的關(guān)鍵問(wèn)題:做全局一致性快照時(shí)盏缤,小伙伴萌都容易理解S(p)砰蠢,S(q)這兩個(gè),因?yàn)檫@兩份狀態(tài)數(shù)據(jù)就真實(shí)的存在我們的分布式應(yīng)用中唉铜,但是S(Cpq)台舱,S(Cqp)這兩個(gè)怎么理解呢?這些數(shù)據(jù)都是在網(wǎng)絡(luò)中傳輸啊潭流,我們做全局一致性快照時(shí)用啥方法才能把這些數(shù)據(jù)也記錄下來(lái)呢竞惋?接下來(lái)詳細(xì)講講博主的理解
4.Process 狀態(tài)記錄的內(nèi)容
記錄和實(shí)際業(yè)務(wù)相關(guān)的狀態(tài)內(nèi)容柜去。舉例:id 去重就存儲(chǔ)歷史所有的 id 就可以了。
5.Channel 狀態(tài)記錄的內(nèi)容
還是以前文的?Single-Token conservation?為例:
token 在 p 時(shí)(對(duì)應(yīng)第一張圖)拆宛,這時(shí)的全局一致性快照為:
S(token-in-p) = S(p-token-in-p) + S(Cpq-token-in-p) + S(q-token-in-p) + S(Cqp-token-in-p)
其中:
S(token-in-p):token 在 p 時(shí)嗓奢,做的全局一致性快照
S(p-token-in-p):token 在 p 時(shí),p 進(jìn)程的狀態(tài)
S(Cpq-token-in-p):token 在 p 時(shí)胰挑,p 進(jìn)程到 q 進(jìn)程的 Channel 狀態(tài)
S(q-token-in-p):token 在 p 時(shí)蔓罚,q 進(jìn)程的狀態(tài)
S(Cqp-token-in-p):token 在 p 時(shí),q 進(jìn)程到 p 進(jìn)程的 Channel 狀態(tài)
其中 S(p-token-in-p) 好理解瞻颂,做快照時(shí)豺谈,token 還沒(méi)有從 p 發(fā)出去贡这,p 肯定知道 token 還在 p茬末;但是站在 Cpq 做狀態(tài)時(shí)來(lái)說(shuō):Cpq 做狀態(tài)時(shí),怎么保障 Cpq 知道 token in p盖矫?
在分析上面這個(gè)問(wèn)題前丽惭,博主先使用?數(shù)學(xué)的方式?分析一下 S(Cpq) 到底記錄了哪些內(nèi)容。
? 第一步:定義變量
n:在 p 的狀態(tài)記錄前辈双,p 記錄的 p 發(fā)往 Cpq 的 msg 數(shù)责掏;
n′:在 Cpq 的狀態(tài)記錄前,Cpq 記錄的 p 發(fā)往 Cpq 的 msg 數(shù)湃望;
m:在 q 的狀態(tài)記錄前换衬,q 記錄的 q 從 Cpq 中接收到的 msg 數(shù);
m′:在 Cpq 的狀態(tài)記錄前证芭,Cpq 記錄的 q 從 Cpq 中接收到的 msg 數(shù)瞳浦;
? 第二步:提出假設(shè)
假設(shè) Channel 和 Process 一樣,也可以自主的去將做快照時(shí) Channel 中進(jìn)行網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)作為狀態(tài)保存下來(lái)废士;對(duì)應(yīng)到上述案例中就是 Cpq 可以主動(dòng)的將做好的 S(Cpq) 狀態(tài)保存下來(lái)叫潦;
? 第三步:先說(shuō)結(jié)論
Cpq 記錄S(Cpq)時(shí),必然會(huì)有 n = n' ≥ m = m'官硝;
一個(gè) Channel 要記錄的狀態(tài)是矗蕊,它 sender 記錄自己狀態(tài)之前 channel 所接收到 sender 發(fā)的的 msg 列表,再減去 receiver 記錄自己狀態(tài)之前 channel 已經(jīng)發(fā)給 receiver 的 msg 列表氢架,減去的之后的 msg 就是還在 Channel 中的數(shù)據(jù)傻咖,這些數(shù)據(jù)是需要 Channel 作為狀態(tài)記錄下來(lái)的。
而如果 n′ = m′达箍,那么 Channel c 中要記錄的 msg 列表就是 empty 列表没龙。如果 n′ > m′铺厨,那么要記錄的列表是 (m′+1),…n′ 號(hào)消息對(duì)應(yīng)的 msg 列表缎玫。
? 第四步:給出證明
首先:n = n'硬纤,利用反證法:如果 n != n',則會(huì)有:
n > n' 時(shí)赃磨,假設(shè):
n = 10(p 記錄狀態(tài)前筝家,p 記錄 p 發(fā)往 Cpq msg 數(shù)為 10(msg 編號(hào) 1 - 10));
n' = 7(Cpq 記錄狀態(tài)前邻辉,Cpq 記錄 p 發(fā)往 Cpq 的 msg 數(shù)為 7(msg 編號(hào) 1 - 7))溪王;
那么假設(shè) token 這條 msg 的編號(hào)為 9,就會(huì)出現(xiàn) p 記錄的狀態(tài)為S(p-token-in-Cpq)值骇,Cpq 記錄的狀態(tài)為S(p-token-in-p)莹菱,實(shí)際這是不符合全局一致性快照的要求的;
n < n' 時(shí)吱瘩,假設(shè):
n = 7(p 記錄狀態(tài)前道伟,p 記錄 p 發(fā)往 Cpq msg 數(shù)為 7(編號(hào) 1 - 7));
n' = 10(Cpq 記錄狀態(tài)前使碾,Cpq 記錄 p 發(fā)往 Cpq 的 msg 數(shù)為 10(編號(hào) 1 - 10))蜜徽;
那么假設(shè) token 這條 msg 的編號(hào)為 9,就會(huì)出現(xiàn) p 記錄的狀態(tài)為S(p-token-in-p)票摇,Cpq 記錄的狀態(tài)為S(p-token-in-Cpq)拘鞋,實(shí)際這是不符合全局一致性快照的要求的;
n = n' 時(shí)矢门,假設(shè):
p 做出S(p-token-in-p)的狀態(tài)時(shí)盆色,因?yàn)?n = n',這就代表 p 沒(méi)有把 token 發(fā)出去颅和,Cpq 也沒(méi)有接受到 token傅事,Cpq 就知道 token 沒(méi)有發(fā)過(guò)來(lái),則只有這種情況可以滿足S(Cpq-token-in-p)峡扩;
其次:m = m'蹭越,同樣利用反證法可以得到,下文只舉 m > m' 的案例:
m > m' 時(shí):
n = n' = m = 10(q 記錄狀態(tài)前教届,Cpq 記錄 q 從 Cpq 接收到的 msg 數(shù)為 10(編號(hào) 1 - 10))响鹃;
m' = 7(Cpq記錄狀態(tài)前,Cpq 記錄的 q 從 Cpq 接收到的 msg 數(shù)為 7(編號(hào) 1 - 7))案训;
那么假設(shè) token 這條 msg 的編號(hào)為 9买置,就會(huì)出現(xiàn) Cpq 記錄的狀態(tài)為S(Cpq-token-in-Cpq),q 記錄的狀態(tài)為S(q-token-in-p)强霎,實(shí)際這是不符合全局一致性快照的要求的忿项;
最后:n' ≥ m' and n ≥ m:同樣可以利用反證法得到,此處不再舉例。
? 第五步:解答 2.4 節(jié)提出的 Cpq 怎么知道 token in p 的問(wèn)題
通過(guò)?n = n' ≥ m = m'?其實(shí)就可以推論出?Cpq 一定會(huì)知道 token in p轩触。
為了幫大家更容易的理解一個(gè)分布式應(yīng)用包含的全局一致性快照包含的數(shù)據(jù)內(nèi)容寞酿,接下來(lái)我用偽代碼描述一下,會(huì)比文字更好理解~
6.偽代碼描述一個(gè)分布式應(yīng)用全局一致快照包含的數(shù)據(jù)內(nèi)容
偽代碼如下:
//?S_all?即一個(gè)分布式應(yīng)用的全局一致性快照
S_all?=?null;
//?假設(shè)總共有?x?個(gè)?Process脱柱,S_all?先把所有?Process?的狀態(tài)記錄下來(lái)
for?(int?i?=?1;?i?<=?x;?i++)?{
//?第?i?個(gè)?Process?的狀態(tài)為?S_P_i伐弹,直接按照?+=?寫,勿噴
S_all?+=?S_P_i;
}
//?假設(shè)總共有?y?個(gè)?Channel榨为,S_all?把所有?Channel?的狀態(tài)記錄下來(lái)
for?(int?i?=?1;?i?<=?y;?i++)?{
// 1. S_C_i:第 i 個(gè) Channel 的狀態(tài)
// 2. m_i:第 i 個(gè) Channel 做快照前惨好,發(fā)往下游 Process 的消息(數(shù)據(jù))編號(hào),m_i 其實(shí)就是上文變量中的 m
// 3. n_i:第 i 個(gè) Channel 做快照前随闺,接受上游 Process 的消息(數(shù)據(jù))編號(hào)日川,n_i 其實(shí)就是上文變量中的 n
//?4.?需要注意,每一個(gè)?Channel?的?m_i?和?n_i?的數(shù)值都可能是不一樣的
// 5. Message[m_i + 1]?:代表編號(hào)為 m_i + 1 的那條消息(數(shù)據(jù))矩乐。舉例 Message[0]?代表編號(hào)為?0?的那條消息(數(shù)據(jù))
S_C_i?=?Message[m_i?+?1]?+?...?+?Message[n_i];
S_all?+=?S_C_i;
}
//?狀態(tài)做完了
7.怎樣去記錄 Channel 的狀態(tài)逗鸣?
通過(guò)上面的分析,我們已經(jīng)討論得到了S(Cpq)都包含了什么內(nèi)容绰精,并且其之間要滿足什么樣的數(shù)學(xué)關(guān)系撒璧。但是在現(xiàn)實(shí)實(shí)際生活中,消息在 Channel 上傳輸(光纖上傳輸)時(shí)笨使,我們是無(wú)法記錄這些消息作為 Channel 的狀態(tài)的卿樱。
那么有沒(méi)有什么思路可以讓我們也能夠去記錄 Channel 的消息呢?
當(dāng)然有硫椰。
因?yàn)橹灰覀兎植际綉?yīng)用的傳輸這些消息的光纖沒(méi)有被挖斷繁调,消息終究會(huì)通過(guò) Channel 到達(dá) Process 的,因此我們就可以自然的想到其實(shí)可以在消息傳輸?終點(diǎn)的 Process?去記錄這些消息作為 Channel 的狀態(tài)靶草。對(duì)應(yīng)到上述的 Single-Token conservation 案例來(lái)說(shuō)蹄胰,我們可以在 q 中記錄 Channel pq 的S(Cpq),在 p 中記錄 Channel pq 的S(Cqp)奕翔。
如果是按照這個(gè)思路去分析的話裕寨,上面那段偽代碼就可以簡(jiǎn)化為下面這樣:
//?S_all?即全局一致性快照
S_all?=?null;
//?假設(shè)總共有?x?個(gè)?Process
for?(int?i?=?1;?i?<=?x;?i++)?{
// S_i_all:第 i 個(gè) Process 要記錄的所有狀態(tài)
S_i_all?=?null;
// S_P_i:第 i 個(gè) process 的狀態(tài)
S_i_all?+=?S_P_i;//?【直接按照?+=?寫,勿噴】
//?第?i?個(gè)?Process?總共有?y?個(gè)輸入?channel派继,下文中?j?即指代第?i?個(gè)?Process?的第?j?個(gè)上游?Channel
for?(int?j?=?1;?j?<=?y;?j++)?{
// 1.S_C_j:第 j 個(gè) Channel 的狀態(tài)
// 2.m_j:第 j 個(gè) Channel 做快照前宾袜,發(fā)往下游 Process 的消息(數(shù)據(jù))編號(hào)
// 3.n_j:第 j 個(gè) Channel 做快照前,接受上游 Process 的消息(數(shù)據(jù))編號(hào)
S_C_j?=?Message[m_j?+?1]?+?...?+?Message[n_j];
S_i_all?+=?S_C_j;
}
S_all?+=?S_i_all;
}
//?狀態(tài)做完了
解釋一下上面的偽代碼:
S_all:所有的進(jìn)程記錄的狀態(tài)之和驾窟,即所有S_i_all之和
S_i_all:每一個(gè)進(jìn)程要記錄的所有狀態(tài)之和
Process i 在做S_i_all其實(shí)只有一個(gè)變量在做快照時(shí)是不知道到的庆猫,那就是 n_j(即第 i 個(gè) channel 做快照前,接受到 j(上游) 的消息個(gè)數(shù))
S_P_i是 Process 自己的狀態(tài)绅络,所以是明確已知的
S_C_j是 Channel 狀態(tài)月培,由Message[m_j + 1] + ... + Message[n_j]組成嘁字,其中 m_j 的含義是這個(gè) Channel j 在做快照時(shí)發(fā)給當(dāng)前 Process i 的消息編號(hào),前文已經(jīng)介紹到 m = m'杉畜,即 m_j 值就等于是是當(dāng)前 Process i 在做S_P_i時(shí)拳锚,接收到上游 Channel 的消息編號(hào),所以 m_j 是明確已知的寻行。但是 Process i 在做上游 Channel j 快照時(shí) n_j 是無(wú)法獲取到的。
那么 Process i 在做S_C_j如何獲取到 n_j 呢匾荆?
這里我們就可以想到 n = n':
因?yàn)?n = n'拌蜘,所以 n_j 的值就等于 Channel j 的上游 Process 在做快照時(shí),Process 發(fā)往 Channel j 的消息個(gè)數(shù)牙丽;
所以我們可以認(rèn)為 n_j 是 Channel j 的輸入 Process 在做快照時(shí)已知的一個(gè)值简卧,則只需要這個(gè) Process 把這個(gè)值發(fā)給 Process i 就行;
Channel j 的輸入 Process 告知 Process i 的方式其做完快照之后烤芦,直接發(fā)一個(gè) marker 下去举娩,這個(gè) marker 不會(huì)對(duì)計(jì)算有任何影響(即不會(huì)對(duì)狀態(tài)產(chǎn)生任何影響),marker 只是一個(gè)標(biāo)識(shí)构罗,當(dāng) Process i 接受到這個(gè) marker 時(shí)铜涉,就知道 n_j 的具體值了。
大家注意到了沒(méi)遂唧,這個(gè) marker 其實(shí)就對(duì)應(yīng)到了 Flink 中 Checkpoint 的 barrier芙代。
8.總結(jié)
本文主要講了一個(gè)分布式應(yīng)用異步做一個(gè)全局一致性快照的機(jī)制。
好盖彭,今天主要就講這么多纹烹,下集我們?cè)僬f(shuō)說(shuō) Chandy-Lambort,F(xiàn)link CP 和今天介紹的全局一致性快照原理的關(guān)系召边。
從盤古開(kāi)天辟地說(shuō)起為什么 Flink CP 能實(shí)現(xiàn)精確一次铺呵?(上)
爆肝 1 年,18w 字 Flink SQL 手冊(cè)隧熙,橫空出世 F摇!贞盯!(建議收藏)
(上)史上最全干貨宴卖!Flink SQL 成神之路(全文 18 萬(wàn)字、138 個(gè)案例邻悬、42 張圖)
(中)史上最全干貨症昏!Flink SQL 成神之路(全文 18 萬(wàn)字、138 個(gè)案例父丰、42 張圖)
(下)史上最全干貨肝谭!Flink SQL 成神之路(全文 18 萬(wàn)字掘宪、138 個(gè)案例、42 張圖)