Spark Core Source Code Deep Dive (3): The spark-shell (spark-submit) Flow in Detail

This article walks through what actually happens, step by step, when we enter the REPL via spark-shell.

Note: all articles in this series are based on the Spark 1.6.3 source code; where Spark 2.1.0 introduces significant changes, they will be pointed out.

The shell part

Let's look at the concrete execution flow when we run spark-shell --master spark://master:7077. Naturally, we start with the source of the spark-shell script itself; only the relatively important parts are shown here:

## Check whether the SPARK_HOME environment variable is set; if not, set it here
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
##...
function main() {
  if $cygwin; then
    # Workaround for issue involving JLine and Cygwin
    # (see http://sourceforge.net/p/jline/bugs/40/).
    # If you're using the Mintty terminal emulator in Cygwin, may need to set the
    # "Backspace sends ^H" setting in "Keys" section of the Mintty options
    # (see https://github.com/sbt/sbt/issues/562).
    stty -icanon min 1 -echo > /dev/null 2>&1
    export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
    stty icanon echo > /dev/null 2>&1
  else
    export SPARK_SUBMIT_OPTS
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
  fi
}
##...
main "$@"
##...

可以看出最后執(zhí)行的是main方法并傳入我們使用spark-shell命令時候的所有參數(shù),比如--master季俩,而main方法中無論是什么操作系統(tǒng)(當(dāng)然生產(chǎn)環(huán)境是linux系統(tǒng))都會最終執(zhí)行spark-submit钮糖,并且class為org.apache.spark.repl.Main、name為“Spark shell”并且將spark-shell所有接收到的用戶輸入的參數(shù)一起傳進(jìn)去酌住,下面我們來看spark-submit:

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

spark-submit itself is quite short: it finally uses exec to launch SparkSubmit through spark-class, again forwarding every argument it received. Now let's look at spark-class. (Note that from here on this is exactly the same path taken when an application is submitted with spark-submit directly; the only difference is the class that follows. With a direct spark-submit it is the user-written class, whereas coming from spark-shell it is org.apache.spark.repl.Main. An application submitted via spark-submit exits once it finishes, while the REPL entered through spark-shell keeps waiting for user input.)

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

## 載入環(huán)境變量
. "${SPARK_HOME}"/bin/load-spark-env.sh

## Locate the java binary, which will later be used to start a JVM process
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

## Resolve the assembly jar dependency
# Find assembly jar
SPARK_ASSEMBLY_JAR=
if [ -f "${SPARK_HOME}/RELEASE" ]; then
  ASSEMBLY_DIR="${SPARK_HOME}/lib"
else
  ASSEMBLY_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION"
fi

GREP_OPTIONS=
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" -a "$SPARK_PREPEND_CLASSES" != "1" ]; then
  echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
  echo "You need to build Spark before running this program." 1>&2
  exit 1
fi
if [ -d "$ASSEMBLY_DIR" ]; then
  ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
  if [ "$num_jars" -gt "1" ]; then
    echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
    echo "$ASSEMBLY_JARS" 1>&2
    echo "Please remove all but one jar." 1>&2
    exit 1
  fi
fi

SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"

LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
  ## The line below uses java -cp to start a JVM process that runs the main method of
  ## org.apache.spark.launcher.Main; as we will see later, this JVM process is the SparkSubmit process
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"

spark-class is Spark's command-line launcher: it sets up the JVM environment and then executes the requested class, which in our case is SparkSubmit. With that, we can move into the Spark source code itself.

The Spark source part

Picking up where we left off, let's go straight into the Spark source:

關(guān)于org.apache.spark.launcher.Main的源碼我們這里不做說明毡代,大家可以參考我的另一篇文章(對Main類的補(bǔ)充說明)阅羹,簡單來說它會根據(jù)傳入的類進(jìn)行判斷然后生成相應(yīng)的command,最后交給exec來執(zhí)行教寂,我們現(xiàn)在主要關(guān)注Spark本身捏鱼,所以直接進(jìn)入SparkSubmit的源碼部分:

def main(args: Array[String]): Unit = {
  /** Wrap the arguments passed to spark-submit in SparkSubmitArguments (remember what they were?).
   *  Coming from spark-shell they are the spark-shell command plus everything after it; with a
   *  direct spark-submit submission they are whatever was passed at submission time. Since
   *  SparkSubmitArguments holds quite a few fields, they are not listed one by one here; each will
   *  be explained when it is actually used, and the full list is in the SparkSubmitArguments source.
  */
  val appArgs = new SparkSubmitArguments(args)
  // If verbose mode is enabled, print the parsed arguments
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  
  /** The action here is what spark-submit is asked to do: SUBMIT, KILL or REQUEST_STATUS
   *  (wrapped in SparkSubmitAction). If none is specified the default is SparkSubmitAction.SUBMIT,
   *  so the pattern match below will execute submit(appArgs).
  */
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
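
As an aside, SparkSubmitArguments is essentially a command-line parser that folds flags such as --master and --class into typed fields. The toy sketch below is not Spark's actual parser (every name in it is made up); it only illustrates the idea of turning the flag list into a small settings object:

object ToySubmitArgs {
  // A made-up, minimal stand-in for SparkSubmitArguments.
  case class Args(
      master: String = "local[*]",
      mainClass: String = "",
      verbose: Boolean = false,
      childArgs: Seq[String] = Nil)

  // Fold recognised flags into fields; everything else is treated as the child arguments.
  def parse(argv: List[String], acc: Args = Args()): Args = argv match {
    case "--master" :: value :: rest => parse(rest, acc.copy(master = value))
    case "--class" :: value :: rest  => parse(rest, acc.copy(mainClass = value))
    case "--verbose" :: rest         => parse(rest, acc.copy(verbose = true))
    case Nil                         => acc
    case other                       => acc.copy(childArgs = other)
  }

  def main(args: Array[String]): Unit = {
    // e.g. --master spark://master:7077 --class org.apache.spark.repl.Main extra args
    println(parse(args.toList))
  }
}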

Next, let's look at the submit(appArgs) method:

/**
 * submit's job is to submit the application using the arguments passed in.
 * It does this in two steps:
 * 1. Prepare the launch environment: set up the classpath, system properties and application
 *    arguments needed to run the child main class for the given deploy mode and cluster manager.
 * 2. Invoke the main method of that child main class using the environment prepared in step 1.
 *    Here we only consider client mode; cluster mode will be analyzed separately later.
 * So for spark-shell the child main class is org.apache.spark.repl.Main, while for a direct
 * spark-submit submission it is the user-written application (the class containing the main method).
*/
private def submit(args: SparkSubmitArguments): Unit = {
  // 準(zhǔn)備環(huán)境匿醒,主要就是獲得childMainClass场航,即我們上面所說的child main class
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
   // Note: the source defines doRunMain() right here; we pull it out below and analyze it separately
   // Decide whether the gateway is the Akka one or the REST-based one; either way doRunMain() is eventually called
   // In standalone cluster mode, there are two submission gateways:
   //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
   //   (2) The new REST-based gateway introduced in Spark 1.3
   // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
   // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      // scalastyle:off println
      printStream.println("Running Spark using the REST application submission protocol.")
      // scalastyle:on println
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}

The implementation of doRunMain():

def doRunMain(): Unit = {
  if (args.proxyUser != null) {
    // Hadoop-related proxy user and group information
    val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
      UserGroupInformation.getCurrentUser())
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
        }
      })
    } catch {
      case e: Exception =>
        // Hadoop's AuthorizationException suppresses the exception's stack trace, which
        // makes the message printed to the output by the JVM not very helpful. Instead,
        // detect exceptions with empty stack traces here, and treat them differently.
        if (e.getStackTrace().length == 0) {
          // scalastyle:off println
          printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          // scalastyle:on println
          exitFn(1)
        } else {
          throw e
        }
    }
  } else {
    runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
  }
}

As we can see, both branches of doRunMain() end up calling runMain, so let's step into runMain:

/** Long as this method is, it really does just one thing: run the main method of the child main class.
To repeat: for an application submitted directly with spark-submit, that is the main method of the
user-specified class; coming from spark-shell, it is the main method of org.apache.spark.repl.Main.
*/
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  // Whether to print verbose debug information
  // scalastyle:off println
  if (verbose) {
    printStream.println(s"Main class:\n$childMainClass")
    printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
    printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
    printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    printStream.println("\n")
  }
  // scalastyle:on println
  
  // The following sets the ClassLoader used as the current thread's context ClassLoader
  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)
  
  // Add the jar dependencies to the classpath
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }
  // 系統(tǒng)屬性
  for ((key, value) <- sysProps) {
    System.setProperty(key, value)
  }
  var mainClass: Class[_] = null
  // Obtain mainClass (the child main class) via reflection
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      e.printStackTrace(printStream)
      if (childMainClass.contains("thriftserver")) {
        // scalastyle:off println
        printStream.println(s"Failed to load main class $childMainClass.")
        printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        // scalastyle:on println
      }
      System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      e.printStackTrace(printStream)
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        // scalastyle:off println
        printStream.println(s"Failed to load hive class.")
        printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        // scalastyle:on println
      }
      System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
  }
  // SPARK-4170
  if (classOf[scala.App].isAssignableFrom(mainClass)) {
    printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
  }
  // Get the main method of mainClass (the child main class)
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  // The main method must be static
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }
  def findCause(t: Throwable): Throwable = t match {
    case e: UndeclaredThrowableException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: InvocationTargetException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: Throwable =>
      e
  }
  // 最后調(diào)用main方法
  try {
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    case t: Throwable =>
      findCause(t) match {
        case SparkUserAppException(exitCode) =>
          System.exit(exitCode)
        case t: Throwable =>
          throw t
      }
  }
}
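
For reference, the reflective hand-off at the heart of runMain boils down to the following minimal, self-contained sketch. It is only an illustration of the pattern, not Spark's code; the class to launch is taken from the command line rather than from prepareSubmitEnvironment:

import java.lang.reflect.Modifier

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    require(args.nonEmpty, "usage: ReflectiveLaunch <fully.qualified.MainClass> [app args...]")
    // Load the class by name, find its static main(String[]) and invoke it,
    // just as runMain does with childMainClass and childArgs.
    val mainClass  = Class.forName(args.head)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers),
      "The main method in the given main class must be static")
    mainMethod.invoke(null, args.tail)  // null receiver because the method is static
  }
}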

At this point, if the user submitted a self-written Spark application through spark-submit, its main method is invoked directly and the user's code (creating a SparkContext and so on) runs step by step; we will analyze that in later articles. What we need to follow now is the main method of org.apache.spark.repl.Main (note that this article only discusses client mode; cluster mode will be analyzed separately). Here is the thread dump of the main thread of the SparkSubmit process:

java.io.FileInputStream.read0(Native Method)
java.io.FileInputStream.read(FileInputStream.java:207)
scala.tools.jline.TerminalSupport.readCharacter(TerminalSupport.java:152)
scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:125)
scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.java:933)
scala.tools.jline.console.ConsoleReader.readBinding(ConsoleReader.java:1136)
scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1218)
scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1170)
org.apache.spark.repl.SparkJLineReader.readOneLine(SparkJLineReader.scala:80)
scala.tools.nsc.interpreter.InteractiveReader$class.readLine(InteractiveReader.scala:43) 
org.apache.spark.repl.SparkJLineReader.readLine(SparkJLineReader.scala:25) 
org.apache.spark.repl.SparkILoop.readOneLine$1(SparkILoop.scala:648) 
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
org.apache.spark.repl.Main$.main(Main.scala:31)
org.apache.spark.repl.Main.main(Main.scala) 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
java.lang.reflect.Method.invoke(Method.java:498) 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

The dump is shown here to tie things together: reading it from the bottom line upwards, we can clearly see the process we just analyzed, from SparkSubmit's main method to submit, doRunMain and runMain, and finally the reflective call into the main method of org.apache.spark.repl.Main. With the whole flow laid out, let's now enter the main method of org.apache.spark.repl.Main (including its initialization code):

// 實(shí)例化SparkConf
val conf = new SparkConf()
// 設(shè)置各種文件路徑
val tmp = System.getProperty("java.io.tmpdir")
val rootDir = conf.get("spark.repl.classdir", tmp)
val outputDir = Utils.createTempDir(rootDir)
val s = new Settings()
s.processArguments(List("-Yrepl-class-based",
  "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
  "-classpath", getAddedJars.mkString(File.pathSeparator)), true)
// the creation of SecurityManager has to be lazy so SPARK_YARN_MODE is set if needed
val classServerPort = conf.getInt("spark.replClassServer.port", 0)
// 實(shí)例化了HttpServer许溅,注意這里是lazy級別的
lazy val classServer =
  new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
var sparkContext: SparkContext = _
var sqlContext: SQLContext = _
// 實(shí)例化了SparkILoop瓤鼻,接下來會詳細(xì)的分析
var interp = new SparkILoop // this is a public var because tests reset it.
// 執(zhí)行一些初始化的處理后就執(zhí)行main方法
def main(args: Array[String]) {
  // Check for yarn-client mode; the YARN deploy mode will get a dedicated article later
  if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
  // Start the classServer and store its URI in a spark system property
  // (which will be passed to executors so that they can connect to it)
  // Start the HTTP class server
  classServer.start()
  // 最關(guān)鍵的代碼闹司,讓解釋器循環(huán)執(zhí)行娱仔,即REPL
  interp.process(s) // Repl starts and goes in loop of R.E.P.L
  classServer.stop()
  Option(sparkContext).map(_.stop)
}

At this point it is worth pasting part of the log printed when entering the REPL through spark-shell:

17/02/21 13:40:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/21 13:40:17 INFO spark.SecurityManager: Changing view acls to: root
17/02/21 13:40:17 INFO spark.SecurityManager: Changing modify acls to: root
17/02/21 13:40:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/02/21 13:40:18 INFO spark.HttpServer: Starting HTTP Server
17/02/21 13:40:18 INFO server.Server: jetty-8.y.z-SNAPSHOT
17/02/21 13:40:18 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:43773
17/02/21 13:40:18 INFO util.Utils: Successfully started service 'HTTP class server' on port 43773.

This section of the log corresponds exactly to the classServer.start() call; from now on, whenever we see these lines we know exactly what is happening behind them.

下面就進(jìn)入SparkILoop和ILoop的部分(SparkILoop是繼承自ILoop類,而SparkILoop中沒有process方法借卧,所以調(diào)用的實(shí)際上是ILoop類中的process方法):

ILoop

// Start the interpreter that will interpret the user's commands
// start an interpreter with the given settings
def process(settings: Settings): Boolean = savingContextLoader {
  this.settings = settings
  // 創(chuàng)建解釋器,內(nèi)部其實(shí)是實(shí)例化了一個ILoopInterpreter
  createInterpreter()
  // sets in to some kind of reader depending on environmental cues
  in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true))
  globalFuture = future {
    intp.initializeSynchronous()
    loopPostInit()
    !intp.reporter.hasErrors
  }
  // This actually dispatches to the subclass SparkILoop's loadFiles, which in turn calls this loadFiles at the end
  loadFiles(settings)
  printWelcome()
  // Loop continuously, receiving the user's commands
  try loop() match {
    case LineResults.EOF => out print Properties.shellInterruptedString
    case _               =>
  }
  catch AbstractOrMissingHandler()
  finally closeInterpreter()
  true
}

Let's first look at what SparkILoop's loadFiles method does:

override def loadFiles(settings: Settings): Unit = {
  initializeSpark()
  super.loadFiles(settings)
}

We can see that it first calls initializeSpark() and then the parent class's loadFiles. The goal is to have SparkContext and SQLContext ready before anything else runs, so that sc and sqlContext are directly available once we land in the REPL. That is exactly why we can use sc and sqlContext in spark-shell without creating them ourselves. With that said, let's look at what initializeSpark() really does:

def initializeSpark() {
  intp.beQuietDuring {
    processLine("""
       @transient val sc = {
         val _sc = org.apache.spark.repl.Main.createSparkContext()
         println("Spark context available as sc.")
         _sc
       }
      """)
    processLine("""
       @transient val sqlContext = {
         val _sqlContext = org.apache.spark.repl.Main.createSQLContext()
         println("SQL context available as sqlContext.")
         _sqlContext
       }
      """)
    processLine("import org.apache.spark.SparkContext._")
    processLine("import sqlContext.implicits._")
    processLine("import sqlContext.sql")
    processLine("import org.apache.spark.sql.functions._")
  }
}

This makes it very clear: processLine is used to create the SparkContext and SQLContext and to import a few commonly used packages. Once everything is ready, the parent's loadFiles is called, followed by printWelcome(); note that the call resolves to SparkILoop's printWelcome() method:

/** Print a welcome message */
override def printWelcome() {
  import org.apache.spark.SPARK_VERSION
  echo("""Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version %s
    /_/
       """.format(SPARK_VERSION))
  val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
    versionString, javaVmName, javaVersion)
  echo(welcomeMsg)
  echo("Type in expressions to have them evaluated.")
  echo("Type :help for more information.")
}

Does this look familiar? Right, it is exactly the banner printed in spark-shell:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.3
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.
17/02/21 13:40:24 INFO spark.SparkContext: Running Spark version 1.6.3
17/02/21 13:40:24 INFO spark.SecurityManager: Changing view acls to: root
17/02/21 13:40:24 INFO spark.SecurityManager: Changing modify acls to: root
17/02/21 13:40:24 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/02/21 13:40:25 INFO util.Utils: Successfully started service 'sparkDriver' on port 38463.
17/02/21 13:40:26 INFO slf4j.Slf4jLogger: Slf4jLogger started
17/02/21 13:40:26 INFO Remoting: Starting remoting
17/02/21 13:40:26 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@172.17.0.2:37221]
17/02/21 13:40:26 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 37221.
17/02/21 13:40:26 INFO spark.SparkEnv: Registering MapOutputTracker
17/02/21 13:40:26 INFO spark.SparkEnv: Registering BlockManagerMaster
17/02/21 13:40:26 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-a06685a8-6f1c-4e8f-805c-e232333f8d85
17/02/21 13:40:26 INFO storage.MemoryStore: MemoryStore started with capacity 511.1 MB
17/02/21 13:40:27 INFO spark.SparkEnv: Registering OutputCommitCoordinator
17/02/21 13:40:27 INFO server.Server: jetty-8.y.z-SNAPSHOT
17/02/21 13:40:27 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
17/02/21 13:40:27 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
17/02/21 13:40:27 INFO ui.SparkUI: Started SparkUI at http://172.17.0.2:4040
17/02/21 13:40:27 INFO client.AppClient$ClientEndpoint: Connecting to master spark://master:7077...
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20170221134027-0000
17/02/21 13:40:28 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44615.
17/02/21 13:40:28 INFO netty.NettyBlockTransferService: Server created on 44615
17/02/21 13:40:28 INFO storage.BlockManagerMaster: Trying to register BlockManager
17/02/21 13:40:28 INFO storage.BlockManagerMasterEndpoint: Registering block manager 172.17.0.2:44615 with 511.1 MB RAM, BlockManagerId(driver, 172.17.0.2, 44615)
17/02/21 13:40:28 INFO storage.BlockManagerMaster: Registered BlockManager
17/02/21 13:40:28 INFO client.AppClient$ClientEndpoint: Executor added: app-20170221134027-0000/0 on worker-20170221133811-172.17.0.3-41829 (172.17.0.3:41829) with 2 cores
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20170221134027-0000/0 on hostPort 172.17.0.3:41829 with 2 cores, 1024.0 MB RAM
17/02/21 13:40:28 INFO client.AppClient$ClientEndpoint: Executor added: app-20170221134027-0000/1 on worker-20170221133810-172.17.0.4-39901 (172.17.0.4:39901) with 2 cores
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20170221134027-0000/1 on hostPort 172.17.0.4:39901 with 2 cores, 1024.0 MB RAM
17/02/21 13:40:29 INFO client.AppClient$ClientEndpoint: Executor updated: app-20170221134027-0000/1 is now RUNNING
17/02/21 13:40:29 INFO client.AppClient$ClientEndpoint: Executor updated: app-20170221134027-0000/0 is now RUNNING
17/02/21 13:40:45 INFO scheduler.EventLoggingListener: Logging events to hdfs://master:9000/historyserverforspark/app-20170221134027-0000
17/02/21 13:40:45 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
17/02/21 13:40:45 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
17/02/21 13:40:46 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (worker1:60096) with ID 0
17/02/21 13:40:46 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (worker2:46846) with ID 1
17/02/21 13:40:47 INFO storage.BlockManagerMasterEndpoint: Registering block manager worker1:39275 with 511.1 MB RAM, BlockManagerId(0, worker1, 39275)
17/02/21 13:40:47 INFO storage.BlockManagerMasterEndpoint: Registering block manager worker2:37449 with 511.1 MB RAM, BlockManagerId(1, worker2, 37449)
17/02/21 13:40:50 INFO hive.HiveContext: Initializing execution hive, version 1.2.1
17/02/21 13:40:51 INFO client.ClientWrapper: Inspected Hadoop version: 2.6.0
17/02/21 13:40:51 INFO client.ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
17/02/21 13:40:52 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/02/21 13:40:53 INFO metastore.ObjectStore: ObjectStore, initialize called
17/02/21 13:40:53 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/02/21 13:40:53 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/02/21 13:40:54 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:40:55 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:01 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/02/21 13:41:08 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:08 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:15 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:15 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:17 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/02/21 13:41:17 INFO metastore.ObjectStore: Initialized ObjectStore
17/02/21 13:41:18 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/02/21 13:41:18 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
17/02/21 13:41:19 INFO metastore.HiveMetaStore: Added admin role in metastore
17/02/21 13:41:19 INFO metastore.HiveMetaStore: Added public role in metastore
17/02/21 13:41:19 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty
17/02/21 13:41:20 INFO metastore.HiveMetaStore: 0: get_all_databases
17/02/21 13:41:20 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr  cmd=get_all_databases   
17/02/21 13:41:20 INFO metastore.HiveMetaStore: 0: get_functions: db=default pat=*
17/02/21 13:41:20 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr  cmd=get_functions: db=default pat=* 
17/02/21 13:41:20 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:21 INFO session.SessionState: Created local directory: /tmp/939dedb5-f724-461b-a41a-a5fd1fe7324b_resources
17/02/21 13:41:21 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/939dedb5-f724-461b-a41a-a5fd1fe7324b
17/02/21 13:41:21 INFO session.SessionState: Created local directory: /tmp/root/939dedb5-f724-461b-a41a-a5fd1fe7324b
17/02/21 13:41:21 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/939dedb5-f724-461b-a41a-a5fd1fe7324b/_tmp_space.db
17/02/21 13:41:22 INFO hive.HiveContext: default warehouse location is /user/hive/warehouse
17/02/21 13:41:22 INFO hive.HiveContext: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
17/02/21 13:41:22 INFO client.ClientWrapper: Inspected Hadoop version: 2.6.0
17/02/21 13:41:22 INFO client.ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
17/02/21 13:41:23 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/02/21 13:41:23 INFO metastore.ObjectStore: ObjectStore, initialize called
17/02/21 13:41:23 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/02/21 13:41:23 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/02/21 13:41:24 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:24 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:25 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:30 INFO DataNucleus.Query: Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
17/02/21 13:41:30 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/02/21 13:41:30 INFO metastore.ObjectStore: Initialized ObjectStore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: Added admin role in metastore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: Added public role in metastore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty
17/02/21 13:41:30 INFO metastore.HiveMetaStore: 0: get_all_databases
17/02/21 13:41:30 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr  cmd=get_all_databases   
17/02/21 13:41:30 INFO metastore.HiveMetaStore: 0: get_functions: db=default pat=*
17/02/21 13:41:30 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr  cmd=get_functions: db=default pat=* 
17/02/21 13:41:30 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:30 INFO session.SessionState: Created local directory: /tmp/c9c26571-1229-4786-8a8e-d7b090b07d85_resources
17/02/21 13:41:30 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/c9c26571-1229-4786-8a8e-d7b090b07d85
17/02/21 13:41:30 INFO session.SessionState: Created local directory: /tmp/root/c9c26571-1229-4786-8a8e-d7b090b07d85
17/02/21 13:41:30 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/c9c26571-1229-4786-8a8e-d7b090b07d85/_tmp_space.db
17/02/21 13:41:30 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

Everything after the Welcome banner is the log output of the initializeSpark() call shown above, so every line of the startup log now has an owner. We will devote separate articles to how SparkContext and SQLContext are created. The last thing process does is keep looping; we will not dig any deeper here, to avoid getting lost in the source. Simply put, this loop is exactly what REPL stands for: Read, read the command the user typed; Evaluate, execute the command through the Spark framework; Print, print the result of the computation; Loop, go back to the beginning. After a command is read it is parsed and then executed by the interpreter; readers who are interested can keep following the source from there. A toy sketch of the cycle is shown below.
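
As a rough mental model only (this toy is not the Scala interpreter; the Evaluate step is stubbed out to a simple echo), the cycle looks like this:

import scala.io.StdIn

object ToyRepl {
  // Placeholder for the Evaluate step; the real REPL hands the line to the Scala interpreter.
  def evaluate(line: String): String = s"echo: $line"

  def main(args: Array[String]): Unit = {
    var running = true
    while (running) {
      val line = StdIn.readLine("toy> ")             // Read
      if (line == null || line == ":quit") running = false
      else println(evaluate(line))                   // Evaluate + Print
    }                                                // Loop back to Read
  }
}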

With that we have walked through the entire spark-shell (and spark-submit) flow. The diagram below gives a brief summary:

Note: the diagram assumes client mode.

References and further reading:

Spark 1.6.3 source code

Spark 2.1.0 source code

This is an original article. Reposting is welcome; please credit the source and the author. Thanks!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末屈暗,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子脂男,更是在濱河造成了極大的恐慌养叛,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宰翅,死亡現(xiàn)場離奇詭異弃甥,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)堕油,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進(jìn)店門潘飘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人掉缺,你說我怎么就攤上這事卜录。” “怎么了眶明?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵艰毒,是天一觀的道長。 經(jīng)常有香客問我搜囱,道長丑瞧,這世上最難降的妖魔是什么柑土? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮绊汹,結(jié)果婚禮上稽屏,老公的妹妹穿的比我還像新娘。我一直安慰自己西乖,他們只是感情好狐榔,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著获雕,像睡著了一般薄腻。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上届案,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天庵楷,我揣著相機(jī)與錄音,去河邊找鬼楣颠。 笑死尽纽,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的童漩。 我是一名探鬼主播蜓斧,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼睁冬!你這毒婦竟也來了挎春?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤豆拨,失蹤者是張志新(化名)和其女友劉穎直奋,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體施禾,經(jīng)...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡脚线,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了弥搞。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片邮绿。...
    茶點(diǎn)故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖攀例,靈堂內(nèi)的尸體忽然破棺而出船逮,到底是詐尸還是另有隱情,我是刑警寧澤粤铭,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布挖胃,位于F島的核電站,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏酱鸭。R本人自食惡果不足惜吗垮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望凹髓。 院中可真熱鬧烁登,春花似錦、人聲如沸蔚舀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蝗敢。三九已至,卻和暖如春足删,著一層夾襖步出監(jiān)牢的瞬間寿谴,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工失受, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留讶泰,地道東北人。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓拂到,卻偏偏與公主長得像痪署,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子兄旬,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內(nèi)容