1、上次總結 spark初始化環(huán)境資源 0:18:00~ 0:41:00
1牙言、Spark RPC(Endpoint:DriverEndpoint ClientEndpoint)
2览濒、利用 akka(endpoint類似于actor) 模擬實現(xiàn) YARN(Flink 就是基于 akka實現(xiàn)的 RPC)
3、Spark Standalone 集群啟動腳本start-all.sh 分析
4、Master 啟動分析
5、Worker 啟動分析
6领猾、Spark APP 提交腳本 spark-submit 腳本分析
7、SparkSubmit 分析(重點是進入main方法骇扇,通過反射的方式運行用戶編寫的application的主類main方法)
8瘤运、SparkContext 初始化
1.1 SparkContext 初始化 分析 0:10~ 0:41
示例程序demo sparkPI
val spark: SparkSession = SparkSession.builder.appName("Spark Pi").getOrCreate()
注: 上圖粉紅色半邊 ,7件事
0:18:16 代碼開始
——》SparkSession#Builder#getOrCreate()
val sparkContext: SparkContext = userSuppliedContext.getOrElse{
SparkContext.getOrCreate(sparkConf)
——》 SparkContext#getOrCreate()
setActiveContext(new SparkContext(config), allowMultipleContexts = false)
——》SparkContext類塊#try
//以下為類級代碼塊
try {
_conf = config.clone()
_conf.validateSettings()
############# 注釋:第一步 創(chuàng)建Spark Env############
*
* 除了創(chuàng)建 sparkEnv之外匠题,還創(chuàng)建了各種 manager 對象。
*/ Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
——》SparkEnv#createDriverEnv
——》SparkEnv#create
//注釋: 初始化 SecurityManager
val securityManager = new SecurityManager(conf, ioEncryptionKey)
//注釋: 初始化 NettyRpcEnv
val systemName = if (isDriver) driverSystemName
val rpcEnv = RpcEnv.create(systemName,
bindAddress, advertiseAddress,
port.getOrElse(-1), conf, securityManager,
numUsableCores, !isDriver)
//注釋: 初始化 SerializerManager
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
//注釋: 初始化 BroadcastManager
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
//注釋: 初始化 MapOutputTracker
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
//注釋: 初始化 SortShuffleManager
val shortShuffleMgrNames = Map("sort" ->
// 注釋: 初始化 UnifiedMemoryManager 統(tǒng)一內(nèi)存管理模型
// StaticMemoryManaager 靜態(tài)內(nèi)存管理模型
val memoryManager: MemoryManager = if (useLegacyMemoryManager)
//注釋: 初始化 BlockManagerMaster
val blockManagerMaster = new BlockManagerMaster(
//注釋: 初始化 BlockManager
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
//注釋: 初始化 OutputCommitCoordinator
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse
//注釋: 正式初始化SparkEnv
val envInstance = new SparkEnv(executorId, rpcEnv, serializer, closureSerializer,
return envInstance
############# 注釋:第二步 創(chuàng)建SparkUI############
/** 注釋:第二步
* 創(chuàng)建并初始化Spark UI
*/
_ui = if (conf.getBoolean("spark.ui.enabled", true)) {
// TODO_MA 注釋:_jobProgressListener跟蹤要在UI中顯示的任務級別信息但金,startTime就是SparkContext的初始時的系統(tǒng)時間
// TODO_MA 注釋:返回SparkUI韭山,它的父類是WebUI,和MasterWebUI是一個級別的
Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "", startTime))
} else {
// For tests, do not enable the UI
None
}
############# 注釋:第三步 hadoop相關配置以及Executor環(huán)境變量############
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
if (jars != null) {
jars.foreach(addJar)
}
if (files != null) {
files.foreach(addFile)
}
executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
executorEnvs ++= _conf.getExecutorEnv
executorEnvs("SPARK_USER") = sparkUser
############# 注釋: 第四步:創(chuàng)建心跳接收器 ############
* 1冷溃、我們需要在“createTaskScheduler”之前注冊“HeartbeatReceiver”钱磅,因為Executor將在構造函數(shù)中檢索“HeartbeatReceiver”
* 2、創(chuàng)建一個HeartbeatReceiver 的RpcEndpoint注冊到RpcEnv中似枕,每分鐘給自己發(fā)送ExpireDeadHosts盖淡,去檢測Executor是否存在心跳,
* 3凿歼、如果當前時間減去最一次心跳時間褪迟,大于1分鐘冗恨,就會用CoarseGrainedSchedulerBackend將Executor殺死
*/
_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
############# 注釋:第五步:創(chuàng)建任務調(diào)度TaskScheduler############
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
############# 注釋:第六步:創(chuàng)建和啟動DAGScheduler############
/* * 1、內(nèi)部初始化了一個:DAGSchedulerEventProcessLoop 用來處理各種任務
* 2味赃、在 DAGSchedulerEventProcessLoop 創(chuàng)建的時候掀抹,構造函數(shù)的內(nèi)部的最后一句代碼執(zhí)行了 DAGSchedulerEventProcessLoop的啟動。
* 將來任務的提交心俗,取消等傲武,都會發(fā)送一個事件給 DAGSchedulerEventProcessLoop
* 從而觸發(fā) dagScheduler.onRecevie() 的運行。
*/
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
############# 注釋:第七步:TaskScheduler的啟動城榛,backend.start()############
/** :第七步:TaskScheduler的啟動揪利,主要任務:backend.start()
*/
_taskScheduler.start()
↓
TaskSchedulerImpl#start()
.......
backend.start()
↓
StandaloneSchedulerBackend.start()
▼
/** :調(diào)用父類方法 start() 方法啟動一個 DriverEndPoint
* super 粗粒度的 CoarseGrainedSchedulerBackend
*/
super.start()
↓
CoarseGrainedSchedulerBackend.start()
▼
/** 創(chuàng)建一個 DriverEndPoint 負責跟 master 打交道的
driverEndpoint = createDriverEndpointRef(properties)
/** 注釋:里面維護了一個 DriverEndPoint 主要用來向 Executor分發(fā)任務
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
/* 注釋:client start 啟動了 ClientEndpoint
client.start()
——》 StandaloneAppClient.start()
endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
——》 ClientEndpoint#Onstart()方法
/** 注釋:clientEndPoint 向 Master 執(zhí)行注冊
registerWithMaster(1)
▼
registerMasterFutures.set(tryRegisterAllMasters())
——》tryRegisterAllMasters()
▼
/** 注釋:創(chuàng)建了一個 Master 的 RPC 代理
*/
val masterRef = rpcEnv.setupEndpointRef(masterAddress,
/**
* 注釋:注冊
*/
masterRef.send(RegisterApplication(appDescription, self))
——》RegisterApplication()
↓
Master#類 receive 方法
...................
case RegisterApplication(description, driver) =>
...................
case RegisterWorker
schedule()
launchDriver(worker, driver) 啟動driver
——》startExecutorsOnWorkers()
▼
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
——》allocateWorkerResourceToExecutors
——》launchExecutor(worker, exec)
▼
/* 注釋:發(fā)送命令讓 worker 啟動 executor
* worker 節(jié)點的一個 RPC 節(jié)點,負責通信的狠持。
*/
worker.endpoint.send(LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
/ 注釋:發(fā)消息告訴 Driver 該 worker 上的 executor 已經(jīng)啟動
*/
exec.application.driver.send(ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
↓
Worker類 receive 方法
▼
/* 注釋: 接收到 Master 發(fā)送過來的啟動 Executor 的命令
*/
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
/* 注釋:把啟動Executor必要的一些信息疟位,封裝在 ExeuctorRunner 中
*/
val manager = new ExecutorRunner(appId, execId, appDesc.copy(command =
/* : 啟動好了 Executor 之后,返回給 Master一個信號工坊。
* 信息封裝在 ExecutorStateChanged 對象中献汗。
*/
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
/* 啟動 executor
* worker.send(ExecutorStateChanged)
*/
manager.start()
↓
——》ExecutorRunner#start
——》fetchAndRunExecutor()
/* :構建jvm 進程啟動命令 */
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
↓ 跳轉(zhuǎn)到Executor類構造方法
Executor #
// 實際上是構建了一個用于執(zhí)行task的線程池
private val threadPool = {
val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Executor task launch worker-%d")
.setThreadFactory(new ThreadFactory {
2、 本次內(nèi)容概述 spark 任務提交 分析 0:41:00 ~
1王污、Spark Application 提交流程分析
2罢吃、Spark Application 的 DAG 生成和 Stage 切分分析
3、Spark 的 Task 分發(fā)和執(zhí)行源碼分析
4昭齐、Spark 的 Shuffle 機制源碼分析
2.1 Spark Application 提交流程分析 0:43 ~0:58
入口:spark application 中的 action 算子D蛘小(SparkPi 程序中的 reduce 函數(shù))
以 SparkPi 程序舉例:reduce() 算子就是提交 job 的入口
reduce()
▼
sc.runJob
——》SparkContext#runJob
——》DAGScheduler#runJob
▼
/*
* 1、應用程序調(diào)用action算子
* 2阱驾、sparkcontext就谜。runjob
* 3、dagscheduler里覆。runjob
* 4丧荐、taskscheduler。submittasks
* 5喧枷、schedulerbackend虹统。driverEndpoint 提交任務
/
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
——》DAGScheduler#runJob
▼
/* 注釋: 提交任務
* 參數(shù)解析:
* 1、rdd:要在其上運行任務的參數(shù)RDD目標RDD
* 2隧甚、func:在RDD的每個分區(qū)上運行的函數(shù)
* 3车荔、partitions:要運行的分區(qū)的集;某些作業(yè)可能不希望在目標RDD的所有分區(qū)上進行計算戚扳,例如忧便,對于 first() 之類的操作。
* 4帽借、callSite:在用戶程序中調(diào)用此作業(yè)的位置
* 5珠增、resultHandler:回調(diào)函數(shù)超歌,以將每個分區(qū)結果傳遞給Xxx
* 6、properties:要附加到此作業(yè)的scheduler屬性切平,例如fair scheduler pool name
*/
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
——》DAGScheduler#runJob
▼
/**
* 第一步:封裝一個JobWaiter對象握础;
* 第二步:將JobWaiter對象賦值給JobSubmitted的listener屬性,
* 并將JobSubmitted(DAGSchedulerEvent事件)對象傳遞給eventProcessLoop事件循環(huán)處理器悴品。eventProcessLoop
* 內(nèi)部事件消息處理線程將會接收JobSubmitted事件禀综,并調(diào)用dagScheduler.handleJobSubmitted(...)方法來處理事件;
* 第三步:返回JobWaiter對象苔严。
*/
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
/**
* 注釋:這是提交任務運行
* eventProcessLoop 就是當初 DAGScheduler 在初始化的時候定枷,創(chuàng)建的一個 DAGSchedulerEventProcessLoop
* 這個組件主要負責:任務的提交執(zhí)行
*/
eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties)))
——》 eventProcessLoop 構造函數(shù) DAGSchedulerEventProcessLoop
——》EventLoop#post
——》DAGSchedulerEventProcessLoop#onRecive
——》DAGSchedulerEventProcessLoop#doOnRecive
——》DAGSchedulerEventProcessLoop#handleJobSubmitted
從此,任務的提交就交給了 dagScheduler#handleJobSubmitted 方法
▼
/* 注釋: RDD DAG劃分Stages:Stage的劃分是從最后一個Stage開始逆推的届氢,
-
每遇到一個寬依賴處欠窒,就分裂成另外一個Stage
- 依此類推直到Stage劃分完畢為止。并且退子,只有最后一個Stage的類型是ResultStage類型岖妄。
- 注意Dataset、DataFrame寂祥、sparkSession.sql("select ...")
- 經(jīng)過catalyst代碼解析會將代碼轉(zhuǎn)化為RDD
做了2件最主要的事
1荐虐、stage切分
-
2、 stage 提交
/
/ 注釋: Stage 切分
* 這個 finalRDD 就是 rdd鏈條中的最后一個 RDD丸凭,也就是觸發(fā) sc.runJob() 方法執(zhí)行的 RDD
*/finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) /* * 注釋: 提交 Stage */ submitStage(finalStage)
2.2 Spark Application 的 DAG 生成和 Stage 切分分析 0:58~ 1:46
入口:EventLoop 中的 eventQueue.take() 方法
如果任務提交衙传,則有 JobSubmitted 事件提交到 eventQueue 中撞叨,則 eventQueue.take() 阻塞返回驾凶,此時的 event 就是 JobSubmitted马昨。
根據(jù)事件機制,跳轉(zhuǎn)到:DAGScheduler.handleJobSubmitted() 方法
根據(jù) driver 發(fā)送過來的 事件類型虽界,來決定到底做什么汽烦!
兩個核心的方法:
// stage切分入口
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// 提交stage執(zhí)行入口
submitStage(finalStage)
方法依賴關系:
1、createResultStage(傳入finalRDD獲得ResultStage) ->2
2莉御、getOrCreateParentStages(傳入rdd獲得父stage) ->3->4
3刹缝、getShuffleDependencies(傳入rdd獲得寬依賴)
4、getOrCreateShuffleMapStage(傳入寬依賴獲得ShuffleMapStage) ->5->6
5颈将、getMissingAncestorShuffleDependencies(傳入一個rdd獲得所有寬依賴) ->3
6、createShuffleMapStage(傳入寬依賴獲得ShuffleMapStage) ->2
RDD任務切分中間分為:Application言疗、Job晴圾、Stage 和 Task
1、Application:初始化一個 SparkContext 即生成一個 Application噪奄;
2死姚、Job:一個 Action 算子就會生成一個 Job人乓;
3、Stage:Stage 等于寬依賴的個數(shù)加 1都毒;
4色罚、Task:一個 Stage 階段中,最后一個 RDD 的分區(qū)個數(shù)就是 Task 的個數(shù)账劲。
注意:Application->Job->Stage->Task每一層都是1對n的關系
dagScheduler#handleJobSubmitted 主方法
▼
一 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
▼
// TODO_MA 注釋:獲取當前Stage的parent Stage戳护,這個方法是劃分Stage的核心實現(xiàn)
val parents = getOrCreateParentStages(rdd, jobId)
▼
1 getShuffleDependencies(rdd).map { shuffleDep => 2 getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList
1 ——》DAGScheduler#getShuffleDependencies
▼
/** 重點方法 找RDD依賴鏈
* Returns shuffle dependencies that are immediate parents of the given RDD.
* This function will not return more distant ancestors.
* For example, if C has a shuffle dependency on B which has a shuffle dependency on A:
* A <-- B <-- C
* calling this function with rdd C will only return the B <-- C dependency.
* This function is scheduler-visible for the purpose of unit testing.
* TODO_ 采用的是深度優(yōu)先遍歷找到Action算子的父依賴中的寬依賴
*/
2 ——》 getOrCreateShuffleMapStage
/* TODO_MA 如果shuffleIdToMapStage中存在shuffle,則獲取shuffle map stage瀑焦。
* 否則腌且,如果shuffle map stage不存在,該方法將創(chuàng)建shuffle map stage
* 以及任何丟失的parent shuffle map stage榛瓮。
***************************************
二 铺董、 /* 注釋: 遞歸提交 Stage /
submitStage(finalStage)
——>submitMissingTasks()
▼
/ 注釋: 把 stage 變成 Tasks
* Step3: 為每個需要計算的partiton生成一個task
*/
val tasks: Seq[Task[_]] = try {
//如果是 ShuffleMapStage 階段的 Task,則構建 ShuffleMapTask
case stage: ShuffleMapStage => stage.pendingPartitions.clear()
.............
//如果是 ResultStage 階段的 Task禀晓,則構建 ResultTask
case stage: ResultStage => partitionsToCompute.map
/* 注釋: 如果該階段有 Task 需要執(zhí)行
* Step4: 提交tasks
*/
if (tasks.size > 0) {
//注釋: taskScheduler 的具體類型是:TaskSchedulerImpl
taskScheduler.submitTasks(new TaskSet(tasks.toArray,
2.3 Spark Task 分發(fā)和執(zhí)行分析 2:13 ~ 2:45
入口接上面的: taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id,
taskScheduler 的具體類型是:TaskSchedulerImpl
TaskSchedulerImpl#submitTasks 方法
▼
/** 注釋:TaskScheduler提交job精续,最后交由 SchedulerBackEnd 進行提交
*/
backend.reviveOffers()
↓
CoarseGrainedSchedulerBackend.reviveOffers()
▼
// 給 DriverEndpoint (自己 )發(fā)送 ReviveOffers 消息
driverEndpoint.send(ReviveOffers)
↓
CoarseGrainedSchedulerBackend#DriverEndpoint#receive
▼
case ReviveOffers => makeOffers()
▼
scheduler.resourceOffers(workOffers) //申請計算資源
.........
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)
▼
// 注釋: 發(fā)送 LaunchTask 消息給:CoarseGrainedExecutorBackend 類
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
↓
CoarseGrainedExecutorBackend# receive()
▼ //data 是任務的反序列化
case LaunchTask(data) => if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else
{ ....... executor.launchTask(this, taskDesc)
}
↓
Executor#launchTask
▼
// TODO_MA 注釋: 封裝一個 TaskRunner 線程對象,來運行一個 Task
val tr = new TaskRunner(context, taskDescription)
- 注釋: 提交到線程池運行粹懒,那么就轉(zhuǎn)到 TaskRunner 的 run() 方法
*/
threadPool.execute(tr)
↓ 具體運行一個 Task 的地方
Executor#TaskRunner#run()
▼
val value = Utils.tryWithSafeFinally {
val res = task.run(taskAttemptId = taskId,
↓
Task#run()//
▼
runTask(context)// 兩種task 重付, 一種是shuffer ,一種是result
↓shuffer
1. ShuffleMapTask#runTask()
▼
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
2.3 Spark Suffle 源碼分析 2:45~ 3:05
HashShuffleManager的運行原理 2:45~ 2:52
SortShuffleManager運行原理 2:52 ~3:13
↓ 有4個writer 崎淳; 以SortShuffleWriter為例
SortShuffleWriter#write
▼
* 注釋: 先排序
sorter = if (dep.mapSideCombine) {
插入數(shù)據(jù)到排序區(qū)
sorter.insertAll(records)
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
↓ Result
2. ResultMapTask#runTask()
▼