目錄
前言
從本文開始,討論Spark基礎支撐子系統(tǒng)的具體實現(xiàn)冷离。首先來看WordCount中最先出現(xiàn)的SparkConf挠乳。
上一篇已經講過囚似,SparkConf類負責管理Spark的所有配置項潮针。在我們使用Spark的過程中塘匣,經常需要靈活配置各種參數(shù)净当,來使程序更好符匾、更快地運行,因此也必然要與SparkConf類頻繁打交道害碾。了解它的細節(jié)不無裨益矢劲。
SparkConf類的構造方法
下面先來看一看SparkConf類的構造方法。為了讀起來清晰明了慌随,可能會在不影響理解的前提下適當刪去無關代碼芬沉、注釋,并調整順序阁猜。
代碼#1.1 - o.a.s.SparkConf類的構造方法
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
import SparkConf._
if (loadDefaults) {
loadFromSystemProperties(false)
}
def this() = this(true)
// ...
}
代碼#1.1中的import語句是從SparkConf類的伴生對象中導入一些東西丸逸,它們主要管理過期的、舊版本兼容的配置項蹦漠,以及日志輸出椭员。Scala中沒有Java的靜態(tài)(static)概念,類的伴生對象中維護的成員和方法就可以視為類的靜態(tài)成員和靜態(tài)方法笛园。
SparkConf類有一個主構造方法參數(shù)loadDefaults隘击,它指示是否要從Java系統(tǒng)屬性(即System.getProperties()取得的屬性)加載默認的與Spark相關的配置侍芝。
Spark配置項的存儲
SparkConf內部是采用ConcurrentHashMap來維護所有配置項鍵值的。
代碼#1.2 - o.a.s.SparkConf.settings字段
private val settings = new ConcurrentHashMap[String, String]()
這自然是考慮到了并發(fā)環(huán)境下的線程安全性問題埋同。另外州叠,它的鍵與值類型都為String,說明所有Spark配置項都以字符串形式存儲凶赁。
設置配置項
要設置Spark配置項咧栗,有以下三種方法。
直接用Set類方法設置
這是我們開發(fā)過程中最常用的方法虱肄。SparkConf中提供了多種多樣的Set類方法致板,最基礎的set()方法重載如下。
代碼#1.3 - o.a.s.SparkConf.set()方法
def set(key: String, value: String): SparkConf = {
set(key, value, false)
}
private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value for " + key)
}
if (!silent) {
logDeprecationWarning(key)
}
settings.put(key, value)
this
}
可見配置項的鍵值都不能為null咏窿。并且包括set()在內的所有Set類方法都返回this斟或,所以支持鏈式調用,這樣使用起來比較簡潔集嵌。
另外萝挤,還有一些方法可以快速設置常用配置項,比如上篇代碼#0.1中出現(xiàn)過的setMaster()與setAppName()根欧。它們最終也會調用set()方法怜珍。
代碼#1.4 - o.a.s.SparkConf.setAppName()與setMaster()方法
def setMaster(master: String): SparkConf = {
set("spark.master", master)
}
def setAppName(name: String): SparkConf = {
set("spark.app.name", name)
}
通過系統(tǒng)屬性加載
如果上述代碼#1.1中的loadDefaults參數(shù)為true,那么SparkConf會從Java系統(tǒng)屬性中加載配置項凤粗。如果調用無參的輔助構造方法酥泛,即直接new SparkConf()的話,也會將loadDefaults設為true嫌拣。Java系統(tǒng)屬性可以通過System.setProperty()方法在程序中動態(tài)設置揭璃。
來看代碼#1.1中調用的loadFromSystemProperties()方法。
代碼#1.5 - o.a.s.SparkConf.loadFromSystemProperties()方法
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value, silent)
}
this
}
它使用通用工具類Utils中的方法取得系統(tǒng)屬性亭罪,過濾出以字符串“spark.”為前綴的鍵,然后調用set()方法設置鍵值歼秽。由于系統(tǒng)屬性相關的參數(shù)是一次性初始化的应役,所以用Set類方法設置的值可以覆蓋它們。
克隆SparkConf
SparkConf類繼承了Cloneable特征(trait燥筷,類似于Java接口的增強版)并覆寫了clone()方法箩祥,因此SparkConf是可以(深)克隆的。
代碼#1.6 - o.a.s.SparkConf.clone()方法
override def clone: SparkConf = {
val cloned = new SparkConf(false)
settings.entrySet().asScala.foreach { e =>
cloned.set(e.getKey(), e.getValue(), true)
}
cloned
}
雖然ConcurrentHashMap保證線程安全肆氓,不會影響SparkConf實例共享袍祖,但在高并發(fā)的情況下,鎖機制可能會帶來性能問題谢揪。我們就可以克隆SparkConf到多個組件中蕉陋,以讓它們獲得相同的配置參數(shù)捐凭。
獲取配置項
獲取配置項只有一個途徑,即調用Get類方法凳鬓。Get類方法同樣有很多實現(xiàn)茁肠,基礎的get()與getOption()如下所示。
代碼#1.7 - o.a.s.SparkConf.get()與getOption()方法
def get(key: String): String = {
getOption(key).getOrElse(throw new NoSuchElementException(key))
}
def get(key: String, defaultValue: String): String = {
getOption(key).getOrElse(defaultValue)
}
def getOption(key: String): Option[String] = {
Option(settings.get(key)).orElse(getDeprecatedConfig(key, settings))
}
獲取配置項時缩举,會同時檢查過期配置(getDeprecatedConfig()方法是伴生對象中定義的)垦梆,并且會使用Scala Option來包裝返回的結果,對于有值(Some)和無值(None)的情況可以靈活處理仅孩。
另外托猩,Get類方法中有不少涉及數(shù)據(jù)類型轉換和單位轉換,如getDouble()辽慕、getLong()京腥、getSizeAsMb()、getTimeAsSeconds()等等鼻百,都是為了使用方便绞旅,不再贅述。
校驗配置項
SparkConf中有一個方法validateSettings()温艇,用來校驗配置項因悲。它的源碼很長,但是邏輯比較簡單勺爱,主要是對過期配置項進行警告晃琳,以及對非法設置或不兼容的配置項拋出異常。
限于篇幅原因琐鲁,這里就不貼出該方法的源碼了卫旱。感興趣的看官可以自己找找看,里面校驗了大量之后一定會用到的配置項围段。
總結
本文通過SparkConf類的部分源碼顾翼,簡述了SparkConf的構造方法、配置存儲奈泪,以及設置适贸、獲取、校驗配置項的方法邏輯涝桅。
SparkConf是SparkContext初始化的必備前提拜姿。了解了SparkConf,就可以分析復雜得多的SparkContext了冯遂。