Spark讀取配置
我們知道僚匆,有一些配置可以在多個地方配置年堆。以配置executor的memory為例,有以下三種方式:
- spark-submit的
--executor-memory
選項 - spark-defaults.conf的
spark.executor.memory
配置 - spark-env.sh的
SPARK_EXECUTOR_MEMORY
配置
同一個配置可以在多處設置,這顯然會造成迷惑疾牲,不知道spark為什么到現(xiàn)在還保留這樣的邏輯。
如果我分別在這三處對executor的memory設置了不同的值衙解,最終在Application中生效的是哪個阳柔?
處理這一問題的類是SparkSubmitArguments
。在其構造函數(shù)中就完成了從 『spark-submit --選項』蚓峦、『spark-defaults.conf』舌剂、『spark-env.sh』中讀取配置济锄,并根據(jù)策略決定使用哪個配置。下面分幾步來分析這個重要的構造函數(shù)霍转。
Step0:讀取spark-env.sh配置并寫入環(huán)境變量中
SparkSubmitArguments的參數(shù)列表包含一個env: Map[String, String] = sys.env
參數(shù)荐绝。該參數(shù)包含一些系統(tǒng)環(huán)境變量的值和從spark-env.sh中讀取的配置值,如圖是我一個demo中env值的部分截圖
這一步之所以叫做Step0避消,是因為env的值在構造SparkSubmitArguments對象之前就確認低滩,即spark-env.sh
在構造SparkSubmitArguments對象前就讀取并將配置存入env中。
Step1:創(chuàng)建各配置成員并賦空值
這一步比較簡單岩喷,定義了所有要從『spark-submit --選項』委造、『spark-defaults.conf』、『spark-env.sh』中讀取的配置均驶,并賦空值昏兆。下面的代碼展示了其中一部分 :
var master: String = null
var deployMode: String = null
var executorMemory: String = null
var executorCores: String = null
var totalExecutorCores: String = null
var propertiesFile: String = null
var driverMemory: String = null
var driverExtraClassPath: String = null
var driverExtraLibraryPath: String = null
var driverExtraJavaOptions: String = null
var queue: String = null
var numExecutors: String = null
var files: String = null
var archives: String = null
var mainClass: String = null
var primaryResource: String = null
var name: String = null
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var jars: String = null
var packages: String = null
var repositories: String = null
var ivyRepoPath: String = null
var packagesExclusions: String = null
var verbose: Boolean = false
...
Step2:調用父類parse方法解析 spark-submit --選項
try {
parse(args.toList)
} catch {
case e: IllegalArgumentException => SparkSubmit.printErrorAndExit(e.getMessage())
}
這里調用父類的SparkSubmitOptionParser#parse(List<String> args)
。parse函數(shù)查找args中設置的--選項和值并解析為name和value妇穴,如--master yarn-client
會被解析為值為--master
的name和值為yarn-client
的value爬虱。這之后調用SparkSubmitArguments#handle(MASTER, "yarn-client")
進行處理。
來看看handle函數(shù)干了什么:
/** Fill in values by parsing user options. */
override protected def handle(opt: String, value: String): Boolean = {
opt match {
case NAME =>
name = value
case MASTER =>
master = value
case CLASS =>
mainClass = value
case DEPLOY_MODE =>
if (value != "client" && value != "cluster") {
SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
}
deployMode = value
case NUM_EXECUTORS =>
numExecutors = value
case TOTAL_EXECUTOR_CORES =>
totalExecutorCores = value
case EXECUTOR_CORES =>
executorCores = value
case EXECUTOR_MEMORY =>
executorMemory = value
case DRIVER_MEMORY =>
driverMemory = value
case DRIVER_CORES =>
driverCores = value
case DRIVER_CLASS_PATH =>
driverExtraClassPath = value
...
case _ =>
throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
}
true
}
這個函數(shù)也很簡單腾它,根據(jù)參數(shù)opt及value跑筝,設置各個成員的值。接上例瞒滴,parse中調用handle("--master", "yarn-client")
后曲梗,在handle函數(shù)中,master成員將被賦值為yarn-client
妓忍。
注意虏两,case MASTER中的MASTER的值在SparkSubmitOptionParser
定義為--master
,MASTER與其他值定義如下:
protected final String MASTER = "--master";
protected final String CLASS = "--class";
protected final String CONF = "--conf";
protected final String DEPLOY_MODE = "--deploy-mode";
protected final String DRIVER_CLASS_PATH = "--driver-class-path";
protected final String DRIVER_CORES = "--driver-cores";
protected final String DRIVER_JAVA_OPTIONS = "--driver-java-options";
protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
protected final String DRIVER_MEMORY = "--driver-memory";
protected final String EXECUTOR_MEMORY = "--executor-memory";
protected final String FILES = "--files";
protected final String JARS = "--jars";
protected final String KILL_SUBMISSION = "--kill";
protected final String NAME = "--name";
protected final String PACKAGES = "--packages";
protected final String PACKAGES_EXCLUDE = "--exclude-packages";
protected final String PROPERTIES_FILE = "--properties-file";
protected final String PROXY_USER = "--proxy-user";
protected final String PY_FILES = "--py-files";
protected final String REPOSITORIES = "--repositories";
protected final String STATUS = "--status";
protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
...
總結來說世剖,parse函數(shù)解析了spark-submit中的--選項定罢,并根據(jù)解析出的name和value給SparkSubmitArguments的各個成員(例如master、deployMode旁瘫、executorMemory等)設置值祖凫。
Step3:mergeDefaultSparkProperties加載spark-defaults.conf中配置
Step3讀取spark-defaults.conf中的配置文件并存入sparkProperties中,sparkProperties將在下一步中發(fā)揮作用
//< 保存從spark-defaults.conf讀取的配置
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
//< 獲取配置文件路徑酬凳,若在spark-env.sh中設置SPARK_CONF_DIR惠况,則以該值為準;否則為 $SPARK_HOME/conf/spark-defaults.conf
def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
env.get("SPARK_CONF_DIR")
.orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
.map { t => new File(s"$t${File.separator}spark-defaults.conf")}
.filter(_.isFile)
.map(_.getAbsolutePath)
.orNull
}
//< 讀取spark-defaults.conf配置并存入sparkProperties中
private def mergeDefaultSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))
// Honor --conf before the defaults file
defaultSparkProperties.foreach { case (k, v) =>
if (!sparkProperties.contains(k)) {
sparkProperties(k) = v
}
}
}
Step4:loadEnvironmentArguments確認每個配置成員最終值
先來看看代碼(由于篇幅太長宁仔,省略了一部分)
private def loadEnvironmentArguments(): Unit = {
master = Option(master)
.orElse(sparkProperties.get("spark.master"))
.orElse(env.get("MASTER"))
.orNull
driverExtraClassPath = Option(driverExtraClassPath)
.orElse(sparkProperties.get("spark.driver.extraClassPath"))
.orNull
driverExtraJavaOptions = Option(driverExtraJavaOptions)
.orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
.orNull
driverExtraLibraryPath = Option(driverExtraLibraryPath)
.orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
.orNull
driverMemory = Option(driverMemory)
.orElse(sparkProperties.get("spark.driver.memory"))
.orElse(env.get("SPARK_DRIVER_MEMORY"))
.orNull
...
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
val uri = new URI(primaryResource)
val uriScheme = uri.getScheme()
uriScheme match {
case "file" =>
try {
val jar = new JarFile(uri.getPath)
// Note that this might still return null if no main-class is set; we catch that later
mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
} catch {
case e: Exception =>
SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
}
case _ =>
SparkSubmit.printErrorAndExit(
s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
"Please specify a class through --class.")
}
}
// Global defaults. These should be keep to minimum to avoid confusing behavior.
master = Option(master).getOrElse("local[*]")
// In YARN mode, app name can be set via SPARK_YARN_APP_NAME (see SPARK-5222)
if (master.startsWith("yarn")) {
name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
}
// Set name from main class if not given
name = Option(name).orElse(Option(mainClass)).orNull
if (name == null && primaryResource != null) {
name = Utils.stripDirectory(primaryResource)
}
// Action should be SUBMIT unless otherwise specified
action = Option(action).getOrElse(SUBMIT)
}
我們單獨以確定master值的那部分代碼來說明稠屠,相關代碼如下
master = Option(master)
.orElse(sparkProperties.get("spark.master"))
.orElse(env.get("MASTER"))
.orNull
// Global defaults. These should be keep to minimum to avoid confusing behavior.
master = Option(master).getOrElse("local[*]")
確定master的值的步驟如下:
- Option(master):若master值不為null,則以master為準;否則進入2完箩。若master不為空赐俗,從上文的分析我們可以知道是從解析spark-submit --master選項得到的值
- .orElse(sparkProperties.get("spark.master")):若sparkProperties.get("spark.master")范圍非null則以該返回值為準拉队;否則進入3弊知。從Step3中可以知道sparkProperties中的值都是從spark-defaults.conf中讀取
- .orElse(env.get("MASTER")):若env.get("MASTER")返回非null,則以該返回值為準粱快;否則進入4秩彤。env中的值從spark-env.sh讀取而來
- 若以上三處均為設置master,則取默認值local[*]
查看其余配置成員的值的決定過程也和master一致事哭,稍有不同的是并不是所有配置都能在spark-defaults.conf漫雷、spark-env.sh和spark-submit選項中設置。但優(yōu)先級還是一致的鳍咱。
由此降盹,我們可以得出結論,對于spark配置谤辜。若一個配置在多處設置蓄坏,則優(yōu)先級如下:
spark-submit --選項 > spark-defaults.conf配置 > spark-env.sh配置 > 默認值
最后,附上流程圖