[Flink on k8s] Key High-Availability Mechanisms and ConfigMap Data Explained

1. Key high-availability mechanism

Source walkthrough: DefaultCompletedCheckpointStore.addCheckpoint/tryRemoveCompletedCheckpoint
Step 1: Derive the checkpoint path from the checkpointID
Step 2: Write the state data under the s3 path, then update the checkpoint entry in the configmap, i.e. the key checkpointID-0000000000000102688 in flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader
Step 3: Add the checkpoint to the queue, then delete the surplus completed checkpoints according to the number of completed checkpoints to retain (cluster setting state.checkpoints.num-retained)
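
Step 1 can be illustrated with a small sketch. It is not the Flink implementation; the class name CheckpointKeys and the 19-digit zero padding are assumptions inferred from the key checkpointID-0000000000000102688 shown in this article. Zero padding keeps the keys in numeric order when they are sorted as plain strings.

```java
final class CheckpointKeys {
    // Prefix taken from the configmap key shown in this article.
    static final String PREFIX = "checkpointID-";

    private CheckpointKeys() {}

    /** Formats a checkpoint ID as a zero-padded, 19-digit key (assumed width). */
    static String checkpointIdToName(long checkpointId) {
        return String.format("%s%019d", PREFIX, checkpointId);
    }

    /** Parses the numeric checkpoint ID back out of a key produced above. */
    static long nameToCheckpointId(String key) {
        if (!key.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a checkpoint key: " + key);
        }
        return Long.parseLong(key.substring(PREFIX.length()));
    }
}
```

For example, CheckpointKeys.checkpointIdToName(102688L) gives checkpointID-0000000000000102688, and nameToCheckpointId turns that key back into 102688.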

public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
        implements CompletedCheckpointStore {
    
    // In-memory cache of the retained completed checkpoints
    private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;

    @Override
    public void addCheckpoint(
            final CompletedCheckpoint checkpoint,
            CheckpointsCleaner checkpointsCleaner,
            Runnable postCleanup)
            throws Exception {
        // omitted...

        // 1. First derive the checkpoint path from the checkpointID
        final String path = completedCheckpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());
        // 2. Then write the state data under the s3 path and update the checkpoint entry in the configmap
        checkpointStateHandleStore.addAndLock(path, checkpoint);
        
        // 3. Finally add the checkpoint to the queue and subsume the ones beyond the number to retain
        completedCheckpoints.addLast(checkpoint);
        CheckpointSubsumeHelper.subsume(
                completedCheckpoints,
                maxNumberOfCheckpointsToRetain,
                completedCheckpoint ->
                        tryRemoveCompletedCheckpoint(
                                completedCheckpoint,
                                completedCheckpoint.shouldBeDiscardedOnSubsume(),
                                checkpointsCleaner,
                                postCleanup));
        // omitted...
    }

    

    private void tryRemoveCompletedCheckpoint(
            CompletedCheckpoint completedCheckpoint,
            boolean shouldDiscard,
            CheckpointsCleaner checkpointsCleaner,
            Runnable postCleanup)
            throws Exception {
        if (tryRemove(completedCheckpoint.getCheckpointID())) {
            checkpointsCleaner.cleanCheckpoint(
                    completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
        }
    }
}
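
The retention logic in step 3 boils down to a bounded queue. The following is a simplified sketch under stated assumptions, not the Flink code: RetainedCheckpointQueue is a hypothetical class that tracks only checkpoint IDs and leaves out the state handle store, the discard flag, and the asynchronous cleaner used in the excerpt above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class RetainedCheckpointQueue {
    private final ArrayDeque<Long> completed = new ArrayDeque<>();
    private final int maxRetained; // plays the role of state.checkpoints.num-retained

    RetainedCheckpointQueue(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("must retain at least one checkpoint");
        }
        this.maxRetained = maxRetained;
    }

    /** Adds a checkpoint ID and returns the IDs that were subsumed, oldest first. */
    List<Long> add(long checkpointId) {
        completed.addLast(checkpointId);
        List<Long> subsumed = new ArrayList<>();
        // Drop from the head until only maxRetained entries remain.
        while (completed.size() > maxRetained) {
            subsumed.add(completed.removeFirst());
        }
        return subsumed;
    }

    /** Returns a copy of the retained IDs, oldest first. */
    List<Long> retained() {
        return new ArrayList<>(completed);
    }
}
```

With maxRetained set to 2, adding IDs 1, 2, and 3 in turn subsumes ID 1 on the third call and leaves 2 and 3 in the queue. In the real store, each subsumed entry would also be removed from the configmap and handed to the cleaner.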

2. High-availability data in detail

2.1 High-availability configuration

① Use s3 as the storage for the state backend

Set s3-protocol file paths as the state backend locations, i.e. s3://bucket01/flink/savepoints and s3://bucket01/flink/checkpoints, and point Flink at the s3-compatible cluster with s3.endpoint, s3.access-key, and s3.secret-key.

② Configure high availability on Kubernetes

Set high-availability to org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
kubernetes.namespace is the Kubernetes namespace, kubernetes.service-account is the Kubernetes service account, high-availability.storageDir uses an s3 address, and kubernetes.cluster-id sets the prefix of the high-availability configmaps, for example flink-dispatcher-leader, flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader, and so on.

$kubectl get cm |grep flink
flink-config                                              5      4d19h
flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader   4      24d
flink-dispatcher-leader                                    4      28d
flink-resourcemanager-leader                               2      28d
flink-restserver-leader                                    2      28d
$kubectl describe cm flink-config
Name:         flink-config
Namespace:    default
Labels:       <none>
Annotations:  <none>

Data
====
flink-conf.yaml:
----
omitted...
# Shared file system (S3)
s3.endpoint: http://service-minio:9000
s3.path.style.access: true
s3.access-key: admin
s3.secret-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# State backend configuration
state.backend: filesystem
state.checkpoints.dir: s3://bucket01/flink/checkpoints
state.savepoints.dir: s3://bucket01/flink/savepoints
# HA and k8s parameters
kubernetes.namespace: default
kubernetes.cluster-id: flink
kubernetes.service-account: serviceaccount-flink
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://bucket01/flink/ha
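
The relationship between kubernetes.cluster-id and the configmap names in the listing above can be sketched as follows. The naming pattern is inferred only from that listing and may differ in other Flink versions; HaConfigMapNames and its methods are hypothetical helpers, not part of Flink.

```java
import java.util.List;

final class HaConfigMapNames {
    private HaConfigMapNames() {}

    /** Cluster-wide leader configmaps, as seen in the kubectl listing in this article. */
    static List<String> clusterLevel(String clusterId) {
        return List.of(
                clusterId + "-dispatcher-leader",
                clusterId + "-resourcemanager-leader",
                clusterId + "-restserver-leader");
    }

    /** Per-job leader configmap; jobId is the 32-character hex job ID. */
    static String jobManagerLeader(String clusterId, String jobId) {
        return clusterId + "-" + jobId + "-jobmanager-leader";
    }
}
```

With the cluster-id flink and the job ID 161511ce1fe78368bc659597e472fb7d, these helpers reproduce the four high-availability configmap names listed above. flink-config is the ordinary configuration configmap and is not derived this way.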

2.2 Cluster dispatcher high-availability data

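The dispatcher is the master component that manages jobs. Its high-availability data mainly consists of the dispatcher leader's address, the status of jobs that have not finished, and the storage location of each job graph, where the job graph location is stored as a Base64-encoded, Java-serialized state handle. As shown below, the dispatcher leader is akka.tcp://flink@10.244.0.246:8123/user/rpc/dispatcher_1, job 161511ce1fe78368bc659597e472fb7d is in state RUNNING, and its job graph jobGraph-161511ce1fe78368bc659597e472fb7d is stored at s3://bucket01/flink/ha/default/submittedJobGraph307e2d6a5be8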

Note: use the operating system's Base64 tools. For example, encode with echo "mmsc" | openssl base64 -e and decode with echo "bW1zYwo=" | openssl base64 -d
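
The same round trip can be done with the JDK's java.util.Base64, which may be handy when the value is already inside a Java tool. This is a minimal sketch; Base64Demo is just an illustrative class name. Note that echo appends a newline, so the shell example above actually encodes "mmsc" followed by a line feed.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class Base64Demo {
    private Base64Demo() {}

    /** Encodes the UTF-8 bytes of the text as standard Base64. */
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes standard Base64 and interprets the bytes as UTF-8 text. */
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }
}
```

Base64Demo.encode("mmsc\n") returns bW1zYwo=, matching the openssl output, and Base64Demo.decode("bW1zYwo=") returns the original text including the trailing newline.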

$kubectl describe cm flink-dispatcher-leader
Name:         flink-dispatcher-leader
Namespace:    default
Labels:       app=flink
              configmap-type=high-availability
              type=flink-native-kubernetes
Annotations:  control-plane.alpha.kubernetes.io/leader:
                {"holderIdentity":"f7978fe5-962d-4037-aa23-19ff522afbff","leaseDuration":15.000000000,"acquireTime":"2022-05-19T15:09:39.272000Z","renewTi...

Data
====
runningJobsRegistry-161511ce1fe78368bc659597e472fb7d:
----
RUNNING
sessionId:
----
942d4a50-c31f-47fb-939b-94b14a1121fc
address:
----
akka.tcp://flink@10.244.0.246:8123/user/rpc/dispatcher_1
jobGraph-161511ce1fe78368bc659597e472fb7d:
----
rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAADcrNzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAN3MzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvc3VibWl0dGVkSm9iR3JhcGgzMDdlMmQ2YTViZTh4
Events:  <none>
$echo "rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAADcrNzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAN3MzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvc3VibWl0dGVkSm9iR3JhcGgzMDdlMmQ2YTViZTh4" | openssl base64 -d
??sr;org.apache.flink.runtime.state.RetrievableStreamStateHandle?U?+LwrappedStreamStateHandlet2Lorg/apache/flink/runtime/state/StreamStateHandle;xpsr9org.apache.flink.runtime.state.filesystem.FileStateHandle?u?b?J  stateSizefilePathtLorg/apache/flink/core/fs/Path;xpr?srorg.apache.flink.core.fs.PathLuritLjava/net/URI;xpsr
java.net.URI?x.C?I?LstringtLjava/lang/String;xpt7s3://bucket01/flink/ha/default/submittedJobGraph307e2d6a5be8
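
The decoded output above is a Java-serialized object, which is why the path is surrounded by unreadable bytes. In the Java serialization stream format, a string is written as the tag byte 0x74 (the t visible just before s3://), a two-byte big-endian length, and then the string bytes. The following is a minimal sketch, not Flink code, that uses this layout to pull the path out of the Base64 value without loading any Flink classes. The class StateHandlePathExtractor and its method are hypothetical, and the sketch assumes the path is plain ASCII.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class StateHandlePathExtractor {
    private static final byte TC_STRING = 0x74; // string tag in the Java serialization stream

    private StateHandlePathExtractor() {}

    /** Returns the first serialized string that starts with the given scheme, or null if none. */
    static String extractPath(String base64Value, String scheme) {
        byte[] data = Base64.getDecoder().decode(base64Value);
        byte[] needle = scheme.getBytes(StandardCharsets.US_ASCII);
        // Start at 3 so the tag byte and the two length bytes can be read before the match.
        for (int i = 3; i + needle.length <= data.length; i++) {
            if (!startsWith(data, i, needle) || data[i - 3] != TC_STRING) {
                continue;
            }
            // Two-byte big-endian length that precedes the string bytes.
            int length = ((data[i - 2] & 0xFF) << 8) | (data[i - 1] & 0xFF);
            if (i + length <= data.length) {
                return new String(data, i, length, StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    private static boolean startsWith(byte[] data, int offset, byte[] prefix) {
        for (int j = 0; j < prefix.length; j++) {
            if (data[offset + j] != prefix[j]) {
                return false;
            }
        }
        return true;
    }
}
```

Passing the jobGraph value from the configmap together with the scheme "s3://" should return the stored path. Deserializing the handle with Flink on the classpath is the more robust route when the real object is needed.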

2.3 Job jobmanager high-availability data

A job's high-availability data mainly consists of the address of the job's JobManager leader and the storage location of the job's latest checkpoint, where the checkpoint location is stored as a Base64-encoded, Java-serialized state handle. As shown below, the JobManager leader is akka.tcp://flink@10.244.0.246:8123/user/rpc/jobmanager_2, the job's latest checkpoint is checkpointID-0000000000000102688, and it is stored at s3://bucket01/flink/ha/default/completedCheckpointf07724c0946a

$kubectl describe cm flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader
Name:         flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader
Namespace:    default
Labels:       app=flink
              configmap-type=high-availability
              type=flink-native-kubernetes
Annotations:  control-plane.alpha.kubernetes.io/leader:
                {"holderIdentity":"f7978fe5-962d-4037-aa23-19ff522afbff","leaseDuration":15.000000000,"acquireTime":"2022-05-19T15:09:39.988000Z","renewTi...

Data
====
address:
----
akka.tcp://flink@10.244.0.246:8123/user/rpc/jobmanager_2
checkpointID-0000000000000102688:
----
rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAAAFyhzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAOXMzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvY29tcGxldGVkQ2hlY2twb2ludGYwNzcyNGMwOTQ2YXg=
counter:
----
102689
sessionId:
----
766ea025-af00-4b6b-8700-a80c9fa2a4e5
Events:  <none>
$echo "rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAAAFyhzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAOXMzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvY29tcGxldGVkQ2hlY2twb2ludGYwNzcyNGMwOTQ2YXg=" | openssl base64 -d
??sr;org.apache.flink.runtime.state.RetrievableStreamStateHandle?U?+LwrappedStreamStateHandlet2Lorg/apache/flink/runtime/state/StreamStateHandle;xpsr9org.apache.flink.runtime.state.filesystem.FileStateHandle?u?b?J  stateSizefilePathtLorg/apache/flink/core/fs/Path;xp(srorg.apache.flink.core.fs.PathLuritLjava/net/URI;xpsr
java.net.URI?x.C?I?LstringtLjava/lang/String;xpt9s3://bucket01/flink/ha/default/completedCheckpointf07724c0946a
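
Given the data section of the jobmanager-leader configmap as a key/value map, the latest checkpoint can be found by scanning the checkpointID- keys. The sketch below is an illustration under that assumption; LatestCheckpointFinder is a hypothetical helper, and fetching the configmap itself (for example with a Kubernetes client) is left out. In the output above the counter value 102689 is one higher than the latest checkpoint ID, which suggests it holds the next ID to be assigned.

```java
import java.util.Map;
import java.util.OptionalLong;

final class LatestCheckpointFinder {
    private static final String PREFIX = "checkpointID-";

    private LatestCheckpointFinder() {}

    /** Returns the highest checkpoint ID among the checkpointID- keys, if any exist. */
    static OptionalLong latestCheckpointId(Map<String, String> configMapData) {
        return configMapData.keySet().stream()
                .filter(key -> key.startsWith(PREFIX))
                // The remainder of the key is the zero-padded decimal checkpoint ID.
                .mapToLong(key -> Long.parseLong(key.substring(PREFIX.length())))
                .max();
    }
}
```

For the configmap shown above, the only checkpoint key is checkpointID-0000000000000102688, so the result is 102688. An empty result means no completed checkpoint has been recorded yet.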