前言
Thrift
提供的網(wǎng)絡服務模型:單線程妆偏、多線程刨肃、事件驅(qū)動古拴,從另一個角度劃分為:阻塞服務模型、非阻塞服務模型真友。
阻塞服務模型:
TSimpleServer
黄痪、TThreadPoolServer
。非阻塞服務模型:
TNonblockingServer
盔然、THsHaServer
和TThreadedSelectorServer
桅打。
TServer
類的層次關系:
正文
TServer
TServer
定義了靜態(tài)內(nèi)部類Args
,Args
繼承自抽象類AbstractServerArgs
愈案。AbstractServerArgs
采用了建造者模式挺尾,向TServer
提供各種工廠:
工廠屬性 | 工廠類型 | 作用 |
---|---|---|
ProcessorFactory | TProcessorFactory | 處理層工廠類,用于具體的TProcessor對象的創(chuàng)建 |
InputTransportFactory | TTransportFactory | 傳輸層輸入工廠類站绪,用于具體的TTransport對象的創(chuàng)建 |
OutputTransportFactory | TTransportFactory | 傳輸層輸出工廠類遭铺,用于具體的TTransport對象的創(chuàng)建 |
InputProtocolFactory | TProtocolFactory | 協(xié)議層輸入工廠類,用于具體的TProtocol對象的創(chuàng)建 |
OutputProtocolFactory | TProtocolFactory | 協(xié)議層輸出工廠類恢准,用于具體的TProtocol對象的創(chuàng)建 |
下面是TServer
的部分核心代碼:
public abstract class TServer {
public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
public Args(TServerTransport transport) {
super(transport);
}
}
public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
final TServerTransport serverTransport;
TProcessorFactory processorFactory;
TTransportFactory inputTransportFactory = new TTransportFactory();
TTransportFactory outputTransportFactory = new TTransportFactory();
TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
public AbstractServerArgs(TServerTransport transport) {
serverTransport = transport;
}
}
protected TProcessorFactory processorFactory_;
protected TServerTransport serverTransport_;
protected TTransportFactory inputTransportFactory_;
protected TTransportFactory outputTransportFactory_;
protected TProtocolFactory inputProtocolFactory_;
protected TProtocolFactory outputProtocolFactory_;
private boolean isServing;
protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
inputTransportFactory_ = args.inputTransportFactory;
outputTransportFactory_ = args.outputTransportFactory;
inputProtocolFactory_ = args.inputProtocolFactory;
outputProtocolFactory_ = args.outputProtocolFactory;
}
public abstract void serve();
public void stop() {}
public boolean isServing() {
return isServing;
}
protected void setServing(boolean serving) {
isServing = serving;
}
}
TServer
的三個方法:serve()
魂挂、stop()
和isServing()
。serve()
用于啟動服務馁筐,stop()
用于關閉服務涂召,isServing()
用于檢測服務的起停狀態(tài)。
TServer
的不同實現(xiàn)類的啟動方式不一樣敏沉,因此serve()
定義為抽象方法芹扭。不是所有的服務都需要優(yōu)雅的退出, 因此stop()
方法沒有被定義為抽象。
TSimpleServer
TSimpleServer
的工作模式采用最簡單的阻塞IO赦抖,實現(xiàn)方法簡潔明了舱卡,便于理解,但是一次只能接收和處理一個socket
連接队萤,效率比較低轮锥。它主要用于演示Thrift
的工作過程,在實際開發(fā)過程中很少用到它要尔。
(一) 工作流程
(二) 使用入門
服務端:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor processor =
new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
tArgs.processor(processor);
tArgs.protocolFactory(protocolFactory);
// 簡單的單線程服務模型 一般用于測試
TServer tServer = new TSimpleServer(tArgs);
System.out.println("Running Simple Server");
tServer.serve();
客戶端:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("Leo");
System.out.println("Result =: " + result);
transport.close();
(三) 源碼分析
查看上述流程的源代碼舍杜,即TSimpleServer.java
中的serve()
方法如下:
serve()
方法的操作:
- 設置
TServerSocket
的listen()
方法啟動連接監(jiān)聽。 - 以阻塞的方式接受客戶端地連接請求赵辕,每進入一個連接即為其創(chuàng)建一個通道
TTransport
對象既绩。 - 為客戶端創(chuàng)建處理器對象、輸入傳輸通道對象还惠、輸出傳輸通道對象饲握、輸入?yún)f(xié)議對象和輸出協(xié)議對象。
- 通過
TServerEventHandler
對象處理具體的業(yè)務請求。
ThreadPoolServer
TThreadPoolServer
模式采用阻塞socket
方式工作救欧,主線程負責阻塞式監(jiān)聽是否有新socket
到來衰粹,具體的業(yè)務處理交由一個線程池來處理。
(一) 工作流程
(二) 使用入門
服務端:
ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
TServerSocket serverTransport = new TServerSocket(serverSocket);
HelloWorldService.Processor<HelloWorldService.Iface> processor =
new HelloWorldService.Processor<>(new HelloWorldServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
ttpsArgs.processor(processor);
ttpsArgs.protocolFactory(protocolFactory);
// 線程池服務模型 使用標準的阻塞式IO 預先創(chuàng)建一組線程處理請求
TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
System.out.println("Running ThreadPool Server");
ttpsServer.serve();
客戶端:
TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadPoolClient");
System.out.println("Result =: " + result);
transport.close();
(三) 源碼分析
ThreadPoolServer
解決了TSimpleServer
不支持并發(fā)和多連接的問題笆怠,引入了線程池铝耻。實現(xiàn)的模型是One Thread Per Connection
。查看上述流程的源代碼蹬刷,先查看線程池的代碼片段:
TThreadPoolServer.java
中的serve()
方法如下:
serve()
方法的操作:
- 設置
TServerSocket
的listen()
方法啟動連接監(jiān)聽瓢捉。 - 以阻塞的方式接受客戶端的連接請求,每進入一個連接办成,將通道對象封裝成一個
WorkerProcess
對象(WorkerProcess
實現(xiàn)了Runnabel
接口)泡态,并提交到線程池。 -
WorkerProcess
的run()
方法負責業(yè)務處理诈火,為客戶端創(chuàng)建了處理器對象兽赁、輸入傳輸通道對象状答、輸出傳輸通道對象冷守、輸入?yún)f(xié)議對象和輸出協(xié)議對象。 - 通過
TServerEventHandler
對象處理具體的業(yè)務請求惊科。
WorkerProcess
的run()
方法:
(四) 優(yōu)缺點
TThreadPoolServer模式的優(yōu)點
拆分了監(jiān)聽線程(Accept Thread
)和處理客戶端連接的工作線程(Worker Thread
)拍摇,數(shù)據(jù)讀取和業(yè)務處理都交給線程池處理。因此在并發(fā)量較大時新連接也能夠被及時接受馆截。
線程池模式比較適合服務器端能預知最多有多少個客戶端并發(fā)的情況充活,這時每個請求都能被業(yè)務線程池及時處理,性能也非常高蜡娶。
TThreadPoolServer模式的缺點
線程池模式的處理能力受限于線程池的工作能力混卵,當并發(fā)請求數(shù)大于線程池中的線程數(shù)時,新請求也只能排隊等待窖张。
TNonblockingServer
TNonblockingServer
模式也是單線程工作幕随,但是采用NIO
的模式,借助Channel/Selector
機制, 采用IO
事件模型來處理宿接。
所有的socket
都被注冊到selector
中赘淮,在一個線程中通過seletor
循環(huán)監(jiān)控所有的socket
。
每次selector
循環(huán)結(jié)束時睦霎,處理所有的處于就緒狀態(tài)的socket
梢卸,對于有數(shù)據(jù)到來的socket
進行數(shù)據(jù)讀取操作,對于有數(shù)據(jù)發(fā)送的socket則進行數(shù)據(jù)發(fā)送操作副女,對于監(jiān)聽socket
則產(chǎn)生一個新業(yè)務socket
并將其注冊到selector
上蛤高。
注意:TNonblockingServer要求底層的傳輸通道必須使用TFramedTransport。
(一) 工作流程
(二) 使用入門
服務端:
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
tnbArgs.processor(tprocessor);
tnbArgs.transportFactory(new TFramedTransport.Factory());
tnbArgs.protocolFactory(new TCompactProtocol.Factory());
// 使用非阻塞式IO服務端和客戶端需要指定TFramedTransport數(shù)據(jù)傳輸?shù)姆绞? TServer server = new TNonblockingServer(tnbArgs);
System.out.println("Running Non-blocking Server");
server.serve();
客戶端:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 協(xié)議要和服務端一致
TProtocol protocol = new TCompactProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("NonBlockingClient");
System.out.println("Result =: " + result);
transport.close();
(三) 源碼分析
TNonblockingServer
繼承于AbstractNonblockingServer
,這里我們更關心基于NIO
的selector
部分的關鍵代碼襟齿。
(四) 優(yōu)缺點
TNonblockingServer模式優(yōu)點
相比于TSimpleServer
效率提升主要體現(xiàn)在IO
多路復用上姻锁,TNonblockingServer
采用非阻塞IO
,對accept/read/write
等IO
事件進行監(jiān)控和處理猜欺,同時監(jiān)控多個socket
的狀態(tài)變化位隶。
TNonblockingServer模式缺點
TNonblockingServer
模式在業(yè)務處理上還是采用單線程順序來完成。在業(yè)務處理比較復雜开皿、耗時的時候涧黄,例如某些接口函數(shù)需要讀取數(shù)據(jù)庫執(zhí)行時間較長,會導致整個服務被阻塞住赋荆,此時該模式效率也不高笋妥,因為多個調(diào)用請求任務依然是順序一個接一個執(zhí)行。
THsHaServer
鑒于TNonblockingServer
的缺點窄潭,THsHaServer
繼承于TNonblockingServer
春宣,引入了線程池提高了任務處理的并發(fā)能力。THsHaServer
是半同步半異步(Half-Sync/Half-Async
)的處理模式嫉你,Half-Aysnc
用于IO
事件處理(Accept/Read/Write
)月帝,Half-Sync
用于業(yè)務handler
對rpc
的同步處理上。
注意:THsHaServer和TNonblockingServer一樣幽污,要求底層的傳輸通道必須使用TFramedTransport嚷辅。
(一) 工作流程
(二) 使用入門
服務端:
TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 半同步半異步
THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
thhsArgs.processor(tprocessor);
thhsArgs.transportFactory(new TFramedTransport.Factory());
thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
TServer server = new THsHaServer(thhsArgs);
System.out.println("Running HsHa Server");
server.serve();
客戶端:
TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
// 協(xié)議要和服務端一致
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("HsHaClient");
System.out.println("Result =: " + result);
transport.close();
(三) 源碼分析
THsHaServer
繼承于TNonblockingServer
,新增了線程池并發(fā)處理工作任務的功能距误,查看線程池的相關代碼:
任務線程池的創(chuàng)建過程:
下文的TThreadedSelectorServer囊括了THsHaServer的大部分特性簸搞,源碼分析可參考TThreadedSelectorServer。
(四) 優(yōu)缺點
THsHaServer的優(yōu)點
THsHaServer
與TNonblockingServer
模式相比准潭,THsHaServer
在完成數(shù)據(jù)讀取之后趁俊,將業(yè)務處理過程交由一個線程池來完成,主線程直接返回進行下一次循環(huán)操作刑然,效率大大提升寺擂。
THsHaServer的缺點
主線程仍然需要完成所有socket
的監(jiān)聽接收、數(shù)據(jù)讀取和數(shù)據(jù)寫入操作闰集。當并發(fā)請求數(shù)較大時沽讹,且發(fā)送數(shù)據(jù)量較多時,監(jiān)聽socket
上新連接請求不能被及時接受武鲁。
TThreadedSelectorServer
TThreadedSelectorServer
是對THsHaServer
的一種擴充爽雄,它將selector
中的讀寫IO
事件(read/write
)從主線程中分離出來。同時引入worker
工作線程池沐鼠,它也是種Half-Sync/Half-Async
的服務模型挚瘟。
TThreadedSelectorServer
模式是目前Thrift
提供的最高級的線程服務模型叹谁,它內(nèi)部有如果幾個部分構(gòu)成:
-
一個
AcceptThread
線程對象,專門用于處理監(jiān)聽socket
上的新連接乘盖。 -
若干個
SelectorThread
對象專門用于處理業(yè)務socket
的網(wǎng)絡I/O
讀寫操作焰檩,所有網(wǎng)絡數(shù)據(jù)的讀寫均是有這些線程來完成。 - 一個負載均衡器
SelectorThreadLoadBalancer
對象订框,主要用于AcceptThread
線程接收到一個新socket
連接請求時析苫,決定將這個新連接請求分配給哪個SelectorThread
線程。 - 一個
ExecutorService
類型的工作線程池穿扳,在SelectorThread
線程中衩侥,監(jiān)聽到有業(yè)務socket
中有調(diào)用請求過來,則將請求數(shù)據(jù)讀取之后矛物,交給ExecutorService
線程池中的線程完成此次調(diào)用的具體執(zhí)行茫死。主要用于處理每個rpc
請求的handler
回調(diào)處理(這部分是同步的)。
(一) 工作流程
(二) 使用入門
服務端:
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
// 多線程半同步半異步
TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
ttssArgs.processor(processor);
ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
// 使用非阻塞式IO時 服務端和客戶端都需要指定數(shù)據(jù)傳輸方式為TFramedTransport
ttssArgs.transportFactory(new TFramedTransport.Factory());
// 多線程半同步半異步的服務模型
TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
System.out.println("Running ThreadedSelector Server");
server.serve();
客戶端:
for (int i = 0; i < 10; i++) {
new Thread("Thread " + i) {
@Override
public void run() {
// 設置傳輸通道 對于非阻塞服務 需要使用TFramedTransport(用于將數(shù)據(jù)分塊發(fā)送)
for (int j = 0; j < 10; j++) {
TTransport transport = null;
try {
transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
TProtocol protocol = new TBinaryProtocol(transport);
HelloWorldService.Client client = new HelloWorldService.Client(protocol);
transport.open();
String result = client.say("ThreadedSelector Client");
System.out.println("Result =: " + result);
transport.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 關閉傳輸通道
transport.close();
}
}
}
}.start();
}
(三) 核心代碼
以上工作流程的三個組件AcceptThread
履羞、SelectorThread
和ExecutorService
在源碼中的定義如下:
TThreadedSelectorServer
模式中有一個專門的線程AcceptThread
用于處理新連接請求峦萎,因此能夠及時響應大量并發(fā)連接請求;另外它將網(wǎng)絡I/O操作分散到多個SelectorThread
線程中來完成忆首,因此能夠快速對網(wǎng)絡I/O
進行讀寫操作爱榔,能夠很好地應對網(wǎng)絡I/O
較多的情況。
TThreadedSelectorServer
默認參數(shù)定義如下:
- 負責網(wǎng)絡IO讀寫的selector默認線程數(shù)(selectorThreads):2
- 負責業(yè)務處理的默認工作線程數(shù)(workerThreads):5
- 工作線程池單個線程的任務隊列大小(acceptQueueSizePerThread):4
創(chuàng)建雄卷、初始化并啟動AcceptThread
和SelectorThreads
搓蚪,同時啟動selector
線程的負載均衡器(selectorThreads
)蛤售。
AcceptThread源碼
AcceptThread
繼承于Thread
丁鹉,可以看出包含三個重要的屬性:非阻塞式傳輸通道(TNonblockingServerTransport
)、NIO
選擇器(acceptSelector
)和選擇器線程負載均衡器(threadChooser
)悴能。
查看AcceptThread
的run()
方法揣钦,可以看出accept
線程一旦啟動,就會不停地調(diào)用select()
方法:
查看select()
方法漠酿,acceptSelector
選擇器等待IO
事件的到來冯凹,拿到SelectionKey
即檢查是不是accept
事件。如果是炒嘲,通過handleAccept()
方法接收一個新來的連接宇姚;否則,如果是IO
讀寫事件夫凸,AcceptThread
不作任何處理浑劳,交由SelectorThread
完成。
在handleAccept()
方法中夭拌,先通過doAccept()
去拿連接通道魔熏,然后Selector
線程負載均衡器選擇一個Selector
線程衷咽,完成接下來的IO
讀寫事件。
[圖片上傳失敗...(image-314d7a-1536936397051)]
接下來繼續(xù)查看doAddAccept()
方法的實現(xiàn)蒜绽,毫無懸念镶骗,它進一步調(diào)用了SelectorThread
的addAcceptedConnection()
方法,把非阻塞傳輸通道對象傳遞給選擇器線程做進一步的IO
讀寫操作躲雅。
SelectorThreadLoadBalancer源碼
SelectorThreadLoadBalancer
如何創(chuàng)建鼎姊?
SelectorThreadLoadBalancer
是一個基于輪詢算法的Selector
線程選擇器,通過線程迭代器為新進來的連接順序分配SelectorThread
相赁。
SelectorThread源碼
SelectorThread
和AcceptThread
一樣此蜈,是TThreadedSelectorServer
的一個成員內(nèi)部類,每個SelectorThread
線程對象內(nèi)部都有一個阻塞式的隊列噪生,用于存放該線程被接收的連接通道裆赵。
[站外圖片上傳中...(image-ce3409-1536936397051)]
阻塞隊列的大小可由構(gòu)造函數(shù)指定:
上面看到,在AcceptThread
的doAddAccept()
方法中調(diào)用了SelectorThread
的addAcceptedConnection()
方法跺嗽。
這個方法做了兩件事:
- 將被此
SelectorThread
線程接收的連接通道放入阻塞隊列中战授。 - 通過
wakeup()
方法喚醒SelectorThread
中的NIO
選擇器selector
。
既然SelectorThread
也是繼承于Thread
桨嫁,查看其run()
方法的實現(xiàn):
SelectorThread
方法的select()
監(jiān)聽IO
事件植兰,僅僅用于處理數(shù)據(jù)讀取和數(shù)據(jù)寫入。如果連接有數(shù)據(jù)可讀璃吧,讀取并以frame
的方式緩存楣导;如果需要向連接中寫入數(shù)據(jù),緩存并發(fā)送客戶端的數(shù)據(jù)畜挨。且在數(shù)據(jù)讀寫處理完成后筒繁,需要向NIO
的selector
清空和注銷自身的SelectionKey
。
-
數(shù)據(jù)寫操作完成以后巴元,整個
rpc
調(diào)用過程也就結(jié)束了毡咏,handleWrite()
方法如下:
-
數(shù)據(jù)讀操作完成以后,
Thrift
會利用已讀數(shù)據(jù)執(zhí)行目標方法逮刨,handleRead()
方法如下:
handleRead
方法在執(zhí)行read()
方法呕缭,將數(shù)據(jù)讀取完成后,會調(diào)用requestInvoke()
方法調(diào)用目標方法完成具體業(yè)務處理修己。requestInvoke()
方法將請求數(shù)據(jù)封裝為一個Runnable
對象恢总,提交到工作任務線程池(ExecutorService
)進行處理。
select()
方法完成后睬愤,線程繼續(xù)運行processAcceptedConnections()
方法處理下一個連接的IO
事件片仿。
這里比較核心的幾個操作:
- 嘗試從
SelectorThread
的阻塞隊列acceptedQueue
中獲取一個連接的傳輸通道。如果獲取成功戴涝,調(diào)用registerAccepted()
方法滋戳;否則钻蔑,進入下一次循環(huán)。 -
registerAccepted()
方法將傳輸通道底層的連接注冊到NIO
的選擇器selector
上面奸鸯,獲取到一個SelectionKey
咪笑。 - 創(chuàng)建一個
FrameBuffer
對象,并綁定到獲取的SelectionKey
上面娄涩,用于數(shù)據(jù)傳輸時的中間讀寫緩存窗怒。
總結(jié)
本文對Thrift
的各種線程服務模型進行了介紹,包括2種阻塞式服務模型:TSimpleServer
蓄拣、TThreadPoolServer
扬虚,3種非阻塞式服務模型:TNonblockingServer
、THsHaServer
和TThreadedSelectorServer
球恤。對各種服務模型的具體用法辜昵、工作流程、原理和源碼實現(xiàn)進行了一定程度的分析咽斧。
鑒于篇幅較長堪置,請各位看官請慢慢批閱!
歡迎關注技術公眾號: 零壹技術棧
本帳號將持續(xù)分享后端技術干貨张惹,包括虛擬機基礎舀锨,多線程編程,高性能框架宛逗,異步坎匿、緩存和消息中間件失仁,分布式和微服務胶滋,架構(gòu)學習和進階等學習資料和文章。