Spark supports several deployment modes, including standalone, YARN, and Mesos. Standalone mode is mainly used for offline testing, while production clusters usually rely on YARN or Mesos for resource management and fault tolerance. This article walks through the full startup flow of Spark in yarn-cluster mode, focusing on the logic on the Spark side; YARN-specific details will be covered in other articles.
What is YARN
YARN (Yet Another Resource Negotiator) is the framework in the Hadoop ecosystem responsible for resource management, coordination, and task isolation; other computation frameworks can run on top of YARN to have their tasks scheduled and executed. See the YARN documentation for more details.
yarn的架構(gòu)圖
Spark on YARN
消息流轉(zhuǎn)圖
消息流轉(zhuǎn)圖
- The client submits the Spark application to the ResourceManager.
- Based on the submission parameters and the current state of the cluster, the ResourceManager sends the request to launch the AM process to an appropriate NodeManager.
- That NodeManager starts the ApplicationMaster (AM) process on its own host using the AM launch command.
- The ApplicationMaster requests resources from the ResourceManager by sending it ContainerRequests for the executors.
- The ResourceManager likewise grants executor containers on appropriate NodeManagers, and the executor launch command is passed to those NodeManagers, which start the executor processes.
- Each executor process registers itself with the DriverEndpoint running inside the ApplicationMaster; when there are tasks to run, they are scheduled onto the executors that have registered successfully.
Class interaction diagram
- 設(shè)置環(huán)境變量
HADOOP_CONF_DIR
或者YARN_CONF_DIR
來指定yarn-site.xml和core-site.xml等hadoop和yarn的資源配置信息桨醋; - 執(zhí)行命令行
sh bin/spark-submit --master yarn --depoly-mode cluster --files xxx --class xxx --jars xxx
來向yarn提交spark應(yīng)用程序; - 命令行最終轉(zhuǎn)化為執(zhí)行SparkSubmit.class的main函數(shù)现斋,再依次執(zhí)行參數(shù)的解析讨盒、校驗、轉(zhuǎn)換步责,最終再運行具體的類的main函數(shù),在yarn模式中執(zhí)行的類名為org.apache.spark.deploy.yarn.Client;
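To make the first step concrete, here is a minimal sketch (not Spark code, names chosen for illustration): once HADOOP_CONF_DIR / YARN_CONF_DIR is on the client's classpath, a plain YarnConfiguration picks up core-site.xml and yarn-site.xml from it, which is how the submitting process learns the ResourceManager address.

import org.apache.hadoop.yarn.conf.YarnConfiguration

// Minimal sketch, assuming HADOOP_CONF_DIR / YARN_CONF_DIR is on the classpath so that
// core-site.xml and yarn-site.xml are visible to the configuration loader.
val yarnConf = new YarnConfiguration()
// yarn.resourcemanager.address is read from yarn-site.xml; this is how the submitting
// process knows which ResourceManager to talk to.
val rmAddress = yarnConf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)
println(s"ResourceManager address resolved from configuration: $rmAddress")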
def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
參數(shù)解析完畢后運行相應(yīng)的childMainClass禀苦,yarn模式下為Client類的main方法蔓肯。
private def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }
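prepareSubmitEnvironment is where childMainClass gets chosen. The following is only an illustrative sketch of that decision (the real method also rewrites arguments, classpath entries, and system properties):

// Illustrative sketch (not the actual prepareSubmitEnvironment): how the deploy mode
// determines which main class spark-submit eventually invokes.
def chooseChildMainClass(master: String, deployMode: String, userMainClass: String): String = {
  if (master.startsWith("yarn") && deployMode == "cluster") {
    // yarn-cluster: spark-submit does not run the user class itself; it runs the YARN
    // Client, which submits the application to the ResourceManager.
    "org.apache.spark.deploy.yarn.Client"
  } else {
    // client mode: the user class is run directly in the submitting JVM.
    userMainClass
  }
}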
- The Client first requests an ApplicationId from YARN, then uploads the relevant files, jars, and configuration to HDFS, and finally submits the application to the ResourceManager. If spark.yarn.submit.waitAppCompletion is set to true, the launching process keeps polling the application's status until it becomes `FINISHED`, `KILLED`, or `FAILED` and then exits; otherwise it exits immediately (a minimal sketch of this polling loop follows).
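As a rough illustration of the waitAppCompletion behaviour, here is a simplified sketch that polls the YarnClient API directly; it is not Spark's actual monitorApplication code, and the fixed 1-second pause stands in for what Spark configures via spark.yarn.report.interval.

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// Poll the application report until YARN reports a terminal state.
def waitForCompletion(yarnClient: YarnClient, appId: ApplicationId): YarnApplicationState = {
  val terminal = Set(YarnApplicationState.FINISHED,
    YarnApplicationState.KILLED,
    YarnApplicationState.FAILED)
  var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  while (!terminal.contains(state)) {
    Thread.sleep(1000)  // report interval; illustrative value
    state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  }
  state
}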
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
  logInfo("Setting up container launch context for our AM")
  val appId = newAppResponse.getApplicationId
  val appStagingDir = getAppStagingDir(appId)
  val pySparkArchives =
    if (sparkConf.getBoolean("spark.yarn.isPython", false)) {
      findPySparkArchives()
    } else {
      Nil
    }
  val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives)
  // Upload spark.yarn.jar to /user/{user.home}/.sparkStaging/{applicationid}/__spark__.jar
  // Upload the user jar to /user/{user.home}/.sparkStaging/{applicationid}/__app__.jar
  // Upload the jars given by --jars to /user/{user.home}/.sparkStaging/{applicationid}/{#linkname}.jar
  // Upload the files given by --files to /user/{user.home}/.sparkStaging/{applicationid}/{#linkname}
  // Upload __spark_conf__.zip to /user/{user.home}/.sparkStaging/{applicationid}/__spark_conf__.zip;
  // the archive contains every file under HADOOP_CONF_DIR and YARN_CONF_DIR plus the
  // __spark_conf__.properties file generated from SparkConf
  val localResources = prepareLocalResources(appStagingDir, pySparkArchives)

  // Set the environment variables to be passed on to the executors.
  distCacheMgr.setDistFilesEnv(launchEnv)
  distCacheMgr.setDistArchivesEnv(launchEnv)

  val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
  amContainer.setLocalResources(localResources.asJava)
  amContainer.setEnvironment(launchEnv.asJava)

  val javaOpts = ListBuffer[String]()

  // Set the environment variable through a command prefix
  // to append to the existing value of the variable
  var prefixEnv: Option[String] = None
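prepareLocalResources turns each uploaded file into a YARN LocalResource entry keyed by its link name. Below is a minimal sketch of what building one such entry looks like; the helper name is invented for illustration, and the real code also handles the distributed cache, visibility levels, and archives.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

// Build a LocalResource for a file already uploaded to the staging directory on HDFS,
// so that the NodeManager can localize it before launching the AM container.
def toLocalResource(fs: FileSystem, stagedFile: Path): LocalResource = {
  val status = fs.getFileStatus(stagedFile)
  val resource = Records.newRecord(classOf[LocalResource])
  resource.setResource(ConverterUtils.getYarnUrlFromPath(stagedFile))
  resource.setSize(status.getLen)
  resource.setTimestamp(status.getModificationTime)
  resource.setType(LocalResourceType.FILE)
  resource.setVisibility(LocalResourceVisibility.PRIVATE)
  resource
}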
- 根據(jù)queue裆装、am的啟動命令踱承、依賴的環(huán)境變量等信息初始化ApplicationSubmissionContext;
def createApplicationSubmissionContext(
    newApp: YarnClientApplication,
    containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
  val appContext = newApp.getApplicationSubmissionContext
  appContext.setApplicationName(args.appName)
  appContext.setQueue(args.amQueue)
  appContext.setAMContainerSpec(containerContext)
  appContext.setApplicationType("SPARK")

  sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
    .map(StringUtils.getTrimmedStringCollection(_))
    .filter(!_.isEmpty())
    .foreach { tagCollection =>
      try {
        // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
        // reflection to set it, printing a warning if a tag was specified but the YARN version
        // doesn't support it.
        val method = appContext.getClass().getMethod(
          "setApplicationTags", classOf[java.util.Set[String]])
        method.invoke(appContext, new java.util.HashSet[String](tagCollection))
      } catch {
        case e: NoSuchMethodException =>
          logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " +
            "YARN does not support it")
      }
    }

  sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
    case Some(v) => appContext.setMaxAppAttempts(v)
    case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
      "Cluster's default value will be used.")
  }

  if (sparkConf.contains("spark.yarn.am.attemptFailuresValidityInterval")) {
    try {
      val interval = sparkConf.getTimeAsMs("spark.yarn.am.attemptFailuresValidityInterval")
      val method = appContext.getClass().getMethod(
        "setAttemptFailuresValidityInterval", classOf[Long])
      method.invoke(appContext, interval: java.lang.Long)
    } catch {
      case e: NoSuchMethodException =>
        logWarning("Ignoring spark.yarn.am.attemptFailuresValidityInterval because the version " +
          "of YARN does not support it")
    }
  }

  val capability = Records.newRecord(classOf[Resource])
  capability.setMemory(args.amMemory + amMemoryOverhead)
  capability.setVirtualCores(args.amCores)

  if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) {
    try {
      val amRequest = Records.newRecord(classOf[ResourceRequest])
      amRequest.setResourceName(ResourceRequest.ANY)
      amRequest.setPriority(Priority.newInstance(0))
      amRequest.setCapability(capability)
      amRequest.setNumContainers(1)
      val amLabelExpression = sparkConf.get("spark.yarn.am.nodeLabelExpression")
      val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
      method.invoke(amRequest, amLabelExpression)

      val setResourceRequestMethod =
        appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
      setResourceRequestMethod.invoke(appContext, amRequest)
    } catch {
      case e: NoSuchMethodException =>
        logWarning("Ignoring spark.yarn.am.nodeLabelExpression because the version " +
          "of YARN does not support it")
        appContext.setResource(capability)
    }
  } else {
    appContext.setResource(capability)
  }

  appContext
}
- 提交應(yīng)用上下文到ResourceManager的ClientRMService接口完成任務(wù)的提交哨免;
ApplicationMaster startup
- The NodeManager launches the ApplicationMaster, whose main function installs the `AmIpFilter`, ensuring the web UI can only be reached from the designated source IPs; other requests are redirected to the proxy URL.
- A thread named driver is started to run the main function of the user class, which initializes the SparkContext; the main thread then registers the AM with the ResourceManager, telling the RM the address of the Spark UI.
private def runDriver(securityMgr: SecurityManager): Unit = {
  // Install the AmIpFilter so that the Spark UI can only be reached from specific IPs;
  // other requests are redirected to the configured proxy URL
  addAmIpFilter()
  // Run the main function of the user class, i.e. start executing the user code
  // (the class given by --class on the spark-submit command line)
  userClassThread = startUserApplication()

  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  val sc = waitForSparkContextInitialized()

  // If there is no SparkContext at this point, just fail the app.
  if (sc == null) {
    finish(FinalApplicationStatus.FAILED,
      ApplicationMaster.EXIT_SC_NOT_INITED,
      "Timed out waiting for SparkContext.")
  } else {
    rpcEnv = sc.env.rpcEnv
    val driverRef = runAMEndpoint(
      sc.getConf.get("spark.driver.host"),
      sc.getConf.get("spark.driver.port"),
      isClusterMode = true)
    // Register the successfully started AM with the ResourceManager and report the UI
    // address; when the application exits, unregistration reports the history UI address
    registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
    // Wait for the user application thread to finish
    userClassThread.join()
  }
}
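startUserApplication() (not shown above) essentially loads the --class main method and invokes it in a dedicated thread named "Driver", so the AM's own main thread stays free to register with the RM and heartbeat. A simplified sketch (the real code also sets up a child classloader for the user jars and reports failures back to the AM):

// Simplified sketch of the idea behind startUserApplication; names are illustrative.
def startUserClass(userClassName: String, userArgs: Array[String]): Thread = {
  val mainMethod = Class.forName(userClassName).getMethod("main", classOf[Array[String]])
  val userThread = new Thread {
    override def run(): Unit = mainMethod.invoke(null, userArgs)
  }
  userThread.setName("Driver")   // this is the thread that ends up building the SparkContext
  userThread.start()
  userThread
}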
- At the same time a reporter thread is started, which periodically requests resources from the ResourceManager; the main logic lives in YarnAllocator. It also watches for executor failures: once the number of failed executors exceeds the value of spark.yarn.max.executor.failures, the AM is stopped.
// Start the reporter thread, which monitors the application's progress and
// periodically requests resources from the RM
private def launchReporterThread(): Thread = {
  // The number of failures in a row until Reporter thread give up
  val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)

  val t = new Thread {
    override def run() {
      var failureCount = 0
      while (!finished) {
        try {
          if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
            finish(FinalApplicationStatus.FAILED,
              ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
              s"Max number of executor failures ($maxNumExecutorFailures) reached")
          } else {
            logDebug("Sending progress")
            // Trigger the resource allocation flow
            allocator.allocateResources()
          }
          failureCount = 0
        } catch {
          case i: InterruptedException =>
          case e: Throwable => {
            failureCount += 1
            if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
                  s"$failureCount time(s) from Reporter thread.")
            } else {
              logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
            }
          }
        }
        try {
          val numPendingAllocate = allocator.getPendingAllocate.size
          allocatorLock.synchronized {
            val sleepInterval =
              if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
                val currentAllocationInterval =
                  math.min(heartbeatInterval, nextAllocationInterval)
                nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
                currentAllocationInterval
              } else {
                nextAllocationInterval = initialAllocationInterval
                heartbeatInterval
              }
            logDebug(s"Number of pending allocations is $numPendingAllocate. " +
              s"Sleeping for $sleepInterval.")
            allocatorLock.wait(sleepInterval)
          }
        } catch {
          case e: InterruptedException =>
        }
      }
    }
  }
  // setting to daemon status, though this is usually not a good idea.
  t.setDaemon(true)
  t.setName("Reporter")
  t.start()
  logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
    s"initial allocation : $initialAllocationInterval) intervals")
  t
}
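The sleep logic at the bottom of the loop can be read as a small pure function (restated here for clarity, not Spark code): while container requests or loss-reason requests are pending, the AM heartbeats more frequently, doubling the allocation interval each round up to the configured heartbeat interval; otherwise it falls back to the regular heartbeat interval.

// Restatement of the interval computation above: returns (sleep now, next allocation interval).
def nextSleepInterval(
    hasPendingWork: Boolean,
    heartbeatInterval: Long,
    nextAllocationInterval: Long,
    initialAllocationInterval: Long): (Long, Long) = {
  if (hasPendingWork) {
    val current = math.min(heartbeatInterval, nextAllocationInterval)
    (current, current * 2)   // back off: the next round waits twice as long, capped by the min above
  } else {
    (heartbeatInterval, initialAllocationInterval)   // nothing pending: reset and use the heartbeat
  }
}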
- If dynamic allocation is not enabled, the number of executors requested at startup is given by spark.executor.instances; with dynamic allocation enabled, the initial number is given by spark.dynamicAllocation.initialExecutors.
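For illustration, the two cases would typically be configured like this (the numeric values are arbitrary examples):

import org.apache.spark.SparkConf

// Static allocation: ask for a fixed number of executors up front.
val staticConf = new SparkConf()
  .set("spark.executor.instances", "10")

// Dynamic allocation: start small and let the allocator grow/shrink the executor count.
val dynamicConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")          // external shuffle service is required
  .set("spark.dynamicAllocation.initialExecutors", "2")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")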
YarnAllocator
def allocateResources(): Unit = synchronized {
  // Based on the total number of executors required, the number already granted, and the
  // task locality preferences, work out how many executors to request and where to place them
  updateResourceRequests()

  val progressIndicator = 0.1f
  // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
  // requests.
  // Collect the pending and to-be-released requests held in AMRMClientImpl, assemble them into
  // an AllocateRequest, and call ApplicationMasterProtocol.allocate to ask the RM for resources
  val allocateResponse = amClient.allocate(progressIndicator)

  val allocatedContainers = allocateResponse.getAllocatedContainers()

  if (allocatedContainers.size > 0) {
    logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
      .format(
        allocatedContainers.size,
        numExecutorsRunning,
        allocateResponse.getAvailableResources))
    handleAllocatedContainers(allocatedContainers.asScala)
  }

def updateResourceRequests(): Unit = {
  val pendingAllocate = getPendingAllocate
  val numPendingAllocate = pendingAllocate.size
  // Work out how many more containers need to be requested
  val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning

  if (missing > 0) {
    logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
      s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")

    // Split the pending container request into three groups: locality matched list, locality
    // unmatched list and non-locality list. Take the locality matched container request into
    // consideration of container placement, treat as allocated containers.
    // For locality unmatched and locality free container requests, cancel these container
    // requests, since required locality preference has been changed, recalculating using
    // container placement strategy.
    val (localityMatched, localityUnMatched, localityFree) = splitPendingAllocationsByLocality(
      hostToLocalTaskCounts, pendingAllocate)

    // Remove the outdated container request and recalculate the requested container number
    localityUnMatched.foreach(amClient.removeContainerRequest)
    localityFree.foreach(amClient.removeContainerRequest)
    val updatedNumContainer = missing + localityUnMatched.size + localityFree.size

    // Compute the placement of the containers, i.e. which hosts or racks they should land on
    val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
      updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts,
      allocatedHostToContainersMap, localityMatched)

    for (locality <- containerLocalityPreferences) {
      val request = createContainerRequest(resource, locality.nodes, locality.racks)
      // The request is only stored in the client's local bookkeeping here; it is sent to the RM
      // on the next allocate() call
      amClient.addContainerRequest(request)
      val nodes = request.getNodes
      val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.asScala.last
      logInfo(s"Container request (host: $hostStr, capability: $resource)")
    }
  } else if (missing < 0) {
    val numToCancel = math.min(numPendingAllocate, -missing)
    logInfo(s"Canceling requests for $numToCancel executor containers")

    val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
    if (!matchingRequests.isEmpty) {
      matchingRequests.iterator().next().asScala
        .take(numToCancel).foreach(amClient.removeContainerRequest)
    } else {
      logWarning("Expected to find pending requests, but found none.")
    }
  }
}
- Later, when executors exit because of failures, YarnSchedulerBackend updates the total number of executors that YarnAllocator should target, which triggers replacement containers to be requested, as illustrated below.
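A small worked example of the bookkeeping that drives both the initial requests and the replacement of failed executors (the numbers are made up):

// missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
val targetNumExecutors = 10
def missing(pending: Int, running: Int): Int = targetNumExecutors - pending - running

missing(pending = 4, running = 6)   // 0  -> steady state, nothing new is requested
missing(pending = 4, running = 5)   // 1  -> an executor died, one replacement container is requested
missing(pending = 3, running = 9)   // -2 -> overshoot, two of the pending requests are cancelled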
Elastic resource scaling
When dynamic allocation is enabled for Spark on YARN, the system adds or removes executors according to the current load, where load is determined by the number of running and pending tasks; see ExecutorAllocationManager for the details.
private def maxNumExecutorsNeeded(): Int = {
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  // tasksPerExecutor is the number of tasks an executor can run in parallel
  // (executor cores / cores per task)
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}
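The formula is just an integer ceiling division; a quick numeric check (the tasksPerExecutor value is chosen only for illustration):

val tasksPerExecutor = 4
(100 + tasksPerExecutor - 1) / tasksPerExecutor   // = 25 executors needed for 100 tasks
(101 + tasksPerExecutor - 1) / tasksPerExecutor   // = 26 executors needed for 101 tasks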
動態(tài)伸縮的策略
- Scale-up policy: when tasks have been backlogged for longer than a threshold, more executors are requested; the number requested per round grows exponentially, similar to TCP slow start (see the sketch after this list).
- Scale-down policy: when an executor has been idle for longer than a threshold, it is released.
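A sketch of the slow-start style ramp-up described above, simplified from ExecutorAllocationManager's numExecutorsToAdd handling; the names mirror it but this is not the actual code:

// Each time the backlog persists past the (sustained) backlog timeout, request a batch of
// executors and double the batch size for the next round, bounded by maxExecutors.
var numExecutorsToAdd = 1
def onBacklogTimeout(currentTarget: Int, maxExecutors: Int): Int = {
  val desired = math.min(currentTarget + numExecutorsToAdd, maxExecutors)
  // Double only if the full batch was granted; otherwise reset, since the upper bound was hit.
  numExecutorsToAdd = if (desired == currentTarget + numExecutorsToAdd) numExecutorsToAdd * 2 else 1
  desired
}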
參數(shù)詳解
參數(shù)值 | 參數(shù)說明 |
---|---|
spark.dynamicAllocation.minExecutors | 最小的executor的數(shù)量 |
spark.dynamicAllocation.maxExecutors | 最大的executor的數(shù)量 |
spark.dynamicAllocation.enabled | 是否開啟動態(tài)分配策略冯事,前提是spark.executor.instances為0 |
spark.executor.instances | 設(shè)置executor的個數(shù) |
spark.dynamicAllocation.initialExecutors | 初次申請executor個數(shù) |
spark.dynamicAllocation.schedulerBacklogTimeout | 開始擴容的閾值 |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout | 超過schedulerBacklogTimeout閾值后,再次擴容的時間 |
spark.dynamicAllocation.executorIdleTimeout | executor空閑時間血公,超過改值昵仅,移除掉executor |
spark.dynamicAllocation.cachedExecutorIdleTimeout | 有緩存block的executor的超時時間 |