This article walks through what actually happens when we enter the REPL via spark-shell.
Note: all articles in this series use the Spark 1.6.3 source code as the reference; where Spark 2.1.0 introduces significant changes, those will be pointed out as well.
The shell part
Let's look at what happens, step by step, when we type spark-shell --master spark://master:7077. The obvious starting point is the source of spark-shell itself; only the relatively important parts are shown here:
## Check whether the SPARK_HOME environment variable is set; if not, derive and export it here
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
##...
function main() {
if $cygwin; then
# Workaround for issue involving JLine and Cygwin
# (see http://sourceforge.net/p/jline/bugs/40/).
# If you're using the Mintty terminal emulator in Cygwin, may need to set the
# "Backspace sends ^H" setting in "Keys" section of the Mintty options
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
fi
}
##...
main "$@"
##...
We can see that the script ends by calling main with every argument given to the spark-shell command (for example --master). Regardless of the operating system (in production this is of course Linux), main ultimately runs spark-submit with --class org.apache.spark.repl.Main and --name "Spark shell", passing along all of the user's spark-shell arguments. For example, spark-shell --master spark://master:7077 becomes "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" --master spark://master:7077. Next, let's look at spark-submit:
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
spark-submit is quite compact: it ends by using exec to start SparkSubmit through spark-class, passing along every argument spark-submit received. Next, let's look at spark-class. (Note that from here on this is exactly the path taken whenever an application is submitted with spark-submit; the only differences are that a plain spark-submit is followed by the user's own class while the spark-submit issued by spark-shell is followed by org.apache.spark.repl.Main, and that an application submitted with spark-submit exits when it finishes, whereas the REPL entered through spark-shell keeps waiting for user input.)
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
## Load the Spark environment variables
. "${SPARK_HOME}"/bin/load-spark-env.sh
## Locate the java binary, which will later be used to launch the JVM process
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
## Locate the Spark assembly jar that carries the dependencies
# Find assembly jar
SPARK_ASSEMBLY_JAR=
if [ -f "${SPARK_HOME}/RELEASE" ]; then
ASSEMBLY_DIR="${SPARK_HOME}/lib"
else
ASSEMBLY_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION"
fi
GREP_OPTIONS=
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" -a "$SPARK_PREPEND_CLASSES" != "1" ]; then
echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
if [ -d "$ASSEMBLY_DIR" ]; then
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
echo "$ASSEMBLY_JARS" 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi
fi
SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"
LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"
# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"
# For tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
## Use java -cp to start a JVM running the main method of org.apache.spark.launcher.Main, which prints the final command; the exec below then runs that command, and as we will see the resulting process is the SparkSubmit process
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"
spark-class is the command-line launcher for Spark programs: it sets up the JVM environment and then executes the requested Spark class, in our case SparkSubmit. With that, we can move into the Spark source code itself.
The Spark source part
Picking up where the shell scripts left off, we go straight into the Spark source:
We will not walk through the source of org.apache.spark.launcher.Main here; see my other article (supplementary notes on the Main class) for details. In short, it inspects the class it is asked to run, builds the corresponding command, and hands that command back to exec. Since our focus is Spark itself, let's go straight to the SparkSubmit source:
def main(args: Array[String]): Unit = {
/** Wrap the arguments passed to spark-submit in SparkSubmitArguments. Remember what they are?
 * For spark-shell they are spark-shell plus everything that followed it; for a direct spark-submit
 * submission they are whatever was passed on the command line. SparkSubmitArguments has a lot of
 * fields, so they are not listed one by one here; each will be explained when it is used, and the
 * full list can be found in the SparkSubmitArguments source.
 */
val appArgs = new SparkSubmitArguments(args)
// If verbose mode is enabled, print the parsed arguments
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
/** The action here is what spark-submit has been asked to do: SUBMIT, KILL or REQUEST_STATUS
 * (wrapped in SparkSubmitAction). If nothing is specified it defaults to SparkSubmitAction.SUBMIT,
 * so the pattern match below ends up calling submit(appArgs).
 */
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
Next, let's look at the submit(appArgs) method:
/**
* The submit method's job is to submit the application using the arguments passed in.
* It does this in two steps:
* 1. Prepare the launch environment: set up the classpath, system properties and application
*    arguments needed to run the child main class (which depends on the deploy mode and the
*    cluster manager).
* 2. Invoke the main method of that child main class using the environment prepared in step 1.
*    Here we only consider client mode; cluster mode will be analyzed separately later.
* So for spark-shell the child main class is org.apache.spark.repl.Main, while for a direct
* spark-submit submission it is the user-written application class (the one with the main method).
*/
private def submit(args: SparkSubmitArguments): Unit = {
// Prepare the environment; the key result here is childMainClass, the child main class described above
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
// Note: the source defines doRunMain() right here as a local method; we pull it out below and analyze it separately
// Decide whether the submission gateway is the Akka one or the REST-based one; either way, doRunMain() is eventually called
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
doRunMain()
}
}
The implementation of doRunMain():
def doRunMain(): Unit = {
if (args.proxyUser != null) {
// Hadoop user and group information for the proxy user
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
Both branches of doRunMain() ultimately call runMain, so let's step into runMain:
/** Long as this method looks, it does exactly one thing: run the main method of the child main class.
Once again: for an application submitted directly with spark-submit this is the main method of the
user-specified class; for spark-shell it is the main method of org.apache.spark.repl.Main.
*/
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
// Print debug information if verbose is set
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println
// The following sets the context ClassLoader for the current thread
val loader =
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
// Add the jar dependencies to the classpath
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
// Set the system properties
for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
var mainClass: Class[_] = null
// Load mainClass (the child main class) via reflection
try {
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
// scalastyle:off println
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
e.printStackTrace(printStream)
if (e.getMessage.contains("org/apache/hadoop/hive")) {
// scalastyle:off println
printStream.println(s"Failed to load hive class.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
// Get the main method of mainClass (the child main class)
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
// The main method must be static
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable =>
e
}
// Finally, invoke the main method
try {
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
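Before moving on, note that the heart of runMain is the reflective lookup and invocation at the end. As a standalone illustration of that pattern (com.example.MyApp is a hypothetical class name used only for this sketch, not anything from Spark), it boils down to:
import java.lang.reflect.Modifier

object ReflectiveMainDemo {
  def main(args: Array[String]): Unit = {
    // Load a class by name and invoke its static main(Array[String]): the same
    // pattern runMain applies to the child main class.
    val mainClass = Class.forName("com.example.MyApp")
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")
    mainMethod.invoke(null, Array("--master", "spark://master:7077"))
  }
}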
At this point, if the user submitted their own Spark application through spark-submit, that application's main method is invoked directly and the user code (creating the SparkContext and so on) runs step by step; we will analyze that in later articles. What we need to follow now is the main method of org.apache.spark.repl.Main (note again that this article only covers client mode; cluster mode will be analyzed separately). Here is the thread dump of the main thread of the SparkSubmit process:
java.io.FileInputStream.read0(Native Method)
java.io.FileInputStream.read(FileInputStream.java:207)
scala.tools.jline.TerminalSupport.readCharacter(TerminalSupport.java:152)
scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:125)
scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.java:933)
scala.tools.jline.console.ConsoleReader.readBinding(ConsoleReader.java:1136)
scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1218)
scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1170)
org.apache.spark.repl.SparkJLineReader.readOneLine(SparkJLineReader.scala:80)
scala.tools.nsc.interpreter.InteractiveReader$class.readLine(InteractiveReader.scala:43)
org.apache.spark.repl.SparkJLineReader.readLine(SparkJLineReader.scala:25)
org.apache.spark.repl.SparkILoop.readOneLine$1(SparkILoop.scala:648)
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
org.apache.spark.repl.Main$.main(Main.scala:31)
org.apache.spark.repl.Main.main(Main.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
The dump is shown here to tie everything together: reading from the bottom up, we can clearly see the path we just analyzed, from SparkSubmit's main method through submit, doRunMain and runMain, to the reflective call into org.apache.spark.repl.Main's main method. So next we enter the main method of org.apache.spark.repl.Main (together with the initialization around it):
// Instantiate SparkConf
val conf = new SparkConf()
// Set up the various directories
val tmp = System.getProperty("java.io.tmpdir")
val rootDir = conf.get("spark.repl.classdir", tmp)
val outputDir = Utils.createTempDir(rootDir)
val s = new Settings()
s.processArguments(List("-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
"-classpath", getAddedJars.mkString(File.pathSeparator)), true)
// the creation of SecurityManager has to be lazy so SPARK_YARN_MODE is set if needed
val classServerPort = conf.getInt("spark.replClassServer.port", 0)
// Instantiate the HttpServer; note that it is a lazy val
lazy val classServer =
new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
var sparkContext: SparkContext = _
var sqlContext: SQLContext = _
// Instantiate SparkILoop; it is analyzed in detail below
var interp = new SparkILoop // this is a public var because tests reset it.
// After the initialization above, the main method runs
def main(args: Array[String]) {
// Check whether this is yarn-client mode; the YARN deploy mode will get its own article later
if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
// Start the classServer and store its URI in a spark system property
// (which will be passed to executors so that they can connect to it)
// Start the HTTP class server
classServer.start()
// The key call: run the interpreter loop, i.e. the REPL
interp.process(s) // Repl starts and goes in loop of R.E.P.L
classServer.stop()
Option(sparkContext).map(_.stop)
}
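One detail worth pausing on is the lazy modifier on classServer: as the comment in the source says, the SecurityManager has to be created lazily so that SPARK_YARN_MODE is already set if needed, which means the HttpServer is only constructed when classServer.start() is reached inside main. A tiny self-contained example (unrelated to Spark) of the behaviour being relied on:
object LazyValDemo {
  // The right-hand side runs on first access, not when the enclosing object is initialized.
  lazy val server: String = {
    println("constructing the (fake) class server")
    "server"
  }
  def main(args: Array[String]): Unit = {
    println("main started, server not constructed yet")
    // Anything that must happen before construction (like setting SPARK_YARN_MODE
    // in repl.Main) can safely be done before the first access below.
    server
  }
}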
Now is a good time to show part of the log printed when spark-shell enters the REPL:
17/02/21 13:40:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/21 13:40:17 INFO spark.SecurityManager: Changing view acls to: root
17/02/21 13:40:17 INFO spark.SecurityManager: Changing modify acls to: root
17/02/21 13:40:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/02/21 13:40:18 INFO spark.HttpServer: Starting HTTP Server
17/02/21 13:40:18 INFO server.Server: jetty-8.y.z-SNAPSHOT
17/02/21 13:40:18 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:43773
17/02/21 13:40:18 INFO util.Utils: Successfully started service 'HTTP class server' on port 43773.
The log lines above correspond exactly to the classServer.start() call, so from now on whenever we see them we know precisely what is happening behind the scenes.
Next we move into SparkILoop and ILoop. SparkILoop extends ILoop and does not define its own process method, so the process being called is actually the one in ILoop:
ILoop
// Start the interpreter that will evaluate the commands typed by the user
// start an interpreter with the given settings
def process(settings: Settings): Boolean = savingContextLoader {
this.settings = settings
// Create the interpreter; internally this instantiates an ILoopInterpreter
createInterpreter()
// sets in to some kind of reader depending on environmental cues
in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true))
globalFuture = future {
intp.initializeSynchronous()
loopPostInit()
!intp.reporter.hasErrors
}
// This dispatches to the loadFiles method of the subclass SparkILoop, which in turn ends up calling this class's loadFiles via super
loadFiles(settings)
printWelcome()
// Loop forever, reading the commands typed by the user
try loop() match {
case LineResults.EOF => out print Properties.shellInterruptedString
case _ =>
}
catch AbstractOrMissingHandler()
finally closeInterpreter()
true
}
Let's first see what SparkILoop's loadFiles does:
override def loadFiles(settings: Settings): Unit = {
initializeSpark()
super.loadFiles(settings)
}
It first calls initializeSpark() and only then the parent class's loadFiles. The point is to have SparkContext and SQLContext ready before anything else runs, so that sc and sqlContext can be used the moment we land in the REPL; this is exactly why we can reference sc and sqlContext directly inside spark-shell. With that said, let's look at what initializeSpark() actually does:
def initializeSpark() {
intp.beQuietDuring {
processLine("""
@transient val sc = {
val _sc = org.apache.spark.repl.Main.createSparkContext()
println("Spark context available as sc.")
_sc
}
""")
processLine("""
@transient val sqlContext = {
val _sqlContext = org.apache.spark.repl.Main.createSQLContext()
println("SQL context available as sqlContext.")
_sqlContext
}
""")
processLine("import org.apache.spark.SparkContext._")
processLine("import sqlContext.implicits._")
processLine("import sqlContext.sql")
processLine("import org.apache.spark.sql.functions._")
}
}
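Before commenting on the code above, here is a rough, hypothetical sketch of what a createSparkContext along these lines does, written as if it were another method of the Main object shown earlier (so it reuses conf, classServer and sparkContext from there). It is an approximation for orientation only, not a verbatim copy of the 1.6.3 source:
// Hypothetical sketch: the real org.apache.spark.repl.Main.createSparkContext differs in detail.
// The idea: reuse the SparkConf built earlier, publish the HTTP class server URI so executors can
// fetch REPL-generated classes, and create the SparkContext that becomes `sc` in the shell.
def createSparkContextSketch(): SparkContext = {
  conf.setIfMissing("spark.app.name", "Spark shell")
  conf.set("spark.repl.class.uri", classServer.uri)  // assumed: the class server exposes its URI like this
  sparkContext = new SparkContext(conf)
  sparkContext
}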
So, as initializeSpark shows, processLine is used to create the SparkContext and the SQLContext and to import a few commonly used packages. Once everything is ready, the parent class's loadFiles runs and then printWelcome() is called; note that the printWelcome() invoked here is SparkILoop's override:
/** Print a welcome message */
override def printWelcome() {
import org.apache.spark.SPARK_VERSION
echo("""Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version %s
/_/
""".format(SPARK_VERSION))
val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
versionString, javaVmName, javaVersion)
echo(welcomeMsg)
echo("Type in expressions to have them evaluated.")
echo("Type :help for more information.")
}
Does this look familiar? It should: it is exactly the banner spark-shell prints:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.3
/_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.
17/02/21 13:40:24 INFO spark.SparkContext: Running Spark version 1.6.3
17/02/21 13:40:24 INFO spark.SecurityManager: Changing view acls to: root
17/02/21 13:40:24 INFO spark.SecurityManager: Changing modify acls to: root
17/02/21 13:40:24 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/02/21 13:40:25 INFO util.Utils: Successfully started service 'sparkDriver' on port 38463.
17/02/21 13:40:26 INFO slf4j.Slf4jLogger: Slf4jLogger started
17/02/21 13:40:26 INFO Remoting: Starting remoting
17/02/21 13:40:26 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@172.17.0.2:37221]
17/02/21 13:40:26 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 37221.
17/02/21 13:40:26 INFO spark.SparkEnv: Registering MapOutputTracker
17/02/21 13:40:26 INFO spark.SparkEnv: Registering BlockManagerMaster
17/02/21 13:40:26 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-a06685a8-6f1c-4e8f-805c-e232333f8d85
17/02/21 13:40:26 INFO storage.MemoryStore: MemoryStore started with capacity 511.1 MB
17/02/21 13:40:27 INFO spark.SparkEnv: Registering OutputCommitCoordinator
17/02/21 13:40:27 INFO server.Server: jetty-8.y.z-SNAPSHOT
17/02/21 13:40:27 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
17/02/21 13:40:27 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
17/02/21 13:40:27 INFO ui.SparkUI: Started SparkUI at http://172.17.0.2:4040
17/02/21 13:40:27 INFO client.AppClient$ClientEndpoint: Connecting to master spark://master:7077...
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20170221134027-0000
17/02/21 13:40:28 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44615.
17/02/21 13:40:28 INFO netty.NettyBlockTransferService: Server created on 44615
17/02/21 13:40:28 INFO storage.BlockManagerMaster: Trying to register BlockManager
17/02/21 13:40:28 INFO storage.BlockManagerMasterEndpoint: Registering block manager 172.17.0.2:44615 with 511.1 MB RAM, BlockManagerId(driver, 172.17.0.2, 44615)
17/02/21 13:40:28 INFO storage.BlockManagerMaster: Registered BlockManager
17/02/21 13:40:28 INFO client.AppClient$ClientEndpoint: Executor added: app-20170221134027-0000/0 on worker-20170221133811-172.17.0.3-41829 (172.17.0.3:41829) with 2 cores
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20170221134027-0000/0 on hostPort 172.17.0.3:41829 with 2 cores, 1024.0 MB RAM
17/02/21 13:40:28 INFO client.AppClient$ClientEndpoint: Executor added: app-20170221134027-0000/1 on worker-20170221133810-172.17.0.4-39901 (172.17.0.4:39901) with 2 cores
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20170221134027-0000/1 on hostPort 172.17.0.4:39901 with 2 cores, 1024.0 MB RAM
17/02/21 13:40:29 INFO client.AppClient$ClientEndpoint: Executor updated: app-20170221134027-0000/1 is now RUNNING
17/02/21 13:40:29 INFO client.AppClient$ClientEndpoint: Executor updated: app-20170221134027-0000/0 is now RUNNING
17/02/21 13:40:45 INFO scheduler.EventLoggingListener: Logging events to hdfs://master:9000/historyserverforspark/app-20170221134027-0000
17/02/21 13:40:45 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
17/02/21 13:40:45 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
17/02/21 13:40:46 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (worker1:60096) with ID 0
17/02/21 13:40:46 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (worker2:46846) with ID 1
17/02/21 13:40:47 INFO storage.BlockManagerMasterEndpoint: Registering block manager worker1:39275 with 511.1 MB RAM, BlockManagerId(0, worker1, 39275)
17/02/21 13:40:47 INFO storage.BlockManagerMasterEndpoint: Registering block manager worker2:37449 with 511.1 MB RAM, BlockManagerId(1, worker2, 37449)
17/02/21 13:40:50 INFO hive.HiveContext: Initializing execution hive, version 1.2.1
17/02/21 13:40:51 INFO client.ClientWrapper: Inspected Hadoop version: 2.6.0
17/02/21 13:40:51 INFO client.ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
17/02/21 13:40:52 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/02/21 13:40:53 INFO metastore.ObjectStore: ObjectStore, initialize called
17/02/21 13:40:53 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/02/21 13:40:53 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/02/21 13:40:54 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:40:55 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:01 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/02/21 13:41:08 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:08 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:15 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:15 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:17 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/02/21 13:41:17 INFO metastore.ObjectStore: Initialized ObjectStore
17/02/21 13:41:18 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/02/21 13:41:18 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
17/02/21 13:41:19 INFO metastore.HiveMetaStore: Added admin role in metastore
17/02/21 13:41:19 INFO metastore.HiveMetaStore: Added public role in metastore
17/02/21 13:41:19 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty
17/02/21 13:41:20 INFO metastore.HiveMetaStore: 0: get_all_databases
17/02/21 13:41:20 INFO HiveMetaStore.audit: ugi=root ip=unknown-ip-addr cmd=get_all_databases
17/02/21 13:41:20 INFO metastore.HiveMetaStore: 0: get_functions: db=default pat=*
17/02/21 13:41:20 INFO HiveMetaStore.audit: ugi=root ip=unknown-ip-addr cmd=get_functions: db=default pat=*
17/02/21 13:41:20 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:21 INFO session.SessionState: Created local directory: /tmp/939dedb5-f724-461b-a41a-a5fd1fe7324b_resources
17/02/21 13:41:21 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/939dedb5-f724-461b-a41a-a5fd1fe7324b
17/02/21 13:41:21 INFO session.SessionState: Created local directory: /tmp/root/939dedb5-f724-461b-a41a-a5fd1fe7324b
17/02/21 13:41:21 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/939dedb5-f724-461b-a41a-a5fd1fe7324b/_tmp_space.db
17/02/21 13:41:22 INFO hive.HiveContext: default warehouse location is /user/hive/warehouse
17/02/21 13:41:22 INFO hive.HiveContext: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
17/02/21 13:41:22 INFO client.ClientWrapper: Inspected Hadoop version: 2.6.0
17/02/21 13:41:22 INFO client.ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
17/02/21 13:41:23 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/02/21 13:41:23 INFO metastore.ObjectStore: ObjectStore, initialize called
17/02/21 13:41:23 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/02/21 13:41:23 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/02/21 13:41:24 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:24 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:25 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:30 INFO DataNucleus.Query: Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
17/02/21 13:41:30 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/02/21 13:41:30 INFO metastore.ObjectStore: Initialized ObjectStore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: Added admin role in metastore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: Added public role in metastore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty
17/02/21 13:41:30 INFO metastore.HiveMetaStore: 0: get_all_databases
17/02/21 13:41:30 INFO HiveMetaStore.audit: ugi=root ip=unknown-ip-addr cmd=get_all_databases
17/02/21 13:41:30 INFO metastore.HiveMetaStore: 0: get_functions: db=default pat=*
17/02/21 13:41:30 INFO HiveMetaStore.audit: ugi=root ip=unknown-ip-addr cmd=get_functions: db=default pat=*
17/02/21 13:41:30 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:30 INFO session.SessionState: Created local directory: /tmp/c9c26571-1229-4786-8a8e-d7b090b07d85_resources
17/02/21 13:41:30 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/c9c26571-1229-4786-8a8e-d7b090b07d85
17/02/21 13:41:30 INFO session.SessionState: Created local directory: /tmp/root/c9c26571-1229-4786-8a8e-d7b090b07d85
17/02/21 13:41:30 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/c9c26571-1229-4786-8a8e-d7b090b07d85/_tmp_space.db
17/02/21 13:41:30 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
Everything after the Welcome banner is the log output produced by the initializeSpark() call above, so every log line now has an owner; the creation of SparkContext and SQLContext deserves its own article and will be analyzed separately. Back in process, the final step is the loop() call, which just keeps looping. We will not dig any deeper here lest we get lost in the source; it is enough to understand that this loop is exactly what REPL stands for: Read the command typed by the user, Evaluate it through the Spark framework, Print the result, and Loop back to the start. Reading a command also involves parsing it and handing it to the interpreter for execution; interested readers can keep following the source from here, or look at the conceptual sketch below.
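Conceptually, the loop is nothing more than this (a generic read-eval-print skeleton for illustration only, not Spark's actual ILoop code; the real Evaluate step parses the line and hands it to the Scala interpreter, which in turn drives the Spark framework):
import java.io.{BufferedReader, InputStreamReader}

object MiniRepl {
  def main(args: Array[String]): Unit = {
    val in = new BufferedReader(new InputStreamReader(System.in))
    var line: String = { print("mini> "); in.readLine() }            // Read
    while (line != null && line != ":quit") {
      val result =
        try s"you typed: $line"                                      // Evaluate (here: trivially echo)
        catch { case e: Exception => s"error: ${e.getMessage}" }
      println(result)                                                // Print
      print("mini> "); line = in.readLine()                          // Loop back to Read
    }
  }
}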
That completes our walk through the entire spark-shell (and spark-submit) startup flow.
This article is original content. You are welcome to repost it; please credit the source and the author. Thank you!