Kyuubi服務(wù)源碼解析:FrontendService

FrontendService(Thrift服務(wù)Server端)

??FrontendService負(fù)責(zé)與客戶端進(jìn)行交互:維護(hù)與客戶端的連接簿盅,并將SQL執(zhí)行結(jié)果返回至客戶端。

FrontendService.scala:

FrontendService的類聲明如下所示:

??從FrontendService類聲明中可以看出揽祥,該類實(shí)現(xiàn)了Runnable接口讽膏,看一下它如何重寫的run方法。

  • run()方法
  override def run(): Unit = {
    try {
      // Server thread pool
      val minThreads = conf.get(FRONTEND_MIN_WORKER_THREADS.key).toInt
      val maxThreads = conf.get(FRONTEND_MAX_WORKER_THREADS.key).toInt
      //使用Java線程池ThreadPoolExecutor
      val executorService = new ThreadPoolExecutor(
        minThreads,
        maxThreads,
        conf.getTimeAsSeconds(FRONTEND_WORKER_KEEPALIVE_TIME.key),
        TimeUnit.SECONDS,
        new SynchronousQueue[Runnable],
        new NamedThreadFactory(threadPoolName))

      // Thrift configs
      authFactory = new KyuubiAuthFactory(conf)
      val transportFactory = authFactory.getAuthTransFactory
      val processorFactory = authFactory.getAuthProcFactory(this)
      //使用TServerSocket創(chuàng)建阻塞式IO盔然,TServerSocket繼承自TTransport
      //也就是說(shuō)桅打,TServerSocket屬于TTransport這一層
      val serverSocket: TServerSocket = getServerSocket(serverIPAddress, portNum)

      // Server args
      val maxMessageSize = conf.get(FRONTEND_MAX_MESSAGE_SIZE.key).toInt
      val requestTimeout = conf.getTimeAsSeconds(FRONTEND_LOGIN_TIMEOUT.key).toInt
      val beBackoffSlotLength = conf.getTimeAsMs(FRONTEND_LOGIN_BEBACKOFF_SLOT_LENGTH.key).toInt
      val args = new TThreadPoolServer.Args(serverSocket)
        .processorFactory(processorFactory)
        .transportFactory(transportFactory)
        //客戶端協(xié)議要一致是嗜,這里使用的協(xié)議是TBinaryProtocol愈案,它使用二進(jìn)制格式
        .protocolFactory(new TBinaryProtocol.Factory)
        .inputProtocolFactory(
          new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
        .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
        .beBackoffSlotLength(beBackoffSlotLength)
        .beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
        .executorService(executorService)
      // TCP Server
      //建立TThreadPoolServer線程池服務(wù)模型
      server = Some(new TThreadPoolServer(args))
      server.foreach(_.setServerEventHandler(serverEventHandler))
      info(s"Starting $name on host ${serverIPAddress.getCanonicalHostName} at port $portNum with" +
        s" [$minThreads, $maxThreads] worker threads")
      //啟動(dòng)服務(wù)
      server.foreach(_.serve())
    } catch {
      case t: Throwable =>
        error("Error starting " + name +  " for KyuubiServer", t)
        System.exit(-1)
    }
  }

??其中,TThreadPoolServer涉及到Thrift服務(wù)的通信模型鹅搪,理解Thrift服務(wù)可以參照這篇博文:Thrift 通信模型
??下面這張圖是Thrift服務(wù)的通信協(xié)議棧站绪,TThreadPoolServer屬于TServer的一種模型(圖中紅色方框標(biāo)注處)。TServer主要作用是接收Client的請(qǐng)求丽柿,并轉(zhuǎn)到某個(gè)TProcessor上進(jìn)行請(qǐng)求處理恢准。針對(duì)不同的訪問(wèn)規(guī)模,Thrift提供了不同的TServer模型。TThreadPoolServer使用阻塞IO的多線程服務(wù)器甫题,使用線程池管理處理線程馁筐。
??TThreadPoolServer的示例代碼參見(jiàn):
??Thrift 多線程阻塞式IO服務(wù)模型-TThreadPoolServer

Thrift通信協(xié)議棧

??我們與Hive源碼中org.apache.hive.service.cli.thrift包下ThriftBinaryCLIService(該類繼承自ThriftCLIService)的run方法進(jìn)行類比,可以看出代碼邏輯幾乎完全相似坠非。

@Override
  public void run() {
      // Server thread pool
      String threadPoolName = "HiveServer2-Handler-Pool";
      ExecutorService executorService = new ThreadPoolExecutorWithOomHook(minWorkerThreads,
          maxWorkerThreads, workerKeepAliveTime, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), new ThreadFactoryWithGarbageCleanup(threadPoolName),
          oomHook);
      // Thrift configs
      hiveAuthFactory = new HiveAuthFactory(hiveConf);
      TTransportFactory transportFactory = hiveAuthFactory.getAuthTransFactory();
      TProcessorFactory processorFactory = hiveAuthFactory.getAuthProcFactory(this);
      ***省略部分代碼***
      // Server args
      int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
      int requestTimeout = (int) hiveConf.getTimeVar(
          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_TIMEOUT, TimeUnit.SECONDS);
      int beBackoffSlotLength = (int) hiveConf.getTimeVar(
          HiveConf.ConfVars.HIVE_SERVER2_THRIFT_LOGIN_BEBACKOFF_SLOT_LENGTH, TimeUnit.MILLISECONDS);
      TThreadPoolServer.Args sargs = new TThreadPoolServer.Args(serverSocket)
          .processorFactory(processorFactory).transportFactory(transportFactory)
          .protocolFactory(new TBinaryProtocol.Factory())
          .inputProtocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
          .requestTimeout(requestTimeout).requestTimeoutUnit(TimeUnit.SECONDS)
          .beBackoffSlotLength(beBackoffSlotLength).beBackoffSlotLengthUnit(TimeUnit.MILLISECONDS)
          .executorService(executorService);

      // TCP Server
      server = new TThreadPoolServer(sargs);
      server.setServerEventHandler(new TServerEventHandler() {
          ***省略部分代碼***
      }
      String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port "
          + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
      LOG.info(msg);
      server.serve();
    } catch (Throwable t) {
      LOG.error(
          "Error starting HiveServer2: could not start "
              + ThriftBinaryCLIService.class.getSimpleName(), t);
      System.exit(-1);
    }
  }

