Spark 源碼分析(一):Spark Submit 任務(wù)提交

研究 Spark 內(nèi)部是怎么運(yùn)行的,怎么將 Spark 的任務(wù)從開始運(yùn)行到結(jié)束的踪古,先從 spark-submit 這個 shell 腳本提交用戶程序開始券腔。下面的分析都是基于 spark 2.1.1 版本。

我們一般提交 Spark 任務(wù)時纷纫,都會寫一個如下的腳本,里面指定 spark-submit 腳本的位置辱魁,配置好一些參數(shù),然后運(yùn)行:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

上面那個腳本實(shí)際上會將參數(shù)帶到 spark-submit 腳本中去執(zhí)行参滴,看一下 spark-submit 腳本:

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi  # disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

腳本最后調(diào)用 exec 執(zhí)行 "{SPARK_HOME}"/bin/spark-class锻弓,調(diào)用的 class 為:`org.apache.spark.deploy.SparkSubmit `,后面的 "\@" 是腳本執(zhí)行的所有參數(shù)过蹂。

通過 spark-class 腳本,最終執(zhí)行的命令中酷勺,制定了程序的入口為org.apache.spark.deploy.SparkSubmit

一脆诉,org.apache.spark.deploy.SparkSubmit

1,main 方法

def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

從 main 方法中可以看出亏狰,根據(jù)解析后的參數(shù)中的 action 進(jìn)行模式匹配偶摔,是什么操作就執(zhí)行什么方法,我們這邊是 submit 操作辰斋,則調(diào)用 submit 方法。

2够挂,submit 方法

submit 方法做兩件事情藕夫,第一件事為通過 clusterManager 和 dploymode 去決定下一步要執(zhí)行的類的 main 方法,第二件事是根據(jù)反射執(zhí)行這個 main 方法毅贮。

2.1,submit 方法第一步

這部分主要是準(zhǔn)備下一步要執(zhí)行的相關(guān)類及參數(shù):

private def submit(args: SparkSubmitArguments): Unit = {
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
2.1.1病蛉,prepareSubmitEnvironment 方法

通過調(diào)用 prepareSubmitEnvironment 方法來準(zhǔn)備下一步要執(zhí)行的類的 main 方法及相關(guān)參數(shù),看一下這個方法铡恕,下面這部分是根據(jù)參數(shù)中的 master 和 deploy-mode 來設(shè)置對應(yīng)的 cluasterManager 和部署模式:

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
      : (Seq[String], Seq[String], Map[String, String], String) = {
    // 要返回的四個參數(shù)
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sysProps = new HashMap[String, String]()
    var childMainClass = ""

    // 根據(jù)腳本中配置的 master 參數(shù)去模式匹配出 clusterManager
    val clusterManager: Int = args.master match {
      case "yarn" => YARN
      case "yarn-client" | "yarn-cluster" =>
        printWarning(s"Master ${args.master} is deprecated since 2.0." +
          " Please use master \"yarn\" with specified deploy mode instead.")
        YARN
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("local") => LOCAL
      case _ =>
        printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
        -1
    }

    // 根據(jù) deployMode 參數(shù)去模式匹配出部署模式
    var deployMode: Int = args.deployMode match {
      case "client" | null => CLIENT
      case "cluster" => CLUSTER
      case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
    }

然后會根據(jù)上面匹配出的集群以及部署模式?jīng)Q定怎么提交 application探熔,我們這邊看一下 standalone 集群部署模式,看下面這部分代碼:

        // standalone cluster 模式下的 childMainClass 以及參數(shù)的配置
    if (args.isStandaloneCluster) {
      //如果參數(shù)中配置了 useRest 則為 RestSubmissionClient 的方式去提交 application
      if (args.useRest) {
        childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        // 否則使用 Client 放是去提交 application
        childMainClass = "org.apache.spark.deploy.Client"
        if (args.supervise) { childArgs += "--supervise" }
        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
        childArgs += "launch"
        childArgs += (args.master, args.primaryResource, args.mainClass)
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }

在 standalone 集群模式下诀艰,有兩個提交網(wǎng)關(guān):

1,使用 org.apache.spark.deploy.Client 作為包裝器來使用傳統(tǒng)的 RPC 網(wǎng)關(guān)苛蒲;

2绿满,使用 Spark 1.3 中引入的基于 rest 的網(wǎng)關(guān)。

2.2喇颁,submit 方法第二步

這里我們的參數(shù)已經(jīng)準(zhǔn)備好了,然后根據(jù)我們 standalone cluster 部署模式?jīng)Q定下一步怎么執(zhí)行:

    /*  在standalone集群模式下橘霎,有兩個提交網(wǎng)關(guān):
    *   1.使用org.apache.spark.deploy.Client作為包裝器來使用傳統(tǒng)的RPC網(wǎng)關(guān)
    *   2.Spark 1.3中引入的基于rest的網(wǎng)關(guān)
    *   第二種方法是Spark 1.3的默認(rèn)行為,但是Spark submit將會失敗
    *   如果master不是一個REST服務(wù)器瓦盛,那么它將無法使用REST網(wǎng)關(guān)外潜。
    */
        if (args.isStandaloneCluster && args.useRest) {
      try {
        // scalastyle:off println
        printStream.println("Running Spark using the REST application submission protocol.")
        // scalastyle:on println
        doRunMain()
      } catch {
        // Fail over to use the legacy submission gateway
        case e: SubmitRestConnectionException =>
          printWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args)
      }
    } else {
      // 其他模式,直接調(diào)用doRunMain方法
      doRunMain()
    }

