Thrift RPC實戰(zhàn)(二) Thrift 網(wǎng)絡(luò)服務(wù)模型

限于篇幅關(guān)系魁蒜,在觀察源碼的時候,只列舉了部分源代碼

TServer類層次體系

Paste_Image.png

TSimpleServer/TThreadPoolServer是阻塞服務(wù)模型
TNonblockingServer/THsHaServer/TThreadedSelectotServer是非阻塞服務(wù)模型(NIO)

1 TServer抽象類的定義

內(nèi)部靜態(tài)類Args的定義, 用于TServer類用于串聯(lián)軟件棧(傳輸層, 協(xié)議層, 處理層)


public abstract class TServer {

  public static class Args extends AbstractServerArgs<Args> {

    public Args(TServerTransport transport) {

      super(transport);

    }

  }

 

  public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {

    public AbstractServerArgs(TServerTransport transport);

    public T processorFactory(TProcessorFactory factory);

    public T processor(TProcessor processor);

    public T transportFactory(TTransportFactory factory);

    public T protocolFactory(TProtocolFactory factory);

  }

}

TServer類定義的抽象類


public abstract class TServer {

  public abstract void serve();

  public void stop();

 

  public boolean isServing();

  public void setServerEventHandler(TServerEventHandler eventHandler);

}

評注:

抽象函數(shù)serve由具體的TServer實例來實現(xiàn), 而并非所有的服務(wù)都需要優(yōu)雅的退出, 因此stop沒有被定義為抽象

2 TSimpleServer

TSimpleServer的工作模式采用最簡單的阻塞IO蜗顽,實現(xiàn)方法簡潔明了,便于理解恢筝,但是一次只能接收和處理一個socket連接诞外,效率比較低,主要用于演示Thrift的工作過程堪藐,在實際開發(fā)過程中很少用到它莉兰。

工作方式如圖:


Paste_Image.png

抽象的代碼可簡單描述如下:


// *) server socket進(jìn)行監(jiān)聽

serverSocket.listen();

while ( isServing() ) {

  // *) 接受socket鏈接

  client = serverSocket.accept();

  // *) 封裝處理器

  processor = factory.getProcess(client);

  while ( true ) {

    // *) 阻塞處理rpc的輸入/輸出

    if ( !processor.process(input, output) ) {

      break;   

    }  

  }

}

3 ThreadPoolServer

TThreadPoolServer模式采用阻塞socket方式工作,,主線程負(fù)責(zé)阻塞式監(jiān)聽“監(jiān)聽socket”中是否有新socket到來礁竞,業(yè)務(wù)處理交由一個線程池來處

工作模式圖:


Paste_Image.png

ThreadPoolServer解決了TSimple不支持并發(fā)和多連接的問題, 引入了線程池. 實現(xiàn)的模型是One Thread Per Connection
線程池代碼片段:


private static ExecutorService createDefaultExecutorService(Args args) {
  SynchronousQueue<Runnable> executorQueue =
    new SynchronousQueue<Runnable>();
  return new ThreadPoolExecutor(args.minWorkerThreads,
                                args.maxWorkerThreads,
                                args.stopTimeoutVal,
                                TimeUnit.SECONDS,
                                executorQueue);
}

評注:
  采用同步隊列(SynchronousQueue), 線程池采用能線程數(shù)可伸縮的模式.
主線程循環(huán)簡單描述代碼:


setServing(true);

while (!stopped_) {

  try {

    TTransport client = serverTransport_.accept();

    WorkerProcess wp = new WorkerProcess(client);

    executorService_.execute(wp);

  } catch (TTransportException ttx) {

  }

}

TThreadPoolServer模式優(yōu)點:
線程池模式中糖荒,拆分了監(jiān)聽線程(accept)和處理客戶端連接的工作線程(worker),數(shù)據(jù)讀取和業(yè)務(wù)處理都交由線程池完成,主線程只負(fù)責(zé)監(jiān)聽新連接模捂,因此在并發(fā)量較大時新連接也能夠被及時接受捶朵。線程池模式比較適合服務(wù)器端能預(yù)知最多有多少個客戶端并發(fā)的情況,這時每個請求都能被業(yè)務(wù)線程池及時處理狂男,性能也非常高综看。
TThreadPoolServer模式缺點:
線程池模式的處理能力受限于線程池的工作能力,當(dāng)并發(fā)請求數(shù)大于線程池中的線程數(shù)時岖食,新請求也只能排隊等待

4 TNonblockingServer

TNonblockingServer該模式也是單線程工作红碑,但是采用NIO的模式, 借助Channel/Selector機制, 采用IO事件模型來處理.

