TNonblockingServer is one of the five server implementations on the Thrift server side. Like TSimpleServer it works in a single thread, but unlike TSimpleServer it uses NIO, so the server never blocks waiting on a single client: accept, read, and write are all registered on the same Selector, and the thread that handles these events is SelectAcceptThread. TNonblockingServer extends AbstractNonblockingServer, which wraps a FrameBuffer that serves both as the buffer for the input/output streams and as the entry point for dispatching the RPC call. FrameBuffer will be covered later; this post only covers TNonblockingServer's thread/IO model. Let's start with TNonblockingServer's flow diagram.
Advantages of the TNonblockingServer mode:
Compared with TSimpleServer, the efficiency gain comes mainly from IO multiplexing: TNonblockingServer uses non-blocking IO and monitors the state changes of many sockets at the same time.
Disadvantages of the TNonblockingServer mode:
TNonblockingServer still runs the business logic sequentially on a single thread. When that processing is complex or slow (for example an interface method that runs a long database query), throughput suffers, because the queued invocations are still executed one after another.
A typical server-side setup for TNonblockingServer looks roughly like this:
public static void main(String[] args) throws TTransportException {
    int port = 9090;
    // Thrift supports several socket types; this one is non-blocking.
    TNonblockingServerSocket socket = new TNonblockingServerSocket(port);
    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(socket);
    //---------------Thrift protocols-----------------------------------
    //1. TBinaryProtocol      binary protocol
    //2. TCompactProtocol     compact protocol: compresses TBinaryProtocol's binary encoding further, giving a smaller payload
    //3. TJSONProtocol        JSON protocol
    //4. TSimpleJSONProtocol  write-only JSON protocol; its output is easy to parse from scripting languages, rarely used in practice
    //5. TDebugProtocol       human-readable protocol, handy for tracing the transmitted data while debugging
    //-------------------------------------------------------------------
    // Protocol factory: TCompactProtocol.Factory, the compact binary protocol
    serverArgs.protocolFactory(new TCompactProtocol.Factory());
    //---------------Thrift transports-----------------------------------
    //1. TSocket           blocking socket, the counterpart of Java's ServerSocket
    //2. TFramedTransport  transfers data frame by frame; required by the non-blocking servers
    //3. TFileTransport    transfers data through files
    //4. TMemoryTransport  uses memory for IO; the Java implementation is backed by a simple ByteArrayOutputStream
    //5. TZlibTransport    zlib compression layered over another transport; it has no Java implementation, so it cannot be used here
    // Transport factory: a lower-level concept than the protocol
    serverArgs.transportFactory(new TFramedTransport.Factory());
    //--------------------------------------------------------------------
    // Set the processor factory; `processor` is assumed to be created elsewhere
    // with the business logic already wired in.
    serverArgs.processorFactory(new TProcessorFactory(processor));
    TNonblockingServer tNonblockingServer = new TNonblockingServer(serverArgs);
    // Start the server
    tNonblockingServer.serve();
}
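For completeness, here is a minimal client sketch to pair with the server above. Because a non-blocking server reads requests in frames, the client must wrap its socket in TFramedTransport, and the protocol must match the server's TCompactProtocol. HelloService is a hypothetical IDL-generated stub; substitute the client generated from your own IDL.
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public static void main(String[] args) throws Exception {
    // Framed transport over a plain socket, matching the server's TFramedTransport.Factory
    TTransport transport = new TFramedTransport(new TSocket("localhost", 9090));
    transport.open();
    try {
        TProtocol protocol = new TCompactProtocol(transport);
        // HelloService is hypothetical; use your own generated client class
        HelloService.Client client = new HelloService.Client(protocol);
        // client.sayHello("thrift");  // invoke a method defined in your IDL
    } finally {
        transport.close();
    }
}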
Let's first look at TNonblockingServer's class diagram.
TNonblockingServer extends the abstract class AbstractNonblockingServer, which in turn extends the abstract class TServer.
Let's start with the source of the TServer class.
/**
* Generic interface for a Thrift server.
*
*/
public abstract class TServer {
public static class Args extends AbstractServerArgs<Args> {
//The constructor takes a parameter of type TServerTransport.
//As a rule of thumb, anything whose name contains "Server" is used only on the server side.
public Args(TServerTransport transport) {
super(transport);//passed down layer by layer, finally assigned to serverTransport in AbstractServerArgs
}
}
public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
//Holds the server-side transport (socket), passed in through the constructor
final TServerTransport serverTransport;
TProcessorFactory processorFactory;
//Transport and protocol factories; the instances created here are the defaults
TTransportFactory inputTransportFactory = new TTransportFactory();
TTransportFactory outputTransportFactory = new TTransportFactory();
TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
public AbstractServerArgs(TServerTransport transport) {
serverTransport = transport;
}
//methods of AbstractServerArgs omitted...
}
//methods of TServer omitted...
}
TServer contains a static class Args that extends the abstract class AbstractServerArgs; the source of AbstractServerArgs also lives inside TServer.
public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
final TServerTransport serverTransport;
TProcessorFactory processorFactory;
TTransportFactory inputTransportFactory = new TTransportFactory();
TTransportFactory outputTransportFactory = new TTransportFactory();
TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
public AbstractServerArgs(TServerTransport transport) {
serverTransport = transport;
}
//Set the TProcessorFactory directly
public T processorFactory(TProcessorFactory factory) {
this.processorFactory = factory;
return (T) this;
}
//Create a new TProcessorFactory and wrap the given TProcessor in it
public T processor(TProcessor processor) {
this.processorFactory = new TProcessorFactory(processor);
return (T) this;
}
//Set the transport factory (both input and output)
public T transportFactory(TTransportFactory factory) {
this.inputTransportFactory = factory;
this.outputTransportFactory = factory;
return (T) this;
}
//Set the input transport factory
public T inputTransportFactory(TTransportFactory factory) {
this.inputTransportFactory = factory;
return (T) this;
}
//Set the output transport factory
public T outputTransportFactory(TTransportFactory factory) {
this.outputTransportFactory = factory;
return (T) this;
}
//Set the protocol factory (both input and output)
public T protocolFactory(TProtocolFactory factory) {
this.inputProtocolFactory = factory;
this.outputProtocolFactory = factory;
return (T) this;
}
//Set the input protocol factory
public T inputProtocolFactory(TProtocolFactory factory) {
this.inputProtocolFactory = factory;
return (T) this;
}
//Set the output protocol factory
public T outputProtocolFactory(TProtocolFactory factory) {
this.outputProtocolFactory = factory;
return (T) this;
}
}
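Note that every setter returns (T) this, so the configuration calls can be chained. The server setup from the beginning could equally be written like this (assuming socket and processor are defined as in the earlier example):
TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(socket)
        .protocolFactory(new TCompactProtocol.Factory())   // each call returns Args,
        .transportFactory(new TFramedTransport.Factory())  // so the chain keeps going
        .processorFactory(new TProcessorFactory(processor));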
TServer also has a number of fields, all of which are initialized in the constructor from the corresponding fields of args.
public abstract class TServer {
/**
* Core processor
*/
protected TProcessorFactory processorFactory_;
/**
* Server transport
*
*/
protected TServerTransport serverTransport_;
/**
* Input Transport Factory
*/
protected TTransportFactory inputTransportFactory_;
/**
* Output Transport Factory
*/
protected TTransportFactory outputTransportFactory_;
/**
* Input Protocol Factory
*/
protected TProtocolFactory inputProtocolFactory_;
/**
* Output Protocol Factory
*/
protected TProtocolFactory outputProtocolFactory_;
private volatile boolean isServing;
protected TServerEventHandler eventHandler_;
// Flag for stopping the server
// Please see THRIFT-1795 for the usage of this flag
protected volatile boolean stopped_ = false;
protected TServer(TServer.AbstractServerArgs args) {
//TServer's fields are initialized from the corresponding fields of args
processorFactory_ = args.processorFactory;
serverTransport_ = args.serverTransport;
inputTransportFactory_ = args.inputTransportFactory;
outputTransportFactory_ = args.outputTransportFactory;
inputProtocolFactory_ = args.inputProtocolFactory;
outputProtocolFactory_ = args.outputProtocolFactory;
}
/**
* The run method fires up the server and gets things going.
* This abstract method starts the server; the concrete startup steps are implemented by its subclasses.
*/
public abstract void serve();
/**
* Stop the server. This is optional on a per-implementation basis. Not
* all servers are required to be cleanly stoppable.
*/
public void stop() {
}
public boolean isServing() {
return isServing;
}
protected void setServing(boolean serving) {
isServing = serving;
}
public void setServerEventHandler(TServerEventHandler eventHandler) {
eventHandler_ = eventHandler;
}
public TServerEventHandler getEventHandler() {
return eventHandler_;
}
public boolean getShouldStop() {
return this.stopped_;
}
public void setShouldStop(boolean shouldStop) {
this.stopped_ = shouldStop;
}
}
Next, let's analyze the code of AbstractNonblockingServer.
public abstract class AbstractNonblockingServer extends TServer {
//As usual, AbstractNonblockingServerArgs extends AbstractServerArgs
public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
//One extra field: the maximum read buffer size
public long maxReadBufferBytes = 256 * 1024 * 1024;
public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
super(transport);
transportFactory(new TFramedTransport.Factory());
}
}
/**
* The maximum amount of memory we will allocate to client IO buffers at a
* time. Without this limit, the server will gladly allocate client buffers
* right into an out of memory exception, rather than waiting.
* The maximum amount of memory allocated to client IO buffers at a time.
* Without this limit, the server would happily keep allocating client buffers
* straight into an out-of-memory error instead of waiting.
*/
final long MAX_READ_BUFFER_BYTES;
/**
* How many bytes are currently allocated to read buffers.
*/
final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
super(args);
MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
}
/**
* Begin accepting connections and processing invocations.
* Calling this method makes the server start accepting client requests and processing them.
*/
public void serve() {
/**
* Begin accepting connections and processing invocations.
* Start a thread that accepts client requests and runs the processing logic. The method has no body here;
* the concrete implementation is left to the subclasses of AbstractNonblockingServer.
*/
if (!startThreads()) {
return;
}
// start listening, or exit
//The server starts listening for connections; this method IS implemented in this class
if (!startListening()) {
return;
}
//Mark the server as serving
setServing(true);
// this will block while we serve
/**
* Begin accepting connections and processing invocations.
* Block the thread that started this server until it is interrupted.
* The concrete implementation is left to the subclasses of AbstractNonblockingServer.
*/
waitForShutdown();
setServing(false);
// do a little cleanup
stopListening();
}
}
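serve() is a textbook template method: it fixes the startup order while the concrete steps come from elsewhere. The hooks it relies on look roughly like this (a simplified sketch: startThreads() and waitForShutdown() are abstract and supplied by TNonblockingServer, while startListening()/stopListening() are concrete in AbstractNonblockingServer and simply delegate to serverTransport_):
// Abstract -- implemented by subclasses such as TNonblockingServer:
protected abstract boolean startThreads();
protected abstract void waitForShutdown();

// Concrete in AbstractNonblockingServer (simplified sketch):
protected boolean startListening() {
    try {
        serverTransport_.listen();
        return true;
    } catch (TTransportException ttx) {
        LOGGER.error("Failed to start listening on server socket!", ttx);
        return false;
    }
}

protected void stopListening() {
    serverTransport_.close();
}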
AbstractNonblockingServer also contains another important abstract class, AbstractSelectThread; SelectAcceptThread extends it.
protected abstract class AbstractSelectThread extends Thread {
protected Selector selector;
// List of FrameBuffers that want to change their selection interests.
protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
public AbstractSelectThread() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
}
It also defines two important methods:
//Both methods are called from the select thread that startThreads() launches.
/**
* Do the work required to read from a readable client. If the frame is
* fully read, then invoke the method call.
* Read from a client that has data available; once a whole frame has been read, invoke the corresponding method call.
*/
protected void handleRead(SelectionKey key) {
// Get the FrameBuffer attached to the SelectionKey
FrameBuffer buffer = (FrameBuffer) key.attachment();
// Read the data sent by the client; if the read fails, clean up the SelectionKey
if (!buffer.read()) {
cleanupSelectionKey(key);
return;
}
// if the buffer's frame read is complete, invoke the method.
// Once a complete frame has been read, requestInvoke dispatches the call,
// i.e. invokes the corresponding server-side method for the client's request
if (buffer.isFrameFullyRead()) {
if (!requestInvoke(buffer)) {
cleanupSelectionKey(key);
}
}
}
/**
* Let a writable client get written, if there's data to be written.
* Write to a writable client, if there is data to write.
*/
protected void handleWrite(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer) key.attachment();
if (!buffer.write()) {
cleanupSelectionKey(key);
}
}
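The isFrameFullyRead() check above relies on the framing that TFramedTransport applies: each message on the wire is a 4-byte big-endian length prefix followed by the payload, which is how the server can tell, on a non-blocking socket, when a complete request has arrived. A minimal illustration of that wire format (an illustration only, not Thrift's internal code):
import java.nio.ByteBuffer;

// Wrap a serialized Thrift message into a frame: 4-byte length prefix + payload.
static ByteBuffer frame(byte[] payload) {
    ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
    buf.putInt(payload.length); // ByteBuffer defaults to big-endian
    buf.put(payload);
    buf.flip();                 // make the buffer readable from the start
    return buf;
}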
Now let's walk through the source of TNonblockingServer itself.
Besides the usual Args class, TNonblockingServer adds the most important piece: the field selectAcceptThread_ of type SelectAcceptThread. As the name suggests, this class extends Thread, and all the logic that thread runs is encapsulated inside it.
public static class Args extends AbstractNonblockingServerArgs<Args> {
public Args(TNonblockingServerTransport transport) {
super(transport);
}
}
private SelectAcceptThread selectAcceptThread_;
public TNonblockingServer(AbstractNonblockingServerArgs args) {
super(args);
}
Reading further down the source we find the familiar startThreads() method:
/**
* Start the selector thread to deal with accepts and client messages.
* This method starts the thread that handles client requests; it overrides
* the abstract method declared in AbstractNonblockingServer.
* @return true if everything went ok, false if we couldn't start for some
* reason.
*/
@Override
protected boolean startThreads() {
// start the selector
try {
// Instantiate selectAcceptThread_
selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
// Start the thread
selectAcceptThread_.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start selector thread!", e);
return false;
}
}
The SelectAcceptThread class is also defined inside TNonblockingServer; let's look at its class diagram and source.
SelectAcceptThread is simply a thread, with all the logic it runs encapsulated inside it.
SelectAcceptThread uses a Selector, so a single thread can serve many client connections. A Selector detects whether events have occurred on any of its registered channels (multiple Channels can register their events with the same Selector); when events occur, it fetches them and handles each one accordingly. One thread can thus manage many channels, that is, many connections and requests.
- This is all standard NIO; if it is unfamiliar, it is worth learning NIO first (a minimal sketch of the pattern follows below).
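As a reference, here is a stripped-down version of the same pattern in plain JDK NIO, independent of Thrift: one thread, one Selector, many channels.
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class MiniSelectLoop {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9090));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select(); // block until at least one channel is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {        // new connection: register it for reads
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {   // data available: read it
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    if (client.read(buf) < 0) client.close(); // peer closed
                }
            }
        }
    }
}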
/**
* The thread that will be doing all the selecting, managing new connections
* and those that still need to be read.
*/
protected class SelectAcceptThread extends AbstractSelectThread {
// The server transport on which new client transports will be accepted
// The server-side serverTransport; it is registered with the selector below
private final TNonblockingServerTransport serverTransport;
/**
* Set up the thread that will handle the non-blocking accepts, reads, and
* writes.
*/
public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
throws IOException {
this.serverTransport = serverTransport;
// Register serverTransport with the selector, which is declared in AbstractSelectThread
serverTransport.registerSelector(selector);
}
public boolean isStopped() {
return stopped_;
}
/**
* The work loop. Handles both selecting (all IO operations) and managing
* the selection preferences of all existing connections.
*/
public void run() {
try {
if (eventHandler_ != null) {
eventHandler_.preServe();
}
while (!stopped_) {
// The select() loop starts here
select();
processInterestChanges();
}
// After the server stops, clean up the keys left in the selector
for (SelectionKey selectionKey : selector.keys()) {
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() exiting due to uncaught error", t);
} finally {
try {
selector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing selector!", e);
}
stopped_ = true;
}
}
}
select() uses the selector to monitor all the sockets in a loop; every time select() returns, it fetches all the sockets in the ready state and handles them one by one:
/**
* Select and process IO events appropriately:
* If there are connections to be accepted, accept them.
* If there are existing connections with data waiting to be read, read it,
* buffering until a whole frame has been read.
* If there are any pending responses, buffer them until their target client
* is available, and then send the data.
*/
private void select() {
try {
// wait for io events.
/**
* The thread blocks here while the selector monitors all registered IO events;
* when select() returns, all IO events in the ready state are available.
*/
selector.select();
// process the io events we received
/**
* Fetch all the IO events in the ready state and iterate over them to handle each one.
* Every client socket, once connected and registered with the selector, has a SelectionKey
* associated with it; likewise, the server transport registered with the selector has its own SelectionKey.
*/
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// if the key is marked Accept, then it has to be the server transport.
if (key.isAcceptable()) {
// The server transport received a connection request from a client, so accept() it
handleAccept();
} else if (key.isReadable()) {
// deal with reads
// The concrete implementation is in AbstractSelectThread
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
// The concrete implementation is in AbstractSelectThread
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
final SelectionKey selectionKey,
final AbstractSelectThread selectThread) {
return processorFactory_.isAsyncProcessor() ?
new AsyncFrameBuffer(trans, selectionKey, selectThread) :
new FrameBuffer(trans, selectionKey, selectThread);
}
/**
* Accept a new connection.
*/
private void handleAccept() throws IOException {
SelectionKey clientKey = null;
TNonblockingTransport client = null;
try {
// accept the connection
// Accept the connection request coming from the client
client = (TNonblockingTransport)serverTransport.accept();
// Register the client with the selector so that this new client's requests can be handled from now on
clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
// Instantiate a FrameBuffer for this connection
FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);
// Attach the FrameBuffer to the new client's SelectionKey:
// handleRead() and handleWrite() fetch data from this FrameBuffer,
// invoke the server-side method for the business processing, and send the result back to the client
clientKey.attach(frameBuffer);
} catch (TTransportException tte) {
// something went wrong accepting.
LOGGER.warn("Exception trying to accept!", tte);
if (clientKey != null) cleanupSelectionKey(clientKey);
if (client != null) client.close();
}
}
} // SelectAcceptThread