本篇分兩塊來(lái)介紹thrift協(xié)議投放。
thrift定義文件:
struct MiRequest {
2: required string name;
3: optional i32 age;
}
exception MiRequestException {
1: required i32 code;
2: optional string reason;
}
service MiTestService {
string miTestMethod(1: MiRequest request) throws (1:MiRequestException qe);
string miTestOtherMethod(1: MiRequest request, 2: string pTwo) throws (1:MiRequestException qe);
}
執(zhí)行thrift --gen java:beans Test.thrift
生成MiTestService.java,本篇后文的通信協(xié)議介紹都是基于MiTestService.java适贸,類結(jié)構(gòu)如下:
簡(jiǎn)介:
MiTestService.AsyncClient:異步調(diào)用-調(diào)用方協(xié)議處理類
MiTestService.AsyncIface:異步調(diào)用-接口定義
MiTestService.Client:同步調(diào)用-調(diào)用方協(xié)議處理類
MiTestService.Iface:同步調(diào)用-接口定義
MiTestService.miTestMethod_args:方法miTestMethod的入?yún)?br>
MiTestService.miTestMethod_result:方法miTestMethod的返回值
MiTestService.miTestOtherMethod_args:方法miTestOtherMethod的入?yún)?br>
MiTestService.miTestOtherMethod_result:方法miTestOtherMethod的返回值
MiTestService.Processor:服務(wù)方協(xié)議處理器
以同步遠(yuǎn)程調(diào)用miTestOtherMethod方法為例:
一灸芳、調(diào)用方--方法調(diào)用和消息發(fā)送
MiTestService.Client構(gòu)造方法
public Client(TProtocol prot)
{
this(prot, prot);
}
public Client(TProtocol iprot, TProtocol oprot)
{
iprot_ = iprot;
oprot_ = oprot;
}
TProtocol為底層通信處理類。
備注:
MiTestService.Client協(xié)議處理類:負(fù)責(zé)通信協(xié)議消息體的定義拜姿,消息體的發(fā)送和二進(jìn)制流的解析
TProtocol通信處理類:負(fù)責(zé)處理二進(jìn)制字節(jié)流的發(fā)送和接收
調(diào)用方發(fā)起方法調(diào)用:MiTestService.Client.miTestOtherMethod
public String miTestOtherMethod(MiRequest request, String pTwo) throws MiRequestException, TException
{
send_miTestOtherMethod(request, pTwo);
return recv_miTestOtherMethod();
}
public void send_miTestOtherMethod(MiRequest request, String pTwo) throws TException
{
oprot_.writeMessageBegin(new TMessage("miTestOtherMethod", TMessageType.CALL, ++seqid_));
miTestOtherMethod_args args = new miTestOtherMethod_args();
args.setRequest(request);
args.setPTwo(pTwo);
args.write(oprot_);
oprot_.writeMessageEnd();
oprot_.getTransport().flush();
}
重點(diǎn)說(shuō)明:
TProtocol.writeMessageBegin發(fā)送TMessage指定本次調(diào)用的是哪個(gè)方法
miTestOtherMethod_args.write發(fā)送本次方法調(diào)用的入?yún)?/p>
兩塊的詳細(xì)源碼和說(shuō)明如下:
public void writeMessageBegin(TMessage message) throws TException {
if (strictWrite_) {
int version = VERSION_1 | message.type;
writeI32(version);
writeString(message.name);
writeI32(message.seqid);
} else {
writeString(message.name);
writeByte(message.type);
writeI32(message.seqid);
}
}
public void write(TProtocol oprot) throws TException {
validate();
//空實(shí)現(xiàn)
oprot.writeStructBegin(STRUCT_DESC);
//發(fā)送入?yún)ⅲ簭?fù)合對(duì)象request
if (this.request != null) {
//REQUEST_FIELD_DESC = new TField("request", TType.STRUCT, (short)1);
//寫入TField.type烙样、TField.id
oprot.writeFieldBegin(REQUEST_FIELD_DESC);
//寫入具體內(nèi)容
this.request.write(oprot);
//空實(shí)現(xiàn)
oprot.writeFieldEnd();
}
//發(fā)送入?yún)ⅲ夯A(chǔ)類型string
if (this.pTwo != null) {
//P_TWO_FIELD_DESC = new TField("pTwo", TType.STRING, (short)2);
//寫入TField.type、TField.id
oprot.writeFieldBegin(P_TWO_FIELD_DESC);
//寫入具體內(nèi)容
oprot.writeString(this.pTwo);
//空實(shí)現(xiàn)
oprot.writeFieldEnd();
}
//寫入一個(gè)字節(jié):TType.STOP(整型1)
oprot.writeFieldStop();
//空實(shí)現(xiàn)
oprot.writeStructEnd();
}
遠(yuǎn)程調(diào)用請(qǐng)求信息發(fā)送完畢蕊肥。
二谒获、服務(wù)方--消息接收和解析
MiTestService.Processor構(gòu)造方法:傳入MiTestService.Iface的具體實(shí)現(xiàn)類
public Processor(Iface iface)
{
iface_ = iface;
//方法名-方法處理類映射,每個(gè)方法生成了一個(gè)具體的處理類
//可以看出方法名稱必須唯一壁却,不支持方法重載
//方法處理類實(shí)現(xiàn)了MiTestService.Processor.ProcessFunction接口
processMap_.put("miTestMethod", new miTestMethod());
processMap_.put("miTestOtherMethod", new miTestOtherMethod());
}
protected static interface ProcessFunction {
public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException;
}
//private class miTestMethod implements ProcessFunction
//private class miTestOtherMethod implements ProcessFunction
服務(wù)方調(diào)用TServer.serve開啟端口監(jiān)聽批狱,
接收到消息后將調(diào)用MiTestService.Processor.process方法
public boolean process(TProtocol iprot, TProtocol oprot) throws TException
{
//解析出TMessage消息,主要包含請(qǐng)求調(diào)用的方法名稱
TMessage msg = iprot.readMessageBegin();
//
ProcessFunction fn = processMap_.get(msg.name);
fn.process(msg.seqid, iprot, oprot);
return true;
}
以miTestOtherMethod處理流程為例展东,主要代碼如下:
public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException
{
//解析參數(shù)
miTestOtherMethod_args args = new miTestOtherMethod_args();
//此處劃重點(diǎn):涉及第三部分可擴(kuò)展性的講解
args.read(iprot);
//空實(shí)現(xiàn)
iprot.readMessageEnd();
//結(jié)果對(duì)象
miTestOtherMethod_result result = new miTestOtherMethod_result();
//發(fā)起方法調(diào)用
result.success = iface_.miTestOtherMethod(args.request, args.pTwo);
//調(diào)用結(jié)果寫回
oprot.writeMessageBegin(new TMessage("miTestOtherMethod", TMessageType.REPLY, seqid));
result.write(oprot);
//空實(shí)現(xiàn)
oprot.writeMessageEnd();
oprot.getTransport().flush();
}
三赔硫、thrift協(xié)議--字節(jié)碼增強(qiáng)、skywalking可擴(kuò)展消息頭
第三部分將根據(jù)miTestOtherMethod_args.read方法的具體實(shí)現(xiàn)盐肃,
來(lái)解讀thrift協(xié)議的可擴(kuò)展消息頭的實(shí)現(xiàn)原理
public void read(TProtocol iprot) throws TException {
TField field;
//非讀--空實(shí)現(xiàn)
iprot.readStructBegin();
while (true)
{
//讀取Field.type爪膊、Field.id
field = iprot.readFieldBegin();
if (field.type == TType.STOP) {
break;
}
//根據(jù)Field.id匹配關(guān)系,解析出每一個(gè)方法入?yún)? switch (field.id) {
case 1: // REQUEST
if (field.type == TType.STRUCT) {
this.request = new MiRequest();
this.request.read(iprot);
} else {
TProtocolUtil.skip(iprot, field.type);
}
break;
case 2: // P_TWO
if (field.type == TType.STRING) {
this.pTwo = iprot.readString();
} else {
TProtocolUtil.skip(iprot, field.type);
}
break;
//匹配不上的消息則跳過(guò)
default:
TProtocolUtil.skip(iprot, field.type);
}
iprot.readFieldEnd();
}
//空實(shí)現(xiàn)
iprot.readStructEnd();
validate();
}
由上面的服務(wù)方方法參數(shù)解析實(shí)現(xiàn)可以得出:
1砸王、每一個(gè)方法的方法入?yún)⒍紘?yán)格按照定義的Filed.id來(lái)匹配
2惊完、匹配失敗則走到default邏輯,跳過(guò)匹配異常的消息體
由此可以實(shí)現(xiàn):
字節(jié)碼增強(qiáng)TBinaryProtocol.writeMessageBegin处硬,在此方法執(zhí)行后延(即寫入TMessage后)寫入自定義Field.id的消息體小槐;
字節(jié)碼增強(qiáng)TBinaryProtocol.readMessageBegin 或者 MiTestService.Processor.process,在字節(jié)流解析出TMessage后解析自定義Field.id的消息體荷辕。
自定義消息體可以為任何自定義類型的thrift復(fù)合類型或基礎(chǔ)類型凿跳,傳入skywalking上下文信息。