所有的socket都被注冊到selector中,在一個線程中通過seletor循環(huán)監(jiān)控所有的socket,每次selector結(jié)束時析珊,處理所有的處于就緒狀態(tài)的socket羡鸥,對于有數(shù)據(jù)到來的socket進(jìn)行數(shù)據(jù)讀取操作,對于有數(shù)據(jù)發(fā)送的socket則進(jìn)行數(shù)據(jù)發(fā)送忠寻,對于監(jiān)聽socket則產(chǎn)生一個新業(yè)務(wù)socket并將其注冊到selector中惧浴。

工作原理圖:

Paste_Image.png

nio部分關(guān)鍵代碼如下:


private void select() {
  try {
    // wait for io events.
    selector.select();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }

      // if the key is marked Accept, then it has to be the server
      // transport.
      if (key.isAcceptable()) {
        handleAccept();
      } else if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

TNonblockingServer模式優(yōu)點:
相比于TSimpleServer效率提升主要體現(xiàn)在IO多路復(fù)用上,TNonblockingServer采用非阻塞IO奕剃,對accept/read/write等IO事件進(jìn)行監(jiān)控和處理衷旅,同時監(jiān)控多個socket的狀態(tài)變化;
TNonblockingServer模式缺點:
TNonblockingServer模式在業(yè)務(wù)處理上還是采用單線程順序來完成纵朋,在業(yè)務(wù)處理比較復(fù)雜柿顶、耗時的時候,例如某些接口函數(shù)需要讀取數(shù)據(jù)庫執(zhí)行時間較長操软,會導(dǎo)致整個服務(wù)被阻塞住九串,此時該模式效率也不高,因為多個調(diào)用請求任務(wù)依然是順序一個接一個執(zhí)行

5 THsHaServer

鑒于TNonblockingServer的缺點, THsHaServer繼承TNonblockingServer寺鸥,引入了線程池去處理, 其模型把讀寫任務(wù)放到線程池去處理.THsHaServer是Half-sync/Half-async的處理模式, Half-aysnc是在處理IO事件上(accept/read/write io), Half-sync用于handler對rpc的同步處理上.

工作模式圖:

Paste_Image.png

/**
 * Helper to create an invoker pool
 */
protected static ExecutorService createInvokerPool(Args options) {
  int minWorkerThreads = options.minWorkerThreads;
  int maxWorkerThreads = options.maxWorkerThreads;
  int stopTimeoutVal = options.stopTimeoutVal;
  TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

  LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
  ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
    maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);

  return invoker;
}

THsHaServer的優(yōu)點:
與TNonblockingServer模式相比猪钮,THsHaServer在完成數(shù)據(jù)讀取之后,將業(yè)務(wù)處理過程交由一個線程池來完成胆建,主線程直接返回進(jìn)行下一次循環(huán)操作烤低,效率大大提升;
THsHaServer的缺點:
主線程需要完成對所有socket的監(jiān)聽以及數(shù)據(jù)讀寫的工作笆载,當(dāng)并發(fā)請求數(shù)較大時扑馁,且發(fā)送數(shù)據(jù)量較多時,監(jiān)聽socket上新連接請求不能被及時接受凉驻。

6. TThreadedSelectorServer

TThreadedSelectorServer是對以上NonblockingServer的擴充, 其分離了Accept和Read/Write的Selector線程, 同時引入Worker工作線程池. 它也是種Half-sync/Half-async的服務(wù)模型

TThreadedSelectorServer模式是目前Thrift提供的最高級的模式腻要,它內(nèi)部有如果幾個部分構(gòu)成:
(1) 一個AcceptThread線程對象,專門用于處理監(jiān)聽socket上的新連接涝登;
(2) 若干個SelectorThread對象專門用于處理業(yè)務(wù)socket的網(wǎng)絡(luò)I/O操作雄家,所有網(wǎng)絡(luò)數(shù)據(jù)的讀寫均是有這些線程來完成;
(3) 一個負(fù)載均衡器SelectorThreadLoadBalancer對象胀滚,主要用于AcceptThread線程接收到一個新socket連接請求時趟济,決定將這個新連接請求分配給哪個SelectorThread線程。
(4) 一個ExecutorService類型的工作線程池咽笼,在SelectorThread線程中顷编,監(jiān)聽到有業(yè)務(wù)socket中有調(diào)用請求過來,則將請求讀取之后剑刑,交個ExecutorService線程池中的線程完成此次調(diào)用的具體執(zhí)行媳纬;主要用于處理每個rpc請求的handler回調(diào)處理(這部分是同步的).
工作模式圖:

Paste_Image.png

TThreadedSelectorServer模式中有一個專門的線程AcceptThread用于處理新連接請求,因此能夠及時響應(yīng)大量并發(fā)連接請求;另外它將網(wǎng)絡(luò)I/O操作分散到多個SelectorThread線程中來完成钮惠,因此能夠快速對網(wǎng)絡(luò)I/O進(jìn)行讀寫操作杨伙,能夠很好地應(yīng)對網(wǎng)絡(luò)I/O較多的情況

