Spark Core Source Reading (1): Spark Deployment & start-all.sh in Standalone Mode

This is the first article in the Spark Core source-reading series. It briefly introduces Spark's deployment options and then walks through, in detail, how a cluster is started in standalone mode.

Note: the articles in this series are based on the Spark-1.6.3 source code; wherever Spark-2.1.0 makes significant changes, those will be pointed out as well.

Spark Deployment

Spark can be deployed in four main ways: local, standalone, yarn, and mesos.

Image source: Spark-Essentials-SSW2016-TE1.pdf

The local and standalone modes are mainly used for testing and learning. In production, clusters in China generally run on yarn, largely for historical reasons (the same cluster usually also runs Hadoop), while mesos is more common abroad and is, in my view, also a trend. yarn and mesos will be analyzed separately later; below we walk through the cluster-startup flow in standalone mode in detail.
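For orientation, the four modes are ultimately selected through the `--master` argument of `spark-submit`. A minimal illustration (host names, ports, jar and class names are placeholders, not values from this article):

```shell
# local mode: run with 4 worker threads inside a single JVM
spark-submit --master "local[4]" --class com.example.App app.jar

# standalone mode: connect to the Master started by start-all.sh
spark-submit --master spark://master-host:7077 --class com.example.App app.jar

# yarn / mesos: delegate resource management to an external cluster manager
spark-submit --master yarn --deploy-mode cluster --class com.example.App app.jar
spark-submit --master mesos://mesos-host:5050 --class com.example.App app.jar
```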

Cluster Startup in Standalone Mode: a Source-Code Walkthrough

We start from start-all.sh, whose main code is:

# Load the Spark configuration
. "${SPARK_HOME}/sbin/spark-config.sh"

# Start Master
"${SPARK_HOME}/sbin"/start-master.sh $TACHYON_STR

# Start Workers
"${SPARK_HOME}/sbin"/start-slaves.sh $TACHYON_STR

The comments are self-explanatory; let's follow start-master.sh:

CLASS="org.apache.spark.deploy.master.Master"
...
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
  --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
  $ORIGINAL_ARGS
...

As you can see, it invokes the start command of spark-daemon.sh, which dynamically loads org.apache.spark.deploy.master.Master and runs it as a daemon process, so we go straight to the Master source:

private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    // register the signal logger
    SignalLogger.register(log)
    // instantiate SparkConf, which loads configuration properties of the form `spark.*`
    val conf = new SparkConf
    // wrap the command-line arguments argStrings and the loaded conf in MasterArguments, which also runs some initialization
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  /**
   * Start the Master and return a three tuple of:
   *   (1) The Master RpcEnv
   *   (2) The web UI bound port
   *   (3) The REST server bound port, if any
   */
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}

main first registers the signal logger and instantiates SparkConf, which loads configuration properties of the form spark.*. The arguments argStrings and the loaded conf are then wrapped in MasterArguments, which performs some initialization, mostly loading configuration; we won't go into detail on that here and move on instead.

startRpcEnvAndEndpoint is where the Master is really started. It first instantiates SecurityManager (the class in charge of security in Spark), then creates the RpcEnv. (Spark's RPC layer rests on three abstractions: RpcEnv, RpcEndpoint, and RpcEndpointRef. This shields users from the underlying implementation and makes it easy to extend; in Spark-1.6.3 the default implementation is Netty, and Spark-2.x has removed the Akka dependency altogether.) Next the Master is instantiated, which really means instantiating an RpcEndpoint (Master implements the ThreadSafeRpcEndpoint trait, which in turn extends RpcEndpoint). Once instantiated, it is registered with the RpcEnv via setupEndpoint; registration triggers the Master's onStart method and returns an RpcEndpointRef (in fact a NettyRpcEndpointRef). Through that RpcEndpointRef a BoundPortsRequest message is sent to the Master endpoint; the Master receives it in its receiveAndReply method (the actual delivery is done by the Dispatcher inside NettyRpcEnv), pattern-matches it as a BoundPortsRequest, and replies. The source:

case BoundPortsRequest => {
      context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
    }
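The round trip just shown (an endpoint registered in an RpcEnv, reached through an RpcEndpointRef via ask) can be imitated by a toy, in-process sketch. The names mirror Spark's, but everything here is simplified and local, not Spark's actual Netty-based implementation:

```scala
import scala.collection.mutable

// Toy stand-ins for Spark's three RPC abstractions (didactic sketch only)
trait RpcEndpoint {
  def receiveAndReply(msg: Any): Any   // Spark's version also receives an RpcCallContext
}

class RpcEndpointRef(endpoint: RpcEndpoint) {
  // Spark's askWithRetry sends the message over the wire and waits for a reply;
  // here it is just a direct method call
  def askWithRetry[T](msg: Any): T = endpoint.receiveAndReply(msg).asInstanceOf[T]
}

class RpcEnv {
  private val endpoints = mutable.Map.empty[String, RpcEndpointRef]
  // registering an endpoint yields the ref used to communicate with it
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    val ref = new RpcEndpointRef(endpoint)
    endpoints(name) = ref
    ref
  }
}

// messages modeled on the Master's BoundPortsRequest / BoundPortsResponse
case object BoundPortsRequest
case class BoundPortsResponse(webUIPort: Int)

val rpcEnv = new RpcEnv
val masterRef = rpcEnv.setupEndpoint("Master", new RpcEndpoint {
  def receiveAndReply(msg: Any): Any = msg match {
    case BoundPortsRequest => BoundPortsResponse(webUIPort = 8080)
  }
})
val ports = masterRef.askWithRetry[BoundPortsResponse](BoundPortsRequest)
println(ports.webUIPort)   // the bound web UI port, 8080 in this sketch
```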

With that, the Master is up. For the RPC layer itself, see the companion article "Spark RPC 到底是個(gè)什么鬼?" (What exactly is Spark RPC?). Below are the Master's instance-initialization code and its onStart method, with comments:

Master field initialization:

  // By default, a cancelled task is not removed from the work queue until its
  // delay elapses, so this daemon thread is used to remove it "manually"
  private val forwardMessageThread =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")

  // daemon thread that runs the UI-rebuild code
  private val rebuildUIThread =
    ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread")

  // execution context backed by rebuildUIThread
  private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread)

  // Hadoop configuration
  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

  // date format used to build application IDs
  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs

  // a Worker is considered timed out if the Master receives no heartbeat from it within 60s
  private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000
  // max number of completed applications shown in the web UI; past 200,
  // math.max(200 / 10, 1) = 20 completed applications are dropped
  private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
  // max number of completed drivers shown in the web UI; past 200,
  // math.max(200 / 10, 1) = 20 completed drivers are dropped
  private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
  // a Worker is deleted if the Master still receives no heartbeat from it
  // within (REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS
  private val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
  // recovery mode: NONE, ZOOKEEPER, FILESYSTEM or CUSTOM; the default is NONE
  private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
  // maximum number of executor retries
  private val MAX_EXECUTOR_RETRIES = conf.getInt("spark.deploy.maxExecutorRetries", 10)

  // the various bookkeeping structures; their names are self-explanatory
  val workers = new HashSet[WorkerInfo]
  val idToApp = new HashMap[String, ApplicationInfo]
  val waitingApps = new ArrayBuffer[ApplicationInfo]
  val apps = new HashSet[ApplicationInfo]

  private val idToWorker = new HashMap[String, WorkerInfo]
  private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]

  private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
  private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
  private val completedApps = new ArrayBuffer[ApplicationInfo]
  private var nextAppNumber = 0
  // Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI
  private val appIdToUI = new ConcurrentHashMap[String, SparkUI]

  private val drivers = new HashSet[DriverInfo]
  private val completedDrivers = new ArrayBuffer[DriverInfo]
  // Drivers currently spooled for scheduling
  private val waitingDrivers = new ArrayBuffer[DriverInfo]
  private var nextDriverNumber = 0

  Utils.checkHost(address.host, "Expected hostname")

  // metrics-system related code
  private val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf, securityMgr)
  private val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf,
    securityMgr)
  private val masterSource = new MasterSource(this)

  // After onStart, webUi will be set
  private var webUi: MasterWebUI = null

  private val masterPublicAddress = {
    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
    if (envVar != null) envVar else address.host
  }

  private val masterUrl = address.toSparkURL
  private var masterWebUiUrl: String = _

  // current state of this Master: STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY
  private var state = RecoveryState.STANDBY

  private var persistenceEngine: PersistenceEngine = _

  private var leaderElectionAgent: LeaderElectionAgent = _

  private var recoveryCompletionTask: ScheduledFuture[_] = _

  private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _

  // As a temporary workaround before better ways of configuring memory, we allow users to set
  // a flag that will perform round-robin scheduling across the nodes (spreading out each app
  // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
  // i.e. avoid confining an application to a fixed handful of nodes
  private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)

  // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
  private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
  if (defaultCores < 1) {
    throw new SparkException("spark.deploy.defaultCores must be positive")
  }

  // Alternative application submission gateway that is stable across Spark versions
  // the rest server used to accept application submissions
  private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", true)
  private var restServer: Option[StandaloneRestServer] = None
  private var restServerBoundPort: Option[Int] = None

The onStart method:

override def onStart(): Unit = {
    // log startup information
    logInfo("Starting Spark master at " + masterUrl)
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    // instantiate the standalone-mode MasterWebUI and bind it to an HTTP server
    webUi = new MasterWebUI(this, webUiPort)
    webUi.bind()
    // the Master's information can be viewed at this URL
    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
    
    // periodically check for and remove timed-out workers
    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CheckForWorkerTimeOut)
      }
    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

    // instantiate and start the rest server that accepts application submissions
    if (restServerEnabled) {
      val port = conf.getInt("spark.master.rest.port", 6066)
      restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
    }
    restServerBoundPort = restServer.map(_.start())

    // start the metrics systems
    masterMetricsSystem.registerSource(masterSource)
    masterMetricsSystem.start()
    applicationMetricsSystem.start()
    // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
    // started.
    masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

    // serializer
    val serializer = new JavaSerializer(conf)

    // recovery machinery: a persistence engine plus a leader-election agent
    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, serializer)
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, serializer)
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
          .newInstance(conf, serializer)
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
  }

Next, the Worker startup.

start-slaves.sh:

# Launch the slaves
"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"

start-slave.sh:

CLASS="org.apache.spark.deploy.worker.Worker"
...
  "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
     --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"

This is similar to the Master's startup, so we go straight to the Worker file, again starting from the main method:

def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)
    rpcEnv.awaitTermination()
  }
  
def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): RpcEnv = {

    // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))
    rpcEnv
  }
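The systemName concatenation in the code above can be checked in isolation. A standalone sketch (SYSTEM_NAME is the Worker companion object's constant, "sparkWorker" in Spark-1.6.3):

```scala
// With several Workers on one node, workerNumber disambiguates their RPC environments
val SYSTEM_NAME = "sparkWorker"

def systemName(workerNumber: Option[Int]): String =
  SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")

println(systemName(None))     // single-worker case: "sparkWorker"
println(systemName(Some(2)))  // second worker on the node: "sparkWorker2"
```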

The beginning mirrors the Master's, except that there may be multiple Workers per node, so a systemName is built from workerNumber and used to create distinct RpcEnvs. The Worker (i.e. the endpoint) is then instantiated with masterAddresses (note: there may be multiple Masters) so that it can register with them later, and is registered with its own RpcEnv, which again triggers the Worker's onStart method. The Worker's initialization and onStart source is given at the end of this article; first let's look at how the Worker registers with the Master (onStart calls registerWithMaster):

private def registerWithMaster() {
    // onDisconnected may be triggered multiple times, so don't attempt registration
    // if there are outstanding registration attempts scheduled.
    registrationRetryTimer match {
      case None =>
        registered = false
        registerMasterFutures = tryRegisterAllMasters()
        connectionAttemptCount = 0
        registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
          new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReregisterWithMaster))
            }
          },
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          TimeUnit.SECONDS))
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
  }
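The retry-timer pattern in the method above, reduced to its core: a single-threaded scheduler periodically fires an action (standing in for self.send(ReregisterWithMaster)) until cancel(true) is called, which is what cancelLastRegistrationRetry later does once registration succeeds. A self-contained sketch with a deliberately short interval:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

val scheduler = Executors.newSingleThreadScheduledExecutor()
val attempts = new AtomicInteger(0)

// fire every 10 ms, starting immediately (the real code uses seconds)
val registrationRetryTimer = scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = attempts.incrementAndGet()  // stand-in for a re-register message
}, 0, 10, TimeUnit.MILLISECONDS)

Thread.sleep(100)                   // let a few "retries" fire
registrationRetryTimer.cancel(true) // what cancelLastRegistrationRetry does
scheduler.shutdown()
println(s"retries fired: ${attempts.get}")
```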

Internally it calls tryRegisterAllMasters:

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    masterRpcAddresses.map { masterAddress =>
      registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = {
          try {
            logInfo("Connecting to master " + masterAddress + "...")
            val masterEndpoint =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            registerWithMaster(masterEndpoint)
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        }
      })
    }
  }
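The thread-pool pattern used here, in miniature: because registering is a blocking call, the pool is sized to the number of Masters and one task per Master is submitted, so all Masters can be contacted concurrently. Addresses below are placeholders:

```scala
import java.util.concurrent.{Executors, Future => JFuture}

// hypothetical master addresses; the real code derives them from spark:// URLs
val masterRpcAddresses = Seq("master-a:7077", "master-b:7077")
val registerMasterThreadPool = Executors.newFixedThreadPool(masterRpcAddresses.size)

// one blocking registration attempt per master
val registerMasterFutures: Seq[JFuture[_]] = masterRpcAddresses.map { addr =>
  registerMasterThreadPool.submit(new Runnable {
    // in the real code this resolves a masterEndpoint ref and registers with it
    override def run(): Unit = println(s"Connecting to master $addr...")
  })
}

registerMasterFutures.foreach(_.get())   // wait here only for demonstration
registerMasterThreadPool.shutdown()
```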

The body of run is executed on a thread pool named registerMasterThreadPool (its maximum number of threads is the number of Masters, masterRpcAddresses.size): it first obtains a reference (RpcEndpointRef) to one of the Masters via setupEndpointRef, then calls registerWithMaster(masterEndpoint), passing in that reference. Note that this registerWithMaster overload takes an RpcEndpointRef parameter, unlike the parameterless one we started from:

private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
      .onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          Utils.tryLogNonFatalError {
            handleRegisterResponse(msg)
          }
        case Failure(e) =>
          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
          System.exit(1)
      }(ThreadUtils.sameThread)
  }

It sends a RegisterWorker message to the Master through the ask method of masterEndpoint (the Master's RpcEndpointRef) and handles the Master's response in onComplete. Let's first see how the Master processes the message on its side:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        context.reply(MasterInStandby)
      } else if (idToWorker.contains(id)) {
        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerUiPort, publicAddress)
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          context.reply(RegisteredWorker(self, masterWebUiUrl))
          schedule()
        } else {
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }
    }

receiveAndReply matches the RegisterWorker message sent by the Worker and acts on it: it logs the registration, then checks the Master's state. If the Master is STANDBY it replies MasterInStandby; if idToWorker already contains the worker's ID it replies a "Duplicate worker ID" RegisterWorkerFailed; otherwise the worker's information is wrapped in a WorkerInfo and registerWorker(worker) is invoked to register it. On success the worker is added to the persistenceEngine, RegisteredWorker(self, masterWebUiUrl) is sent back, and schedule() is executed; on failure the reply is RegisterWorkerFailed. Now let's look at how the Master actually registers the Worker, i.e. the registerWorker(worker) method:

private def registerWorker(worker: WorkerInfo): Boolean = {
    // There may be one or more refs to dead workers on this same node (w/ different ID's),
    // remove them.
    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      workers -= w
    }

    val workerAddress = worker.endpoint.address
    if (addressToWorker.contains(workerAddress)) {
      val oldWorker = addressToWorker(workerAddress)
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
        // The old worker must thus be dead, so we will remove it and accept the new worker.
        removeWorker(oldWorker)
      } else {
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        return false
      }
    }

    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
    true
  }

It first removes any workers that share this worker's host and port and are in the DEAD state. It then takes the worker's RpcAddress and checks addressToWorker for an entry at the same address: if one exists and the old worker's state is UNKNOWN, the old worker is removed (it must have been restarted during recovery); if one exists in any other state, the method logs a message and returns false (which leads to the RegisterWorkerFailed reply above). Otherwise the worker is added to workers, idToWorker and addressToWorker, and true is returned, which leads to the success reply context.reply(RegisteredWorker(self, masterWebUiUrl)) and a call to schedule(), i.e. assigning the currently available resources to waiting applications (this method runs whenever a new application joins or resources change, and will get a detailed article of its own). Now back on the Worker side: returning to the registerWithMaster overload with the parameter, we look at the success path, handleRegisterResponse(msg):

private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
    msg match {
      case RegisteredWorker(masterRef, masterWebUiUrl) =>
        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
        registered = true
        changeMaster(masterRef, masterWebUiUrl)
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(SendHeartbeat)
          }
        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
        if (CLEANUP_ENABLED) {
          logInfo(
            s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(WorkDirCleanup)
            }
          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
        }

      case RegisterWorkerFailed(message) =>
        if (!registered) {
          logError("Worker registration failed: " + message)
          System.exit(1)
        }

      case MasterInStandby =>
        // Ignore. Master not yet ready.
    }
  }

Again by pattern matching:

  • If the reply is RegisteredWorker, changeMaster is executed (cancelling any outstanding retry), and the worker then sends its own RpcEnv a SendHeartbeat message; receive picks it up and forwards a heartbeat to the Master via sendToMaster. Finally, if CLEANUP_ENABLED is on, the worker sends itself WorkDirCleanup, on receipt of which old application directories are cleaned up
  • If the reply is RegisterWorkerFailed, registration failed
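As a worked example of the heartbeat cadence: the Worker heartbeats every (spark.worker.timeout / 4), matching the HEARTBEAT_MILLIS definition in the Worker's initialization, so with the default timeout of 60 seconds the Master sees four heartbeats per timeout window:

```scala
// default value of spark.worker.timeout, in seconds
val workerTimeoutSeconds = 60L
// one heartbeat every quarter of the timeout
val HEARTBEAT_MILLIS = workerTimeoutSeconds * 1000 / 4
println(HEARTBEAT_MILLIS)   // one heartbeat every 15 seconds
```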

The changeMaster method:

private def changeMaster(masterRef: RpcEndpointRef, uiUrl: String) {
    // activeMasterUrl it's a valid Spark url since we receive it from master.
    activeMasterUrl = masterRef.address.toSparkURL
    activeMasterWebUiUrl = uiUrl
    master = Some(masterRef)
    connected = true
    // Cancel any outstanding re-registration attempts because we found a new master
    cancelLastRegistrationRetry()
  }

cancelLastRegistrationRetry:

private def cancelLastRegistrationRetry(): Unit = {
    if (registerMasterFutures != null) {
      registerMasterFutures.foreach(_.cancel(true))
      registerMasterFutures = null
    }
    registrationRetryTimer.foreach(_.cancel(true))
    registrationRetryTimer = None
  }

If registration fails, the Worker likewise retries via registrationRetryTimer:

registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
          new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReregisterWithMaster))
            }
          },
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          TimeUnit.SECONDS))

It sends itself a re-registration message, ReregisterWithMaster; when receive picks it up, reregisterWithMaster() is executed:

private def reregisterWithMaster(): Unit = {
    Utils.tryOrExit {
      // bump the retry counter
      connectionAttemptCount += 1
      if (registered) {
        // already registered: cancel any pending retries
        cancelLastRegistrationRetry()
      } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {  // still within the maximum number of retries
        logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
        /**
         * Re-register with the active master this worker has been communicating with. If there
         * is none, then it means this worker is still bootstrapping and hasn't established a
         * connection with a master yet, in which case we should re-register with all masters.
         *
         * It is important to re-register only with the active master during failures. Otherwise,
         * if the worker unconditionally attempts to re-register with all masters, the following
         * race condition may arise and cause a "duplicate worker" error detailed in SPARK-4592:
         *
         *   (1) Master A fails and Worker attempts to reconnect to all masters
         *   (2) Master B takes over and notifies Worker
         *   (3) Worker responds by registering with Master B
         *   (4) Meanwhile, Worker's previous reconnection attempt reaches Master B,
         *       causing the same Worker to register with Master B twice
         *
         * Instead, if we only register with the known active master, we can assume that the
         * old master must have died because another master has taken over. Note that this is
         * still not safe if the old master recovers within this interval, but this is a much
         * less likely scenario.
         */
        master match {
          case Some(masterRef) =>
            // registered == false && master != None means we lost the connection to master, so
            // masterRef cannot be used and we need to recreate it again. Note: we must not set
            // master to None due to the above comments.
            // As the English comment above says: registration failed but master != None means
            // we lost the connection to the master, so a new masterRef must be created.
            // First, cancel the outstanding registration futures blocked waiting for replies
            if (registerMasterFutures != null) {
              registerMasterFutures.foreach(_.cancel(true))
            }
            
            // then create a new masterRef and register again
            val masterAddress = masterRef.address
            registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
              override def run(): Unit = {
                try {
                  logInfo("Connecting to master " + masterAddress + "...")
                  val masterEndpoint =
                    rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
                  registerWithMaster(masterEndpoint)
                } catch {
                  case ie: InterruptedException => // Cancelled
                  case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
                }
              }
            }))
          case None =>
            // no masterRef: cancel the outstanding registration futures
            if (registerMasterFutures != null) {
              registerMasterFutures.foreach(_.cancel(true))
            }
            
            // then redo the initial registration, i.e. tryRegisterAllMasters
            // We are retrying the initial registration
            registerMasterFutures = tryRegisterAllMasters()
        }
        // We have exceeded the initial registration retry threshold
        // All retries from now on should use a higher interval
        // once the initial retry threshold is exceeded, cancel the previous timer and
        // reschedule retries at a longer interval: by default the first 6 retries are
        // 5 to 15 seconds apart, and the next 10 are 30 to 90 seconds apart
        if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
          registrationRetryTimer.foreach(_.cancel(true))
          registrationRetryTimer = Some(
            forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
                self.send(ReregisterWithMaster)
              }
            }, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
              PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
              TimeUnit.SECONDS))
        }
      } else {
        logError("All masters are unresponsive! Giving up.")
        System.exit(1)
      }
    }
  }
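The retry-interval arithmetic referenced in the comments above can be reproduced directly: the fuzz multiplier is uniform in [0.5, 1.5), so the initial retries land 5-15 seconds apart and the prolonged retries 30-90 seconds apart, spreading out the workers' reconnection attempts:

```scala
import scala.util.Random

// fuzz multiplier uniform in [0.5, 1.5), as in Worker's initialization
val fuzz = new Random().nextDouble + 0.500
val initialIntervalSeconds = math.round(10 * fuzz)     // first 6 retries: 5..15 s
val prolongedIntervalSeconds = math.round(60 * fuzz)   // next 10 retries: 30..90 s
println(s"initial: ${initialIntervalSeconds}s, prolonged: ${prolongedIntervalSeconds}s")
```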

That completes the Worker's startup and registration, and with it the execution of start-all.sh.

Below are the Worker's initialization section and onStart method, with comments on the important parts:

Initialization:

  private val host = rpcEnv.address.host
  private val port = rpcEnv.address.port

  Utils.checkHost(host, "Expected hostname")
  assert (port > 0)

  // A scheduled executor used to send messages at the specified time.
  private val forwordMessageScheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")

  // A separated thread to clean up the workDir. Used to provide the implicit parameter of `Future`
  // methods.
  private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))

  // For worker and executor IDs
  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
  // heartbeat interval: one quarter of the timeout
  // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
  private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4

  // retry model and retry-count settings
  // Model retries to connect to the master, after Hadoop's model.
  // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
  // Afterwards, the next 10 attempts are between 30 and 90 seconds.
  // A bit of randomness is introduced so that not all of the workers attempt to reconnect at
  // the same time.
  private val INITIAL_REGISTRATION_RETRIES = 6
  private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10
  private val FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND = 0.500
  private val REGISTRATION_RETRY_FUZZ_MULTIPLIER = {
    val randomNumberGenerator = new Random(UUID.randomUUID.getMostSignificantBits)
    randomNumberGenerator.nextDouble + FUZZ_MULTIPLIER_INTERVAL_LOWER_BOUND
  }
  private val INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(10 *
    REGISTRATION_RETRY_FUZZ_MULTIPLIER))
  private val PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS = (math.round(60
    * REGISTRATION_RETRY_FUZZ_MULTIPLIER))

  // cleanup-related settings
  private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
  // How often worker will clean up old app folders
  private val CLEANUP_INTERVAL_MILLIS =
    conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
  // TTL for app folders/data;  after TTL expires it will be cleaned up
  private val APP_DATA_RETENTION_SECONDS =
    conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

  private val testing: Boolean = sys.props.contains("spark.testing")
  // reference to the master
  private var master: Option[RpcEndpointRef] = None
  private var activeMasterUrl: String = ""
  private[worker] var activeMasterWebUiUrl : String = ""
  private val workerUri = rpcEnv.uriOf(systemName, rpcEnv.address, endpointName)
  private var registered = false
  private var connected = false
  private val workerId = generateWorkerId()
  private val sparkHome =
    if (testing) {
      assert(sys.props.contains("spark.test.home"), "spark.test.home is not set!")
      new File(sys.props("spark.test.home"))
    } else {
      new File(sys.env.get("SPARK_HOME").getOrElse("."))
    }

  var workDir: File = null
  val finishedExecutors = new LinkedHashMap[String, ExecutorRunner]
  val drivers = new HashMap[String, DriverRunner]
  val executors = new HashMap[String, ExecutorRunner]
  val finishedDrivers = new LinkedHashMap[String, DriverRunner]
  val appDirectories = new HashMap[String, Seq[String]]
  val finishedApps = new HashSet[String]

  val retainedExecutors = conf.getInt("spark.worker.ui.retainedExecutors",
    WorkerWebUI.DEFAULT_RETAINED_EXECUTORS)
  val retainedDrivers = conf.getInt("spark.worker.ui.retainedDrivers",
    WorkerWebUI.DEFAULT_RETAINED_DRIVERS)

  // The shuffle service is not actually started unless configured.
  private val shuffleService = new ExternalShuffleService(conf, securityMgr)

  private val publicAddress = {
    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
    if (envVar != null) envVar else host
  }
  private var webUi: WorkerWebUI = null

  private var connectionAttemptCount = 0

  private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
  private val workerSource = new WorkerSource(this)

  private var registerMasterFutures: Array[JFuture[_]] = null
  private var registrationRetryTimer: Option[JScheduledFuture[_]] = None

  // thread pool used to register with the masters; sized to the number of masters
  // A thread pool for registering with masters. Because registering with a master is a blocking
  // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
  // time so that we can register with all masters.
  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
    "worker-register-master-threadpool",
    masterRpcAddresses.size // Make sure we can register with all masters at the same time
  )

  var coresUsed = 0
  var memoryUsed = 0

The onStart() method:

override def onStart() {
    assert(!registered)
    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
      host, port, cores, Utils.megabytesToString(memory)))
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    logInfo("Spark home: " + sparkHome)
    // create the worker's working directory
    createWorkDir()
    // start the external shuffle service (if enabled)
    shuffleService.startIfEnabled()
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()
    // register this worker with the Master
    registerWithMaster()

    // metrics system
    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
    // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }

This article briefly introduced Spark's deployment modes and analyzed in detail the code executed by start-all.sh (the Master's startup and registration, and the Worker's startup and registration with the Master). The Master's schedule method was deliberately left out; it will get a detailed article of its own.

References and further reading:

Spark-1.6.3 source code

Spark-2.1.0 source code

This is an original article. Reposting is welcome; please credit the source and the author. Thanks!
