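I used spare moments to read through the Flink source code. I chose Flink mainly to understand how a stable distributed computing system is implemented, and also because Flink has distinctive advantages over the more mature Spark; I believe it will hold an important place in the next generation of distributed computing. Flink's main concepts are covered on the official website.
Job submission and scheduling in Flink both rely on Akka actor communication, so I take that as the entry point and first sort out how the whole system starts up, and how a job submission and its data flow through it.
As the figure shows, a complete Flink system consists of three actor systems: Client, JobManager (JM), and TaskManager (TM). The sections below analyze how each of the three is created.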
JM ActorSystem
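The JM is the scheduling center of a Flink system. Besides the creation of the JM ActorSystem, this part also shows how the various modules of the Flink system are initialized and run.
First, find the program entry point. Tracing the startup scripts shows that each of them ultimately runs the flink-daemon.sh script. Looking at that script: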
...
...
case $DAEMON in
    (jobmanager)
        CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager
    ;;
    (taskmanager)
        CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager
    ;;
    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;
    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac
$JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 < /dev/null &
...
...
This gives the JM entry point: org.apache.flink.runtime.jobmanager.JobManager.scala. The code contains a main function, which calls the runJobManager method:
def runJobManager(
    configuration: Configuration,
    executionMode: JobManagerMode,
    listeningAddress: String,
    listeningPort: Int)
  : Unit = {
  // startActorSystemAndJobManagerActors returns the jobManagerSystem
  val (jobManagerSystem, _, _, webMonitorOption, _) = startActorSystemAndJobManagerActors(
    configuration,
    executionMode,
    listeningAddress,
    listeningPort,
    classOf[JobManager],
    classOf[MemoryArchivist],
    Option(classOf[StandaloneResourceManager])
  )
  // Block until the actor system shuts down
  jobManagerSystem.awaitTermination()
  webMonitorOption.foreach {
    webMonitor =>
      try {
        webMonitor.stop()
      } catch {
        case t: Throwable =>
          LOG.warn("Could not properly stop the web monitor.", t)
      }
  }
}
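The cleanup at the end of runJobManager (stop an optional component, log a failure rather than let it escape) can be looked at in isolation. Below is a minimal sketch in plain Scala with no Flink or Akka dependency. `ShutdownSketch`, `Stoppable`, and `stopQuietly` are hypothetical names introduced for illustration; unlike the listing above, the sketch catches only non-fatal exceptions and returns the message instead of logging it.

```scala
object ShutdownSketch {
  // Hypothetical stand-in for an optional component such as the web monitor.
  trait Stoppable {
    def stop(): Unit
  }

  // Stops the component if one is present. A non-fatal failure is turned
  // into a message and returned; None means nothing went wrong.
  def stopQuietly(component: Option[Stoppable]): Option[String] =
    component.flatMap { c =>
      try {
        c.stop()
        None
      } catch {
        case scala.util.control.NonFatal(t) =>
          Some(s"Could not properly stop: ${t.getMessage}")
      }
    }
}
```

Because the component is an Option, the "no web monitor configured" case needs no special handling: an empty Option simply skips the body.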
The logic of runJobManager is simple: it calls startActorSystemAndJobManagerActors to create the ActorSystem and the JM actor, then blocks until the system exits. Now for the actual JM creation:
def startActorSystemAndJobManagerActors(
    configuration: Configuration,
    executionMode: JobManagerMode,
    listeningAddress: String,
    listeningPort: Int,
    jobManagerClass: Class[_ <: JobManager],
    archiveClass: Class[_ <: MemoryArchivist],
    resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
  : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor], Option[ActorRef]) = {
  LOG.info("Starting JobManager")
  // Bring up the job manager actor system first, bind it to the given address.
  val hostPortUrl = NetUtils.hostAndPortToUrlString(listeningAddress, listeningPort)
  LOG.info(s"Starting JobManager actor system at $hostPortUrl")
  val jobManagerSystem = try {
    val akkaConfig = AkkaUtils.getAkkaConfig(
      configuration,
      Some((listeningAddress, listeningPort))
    )
    if (LOG.isDebugEnabled) {
      LOG.debug("Using akka configuration\n " + akkaConfig)
    }
    AkkaUtils.createActorSystem(akkaConfig) // create the ActorSystem; there is only one globally
  }
  catch {
    ...
    ...
  }
  ...
  ... // creation of the webMonitor omitted here
  try {
    // bring up the job manager actor
    LOG.info("Starting JobManager actor")
    val (jobManager, archive) = startJobManagerActors(
      configuration,
      jobManagerSystem,
      jobManagerClass,
      archiveClass)
    // start a process reaper that watches the JobManager. If the JobManager actor dies,
    // the process reaper will kill the JVM process (to ensure easy failure detection)
    LOG.debug("Starting JobManager process reaper")
    jobManagerSystem.actorOf(
      Props(
        classOf[ProcessReaper],
        jobManager,
        LOG.logger,
        RUNTIME_FAILURE_RETURN_CODE),
      "JobManager_Process_Reaper")
    // bring up a local task manager, if needed
    if (executionMode == JobManagerMode.LOCAL) {
      LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
      val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor(
        configuration,
        ResourceID.generate(),
        jobManagerSystem,
        listeningAddress,
        Some(TaskManager.TASK_MANAGER_NAME),
        None,
        localTaskManagerCommunication = true,
        classOf[TaskManager])
      LOG.debug("Starting TaskManager process reaper")
      jobManagerSystem.actorOf(
        Props(
          classOf[ProcessReaper],
          taskManagerActor,
          LOG.logger,
          RUNTIME_FAILURE_RETURN_CODE),
        "TaskManager_Process_Reaper")
    }
    ...
    ...
    (jobManagerSystem, jobManager, archive, webMonitor, resourceManager)
  }
  ...
  ...
}
Here startActorSystemAndJobManagerActors uses AkkaUtils and the Flink configuration to create the global ActorSystem. AkkaUtils is only a thin wrapper around actor creation, so I won't go into it. Next, the freshly created jobManagerSystem and the JobManager class, jobManagerClass, are used to create the jobManager. Besides the jobManager, this method also creates other important Flink modules, as the return value makes clear. In local mode it additionally starts a local taskManagerActor. Going one level deeper into startJobManagerActors: it takes jobManagerSystem and the other parameters, creates the jobManager and the archive, and returns them:
def startJobManagerActors(
    configuration: Configuration,
    actorSystem: ActorSystem,
    jobManagerActorName: Option[String],
    archiveActorName: Option[String],
    jobManagerClass: Class[_ <: JobManager],
    archiveClass: Class[_ <: MemoryArchivist])
  : (ActorRef, ActorRef) = {
  val (executorService: ExecutorService,
    instanceManager,
    scheduler,
    libraryCacheManager,
    restartStrategy,
    timeout,
    archiveCount,
    leaderElectionService,
    submittedJobGraphs,
    checkpointRecoveryFactory,
    savepointStore,
    jobRecoveryTimeout,
    metricsRegistry) = createJobManagerComponents(
    configuration,
    None)
  val archiveProps = Props(archiveClass, archiveCount)
  // start the archiver with the given name, or without (avoid name conflicts)
  val archive: ActorRef = archiveActorName match {
    case Some(actorName) => actorSystem.actorOf(archiveProps, actorName)
    case None => actorSystem.actorOf(archiveProps)
  }
  val jobManagerProps = Props(
    jobManagerClass,
    configuration,
    executorService,
    instanceManager,
    scheduler,
    libraryCacheManager,
    archive,
    restartStrategy,
    timeout,
    leaderElectionService,
    submittedJobGraphs,
    checkpointRecoveryFactory,
    savepointStore,
    jobRecoveryTimeout,
    metricsRegistry)
  val jobManager: ActorRef = jobManagerActorName match {
    case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
    case None => actorSystem.actorOf(jobManagerProps)
  }
  (jobManager, archive)
}
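First, createJobManagerComponents creates the important building blocks of the jobManager. These include the component implementations for the storage and backup strategies, as well as the scheduler and submittedJobGraphs that come up later, which handle job scheduling and job submission respectively. I won't go deeper here.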
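The `archiveActorName match { ... }` and `jobManagerActorName match { ... }` blocks follow one pattern: register under an explicit name when one is given, otherwise let the system generate one so that names cannot clash. Here is a minimal sketch of that pattern in plain Scala. `Registry`, `registerNamed`, `registerAnonymous`, and `start` are hypothetical stand-ins introduced for illustration, not Akka or Flink APIs, and the `generated-N` naming scheme is an assumption.

```scala
object ActorNamingSketch {
  // Hypothetical stand-in for the name bookkeeping of an actor system.
  class Registry {
    private val names = scala.collection.mutable.Set.empty[String]
    private var counter = 0

    // Registers an explicit name; throws IllegalArgumentException if it is taken.
    def registerNamed(name: String): String = {
      require(!names.contains(name), s"name '$name' is not unique")
      names += name
      name
    }

    // Registers under a generated name of the form generated-N.
    def registerAnonymous(): String = {
      counter += 1
      registerNamed(s"generated-$counter")
    }
  }

  // Mirrors the Option match in startJobManagerActors.
  def start(registry: Registry, requestedName: Option[String]): String =
    requestedName match {
      case Some(name) => registry.registerNamed(name)
      case None       => registry.registerAnonymous()
    }
}
```

Passing `None` is the convenient choice when several instances may live in the same system (for example in tests), since a second registration under the same explicit name fails.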
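The jobManager actor has now been created. With Akka in Scala, an actor extends the Actor class and overrides the receive method to accept and process messages. Following that lead, we find that the JobManager class extends FlinkActor. Here is FlinkActor: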
trait FlinkActor extends Actor {
  val log: Logger
  override def receive: Receive = handleMessage
  /** Handle incoming messages
    * @return
    */
  def handleMessage: Receive
  def decorateMessage(message: Any): Any = {
    message
  }
}
The receive method is overridden and set to handleMessage, so the actual message handling lives in the handleMessage method of JobManager, the FlinkActor subclass:
override def handleMessage: Receive = {
  ...
  ...
  case SubmitJob(jobGraph, listeningBehaviour) =>
    val client = sender()
    val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
      jobGraph.getSessionTimeout)
    submitJob(jobGraph, jobInfo)
  ...
  ...
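handleMessage handles many messages, including job recovery, leader election, TM registration, and job submission, recovery, and cancellation. For now I focus only on SubmitJob(jobGraph, listeningBehaviour). The message definition is simple, so I won't trace it. What matters is that SubmitJob carries the jobGraph and listeningBehaviour sent by the Client. Every Flink job is ultimately abstracted into a jobGraph and handed to the JM. How the jobGraph is generated is analyzed later, in the part on job generation.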
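The delegation from receive to handleMessage can be modeled without Akka, since Akka's Receive type is a PartialFunction[Any, Unit]. The sketch below is plain Scala under that assumption. `MiniActor`, `MiniJobManager`, `SubmitJobMsg`, `CancelJobMsg`, and `deliver` are hypothetical names introduced for illustration and do not appear in Flink.

```scala
object HandleMessageSketch {
  // Same shape as Akka's Receive type alias.
  type Receive = PartialFunction[Any, Unit]

  trait MiniActor {
    // Subclasses implement only handleMessage.
    def handleMessage: Receive
    // receive simply delegates, as in FlinkActor.
    def receive: Receive = handleMessage
  }

  final case class SubmitJobMsg(jobName: String)
  final case class CancelJobMsg(jobName: String)

  class MiniJobManager extends MiniActor {
    // Records handled messages so the behaviour can be inspected.
    val log = scala.collection.mutable.ListBuffer.empty[String]

    override def handleMessage: Receive = {
      case SubmitJobMsg(name) =>
        log += s"submitted $name"
        ()
      case CancelJobMsg(name) =>
        log += s"cancelled $name"
        ()
    }
  }

  // Delivers one message; returns false when no case matches it.
  def deliver(actor: MiniActor, message: Any): Boolean = {
    val handler = actor.receive
    if (handler.isDefinedAt(message)) {
      handler(message)
      true
    } else {
      false
    }
  }
}
```

The point of the indirection is that the base trait owns receive, so it has a single place to wrap or decorate message handling, while each actor only lists its cases.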
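The JM processes the job in submitJob(jobGraph, jobInfo). The jobInfo parameter includes the Client's ActorRef, which is used to send the job's result back. This function implements the details of how the JM submits and processes a job; to keep the focus here, it is analyzed in the part on job processing. Its doc comment already gives the outline: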
/**
* Submits a job to the job manager. The job is registered at the libraryCacheManager which
* creates the job's class loader. The job graph is appended to the corresponding execution
* graph and the execution vertices are queued for scheduling.
*
* @param jobGraph representing the Flink job
* @param jobInfo the job info
* @param isRecovery Flag indicating whether this is a recovery or initial submission
*/
In other words, this method registers the job with the libraryCacheManager and adds the job's execution DAG to the scheduling queue.
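Summary
This part analyzed the Flink source only as far as the creation of the JM actor, mainly to understand how the JM side of the system organizes its ActorSystem. The main function ultimately creates the JM, which listens for client messages, schedules jobs, handles job fault tolerance, and finally hands the work to the TaskManagers. The concrete scheduling and processing strategies, and the communication between JM and TM, will be analyzed later. Next up is the Client-side logic.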