準備
本文主要對Master的啟動流程源碼進行分析。Spark源碼版本為2.3.1德撬。
閱讀源碼首先從啟動腳本入手铲咨,看看首先加載的是哪個類,我們看一下start-master.sh
啟動腳本中的具體內(nèi)容蜓洪。
可以看到這里加載的類是org.apache.spark.deploy.master.Master纤勒,好那我們的源碼尋覓之旅就從這開始...
源碼分析
打開源碼,我們發(fā)現(xiàn)Master是伴生關(guān)系的一組類隆檀,我們直接定位到Master的main函數(shù)
//主方法
def main(argStrings: Array[String]) {
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
exitOnUncaughtException = false))
Utils.initDaemon(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
/**
* 創(chuàng)建RPC 環(huán)境和Endpoint (RPC 遠程過程調(diào)用),在Spark中 Driver摇天, Master ,Worker角色都有各自的Endpoint恐仑,相當于各自的通信郵箱泉坐。
*
*/
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}
發(fā)現(xiàn)該函數(shù)除了做了一些配置文件和args參數(shù)的準備之外,調(diào)用了startRpcEnvAndEndpoint函數(shù)裳仆,我們跟進去看看
/**
* Start the Master and return a three tuple of:
* (1) The Master RpcEnv
* (2) The web UI bound port
* (3) The REST server bound port, if any
*/
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
/**
* 創(chuàng)建RPC(Remote Procedure Call )環(huán)境 ,Remote Procedure Call
* 這里只是創(chuàng)建準備好Rpc的環(huán)境腕让,后面會向RpcEnv中注冊 角色【Driver,Master,Worker,Executor】
*/
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
.....
}
首先上面這段代碼通過RpcEnv 構(gòu)建了一個RPC通信環(huán)境,為之后的RPC通信做準備歧斟,Spark底層的通信是基于Netty的NIO模型记某,
這里每個Rpc端點運行時依賴的上下文環(huán)境稱之為RpcEnv。
接下來我們直接跟蹤到create方法中
def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
numUsableCores, clientMode)
//創(chuàng)建NettyRpc 環(huán)境
new NettyRpcEnvFactory().create(config)
}
可以看到构捡,這里構(gòu)建了NettyRpcEnvFactory來創(chuàng)建NettyRpc 環(huán)境,NettyRpcEnvFactory的create函數(shù)又做了那些事呢?
def create(config: RpcEnvConfig): RpcEnv = {
val sparkConf = config.conf
// Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
// KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
/**
* 創(chuàng)建nettyRPC通信環(huán)境壳猜。
* 當new NettyRpcEnv時會做一些初始化:
* Dispatcher:這個對象中有存放消息的隊列和消息的轉(zhuǎn)發(fā)
* TransportContext:可以創(chuàng)建了NettyRpcHandler
*/
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
if (!config.clientMode) {
//啟動nettyRPCEnv
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
//以上 startNettyRpcEnv 匿名函數(shù)在此處會最終被調(diào)用勾徽,當匿名函數(shù)被調(diào)用時,重點方法是483行 nettyEnv.startServer 方法
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
throw e
}
}
nettyEnv
}
可以看到上面首先構(gòu)建了一個序列化的實例對象统扳,然后開始著手構(gòu)架nettyRPC通信環(huán)境喘帚。在new NettyRpcEnv()時會做一些初始化的工作畅姊,如下所示
/**
* dispatcher 這個對象中有消息隊列和消息的循環(huán)獲取轉(zhuǎn)發(fā)
*/
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
/**
* TransportContext 中會創(chuàng)建 NettyPpcHandler
* TransportContext 這個對象中參數(shù)類型 RpcHandler 就是這里的 NettyRpcHandler
*/
private val transportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this, streamManager))
這里的Dispatcher是一個消息分發(fā)器,針對于RPC端點需要發(fā)送消息或者從遠程RPC接收到的消息吹由,分發(fā)至對應(yīng)的指令收件箱/發(fā)件箱若未。
這里的transportContext 是構(gòu)建傳輸?shù)纳舷挛沫h(huán)境,用于創(chuàng)建TransportServer和TransportClientFactory同時倾鲫,通過TransportChannelHandler來設(shè)置Netty的Channel pipelines粗合。
下面我們回到NettyRpcEnvFactory的create方法
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
if (!config.clientMode) {
//啟動nettyRPCEnv
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
//以上 startNettyRpcEnv 匿名函數(shù)在此處會最終被調(diào)用,當匿名函數(shù)被調(diào)用時乌昔,重點方法是483行 nettyEnv.startServer 方法
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
throw e
}
}
nettyEnv 準備好了之后隙疚,會構(gòu)建一個函數(shù)startNettyRpcEnv,然后startServiceOnPort在會調(diào)用startNettyRpcEnv磕道,進而調(diào)用nettyEnv的startServer函數(shù)供屉,來啟動Server
//啟動Rpc 服務(wù)
def startServer(bindAddress: String, port: Int): Unit = {
val bootstraps: java.util.List[TransportServerBootstrap] =
if (securityManager.isAuthenticationEnabled()) {
java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
} else {
java.util.Collections.emptyList()
}
/**
* transportContext已經(jīng)被創(chuàng)建,這里createServer 就會綁定地址和端口溺蕉,啟動Netty Rpc 服務(wù)
*/
server = transportContext.createServer(bindAddress, port, bootstraps)
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
由于直接初始化時transportContext就已經(jīng)被創(chuàng)建伶丐,這里createServer 就會綁定地址和端口,啟動Netty Rpc 服務(wù)疯特,createServer最在構(gòu)建TransportServer實例時哗魂,會調(diào)用init初始化方法,在這里以前了解過Netty的同學就會非常熟悉了.
private void init(String hostToBind, int portToBind) {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup =
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;
PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, allocator)
.childOption(ChannelOption.ALLOCATOR, allocator);
this.metrics = new NettyMemoryMetrics(
allocator, conf.getModuleName() + "-server", conf);
if (conf.backLog() > 0) {
bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
}
if (conf.receiveBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
}
if (conf.sendBuf() > 0) {
bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
}
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
RpcHandler rpcHandler = appRpcHandler;
for (TransportServerBootstrap bootstrap : bootstraps) {
rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
}
//初始化網(wǎng)絡(luò)通信管道
context.initializePipeline(ch, rpcHandler);
}
});
InetSocketAddress address = hostToBind == null ?
new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
channelFuture = bootstrap.bind(address);
channelFuture.syncUninterruptibly();
port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
logger.debug("Shuffle server started on port: {}", port);
}
init 方法就是去初始化一個綁定的主機和端口辙芍,創(chuàng)建nettyRPC通信啡彬,通過之前的rpcHandler來初始化網(wǎng)絡(luò)通信管道,同時會調(diào)用createChannelHandler函數(shù)故硅,創(chuàng)建處理消息的 channelHandler用于處理客戶端請求消息和服務(wù)端回應(yīng)消息
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred());
/**
* 由以上 responseHandler client requestHandler 三個handler構(gòu)建 TransportChannelHandler
* new TransportChannelHandler 這個對象中有 【channelRead() 方法】,用于讀取接收到的消息
*/
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections);
}
最終Rpc的環(huán)境就準備好了庶灿,后面會向RpcEnv中注冊 角色 Driver,Master,Worker,Executor。我們回到Master的startRpcEnvAndEndpoint函數(shù)
/**
* 向RpcEnv 中 注冊Master
*
* rpcEnv.setupEndpoint(name,new Master)
* 這里new Master 的Master 是一個伴生類吃衅,繼承了 ThreadSafeRpcEndpoint往踢,歸根結(jié)底繼承到了 Trait 接口 RpcEndpoint
val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
這里會向RpcEnv中注冊Master,調(diào)用了setupEndpoint函數(shù)徘层,傳入了Master的實例對象峻呕,這里的Master 是一個伴生類,繼承了 ThreadSafeRpcEndpoint趣效,也是RpcEndpoint的實現(xiàn)類瘦癌,而什么是RpcEndpoint?
RpcEndpoint:RPC端點 跷敬,Spark針對于每個節(jié)點(Client/Master/Worker)都稱之一個Rpc端點 ,且都實現(xiàn)RpcEndpoint接口讯私,內(nèi)部根據(jù)不同端點的需求,設(shè)計不同的消息和不同的業(yè)務(wù)處理,如果需要發(fā)送(詢問)則調(diào)用Dispatcher斤寇。
EndPoint中存在
- onstart() :啟動當前Endpoint
- receive() :負責收消息
- receiveAndReply():接受消息并回復
同時Endpoint 還有各自的引用桶癣,方便其他Endpoint發(fā)送消息,直接引用對方的EndpointRef 即可找到對方的Endpoint娘锁,上面源碼中的masterEndpoint 就是Master的Endpoint引用 RpcEndpointRef 牙寞。
RpcEndpointRef中存在
- send():發(fā)送消息
- ask() :請求消息,并等待回應(yīng)莫秆。
接下來我們看看NettyRpcEnv中的setupEndpoint函數(shù)
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
dispatcher.registerRpcEndpoint(name, endpoint)
}
上面直接調(diào)用了dispatcher實例间雀,注冊RpcEndpoint
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
//這里 new EndpointData使用到了endpoint,當new Inbox 時向消息隊列中放入OnStart樣例類標識
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
//獲取剛剛封裝的EndPointData
val data: EndpointData = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
/**
* receivers 這個消息隊列中放著應(yīng)該去哪個Endpoint 中獲取Message 處理
* 這里其實就是進入 Dispatcher 當前這個類中的 MessageLoop 方法馏锡。這個方法當new Dispatcher后會一直運行雷蹂。
* 將消息放入待處理的消息隊列中,消息首先找到對應(yīng)的Endpoint ,再會獲取當前Endpoint的Inbox 中message,使用process 方法處理
*/
receivers.offer(data) // for the OnStart message
}
endpointRef
}
上面的new EndpointData使用到了endpoint杯道,EndpointData其實就是對endpoint和endpointRef的封裝匪煌,同時內(nèi)部還構(gòu)建了Inbox
private class EndpointData(
val name: String,
val endpoint: RpcEndpoint,
val ref: NettyRpcEndpointRef) {
//將endpoint封裝到Inbox中
val inbox = new Inbox(ref, endpoint)
}
這里的Inbox是其實就是消息收件箱,一個本地端點對應(yīng)一個收件箱党巾,Dispatcher在每次向Inbox存入消息時萎庭,都將對應(yīng)EndpointData加入內(nèi)部待Receiver Queue中,另外Dispatcher創(chuàng)建時會啟動一個單獨線程進行輪詢Receiver Queue齿拂,進行收件箱消息消費驳规。
其實Inbox實例在構(gòu)建過程中也會有消息的存入。
private[netty] class Inbox(
val endpointRef: NettyRpcEndpointRef,
val endpoint: RpcEndpoint)
extends Logging {
....
@GuardedBy("this")
protected val messages = new java.util.LinkedList[InboxMessage]()
// OnStart should be the first message to process
//當注冊endpoint時都會調(diào)用這個異步方法署海,messags中放入一個OnStart樣例類消息對象
inbox.synchronized {
messages.add(OnStart)
}
....
可以看到吗购,構(gòu)建inbox時會向messages中加入一個OnStart消息,該消息會被inbox類中的process所消費砸狞,只不過觸發(fā)時機還在后面捻勉。
回到Dispatcher的registerRpcEndpoint函數(shù)
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
//獲取剛剛封裝的EndPointData
val data: EndpointData = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
/**
* receivers 這個消息隊列中放著應(yīng)該去哪個Endpoint 中獲取Message 處理
* 這里其實就是進入 Dispatcher 當前這個類中的 MessageLoop 方法。這個方法當new Dispatcher后會一直運行刀森。
* 將消息放入待處理的消息隊列中踱启,消息首先找到對應(yīng)的Endpoint ,再會獲取當前Endpoint的Inbox 中message,使用process 方法處理
*/
receivers.offer(data) // for the OnStart message
}
endpointRef
這里發(fā)現(xiàn)剛才構(gòu)建的EndpointData會被放入到endpoints中,endpoints和endpointRefs的類型都是ConcurrentHashMap研底,對這兩個容器進行相應(yīng)的操作之后埠偿,就會調(diào)用receivers.offer(data)。
receivers是Dispatcher中的一個消息隊列榜晦,這個消息隊列還有一個線程池來進行消息的消費冠蒋,整個模式構(gòu)成一個生產(chǎn)者消費者模式,因此這里將data消息加入到消息隊列中乾胶,會觸發(fā)線程消費浊服。
private val threadpool: ThreadPoolExecutor = {
val availableCores =
if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
math.max(2, availableCores))
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
for (i <- 0 until numThreads) {
pool.execute(new MessageLoop)
}
pool
}
/** Message loop used for dispatching messages. */
private class MessageLoop extends Runnable {
override def run(): Unit = {
try {
while (true) {
try {
//take 出來消息一直處理
val data: EndpointData = receivers.take()
if (data == PoisonPill) {
// Put PoisonPill back so that other MessageLoops can see it.
receivers.offer(PoisonPill)
return
}
//調(diào)用process 方法處理消息
data.inbox.process(Dispatcher.this)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case ie: InterruptedException => // exit
}
}
}
可以看到上面的MessageLoop中统屈,通過消息隊列receivers取出一條消息,該消息的類型為EndpointData牙躺,內(nèi)部都封裝了inbox實例,因此直接調(diào)用該實例的process函數(shù)去處理消息腕扶,我們之前在inbox實例構(gòu)建過程中孽拷,發(fā)現(xiàn)會向inbox內(nèi)部的消息容器中放入一條onStart消息,因此我們看一下process函數(shù)是如何處理該消息的半抱。
def process(dispatcher: Dispatcher): Unit = {
case OnStart =>
//調(diào)用Endpoint 的onStart函數(shù)
endpoint.onStart()
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}
}
process中對onStart的處理為脓恕,調(diào)用endpoint的onStart函數(shù),而endpoint我們還記得是啥嗎窿侈,這里是我們直接構(gòu)建的Master實例炼幔,因此我們回到Master中去看看onStart()函數(shù)的處理過程。
override def onStart(): Unit = {
.....
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}
onStart函數(shù)涉及到webUI的啟動史简,applicationMetricsSystem和masterMetricsSystem的啟動過程乃秀,如果設(shè)置了restServer還涉及到啟動過程。
同時Master的onStart函數(shù)的部分代碼如上所示圆兵,我么可以看到跺讯,這里涉及到如何在HA的環(huán)境下選主的過程。內(nèi)部會根據(jù)配置決定采用哪種方式殉农,是ZOOKEEPER還是文件系統(tǒng)的方式來進行刀脏。
生產(chǎn)環(huán)境下一般采用Zookeeper做HA,Zookeeper會自動化管理 Master的切換超凳;
采用Zookeeper做HA的時候愈污,Zookeeper會負責保存整個Spark集群運行時候的元數(shù)據(jù):workers、Drivers轮傍、Applications暂雹、Executors;
Zookeeper遇到當前Active級別的Master出現(xiàn)故障的時候會從StandbyMaster中選取一臺作為Active Master金麸,但是要注意擎析,被選舉后到成為真正的ActiveMaster之間需要從Zookeeper中獲取集群當前運行狀態(tài)的元數(shù)據(jù)信息并進行恢復;
在Master切換的過程中挥下,所有的已經(jīng)在運行的程序皆正常運行揍魂!因為Spark Application在運行前就已經(jīng)通過ClusterManager獲得了計算資源,所以在運行時Job本身的調(diào)度和處理和Master是沒有任何關(guān)系的棚瘟!
在Master的切換過程中唯一的影響是不能提交新的job:一方面不能提交新的應(yīng)用程序給集群现斋,因為只有ActiveMaster才能接受新的程序的提交請求;另外一方面偎蘸,已經(jīng)運行的程序中也不能夠因為Action操作觸發(fā)新的Job的提交請求庄蹋;