I recently refactored an audio/video transcoding system and tried using Spark as the framework for distributed, concurrent transcoding tasks. Using something I wasn't familiar with never feels entirely safe, so I decided to read through the source code.
Master Startup Process
The Master is started with:
./sbin/start-master.sh
So let's start from this script and trace Spark's startup flow. We will follow only the main line of execution, ignore the side branches for now, and build a picture of the overall process.
Reading start-master.sh, the statement actually executed is:
${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
--ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
$ORIGINAL_ARGS
where CLASS is:
# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.deploy.master.Master"
- The other arguments are mostly port settings plus a few startup options, which we can ignore for now.
Simplified, the call becomes:
spark-daemon.sh start org.apache.spark.deploy.master.Master 1
Looking at sbin/spark-daemon.sh, the key statement is:
nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null
Here $command is the class name passed through from start-master.sh, i.e. org.apache.spark.deploy.master.Master (the leading start argument only selects the script's start branch).
Looking at bin/spark-class, we find the real launch point:
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"
$RUNNER and $LAUNCH_CLASSPATH are the path to the java binary and the launcher classpath, respectively.
The script runs org.apache.spark.launcher.Main, which builds the full java command line; its NUL-delimited output is read into the $CMD array, and $CMD is then run with exec. The main class executed by $CMD is the **org.apache.spark.deploy.master.Master** mentioned above.
So the program's real entry point is:
org.apache.spark.deploy.master.Master
located in:
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
The entry point is the main method of private[deploy] object Master extends Logging, shown below:
private[deploy] object Master extends Logging {
val SYSTEM_NAME = "sparkMaster"
val ENDPOINT_NAME = "Master"
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}
/**
* Start the Master and return a three tuple of:
* (1) The Master RpcEnv
* (2) The web UI bound port
* (3) The REST server bound port, if any
*/
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) // create the RpcEnv (Netty by default)
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
}
The main work here is to create an RpcEnv and register the Master into it as an endpoint; the returned masterEndpoint reference is then asked for the bound ports. A toy sketch of this register-then-ask pattern is shown below.
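To make the pattern concrete, here is a tiny self-contained sketch. ToyRpcEnv, ToyEndpoint and ToyRef are hypothetical stand-ins for illustration only; Spark's real RpcEnv/RpcEndpointRef are asynchronous and internal to the project.
// Hypothetical toy model: setupEndpoint returns a ref; ask sends a request and
// returns the typed reply (standing in for askWithRetry). Synchronous for brevity.
case object BoundPortsRequest
case class BoundPortsResponse(webUIPort: Int, restPort: Option[Int])
trait ToyEndpoint { def receiveAndReply(msg: Any): Any }
class ToyRef(endpoint: ToyEndpoint) {
  def ask[T](msg: Any): T = endpoint.receiveAndReply(msg).asInstanceOf[T]
}
class ToyRpcEnv {
  private val endpoints = scala.collection.mutable.Map.empty[String, ToyEndpoint]
  def setupEndpoint(name: String, e: ToyEndpoint): ToyRef = { endpoints(name) = e; new ToyRef(e) }
}
object ToyMasterDemo extends App {
  val rpcEnv = new ToyRpcEnv
  val masterRef = rpcEnv.setupEndpoint("Master", new ToyEndpoint {
    def receiveAndReply(msg: Any): Any = msg match {
      case BoundPortsRequest => BoundPortsResponse(webUIPort = 8080, restPort = Some(6066))
    }
  })
  println(masterRef.ask[BoundPortsResponse](BoundPortsRequest)) // BoundPortsResponse(8080,Some(6066))
}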
Stepping into RpcEnv.create:
def create(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean = false): RpcEnv = {
// Using Reflection to create the RpcEnv to avoid to depend on Akka directly
val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
getRpcEnvFactory(conf).create(config)
}
Here getRpcEnvFactory(conf).create(config) builds and returns the RpcEnv (a standalone sketch of this lookup pattern follows the listing):
private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
val rpcEnvNames = Map(
"akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
"netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
val rpcEnvName = conf.get("spark.rpc", "netty")
val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
}
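The short-alias-to-class map plus reflective instantiation above is a small plugin-lookup pattern. A standalone sketch of the same idea, where Greeter and ShoutingGreeter are made-up illustration types, not Spark classes:
// Resolve a short alias (or accept a fully qualified class name), then instantiate
// via reflection -- the same shape as getRpcEnvFactory.
trait Greeter { def greet(name: String): String }
class ShoutingGreeter extends Greeter { def greet(name: String) = s"HELLO, ${name.toUpperCase}!" }
object FactoryLookupDemo extends App {
  val shortNames = Map("shout" -> "ShoutingGreeter")
  val requested  = "shout"                                    // would come from a config value
  val className  = shortNames.getOrElse(requested.toLowerCase, requested)
  val greeter    = Class.forName(className).newInstance().asInstanceOf[Greeter]
  println(greeter.greet("spark"))                             // HELLO, SPARK!
}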
In practice Netty is used as the asynchronous NIO framework, so the factory that produces the RpcEnv is
org.apache.spark.rpc.netty.NettyRpcEnvFactory
Path:
core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
Let's look at this factory's create method:
private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
def create(config: RpcEnvConfig): RpcEnv = {
val sparkConf = config.conf
// Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
// KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
throw e
}
}
nettyEnv
}
}
The RpcEnv implementation here is NettyRpcEnv. The server is brought up through
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
which ends up calling nettyEnv.startServer(actualPort); a simplified sketch of this port-retry helper follows the startServer listing below.
def startServer(port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
server = transportContext.createServer(host, port, bootstraps)
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
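Utils.startServiceOnPort wraps the start function in a retry loop: if binding the requested port fails, it tries subsequent ports. A simplified sketch of that pattern, under the assumption that the real utility additionally reads the retry count from the config and treats port 0 specially:
import java.net.BindException
object PortRetrySketch {
  // Simplified port-retry loop; `start` plays the role of startNettyRpcEnv:
  // given a candidate port, start the service and return (service, boundPort).
  def startOnPort[T](startPort: Int, maxRetries: Int)(start: Int => (T, Int)): (T, Int) = {
    var attempt = 0
    while (attempt <= maxRetries) {
      val candidate = startPort + attempt
      try {
        return start(candidate)
      } catch {
        case _: BindException => attempt += 1 // port already in use: try the next one
      }
    }
    throw new BindException(s"Could not bind to any port in [$startPort, ${startPort + maxRetries}]")
  }
}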
Back in startRpcEnvAndEndpoint in Master.scala:
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
Registering the Master into the RpcEnv yields the masterEndpoint reference.
In the Netty implementation, messages are delivered to endpoints through a Dispatcher.
Opening Dispatcher.scala, we can locate the message-dispatch loop:
/** Message loop used for dispatching messages. */
private class MessageLoop extends Runnable {
override def run(): Unit = {
try {
while (true) {
try {
val data = receivers.take()
if (data == PoisonPill) {
// Put PoisonPill back so that other MessageLoops can see it.
receivers.offer(PoisonPill)
return
}
data.inbox.process(Dispatcher.this)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case ie: InterruptedException => // exit
}
}
}
Each message is handled by
data.inbox.process(Dispatcher.this)
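The loop above is the classic blocking-queue consumer with a poison-pill sentinel. A minimal self-contained sketch of the same pattern, where Work and Stop are hypothetical message types:
import java.util.concurrent.LinkedBlockingQueue
object MessageLoopSketch extends App {
  sealed trait Msg
  case class Work(payload: String) extends Msg
  case object Stop extends Msg                 // plays the role of PoisonPill

  val queue = new LinkedBlockingQueue[Msg]()

  val loop: Runnable = new Runnable {
    override def run(): Unit = {
      while (true) {
        queue.take() match {                   // blocks until a message arrives
          case Stop =>
            queue.offer(Stop)                  // put the sentinel back for sibling loops
            return
          case Work(p) =>
            println(s"processing $p")          // stands in for data.inbox.process(...)
        }
      }
    }
  }

  queue.offer(Work("hello"))
  queue.offer(Stop)
  val t = new Thread(loop); t.start(); t.join()
}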
Stepping into
core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
and locating process:
/**
* Process stored messages.
*/
def process(dispatcher: Dispatcher): Unit = {
var message: InboxMessage = null
inbox.synchronized {
if (!enableConcurrent && numActiveThreads != 0) {
return
}
message = messages.poll()
if (message != null) {
numActiveThreads += 1
} else {
return
}
}
while (true) {
safelyCall(endpoint) {
message match {
case RpcMessage(_sender, content, context) =>
try {
endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
case NonFatal(e) =>
context.sendFailure(e)
// Throw the exception -- this exception will be caught by the safelyCall function.
// The endpoint's onError function will be called.
throw e
}
case OneWayMessage(_sender, content) =>
endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
case OnStart =>
endpoint.onStart()
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}
case OnStop =>
val activeThreads = inbox.synchronized { inbox.numActiveThreads }
assert(activeThreads == 1,
s"There should be only a single active thread but found $activeThreads threads.")
dispatcher.removeRpcEndpointRef(endpoint)
endpoint.onStop()
assert(isEmpty, "OnStop should be the last message")
case RemoteProcessConnected(remoteAddress) =>
endpoint.onConnected(remoteAddress)
case RemoteProcessDisconnected(remoteAddress) =>
endpoint.onDisconnected(remoteAddress)
case RemoteProcessConnectionError(cause, remoteAddress) =>
endpoint.onNetworkError(cause, remoteAddress)
}
}
inbox.synchronized {
// "enableConcurrent" will be set to false after `onStop` is called, so we should check it
// every time.
if (!enableConcurrent && numActiveThreads != 1) {
// If we are not the only one worker, exit
numActiveThreads -= 1
return
}
message = messages.poll()
if (message == null) {
numActiveThreads -= 1
return
}
}
}
}
We can see that at startup the loop invokes:
endpoint.onStart()
and that, once started, RPC calls are served through:
endpoint.receiveAndReply
Here the endpoint is our Master, so let's look at these two methods in Master.scala.
- First, onStart():
override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
webUi = new MasterWebUI(this, webUiPort)
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
}
restServerBoundPort = restServer.map(_.start())
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
// Attach the master and app metrics servlet handler to the web ui after the metrics systems are
// started.
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}
The main actions are starting the web UI, starting the metrics systems, and setting up Master high availability (persistence engine plus leader election).
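The RECOVERY_MODE branch is driven purely by configuration. As an example (the keys below are the documented standalone-HA settings, stated here as an assumption rather than taken from the code above), ZooKeeper- or filesystem-based recovery would be enabled roughly like this:
import org.apache.spark.SparkConf
object RecoveryModeConfigExample {
  // spark.deploy.recoveryMode selects which branch of the match above is taken.
  val zkConf = new SparkConf()
    .set("spark.deploy.recoveryMode", "ZOOKEEPER")          // ZOOKEEPER | FILESYSTEM | CUSTOM (default: none)
    .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181") // example quorum address
    .set("spark.deploy.zookeeper.dir", "/spark")            // znode directory for recovery state

  val fsConf = new SparkConf()
    .set("spark.deploy.recoveryMode", "FILESYSTEM")
    .set("spark.deploy.recoveryDirectory", "/var/spark/recovery")
}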
- Now the other method: receiveAndReply
This is the Master's main workhorse. It is one big pattern match over many cases; let's look at the first:
case RegisterWorker
This handles registration when a new worker starts up. The core of it is:
// build a WorkerInfo describing the new worker
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerUiPort, publicAddress)
if (registerWorker(worker)) {
// register the worker
persistenceEngine.addWorker(worker)
context.reply(RegisteredWorker(self, masterWebUiUrl))
schedule() // re-run scheduling to rebalance the cluster
}
- Let's focus on schedule():
/**
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) {
return
}
// Drivers take strict precedence over executors
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
startExecutorsOnWorkers()
}
The scheduling logic is fairly straightforward:
First, take the workers whose state is ALIVE, in shuffled order.
Then walk the queue of waiting drivers, launching each driver on a worker that has enough free memory and cores, round-robin across the alive workers.
Finally, walk the queue of waiting apps: filter the usable workers and, for every app that still wants more cores, allocate executors for it on those workers.
startExecutorsOnWorkers() looks like this:
/**
* Schedule and launch executors on workers
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// Filter out workers that don't have enough resources to launch an executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
A simple FIFO scheme: scheduleExecutorsOnWorkers() decides how many cores each usable worker should contribute to the app, and allocateWorkerResourceToExecutors() then performs the actual allocation and launches the executors.
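scheduleExecutorsOnWorkers itself is not shown above; in essence it decides how many cores each usable worker should contribute, either spread out across as many workers as possible or packed onto as few as possible (the spreadOutApps flag). A simplified sketch of that decision, ignoring memory and executor-count limits, so a stand-in for, not a copy of, the real method:
object SpreadOutSketch extends App {
  // Assign cores one executor-worth (coresPerExecutor) at a time.
  // spreadOut = true rotates across workers; false packs a worker before moving on.
  def assignCores(coresNeeded: Int, coresPerExecutor: Int,
                  freeCores: Array[Int], spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(freeCores.length)(0)
    var toAssign = coresNeeded
    def canUse(i: Int) = freeCores(i) - assigned(i) >= coresPerExecutor
    var keepGoing = true
    while (toAssign >= coresPerExecutor && keepGoing) {
      keepGoing = false
      for (i <- freeCores.indices if toAssign >= coresPerExecutor && canUse(i)) {
        keepGoing = true
        if (spreadOut) {                       // take one executor here, then move to the next worker
          assigned(i) += coresPerExecutor
          toAssign -= coresPerExecutor
        } else {                               // fill this worker before moving on
          while (toAssign >= coresPerExecutor && canUse(i)) {
            assigned(i) += coresPerExecutor
            toAssign -= coresPerExecutor
          }
        }
      }
    }
    assigned
  }

  println(assignCores(6, 2, Array(4, 4, 4), spreadOut = true).mkString(","))  // 2,2,2
  println(assignCores(6, 2, Array(4, 4, 4), spreadOut = false).mkString(",")) // 4,2,0
}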
At this point the startup process is essentially covered, but two things remain unclear to me: how drivers and apps each actually work, and how code is submitted to the cluster.