Thrift提供了多種服務器實現(xiàn)羡玛。
它們各有特點稼稿,適應不同的需求環(huán)境。下面我將結(jié)合網(wǎng)絡資源和自己的理解來梳理一下Thrift各種java server的特點和使用姿勢渺杉。
-
TSimpleServer
while (!stopped_) {
TTransport client = null;
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
ServerContext connectionContext = null;
try {
client = serverTransport_.accept();
if (client != null) {
processor = processorFactory_.getProcessor(client);
inputTransport = inputTransportFactory_.getTransport(client);
outputTransport = outputTransportFactory_.getTransport(client);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
if (eventHandler_ != null) {
connectionContext = eventHandler_.createContext(inputProtocol, outputProtocol);
}
while (true) {
if (eventHandler_ != null) {
eventHandler_.processContext(connectionContext, inputTransport, outputTransport);
}
if(!processor.process(inputProtocol, outputProtocol)) {
break;
}
}
}
}
}
TSimplerServer在while循環(huán)中每次接受一個連接是越,處理連接請求,直到客戶端關閉了連接浦徊,它才會去接受一個新的連接。由于它只在一個單獨的線程中以阻塞I/O的方式完成這些工作霞丧,所以它只能服務一個客戶端連接冕香,其他所有客戶端在被服務器端接受之前都只能等待。其使用方法如下:
public static void sample() throws Exception {
ServerSocket socket = new ServerSocket(8912);
TServerSocket serverTransport = new TServerSocket(socket); // HelloServiceImpl 為自己實現(xiàn)的Thrift服務接口的具體實現(xiàn)
TServer server = new TSimpleServer(new TServer.Args(serverTransport).processor(processor));
System.out.println("Server start...");
server.serve();
}
-
TNonblockingServer
public void run() {
try {
if (eventHandler_ != null) {
eventHandler_.preServe();
}
while (!stopped_) {
select();
processInterestChanges();
}
for (SelectionKey selectionKey : selector.keys()) {
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() exiting due to uncaught error", t);
} finally {
try {
selector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing selector!", e);
}
stopped_ = true;
}
}
private void select() {
try {
// wait for io events.
selector.select();
// process the io events we received
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// if the key is marked Accept, then it has to be the server
// transport.
if (key.isAcceptable()) {
handleAccept();
} else if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
TNonblockingServer 使用非阻塞的 I/O 解決了TSimpleServer一個客戶端阻塞其他所有客戶端的問題。它使用了java.nio.channels.Selector愕难,通過調(diào)用select()惫霸,它使得你阻塞在多個連接上,而不是阻塞在單一的連接上猜丹。當一或多個連接準備好被接受/讀/寫時茫打,select()調(diào)用便會返回。TNonblockingServer處理這些連接的時候轮洋,要么接受它抬旺,要么從它那讀數(shù)據(jù),要么把數(shù)據(jù)寫到它那里开财,然后再次調(diào)用select()來等待下一個可用的連接责鳍。通用這種方式,server可同時服務多個客戶端历葛,而不會出現(xiàn)一個客戶端把其他客戶端全部“餓死”的情況。
使用方法:
public static void nonBlock() throws TTransportException {
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8912);
Hello.Processor processor = new Hello.Processor(new HelloServiceImpl());
TNonblockingServer.Args args = new TNonblockingServer.Args(serverSocket);
args.transportFactory(new TFramedTransport.Factory());
args.protocolFactory(new TCompactProtocol.Factory());
args.processor(processor);
TNonblockingServer server = new TNonblockingServer(args);
server.serve();
}
-
ThreadedSelectorServer
ThreadedSelectorServer允許你用多個線程來處理網(wǎng)絡 I/O乓诽。它維護了兩個線程池鸠天,一個用來處理網(wǎng)絡 I/O,另一個用來進行請求的處理稠集。使用方法:
public static void threadSelector() throws TTransportException, IOException {
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8912);
Hello.Processor processor = new Hello.Processor(new HelloServiceImpl());
TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(serverSocket);
args.transportFactory(new TFramedTransport.Factory());
args.protocolFactory(new TBinaryProtocol.Factory());
args.selectorThreads(10);
args.acceptQueueSizePerThread(10);
args.processor(processor);
TThreadedSelectorServer server = new TThreadedSelectorServer(args);
server.serve();
}
-
TThreadPoolServer
TThreadPoolServer有一個專用的線程用來接受連接旦接受了一個連接巍杈,它就會被放入ThreadPoolExecutor中的一個 worker 線程里處理。worker 線程被綁定到特定的客戶端連接上筷畦,直到它關閉刺洒。一旦連接關閉逆航,該worker線程就又回到了線程池中。你可以配置線程池的最小因俐、最大線程數(shù),默認值分別是5(最谐盘)和Integer.MAX_VALUE(最大)澳眷。使用方法:
public static void threadPool() throws Exception {
TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(8912);
Hello.Processor processor = new Hello.Processor(new HelloServiceImpl());
TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket);
args.transportFactory(new TFramedTransport.Factory());
args.protocolFactory(new TBinaryProtocol.Factory());
args.processor(processor);
args.maxWorkerThreads(10);
TThreadPoolServer server = new TThreadPoolServer(args);
server.serve();
}