概述
Flink內(nèi)部有一些服務(wù)是JobMaster和TaskExecutor共用的欺嗤。如HighAvailabilityServices, RpcService, ActorSystem(MetricQueryService), HeartbeatServices, MetricRegistryImpl, BlobCacheService.
HighAvailabilityServices 的作用
- 提供 Leader 獲取服務(wù)(ResourceManager, Dispatcher, JobManager, WebMonitor)
- 提供 Leader 選舉服務(wù)(同上)
- 提供Checkpoint恢復(fù)工廠類(獲取已完成的Checkpoint的元信息次泽,以及Checkpoint計(jì)數(shù)器)
- 提供SubmittedJobGraphStore亿虽,用來保存和恢復(fù)JobGraph
- 提供大文件(Blob)的高可用存儲(chǔ)
- 提供(RunningJobsRegistry),任務(wù)狀態(tài)信息的保存與獲取
HighAvailabilityServices 的使用者
- ClusterEntrypoint
- TaskManagerRunner
- ClusterClient
HighAvailabilityServices 的創(chuàng)建
-
HighAvailabilityServices 的創(chuàng)建是通過HighAvailabilityServicesUtils這個(gè)工具類,這個(gè)工具類提供了兩個(gè)重要的靜態(tài)方法來生成HighAvailabilityServices 。
第一個(gè)是 createAvailableOrEmbeddedServices(Configuration config, Executor executor)劲够,主要用于創(chuàng)建MiniCluster,服務(wù)于測(cè)試和本地運(yùn)行休傍。
-
第二個(gè)是 createHighAvailabilityServices(Configuration configuration, Executor executor, AddressResolution addressResolution)征绎,相比于第一個(gè)方法,它的參數(shù)還需要AddressResolution 磨取。
/** * Enum specifying whether address resolution should be tried or not when creating the * {@link HighAvailabilityServices}. */ public enum AddressResolution { TRY_ADDRESS_RESOLUTION, NO_ADDRESS_RESOLUTION }
- TRY_ADDRESS_RESOLUTION和NO_ADDRESS_RESOLUTION人柿,分別代表是否需要解析地址。用于在非HA環(huán)境下的直接解析地址忙厌,如果hostname不存在則快速失敗凫岖。在ClusterEntrypoint中由于是本地,不需要解析逢净,而在TaskManagerRunner與ClusterClient中使用了TRY_ADDRESS_RESOLUTION哥放,因?yàn)橐粋€(gè)是負(fù)責(zé)執(zhí)行具體任務(wù)歼指,另一個(gè)則是用戶的客戶端。
- 這個(gè)方法首先獲取高可用模式(HighAvailabilityMode)甥雕,分別是無高可用踩身,基于Zookeeper的高可用,以及自己定制的高可用模式犀农。
public enum HighAvailabilityMode { NONE(false), ZOOKEEPER(true), FACTORY_CLASS(true); }
- 其中None模式JobManager地址是固定的惰赋,所以直接從Configuration中獲取地址并生成一個(gè)StandaloneHaServices。
- Zookeeper模式會(huì)先創(chuàng)建BlobStorService呵哨,就是一個(gè)高可用的大文件持久化服務(wù),這個(gè)服務(wù)將文件保存在high-availability.storageDir配置的位置轨奄,并在Zookeeper上保存元信息孟害。
case ZOOKEEPER: BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration); return new ZooKeeperHaServices( ZooKeeperUtils.startCuratorFramework(configuration), executor, configuration, blobStoreService);
- Custom模式需要用戶自己實(shí)現(xiàn)HighAvailabilityServicesFactory
LeaderElectionService & LeaderRetrievalService
- LeaderElectionService和LeaderRetrievalService分別提供了某個(gè)組件參加Leader選舉和獲取其他組件Leader的功能。(組件包括ResourceManager, Dispatcher, JobManager, WebMonitor)挪拟。
-
LeaderElectionService
接口如下所示, start方法就是將當(dāng)前的組件加入Leader選舉挨务,上述四個(gè)組件都是現(xiàn)了LeaderContender接口。
-
當(dāng)某個(gè)組件被選舉為leader時(shí)玉组,會(huì)回調(diào)該組件實(shí)現(xiàn)的grantLeadership方法(第一次被選舉為leader)谎柄,當(dāng)某個(gè)組件不再是leader時(shí),會(huì)回調(diào)該組件實(shí)現(xiàn)的revokeLeadership方法惯雳。
public interface LeaderElectionService { void start(LeaderContender contender) throws Exception; void stop() throws Exception; void confirmLeaderSessionID(UUID leaderSessionID); boolean hasLeadership(@Nonnull UUID leaderSessionId); } public interface LeaderContender { void grantLeadership(UUID leaderSessionID); void revokeLeadership(); String getAddress(); void handleError(Exception exception); }
-
LeaderRetrievalService
- LeaderRetrievalService 非常簡(jiǎn)潔朝巫,提供了start和stop方法,并且start方法只能被調(diào)用一次石景,在ZK模式中因?yàn)樗粫?huì)監(jiān)聽一條ZK上的路徑(即一個(gè)組件的變化)劈猿。
- 在啟動(dòng)LeaderRetrievalService的方法中需要接收參數(shù)LeaderRetrievalListener,將實(shí)現(xiàn)這個(gè)接口的類的實(shí)例作為參數(shù)傳入這個(gè)方法潮孽,在相應(yīng)組件leader發(fā)生變化時(shí)會(huì)回調(diào)notifyLeaderAddress方法揪荣,在LeaderRetrievalService拋出異常的時(shí)候會(huì)調(diào)用handleError方法。
public interface LeaderRetrievalService { void start(LeaderRetrievalListener listener) throws Exception; void stop() throws Exception; } public interface LeaderRetrievalListener { void notifyLeaderAddress(@Nullable String leaderAddress, @Nullable UUID leaderSessionID); void handleError(Exception exception); }
HighAvailabilityServices 的典型實(shí)現(xiàn) ZooKeeperHaServices
ZooKeeperHaServices的Constructor需要接受四個(gè)參數(shù)往史,分別為CuratorFramework, Executor, Configuration, BlobStoreService
在HighAvailabilityServices創(chuàng)建中仗颈,已經(jīng)介紹了BlobStoreService的作用,此處要再介紹一下是創(chuàng)建CuratorFramework的方法ZooKeeperUtils.startCuratorFramework(configuration)
-
CuratorFramework創(chuàng)建
- 下圖是如何通過Builder創(chuàng)建CuratorFramework椎例,詳情可以閱讀Zookeeper客戶端Curator使用詳解
一文挨决,這里會(huì)介紹這些參數(shù)是如何配置的CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(zkQuorum) .sessionTimeoutMs(sessionTimeout) .connectionTimeoutMs(connectionTimeout) .retryPolicy(new ExponentialBackoffRetry(retryWait, maxRetryAttempts)) // Curator prepends a '/' manually and throws an Exception if the // namespace starts with a '/'. .namespace(rootWithNamespace.startsWith("/") ? rootWithNamespace.substring(1) : rootWithNamespace) .aclProvider(aclProvider) .build();
- zkQuorum對(duì)應(yīng)配置中的high-availability.zookeeper.quorum,即Zookeeper的地址
- sessionTimeout對(duì)應(yīng)配置中的high-availability.zookeeper.client.session-timeout粟矿,單位為毫秒凰棉,默認(rèn)60000即一分鐘,ZK會(huì)話的超時(shí)時(shí)間
- connectionTimeout對(duì)應(yīng)配置中的high-availability.zookeeper.client.connection-timeout陌粹,單位為毫秒撒犀,默認(rèn)15000即15秒,ZK的連接超時(shí)時(shí)間
- 重試策略為ExponentialBackoffRetry,從概率上來講隨著重試次數(shù)越來越多或舞,重試間隔呈指數(shù)級(jí)增長(zhǎng)
- retryWait對(duì)應(yīng)配置中的high-availability.zookeeper.client.retry-wait荆姆,即基礎(chǔ)的間隔時(shí)間
- maxRetryAttempts對(duì)應(yīng)配置中的high-availability.zookeeper.client.max-retry-attempts,即最大重試次數(shù)
- rootWithNamespace由root和namespace(clusterId)拼成映凳,root對(duì)應(yīng)配置中的high-availability.zookeeper.path.root胆筒,默認(rèn)為/flink, namespace對(duì)應(yīng)配置中的high-availability.cluster-id, 在Yarn模式下也就是applicationId
- aclProvider默認(rèn)使用DefaultACLProvider,相關(guān)的配置有zookeeper.sasl.disable(默人false)和high-availability.zookeeper.client.acl(默認(rèn)open)
- 下圖是如何通過Builder創(chuàng)建CuratorFramework椎例,詳情可以閱讀Zookeeper客戶端Curator使用詳解
Executor是用來執(zhí)行ZooKeeperCompletedCheckpointStore移除CompletedCheckpoints的任務(wù)的诈豌。
-
在介紹LeaderElectionService和LeaderRetrievalService的ZK實(shí)現(xiàn)之前仆救,先看一個(gè)flink cluster在zookeeper中的目錄結(jié)構(gòu),如下圖
ZK目錄結(jié)構(gòu) -
ZookeeperLeaderElectionService
-
接口
- ZooKeeperLeaderElectionService除了實(shí)現(xiàn)LeaderElectionService以外矫渔,還實(shí)現(xiàn)了LeaderLatchListener彤蔽,NodeCacheListener,UnhandledErrorListener三個(gè)屬于curator的接口庙洼。
- LeaderLatchListener需要實(shí)現(xiàn)類實(shí)現(xiàn)兩個(gè)回調(diào)方法顿痪,如下
public interface LeaderLatchListener { void isLeader(); void notLeader(); }
- 當(dāng)被監(jiān)聽的對(duì)象(此處即為該ZookeeperLeaderElectionService實(shí)例)被選為leader時(shí),isLeader實(shí)現(xiàn)的邏輯會(huì)被調(diào)用
- 當(dāng)失去leader位置時(shí)油够,notLeader會(huì)被調(diào)用
- NodeCacheListener只有一個(gè)方法蚁袭,如下
public interface NodeCacheListener { void nodeChanged() throws Exception; }
- 當(dāng)監(jiān)測(cè)的節(jié)點(diǎn)狀態(tài)發(fā)生變化時(shí),nodeChanged會(huì)被調(diào)用石咬,在此處是保存了LeaderContender地址和LeaderSessionID的節(jié)點(diǎn)
- UnhandledErrorListener接口需要實(shí)現(xiàn)一個(gè)方法揩悄,如下
public interface UnhandledErrorListener { void unhandledError(String var1, Throwable var2); }
- 當(dāng)后臺(tái)操作發(fā)生異常時(shí)觸發(fā)unhandledError方法,在flink各個(gè)組件的實(shí)現(xiàn)中也把這當(dāng)做fatal error來處理
-
創(chuàng)建
- ZookeeperLeaderElectionService的創(chuàng)建通過工具類ZookeeperUtils的createLeaderElectionService方法碌补,如下虏束。
public static ZooKeeperLeaderElectionService createLeaderElectionService( final CuratorFramework client, final Configuration configuration, final String pathSuffix) { final String latchPath = configuration.getString( HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix; final String leaderPath = configuration.getString( HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix; return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath); }
- 該方法接受的參數(shù)其中client(CuratorFramework)的創(chuàng)建上一個(gè)小節(jié)介紹了。還有pathSuffix則對(duì)應(yīng)的是各個(gè)組件厦章,分別如下, 與leader和leaderlatch目錄下一一對(duì)應(yīng)
private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock"; private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock"; private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock"; private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";
- 方法體中的HA_ZOOKEEPER_LATCH_PATH對(duì)應(yīng)flink配置中的high-availability.zookeeper.path.latch镇匀,默認(rèn)值為/leaderlatch,HA_ZOOKEEPER_LEADER_PATH對(duì)應(yīng)flink配置中的high-availability.zookeeper.path.leader袜啃,默認(rèn)為/leader汗侵。此處latchpath與leaderpath就與上圖中flink集群在zk下的目錄一一對(duì)應(yīng)了起來。
- 在ZookeeperLeaderElectionService的構(gòu)造方法如下
public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath, String leaderPath) { this.client = Preconditions.checkNotNull(client, "CuratorFramework client"); this.leaderPath = Preconditions.checkNotNull(leaderPath, "leaderPath"); leaderLatch = new LeaderLatch(client, latchPath); cache = new NodeCache(client, leaderPath); issuedLeaderSessionID = null; confirmedLeaderSessionID = null; leaderContender = null; running = false; }
- 其中LeaderLatch是Curator的針對(duì)某一條zk路徑的leader選舉實(shí)現(xiàn)群发,NodeCache是Curator監(jiān)控某一條zk路徑的變化的實(shí)現(xiàn)晰韵,在此處只是分別根據(jù)latchpath和leaderpath初始化了對(duì)象,還沒有啟動(dòng)監(jiān)聽熟妓。
- 有兩個(gè)重要的類型為UUID的成員變量被初始化為null雪猪,分別是issuedLeaderSessionID和confirmedLeaderSessionID。這兩個(gè)變量在leader選舉過程中起到非常重要的作用起愈。
-
啟動(dòng)
- 在啟動(dòng)LeaderElectionService時(shí)只恨,會(huì)將實(shí)現(xiàn)LeaderContender(參與選舉)的實(shí)例傳入译仗,基于zk的方法實(shí)現(xiàn)如下
public void start(LeaderContender contender) throws Exception { Preconditions.checkNotNull(contender, "Contender must not be null."); Preconditions.checkState(leaderContender == null, "Contender was already set."); LOG.info("Starting ZooKeeperLeaderElectionService {}.", this); synchronized (lock) { client.getUnhandledErrorListenable().addListener(this); leaderContender = contender; leaderLatch.addListener(this); leaderLatch.start(); cache.getListenable().addListener(this); cache.start(); client.getConnectionStateListenable().addListener(listener); running = true; } }
- 在啟動(dòng)方法中,將當(dāng)前LeaderElection的對(duì)象作為L(zhǎng)istener加入LeaderLatch官觅,NodeCache和CuratorFramework的UnhandleError中纵菌,并啟動(dòng)前兩個(gè)服務(wù),并將Running置為true休涤。
-
過程
- 過程主要包含了被選舉為leader咱圆,不再是leader和Cache節(jié)點(diǎn)改變
- 被選舉為leader,如接口小節(jié)所述功氨,isLeader方法會(huì)被調(diào)用序苏,此時(shí)會(huì)生成一個(gè)UUID作為issuedLeaderSessionID,并作為調(diào)用LeaderContender(參與選舉的組件)的grantLeadership方法的參數(shù)捷凄。而LeaderContender則會(huì)通過confirmedLeaderSessionID來進(jìn)行確認(rèn)杠览,只有與issuedLeaderSessionID相同,confirmedLeaderSessionID才會(huì)更新纵势,并將leader信息寫入對(duì)應(yīng)的leaderPath的節(jié)點(diǎn)中。
- 不再是leader管钳,如接口小節(jié)所述钦铁,notLeader方法會(huì)被調(diào)用,此時(shí)會(huì)將issuedLeaderSessionID和confirmLeaderSessionID置為null才漆,并調(diào)用LeaderContender的revokeLeadership方法通知該組件已經(jīng)失去leader位置牛曹。
- Cache節(jié)點(diǎn)改變時(shí),nodeChanged方法會(huì)被調(diào)用醇滥,首先判斷是否為leader黎比,如果是的話則判斷confirmedLeaderSessionID是否為空,如果不為空則將其連同LeaderContender的地址寫入leaderpath下的zk臨時(shí)節(jié)點(diǎn)鸳玩。
- 過程主要包含了被選舉為leader咱圆,不再是leader和Cache節(jié)點(diǎn)改變
-
停止
- 在停止方法中LeaderContender將退出選舉阅虫。具體實(shí)現(xiàn)是將啟動(dòng)方法中添加的listener移除并關(guān)閉LeaderLatch和NodeCache,并將成員變量的引用置null不跟。
-
-
創(chuàng)建ZookeeperLeaderRetrievalService
- 接口颓帝,ZooKeeperLeaderRetrievalService實(shí)現(xiàn)了LeaderRetrievalService,NodeCacheListener和UnhandledErrorListener接口窝革,這三個(gè)接口在上文都已經(jīng)介紹過购城。
- 創(chuàng)建
- 因?yàn)長(zhǎng)eaderRetrievalService功能相對(duì)比較簡(jiǎn)單,只需要在leader切換時(shí)獲取相關(guān)組件的Leader的地址和leaderSessionID虐译,所以只創(chuàng)建了NodeCache來監(jiān)測(cè)retrievalPath的變化(此處retrievalPath與參與選舉的組件的leaderPath)相同瘪板,并緩存了lastLeaderAddress和lastLeaderSessionID,防止在leader并沒有改變的情況下觸發(fā)listener的notifyLeaderAddress漆诽。
- 啟動(dòng)
- 啟動(dòng)方法將Listener加入U(xiǎn)nhandledError和NodeCache的監(jiān)聽并啟動(dòng)NodeCache侮攀,在CuratorFramework出錯(cuò)或者監(jiān)測(cè)的retrievalPath節(jié)點(diǎn)發(fā)生變化或能收到回調(diào)锣枝。
- 過程
- 當(dāng)監(jiān)測(cè)的retrievalPath發(fā)生變化時(shí),nodeChanged會(huì)被調(diào)用魏身,在該方法體中惊橱,會(huì)從這個(gè)NodeCache(zk節(jié)點(diǎn))中獲取數(shù)據(jù),與lastLeaderAddress和lastLeaderSessionID進(jìn)行比對(duì)箭昵,如果發(fā)生變化會(huì)更新這兩個(gè)變量并調(diào)用Listner的notifyLeaderAddress税朴,通知新的leader地址與leaderSessionID.
- 停止
- 在停止方法中中止監(jiān)聽,具體實(shí)現(xiàn)是將listener移除家制,并關(guān)閉NodeCache正林。
-
CheckpointRecoveryFactory
- CheckpointRecoveryJob 一是提供了根據(jù)JobID和maxNumberOfCheckpointsToRetain(也就是保存的歷史checpkpoint文件的個(gè)數(shù))來生成CompletedCheckpointStore的方法,二是提供了根據(jù)JobID生成CheckpointIDCounter的方法颤殴。在本文中不會(huì)多做介紹觅廓,后續(xù)如果寫到失敗恢復(fù)的文章的話會(huì)詳細(xì)介紹。CompletedCheckpointStore本質(zhì)上主要是提供獲取高可用存儲(chǔ)下備份的JobGraph進(jìn)行任務(wù)恢復(fù)的方法涵但。
- ZookeeperCheckpointRecoveryJob的提供CompletedCheckpointStore的實(shí)現(xiàn)中具體存儲(chǔ)方式是將在高可用文件系統(tǒng)(如HDFS)上保存的Checkpoint文件的地址存儲(chǔ)在/flink/cluster_id/checkpoints/路徑下杈绸。其中ZK的路徑由配置中的high-availability.zookeeper.path.checkpoints參數(shù)來制定,文件系統(tǒng)上存儲(chǔ)的路徑由配置中的high-availability.storageDir指定矮瘟。
- ZooKeeperCheckpointRecoveryFactory中提供CheckpointIDCounter是通過Curator的SharedCount來實(shí)現(xiàn)的瞳脓,是一個(gè)高可用的計(jì)數(shù)器,路徑由配置中high-availability.zookeeper.path.checkpoint-counter來指定澈侠,默認(rèn)是/checkpoint-counter
-
SubmittedJobGraphStore
- SubmittedJobGraphStore提供了將JobGraph高可用文件系統(tǒng)上的保存和移除功能劫侧,以及根據(jù) JobID獲取所要恢復(fù)的任務(wù)的JobGraph功能。但是在zk的目錄和hdfs上的目錄下我都沒有找到相應(yīng)的文件哨啃,這邊先略過烧栋,有機(jī)會(huì)補(bǔ)上。
-
RunningJobsRegistry
- ZK實(shí)現(xiàn)RunningJobsRegistry負(fù)責(zé)在ZK節(jié)點(diǎn)上登記所有集群中運(yùn)行的Job的狀態(tài)拳球,三種狀態(tài)分別為RUNNING审姓,PENDING和FINISHED。ZK上的路徑可以通過high-availability.zookeeper.path.running-registry來指定醇坝。
總結(jié)
- 本文簡(jiǎn)單的介紹了一下Flink高可用服務(wù)的功能邑跪,和基于ZK的典型實(shí)現(xiàn)。其中SubmittedJobGraphStore部分的實(shí)際運(yùn)行不符合我的預(yù)期呼猪,后續(xù)有機(jī)會(huì)更正画畅。