https://blog.csdn.net/u013332124/article/details/91456422
一怖侦、spark-submit腳本分析
spark-submit的腳本內(nèi)容很簡單:
# 如果沒設(shè)置SPARK_HOME的環(huán)境變量,調(diào)用find-spark-home文件尋找spark-home
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# 直接將所有參數(shù)傳遞給spark-class
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
最后又調(diào)用spark-class躯喇。其實(shí)不光spark-submit藤抡,幾乎所有的spark服務(wù)最終都是調(diào)用spark-class來啟動(dòng)的侠碧。spark-class的代碼也不多:
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
. "${SPARK_HOME}"/bin/load-spark-env.sh
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ "$(command -v java)" ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
echo "You need to build Spark with the target \"package\" before running this program." 1>&2
exit 1
else
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
# For tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
# 調(diào)用Main類生成命令
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <(build_command "$@")
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
echo "${CMD[@]}" | head -n-1 1>&2
exit 1
fi
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
exit $LAUNCHER_EXIT_CODE
fi
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
spark-class主要是將參數(shù)交給org.apache.spark.launcher.Main
類執(zhí)行,然后獲取到一個(gè)新的命令缠黍,之后我們拿著這個(gè)命令執(zhí)行弄兜。
比如我們執(zhí)行下面的spark-submit語句:
spark-submit --queue up --deploy-mode cluster --master yarn --class org.apache.spark.examples.SparkPi /www/harbinger-spark/examples/jars/spark-examples_2.11-2.1.0.jar 10
經(jīng)過Main類解析后,就會(huì)變成下面的命令:
/www/jdk1.8.0_51/bin/java -cp /www/harbinger-spark/conf/:/www/harbinger-spark/jars/*:/www/harbinger-hadoop/etc/hadoop/ -Xmx52m org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi --queue up /www/harbinger-spark/examples/jars/spark-examples_2.11-2.1.0.jar 10
我們發(fā)現(xiàn)瓷式,最終又繞回來了替饿,還是通過java命令調(diào)用SparkSubmit類。
那么蒿往,為什么spark不直接運(yùn)行SparkSubmit盛垦,而是繞了一大圈通過Main類解析獲得命令然后再運(yùn)行呢湿弦?
二瓤漏、Main類的作用
spark-submit的命令解析主要是經(jīng)過SparkSubmitCommandBuilder#buildSparkSubmitCommand()方法,我們可以看一下源碼:
private List<String> buildSparkSubmitCommand(Map<String, String> env)
throws IOException, IllegalArgumentException {
//加載配置文件的配置
Map<String, String> config = getEffectiveConfig();
boolean isClientMode = isClientMode(config);
//獲取用戶指定的classPath
String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;
List<String> cmd = buildJavaCommand(extraClassPath);
// Take Thrift Server as daemon
if (isThriftServer(mainClass)) {
addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
}
addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
// We don't want the client to specify Xmx. These have to be set by their corresponding
// memory flag --driver-memory or configuration entry spark.driver.memory
String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
"java options (was %s). Use the corresponding --driver-memory or " +
"spark.driver.memory configuration instead.", driverExtraJavaOptions);
throw new IllegalArgumentException(msg);
}
if (isClientMode) {
String tsMemory =
isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
cmd.add("-Xmx" + memory);
addOptionString(cmd, driverExtraJavaOptions);
mergeEnvPathList(env, getLibPathEnvName(),
config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
}
cmd.add("org.apache.spark.deploy.SparkSubmit");
cmd.addAll(buildSparkSubmitArgs());
return cmd;
}
主要做的事情其實(shí)就是讀取各種配置然后往命令中添加一些參數(shù)。也就是對(duì)命令進(jìn)行加工蔬充。
其實(shí)添加參數(shù)這種事情直接在shell中也能做蝶俱,但是這個(gè)過程需要讀取配置文件,shell可能做起來比較麻煩饥漫。另外其他服務(wù)也會(huì)經(jīng)過Main類進(jìn)行加工榨呆,一些公共的代碼也可以抽象出來。所以庸队,這個(gè)Main類主要用于對(duì)命令的加工和轉(zhuǎn)換积蜻。
一些spark服務(wù),如果要修改一些服務(wù)的參數(shù)彻消,比如調(diào)整堆大小竿拆,就是在Main類中讀取相關(guān)的環(huán)境變量來設(shè)置的。比如SparkHistoryServer宾尚,Main類中會(huì)讀取環(huán)境變量SPARK_HISTORY_OPTS的值丙笋,然后在啟動(dòng)SparkHistoryServer時(shí)加上去。其他的服務(wù)也類似煌贴。另外御板,環(huán)境變量可以在"${SPARK_HOME}"/bin/load-spark-env.sh中設(shè)置,spark-class中會(huì)加載這個(gè)文件的配置牛郑。
三怠肋、SparkSubmit類提交任務(wù)的過程
SparkSubmit做的事情就是提交任務(wù)運(yùn)行。我們這里討論一下yarn模式的任務(wù)提交淹朋。
整個(gè)任務(wù)提交流程也比較好理解灶似,主要就是收集ApplicationMaster的上下文,比如ApplicationMaster的啟動(dòng)命令瑞你、資源文件酪惭、環(huán)境變量等,然后和yarn建立連接者甲,通過yarnClient提交ApplicationMaster到y(tǒng)arn上運(yùn)行春感。之后,不斷向yarn輪詢?nèi)蝿?wù)的狀態(tài)直到任務(wù)運(yùn)行結(jié)束虏缸。
因?yàn)檎麄€(gè)過程代碼比較多鲫懒,我們挑一些關(guān)鍵點(diǎn)進(jìn)行分析。
如何和ResourceManger建立連接
在yarn的模式下刽辙,spark會(huì)去讀取環(huán)境變量HADOOP_CONF_DIR
或者YARN_CONF_DIR
目錄下的配置文件窥岩,如果這兩個(gè)環(huán)境變量都沒找到,運(yùn)行spark-submit命令時(shí)就會(huì)報(bào)錯(cuò):
Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:256)
at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:233)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:110)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
spark主要是為了讀取該目錄下的3個(gè)文件:core-site.xml宰缤、yarn-site.xml颂翼、hdfs-site.xml晃洒。
其中core-site.xml是hadoop的核心配置。讀取yarn-site.xml配置主要是為了獲取ResourceManger的地址朦乏,之后就可以通過rpc建立連接球及。而讀取hdfs-site.xml主要是要上傳需要資源文件到hdfs用。
所以呻疹,運(yùn)行spark-submit其實(shí)并不需要整個(gè)hadoop安裝包吃引,只需要將這三個(gè)配置文件放好然后設(shè)置一下HADOOP_CONF_DIR或者YARN_CONF_DIR環(huán)境變量即可。
提交任務(wù)到y(tǒng)arn的相關(guān)代碼在spark源碼的resource-managers/yarn目錄下刽锤。在使用maven編譯時(shí)镊尺,需要帶上 -Pyarn 才會(huì)將這些代碼打包進(jìn)去
spark任務(wù)配置的優(yōu)先級(jí)
在spark中,有三種方式可以設(shè)置參數(shù),這三種方法的優(yōu)先級(jí)從低到高依次是:
- 在 spark_default.conf 文件中配置
- 執(zhí)行spark-submit 時(shí)通過參數(shù)指定配置
- 在代碼中直接通過SparkConf的方法設(shè)置參數(shù)
比如我們?cè)?spark_default.conf 中設(shè)置了spark.executor.cores = 1并思,但是在spark-submit時(shí)又指定了--executor-cores 2
鹅心,這時(shí)真正的executor的core數(shù)量就是2,spark_default.conf 中的配置被覆蓋纺荧。
但是也有一些情況旭愧,可能只會(huì)用到spark_default.conf 文件中的配置或者spark-submit的參數(shù)配置。在代碼中設(shè)置是沒用的宙暇,比如在client模式下输枯,spark.driver.extraClassPath這參數(shù)必須在啟動(dòng)Driver的時(shí)候立馬設(shè)置,這時(shí)通過SparkConf設(shè)置等于沒設(shè)置占贫。
還有一種情況桃熄,我們?cè)趕park-submit中設(shè)置appName為"a",但是在SparkConf中又設(shè)置了appname為"b"型奥。這時(shí)我們?nèi)arn的頁面就會(huì)發(fā)現(xiàn)這個(gè)app的name還是"a"瞳收,不會(huì)被覆蓋。去SparkHisotryServer中這個(gè)app的name就是"b"厢汹。這個(gè)主要是因?yàn)閟park向yarn提交任務(wù)時(shí)Driver還未運(yùn)行螟深,此時(shí)獲取到的spark.app.name還是spark-submit設(shè)置的"a"。到了真正執(zhí)行烫葬,spark.app.name配置就變成"b"了界弧。
所以,雖然大多數(shù)的配置優(yōu)先級(jí)是那樣搭综,但是如果我們發(fā)現(xiàn)哪個(gè)配置沒生效垢箕,還是需要具體情況具體分析的。
spark尋找spark_default.conf文件的過程主要是先讀取SPARK_CONF_DIR環(huán)境變量兑巾,然后讀取 目錄下面的spark_default.conf文件条获。獲取SPARK_CONF_DIR沒設(shè)置,就讀取SPARK_HOME/conf目錄下的配置文件蒋歌。這時(shí)如果SPARK_HOME環(huán)境變量也沒設(shè)置帅掘,就會(huì)報(bào)錯(cuò)
client模式的真正運(yùn)行方式
spark提交請(qǐng)求的Application上下文中有一個(gè)command參數(shù)委煤,也就是告訴yarn怎么啟動(dòng)ApplicationMaster。我們發(fā)現(xiàn)在cluster模式下锄开,啟動(dòng)的ApplicationMaster是org.apache.spark.deploy.yarn.ApplicationMaster類素标,而在client模式下称诗,啟動(dòng)的ApplicationMaster是org.apache.spark.deploy.yarn.ExecutorLauncher萍悴。
其實(shí)ExecutorLauncher的main方法還是直接調(diào)用ApplicationMaster的main方法。之后在ApplicationMaster#run()方法中寓免,如果是client模式癣诱,會(huì)去連接運(yùn)行的客戶端機(jī)器上的Driver。之后做的事就是根據(jù)Driver的命令(也就是rpc請(qǐng)求)申請(qǐng)或者釋放Container資源了袜香。
之前經(jīng)常以為client模式下撕予,Driver就是ApplicationMaster,只是AppcationMaster運(yùn)行在客戶端服務(wù)器上而已蜈首。但是實(shí)際并不是這樣实抡。client模式下,Driver運(yùn)行在客戶端上欢策,ApplicationMaster還是運(yùn)行在yarn的Container中吆寨,只是這時(shí)這個(gè)ApplicationMaster只負(fù)責(zé)進(jìn)行資源的調(diào)度而已。