接著會調(diào)用到 doRunMain 方法橡卤,內(nèi)部其實(shí)調(diào)用了 runMain 方法损搬,所以我們直接看 runMain 方法。

2.2.1嵌灰,runMain 方法
//實(shí)際上這個方法就是根據(jù)我們上面 prepareSubmitEnvironment 方法準(zhǔn)備好的參數(shù),通過反射的方法去執(zhí)行我們
//下一步要執(zhí)行的類及方法
private def runMain(
      childArgs: Seq[String],
      childClasspath: Seq[String],
      sysProps: Map[String, String],
      childMainClass: String,
      verbose: Boolean): Unit = {
    // scalastyle:off println
    if (verbose) {
      printStream.println(s"Main class:\n$childMainClass")
      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
      printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
      printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
      printStream.println("\n")
    }
    // scalastyle:on println

    val loader =
      if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
        new ChildFirstURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      } else {
        new MutableURLClassLoader(new Array[URL](0),
          Thread.currentThread.getContextClassLoader)
      }
    Thread.currentThread.setContextClassLoader(loader)

    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    for ((key, value) <- sysProps) {
      System.setProperty(key, value)
    }

    var mainClass: Class[_] = null

    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      case e: ClassNotFoundException =>
        e.printStackTrace(printStream)
        if (childMainClass.contains("thriftserver")) {
          // scalastyle:off println
          printStream.println(s"Failed to load main class $childMainClass.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
          // scalastyle:on println
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
      case e: NoClassDefFoundError =>
        e.printStackTrace(printStream)
        if (e.getMessage.contains("org/apache/hadoop/hive")) {
          // scalastyle:off println
          printStream.println(s"Failed to load hive class.")
          printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
          // scalastyle:on println
        }
        System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    }

    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }

    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    @tailrec
    def findCause(t: Throwable): Throwable = t match {
      case e: UndeclaredThrowableException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: InvocationTargetException =>
        if (e.getCause() != null) findCause(e.getCause()) else e
      case e: Throwable =>
        e
    }

    try {
      //通過反射去執(zhí)行準(zhǔn)備好的 mainClass 的 main 方法
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
        findCause(t) match {
          case SparkUserAppException(exitCode) =>
            System.exit(exitCode)

          case t: Throwable =>
            throw t
        }
    }
  }

我們選取的 standalone cluster 模式去分析的,根據(jù)上面的 prepareSubmitEnvironment 方法可以知道我們要使用 org.apache.spark.deploy.Client 這個 childMainClass驹溃,然后根據(jù)上面的代碼知道延曙,我們下一步是將相關(guān)參數(shù)帶進(jìn) org.apache.spark.deploy.Client 這個類的 main 方法中去執(zhí)行。

所以下面開始看 org.apache.spark.deploy.Client

二枝缔,org.apache.spark.deploy.Client

Client 用于啟動和終止 standalone 集群中的 Driver 程序。

1,main 方法

def main(args: Array[String]) {
    // scalastyle:off println
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    // scalastyle:on println

    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)

    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

  //創(chuàng)建 rpcEnv
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

  //獲取 master 節(jié)點(diǎn)的 RpcEndPoint 的引用截型,用于和 master 進(jìn)行 Rpc 通信
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
  
  //注冊 rpcEndpoint儒溉,調(diào)用 onStart方法
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

  //
    rpcEnv.awaitTermination()
  }

這里開始創(chuàng)建 rpcEnv 了,關(guān)于 Rpc 這塊的知識點(diǎn)顿涣,可以看前面這篇文章了解一下:Spark 中的 RPC,拿到 master 的 rpcEndpoint 的引用去注冊 rpcEndpoint舔痪,這里會去調(diào)用 ClientEndpoint 的 onstart 方法。

三锄码,org.apache.spark.deploy.ClientEndpoint

