spark-submit 任務(wù)提交過程分析

https://blog.csdn.net/u013332124/article/details/91456422

一怖侦、spark-submit腳本分析

spark-submit的腳本內(nèi)容很簡單:

# 如果沒設(shè)置SPARK_HOME的環(huán)境變量,調(diào)用find-spark-home文件尋找spark-home
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# 直接將所有參數(shù)傳遞給spark-class
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

最后又調(diào)用spark-class躯喇。其實(shí)不光spark-submit藤抡,幾乎所有的spark服務(wù)最終都是調(diào)用spark-class來啟動(dòng)的侠碧。spark-class的代碼也不多:

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# 調(diào)用Main類生成命令
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

spark-class主要是將參數(shù)交給org.apache.spark.launcher.Main類執(zhí)行,然后獲取到一個(gè)新的命令缠黍,之后我們拿著這個(gè)命令執(zhí)行弄兜。

比如我們執(zhí)行下面的spark-submit語句:

spark-submit --queue up --deploy-mode cluster --master yarn --class org.apache.spark.examples.SparkPi /www/harbinger-spark/examples/jars/spark-examples_2.11-2.1.0.jar 10

經(jīng)過Main類解析后,就會(huì)變成下面的命令:

/www/jdk1.8.0_51/bin/java -cp /www/harbinger-spark/conf/:/www/harbinger-spark/jars/*:/www/harbinger-hadoop/etc/hadoop/ -Xmx52m org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi --queue up /www/harbinger-spark/examples/jars/spark-examples_2.11-2.1.0.jar 10

我們發(fā)現(xiàn)瓷式,最終又繞回來了替饿,還是通過java命令調(diào)用SparkSubmit類。

那么蒿往,為什么spark不直接運(yùn)行SparkSubmit盛垦,而是繞了一大圈通過Main類解析獲得命令然后再運(yùn)行呢湿弦?

二瓤漏、Main類的作用

spark-submit的命令解析主要是經(jīng)過SparkSubmitCommandBuilder#buildSparkSubmitCommand()方法,我們可以看一下源碼:

  private List<String> buildSparkSubmitCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    //加載配置文件的配置
    Map<String, String> config = getEffectiveConfig();
    boolean isClientMode = isClientMode(config);
      //獲取用戶指定的classPath
    String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;

    List<String> cmd = buildJavaCommand(extraClassPath);
    // Take Thrift Server as daemon
    if (isThriftServer(mainClass)) {
      addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
    }
    addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));

    // We don't want the client to specify Xmx. These have to be set by their corresponding
    // memory flag --driver-memory or configuration entry spark.driver.memory
    String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
    if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
      String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
                   "java options (was %s). Use the corresponding --driver-memory or " +
                   "spark.driver.memory configuration instead.", driverExtraJavaOptions);
      throw new IllegalArgumentException(msg);
    }

    if (isClientMode) {
      String tsMemory =
        isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
      String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
      cmd.add("-Xmx" + memory);
      addOptionString(cmd, driverExtraJavaOptions);
      mergeEnvPathList(env, getLibPathEnvName(),
        config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
    }

    cmd.add("org.apache.spark.deploy.SparkSubmit");
    cmd.addAll(buildSparkSubmitArgs());
    return cmd;
  }

主要做的事情其實(shí)就是讀取各種配置然后往命令中添加一些參數(shù)。也就是對(duì)命令進(jìn)行加工蔬充。

其實(shí)添加參數(shù)這種事情直接在shell中也能做蝶俱,但是這個(gè)過程需要讀取配置文件,shell可能做起來比較麻煩饥漫。另外其他服務(wù)也會(huì)經(jīng)過Main類進(jìn)行加工榨呆,一些公共的代碼也可以抽象出來。所以庸队,這個(gè)Main類主要用于對(duì)命令的加工和轉(zhuǎn)換积蜻。

一些spark服務(wù),如果要修改一些服務(wù)的參數(shù)彻消,比如調(diào)整堆大小竿拆,就是在Main類中讀取相關(guān)的環(huán)境變量來設(shè)置的。比如SparkHistoryServer宾尚,Main類中會(huì)讀取環(huán)境變量SPARK_HISTORY_OPTS的值丙笋,然后在啟動(dòng)SparkHistoryServer時(shí)加上去。其他的服務(wù)也類似煌贴。另外御板,環(huán)境變量可以在"${SPARK_HOME}"/bin/load-spark-env.sh中設(shè)置,spark-class中會(huì)加載這個(gè)文件的配置牛郑。

三怠肋、SparkSubmit類提交任務(wù)的過程

SparkSubmit做的事情就是提交任務(wù)運(yùn)行。我們這里討論一下yarn模式的任務(wù)提交淹朋。

整個(gè)任務(wù)提交流程也比較好理解灶似,主要就是收集ApplicationMaster的上下文,比如ApplicationMaster的啟動(dòng)命令瑞你、資源文件酪惭、環(huán)境變量等,然后和yarn建立連接者甲,通過yarnClient提交ApplicationMaster到y(tǒng)arn上運(yùn)行春感。之后,不斷向yarn輪詢?nèi)蝿?wù)的狀態(tài)直到任務(wù)運(yùn)行結(jié)束虏缸。

因?yàn)檎麄€(gè)過程代碼比較多鲫懒,我們挑一些關(guān)鍵點(diǎn)進(jìn)行分析。

如何和ResourceManger建立連接

在yarn的模式下刽辙,spark會(huì)去讀取環(huán)境變量HADOOP_CONF_DIR或者YARN_CONF_DIR目錄下的配置文件窥岩,如果這兩個(gè)環(huán)境變量都沒找到,運(yùn)行spark-submit命令時(shí)就會(huì)報(bào)錯(cuò):

Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
    at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:256)
    at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:233)
    at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:110)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

spark主要是為了讀取該目錄下的3個(gè)文件:core-site.xml宰缤、yarn-site.xml颂翼、hdfs-site.xml晃洒。

其中core-site.xml是hadoop的核心配置。讀取yarn-site.xml配置主要是為了獲取ResourceManger的地址朦乏,之后就可以通過rpc建立連接球及。而讀取hdfs-site.xml主要是要上傳需要資源文件到hdfs用。

所以呻疹,運(yùn)行spark-submit其實(shí)并不需要整個(gè)hadoop安裝包吃引,只需要將這三個(gè)配置文件放好然后設(shè)置一下HADOOP_CONF_DIR或者YARN_CONF_DIR環(huán)境變量即可

提交任務(wù)到y(tǒng)arn的相關(guān)代碼在spark源碼的resource-managers/yarn目錄下刽锤。在使用maven編譯時(shí)镊尺,需要帶上 -Pyarn 才會(huì)將這些代碼打包進(jìn)去

spark任務(wù)配置的優(yōu)先級(jí)

在spark中,有三種方式可以設(shè)置參數(shù),這三種方法的優(yōu)先級(jí)從低到高依次是:

  1. 在 spark_default.conf 文件中配置
  2. 執(zhí)行spark-submit 時(shí)通過參數(shù)指定配置
  3. 在代碼中直接通過SparkConf的方法設(shè)置參數(shù)

比如我們?cè)?spark_default.conf 中設(shè)置了spark.executor.cores = 1并思,但是在spark-submit時(shí)又指定了--executor-cores 2鹅心,這時(shí)真正的executor的core數(shù)量就是2,spark_default.conf 中的配置被覆蓋纺荧。

但是也有一些情況旭愧,可能只會(huì)用到spark_default.conf 文件中的配置或者spark-submit的參數(shù)配置。在代碼中設(shè)置是沒用的宙暇,比如在client模式下输枯,spark.driver.extraClassPath這參數(shù)必須在啟動(dòng)Driver的時(shí)候立馬設(shè)置,這時(shí)通過SparkConf設(shè)置等于沒設(shè)置占贫。

還有一種情況桃熄,我們?cè)趕park-submit中設(shè)置appName為"a",但是在SparkConf中又設(shè)置了appname為"b"型奥。這時(shí)我們?nèi)arn的頁面就會(huì)發(fā)現(xiàn)這個(gè)app的name還是"a"瞳收,不會(huì)被覆蓋。去SparkHisotryServer中這個(gè)app的name就是"b"厢汹。這個(gè)主要是因?yàn)閟park向yarn提交任務(wù)時(shí)Driver還未運(yùn)行螟深,此時(shí)獲取到的spark.app.name還是spark-submit設(shè)置的"a"。到了真正執(zhí)行烫葬,spark.app.name配置就變成"b"了界弧。

所以,雖然大多數(shù)的配置優(yōu)先級(jí)是那樣搭综,但是如果我們發(fā)現(xiàn)哪個(gè)配置沒生效垢箕,還是需要具體情況具體分析的。

spark尋找spark_default.conf文件的過程主要是先讀取SPARK_CONF_DIR環(huán)境變量兑巾,然后讀取 目錄下面的spark_default.conf文件条获。獲取SPARK_CONF_DIR沒設(shè)置,就讀取SPARK_HOME/conf目錄下的配置文件蒋歌。這時(shí)如果SPARK_HOME環(huán)境變量也沒設(shè)置帅掘,就會(huì)報(bào)錯(cuò)

client模式的真正運(yùn)行方式

spark提交請(qǐng)求的Application上下文中有一個(gè)command參數(shù)委煤,也就是告訴yarn怎么啟動(dòng)ApplicationMaster。我們發(fā)現(xiàn)在cluster模式下锄开,啟動(dòng)的ApplicationMaster是org.apache.spark.deploy.yarn.ApplicationMaster類素标,而在client模式下称诗,啟動(dòng)的ApplicationMaster是org.apache.spark.deploy.yarn.ExecutorLauncher萍悴。

其實(shí)ExecutorLauncher的main方法還是直接調(diào)用ApplicationMaster的main方法。之后在ApplicationMaster#run()方法中寓免,如果是client模式癣诱,會(huì)去連接運(yùn)行的客戶端機(jī)器上的Driver。之后做的事就是根據(jù)Driver的命令(也就是rpc請(qǐng)求)申請(qǐng)或者釋放Container資源了袜香。

之前經(jīng)常以為client模式下撕予,Driver就是ApplicationMaster,只是AppcationMaster運(yùn)行在客戶端服務(wù)器上而已蜈首。但是實(shí)際并不是這樣实抡。client模式下,Driver運(yùn)行在客戶端上欢策,ApplicationMaster還是運(yùn)行在yarn的Container中吆寨,只是這時(shí)這個(gè)ApplicationMaster只負(fù)責(zé)進(jìn)行資源的調(diào)度而已。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末踩寇,一起剝皮案震驚了整個(gè)濱河市啄清,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌俺孙,老刑警劉巖辣卒,帶你破解...
    沈念sama閱讀 216,324評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異睛榄,居然都是意外死亡荣茫,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門场靴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來计露,“玉大人,你說我怎么就攤上這事憎乙∑惫蓿” “怎么了?”我有些...
    開封第一講書人閱讀 162,328評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵泞边,是天一觀的道長该押。 經(jīng)常有香客問我,道長阵谚,這世上最難降的妖魔是什么蚕礼? 我笑而不...
    開封第一講書人閱讀 58,147評(píng)論 1 292
  • 正文 為了忘掉前任烟具,我火速辦了婚禮,結(jié)果婚禮上奠蹬,老公的妹妹穿的比我還像新娘朝聋。我一直安慰自己,他們只是感情好囤躁,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評(píng)論 6 388
  • 文/花漫 我一把揭開白布冀痕。 她就那樣靜靜地躺著,像睡著了一般狸演。 火紅的嫁衣襯著肌膚如雪言蛇。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,115評(píng)論 1 296
  • 那天宵距,我揣著相機(jī)與錄音腊尚,去河邊找鬼。 笑死满哪,一個(gè)胖子當(dāng)著我的面吹牛婿斥,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播哨鸭,決...
    沈念sama閱讀 40,025評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼民宿,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼!你這毒婦竟也來了兔跌?” 一聲冷哼從身側(cè)響起勘高,我...
    開封第一講書人閱讀 38,867評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎坟桅,沒想到半個(gè)月后华望,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,307評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡仅乓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評(píng)論 2 332
  • 正文 我和宋清朗相戀三年赖舟,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片夸楣。...
    茶點(diǎn)故事閱讀 39,688評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡宾抓,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出豫喧,到底是詐尸還是另有隱情石洗,我是刑警寧澤,帶...
    沈念sama閱讀 35,409評(píng)論 5 343
  • 正文 年R本政府宣布紧显,位于F島的核電站讲衫,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏孵班。R本人自食惡果不足惜涉兽,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評(píng)論 3 325
  • 文/蒙蒙 一招驴、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧枷畏,春花似錦别厘、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至袋倔,卻和暖如春雕蔽,著一層夾襖步出監(jiān)牢的瞬間折柠,已是汗流浹背宾娜。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評(píng)論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留扇售,地道東北人前塔。 一個(gè)月前我還...
    沈念sama閱讀 47,685評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像承冰,于是被迫代替她去往敵國和親华弓。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容