是不是幾乎一毛一樣敏沉?哈哈~~


Thrift服務(wù)Hive JDBC客戶端

??既然Thrift服務(wù)是基于C/S模式,而FrontendService又負(fù)責(zé)與客戶端交互。上文通過(guò)解析FrontendService的代碼盟迟,可以看出Thrift Server端的大致邏輯秋泳。下面簡(jiǎn)單分析一下Thrift Client端的代碼邏輯。
??無(wú)論是MySQL還是Hive攒菠,通過(guò)JDBC連接時(shí)(包括Beeline客戶端連接)的邏輯大致相同迫皱,在連接時(shí)都會(huì)基于反射機(jī)制去加載jar包中的代碼。感興趣的同學(xué)可以參考以下博文(這兩篇分別是兩個(gè)作者寫的):JAVA JDBC(MySQL)驅(qū)動(dòng)源碼分析(一)JDBC源碼解析(二):獲取connection辖众。
??Hive JDBC連接的jar包叫org.apache.hadoop.hive.jdbc.HiveDriver卓起,當(dāng)JDBC連接時(shí)會(huì)調(diào)用HiveDriver的connect方法,connect方法中又會(huì)實(shí)例化一個(gè)HiveConnection的對(duì)象凹炸。

HiveConnection.java:

  • HiveConnection構(gòu)造方法
 public HiveConnection(String uri, Properties info) throws SQLException {
 ***省略部分代碼***
      for (int numRetries = 0;;) {
        try {
          // open the client transport
          //這里的transport我理解為socket既绩,當(dāng)然transport不僅僅是socket
          openTransport();
          // set up the client
          client = new TCLIService.Client(new TBinaryProtocol(transport));
          // open client session
          openSession();
          //這行代碼相關(guān)的JIRA還是Hive Commiter幫我提交進(jìn)去的,哈哈哈~
          executeInitSql();

          break;
        } 
 ***省略部分代碼***
 }

??Hive源碼中的注釋已經(jīng)寫得很清楚还惠,我們來(lái)分析這一行代碼:
??client = new TCLIService.Client(new TBinaryProtocol(transport));
??這行代碼做了一層層的封裝饲握,最后創(chuàng)建了Client對(duì)象。結(jié)合Thrift服務(wù)的通信模型蚕键,也就是從底層往上一層層地進(jìn)行封裝:TTransport => TProtocol => Client救欧。
??openSession()的解析,請(qǐng)看我寫的這篇博文:Kyuubi服務(wù)源碼解析:OpenSession解析
??這里有必要分析一下openTransport()的過(guò)程锣光,因?yàn)樯婕暗終erberos認(rèn)證笆怠,有助于梳理Kerberos認(rèn)證的流程。詳細(xì)內(nèi)容見(jiàn)我寫的Kerberos文集中HiveConnection之openTransport()方法解析誊爹。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末蹬刷,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子频丘,更是在濱河造成了極大的恐慌办成,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件搂漠,死亡現(xiàn)場(chǎng)離奇詭異迂卢,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)桐汤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門而克,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人怔毛,你說(shuō)我怎么就攤上這事员萍。” “怎么了拣度?”我有些...
    開(kāi)封第一講書人閱讀 165,747評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵碎绎,是天一觀的道長(zhǎng)蜂莉。 經(jīng)常有香客問(wèn)我,道長(zhǎng)混卵,這世上最難降的妖魔是什么映穗? 我笑而不...
    開(kāi)封第一講書人閱讀 58,939評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮幕随,結(jié)果婚禮上蚁滋,老公的妹妹穿的比我還像新娘。我一直安慰自己赘淮,他們只是感情好辕录,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,955評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著梢卸,像睡著了一般走诞。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蛤高,一...
    開(kāi)封第一講書人閱讀 51,737評(píng)論 1 305
  • 那天蚣旱,我揣著相機(jī)與錄音,去河邊找鬼戴陡。 笑死塞绿,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的恤批。 我是一名探鬼主播异吻,決...
    沈念sama閱讀 40,448評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼喜庞!你這毒婦竟也來(lái)了诀浪?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,352評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤延都,失蹤者是張志新(化名)和其女友劉穎雷猪,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體窄潭,經(jīng)...
    沈念sama閱讀 45,834評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡春宣,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,992評(píng)論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了嫉你。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,133評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡躏惋,死狀恐怖幽污,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情簿姨,我是刑警寧澤距误,帶...
    沈念sama閱讀 35,815評(píng)論 5 346
  • 正文 年R本政府宣布簸搞,位于F島的核電站,受9級(jí)特大地震影響准潭,放射性物質(zhì)發(fā)生泄漏趁俊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,477評(píng)論 3 331
  • 文/蒙蒙 一刑然、第九天 我趴在偏房一處隱蔽的房頂上張望寺擂。 院中可真熱鬧,春花似錦泼掠、人聲如沸怔软。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 32,022評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)挡逼。三九已至,卻和暖如春腻豌,著一層夾襖步出監(jiān)牢的瞬間家坎,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,147評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工吝梅, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留乘盖,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,398評(píng)論 3 373
  • 正文 我出身青樓憔涉,卻偏偏與公主長(zhǎng)得像订框,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子兜叨,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,077評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容