spark-源碼 master和worker啟動

概述

Worker的啟動都是通過啟動shell腳本

Master啟動

master啟動從main函數(shù)開始,主要啟動Rpc環(huán)境:RpcEnv(Rpc環(huán)境):Akka和Netty

啟動一個Master侠仇,通過啟動 Shell 腳本start-master.sh

這個腳本實際啟動 spark 的 master 類

start-master.sh? -> spark-daemon.sh start org.apache.spark.deploy.master.Master

啟動時會傳入一些參數(shù)既穆,比如cpu的執(zhí)行核數(shù),內存大小,app的main方法等

查看Master類的main方法

private[spark] object Master extends Logging {

? val systemName = "sparkMaster"

? private val actorName = "Master"

? //master啟動的入口,啟動命令里會傳入一些參數(shù)

? def main(argStrings: Array[String]) {

? ? SignalLogger.register(log)

? ? //創(chuàng)建SparkConf? ? val conf = new SparkConf

? ? //保存參數(shù)到SparkConf

? ? val args = new MasterArguments(argStrings, conf)

? ? //創(chuàng)建ActorSystem

? ? val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)

? ? //等待該主Actor結束

? ? actorSystem.awaitTermination()

? }

這里主要看startSystemAndActor方法

? /**

? *? (1) 啟動Master的actor system

? *? (2) 綁定端口

? *? (3) 啟動webui和port

? *? (4) 啟動rest服務和綁定端口

? */

? def startSystemAndActor(

? ? ? host: String,

? ? ? port: Int,

? ? ? webUiPort: Int,

? ? ? conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {

? ? val securityMgr = new SecurityManager(conf)

? ? //利用AkkaUtils創(chuàng)建ActorSystem

? ? val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,

? ? ? securityManager = securityMgr)

? ? val actor = actorSystem.actorOf(

? ? ? Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), "Master")

? ....

? }

}

spark底層通信是Akka

通過ActorSystem創(chuàng)建Actor -> actorSystem.actorOf, 就會執(zhí)行Master的構造方法(也就是說上面調用actorOf方法的時候會創(chuàng)建actor,也就是調用Master的構造器)->然后執(zhí)行Actor生命周期方法

執(zhí)行Master的構造方法初始化一些變量

private[spark] class Master(

? ? host: String,

? ? port: Int,

? ? webUiPort: Int,

? ? val securityMgr: SecurityManager,

? ? val conf: SparkConf)

? extends Actor with ActorLogReceive with Logging with LeaderElectable {

? //主構造器

? //啟用定期器功能

? import context.dispatcher? // to use Akka's scheduler.schedule()

? val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

? def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")? // For application IDs

? //woker超時時間

? val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000

? val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)

? val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)

? val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)

? val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")

? //一個HashSet用于保存WorkerInfo

? val workers = new HashSet[WorkerInfo]

? //一個HashMap用保存workid -> WorkerInfo

? val idToWorker = new HashMap[String, WorkerInfo]

? val addressToWorker = new HashMap[Address, WorkerInfo]

? //一個HashSet用于保存客戶端(SparkSubmit)提交的任務

? val apps = new HashSet[ApplicationInfo]

? //一個HashMap Appid-》 ApplicationInfo

? val idToApp = new HashMap[String, ApplicationInfo]

? val actorToApp = new HashMap[ActorRef, ApplicationInfo]

? val addressToApp = new HashMap[Address, ApplicationInfo]

? //等待調度的App

? val waitingApps = new ArrayBuffer[ApplicationInfo]

? val completedApps = new ArrayBuffer[ApplicationInfo]

? var nextAppNumber = 0

? val appIdToUI = new HashMap[String, SparkUI]

? //保存DriverInfo

? val drivers = new HashSet[DriverInfo]

? val completedDrivers = new ArrayBuffer[DriverInfo]

? val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling

主構造器執(zhí)行完就會執(zhí)行preStart –》執(zhí)行完receive方法

? //啟動定時器,進行定時檢查超時的worker

? //重點看一下CheckForWorkerTimeOut

? context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

? ? ? 1、在第一次運行的時候需要等待多少時間洛搀;

  2、循環(huán)的頻率佑淀;

  3留美、我們想發(fā)送消息的目標ActorRef ;

  4伸刃、消息

preStart方法里創(chuàng)建了一個定時器谎砾,定時檢查Woker的超時時間val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000默認為60秒

到此Master的初始化的主要過程到我們已經看到了,主要就是構造一個Master的Actor進行等待消息捧颅,并初始化了集合來保存task信息和Worker信息景图,和一個定時器來檢查Worker的超時

Woker的啟動

執(zhí)行本地 shell 腳本salves.sh-> 通過讀取配置文件, 通過ssh的方式遠程連接遠端的worker節(jié)點碉哑,然后啟動 每個節(jié)點的 work 類

spark-daemon.sh start org.apache.spark.deploy.worker.Worker

腳本會啟動org.apache.spark.deploy.worker.Worker 類

看Worker源碼:

private[spark] object Worker extends Logging {

? //Worker啟動的入口

? def main(argStrings: Array[String]) {

? ? SignalLogger.register(log)

? ? val conf = new SparkConf

? ? val args = new WorkerArguments(argStrings, conf)

? ? //新創(chuàng)ActorSystem和Actor

? ? val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,

? ? ? args.memory, args.masters, args.workDir)

? ? actorSystem.awaitTermination()

? }

這里最重要的是Woker的startSystemAndActor

? def startSystemAndActor(

? ? ? host: String,

? ? ? port: Int,

? ? ? webUiPort: Int,

? ? ? cores: Int,

? ? ? memory: Int,

? ? ? masterUrls: Array[String],

? ? ? workDir: String,

? ? ? workerNumber: Option[Int] = None,

? ? ? conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

? ? // The LocalSparkCluster runs multiple local sparkWorkerX actor systems

? ? val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")

? ? val actorName = "Worker"

? ? val securityMgr = new SecurityManager(conf)

? ? //通過AkkaUtils ActorSystem

? ? val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,

? ? ? conf = conf, securityManager = securityMgr)

? ? val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))

? ? //通過actorSystem.actorOf創(chuàng)建Actor? Worker-》執(zhí)行構造器 -》 preStart -》 receice

? ? actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,

? ? ? masterAkkaUrls, systemName, "Worker",? workDir, conf, securityMgr), name = "Worker")

? ? (actorSystem, boundPort)

? }

這里啟動該Worker的Actor對象,到此Worker的啟動初始化完成

Worker與Master通信

根據Actor生命周期接著Worker的preStart方法被調用挚币,也就是說worker一起動就會給master發(fā)消息,進行注冊(說白了就是把work信息存到master的一個list里)

? override def preStart() {

? ? assert(!registered)

? ? createWorkDir()

? ? context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

? ? shuffleService.startIfEnabled()

? ? webUi = new WorkerWebUI(this, workDir, webUiPort)

? ? webUi.bind()

? ? //Worker向Master注冊

? ? registerWithMaster()

? ? ....

? }

這里調用了一個registerWithMaster方法扣典,開始向Master注冊

def registerWithMaster() {

? ? // DisassociatedEvent may be triggered multiple times, so don't attempt registration

? ? // if there are outstanding registration attempts scheduled.

? ? registrationRetryTimer match {

? ? ? case None =>

? ? ? ? registered = false

? ? ? ? //開始注冊

? ? ? ? tryRegisterAllMasters()

? ? ? ? ....

? ? }

? }

registerWithMaster里通過匹配調用了tryRegisterAllMasters方法

妆毕,接下來看

? private def tryRegisterAllMasters() {

? ? //遍歷master的地址

? ? for (masterAkkaUrl <- masterAkkaUrls) {

? ? ? logInfo("Connecting to master " + masterAkkaUrl + "...")

? ? ? //Worker得到Mater actor的遠程引用? ? ? val actor = context.actorSelection(masterAkkaUrl)

? ? ? //向Master發(fā)送注冊信息

? ? ? actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)//Worker向Master發(fā)送了一個消息,注冊內容包含,帶去一些參數(shù)贮尖,id,主機笛粘,端口,cpu核數(shù)湿硝,內存等待? ? }

? }

通過masterAkkaUrl和Master建立連接后

masterActor接受來自Worker的注冊信息

override def receiveWithLogging = {

? ? ......

? ? //接受來自Worker的注冊信息

? ? case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>

? ? {

? ? ? logInfo("Registering worker %s:%d with %d cores, %s RAM".format(

? ? ? ? workerHost, workerPort, cores, Utils.megabytesToString(memory)))

? ? ? if (state == RecoveryState.STANDBY) {

? ? ? ? // ignore, don't send response

? ? ? ? //判斷這個worker是否已經注冊過

? ? ? } else if (idToWorker.contains(id)) {

? ? ? ? //如果注冊過薪前,告訴worker注冊失敗

? ? ? ? sender ! RegisterWorkerFailed("Duplicate worker ID")

? ? ? } else {

? ? ? ? //沒有注冊過,把來自Worker的注冊信息封裝到WorkerInfo當中

? ? ? ? val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,

? ? ? ? ? sender, workerUiPort, publicAddress)

? ? ? ? if (registerWorker(worker)) {

? ? ? ? ? //用持久化引擎記錄Worker的信息

? ? ? ? ? persistenceEngine.addWorker(worker)

? ? ? ? ? //向Worker反饋信息关斜,告訴Worker注冊成功

? ? ? ? ? sender ! RegisteredWorker(masterUrl, masterWebUiUrl)

? ? ? ? ? schedule()

? ? ? ? } else {

? ? ? ? ? val workerAddress = worker.actor.path.address

? ? ? ? ? logWarning("Worker registration failed. Attempted to re-register worker at same " +

? ? ? ? ? ? "address: " + workerAddress)

? ? ? ? ? sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "

? ? ? ? ? ? + workerAddress)

? ? ? ? }

? ? ? }

? ? }

注冊成功后Worker向master發(fā)送心跳