ClientEndPoint 是一個 ThreadSafeRpcEndpoint晌涕,下面看下它的 onStart 方法。

1余黎,onStart 方法

override def onStart(): Unit = {
    driverArgs.cmd match {
      case "launch" =>
        // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
        //       truncate filesystem paths similar to what YARN does. For now, we just require
        //       people call `addJar` assuming the jar is in the same directory.
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

        val classPathConf = "spark.driver.extraClassPath"
        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val libraryPathConf = "spark.driver.extraLibraryPath"
        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
          cp.split(java.io.File.pathSeparator)
        }

        val extraJavaOptsConf = "spark.driver.extraJavaOptions"
        val extraJavaOpts = sys.props.get(extraJavaOptsConf)
          .map(Utils.splitCommandString).getOrElse(Seq.empty)
        val sparkJavaOpts = Utils.sparkJavaOpts(conf)
        val javaOpts = sparkJavaOpts ++ extraJavaOpts
      
        // 將classPathEntries,libraryPathEntries,javaOpts,drvierArgs信息封裝成Command
        //  這里的mainClass為org.apache.spark.deploy.worker.DriverWrapper
        val command = new Command(mainClass,
          Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
          sys.env, classPathEntries, libraryPathEntries, javaOpts)
                
        //  將drvierArgs惧财,command信息封裝成DriverDescription
        val driverDescription = new DriverDescription(
          driverArgs.jarUrl,
          driverArgs.memory,
          driverArgs.cores,
          driverArgs.supervise,
          command)
      
        //  向master發(fā)送RequestSubmitDriver,注冊Driver
        ayncSendToMasterAndForwardReply[SubmitDriverResponse](
          RequestSubmitDriver(driverDescription))

      case "kill" =>
        val driverId = driverArgs.driverId
        ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
    }
  }

這里也會根據(jù) cmd 進(jìn)行模式匹配,垮衷,如果命令為 launch,就去獲取 driver 的額外的 java 依賴搀突,classpath,java 配置甸昏。然后將這些信息封裝為一個 Command 對象,再降 driver 的參數(shù)和 command 信息一起封裝成 DriverDescription 對象施蜜,調(diào)用 ayncSendToMasterAndForwardReply 發(fā)送信息绊寻。

2花墩,ayncSendToMasterAndForwardReply 方法

private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
    for (masterEndpoint <- masterEndpoints) {
      masterEndpoint.ask[T](message).onComplete {
        case Success(v) => self.send(v)
        case Failure(e) =>
          logWarning(s"Error sending messages to master $masterEndpoint", e)
      }(forwardMessageExecutionContext)
    }
  }

這個方法實(shí)際上就是將信息發(fā)送到 masterEndpoint 上去。

四和泌,總結(jié)

至此祠肥,我們整個 spark-submit 任務(wù)提交就完成了,接下來就是等待 master 返回 driver 的注冊結(jié)果仇箱,啟動 driver。

最后可以看一下 spark-submit 過程的流程圖:

image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末忠烛,一起剝皮案震驚了整個濱河市权逗,隨后出現(xiàn)的幾起案子美尸,更是在濱河造成了極大的恐慌斟薇,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胯陋,死亡現(xiàn)場離奇詭異,居然都是意外死亡袱箱,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進(jìn)店門犯眠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來症革,“玉大人,你說我怎么就攤上這事噪矛。” “怎么了残炮?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵缩滨,是天一觀的道長势就。 經(jīng)常有香客問我,道長袖牙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任鞭达,我火速辦了婚禮皇忿,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鳍烁。我一直安慰自己,他們只是感情好幔荒,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著墓怀,像睡著了一般卫键。 火紅的嫁衣襯著肌膚如雪傀履。 梳的紋絲不亂的頭發(fā)上莉炉,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天,我揣著相機(jī)與錄音梆暮,去河邊找鬼。 笑死啦粹,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的唠椭。 我是一名探鬼主播忍饰,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼艾蓝!你這毒婦竟也來了斗塘?” 一聲冷哼從身側(cè)響起亮靴,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎台猴,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體饱狂,經(jīng)...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年讲婚,在試婚紗的時候發(fā)現(xiàn)自己被綠了俊柔。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡雏婶,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出留晚,到底是詐尸還是另有隱情,我是刑警寧澤错维,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站参歹,受9級特大地震影響隆判,放射性物質(zhì)發(fā)生泄漏犬庇。R本人自食惡果不足惜蜜氨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望飒炎。 院中可真熱鬧笆豁,春花似錦赤赊、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽吹截。三九已至凝危,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間蛾默,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工冬念, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人急前。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓瀑构,卻偏偏與公主長得像叔汁,于是被迫代替她去往敵國和親检碗。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,033評論 2 355