從accpect線程到selectorThreads關(guān)鍵代碼

protected boolean startThreads() {
  try {
    for (int i = 0; i < args.selectorThreads; ++i) {
      selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));//建立事件選擇線程池
    }
    acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
      createSelectorThreadLoadBalancer(selectorThreads));//建立accept接受請求線程
    for (SelectorThread thread : selectorThreads) {
      thread.start();
    }
    acceptThread.start();
    return true;
  } catch (IOException e) {
    LOGGER.error("Failed to start threads!", e);
    return false;
  }
}

負(fù)載均衡器SelectorThreadLoadBalancer對象部分關(guān)鍵代碼:

protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
    return new SelectorThreadLoadBalancer(threads);
}

/**
 * A round robin load balancer for choosing selector threads for new
 * connections.
 */
protected static class SelectorThreadLoadBalancer {
    private final Collection<? extends SelectorThread> threads;
    private Iterator<? extends SelectorThread> nextThreadIterator;

    public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
        if (threads.isEmpty()) {
            throw new IllegalArgumentException("At least one selector thread is required");
        }
        this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
        nextThreadIterator = this.threads.iterator();
    }
    //根據(jù)循環(huán)負(fù)載均衡策略獲取一個SelectorThread
    public SelectorThread nextThread() {
        // Choose a selector thread (round robin)
        if (!nextThreadIterator.hasNext()) {
            nextThreadIterator = threads.iterator();
        }
        return nextThreadIterator.next();
    }
}

從SelectorThread線程中,監(jiān)聽到有業(yè)務(wù)socket中有調(diào)用請求萌腿,轉(zhuǎn)到業(yè)務(wù)工作線程池關(guān)鍵代碼

private void handleAccept() {
    final TNonblockingTransport client = doAccept();//取得客戶端的連接
    if (client != null) {
        // Pass this connection to a selector thread
        final SelectorThread targetThread = threadChooser.nextThread();//獲取目標(biāo)SelectorThread

        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
            doAddAccept(targetThread, client);
        } else {
            // FAIR_ACCEPT
            try {
                invoker.submit(new Runnable() {// 提交client的業(yè)務(wù)給到工作線程
                    public void run() {
                        doAddAccept(targetThread, client);
                    }
                });
            } catch (RejectedExecutionException rx) {
                LOGGER.warn("ExecutorService rejected accept registration!", rx);
                // close immediately
                client.close();
            }
        }
    }
}

demo地址:
碼云:http://git.oschina.net/shunyang/thrift-all/tree/master/thrift-demo
github:https://github.com/shunyang/thrift-all/tree/master/thrift-demo
本文參考文章:

http://www.cnblogs.com/mumuxinfei/p/3875165.html

http://blog.csdn.net/sunmenggmail/article/details/46818147

歡迎大家掃碼關(guān)注我的微信公眾號,與大家一起分享技術(shù)與成長中的故事抖苦。


我的微信公眾號.jpg
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末毁菱,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子锌历,更是在濱河造成了極大的恐慌贮庞,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件究西,死亡現(xiàn)場離奇詭異窗慎,居然都是意外死亡,警方通過查閱死者的電腦和手機卤材,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進(jìn)店門遮斥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人扇丛,你說我怎么就攤上這事术吗。” “怎么了帆精?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵较屿,是天一觀的道長。 經(jīng)常有香客問我卓练,道長隘蝎,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任襟企,我火速辦了婚禮嘱么,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘顽悼。我一直安慰自己拱撵,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布表蝙。 她就那樣靜靜地躺著拴测,像睡著了一般。 火紅的嫁衣襯著肌膚如雪府蛇。 梳的紋絲不亂的頭發(fā)上集索,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天,我揣著相機與錄音,去河邊找鬼务荆。 笑死妆距,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的函匕。 我是一名探鬼主播娱据,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼盅惜!你這毒婦竟也來了中剩?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤抒寂,失蹤者是張志新(化名)和其女友劉穎结啼,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體屈芜,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡郊愧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了井佑。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片属铁。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖躬翁,靈堂內(nèi)的尸體忽然破棺而出红选,到底是詐尸還是另有隱情,我是刑警寧澤姆另,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布喇肋,位于F島的核電站,受9級特大地震影響迹辐,放射性物質(zhì)發(fā)生泄漏蝶防。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一明吩、第九天 我趴在偏房一處隱蔽的房頂上張望间学。 院中可真熱鬧,春花似錦印荔、人聲如沸低葫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽嘿悬。三九已至,卻和暖如春水泉,著一層夾襖步出監(jiān)牢的瞬間善涨,已是汗流浹背窒盐。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留钢拧,地道東北人蟹漓。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像源内,于是被迫代替她去往敵國和親葡粒。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容