Thrift二進制序列化TCompactProtocol與TBinaryProtocol的原理和區(qū)別
thrift 分為四個模塊
TransPort
將Socket包裝成各種TransPort使用。
TIOStreamTransport和TSocket這兩個類的結(jié)構(gòu)對應著阻塞同步IO, TSocket封裝了Socket接口
TNonblockingTrasnsort尿贫,TNonblockingSocket這兩個類對應著非阻塞IO
TMemoryInputTransport封裝了一個字節(jié)數(shù)組byte[]來做輸入流的封裝
TMemoryBuffer使用字節(jié)數(shù)組輸出流ByteArrayOutputStream做輸出流的封裝
TFramedTransport則封裝了TMemoryInputTransport做輸入流寇荧,封裝了TByteArryOutPutStream做輸出流擅羞,作為內(nèi)存讀寫緩沖區(qū)的一個封裝腻扇。TFramedTransport的flush方法時糠悼,會先寫4個字節(jié)的輸出流的長度作為消息頭藐不,然后寫消息體滋恬。和FrameBuffer的讀消息對應起來聊训。FrameBuffer對消息時,先讀4個字節(jié)的長度恢氯,再讀消息體
TFastFramedTransport是內(nèi)存利用率更高的一個內(nèi)存讀寫緩存區(qū)带斑,它使用自動增長的byte,而不是每次都new一個byte[]勋拟,提高了內(nèi)存的使用率勋磕。其他和TFramedTransport一樣,flush時也會寫4個字節(jié)的消息頭表示消息長度敢靡。
Protocol
協(xié)議和編解碼是一個網(wǎng)絡(luò)應用程序的核心問題之一挂滓,客戶端和服務(wù)器通過約定的協(xié)議來傳輸消息(數(shù)據(jù)),通過特定的格式來編解碼字節(jié)流啸胧,并轉(zhuǎn)化成業(yè)務(wù)消息,提供給上層框架調(diào)用纺念。
Thrift的協(xié)議比較簡單贝椿,它把協(xié)議和編解碼整合在了一起。抽象類TProtocol定義了協(xié)議和編解碼的頂層接口陷谱。
抽象類 TProtocol,構(gòu)造方法需要一個Transport類型的對象
方法分為兩大類烙博,write方法和read方法
方法名的結(jié)構(gòu):
(write/read) + (Struct,Message,File,Map,List,Set) + (Begin/End)
(write/read) + (Bool,I16,I32,I64,Double,String,Binary)
Processor
基類TProcessor,只有一個process(TProtocol in, TProtocol out)
方法
TBaseProcessor繼承自TProcessor:
private final I iface;
private final Map<String,ProcessFunction<I, ? extends TBase>> processMap;
實現(xiàn)process方法,去map找到對應的方法渣窜,并執(zhí)行:
public boolean process(TProtocol in, TProtocol out) throws TException {
TMessage msg = in.readMessageBegin();
ProcessFunction fn = processMap.get(msg.name);
fn.process(msg.seqid, in, out, iface);
return true;
}
TProcessorFactory有兩個方法getProcessor(TTransport trans)
和isAsyncProcessor()
兩個方法铺根。
Server
共有以下幾種類型:
server的工作可以分為:監(jiān)聽socket鏈接、數(shù)據(jù)的讀寫和業(yè)務(wù)邏輯的處理三部分图毕。
- TSimpleServer
TSimpleServer的工作模式只有一個工作線程夷都,循環(huán)監(jiān)聽新請求的到來并完成對請求的處理,它只是在簡單的演示時候使用
- TNonBlockingServer
TNonblockingServer工作模式予颤,該模式也是單線程工作,但是該模式采用NIO的方式冬阳,所有的socket都被注冊到selector中蛤虐,在一個線程中通過seletor循環(huán)監(jiān)控所有的socket,每次selector結(jié)束時肝陪,處理所有的處于就緒狀態(tài)的socket驳庭,對于有數(shù)據(jù)到來的socket進行數(shù)據(jù)讀取操作,對于有數(shù)據(jù)發(fā)送的socket則進行數(shù)據(jù)發(fā)送氯窍,對于監(jiān)聽socket則產(chǎn)生一個新業(yè)務(wù)socket并將其注冊到selector中饲常。
- THsHaServer
THsHaServer類是TNonblockingServer類的子類,在5.2節(jié)中的TNonblockingServer模式中狼讨,采用一個線程來完成對所有socket的監(jiān)聽和業(yè)務(wù)處理贝淤,造成了效率的低下,THsHaServer模式的引入則是部分解決了這些問題政供。THsHaServer模式中播聪,引入一個線程池來專門進行業(yè)務(wù)處理。但監(jiān)聽和讀寫數(shù)據(jù)還是主線程來做布隔,因此可能有性能問題离陶。
- TThreadPoolServer
線程池模式中,數(shù)據(jù)讀取和業(yè)務(wù)處理都交由線程池完成衅檀,主線程只負責監(jiān)聽新連接招刨,因此在并發(fā)量較大時新連接也能夠被及時接受。線程池模式比較適合服務(wù)器端能預知最多有多少個客戶端并發(fā)的情況哀军,這時每個請求都能被業(yè)務(wù)線程池及時處理沉眶,性能也非常高。
線程池模式的處理能力受限于線程池的工作能力排苍,當并發(fā)請求數(shù)大于線程池中的線程數(shù)時沦寂,新請求也只能排隊等待。
- TThreadedSelectorServer
(1)一個AcceptThread線程對象淘衙,專門用于處理監(jiān)聽socket上的新連接传藏;
(2) 若干個SelectorThread對象專門用于處理業(yè)務(wù)socket的網(wǎng)絡(luò)I/O操作,所有網(wǎng)絡(luò)數(shù)據(jù)的讀寫均是有這些線程來完成;
(3) 一個負載均衡器SelectorThreadLoadBalancer對象毯侦,主要用于AcceptThread線程接收到一個新socket連接請求時哭靖,決定將這個新連接請求分配給哪個SelectorThread線程。
(4) 一個ExecutorService類型的工作線程池侈离,在SelectorThread線程中试幽,監(jiān)聽到有業(yè)務(wù)socket中有調(diào)用請求過來,則將請求讀取之后卦碾,交個ExecutorService線程池中的線程完成此次調(diào)用的具體執(zhí)行铺坞;
FrameBuffer
FrameBuffer是ThriftNIO服務(wù)器端的一個核心組件,它一方面承擔了NIO編程中的緩沖區(qū)的功能洲胖,另一方面還承擔了RPC方法調(diào)用的職責济榨。
方法調(diào)用
(1)自動生成的Iface接口,是遠程方法的頂層接口
(2)自動生成的Processor類及相關(guān)父類绿映,包括TProcessor接口擒滑,TBaseProcess抽象類
(3)ProcessFunction抽象類,抽象了一個具體的方法調(diào)用叉弦,包含了方法名信息丐一,調(diào)用方法的抽象過程等
(4)TNonblcokingServer,是NIO服務(wù)器的默認實現(xiàn)淹冰,通過Args參數(shù)來配置Processor等信息
(5)FrameBuffer類库车,服務(wù)器NIO的緩沖區(qū)對象,這個對象在服務(wù)器端收到全包并解碼后榄棵,會調(diào)用Processor去完成實際的方法調(diào)用
服務(wù)器端的方法的具體實現(xiàn)類凝颇,實現(xiàn)Iface接口
- 通過IDL來定義接口: DemoService.thrift
namespace java com.thrift.test
service DemoService{
string sayHi(1:string name);
}
根據(jù)IDL自動生成代碼
thrift -r --gen java DemoService.thrift先生成一個IFace和IAsynceFace。
public interface Iface {
public String sayHi(String name) throws org.apache.thrift.TException;
}
生成args對象
生成一個client對象疹鳄,包括發(fā)送請求和接收結(jié)果的方法拧略。
public String sayHi(String name) throws org.apache.thrift.TException{
send_sayHi(name);
return recv_sayHi();
}
public void send_sayHi(String name) throws org.apache.thrift.TException{
sayHi_args args = new sayHi_args();
args.setName(name);
sendBase("sayHi", args);
}
public String recv_sayHi() throws org.apache.thrift.TException
{
sayHi_result result = new sayHi_result();
receiveBase(result, "sayHi");
if (result.isSetSuccess()) {
return result.success;
}
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "sayHi failed: unknown result");
}
- 生成processor對象
protected sayHi_args getEmptyArgsInstance() {
return new sayHi_args();
}
protected sayHi_result getResult(I iface, sayHi_args args) throws org.apache.thrift.TException {
sayHi_result result = new sayHi_result();
result.success = iface.sayHi(args.name);
return result;
}
- 實現(xiàn)IFace接口
package com.thrift.test;
import org.apache.thrift.TException;
public class DemoServiceImpl implements DemoService.Iface{
@Override
public String sayHi(String name) throws TException {
return "Hi " + name + ", from Thrift Server";
}
}
- Client
public class Client {
public static void main(String[] args) throws Exception{
TSocket socket = new TSocket("127.0.0.1", 9090);
socket.setTimeout(3000);
TTransport transport = new TFramedTransport(socket);
TProtocol protocol = new TCompactProtocol(transport);
transport.open();
System.out.println("Connected to Thrfit Server");
DemoService.Client client = new DemoService.Client.Factory()
.getClient(protocol);
String result = client.sayHi("ITer_ZC");
System.out.println(result);
}
}
- server
public class Server {
public static void main(String[] args){
TNonblockingServerSocket socket;
try {
socket = new TNonblockingServerSocket(9090);
TNonblockingServer.Args options = new TNonblockingServer.Args(socket);
TProcessor processor = new DemoService.Processor<Iface>(new DemoServiceImpl());
options.processor(processor);
options.protocolFactory(new TCompactProtocol.Factory());
TServer server = new TNonblockingServer(options);
System.out.println("Thrift Server is running at 9090 port");
server.serve();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}