Spark Source Code Reading 1

I have recently been refactoring an audio/video transcoding system and am trying out Spark as the framework for distributed, concurrent transcoding tasks. Using something unfamiliar never feels entirely safe, hence this read through the source code.

Master Startup Process

The command that starts the master is:

./sbin/start-master.sh

So let's start from this script and trace Spark's startup flow.
We will follow only the main line and ignore the side branches for now, to get a picture of the overall flow first.
Reading start-master.sh, the statement that is actually executed is:

"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port  $SPARK_MASTER_WEBUI_PORT \
 $ORIGINAL_ARGS

where CLASS is:

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.deploy.master.Master"
  • The remaining arguments are mostly port settings plus a few startup options, which we can ignore for now.
    Simplified, the call becomes:
spark-daemon.sh start  org.apache.spark.deploy.master.Master 1

Looking at spark-daemon.sh, the key statement is:

nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null

Here $command is the main class passed in, i.e. org.apache.spark.deploy.master.Master (the first argument, start, is consumed earlier as the option).
Looking at bin/spark-class,
we find the real launch point:

CMD=()
while IFS= read -d '' -r ARG; do   # read the NUL-delimited arguments printed by launcher.Main
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"                   # replace the shell with the generated java command

$RUNNER and $LAUNCH_CLASSPATH are the path to the java binary and the launcher classpath, respectively.
What actually runs is org.apache.spark.launcher.Main, which builds the final java command; its output is read into $CMD, and $CMD is then executed with exec. The main class carried in $CMD is the **org.apache.spark.deploy.master.Master** mentioned above.
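For reference, the command that launcher.Main prints (and that exec then runs) looks roughly like the following; the java path, classpath, memory setting and host are placeholders that depend on the environment, so treat this as an illustrative sketch rather than literal output:

/usr/lib/jvm/java-8/bin/java -cp "/opt/spark/conf/:/opt/spark/lib/*" -Xmx1g \
  org.apache.spark.deploy.master.Master --ip master-host --port 7077 --webui-port 8080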
So here we have found the program's real entry point:

org.apache.spark.deploy.master.Master

The file lives at:

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

The entry point is in private[deploy] object Master extends Logging,
as follows:

private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  /**
   * Start the Master and return a three tuple of:
   *   (1) The Master RpcEnv
   *   (2) The web UI bound port
   *   (3) The REST server bound port, if any
   */
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) // create the RpcEnv (Netty-based by default)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}

The main thing happening here is that an RpcEnv is created and the Master is registered into it as an endpoint.
Step into RpcEnv.create:

  def create(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean = false): RpcEnv = {
    // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
    val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
    getRpcEnvFactory(conf).create(config)
  }

Here getRpcEnvFactory(conf).create(config) is used to create the RpcEnv that gets returned.

  private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
    val rpcEnvNames = Map(
      "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
      "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
    val rpcEnvName = conf.get("spark.rpc", "netty")
    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
    Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
  }
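In other words, the RPC backend is pluggable through the spark.rpc setting. A minimal sketch of how the lookup above resolves (note that RpcEnv and SecurityManager are Spark-internal APIs, so this only illustrates the mechanism and is not something application code would normally call):

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

val conf = new SparkConf().set("spark.rpc", "netty") // "akka" would select AkkaRpcEnvFactory; "netty" is the default
// getRpcEnvFactory maps the short name to the factory class and instantiates it reflectively,
// then create() hands the RpcEnvConfig to that factory.
val rpcEnv = RpcEnv.create("demo", "localhost", 7077, conf, new SecurityManager(conf))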

In practice Netty is used as the asynchronous NIO framework, so the factory class producing the rpcEnv here is
org.apache.spark.rpc.netty.NettyRpcEnvFactory
Path:

core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala

Let's look at this factory class's create method.

private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {

  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
    // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }
}

The RpcEnv implementation is therefore NettyRpcEnv.

When not in client mode, it uses

 Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1

to start the server, i.e. nettyEnv.startServer(actualPort):

  def startServer(port: Int): Unit = {
    val bootstraps: java.util.List[TransportServerBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }
    server = transportContext.createServer(host, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }

Back in startRpcEnvAndEndpoint in Master.scala:

  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

the Master is registered into the rpcEnv, which gives us masterEndpoint.
In the Netty implementation, messages are dispatched through a Dispatcher.
Open Dispatcher.scala and locate the message-dispatching loop:

  /** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            val data = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }
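The loop above is a classic blocking-queue consumer with a poison-pill sentinel: each thread takes the next endpoint's inbox off receivers, and shutdown is signalled by a sentinel that every loop puts back so its siblings see it too. A self-contained toy version of the same pattern (not Spark code; all names are made up):

import java.util.concurrent.LinkedBlockingQueue

object LoopSketch {
  private case object PoisonPill
  private val queue = new LinkedBlockingQueue[Any]()

  // Run by each thread of a fixed-size pool.
  def messageLoop(): Unit = {
    while (true) {
      val item = queue.take()
      if (item == PoisonPill) {
        queue.offer(PoisonPill) // put it back so the other loops also see it and exit
        return
      }
      println(s"processing $item") // here Spark calls data.inbox.process(Dispatcher.this)
    }
  }
}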

Each message is then processed by

 data.inbox.process(Dispatcher.this) 

Step into:

core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala

Locate:

   /**
   * Process stored messages.
   */
  def process(dispatcher: Dispatcher): Unit = {
    var message: InboxMessage = null
    inbox.synchronized {
      if (!enableConcurrent && numActiveThreads != 0) {
        return
      }
      message = messages.poll()
      if (message != null) {
        numActiveThreads += 1
      } else {
        return
      }
    }
    while (true) {
      safelyCall(endpoint) {
        message match {
          case RpcMessage(_sender, content, context) =>
            try {
              endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                throw new SparkException(s"Unsupported message $message from ${_sender}")
              })
            } catch {
              case NonFatal(e) =>
                context.sendFailure(e)
                // Throw the exception -- this exception will be caught by the safelyCall function.
                // The endpoint's onError function will be called.
                throw e
            }

          case OneWayMessage(_sender, content) =>
            endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })

          case OnStart =>
            endpoint.onStart()
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }

          case OnStop =>
            val activeThreads = inbox.synchronized { inbox.numActiveThreads }
            assert(activeThreads == 1,
              s"There should be only a single active thread but found $activeThreads threads.")
            dispatcher.removeRpcEndpointRef(endpoint)
            endpoint.onStop()
            assert(isEmpty, "OnStop should be the last message")

          case RemoteProcessConnected(remoteAddress) =>
            endpoint.onConnected(remoteAddress)

          case RemoteProcessDisconnected(remoteAddress) =>
            endpoint.onDisconnected(remoteAddress)

          case RemoteProcessConnectionError(cause, remoteAddress) =>
            endpoint.onNetworkError(cause, remoteAddress)
        }
      }

      inbox.synchronized {
        // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
        // every time.
        if (!enableConcurrent && numActiveThreads != 1) {
          // If we are not the only one worker, exit
          numActiveThreads -= 1
          return
        }
        message = messages.poll()
        if (message == null) {
          numActiveThreads -= 1
          return
        }
      }
    }
  }

From this we can see that at startup it calls:

endpoint.onStart()

and that afterwards incoming RPC calls are handled through:

endpoint.receiveAndReply

Here the endpoint is our Master,
so let's look at these two functions in Master.
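Before diving into Master itself, here is roughly what the endpoint contract looks like, as context for these two callbacks. This is a minimal sketch against the Spark-internal RpcEndpoint API of this version; Master is just a much larger instance of the same shape:

import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

class EchoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  // Called once right after the endpoint is registered (the OnStart case above).
  override def onStart(): Unit = println("endpoint started")

  // Handles fire-and-forget messages sent with send() (the OneWayMessage case above).
  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"one-way message: $msg")
  }

  // Handles ask()-style messages that expect a reply (the RpcMessage case above).
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg => context.reply(msg)
  }
}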

  • First, look at onStart():
  override def onStart(): Unit = {
    logInfo("Starting Spark master at " + masterUrl)
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    webUi = new MasterWebUI(this, webUiPort)
    webUi.bind()
    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CheckForWorkerTimeOut)
      }
    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

    if (restServerEnabled) {
      val port = conf.getInt("spark.master.rest.port", 6066)
      restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
    }
    restServerBoundPort = restServer.map(_.start())

    masterMetricsSystem.registerSource(masterSource)
    masterMetricsSystem.start()
    applicationMetricsSystem.start()
    // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
    // started.
    masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

    val serializer = new JavaSerializer(conf)
    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, serializer)
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, serializer)
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
          .newInstance(conf, serializer)
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
  }

The main actions are starting the web UI, starting the metrics systems, and setting up high availability (recovery and leader election) for the master.
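As a side note on the RECOVERY_MODE branches above: which one is taken is purely configuration-driven. A hedged example of settings that would select the ZOOKEEPER branch (for a standalone master these are normally passed as -D system properties via SPARK_DAEMON_JAVA_OPTS; shown here in SparkConf form for readability, worth verifying against the docs of your version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.deploy.recoveryMode", "ZOOKEEPER")          // default "NONE" -> BlackHolePersistenceEngine
  .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181") // the ZooKeeper ensemble to persist state in
  .set("spark.deploy.zookeeper.dir", "/spark")            // znode path for the persisted master state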

  • Next, the other function: receiveAndReply.
    This is the Master's main working function.

It consists of a number of case clauses.
Let's look at the first one:

case RegisterWorker

This is the registration handler that runs when a new worker starts up.
Its main body is:

// create the WorkerInfo object describing the new worker
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
  workerRef, workerUiPort, publicAddress)
if (registerWorker(worker)) {
  // register the worker
  persistenceEngine.addWorker(worker)
  context.reply(RegisteredWorker(self, masterWebUiUrl))
  schedule() // reschedule to rebalance the cluster
}
  • Let's focus on schedule():
 /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    startExecutorsOnWorkers()
  }

The overall scheduling process is fairly simple.
First, take the workers in the workers set whose state is ALIVE.
Then walk the queue of waiting drivers and launch each driver on a worker that meets its resource requirements.
Finally, walk the queue of waiting apps: filter out the usable workers, and for every app that has not yet reached its requested parallelism, place executors for it on those workers, increasing the app's parallelism.

The startExecutorsOnWorkers() called at the end looks like this:

  /**
   * Schedule and launch executors on workers
   */
  private def startExecutorsOnWorkers(): Unit = {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    for (app <- waitingApps if app.coresLeft > 0) {
      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
      // Filter out workers that don't have enough resources to launch an executor
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
          worker.coresFree >= coresPerExecutor.getOrElse(1))
        .sortBy(_.coresFree).reverse
      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

      // Now that we've decided how many cores to allocate on each worker, let's allocate them
      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        allocateWorkerResourceToExecutors(
          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
      }
    }
  }

It is a simple FIFO scheme.
scheduleExecutorsOnWorkers() returns, for each usable worker, how many cores' worth of executors should be added there;
allocateWorkerResourceToExecutors() then performs the actual allocation.
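To make that division of labour concrete, here is a small hypothetical example of what scheduleExecutorsOnWorkers hands back (numbers invented purely for illustration):

// Hypothetical: 3 ALIVE workers with 8 free cores each, one app with coresLeft = 12
// and coresPerExecutor = Some(4).
//   spreadOutApps = true  -> cores are granted 4 at a time round-robin over the workers,
//                            so assignedCores comes out as Array(4, 4, 4): one executor per worker.
//   spreadOutApps = false -> the first worker is filled before moving on,
//                            so assignedCores comes out as Array(8, 4, 0): two executors on worker 0.
// allocateWorkerResourceToExecutors then turns each non-zero entry into executor launches on that worker.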

At this point the startup process is essentially complete, but two things remain unclear: how drivers and apps each actually operate, and how code is submitted to the cluster.
