ZooKeeper HA configuration
## Use ZooKeeper for HA
high-availability: zookeeper
## ZooKeeper quorum addresses
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
## Root path for Flink data in ZooKeeper
high-availability.zookeeper.path.root: /flink
## HA path for this cluster instance
high-availability.cluster-id: /default
## File system directory for persisting HA metadata
high-availability.storageDir: hdfs:///flink/recovery
## -- Settings for jobs running on YARN --
## Number of ApplicationMaster attempts. Defaults to 1, i.e. the job is not
## restarted when the ApplicationMaster fails. With a larger value, YARN will
## try to restart the ApplicationMaster.
yarn.application-attempts: 10
## Whether Flink should reallocate failed TaskManager containers. Defaults to true.
yarn.reallocate-failed: true
## Maximum number of failed containers the ApplicationMaster accepts before
## declaring the YARN job failed. Defaults to the number of TaskManagers
## requested at startup (the -n parameter).
yarn.maximum-failed-containers: 1
How Flink uses ZooKeeper for HA
Flink's ResourceManager, Dispatcher, JobManager, and WebServer components all need high-availability guarantees. Flink HA must also persist checkpoint metadata and retain the most recently completed checkpoint, but the most important pieces are leader election and leader status tracking for these components. This post extracts the code Flink uses to implement leader election and leader tracking on top of ZooKeeper, as a way to study how Flink uses Curator. The classes relate to each other as follows:
ZooKeeperHaServices is the ZooKeeper-based implementation of HighAvailabilityServices. It uses the ZooKeeperUtils class to create the LeaderRetrievalService and LeaderElectionService for each component.
LeaderRetrievalService tracks leader changes; when the leader address changes, it notifies the components that depend on it so they can switch to the new leader. Take the getResourceManagerLeaderRetriever method as an example: Flink watches the content of the /leader/resource_manager_lock node in ZooKeeper, which stores the ResourceManager leader's address and leader UUID, and the TaskManager calls the service's start method with a LeaderRetrievalListener. When the node content changes, meaning the ResourceManager leader address has changed, the listener's notifyLeaderAddress tells the TaskManager to register with the new ResourceManager address. This is implemented with Curator's NodeCache, overriding its nodeChanged callback.
LeaderElectionService performs the leader election; when a node becomes leader, the LeaderContender's grantLeadership method is called. Taking the ResourceManager election as an example: Flink creates an ephemeral node under /leaderlatch/resource_manager_lock in ZooKeeper, the ResourceManager that creates it successfully becomes leader and its grantLeadership is triggered, and finally its address and UUID are written to /leader/resource_manager_lock, which in turn triggers the LeaderRetrievalService. Election is implemented with Curator's LeaderLatch, overriding the isLeader and notLeader callbacks. A NodeCache additionally watches /leader/resource_manager_lock to make sure the new leader's address and UUID were actually written to the node.
LeaderRetrievalListener reacts to leader address changes reported by the LeaderRetrievalService; the new leader address is delivered through notifyLeaderAddress.
LeaderContender reacts to role changes reported by the LeaderElectionService; leadership is granted and revoked through grantLeadership and revokeLeadership.
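The retrieval side boils down to one Curator recipe that is easy to try in isolation. A minimal sketch, assuming a local ZooKeeper and a made-up node path /leader/demo_lock (this is not Flink code):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderWatchSketch {
    public static void main(String[] args) throws Exception {
        // illustrative connection string; Flink reads this from its configuration
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // watch the node where the elected leader publishes its address,
        // the same pattern ZooKeeperLeaderRetrievalService uses for /leader/..._lock
        final NodeCache cache = new NodeCache(client, "/leader/demo_lock");
        cache.getListenable().addListener(() -> {
            ChildData current = cache.getCurrentData();
            if (current == null || current.getData() == null) {
                System.out.println("no leader published yet");
            } else {
                // Flink serializes address + UUID into this node; we just print raw bytes
                System.out.println("leader info: " + new String(current.getData()));
            }
        });
        cache.start();
        Thread.sleep(60_000); // keep watching for a minute
    }
}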
The ZooKeeper structure under one cluster directory is sketched below; the original figure is not reproduced, so this is reconstructed from the paths discussed in this post (assuming the default leaderlatch and leader sub-paths):
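/flink                            <- high-availability.zookeeper.path.root
  /default                        <- high-availability.cluster-id
    /leaderlatch                  <- ephemeral nodes created during election
      /resource_manager_lock
      /dispatcher_lock
      /rest_server_lock
    /leader                       <- leader address + UUID per component
      /resource_manager_lock
      /dispatcher_lock
      /rest_server_lock

The job_manager_lock nodes appear under a per-job sub-path of the same two directories.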
Relevant Flink source code
Let's walk briefly through how ZooKeeperHaServices is created when the cluster starts.
集群?jiǎn)?dòng)入口ClusterEntrypoint
- Depending on the deployment mode (session or per-job), the corresponding subclass calls ClusterEntrypoint's startCluster method to start the cluster, which first calls initializeServices to bring up the cluster-level services. Here we only look at the part that starts haServices.
public void startCluster() throws ClusterEntrypointException {
    SecurityContext securityContext = installSecurityContext(configuration);
    securityContext.runSecured((Callable<Void>) () -> {
        runCluster(configuration);
        return null;
    });
}

protected void initializeServices(Configuration configuration) {
    ioExecutor = Executors.newFixedThreadPool(
        Hardware.getNumberCPUCores(),
        new ExecutorThreadFactory("cluster-io"));
    haServices = createHaServices(configuration, ioExecutor);
    blobServer = new BlobServer(configuration, haServices.createBlobStore());
    blobServer.start();
}
- ZooKeeperHaServices is created according to the high-availability setting; the default is NONE.
protected HighAvailabilityServices createHaServices(
        Configuration configuration,
        Executor executor) throws Exception {
    // no address resolution is needed when creating the HA services
    return HighAvailabilityServicesUtils.createHighAvailabilityServices(
        configuration,
        executor,
        HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
}

// choose which HA services to create based on the high-availability setting, default NONE
public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution) throws Exception {
    // read the high-availability setting, e.g. zookeeper
    HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);
    switch (highAvailabilityMode) {
        case NONE:
            final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);
            final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                JobMaster.JOB_MANAGER_NAME,
                addressResolution,
                configuration);
            final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                ResourceManager.RESOURCE_MANAGER_NAME,
                addressResolution,
                configuration);
            final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                Dispatcher.DISPATCHER_NAME,
                addressResolution,
                configuration);
            final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
                "%s must be set",
                RestOptions.ADDRESS.key());
            final int port = configuration.getInteger(RestOptions.PORT);
            final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
            final String protocol = enableSSL ? "https://" : "http://";
            return new StandaloneHaServices(
                resourceManagerRpcUrl,
                dispatcherRpcUrl,
                jobManagerRpcUrl,
                String.format("%s%s:%s", protocol, address, port));
        case ZOOKEEPER:
            // metadata storage; usually a FileSystemBlobStore rooted at
            // high-availability.storageDir, e.g. hdfs:///flink/recovery
            BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
            // use ZooKeeper for the HA services
            return new ZooKeeperHaServices(
                ZooKeeperUtils.startCuratorFramework(configuration),
                executor,
                configuration,
                blobStoreService);
        case FACTORY_CLASS:
            return createCustomHAServices(configuration, executor);
        default:
            throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
    }
}
- ZooKeeperHaServices mainly provides the factory methods for the LeaderRetrievalService and LeaderElectionService, and defines the ZK node name used by each component. Despite the _lock suffix, each name is used both under the leaderlatch path, as the distributed lock created during leader election, and under the leader path, to store the leader's address.
private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";
private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
// web UI / REST server
private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";

// create the ResourceManagerLeaderRetriever, which tracks changes of the RM leader address
public LeaderRetrievalService getResourceManagerLeaderRetriever() {
    return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}

// create the ResourceManagerLeaderElectionService, which re-elects a leader when the RM leader dies
public LeaderElectionService getResourceManagerLeaderElectionService() {
    return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}
- How ZooKeeperUtils creates the LeaderRetrievalService:
- It takes the Curator client and the service's node path in ZK and creates a ZooKeeperLeaderRetrievalService (ZKLRS); a sketch of this factory method follows the list.
- ZKLRS simply watches the content of the ZK node and, when the content changes, notifies the LeaderRetrievalListener passed in through the start method.
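The factory method itself is not quoted in this post. Mirroring the createLeaderElectionService shown further down, it should look roughly like this (a sketch, not verbatim source):

public static ZooKeeperLeaderRetrievalService createLeaderRetrievalService(
        final CuratorFramework client,
        final Configuration configuration,
        final String pathSuffix) {
    // watch the node under the leader path, e.g. /leader/resource_manager_lock
    final String leaderPath = configuration.getString(
        HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
    return new ZooKeeperLeaderRetrievalService(client, leaderPath);
}

The start method then wires up the listeners: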
public void start(LeaderRetrievalListener listener) throws Exception {
    synchronized (lock) {
        // the LeaderRetrievalListener to notify when the leader changes
        leaderListener = listener;
        // call this object's unhandledError method on errors
        client.getUnhandledErrorListenable().addListener(this);
        // use a NodeCache to watch the node content for changes
        cache.getListenable().addListener(this);
        cache.start();
        // track the state of the session connection
        client.getConnectionStateListenable().addListener(connectionStateListener);
        running = true;
    }
}
- By overriding nodeChanged, the service picks up the leader's address after a change and passes the new address on.
public void nodeChanged() throws Exception {
    synchronized (lock) {
        if (running) {
            try {
                LOG.debug("Leader node has changed.");
                ChildData childData = cache.getCurrentData();
                String leaderAddress;
                UUID leaderSessionID;
                if (childData == null) {
                    leaderAddress = null;
                    leaderSessionID = null;
                } else {
                    byte[] data = childData.getData();
                    if (data == null || data.length == 0) {
                        leaderAddress = null;
                        leaderSessionID = null;
                    } else {
                        ByteArrayInputStream bais = new ByteArrayInputStream(data);
                        ObjectInputStream ois = new ObjectInputStream(bais);
                        // the leader address
                        leaderAddress = ois.readUTF();
                        // the leader UUID
                        leaderSessionID = (UUID) ois.readObject();
                    }
                }
                // the leader address has changed
                if (!(Objects.equals(leaderAddress, lastLeaderAddress) &&
                        Objects.equals(leaderSessionID, lastLeaderSessionID))) {
                    lastLeaderAddress = leaderAddress;
                    lastLeaderSessionID = leaderSessionID;
                    // hand the new leaderAddress and leaderSessionID to the listener
                    leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
                }
            } catch (Exception e) {
                leaderListener.handleError(new Exception("Could not handle node changed event.", e));
                throw e;
            }
        } else {
            LOG.debug("Ignoring node change notification since the service has already been stopped.");
        }
    }
}
- How ZooKeeperUtils creates the ZooKeeperLeaderElectionService:
- It is given the ZK path that holds the leader information and the ZK path where ephemeral nodes are created during election. The leader path is needed so that when a new leader emerges, its address and UUID can be written there.
public static ZooKeeperLeaderElectionService createLeaderElectionService(
        final CuratorFramework client,
        final Configuration configuration,
        final String pathSuffix) {
    // the election takes place under the leaderlatch path
    final String latchPath = configuration.getString(
        HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;
    // the leader node
    final String leaderPath = configuration.getString(
        HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;
    return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
}
- The start method takes the LeaderContender and kicks off the leader election.
public void start(LeaderContender contender) throws Exception {
    synchronized (lock) {
        // register the error-handling listener
        client.getUnhandledErrorListenable().addListener(this);
        // the contender competing for leadership
        leaderContender = contender;
        // start the leader election; the node that becomes leader triggers isLeader
        leaderLatch.addListener(this);
        leaderLatch.start();
        // watch the leader node for content changes
        cache.getListenable().addListener(this);
        cache.start();
        client.getConnectionStateListenable().addListener(listener);
        running = true;
    }
}
- When a contender becomes leader, grantLeadership is triggered with the new leader's UUID, and the contender calls the LeaderElectionService's confirmLeaderSessionID, which writes the new leader's address into the leader node; the callback that starts this hand-off is sketched below.
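For context, the LeaderLatch callback looks roughly like the following (a sketch of ZooKeeperLeaderElectionService.isLeader reconstructed from the fields used in this post; details may differ from the actual source):

public void isLeader() {
    synchronized (lock) {
        if (running) {
            // issue a fresh session ID and hand leadership to the contender;
            // the contender answers by calling confirmLeaderSessionID
            issuedLeaderSessionID = UUID.randomUUID();
            confirmedLeaderSessionID = null;
            leaderContender.grantLeadership(issuedLeaderSessionID);
        }
    }
}

The confirmation then writes the leader information: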
public void confirmLeaderSessionID(UUID leaderSessionID) {
    // only if we currently hold leadership
    if (leaderLatch.hasLeadership()) {
        // check if this is an old confirmation call
        synchronized (lock) {
            if (running) {
                if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
                    confirmedLeaderSessionID = leaderSessionID;
                    // write the confirmed session ID under the leader path
                    writeLeaderInformation(confirmedLeaderSessionID);
                }
            }
        }
    }
}
- The write triggers this object's own nodeChanged method, which makes sure the new leader's address and UUID really end up in the node.
public void nodeChanged() throws Exception {
    try {
        // leaderSessionID is null if the leader contender has not yet confirmed the session ID
        if (leaderLatch.hasLeadership()) { // we are the leader
            synchronized (lock) {
                if (running) {
                    // leadership has already been confirmed
                    if (confirmedLeaderSessionID != null) {
                        ChildData childData = cache.getCurrentData();
                        // nothing was written; write again
                        if (childData == null) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(
                                    "Writing leader information into empty node by {}.",
                                    leaderContender.getAddress());
                            }
                            writeLeaderInformation(confirmedLeaderSessionID);
                        } else {
                            byte[] data = childData.getData();
                            if (data == null || data.length == 0) {
                                // the data field seems to be empty, rewrite information
                                writeLeaderInformation(confirmedLeaderSessionID);
                            } else {
                                ByteArrayInputStream bais = new ByteArrayInputStream(data);
                                ObjectInputStream ois = new ObjectInputStream(bais);
                                String leaderAddress = ois.readUTF();
                                UUID leaderSessionID = (UUID) ois.readObject();
                                // rewrite if the stored address or session ID does not match ours
                                if (!leaderAddress.equals(this.leaderContender.getAddress()) ||
                                        (leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
                                    writeLeaderInformation(confirmedLeaderSessionID);
                                }
                            }
                        }
                    }
                } else {
                    // the service has been stopped in the meantime
                    LOG.debug("Ignoring node change notification since the service has already been stopped.");
                }
            }
        }
    } catch (Exception e) {
        ...
    }
}
writeLeaderInformation writes the leader address and UUID. Before writing, it checks whether the leader node was created by the current leader's session; if not, the node is deleted and created again.
protected void writeLeaderInformation(UUID leaderSessionID) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        // the leader address
        oos.writeUTF(leaderContender.getAddress());
        // the leader's UUID
        oos.writeObject(leaderSessionID);
        oos.close();

        boolean dataWritten = false;
        while (!dataWritten && leaderLatch.hasLeadership()) {
            Stat stat = client.checkExists().forPath(leaderPath);
            if (stat != null) {
                long owner = stat.getEphemeralOwner();
                long sessionID = client.getZookeeperClient().getZooKeeper().getSessionId();
                // the node was created by the current session
                if (owner == sessionID) {
                    try {
                        client.setData().forPath(leaderPath, baos.toByteArray());
                        dataWritten = true;
                    } catch (KeeperException.NoNodeException noNode) {
                        // node was deleted in the meantime
                    }
                } else {
                    try {
                        // not created by the current session, delete it first
                        client.delete().forPath(leaderPath);
                    } catch (KeeperException.NoNodeException noNode) {
                        // node was deleted in the meantime --> try again
                    }
                }
            } else {
                try {
                    client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
                        leaderPath,
                        baos.toByteArray());
                    dataWritten = true;
                } catch (KeeperException.NodeExistsException nodeExists) {
                    // node has been created in the meantime --> try again
                }
            }
        }
    } catch (Exception e) {
        // surface the failure to the contender
        leaderContender.handleError(
            new Exception("Could not write leader address and leader session ID to ZooKeeper.", e));
    }
}
This post looked at how Flink uses Curator to operate on ZooKeeper nodes and implement leader election and leader status tracking. The LeaderRetrievalListener and LeaderContender interfaces act as the input and output of this functionality, tracking how leadership changes. The ZK node operations and state monitoring in between can be extracted and reused in your own projects.
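As a takeaway, the election half can be lifted out almost verbatim. A minimal reusable sketch, again assuming a local ZooKeeper and an illustrative latch path (not Flink code):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderElectionSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // every process creates an ephemeral node under the latch path;
        // the winner's isLeader() is the analogue of grantLeadership
        final LeaderLatch latch = new LeaderLatch(client, "/leaderlatch/demo_lock");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // Flink would now publish its address + UUID under /leader/...
                System.out.println("granted leadership");
            }

            @Override
            public void notLeader() {
                // the analogue of revokeLeadership
                System.out.println("leadership revoked");
            }
        });
        latch.start();

        Thread.sleep(60_000); // stay in the election for a minute
        latch.close();        // give up leadership and remove the ephemeral node
    }
}

Run two or three copies of this against the same ZooKeeper and kill the current leader to watch the latch fail over.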