??Kyuubi服務(wù)與HiveServer2服務(wù)非常相似,在Kyuubi中很多類的設(shè)計(jì)和代碼邏輯都參照了HiveServer2(Spark SQL Thrift Server也是同樣的道理)乎折。
??HiveServer2服務(wù)啟動(dòng)的源碼解析參見(jiàn):
??Hive源碼剖析之HiveServer2服務(wù)啟動(dòng)過(guò)程
??Spark SQL Thrift Server服務(wù)啟動(dòng)的源碼解析參見(jiàn)以下兩處文章:
??SparkSQL Hive ThriftServer 源碼解析:Intro
??Spark SQL源碼走讀(一):HiveThriftServer2:Intro
(注:以下代碼模塊中,英文注釋皆為作者注釋蜓席,中文注釋是我自己加上的)
KuubiServer.scala
- KuubiServer類聲明及成員
/**
* Main entrance of Kyuubi Server
*/
private[kyuubi] class KyuubiServer private(name: String)
extends CompositeService(name) with Logging {
//BackendService服務(wù)
private[this] var _beService: BackendService = _
def beService: BackendService = _beService
//FrontendService服務(wù)
private[this] var _feService: FrontendService = _
def feService: FrontendService = _feService
***后續(xù)代碼省略***
}
??簡(jiǎn)單介紹一下KyuubiServer中兩個(gè)很重要的成員FrontendService和BackendService:FrontendService負(fù)責(zé)維護(hù)與客戶端的連接枣耀,與客戶端進(jìn)行交互祈搜,將客戶端的SQL請(qǐng)求轉(zhuǎn)發(fā)至FrontendService;BackendService負(fù)責(zé)執(zhí)行SQL并將執(zhí)行結(jié)果返回給FrontendService笼痛。FrontendService最后將結(jié)果返回至客戶端裙秋。
- main方法
def main(args: Array[String]): Unit = {
SparkUtils.initDaemon(logger)
//加載配置
val conf = new SparkConf(loadDefaults = true)
setupCommonConfig(conf)
try {
val server = new KyuubiServer()
//對(duì)各種服務(wù)進(jìn)行初始化
server.init(conf)
//啟動(dòng)各種服務(wù)
server.start()
info(server.getName + " started!")
if (HighAvailabilityUtils.isSupportDynamicServiceDiscovery(conf)) {
info(s"HA mode: start to add this ${server.getName} instance to Zookeeper...")
HighAvailabilityUtils.addServerInstanceToZooKeeper(server)
}
} catch {
case e: Exception =>
error("Error starting Kyuubi Server", e)
System.exit(-1)
}
- init方法
override def init(conf: SparkConf): Unit = synchronized {
this.conf = conf
_beService = new BackendService()
_feService = new FrontendService(_beService)
//將BackendService和FrontendService服務(wù)加入serviceList中
addService(_beService)
addService(_feService)
//調(diào)用父類CompositeService的init方法
super.init(conf)
SparkUtils.addShutdownHook {
() => this.stop()
}
}
CompositeService的init方法:
override def init(conf: SparkConf): Unit = {
//依次調(diào)用serviceList中各個(gè)服務(wù)的init方法
for (service <- serviceList) {
service.init(conf)
}
super.init(conf)
}
- start方法
override def start(): Unit = {
//遍歷serviceList中所有的服務(wù)并依次啟動(dòng)
serviceList.zipWithIndex.foreach { case (service, i) =>
try {
service.start()
} catch {
case e: Throwable =>
error("Error starting services " + getName, e)
stop(i)
throw new ServiceException("Failed to Start " + getName, e)
}
}
super.start()
}
??調(diào)用各個(gè)服務(wù)的init和start方法時(shí),最終都會(huì)調(diào)用AbstractService的init和start方法(這些服務(wù)類要么直接繼承AbstractService缨伊,要么繼承CompositeService摘刑。而CompositeService又繼承自AbstractService)。
AbstractService的init和start方法:
override def init(conf: SparkConf): Unit = {
ensureCurrentState(State.NOT_INITED)
this.conf = conf
changeState(State.INITED)
info("Service: [" + getName + "] is initialized.")
}
override def start(): Unit = {
startTime = System.currentTimeMillis
ensureCurrentState(State.INITED)
changeState(State.STARTED)
info("Service: [" + getName + "] is started.")
}
??對(duì)照Kyuubi服務(wù)啟動(dòng)日志刻坊,可以看到KyuubiServer依次啟動(dòng)的服務(wù)有:KyuubiServer枷恕、OperationManager、SessionManager谭胚、BackendService和FrontendService(至于為什么也啟動(dòng)了OperationManager和SessionManager服務(wù)徐块,在后續(xù)解析BackendService的源碼時(shí)會(huì)提及)未玻。