This is the first installment of a close reading of the Spark-core source. It gives a brief overview of Spark deployment modes and then walks in detail through how a cluster starts up in standalone mode.
Note: this series uses the Spark 1.6.3 source as its reference; where Spark 2.1.0 changes things significantly, that will be pointed out.
Spark Deployment
Spark can be deployed in four main ways: local, standalone, yarn and mesos.
local and standalone are mainly used for testing and learning. In production, clusters in China generally run on YARN, largely for historical reasons (the same cluster usually also runs Hadoop), while Mesos is more common abroad and, in my view, also where things are heading. YARN and Mesos will be analyzed separately later; below we walk through the cluster startup flow in standalone mode in detail.
Standalone mode: a close reading of the cluster startup source
We start from start-all.sh; its key lines are:
# Load the Spark configuration
. "${SPARK_HOME}/sbin/spark-config.sh"
# Start Master
"${SPARK_HOME}/sbin"/start-master.sh $TACHYON_STR
# Start Workers
"${SPARK_HOME}/sbin"/start-slaves.sh $TACHYON_STR
The comments say it all; next we follow start-master.sh:
CLASS="org.apache.spark.deploy.master.Master"
...
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
--ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
$ORIGINAL_ARGS
...
So start-master.sh hands off to spark-daemon.sh's start command, which launches org.apache.spark.deploy.master.Master as a daemon process (a background JVM, not a daemon thread). Let's go straight to the Master source:
private[deploy] object Master extends Logging {
val SYSTEM_NAME = "sparkMaster"
val ENDPOINT_NAME = "Master"
def main(argStrings: Array[String]) {
// Register the signal logger
SignalLogger.register(log)
// Instantiate SparkConf; this loads all spark.* configuration properties
val conf = new SparkConf
// Wrap argStrings and the loaded conf in MasterArguments, which also performs some initialization
val args = new MasterArguments(argStrings, conf)
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}
/**
* Start the Master and return a three tuple of:
* (1) The Master RpcEnv
* (2) The web UI bound port
* (3) The REST server bound port, if any
*/
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
}
First the signal logger is registered and a SparkConf is instantiated, which loads all spark.* configuration properties. The command-line arguments argStrings and that conf are then wrapped in MasterArguments, which performs some initialization (mostly loading configuration); we won't dwell on that here.
The Master is really started in startRpcEnvAndEndpoint. It first instantiates a SecurityManager (the class responsible for security in Spark) and then creates an RpcEnv. Spark's RPC layer rests on three abstractions (RpcEnv, RpcEndpoint and RpcEndpointRef) which hide the underlying transport and make it easy to extend; in Spark 1.6.3 the default implementation is Netty, and Spark 2.x has removed the Akka dependency entirely. Next a Master is instantiated, which is really just instantiating an RpcEndpoint (Master implements the ThreadSafeRpcEndpoint trait, which extends RpcEndpoint), and registered with the RpcEnv via setupEndpoint; as part of registration the Master's onStart method is run, and an RpcEndpointRef (in fact a NettyRpcEndpointRef) is returned. Through that RpcEndpointRef a BoundPortsRequest message is sent to the Master endpoint; the Master receives it in receiveAndReply (the actual dispatching is done by the Dispatcher inside NettyRpcEnv), pattern-matches on BoundPortsRequest and replies:
case BoundPortsRequest => {
context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
}
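The BoundPortsRequest/BoundPortsResponse exchange is just an ask/reply round trip over the RpcEnv / RpcEndpoint / RpcEndpointRef abstractions described above. As a rough mental model only (plain Scala promises and futures, not Spark's RPC classes; all names below are toy stand-ins), the pattern looks like this:
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Toy stand-ins mirroring the shape of Spark's RPC abstractions; this is NOT
// the Spark implementation, only an illustration of the ask/reply message flow.
case object BoundPortsRequest
case class BoundPortsResponse(rpcPort: Int, webUIPort: Int, restPort: Option[Int])

trait ToyEndpoint {
  // Spark's receiveAndReply gets an RpcCallContext; here we reply through a callback
  def receiveAndReply(msg: Any, reply: Any => Unit): Unit
}

class ToyMaster extends ToyEndpoint {
  override def receiveAndReply(msg: Any, reply: Any => Unit): Unit = msg match {
    case BoundPortsRequest => reply(BoundPortsResponse(7077, 8080, Some(6066)))
  }
}

class ToyEndpointRef(endpoint: ToyEndpoint) {
  // Rough analogue of RpcEndpointRef.askWithRetry[T]: send a message, get a Future of the answer
  def ask[T](msg: Any): Future[T] = {
    val p = Promise[T]()
    endpoint.receiveAndReply(msg, answer => p.success(answer.asInstanceOf[T]))
    p.future
  }
}

object RpcRoundTripDemo extends App {
  val masterRef = new ToyEndpointRef(new ToyMaster)
  val ports = Await.result(masterRef.ask[BoundPortsResponse](BoundPortsRequest), 5.seconds)
  println(ports)   // BoundPortsResponse(7077,8080,Some(6066))
}
In the real code the request travels through the Dispatcher inside NettyRpcEnv, and the reply carries the ports the Master actually bound.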
With that, the Master is up. For more on the RPC layer, see the companion article "Spark RPC 到底是個(gè)什么鬼?". Below are the Master's field initialization and its onStart method, with comments.
Master field initialization:
// By default, cancelled tasks are not removed from the work queue until their delay elapses, so a daemon thread is used to remove them "by hand"
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
// Daemon thread used to run the UI-rebuild code
private val rebuildUIThread =
ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread")
// Execution context backed by rebuildUIThread, used when rebuilding UIs
private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread)
// Load the Hadoop configuration
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
// Date format used when building application IDs
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
// A worker is considered timed out if the Master receives no heartbeat from it within 60 s
private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
// Max number of completed applications shown in the web UI; once 200 is exceeded, the oldest max(200/10, 1) = 20 completed applications are dropped
private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
// Max number of completed drivers shown in the web UI; once 200 is exceeded, the oldest max(200/10, 1) = 20 completed drivers are dropped
private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
// If the Master still receives no heartbeat within (REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS, the worker is removed altogether
private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
// Recovery mode: NONE, ZOOKEEPER, FILESYSTEM or CUSTOM; the default is NONE
private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
// Maximum number of executor failures to tolerate before an application is removed
private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)
// The Master's bookkeeping data structures follow; not described one by one
val workers = new HashSet[WorkerInfo]
val idToApp = new HashMap[String, ApplicationInfo]
val waitingApps = new ArrayBuffer[ApplicationInfo]
val apps = new HashSet[ApplicationInfo]
private val idToWorker = new HashMap[String, WorkerInfo]
private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
private val completedApps = new ArrayBuffer[ApplicationInfo]
private var nextAppNumber = 0
// Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI
private val appIdToUI = new ConcurrentHashMap[String, SparkUI]
private val drivers = new HashSet[DriverInfo]
private val completedDrivers = new ArrayBuffer[DriverInfo]
// Drivers currently spooled for scheduling
private val waitingDrivers = new ArrayBuffer[DriverInfo]
private var nextDriverNumber = 0
Utils.checkHost(address.host, "Expected hostname")
// Metrics system related fields
private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf,
securityMgr)
private val masterSource = new MasterSource(this)
// After onStart, webUi will be set
private var webUi: MasterWebUI = null
private val masterPublicAddress = {
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else address.host
}
private val masterUrl = address.toSparkURL
private var masterWebUiUrl: String = _
// Current state of this Master: STANDBY, ALIVE, RECOVERING or COMPLETING_RECOVERY
private var state = RecoveryState.STANDBY
private var persistenceEngine: PersistenceEngine = _
private var leaderElectionAgent: LeaderElectionAgent = _
private var recoveryCompletionTask: ScheduledFuture[_] = _
private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
// i.e. avoid confining an application to a small, fixed set of nodes
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
// Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
if (defaultCores < 1) {
throw new SparkException("spark.deploy.defaultCores must be positive")
}
// Alternative application submission gateway that is stable across Spark versions
// REST server used to accept application submissions
private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
private var restServer: Option[StandaloneRestServer] = None
private var restServerBoundPort: Option[Int] = None
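As a side note, the "max(200/10, 1) = 20" remark in the comments above is just this arithmetic; a small sketch of the trimming (an approximation of what the Master does when completedApps fills up, not the exact source):
import scala.collection.mutable.ArrayBuffer

object RetainedAppsDemo extends App {
  val RETAINED_APPLICATIONS = 200                            // spark.deploy.retainedApplications
  val completedApps = ArrayBuffer.tabulate(200)(i => s"app-$i")

  // Once the buffer is full, the oldest max(RETAINED_APPLICATIONS / 10, 1) entries are dropped
  if (completedApps.size >= RETAINED_APPLICATIONS) {
    val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)   // = 20
    completedApps.trimStart(toRemove)
  }
  println(completedApps.size)                                // 180
}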
The onStart method:
override def onStart(): Unit = {
// Log startup information
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
// Instantiate the standalone-mode MasterWebUI and bind it to an HTTP server
webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
// URL at which the Master's information can be viewed
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
// At a fixed interval, check for and remove timed-out workers
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
// Instantiate and start the REST server that accepts application submissions
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
}
restServerBoundPort = restServer.map(_.start())
// Start the metrics systems
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
// Attach the master and app metrics servlet handler to the web ui after the metrics systems are
// started.
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
// Serializer used by the persistence engine
val serializer = new JavaSerializer(conf)
// Recovery machinery: a persistence engine plus a leader election agent
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}
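The RECOVERY_MODE match above is driven entirely by configuration. A minimal sketch of selecting ZooKeeper-based recovery (the keys follow the standalone HA documentation; the ZooKeeper hosts are made up, and in practice these properties are usually passed to the Master through SPARK_DAEMON_JAVA_OPTS rather than set in code):
import org.apache.spark.SparkConf

object RecoveryModeDemo extends App {
  val conf = new SparkConf(false)
    .set("spark.deploy.recoveryMode", "ZOOKEEPER")
    .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181")   // hypothetical ensemble
    .set("spark.deploy.zookeeper.dir", "/spark")

  // Mirrors the branch the Master's onStart would take for this configuration
  conf.get("spark.deploy.recoveryMode", "NONE") match {
    case "ZOOKEEPER"  => println("ZooKeeperRecoveryModeFactory: state and leader election via ZooKeeper")
    case "FILESYSTEM" => println("FileSystemRecoveryModeFactory: state persisted to a recovery directory on disk")
    case "CUSTOM"     => println("user-supplied StandaloneRecoveryModeFactory")
    case _            => println("BlackHolePersistenceEngine + MonarchyLeaderAgent: no recovery")
  }
}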
Now for the Worker startup.
start-slaves.sh:
# Launch the slaves
"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"
start-slave.sh:
CLASS="org.apache.spark.deploy.worker.Worker"
...
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
--webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
This mirrors the Master startup, so we go straight to the Worker class, again starting from main:
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
args.memory, args.masters, args.workDir, conf = conf)
rpcEnv.awaitTermination()
}
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}
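One thing worth spelling out is the masterUrls/masterAddresses handling: with HA there may be several masters, and the worker is typically handed all of them in a single spark:// URL. A rough sketch of that parsing (hypothetical host names; this approximates what the argument parsing and RpcAddress.fromSparkURL do, it is not the Spark code itself):
object MasterUrlParseDemo extends App {
  // e.g. what start-slave.sh might receive as its master argument under ZooKeeper HA
  val masterArg = "spark://master1:7077,master2:7077"

  // One spark:// URL per master...
  val masterUrls: Array[String] =
    masterArg.stripPrefix("spark://").split(",").map("spark://" + _)

  // ...and each URL reduced to a (host, port) pair, roughly RpcAddress.fromSparkURL
  val masterAddresses: Array[(String, Int)] = masterUrls.map { url =>
    val Array(host, port) = url.stripPrefix("spark://").split(":")
    (host, port.toInt)
  }

  masterAddresses.foreach(println)   // (master1,7077) and (master2,7077)
}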
The beginning mirrors the Master, except that several Workers may run on one host, so a systemName is built from workerNumber and used to create a separate RpcEnv for each instance. The Worker (i.e. the endpoint) is then instantiated with masterAddresses (note that there may be several Masters) so that it can register with them later, and registered with its RpcEnv, which again triggers the Worker's onStart method. The Worker's field initialization and onStart source are given at the end of this article; first let's look at how the Worker registers with the Master (onStart calls registerWithMaster):
private def registerWithMaster() {
// onDisconnected may be triggered multiple times, so don't attempt registration
// if there are outstanding registration attempts scheduled.
registrationRetryTimer match {
case None =>
registered = false
registerMasterFutures = tryRegisterAllMasters()
connectionAttemptCount = 0
registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReregisterWithMaster))
}
},
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
case Some(_) =>
logInfo("Not spawning another attempt to register with the master, since there is an" +
" attempt scheduled already.")
}
}
Inside, it calls tryRegisterAllMasters:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
masterRpcAddresses.map { masterAddress =>
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
logInfo("Connecting to master " + masterAddress + "...")
val masterEndpoint =
rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
registerWithMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
}
})
}
}
The run body is submitted to a thread pool named registerMasterThreadPool, whose maximum size equals the number of Masters so that all of them can be contacted at the same time. It first obtains a reference (an RpcEndpointRef) to one of the Masters via setupEndpointRef, then calls registerWithMaster(masterEndpoint) with that reference. Note that this registerWithMaster takes an RpcEndpointRef argument, unlike the no-argument method we started from:
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
.onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
Utils.tryLogNonFatalError {
handleRegisterResponse(msg)
}
case Failure(e) =>
logError(s"Cannot register with master: ${masterEndpoint.address}", e)
System.exit(1)
}(ThreadUtils.sameThread)
}
Here the masterEndpoint (the Master's RpcEndpointRef) is asked with a RegisterWorker message, and the Master's answer is handled in onComplete. Let's first see how the Master handles that message:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
context.reply(MasterInStandby)
} else if (idToWorker.contains(id)) {
context.reply(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerUiPort, publicAddress)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
context.reply(RegisteredWorker(self, masterWebUiUrl))
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}
}
receiveAndReply matches the RegisterWorker message sent by the Worker and then acts on it: it logs the registration; if the Master is in STANDBY state it replies MasterInStandby; if idToWorker already contains this worker ID it replies RegisterWorkerFailed("Duplicate worker ID"); otherwise it wraps the worker's details in a WorkerInfo and calls registerWorker(worker). If that succeeds, the worker is added to the persistenceEngine, RegisteredWorker(self, masterWebUiUrl) is sent back and schedule() is called; if it fails, RegisterWorkerFailed is sent back. Let's see how the Master actually registers a worker, i.e. the registerWorker(worker) method:
private def registerWorker(worker: WorkerInfo): Boolean = {
// There may be one or more refs to dead workers on this same node (w/ different ID's),
// remove them.
workers.filter { w =>
(w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
workers -= w
}
val workerAddress = worker.endpoint.address
if (addressToWorker.contains(workerAddress)) {
val oldWorker = addressToWorker(workerAddress)
if (oldWorker.state == WorkerState.UNKNOWN) {
// A worker registering from UNKNOWN implies that the worker was restarted during recovery.
// The old worker must thus be dead, so we will remove it and accept the new worker.
removeWorker(oldWorker)
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
}
}
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
true
}
First, any workers with the same host and port whose state is DEAD are removed. Then the new worker's RpcAddress is looked up in addressToWorker: if an entry already exists and the old worker's state is UNKNOWN (meaning it was restarted during recovery), the old worker is removed and the new one accepted; if an entry exists with any other state, the method logs a warning and returns false, which makes the previous step reply RegisterWorkerFailed. Otherwise the worker is added to workers, idToWorker and addressToWorker and true is returned, so the previous step replies context.reply(RegisteredWorker(self, masterWebUiUrl)) and calls schedule(), which hands the currently available resources to waiting applications (it is called whenever a new application joins or resources change); that method will get a detailed article of its own. Now back on the Worker side: in registerWithMaster(masterEndpoint), the success case calls handleRegisterResponse(msg):
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
msg match {
case RegisteredWorker(masterRef, masterWebUiUrl) =>
logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
registered = true
changeMaster(masterRef, masterWebUiUrl)
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(SendHeartbeat)
}
}, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
if (CLEANUP_ENABLED) {
logInfo(
s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(WorkDirCleanup)
}
}, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
}
case RegisterWorkerFailed(message) =>
if (!registered) {
logError("Worker registration failed: " + message)
System.exit(1)
}
case MasterInStandby =>
// Ignore. Master not yet ready.
}
}
Again this is a pattern match:
- If the reply is RegisteredWorker, changeMaster is run (which cancels the last pending registration retry), then the Worker sends itself a SendHeartbeat message on a fixed schedule; when receive picks it up, a heartbeat is forwarded to the Master via sendToMaster. Finally, if CLEANUP_ENABLED is set, the Worker also schedules WorkDirCleanup messages to itself, and on receipt old application directories are cleaned up.
- If the reply is RegisterWorkerFailed, registration has failed.
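The heartbeat loop mentioned above is nothing more than a fixed-rate task on a single daemon scheduler thread. A standalone sketch of the same pattern (plain java.util.concurrent rather than Spark's ThreadUtils, with a println standing in for the real SendHeartbeat message):
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object HeartbeatLoopDemo extends App {
  // spark.worker.timeout defaults to 60 s; heartbeats go out every (timeout / 4)
  val workerTimeoutMs = 60 * 1000L
  val heartbeatMillis = workerTimeoutMs / 4          // 15 000 ms

  // A single daemon scheduler thread, much like ThreadUtils.newDaemonSingleThreadScheduledExecutor
  val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "worker-forward-message-scheduler")
      t.setDaemon(true)
      t
    }
  })

  scheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = println("self.send(SendHeartbeat)")   // stand-in for the real message
  }, 0, heartbeatMillis, TimeUnit.MILLISECONDS)

  Thread.sleep(1000)     // keep the demo JVM alive long enough for the first tick
  scheduler.shutdown()
}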
The changeMaster method:
private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
// activeMasterUrl it's a valid Spark url since we receive it from master.
activeMasterUrl = masterRef.address.toSparkURL
activeMasterWebUiUrl = uiUrl
master = Some(masterRef)
connected = true
// Cancel any outstanding re-registration attempts because we found a new master
cancelLastRegistrationRetry()
}
cancelLastRegistrationRetry:
private def cancelLastRegistrationRetry(): Unit = {
if (registerMasterFutures != null) {
registerMasterFutures.foreach(_.cancel(true))
registerMasterFutures = null
}
registrationRetryTimer.foreach(_.cancel(true))
registrationRetryTimer = None
}
If registration fails, registrationRetryTimer keeps retrying:
registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReregisterWithMaster))
}
},
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
As you can see, the Worker sends itself a ReregisterWithMaster message; when receive gets it, reregisterWithMaster() is executed:
private def reregisterWithMaster(): Unit = {
Utils.tryOrExit {
// Increment the retry counter
connectionAttemptCount += 1
if (registered) {
// Already registered: cancel any pending retries
cancelLastRegistrationRetry()
} else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) { // have we exceeded the maximum number of retries?
logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
/**
* Re-register with the active master this worker has been communicating with. If there
* is none, then it means this worker is still bootstrapping and hasn't established a
* connection with a master yet, in which case we should re-register with all masters.
*
* It is important to re-register only with the active master during failures. Otherwise,
* if the worker unconditionally attempts to re-register with all masters, the following
* race condition may arise and cause a "duplicate worker" error detailed in SPARK-4592:
*
* (1) Master A fails and Worker attempts to reconnect to all masters
* (2) Master B takes over and notifies Worker
* (3) Worker responds by registering with Master B
* (4) Meanwhile, Worker's previous reconnection attempt reaches Master B,
* causing the same Worker to register with Master B twice
*
* Instead, if we only register with the known active master, we can assume that the
* old master must have died because another master has taken over. Note that this is
* still not safe if the old master recovers within this interval, but this is a much
* less likely scenario.
*/
master match {
case Some(masterRef) =>
// registered == false && master != None means we lost the connection to master, so
// masterRef cannot be used and we need to recreate it again. Note: we must not set
// master to None due to the above comments.
// As the comment above says: registration failed but master != None means we lost the
// connection to the master, so the old masterRef is unusable and must be recreated.
// First cancel the threads that were blocked waiting for a reply
if (registerMasterFutures != null) {
registerMasterFutures.foreach(_.cancel(true))
}
// Then create a new masterRef and register again
val masterAddress = masterRef.address
registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = {
try {
logInfo("Connecting to master " + masterAddress + "...")
val masterEndpoint =
rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
registerWithMaster(masterEndpoint)
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
}
}))
case None =>
// No masterRef at all: first cancel the threads that were blocked waiting for a reply
if (registerMasterFutures != null) {
registerMasterFutures.foreach(_.cancel(true))
}
// Then redo the initial registration, i.e. tryRegisterAllMasters
// We are retrying the initial registration
registerMasterFutures = tryRegisterAllMasters()
}
// We have exceeded the initial registration retry threshold
// All retries from now on should use a higher interval
// Once the initial number of registration retries is used up, cancel the old timer and
// schedule a new one with a longer interval:
// the first 6 retries are 5 to 15 seconds apart, the next 10 are 30 to 90 seconds apart
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
registrationRetryTimer.foreach(_.cancel(true))
registrationRetryTimer = Some(
forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(ReregisterWithMaster)
}
}, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
}
} else {
logError("All masters are unresponsive! Giving up.")
System.exit(1)
}
}
}
With that, the Worker has started and registered, and start-all.sh has finished its work.
Below are the Worker's field initialization and onStart method, with comments on the important parts.
Field initialization:
private val host = rpcEnv.address.host
private val port = rpcEnv.address.port
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
// A scheduled executor used to send messages at the specified time.
private val forwordMessageScheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
// A separated thread to clean up the workDir. Used to provide the implicit parameter of `Future`
// methods.
private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
// For worker and executor IDs
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
// Heartbeat interval: (worker timeout) / 4
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
// Retry model and retry counts
// Model retries to connect to the master, after Hadoop's model.
// The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
// Afterwards, the next 10 attempts are between 30 and 90 seconds.
// A bit of randomness is introduced so that not all of the workers attempt to reconnect at
// the same time.
private val INITIAL_REGISTRATION_RETRIES = 6
private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
}
private val INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(10 *
REGISTRATION_RETRY_FUZZ_MULTIPLIER))
private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60
* REGISTRATION_RETRY_FUZZ_MULTIPLIER))
// Cleanup-related settings
private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
// How often worker will clean up old app folders
private val CLEANUP_INTERVAL_MILLIS =
conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
// TTL for app folders/data; after TTL expires it will be cleaned up
private val APP_DATA_RETENTION_SECONDS =
conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
private val testing: Boolean = sys.props.contains("spark.testing")
// Reference to the active master
private var master: Option[RpcEndpointRef] = None
private var activeMasterUrl: String = ""
private[worker] var activeMasterWebUiUrl : String = ""
private val workerUri = rpcEnv.uriOf(systemName, rpcEnv.address, endpointName)
private var registered = false
private var connected = false
private val workerId = generateWorkerId()
private val sparkHome =
if (testing) {
assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
new File(sys.props("spark.test.home"))
} else {
new File(sys.env.get("SPARK_HOME").getOrElse("."))
}
var workDir: File = null
val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
val drivers = new HashMap[String, DriverRunner]
val executors = new HashMap[String, ExecutorRunner]
val finishedDrivers = new LinkedHashMap[String, DriverRunner]
val appDirectories = new HashMap[String, Seq[String]]
val finishedApps = new HashSet[String]
val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
WorkerWebUI.DEFAULT_RETAINED_DRIVERS)
// The shuffle service is not actually started unless configured.
private val shuffleService = new ExternalShuffleService(conf, securityMgr)
private val publicAddress = {
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
}
private var webUi: WorkerWebUI = null
private var connectionAttemptCount = 0
private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
private val workerSource = new WorkerSource(this)
private var registerMasterFutures: Array[JFuture[_]] = null
private var registrationRetryTimer: Option[JScheduledFuture[_]] = None
// Thread pool used when registering with the masters; its maximum size equals the number of masters
// A thread pool for registering with masters. Because registering with a master is a blocking
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
// time so that we can register with all masters.
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
"worker-register-master-threadpool",
masterRpcAddresses.size // Make sure we can register with all masters at the same time
)
var coresUsed = 0
var memoryUsed = 0
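To see what those fuzzed intervals actually come out to, here is a short sketch reproducing the arithmetic above (fuzz is uniform in [0.5, 1.5), so the first 6 retries land 5 to 15 seconds apart and the next 10 land 30 to 90 seconds apart; heartbeats, for comparison, go out every 60 s / 4 = 15 s):
import java.util.UUID
import scala.util.Random

object WorkerIntervalsDemo extends App {
  val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
  val fuzz = randomNumberGenerator.nextDouble + 0.500          // uniform in [0.5, 1.5)

  val initialRetrySeconds   = math.round(10 * fuzz)            // 5 .. 15
  val prolongedRetrySeconds = math.round(60 * fuzz)            // 30 .. 90
  val heartbeatMillis       = 60L * 1000 / 4                   // 15 000, from spark.worker.timeout = 60

  println(s"initial retry interval:   ${initialRetrySeconds}s")
  println(s"prolonged retry interval: ${prolongedRetrySeconds}s")
  println(s"heartbeat interval:       ${heartbeatMillis}ms")
}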
The onStart() method:
override def onStart() {
assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
logInfo("Spark home: " + sparkHome)
// Create the worker's working directory
createWorkDir()
// Start the external shuffle service (if enabled)
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()
// Register this worker with the master(s)
registerWithMaster()
// Metrics system
metricsSystem.registerSource(workerSource)
metricsSystem.start()
// Attach the worker metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
This article briefly introduced Spark's deployment modes and then traced, in detail, what start-all.sh actually runs: the Master's startup and its handling of registrations, and the Worker's startup and registration with the Master. The Master's schedule method was deliberately left out; it will be analyzed in detail in a separate article.
This is an original article. Reposting is welcome; please credit the source and the author. Thanks!