override def receiveWithLogging = {

? ? ? case RegisteredWorker(masterUrl, masterWebUiUrl) =>

? ? ? logInfo("Successfully registered with master " + masterUrl)

? ? ? registered = true

? ? ? changeMaster(masterUrl, masterWebUiUrl)

? ? ? //啟動定時器示括,定時發(fā)送心跳Heartbeat

? ? ? context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)

? ? ? if (CLEANUP_ENABLED) {

? ? ? ? logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")

? ? ? ? context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,

? ? ? ? ? CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)

? ? ? }

worker接受來自Master的注冊成功的反饋信息,啟動定時器,定時發(fā)送心跳Heartbeat

? ? case SendHeartbeat =>

? ? ? //worker發(fā)送心跳的目的就是為了報活

? ? ? if (connected) { master ! Heartbeat(workerId) }

Master接收心跳消息蚤吹,更新最后一次心跳時間

? override def receiveWithLogging = {

? ? ? ? ....

? ? case Heartbeat(workerId) => {

? ? ? idToWorker.get(workerId) match {

? ? ? ? case Some(workerInfo) =>

? ? ? ? ? //更新最后一次心跳時間

? ? ? ? ? workerInfo.lastHeartbeat = System.currentTimeMillis()

? ? ? ? ? .....

? ? ? }

? ? }

}

記錄并更新workerInfo.lastHeartbeat = System.currentTimeMillis()最后一次心跳時間

Master的定時任務會不斷的發(fā)送一個CheckForWorkerTimeOut內部消息不斷的輪詢集合里的Worker信息例诀,如果超過60秒就將Worker信息移除

? //檢查超時的Worker

? ? case CheckForWorkerTimeOut => {

? ? ? timeOutDeadWorkers()

? ? }

timeOutDeadWorkers方法

? def timeOutDeadWorkers() {

? ? // Copy the workers into an array so we don't modify the hashset while iterating through it

? ? val currentTime = System.currentTimeMillis()

? ? val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray

? ? for (worker <- toRemove) {

? ? ? if (worker.state != WorkerState.DEAD) {

? ? ? ? logWarning("Removing %s because we got no heartbeat in %d seconds".format(

? ? ? ? ? worker.id, WORKER_TIMEOUT/1000))

? ? ? ? removeWorker(worker)

? ? ? } else {

? ? ? ? if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) {

? ? ? ? ? workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it

? ? ? ? }

? ? ? }

? ? }

? }

如果 (最后一次心跳時間<當前時間-超時時間)則判斷為Worker超時随抠,

將集合里的信息移除裁着。

當下一次收到心跳信息時繁涂,如果是已注冊過的,workerId不為空二驰,但是WorkerInfo已被移除的條件扔罪,就會sender ! ReconnectWorker(masterUrl)發(fā)送一個重新注冊的消息

case None =>

? ? ? ? ? if (workers.map(_.id).contains(workerId)) {

? ? ? ? ? ? logWarning(s"Got heartbeat from unregistered worker $workerId." +

? ? ? ? ? ? ? " Asking it to re-register.")

? ? ? ? ? ? //發(fā)送重新注冊的消息

? ? ? ? ? ? sender ! ReconnectWorker(masterUrl)

? ? ? ? ? } else {

? ? ? ? ? ? logWarning(s"Got heartbeat from unregistered worker $workerId." +

? ? ? ? ? ? ? " This worker was never registered, so ignoring the heartbeat.")

? ? ? ? ? }

Master與Worker啟動的大致的通信流程到此ok

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市桶雀,隨后出現(xiàn)的幾起案子矿酵,更是在濱河造成了極大的恐慌,老刑警劉巖矗积,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件全肮,死亡現(xiàn)場離奇詭異,居然都是意外死亡棘捣,警方通過查閱死者的電腦和手機辜腺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來乍恐,“玉大人评疗,你說我怎么就攤上這事∫鹆遥” “怎么了百匆?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長呜投。 經常有香客問我加匈,道長,這世上最難降的妖魔是什么仑荐? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任雕拼,我火速辦了婚禮,結果婚禮上释漆,老公的妹妹穿的比我還像新娘悲没。我一直安慰自己,他們只是感情好男图,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布示姿。 她就那樣靜靜地躺著,像睡著了一般逊笆。 火紅的嫁衣襯著肌膚如雪栈戳。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天难裆,我揣著相機與錄音子檀,去河邊找鬼镊掖。 笑死,一個胖子當著我的面吹牛褂痰,可吹牛的內容都是我干的亩进。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼缩歪,長吁一口氣:“原來是場噩夢啊……” “哼归薛!你這毒婦竟也來了?” 一聲冷哼從身側響起匪蝙,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤主籍,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后逛球,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體千元,經...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年颤绕,在試婚紗的時候發(fā)現(xiàn)自己被綠了幸海。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡屋厘,死狀恐怖涕烧,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情汗洒,我是刑警寧澤议纯,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站溢谤,受9級特大地震影響瞻凤,放射性物質發(fā)生泄漏。R本人自食惡果不足惜世杀,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一阀参、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧瞻坝,春花似錦蛛壳、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至浮创,卻和暖如春忧吟,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背斩披。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工溜族, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留讹俊,地道東北人。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓煌抒,卻偏偏與公主長得像仍劈,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子摧玫,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353