1. Key mechanism behind high availability
Source walkthrough: DefaultCompletedCheckpointStore.addCheckpoint / tryRemoveCompletedCheckpoint
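Step 1: derive the checkpoint path (its name in the HA store) from the checkpoint ID.
Step 2: write the checkpoint data to the S3 path, then update the checkpoint entry in the ConfigMap, i.e. the key checkpointID-0000000000000102688 in flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader.
Step 3: append the checkpoint to the queue, then delete the surplus completed checkpoints according to the number of completed checkpoints to retain (cluster option state.checkpoints.num-retained).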
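Step 1 can be illustrated with a small standalone sketch. It assumes the Kubernetes key format is the prefix checkpointID- followed by the checkpoint ID zero-padded to 19 digits, which matches the key checkpointID-0000000000000102688 seen in the ConfigMap. The class CheckpointNames and its method are hypothetical and are not Flink's API.

```java
import java.util.Locale;

// Hypothetical helper mirroring the ConfigMap key format observed in this article:
// "checkpointID-" followed by the checkpoint ID zero-padded to 19 digits.
final class CheckpointNames {
    static final String PREFIX = "checkpointID-";

    private CheckpointNames() {}

    // e.g. 102688 -> "checkpointID-0000000000000102688"
    static String idToName(long checkpointId) {
        if (checkpointId < 0) {
            throw new IllegalArgumentException("checkpoint IDs are non-negative: " + checkpointId);
        }
        // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale
        return PREFIX + String.format(Locale.ROOT, "%019d", checkpointId);
    }
}
```

Because every key has the same fixed width, sorting the keys as strings gives the same order as sorting the IDs numerically.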
public class DefaultCompletedCheckpointStore<R extends ResourceVersion<R>>
        implements CompletedCheckpointStore {

    // In-memory queue of the retained completed checkpoints, oldest first
    private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;

    // Other fields and the constructor omitted...

    @Override
    public void addCheckpoint(
            final CompletedCheckpoint checkpoint,
            CheckpointsCleaner checkpointsCleaner,
            Runnable postCleanup)
            throws Exception {
        // ...omitted...
        // 1. Derive the checkpoint path (its name in the HA store) from the checkpoint ID
        final String path =
                completedCheckpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());
        // 2. Write the checkpoint data to S3, then record it in the ConfigMap under that name
        checkpointStateHandleStore.addAndLock(path, checkpoint);
        // 3. Append the checkpoint to the queue, then subsume (remove) older checkpoints
        //    beyond the number to retain (state.checkpoints.num-retained)
        completedCheckpoints.addLast(checkpoint);
        CheckpointSubsumeHelper.subsume(
                completedCheckpoints,
                maxNumberOfCheckpointsToRetain,
                completedCheckpoint ->
                        tryRemoveCompletedCheckpoint(
                                completedCheckpoint,
                                completedCheckpoint.shouldBeDiscardedOnSubsume(),
                                checkpointsCleaner,
                                postCleanup));
        // ...omitted...
    }

    private void tryRemoveCompletedCheckpoint(
            CompletedCheckpoint completedCheckpoint,
            boolean shouldDiscard,
            CheckpointsCleaner checkpointsCleaner,
            Runnable postCleanup)
            throws Exception {
        // Remove the checkpoint's entry from the HA store first; only if that succeeds
        // is the checkpoint handed to the cleaner, which runs on ioExecutor
        if (tryRemove(completedCheckpoint.getCheckpointID())) {
            checkpointsCleaner.cleanCheckpoint(
                    completedCheckpoint, shouldDiscard, postCleanup, ioExecutor);
        }
    }
}
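The retention part of addCheckpoint can be modelled with a plain ArrayDeque. This is a simplified sketch with a hypothetical class name (RetentionModel). It keeps only the newest maxRetained checkpoint IDs and reports the evicted ones. Flink's actual CheckpointSubsumeHelper has additional rules (for example around savepoints) that are not modelled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Simplified, hypothetical model of "append the new checkpoint, then evict the
// oldest ones beyond the retention limit". It is NOT Flink's CheckpointSubsumeHelper:
// savepoint handling and failure paths are deliberately left out.
final class RetentionModel {
    private final ArrayDeque<Long> retained = new ArrayDeque<>(); // oldest first
    private final int maxRetained; // stands in for state.checkpoints.num-retained

    RetentionModel(int maxRetained) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("maxRetained must be at least 1");
        }
        this.maxRetained = maxRetained;
    }

    // Appends a completed checkpoint ID; every evicted ID is passed to onEvict,
    // the point at which the code above would run tryRemoveCompletedCheckpoint.
    void add(long checkpointId, LongConsumer onEvict) {
        retained.addLast(checkpointId);
        while (retained.size() > maxRetained) {
            onEvict.accept(retained.pollFirst());
        }
    }

    // Snapshot of the retained IDs, oldest first
    List<Long> retainedIds() {
        return new ArrayList<>(retained);
    }
}
```

With a retention of 3, adding checkpoints 1 to 5 evicts 1 and 2 and leaves 3, 4 and 5.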
2. High-availability data in detail
2.1 High-availability configuration
① Use S3 as the state backend
Use S3 file paths for state storage, i.e. s3://bucket01/flink/savepoints and s3://bucket01/flink/checkpoints, and point Flink at the S3-compatible cluster via s3.endpoint, s3.access-key and s3.secret-key.
② Configure high availability on Kubernetes
Set high-availability to org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory. kubernetes.namespace is the Kubernetes namespace and kubernetes.service-account is the Kubernetes service account. high-availability.storageDir uses an S3 address. Finally, kubernetes.cluster-id sets the prefix of the HA ConfigMaps, e.g. flink-dispatcher-leader and flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader.
$kubectl get cm |grep flink
flink-config 5 4d19h
flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader 4 24d
flink-dispatcher-leader 4 28d
flink-resourcemanager-leader 2 28d
flink-restserver-leader 2 28d
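The HA ConfigMap names in the listing above follow from kubernetes.cluster-id (here flink). The hypothetical helper below only reproduces the pattern observed in this cluster. Cluster-wide components use <cluster-id>-<component>-leader, and each job uses <cluster-id>-<job ID>-jobmanager-leader. It is not a Flink API, and it does not cover the flink-config ConfigMap that holds flink-conf.yaml.

```java
// Hypothetical helper reproducing the HA ConfigMap names observed in this cluster.
final class HaConfigMapNames {
    private HaConfigMapNames() {}

    // Cluster-wide leader ConfigMap, e.g. ("flink", "dispatcher") -> "flink-dispatcher-leader"
    static String componentLeader(String clusterId, String component) {
        return clusterId + "-" + component + "-leader";
    }

    // Per-job leader ConfigMap, e.g. "flink-<jobId>-jobmanager-leader"
    static String jobManagerLeader(String clusterId, String jobId) {
        return clusterId + "-" + jobId + "-jobmanager-leader";
    }
}
```

Knowing the pattern makes it easy to find a job's HA data: pass the result of jobManagerLeader to kubectl describe cm.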
$kubectl describe cm flink-config
Name: flink-config
Namespace: default
Labels: <none>
Annotations: <none>
Data
====
flink-conf.yaml:
----
...(omitted)
# Shared file system: S3
s3.endpoint: http://service-minio:9000
s3.path.style.access: true
s3.access-key: admin
s3.secret-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
# State backend configuration
state.backend: filesystem
state.checkpoints.dir: s3://bucket01/flink/checkpoints
state.savepoints.dir: s3://bucket01/flink/savepoints
# HA and Kubernetes parameters
kubernetes.namespace: default
kubernetes.cluster-id: flink
kubernetes.service-account: serviceaccount-flink
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://bucket01/flink/ha
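Before starting the cluster, it may help to sanity-check that the HA keys above are present. The sketch below is hypothetical (HaConfigCheck is not part of Flink). It assumes the flat key: value lines shown in flink-conf.yaml and is not a full YAML parser. The set of keys it checks, and the requirement that paths use s3://, reflect this article's setup rather than Flink's own validation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check for the flat "key: value" lines of flink-conf.yaml.
// Not a YAML parser: nesting, quoting and multi-line values are not handled.
final class HaConfigCheck {
    private HaConfigCheck() {}

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            // Split at the first ": " so values such as http://host:9000 stay intact
            int sep = line.indexOf(": ");
            if (sep > 0) {
                conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
            }
        }
        return conf;
    }

    // Returns human-readable problems; an empty list means the checks passed.
    static List<String> problems(Map<String, String> conf) {
        List<String> problems = new ArrayList<>();
        for (String key : new String[] {
                "high-availability", "high-availability.storageDir", "kubernetes.cluster-id"}) {
            if (!conf.containsKey(key)) {
                problems.add("missing " + key);
            }
        }
        for (String key : new String[] {"high-availability.storageDir", "state.checkpoints.dir"}) {
            String value = conf.get(key);
            if (value != null && !value.startsWith("s3://")) {
                problems.add(key + " is not an s3:// path");
            }
        }
        return problems;
    }
}
```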
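2.2 Cluster-level dispatcher HA data
The dispatcher is the master component that manages jobs. Its HA data mainly consists of three things: the address of the leading dispatcher, the status of jobs that have not yet finished, and the storage location of each job graph. The job-graph location is stored as a Base64-encoded, Java-serialized state handle. As shown below, the leading dispatcher is akka.tcp://flink@10.244.0.246:8123/user/rpc/dispatcher_1 and job 161511ce1fe78368bc659597e472fb7d is in state RUNNING. Its job graph, under the key jobGraph-161511ce1fe78368bc659597e472fb7d, is stored at s3://bucket01/flink/ha/default/submittedJobGraph307e2d6a5be8.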
Note: the values can be encoded and decoded with the OS's Base64 tools, for example:
encode: echo "mmsc" | openssl base64 -e
decode: echo "bW1zYwo=" | openssl base64 -d
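The same round trip can be done in Java with java.util.Base64. Note that echo appends a newline, so "bW1zYwo=" is the encoding of "mmsc\n", not of "mmsc". The class name Base64Demo is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Java counterpart of `echo "mmsc" | openssl base64 -e` and `... | openssl base64 -d`.
final class Base64Demo {
    private Base64Demo() {}

    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    // The MIME decoder ignores line breaks, so line-wrapped openssl output also decodes.
    // Intended for text payloads; binary payloads should be kept as byte[].
    static String decode(String base64) {
        return new String(Base64.getMimeDecoder().decode(base64), StandardCharsets.UTF_8);
    }
}
```

For example, encode("mmsc\n") yields "bW1zYwo=", while encode("mmsc") yields "bW1zYw==".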
$kubectl describe cm flink-dispatcher-leader
Name: flink-dispatcher-leader
Namespace: default
Labels: app=flink
configmap-type=high-availability
type=flink-native-kubernetes
Annotations: control-plane.alpha.kubernetes.io/leader:
{"holderIdentity":"f7978fe5-962d-4037-aa23-19ff522afbff","leaseDuration":15.000000000,"acquireTime":"2022-05-19T15:09:39.272000Z","renewTi...
Data
====
runningJobsRegistry-161511ce1fe78368bc659597e472fb7d:
----
RUNNING
sessionId:
----
942d4a50-c31f-47fb-939b-94b14a1121fc
address:
----
akka.tcp://flink@10.244.0.246:8123/user/rpc/dispatcher_1
jobGraph-161511ce1fe78368bc659597e472fb7d:
----
rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAADcrNzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAN3MzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvc3VibWl0dGVkSm9iR3JhcGgzMDdlMmQ2YTViZTh4
Events: <none>
$echo "rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAADcrNzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAN3MzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvc3VibWl0dGVkSm9iR3JhcGgzMDdlMmQ2YTViZTh4" | openssl base64 -d
??sr;org.apache.flink.runtime.state.RetrievableStreamStateHandle?U?+LwrappedStreamStateHandlet2Lorg/apache/flink/runtime/state/StreamStateHandle;xpsr9org.apache.flink.runtime.state.filesystem.FileStateHandle?u?b?J stateSizefilePathtLorg/apache/flink/core/fs/Path;xpr?srorg.apache.flink.core.fs.PathLuritLjava/net/URI;xpsr
java.net.URI?x.C?I?LstringtLjava/lang/String;xpt7s3://bucket01/flink/ha/default/submittedJobGraph307e2d6a5be8
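The decoded bytes are a Java-serialized RetrievableStreamStateHandle, which is why openssl prints binary noise around the path. Deserializing it properly would need Flink's classes on the classpath. As a stdlib-only alternative, the hypothetical sketch below relies on what the output above suggests: the URI is written as a serialized string. That means the byte 't' (0x74, TC_STRING), then a 2-byte big-endian length, then the characters, as in "...xpt7s3://...". HaValuePeek is an illustrative name, not a Flink API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

// Hypothetical helper: find the first serialized string starting with a given scheme
// (e.g. "s3://") inside a Base64-encoded, Java-serialized HA ConfigMap value.
// Assumes the short TC_STRING form: 0x74, 2-byte big-endian length, then the bytes.
final class HaValuePeek {
    private static final int TC_STRING = 0x74;

    private HaValuePeek() {}

    static Optional<String> findUri(String base64Value, String scheme) {
        byte[] bytes = Base64.getMimeDecoder().decode(base64Value);
        byte[] prefix = scheme.getBytes(StandardCharsets.US_ASCII);
        for (int i = 0; i + 3 <= bytes.length; i++) {
            if ((bytes[i] & 0xFF) != TC_STRING) {
                continue;
            }
            int len = ((bytes[i + 1] & 0xFF) << 8) | (bytes[i + 2] & 0xFF);
            int start = i + 3;
            // Skip candidates whose length cannot hold the scheme or runs past the end
            if (len < prefix.length || start + len > bytes.length) {
                continue;
            }
            if (startsWith(bytes, start, prefix)) {
                // URIs are ASCII, where modified UTF-8 and UTF-8 coincide
                return Optional.of(new String(bytes, start, len, StandardCharsets.UTF_8));
            }
        }
        return Optional.empty();
    }

    private static boolean startsWith(byte[] bytes, int offset, byte[] prefix) {
        for (int j = 0; j < prefix.length; j++) {
            if (bytes[offset + j] != prefix[j]) {
                return false;
            }
        }
        return true;
    }
}
```

This is a heuristic for inspection only. It finds the first matching string and does not validate the rest of the serialization stream.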
2.3 Per-job JobManager HA data
A job's HA data mainly consists of the address of its JobManager and the storage location of the job's latest checkpoint. The checkpoint location is likewise stored as a Base64-encoded, Java-serialized state handle. As shown below, the JobManager is akka.tcp://flink@10.244.0.246:8123/user/rpc/jobmanager_2 and the job's latest checkpoint is checkpointID-0000000000000102688. That checkpoint is stored at s3://bucket01/flink/ha/default/completedCheckpointf07724c0946a.
$kubectl describe cm flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader
Name: flink-161511ce1fe78368bc659597e472fb7d-jobmanager-leader
Namespace: default
Labels: app=flink
configmap-type=high-availability
type=flink-native-kubernetes
Annotations: control-plane.alpha.kubernetes.io/leader:
{"holderIdentity":"f7978fe5-962d-4037-aa23-19ff522afbff","leaseDuration":15.000000000,"acquireTime":"2022-05-19T15:09:39.988000Z","renewTi...
Data
====
address:
----
akka.tcp://flink@10.244.0.246:8123/user/rpc/jobmanager_2
checkpointID-0000000000000102688:
----
rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAAAFyhzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAOXMzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvY29tcGxldGVkQ2hlY2twb2ludGYwNzcyNGMwOTQ2YXg=
counter:
----
102689
sessionId:
----
766ea025-af00-4b6b-8700-a80c9fa2a4e5
Events: <none>
$echo "rO0ABXNyADtvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuUmV0cmlldmFibGVTdHJlYW1TdGF0ZUhhbmRsZQABHhjxVZcrAgABTAAYd3JhcHBlZFN0cmVhbVN0YXRlSGFuZGxldAAyTG9yZy9hcGFjaGUvZmxpbmsvcnVudGltZS9zdGF0ZS9TdHJlYW1TdGF0ZUhhbmRsZTt4cHNyADlvcmcuYXBhY2hlLmZsaW5rLnJ1bnRpbWUuc3RhdGUuZmlsZXN5c3RlbS5GaWxlU3RhdGVIYW5kbGUE3HXYYr0bswIAAkoACXN0YXRlU2l6ZUwACGZpbGVQYXRodAAfTG9yZy9hcGFjaGUvZmxpbmsvY29yZS9mcy9QYXRoO3hwAAAAAAAAFyhzcgAdb3JnLmFwYWNoZS5mbGluay5jb3JlLmZzLlBhdGgAAAAAAAAAAQIAAUwAA3VyaXQADkxqYXZhL25ldC9VUkk7eHBzcgAMamF2YS5uZXQuVVJJrAF4LkOeSasDAAFMAAZzdHJpbmd0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQAOXMzOi8vcnRhL2ZsaW5rL2hhL2RlZmF1bHQvY29tcGxldGVkQ2hlY2twb2ludGYwNzcyNGMwOTQ2YXg=" | openssl base64 -d
??sr;org.apache.flink.runtime.state.RetrievableStreamStateHandle?U?+LwrappedStreamStateHandlet2Lorg/apache/flink/runtime/state/StreamStateHandle;xpsr9org.apache.flink.runtime.state.filesystem.FileStateHandle?u?b?J stateSizefilePathtLorg/apache/flink/core/fs/Path;xp(srorg.apache.flink.core.fs.PathLuritLjava/net/URI;xpsr
java.net.URI?x.C?I?LstringtLjava/lang/String;xpt9s3://bucket01/flink/ha/default/completedCheckpointf07724c0946a
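Given the data keys of the jobmanager-leader ConfigMap, the latest retained checkpoint can be picked by parsing the IDs out of the checkpointID- keys. This hypothetical sketch (LatestCheckpoint is not a Flink API) assumes the 19-digit key format shown above. In this snapshot, the counter value (102689) is one greater than the latest checkpoint ID (102688). That is consistent with counter holding the next checkpoint ID to assign, but this reading is an assumption rather than something the output states.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper: find the highest checkpoint ID among a ConfigMap's data keys,
// assuming keys of the form "checkpointID-" + 19-digit zero-padded ID.
final class LatestCheckpoint {
    private static final String PREFIX = "checkpointID-";

    private LatestCheckpoint() {}

    static OptionalLong latestId(Map<String, String> configMapData) {
        return configMapData.keySet().stream()
                .filter(key -> key.startsWith(PREFIX)) // ignore address, counter, sessionId
                .mapToLong(key -> Long.parseLong(key.substring(PREFIX.length())))
                .max();
    }
}
```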