FrontendService (Thrift Service, Server Side)
FrontendService handles interaction with clients: it maintains the client connections and returns SQL execution results to the client.
FrontendService.scala:
The class declaration of FrontendService is shown below:
As the declaration shows, the class implements the Runnable interface, so let's look at how it overrides the run method.
- run() method
override def run(): Unit = {
  try {
    // Server thread pool
    val minThreads = conf.get(FRONTEND_MIN_WORKER_THREADS.key).toInt
    val maxThreads = conf.get(FRONTEND_MAX_WORKER_THREADS.key).toInt
    // Use a Java ThreadPoolExecutor as the worker pool
    val executorService = new ThreadPoolExecutor(
      minThreads,
      maxThreads,
      conf.getTimeAsSeconds(FRONTEND_WORKER_KEEPALIVE_TIME.key),
      TimeUnit.SECONDS,
      new SynchronousQueue[Runnable],
      new NamedThreadFactory(threadPoolName))
    // Thrift configs
    authFactory = new KyuubiAuthFactory(conf)
    val transportFactory = authFactory.getAuthTransFactory
    val processorFactory = authFactory.getAuthProcFactory(this)
    // TServerSocket provides a blocking-IO server socket. It extends TServerTransport,
    // so it belongs to the transport layer of the Thrift stack.
    val serverSocket: TServerSocket = getServerSocket(serverIPAddress, portNum)
    // Server args
    val maxMessageSize = conf.get(FRONTEND_MAX_MESSAGE_SIZE.key).toInt
    val requestTimeout = conf.getTimeAsSeconds(FRONTEND_LOGIN_TIMEOUT.key).toInt
    val beBackoffSlotLength = conf.getTimeAsMs(FRONTEND_LOGIN_BEBACKOFF_SLOT_LENGTH.key).toInt
    val args = new TThreadPoolServer.Args(serverSocket)
      .processorFactory(processorFactory)
      .transportFactory(transportFactory)
      // Client and server must use the same protocol; here it is TBinaryProtocol (binary encoding)
      .protocolFactory(new TBinaryProtocol.Factory)
      .inputProtocolFactory(
        new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
      .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
      .beBackoffSlotLength(beBackoffSlotLength)
      .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
      .executorService(executorService)
    // TCP Server
    // Build the TThreadPoolServer (thread-pool server model)
    server = Some(new TThreadPoolServer(args))
    server.foreach(_.setServerEventHandler(serverEventHandler))
    info(s"Starting $name on host ${serverIPAddress.getCanonicalHostName} at port $portNum with" +
      s" [$minThreads, $maxThreads] worker threads")
    // Start serving; serve() blocks this thread until the server is stopped
    server.foreach(_.serve())
  } catch {
    case t: Throwable =>
      error("Error starting " + name + " for KyuubiServer", t)
      System.exit(-1)
  }
}
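The pool above is built on a SynchronousQueue, which holds no tasks at all: execute() either hands the task to an idle worker, starts a new worker (up to maxThreads), or throws RejectedExecutionException. To my understanding of libthrift 0.9.x, TThreadPoolServer uses the requestTimeout and beBackoffSlotLength passed in Args exactly for this case: it retries the rejected connection with truncated binary exponential backoff until the timeout is used up, then closes the connection. The JDK-only sketch below reproduces that idea; BackoffSubmitter and submitWithBackoff are hypothetical names, not Thrift's API.

```java
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: a FrontendService-shaped pool plus backoff-based submission. */
class BackoffSubmitter {
    private static final Random RANDOM = new Random();

    /** Same shape as the pool in run(): no queue, threads created on demand up to maxThreads. */
    static ThreadPoolExecutor newPool(int minThreads, int maxThreads, long keepAliveSeconds) {
        return new ThreadPoolExecutor(minThreads, maxThreads, keepAliveSeconds, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    /**
     * Tries to hand the task to the pool. On rejection, sleeps a random number of slots in
     * [0, 2^min(retries, 20)) and retries, until timeoutMs is used up.
     * Returns true if the task was accepted, false if the time budget ran out.
     */
    static boolean submitWithBackoff(ExecutorService pool, Runnable task, long timeoutMs, long slotMs)
            throws InterruptedException {
        if (slotMs <= 0) {
            throw new IllegalArgumentException("slotMs must be positive");
        }
        long remainingMs = timeoutMs;
        int retries = 0;
        while (true) {
            try {
                pool.execute(task);
                return true;
            } catch (RejectedExecutionException e) {
                if (remainingMs <= 0) {
                    return false;   // a server would close the client connection at this point
                }
                retries++;
                long slots = (long) (RANDOM.nextDouble() * (1L << Math.min(retries, 20)));
                long sleepMs = Math.min(slots * slotMs, remainingMs);
                TimeUnit.MILLISECONDS.sleep(sleepMs);
                remainingMs -= sleepMs;
            }
        }
    }
}
```

With newPool(0, 1, 60), a first long-running task takes the only worker; a second submitWithBackoff call with a 50 ms budget keeps retrying and then returns false, which is the point where, under the assumption above, TThreadPoolServer would drop the connection. Once the first task finishes, the idle worker accepts new tasks again.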
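Here, TThreadPoolServer belongs to the communication model of Thrift services; for background on Thrift services, see the blog post "Thrift communication model".
The figure below shows the protocol stack of a Thrift service; TThreadPoolServer is one of the TServer models (marked with the red box in the figure). A TServer's main job is to accept client requests and dispatch them to a TProcessor for handling, and Thrift offers several TServer models for different workloads. TThreadPoolServer is a multithreaded server built on blocking IO that manages its handler threads with a thread pool; each accepted connection occupies one worker thread until the client disconnects.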
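To make the blocking, one-worker-per-connection model concrete, here is a JDK-only sketch of the same shape: one thread blocks in ServerSocket.accept(), and each accepted socket is handed to a worker from a SynchronousQueue-backed pool, which then serves that client until it disconnects. This is not Thrift code: BlockingPoolServer and its line-based echo "processor" are hypothetical stand-ins for TThreadPoolServer and a TProcessor, and a rejected connection is simply closed (to my understanding the real server first retries with backoff).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for TThreadPoolServer: blocking accept + one worker per connection. */
class BlockingPoolServer implements Runnable {
    private final ServerSocket serverSocket;
    private final ExecutorService workers;
    private volatile boolean stopped = false;

    BlockingPoolServer(int port, int minThreads, int maxThreads) throws IOException {
        this.serverSocket = new ServerSocket(port, 50, InetAddress.getLoopbackAddress());
        this.workers = new ThreadPoolExecutor(minThreads, maxThreads, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    /** Plays the role of serve(): blocks in accept() until stop() closes the socket. */
    @Override
    public void run() {
        while (!stopped) {
            try {
                Socket client = serverSocket.accept();   // blocking IO
                try {
                    workers.execute(() -> handle(client));
                } catch (RejectedExecutionException e) {
                    client.close();   // pool saturated: drop the connection
                }
            } catch (IOException e) {
                if (!stopped) {
                    System.err.println("accept failed: " + e);
                }
            }
        }
    }

    /** Plays the role of a processor: answers each request line until the client disconnects. */
    private void handle(Socket client) {
        try (Socket s = client;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            String line;
            while ((line = in.readLine()) != null) {
                out.println("echo: " + line);
            }
        } catch (IOException ignored) {
            // client went away; nothing else to clean up in this sketch
        }
    }

    void stop() throws IOException {
        stopped = true;
        serverSocket.close();   // makes the blocked accept() throw, ending run()
        workers.shutdown();
    }
}
```

Because run() blocks in accept(), it has to be started on its own thread (new Thread(server).start()), just as serve() blocks the thread that runs FrontendService. A side effect of this model is that maxThreads also caps the number of clients that can be connected at the same time.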
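For TThreadPoolServer sample code, see:
"Thrift multithreaded blocking-IO server model: TThreadPoolServer"
Now compare this with the run method of ThriftBinaryCLIService (a subclass of ThriftCLIService) in the org.apache.hive.service.cli.thrift package of the Hive source code: the logic is almost identical.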
@Override
public void run() {
  try {
    // Server thread pool
    String threadPoolName = "HiveServer2-Handler-Pool";
    ExecutorService executorService = new ThreadPoolExecutorWithOomHook(minWorkerThreads,
        maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), new ThreadFactoryWithGarbageCleanup(threadPoolName),
        oomHook);
    // Thrift configs
    hiveAuthFactory = new HiveAuthFactory(hiveConf);
    TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
    TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
    // *** some code omitted ***
    // Server args
    int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
    int requestTimeout = (int) hiveConf.getTimeVar(
        HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
    int beBackoffSlotLength = (int) hiveConf.getTimeVar(
        HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
    TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
        .processorFactory(processorFactory).transportFactory(transportFactory)
        .protocolFactory(new TBinaryProtocol.Factory())
        .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
        .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
        .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
        .executorService(executorService);
    // TCP Server
    server = new TThreadPoolServer(sargs);
    server.setServerEventHandler(new TServerEventHandler() {
      // *** some code omitted ***
    });
    String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
        + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
    LOG.info(msg);
    server.serve();
  } catch (Throwable t) {
    LOG.error(
        "Error starting HiveServer2: could not start "
            + ThriftBinaryCLIService.class.getSimpleName(), t);
    System.exit(-1);
  }
}
Practically identical, right? Haha~~
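Both run() methods set the input protocol to new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize). In the libthrift versions I have looked at, those four arguments are strictRead, strictWrite, stringLengthLimit and containerLengthLimit. A strict message header is an i32 holding the version word 0x80010000 OR'ed with the message type, then an i32-length-prefixed method name, then an i32 sequence id; strict reading rejects headers without the version word, and the string limit rejects an oversized length prefix before any buffer is allocated. The sketch below decodes such a header with the JDK only; StrictBinaryReader is a hypothetical class, and only the string limit is shown (container limits are left out).

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical strict reader for a Thrift-style binary message header. */
class StrictBinaryReader {
    static final int VERSION_MASK = 0xffff0000;
    static final int VERSION_1 = 0x80010000;

    /** Decoded header: method name, message type and sequence id. */
    static final class Header {
        final String name;
        final byte type;
        final int seqId;

        Header(String name, byte type, int seqId) {
            this.name = name;
            this.type = type;
            this.seqId = seqId;
        }
    }

    private final DataInputStream in;       // big-endian reads, like the binary protocol
    private final long stringLengthLimit;   // <= 0 means "no limit" in this sketch

    StrictBinaryReader(InputStream in, long stringLengthLimit) {
        this.in = new DataInputStream(in);
        this.stringLengthLimit = stringLengthLimit;
    }

    Header readMessageBegin() throws IOException {
        int word = in.readInt();
        if (word >= 0) {
            // An old-style header starts with the name length; strict reading refuses it.
            throw new IOException("non-strict header: missing version");
        }
        if ((word & VERSION_MASK) != VERSION_1) {
            throw new IOException("bad version: " + Integer.toHexString(word));
        }
        byte type = (byte) (word & 0xff);
        String name = readString();
        int seqId = in.readInt();
        return new Header(name, type, seqId);
    }

    String readString() throws IOException {
        int length = in.readInt();
        if (length < 0) {
            throw new IOException("negative string length: " + length);
        }
        // Check the declared length before allocating, so a bogus prefix
        // cannot make the server allocate a huge buffer.
        if (stringLengthLimit > 0 && length > stringLengthLimit) {
            throw new IOException("string length " + length + " exceeds limit " + stringLengthLimit);
        }
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

A length prefix of 1,000,000 against a limit of 100 fails before new byte[length] runs, which is what protects a server from a corrupted or malicious length field; a header whose first i32 is non-negative fails immediately under strict reading.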
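Thrift Service: Hive JDBC Client
A Thrift service follows the client/server (C/S) model, and FrontendService is the part that interacts with clients. Walking through the FrontendService code above showed the general logic of the Thrift server side; next, let's briefly look at the logic of the Thrift client side.
Whether the database is MySQL or Hive, connecting through JDBC (including connections from the Beeline client) works in roughly the same way: at connect time, the driver code in the jar is loaded through reflection. If you are interested, see these two posts (by two different authors): "JAVA JDBC (MySQL) driver source code analysis (1)" and "JDBC source code analysis (2): obtaining a connection".
The driver class for Hive JDBC connections (shipped in the hive-jdbc jar) is org.apache.hive.jdbc.HiveDriver; org.apache.hadoop.hive.jdbc.HiveDriver is the legacy HiveServer1 driver. When a JDBC connection is opened, HiveDriver's connect method is called, and connect in turn instantiates a HiveConnection object.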
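The hand-off just described (DriverManager calls the driver's connect(), and the driver builds the connection object) can be sketched with the JDK's own java.sql API. ToyDriver and the jdbc:toy:// URL prefix are hypothetical; the Connection it returns is a dynamic proxy standing in for HiveConnection and only answers isClosed, close and toString.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;

/** Hypothetical driver showing the DriverManager -> Driver.connect -> Connection chain. */
class ToyDriver implements Driver {
    static final String PREFIX = "jdbc:toy://";
    static volatile String lastUrl;   // records what connect() received

    static {
        // Loading the class by name (Class.forName) runs this block and registers the driver.
        try {
            DriverManager.registerDriver(new ToyDriver());
        } catch (SQLException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    @Override
    public Connection connect(String url, Properties info) throws SQLException {
        if (!acceptsURL(url)) {
            return null;   // JDBC contract: not my URL, DriverManager tries the next driver
        }
        lastUrl = url;
        // Stand-in for "new HiveConnection(url, info)".
        return (Connection) Proxy.newProxyInstance(
                Connection.class.getClassLoader(), new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "isClosed":
                            return false;
                        case "close":
                            return null;
                        case "toString":
                            return "ToyConnection(" + url + ")";
                        default:
                            throw new SQLFeatureNotSupportedException(method.getName());
                    }
                });
    }

    @Override public boolean acceptsURL(String url) { return url != null && url.startsWith(PREFIX); }
    @Override public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) { return new DriverPropertyInfo[0]; }
    @Override public int getMajorVersion() { return 0; }
    @Override public int getMinorVersion() { return 1; }
    @Override public boolean jdbcCompliant() { return false; }
    @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new SQLFeatureNotSupportedException();
    }
}
```

Calling Class.forName(ToyDriver.class.getName()) and then DriverManager.getConnection("jdbc:toy://host:10009/default") goes through ToyDriver.connect, mirroring the familiar Class.forName("org.apache.hive.jdbc.HiveDriver") idiom; a URL no registered driver accepts ends in an SQLException. JDBC 4 drivers can also be discovered automatically through a META-INF/services/java.sql.Driver entry.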
HiveConnection.java:
- HiveConnection constructor
public HiveConnection(String uri, Properties info) throws SQLException {
  // *** some code omitted ***
  for (int numRetries = 0;;) {
    try {
      // open the client transport
      // I think of "transport" here as a socket, though a transport is not limited to sockets
      openTransport();
      // set up the client
      client = new TCLIService.Client(new TBinaryProtocol(transport));
      // open client session
      openSession();
      // The JIRA for this line was actually committed for me by a Hive committer, hahaha~
      executeInitSql();
      break;
    }
    // *** some code omitted ***
  }
}
The comments in the Hive source already make this clear, so let's focus on this line:
client = new TCLIService.Client(new TBinaryProtocol(transport));
This line wraps one object inside another and finally produces the Client object. In terms of the Thrift communication model, the wrapping goes from the bottom layer upward: TTransport => TProtocol => Client.
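The TTransport => TProtocol => Client layering can be illustrated with three tiny hypothetical classes: a transport that only moves bytes, a protocol writer that encodes typed values in a binary format on top of it, and a client stub whose method hides both. The encoding follows my understanding of TBinaryProtocol's strict write (version/type word, length-prefixed method name, sequence id, then argument fields as type byte + i16 field id + value, ended by a stop byte). Transport, MemoryTransport, BinaryProtocolWriter and EchoClient are invented for illustration, and a real generated client such as TCLIService.Client also reads the reply.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Transport layer (hypothetical): only moves bytes; a socket in HiveConnection. */
interface Transport {
    OutputStream out();
}

/** In-memory transport so the example runs without a network. */
class MemoryTransport implements Transport {
    final ByteArrayOutputStream written = new ByteArrayOutputStream();

    @Override
    public OutputStream out() {
        return written;
    }
}

/** Protocol layer (hypothetical): encodes typed values in a Thrift-style binary format. */
class BinaryProtocolWriter {
    static final int VERSION_1 = 0x80010000;
    static final byte CALL = 1;
    static final byte T_STOP = 0;
    static final byte T_STRING = 11;
    private final DataOutputStream out;   // DataOutputStream writes big-endian values

    BinaryProtocolWriter(Transport transport) {
        this.out = new DataOutputStream(transport.out());
    }

    void writeMessageBegin(String name, byte type, int seqId) throws IOException {
        out.writeInt(VERSION_1 | type);   // strict header: version word OR'ed with the message type
        writeString(name);
        out.writeInt(seqId);
    }

    void writeFieldBegin(byte type, short id) throws IOException {
        out.writeByte(type);
        out.writeShort(id);
    }

    void writeFieldStop() throws IOException {
        out.writeByte(T_STOP);
    }

    void writeString(String s) throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(bytes.length);   // i32 length prefix, then the raw UTF-8 bytes
        out.write(bytes);
    }

    void flush() throws IOException {
        out.flush();
    }
}

/** Client layer (hypothetical): a typed call; callers never touch bytes. */
class EchoClient {
    private final BinaryProtocolWriter protocol;
    private int seqId = 0;

    EchoClient(BinaryProtocolWriter protocol) {
        this.protocol = protocol;
    }

    /** Sends echo(1: string msg) as a CALL whose argument struct holds one string field. */
    void sendEcho(String msg) throws IOException {
        protocol.writeMessageBegin("echo", BinaryProtocolWriter.CALL, ++seqId);
        protocol.writeFieldBegin(BinaryProtocolWriter.T_STRING, (short) 1);
        protocol.writeString(msg);
        protocol.writeFieldStop();   // end of the argument struct
        protocol.flush();
    }
}
```

Replacing MemoryTransport with a socket-backed Transport would leave BinaryProtocolWriter and EchoClient untouched, which is the point of nesting new TBinaryProtocol(transport) inside new TCLIService.Client(...).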
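For an analysis of openSession(), see my post "Kyuubi service source code analysis: OpenSession".
openTransport() is worth analyzing here because it involves Kerberos authentication, and walking through it helps clarify the Kerberos authentication flow. For details, see "Analysis of HiveConnection's openTransport() method" in my Kerberos collection.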