JobManager Responsibilities
The JobManager is mainly responsible for receiving Flink jobs, scheduling tasks, collecting job status, and managing TaskManagers. It contains an actor that handles the following messages:
- RegisterTaskManager: sent by a TaskManager that wants to register with the JobManager. A successful registration is acknowledged with an AcknowledgeRegistration message.
- SubmitJob: sent by a client that submits a job to the system. The job is described as a JobGraph.
- CancelJob: requests cancellation of the job with the given ID. Returns CancellationSuccess on success, otherwise CancellationFailure.
- UpdateTaskExecutionState: sent by a TaskManager to update the state of an execution vertex (ExecutionVertex). Returns true on success, otherwise false.
- RequestNextInputSplit: sent by a task running on a TaskManager to request its next input split. Returns NextInputSplit on success, otherwise null.
- JobStatusChanged: signals that the job's state (RUNNING, CANCELING, FINISHED, etc.) has changed. This message is sent by the ExecutionGraph.
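The sketch below shows how a single message-handling entry point can dispatch on message type. It covers three of the messages listed above and keeps all state behind that one method, in the spirit of an actor. The classes and the "Submitted" reply are hypothetical stand-ins, not Flink's actual message classes. A real actor also receives messages asynchronously from a mailbox, and this single-threaded sketch omits that.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-ins for a few JobManager messages;
// these are NOT Flink's real message classes.
public class JobManagerMessagesSketch {

    interface Message {}

    static final class RegisterTaskManager implements Message {
        final String taskManagerId;
        RegisterTaskManager(String taskManagerId) { this.taskManagerId = taskManagerId; }
    }

    static final class SubmitJob implements Message {
        final String jobId;
        SubmitJob(String jobId) { this.jobId = jobId; }
    }

    static final class CancelJob implements Message {
        final String jobId;
        CancelJob(String jobId) { this.jobId = jobId; }
    }

    // Actor-owned state: mutated only by handle().
    private final Set<String> taskManagers = new HashSet<>();
    private final Map<String, String> jobStatus = new HashMap<>();

    // Dispatch on the message type, like an actor's receive method.
    String handle(Message msg) {
        if (msg instanceof RegisterTaskManager) {
            taskManagers.add(((RegisterTaskManager) msg).taskManagerId);
            return "AcknowledgeRegistration";
        }
        if (msg instanceof SubmitJob) {
            jobStatus.put(((SubmitJob) msg).jobId, "RUNNING");
            return "Submitted"; // hypothetical reply name
        }
        if (msg instanceof CancelJob) {
            String jobId = ((CancelJob) msg).jobId;
            if ("RUNNING".equals(jobStatus.get(jobId))) {
                jobStatus.put(jobId, "CANCELED");
                return "CancellationSuccess";
            }
            return "CancellationFailure"; // unknown or already cancelled job
        }
        throw new IllegalArgumentException("Unknown message: " + msg);
    }

    int registeredTaskManagers() { return taskManagers.size(); }

    public static void main(String[] args) {
        JobManagerMessagesSketch jm = new JobManagerMessagesSketch();
        System.out.println(jm.handle(new RegisterTaskManager("tm-1"))); // AcknowledgeRegistration
        System.out.println(jm.handle(new SubmitJob("job-1")));          // Submitted
        System.out.println(jm.handle(new CancelJob("job-1")));          // CancellationSuccess
        System.out.println(jm.handle(new CancelJob("job-2")));          // CancellationFailure
    }
}
```

Returning plain strings keeps the sketch short. A real protocol would use typed reply messages.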
JobManager Startup Process
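The code lives in the file org.apache.flink.runtime.jobmanager.JobManager.scala. Its entry point is the main method, which is invoked when the JobManager is started through the startup script. main starts the JobManager by calling runJobManager, so runJobManager is the method we examine below.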
```scala
def runJobManager(
    configuration: Configuration,
    executionMode: JobManagerMode,
    listeningAddress: String,
    listeningPort: Int)
  : Unit = {

  // ...

  // Start the JobManager's ActorSystem first: if the configured port is 0,
  // this step decides which port is actually used and updates the configuration accordingly.
  val jobManagerSystem = startActorSystem(
    configuration,
    listeningAddress,
    listeningPort)

  // Create the high-availability services, e.g. multiple JobManagers coordinated via ZooKeeper
  val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
    configuration,
    ioExecutor,
    AddressResolution.NO_ADDRESS_RESOLUTION)

  // ...

  // Start all JobManager components (library cache, instance manager, scheduler)
  // and finally the JobManager actor itself.
  val (_, _, webMonitorOption, _) = try {
    startJobManagerActors(
      jobManagerSystem,
      configuration,
      executionMode,
      listeningAddress,
      futureExecutor,
      ioExecutor,
      highAvailabilityServices,
      metricRegistry,
      classOf[JobManager],
      classOf[MemoryArchivist],
      Option(classOf[StandaloneResourceManager])
    )
  } catch {
    case t: Throwable =>
      // Startup failed: release the thread pools before rethrowing
      futureExecutor.shutdownNow()
      ioExecutor.shutdownNow()
      throw t
  }

  // Block until the actor system terminates
  jobManagerSystem.awaitTermination()

  // Best-effort shutdown: a failure in one component is logged and does not stop the others
  webMonitorOption.foreach {
    webMonitor =>
      try {
        webMonitor.stop()
      } catch {
        case t: Throwable =>
          LOG.warn("Could not properly stop the web monitor.", t)
      }
  }

  try {
    highAvailabilityServices.close()
  } catch {
    case t: Throwable =>
      LOG.warn("Could not properly stop the high availability services.", t)
  }

  try {
    metricRegistry.shutdown()
  } catch {
    case t: Throwable =>
      LOG.warn("Could not properly shut down the metric registry.", t)
  }

  ExecutorUtils.gracefulShutdown(
    timeout.toMillis,
    TimeUnit.MILLISECONDS,
    futureExecutor,
    ioExecutor)
}
```
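runJobManager follows a common lifecycle pattern. If startup throws, it releases its thread pools. During shutdown, it stops each component inside its own try/catch, so one failure is only logged. Finally, it shuts the executors down gracefully with a timeout. The sketch below isolates that pattern. Stoppable, stopAll and gracefulShutdown are hypothetical helpers. gracefulShutdown follows the general idea of ExecutorUtils.gracefulShutdown but is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    // Hypothetical stand-in for the web monitor, HA services, metric registry, ...
    interface Stoppable { void stop() throws Exception; }

    // Stops every component in order. A failure is recorded as a warning
    // (where runJobManager calls LOG.warn) and the loop carries on.
    static List<String> stopAll(Map<String, Stoppable> components) {
        List<String> warnings = new ArrayList<>();
        for (Map.Entry<String, Stoppable> e : components.entrySet()) {
            try {
                e.getValue().stop();
            } catch (Exception t) {
                warnings.add("Could not properly stop the " + e.getKey() + ".");
            }
        }
        return warnings;
    }

    // Request an orderly shutdown, wait until a shared deadline, and force
    // shutdownNow() on all executors if any of them did not finish in time.
    static boolean gracefulShutdown(long timeoutMillis, ExecutorService... executors) {
        for (ExecutorService ex : executors) ex.shutdown();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        boolean allTerminated = true;
        try {
            for (ExecutorService ex : executors) {
                long remaining = Math.max(0L, deadline - System.nanoTime());
                if (!ex.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                    allTerminated = false;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            allTerminated = false;
        }
        if (!allTerminated) {
            for (ExecutorService ex : executors) ex.shutdownNow();
        }
        return allTerminated;
    }

    public static void main(String[] args) {
        Map<String, Stoppable> components = new LinkedHashMap<>();
        components.put("web monitor", () -> {});
        components.put("high availability services", () -> { throw new IllegalStateException("boom"); });
        components.put("metric registry", () -> {});
        System.out.println(stopAll(components));
        // [Could not properly stop the high availability services.]

        ExecutorService futureExecutor = Executors.newFixedThreadPool(2);
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        System.out.println(gracefulShutdown(1000, futureExecutor, ioExecutor)); // true
    }
}
```

A LinkedHashMap keeps the shutdown order deterministic. That matters when later components depend on earlier ones.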
JobManager High Availability
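JobManager high availability currently supports two modes:
- NONE: no high availability. There is a single JobManager node.
- ZooKeeper: high availability through ZooKeeper. Several JobManager nodes form a cluster. ZooKeeper elects a master node that serves requests, and the remaining nodes act as standbys.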
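In NONE mode, a single JobManager serves all requests. It does not persist the submitted JAR files, and it keeps checkpoint data and metadata in the Java heap or on the local file system. As a result, there is no high availability.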
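In ZooKeeper mode, there is one master and several standby nodes. When the master fails, the standby nodes elect a new master. This removes the single point of failure: once a new master has been elected, the program can continue running. Standby and master JobManager instances are not fundamentally different, and any JobManager can become either the master or a standby.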
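The failover described above can be simulated in memory with the classic ZooKeeper leader-election recipe. In that recipe, each candidate creates an ephemeral sequential znode, and the candidate holding the lowest sequence number is the master. When the master's session dies, its ephemeral node disappears and the next-lowest candidate takes over. This is a hypothetical simulation only. It uses no ZooKeeper or Flink API and is not claimed to be Flink's exact election code.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// In-memory simulation of ZooKeeper-style leader election:
// lowest sequence number wins; removing a node promotes the next one.
public class LeaderElectionSketch {

    private final TreeMap<Integer, String> nodes = new TreeMap<>(); // sequence -> JobManager id
    private int nextSeq = 0;

    // Analogous to creating an EPHEMERAL_SEQUENTIAL znode for this candidate.
    int join(String jobManagerId) {
        int seq = nextSeq++;
        nodes.put(seq, jobManagerId);
        return seq;
    }

    // Analogous to the ephemeral znode vanishing when the session expires.
    void crash(String jobManagerId) {
        nodes.values().removeIf(id -> id.equals(jobManagerId));
    }

    // The current master is the candidate with the lowest sequence number.
    Optional<String> leader() {
        Map.Entry<Integer, String> first = nodes.firstEntry();
        return first == null ? Optional.empty() : Optional.of(first.getValue());
    }

    public static void main(String[] args) {
        LeaderElectionSketch zk = new LeaderElectionSketch();
        zk.join("jm-1");
        zk.join("jm-2");
        zk.join("jm-3");
        System.out.println(zk.leader().get()); // jm-1
        zk.crash("jm-1");                      // master fails
        System.out.println(zk.leader().get()); // jm-2 (a standby takes over)
        zk.join("jm-1");                       // restarted node rejoins as a standby
        System.out.println(zk.leader().get()); // jm-2
    }
}
```

A restarted node receives a new, higher sequence number, so it rejoins as a standby instead of reclaiming leadership. This matches the point above that any JobManager can take either role.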
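For example, in a cluster with three JobManager nodes, one node is elected master and the other two stand by. The mode, NONE or ZooKeeper, is selected with the following entry in flink-conf.yaml:

```yaml
high-availability: zookeeper   # or: none
```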
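The next sketch shows how the `high-availability` entry might be turned into a mode. It parses flink-conf.yaml-style "key: value" lines and maps the value onto a NONE/ZOOKEEPER enum. The sketch assumes that a missing key means NONE and that values are case-insensitive. The parser and enum are hypothetical helpers, not Flink's configuration loader. The ZooKeeper quorum and storage directory keys in the example are shown only for illustration.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: read `high-availability` from "key: value" lines.
public class HaModeSketch {

    enum HighAvailabilityMode { NONE, ZOOKEEPER }

    // Minimal line parser: skips blanks and comments, splits on the first ':'.
    static Map<String, String> parse(String yaml) {
        Map<String, String> conf = new HashMap<>();
        for (String line : yaml.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) continue;
            int colon = trimmed.indexOf(':');
            if (colon < 0) continue;
            conf.put(trimmed.substring(0, colon).trim(), trimmed.substring(colon + 1).trim());
        }
        return conf;
    }

    // Assumption: absent key -> NONE; valueOf rejects unknown modes.
    static HighAvailabilityMode modeOf(Map<String, String> conf) {
        String raw = conf.getOrDefault("high-availability", "none");
        return HighAvailabilityMode.valueOf(raw.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        String yaml = String.join("\n",
                "# illustrative flink-conf.yaml excerpt",
                "high-availability: zookeeper",
                "high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181",
                "high-availability.storageDir: hdfs:///flink/ha/");
        Map<String, String> conf = parse(yaml);
        System.out.println(modeOf(conf)); // ZOOKEEPER
    }
}
```

Splitting only on the first colon keeps values such as `zk1:2181` or `hdfs:///...` intact.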
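Creating the High-Availability Services
HighAvailabilityServicesUtils.createHighAvailabilityServices, which runJobManager calls, reads the mode from the configuration and builds the matching services: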
```java
public static HighAvailabilityServices createHighAvailabilityServices(
        Configuration configuration,
        Executor executor,
        AddressResolution addressResolution) throws Exception {

    HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);

    switch (highAvailabilityMode) {
        case NONE: // NONE mode: fixed RPC addresses derived from the configured host and port
            final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

            final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                JobMaster.JOB_MANAGER_NAME,
                addressResolution,
                configuration);
            final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                ResourceManager.RESOURCE_MANAGER_NAME,
                addressResolution,
                configuration);
            final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
                hostnamePort.f0,
                hostnamePort.f1,
                Dispatcher.DISPATCHER_NAME,
                addressResolution,
                configuration);

            return new StandaloneHaServices(
                resourceManagerRpcUrl,
                dispatcherRpcUrl,
                jobManagerRpcUrl);

        case ZOOKEEPER: // ZOOKEEPER mode
            // Service that stores the JobManager's metadata
            BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

            // ZooKeeper-based high-availability services for the JobManager
            return new ZooKeeperHaServices(
                ZooKeeperUtils.startCuratorFramework(configuration),
                executor,
                configuration,
                blobStoreService);

        default:
            throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
    }
}
```
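The method above is a factory that switches on the configured mode. In NONE mode, every endpoint address is fixed up front from one host and port. In ZooKeeper mode, the addresses are left to runtime leader election. The self-contained sketch below reproduces that shape with hypothetical stand-in classes. It makes three assumptions: the RPC URLs take the form `akka.tcp://flink@host:port/user/<endpoint>`, the endpoint names are `jobmanager`, `resourcemanager` and `dispatcher`, and 6123 is used as an example port. It does not call Flink's AkkaRpcServiceUtils.

```java
import java.util.Locale;

// Hypothetical sketch of the mode-switching factory; not Flink's classes.
public class HaServicesFactorySketch {

    enum HighAvailabilityMode { NONE, ZOOKEEPER }

    interface HaServices { String describe(); }

    // NONE: all endpoint addresses are known statically.
    static final class StandaloneServices implements HaServices {
        final String resourceManagerUrl;
        final String dispatcherUrl;
        final String jobManagerUrl;

        StandaloneServices(String resourceManagerUrl, String dispatcherUrl, String jobManagerUrl) {
            this.resourceManagerUrl = resourceManagerUrl;
            this.dispatcherUrl = dispatcherUrl;
            this.jobManagerUrl = jobManagerUrl;
        }

        public String describe() { return "standalone " + jobManagerUrl; }
    }

    // ZOOKEEPER: addresses are discovered at runtime via the quorum.
    static final class ZooKeeperServices implements HaServices {
        final String quorum;
        ZooKeeperServices(String quorum) { this.quorum = quorum; }
        public String describe() { return "zookeeper " + quorum; }
    }

    // Assumed URL layout: akka.tcp://flink@<host>:<port>/user/<endpoint>
    static String rpcUrl(String host, int port, String endpointName) {
        return String.format(Locale.ROOT, "akka.tcp://flink@%s:%d/user/%s", host, port, endpointName);
    }

    static HaServices create(HighAvailabilityMode mode, String host, int port, String quorum) {
        switch (mode) {
            case NONE:
                return new StandaloneServices(
                        rpcUrl(host, port, "resourcemanager"),
                        rpcUrl(host, port, "dispatcher"),
                        rpcUrl(host, port, "jobmanager"));
            case ZOOKEEPER:
                return new ZooKeeperServices(quorum);
            default:
                throw new IllegalArgumentException("Recovery mode " + mode + " is not supported.");
        }
    }

    public static void main(String[] args) {
        System.out.println(create(HighAvailabilityMode.NONE, "localhost", 6123, null).describe());
        // standalone akka.tcp://flink@localhost:6123/user/jobmanager
        System.out.println(create(HighAvailabilityMode.ZOOKEEPER, "localhost", 6123, "zk1:2181").describe());
        // zookeeper zk1:2181
    }
}
```

Keeping the mode decision in a single factory lets callers such as runJobManager depend only on the common services interface, whichever mode is configured.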