RPC 框架原理
RPC 框架的目標(biāo)就是讓遠(yuǎn)程服務(wù)調(diào)用更加簡單、透明拘荡,RPC 框架負(fù)責(zé)屏蔽底層的傳輸方式(TCP 或者 UDP)臼节、序列化方式(XML/Json/ 二進(jìn)制)和通信細(xì)節(jié)。服務(wù)調(diào)用者可以像調(diào)用本地接口一樣調(diào)用遠(yuǎn)程的服務(wù)提供者,而不需要關(guān)心底層通信細(xì)節(jié)和調(diào)用過程网缝。
業(yè)界主流的 RPC 框架整體上分為三類:
- 支持多語言的 RPC 框架巨税,比較成熟的有 Google 的 gRPC、Apache(Facebook)的 Thrift粉臊;
- 只支持特定語言的 RPC 框架草添,例如新浪微博的 Motan;
- 支持服務(wù)治理等服務(wù)化特性的分布式服務(wù)框架扼仲,其底層內(nèi)核仍然是 RPC 框架, 例如阿里的 Dubbo远寸。
gRPC 簡介
gRPC 是一個(gè)高性能、開源和通用的 RPC 框架屠凶,面向服務(wù)端和移動(dòng)端驰后,基于 HTTP/2 設(shè)計(jì)。
gRPC 特點(diǎn)
- 語言中立阅畴,支持多種語言倡怎;
- 基于 IDL 文件定義服務(wù)贱枣,通過 proto3 工具生成指定語言的數(shù)據(jù)結(jié)構(gòu)、服務(wù)端接口以及客戶端 Stub;
- 通信協(xié)議基于標(biāo)準(zhǔn)的 HTTP/2 設(shè)計(jì),支持雙向流谁鳍、消息頭壓縮郊楣、單 TCP 的多路復(fù)用点把、服務(wù)端推送等特性橘荠,這些特性使得 gRPC 在移動(dòng)端設(shè)備上更加省電和節(jié)省網(wǎng)絡(luò)流量;
- 序列化支持 PB(Protocol Buffer)和 JSON郎逃,PB 是一種語言無關(guān)的高性能序列化框架哥童,基于 HTTP/2 + PB, 保障了 RPC 調(diào)用的高性能。
服務(wù)端創(chuàng)建流程
gRPC 服務(wù)端創(chuàng)建采用 Build 模式褒翰,對(duì)底層服務(wù)綁定贮懈、transportServer 和 NettyServer 的創(chuàng)建和實(shí)例化做了封裝和屏蔽,讓服務(wù)調(diào)用者不用關(guān)心 RPC 調(diào)用細(xì)節(jié)优训,整體上分為三個(gè)過程:
- 創(chuàng)建 Netty HTTP/2 服務(wù)端朵你;
- 將需要調(diào)用的服務(wù)端接口實(shí)現(xiàn)類注冊(cè)到內(nèi)部的 Registry 中,RPC 調(diào)用時(shí)揣非,可以根據(jù) RPC 請(qǐng)求消息中的服務(wù)定義信息查詢到服務(wù)接口實(shí)現(xiàn)類抡医;
- 創(chuàng)建 gRPC Server,它是 gRPC 服務(wù)端的抽象妆兑,聚合了各種 Listener魂拦,用于 RPC 消息的統(tǒng)一調(diào)度和處理。
gRPC 服務(wù)端創(chuàng)建關(guān)鍵流程分析:
- NettyServer 實(shí)例創(chuàng)建:gRPC 服務(wù)端創(chuàng)建搁嗓,首先需要初始化 NettyServer芯勘,它是 gRPC 基于 Netty 4.1 HTTP/2 協(xié)議棧之上封裝的 HTTP/2 服務(wù)端。NettyServer 實(shí)例由 NettyServerBuilder 的 buildTransportServer 方法構(gòu)建腺逛,NettyServer 構(gòu)建完成之后荷愕,監(jiān)聽指定的 Socket 地址,即可實(shí)現(xiàn)基于 HTTP/2 協(xié)議的請(qǐng)求消息接入。
- 綁定 IDL 定義的服務(wù)接口實(shí)現(xiàn)類:gRPC 與其它一些 RPC 框架的差異點(diǎn)是服務(wù)接口實(shí)現(xiàn)類的調(diào)用并不是通過動(dòng)態(tài)代理和反射機(jī)制安疗,而是通過 proto 工具生成代碼抛杨,在服務(wù)端啟動(dòng)時(shí),將服務(wù)接口實(shí)現(xiàn)類實(shí)例注冊(cè)到 gRPC 內(nèi)部的服務(wù)注冊(cè)中心上荐类。請(qǐng)求消息接入之后怖现,可以根據(jù)服務(wù)名和方法名,直接調(diào)用啟動(dòng)時(shí)注冊(cè)的服務(wù)實(shí)例玉罐,而不需要通過反射的方式進(jìn)行調(diào)用屈嗤,性能更優(yōu)。
- gRPC 服務(wù)實(shí)例(ServerImpl)構(gòu)建:ServerImpl 負(fù)責(zé)整個(gè) gRPC 服務(wù)端消息的調(diào)度和處理吊输,創(chuàng)建 ServerImpl 實(shí)例過程中饶号,會(huì)對(duì)服務(wù)端依賴的對(duì)象進(jìn)行初始化,例如 Netty 的線程池資源季蚂、gRPC 的線程池茫船、內(nèi)部的服務(wù)注冊(cè)類(InternalHandlerRegistry)等,ServerImpl 初始化完成之后扭屁,就可以調(diào)用 NettyServer 的 start 方法啟動(dòng) HTTP/2 服務(wù)端算谈,接收 gRPC 客戶端的服務(wù)調(diào)用請(qǐng)求
服務(wù)端 service 調(diào)用流程
gRPC 的客戶端請(qǐng)求消息由 Netty Http2ConnectionHandler 接入,由 gRPC 負(fù)責(zé)將 PB 消息(或者 JSON)反序列化為 POJO 對(duì)象疯搅,然后通過服務(wù)定義查詢到該消息對(duì)應(yīng)的接口實(shí)例濒生,發(fā)起本地 Java 接口調(diào)用,調(diào)用完成之后幔欧,將響應(yīng)消息反序列化為 PB(或者 JSON),通過 HTTP2 Frame 發(fā)送給客戶端丽声。
整個(gè) service 調(diào)用可以劃分為如下四個(gè)過程:
- gRPC 請(qǐng)求消息接入礁蔗;
- gRPC 消息頭和消息體處理;
- 內(nèi)部的服務(wù)路由和調(diào)用雁社;
- 響應(yīng)消息發(fā)送浴井。
gRPC 請(qǐng)求消息接入
gRPC 的請(qǐng)求消息由 Netty HTTP/2 協(xié)議棧接入,通過 gRPC 注冊(cè)的 Http2FrameListener霉撵,將解碼成功之后的 HTTP Header 和 HTTP Body 發(fā)送到 gRPC 的 NettyServerHandler 中磺浙,實(shí)現(xiàn)基于 HTTP/2 的 RPC 請(qǐng)求消息接入。
gRPC 請(qǐng)求消息接入流程如下:
關(guān)鍵流程解讀如下:
- Netty 4.1 提供了 HTTP/2 底層協(xié)議棧徒坡,通過 Http2ConnectionHandler 及其依賴的其它類庫撕氧,實(shí)現(xiàn)了 HTTP/2 消息的統(tǒng)一接入和處理。通過注冊(cè) Http2FrameListener 監(jiān)聽器喇完,可以回調(diào)接收 HTTP2 協(xié)議的消息頭伦泥、消息體、優(yōu)先級(jí)、Ping不脯、SETTINGS 等府怯。gRPC 通過 FrameListener 重載 Http2FrameListener 的 onDataRead、onHeadersRead 等方法防楷,將 Netty 的 HTTP/2 消息轉(zhuǎn)發(fā)到 gRPC 的 NettyServerHandler 中牺丙。
- Netty 的 HTTP/2 協(xié)議接入仍然是通過 ChannelHandler 的 CodeC 機(jī)制實(shí)現(xiàn),它并不影響 NIO 線程模型复局。
因此冲簿,理論上各種協(xié)議、以及同一個(gè)協(xié)議的多個(gè)服務(wù)端實(shí)例可以共用同一個(gè) NIO 線程池(NioEventLoopGroup).也可以獨(dú)占肖揣。
在實(shí)踐中獨(dú)占模式普遍會(huì)存在線程資源占用過載問題民假,很容易出現(xiàn)句柄等資源泄漏。在 gRPC 中龙优,為了避免該問題羊异,默認(rèn)采用共享池模式創(chuàng)建 NioEventLoopGroup,所有的 gRPC 服務(wù)端實(shí)例彤断,都統(tǒng)一從 SharedResourceHolder 分配 NioEventLoopGroup 資源野舶,實(shí)現(xiàn) NioEventLoopGroup 的共享。
gRPC 消息頭和消息體處理
gRPC 消息頭的處理入口是 NettyServerHandler 的 onHeadersRead()宰衙,處理流程如下所示:
- 對(duì) HTTP Header 的 Content-Type 校驗(yàn)平道,此處必須是 “application/grpc”;
- 從 HTTP Header 的 URL 中提取接口和方法名供炼,以 HelloWorldServer 為例一屋,它的 method 為:”helloworld.Greeter/SayHello”;
- 將 Netty 的 HTTP Header 轉(zhuǎn)換成 gRPC 內(nèi)部的 Metadata袋哼,Metadata 內(nèi)部維護(hù)了一個(gè)鍵值對(duì)的二維數(shù)組 namesAndValues冀墨,以及一系列的類型轉(zhuǎn)換方法:
- 創(chuàng)建 NettyServerStream 對(duì)象,它持有了 Sink 和 TransportState 類涛贯,負(fù)責(zé)將消息封裝成 GrpcFrameCommand诽嘉,與底層 Netty 進(jìn)行交互,實(shí)現(xiàn)協(xié)議消息的處理弟翘;
- 創(chuàng)建 NettyServerStream 之后虫腋,會(huì)觸發(fā) ServerTransportListener 的 streamCreated 方法,在該方法中稀余,主要完成了消息上下文和 gRPC 業(yè)務(wù)監(jiān)聽器的創(chuàng)建悦冀;
- gRPC 上下文創(chuàng)建:CancellableContext 創(chuàng)建之后,支持超時(shí)取消滚躯,如果 gRPC 客戶端請(qǐng)求消息在 Http Header 中攜帶了“grpc-timeout”雏门,系統(tǒng)在創(chuàng)建 CancellableContext 的同時(shí)會(huì)啟動(dòng)一個(gè)延時(shí)定時(shí)任務(wù)嘿歌,延時(shí)周期為超時(shí)時(shí)間,一旦該定時(shí)器成功執(zhí)行茁影,就會(huì)調(diào)用 CancellableContext.CancellationListener 的 cancel 方法宙帝,發(fā)送 CancelServerStreamCommand 指令;
- JumpToApplicationThreadServerStreamListener 的創(chuàng)建:它是 ServerImpl 的內(nèi)部類募闲,從命名上基本可以看出它的用途步脓,即從 ServerStream 跳轉(zhuǎn)到應(yīng)用線程中進(jìn)行服務(wù)調(diào)用,gRPC 服務(wù)端的接口調(diào)用主要通過 JumpToApplicationThreadServerStreamListener 的 messageRead 和 halfClosed 方法完成浩螺;
- 將 NettyServerStream 的 TransportState 緩存到 Netty 的 Http2Stream 中靴患,當(dāng)處理請(qǐng)求消息體時(shí),可以根據(jù) streamId 獲取到 Http2Stream要出,進(jìn)而根據(jù)“streamKey”還原 NettyServerStream 的 TransportState鸳君,進(jìn)行后續(xù)處理。
gRPC 消息體的處理入口是 NettyServerHandler 的 onDataRead()患蹂,處理流程如下所示:
消息體處理比較簡單或颊,下面就關(guān)鍵技術(shù)點(diǎn)進(jìn)行講解:
- 因?yàn)?Netty HTTP/2 協(xié)議 Http2FrameListener 分別提供了 onDataRead 和 onHeadersRead 回調(diào)方法,所以 gRPC NettyServerHandler 在處理完消息頭之后需要緩存上下文传于,以便后續(xù)處理消息體時(shí)使用囱挑;
- onDataRead 和 onHeadersRead 方法都是由 Netty 的 NIO 線程負(fù)責(zé)調(diào)度,但是在執(zhí)行 onDataRead 的過程中發(fā)生了線程切換沼溜,如下所示(ServerTransportListenerImpl 類):
內(nèi)部的服務(wù)路由和調(diào)用
內(nèi)部的服務(wù)路由和調(diào)用平挑,主要包括如下幾個(gè)步驟:
將請(qǐng)求消息體反序列為 Java 的 POJO 對(duì)象,即 IDL 中定義的請(qǐng)求參數(shù)對(duì)象系草;
根據(jù)請(qǐng)求消息頭中的方法名到注冊(cè)中心查詢到對(duì)應(yīng)的服務(wù)定義信息通熄;
-
通過 Java 本地接口調(diào)用方式,調(diào)用服務(wù)端啟動(dòng)時(shí)注冊(cè)的 IDL 接口實(shí)現(xiàn)類找都。
中間的交互流程比較復(fù)雜棠隐,涉及的類較多,但是關(guān)鍵步驟主要有三個(gè):
解碼:對(duì) HTTP/2 Body 進(jìn)行應(yīng)用層解碼檐嚣,轉(zhuǎn)換成服務(wù)端接口的請(qǐng)求參數(shù),解碼的關(guān)鍵就是調(diào)用 requestMarshaller.parse(input)啰扛,將 PB 碼流轉(zhuǎn)換成 Java 對(duì)象嚎京;
路由:根據(jù) URL 中的方法名從內(nèi)部服務(wù)注冊(cè)中心查詢到對(duì)應(yīng)的服務(wù)實(shí)例,路由的關(guān)鍵是調(diào)用 registry.lookupMethod(methodName) 獲取到 ServerMethodDefinition 對(duì)象隐解;
調(diào)用:調(diào)用服務(wù)端接口實(shí)現(xiàn)類的指定方法鞍帝,實(shí)現(xiàn) RPC 調(diào)用,與一些 RPC 框架不同的是煞茫,此處調(diào)用是 Java 本地接口調(diào)用帕涌,非反射調(diào)用摄凡,性能更優(yōu),它的實(shí)現(xiàn)關(guān)鍵是 UnaryRequestMethod.invoke(request, responseObserver) 方法蚓曼。
響應(yīng)消息發(fā)送
響應(yīng)消息的發(fā)送由 StreamObserver 的 onNext 觸發(fā)亲澡,流程如下所示:
響應(yīng)消息的發(fā)送原理如下:
- 分別發(fā)送 gRPC HTTP/2 響應(yīng)消息頭和消息體,由 NettyServerStream 的 Sink 將響應(yīng)消息封裝成 SendResponseHeadersCommand 和 SendGrpcFrameCommand纫版,加入到 WriteQueue 中床绪;
- WriteQueue 通過 Netty 的 NioEventLoop 線程進(jìn)行消息處理,NioEventLoop 將 SendResponseHeadersCommand 和 SendGrpcFrameCommand 寫入到 Netty 的 Channel 中其弊,進(jìn)而觸發(fā) DefaultChannelPipeline 的
write(Object msg, ChannelPromise promise) 操作癞己; - 響應(yīng)消息通過 ChannelPipeline 職責(zé)鏈進(jìn)行調(diào)度,觸發(fā) NettyServerHandler 的 sendResponseHeaders 和 sendGrpcFrame 方法梭伐,調(diào)用 Http2ConnectionEncoder 的 writeHeaders 和 writeData 方法痹雅,將響應(yīng)消息通過 Netty 的 HTTP/2 協(xié)議棧發(fā)送給客戶端。
源碼分析
主要類和功能交互流程
gRPC 請(qǐng)求消息頭處理
gRPC 請(qǐng)求消息頭處理涉及的主要類庫如下:
- NettyServerHandler:gRPC Netty Server 的 ChannelHandler 實(shí)現(xiàn)糊识,負(fù)責(zé) HTTP/2 請(qǐng)求消息和響應(yīng)消息的處理绩社;
- SerializingExecutor:應(yīng)用調(diào)用線程池,負(fù)責(zé) RPC 請(qǐng)求消息的解碼技掏、響應(yīng)消息編碼以及服務(wù)接口的調(diào)用等铃将;
- MessageDeframer:負(fù)責(zé)請(qǐng)求 Framer 的解析,主要用于處理 HTTP/2 Header 和 Body 的讀妊剖帷劲阎;
- ServerCallHandler:真正的服務(wù)接口處理類,提供 onMessage(ReqT request) 和 onHalfClose() 方法鸠真,用于服務(wù)接口的調(diào)用悯仙。
gRPC 請(qǐng)求消息體處理和服務(wù)調(diào)用
gRPC 響應(yīng)消息處理
需要說明的是,響應(yīng)消息的發(fā)送由調(diào)用服務(wù)端接口的應(yīng)用線程執(zhí)行吠卷,在本示例中锡垄,由 SerializingExecutor 進(jìn)行調(diào)用。
當(dāng)請(qǐng)求消息頭被封裝成 SendResponseHeadersCommand 并被插入到 WriteQueue 之后祭隔,后續(xù)操作由 Netty 的 NIO 線程 NioEventLoop 負(fù)責(zé)處理货岭。
應(yīng)用線程繼續(xù)發(fā)送響應(yīng)消息體,將其封裝成 SendGrpcFrameCommand 并插入到 WriteQueue 隊(duì)列中疾渴,由 Netty 的 NIO 線程 NioEventLoop 處理千贯。響應(yīng)消息的發(fā)送嚴(yán)格按照順序:即先消息頭,后消息體搞坝。
了解 gRPC 服務(wù)端消息接入和 service 調(diào)用流程之后搔谴,針對(duì)主要的流程和類庫,進(jìn)行源碼分析桩撮,以加深對(duì) gRPC 服務(wù)端工作原理的了解敦第。
Netty 服務(wù)端創(chuàng)建
基于 Netty 的 HTTP/2 協(xié)議棧峰弹,構(gòu)建 gRPC 服務(wù)端,Netty HTTP/2 協(xié)議棧初始化代碼如下所示(創(chuàng)建 NettyServerHandler芜果,NettyServerHandler 類):
frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
Http2ConnectionDecoder decoder = new FixedHttp2ConnectionDecoder(connection, encoder,
frameReader);
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(flowControlWindow);
settings.maxConcurrentStreams(maxStreams);
settings.maxHeaderListSize(maxHeaderListSize);
return new NettyServerHandler(
transportListener, streamTracerFactories, decoder, encoder, settings, maxMessageSize,
keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
keepAliveEnforcer);
創(chuàng)建 gRPC FrameListener鞠呈,作為 Http2FrameListener,監(jiān)聽 HTTP/2 消息的讀取师幕,回調(diào)到 NettyServerHandler 中(NettyServerHandler 類):
decoder().frameListener(new FrameListener());
將 NettyServerHandler 添加到 Netty 的 ChannelPipeline 中粟按,接收和發(fā)送 HTTP/2 消息(NettyServerTransport 類):
ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler);
channel.pipeline().addLast(negotiationHandler);
gRPC 服務(wù)端請(qǐng)求和響應(yīng)消息統(tǒng)一由 NettyServerHandler 攔截處理,相關(guān)方法如下:
NettyServerHandler 是 gRPC 應(yīng)用側(cè)和底層協(xié)議棧的橋接類霹粥,負(fù)責(zé)將原生的 HTTP/2 消息調(diào)度到 gRPC 應(yīng)用側(cè)灭将,同時(shí)將應(yīng)用側(cè)的消息發(fā)送到協(xié)議棧。
服務(wù)實(shí)例創(chuàng)建和綁定
gRPC 服務(wù)端啟動(dòng)時(shí)后控,需要將調(diào)用的接口實(shí)現(xiàn)類實(shí)例注冊(cè)到內(nèi)部的服務(wù)注冊(cè)中心庙曙,用于后續(xù)的接口調(diào)用,關(guān)鍵代碼如下(InternalHandlerRegistry 類)
Builder addService(ServerServiceDefinition service) {
services.put(service.getServiceDescriptor().getName(), service);
return this;
}
服務(wù)接口綁定時(shí)浩淘,由 Proto3 工具生成代碼捌朴,重載 bindService() 方法(GreeterImplBase 類):
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_SAY_HELLO,
asyncUnaryCall(
new MethodHandlers<
io.grpc.examples.helloworld.HelloRequest,
io.grpc.examples.helloworld.HelloReply>(
this, METHODID_SAY_HELLO)))
.build();
}
service 調(diào)用
gRPC 消息的接收
gRPC 消息的接入由 Netty HTTP/2 協(xié)議棧回調(diào) gRPC 的 FrameListener张抄,進(jìn)而調(diào)用 NettyServerHandler 的 onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers) 和 onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)砂蔽,
消息頭和消息體的處理,主要由 MessageDeframer 的 deliver 方法完成署惯,相關(guān)代碼如下(MessageDeframer 類):
if (inDelivery) {
return;
}
inDelivery = true;
try {
while (pendingDeliveries > 0 && readRequiredBytes()) {
switch (state) {
case HEADER:
processHeader();
break;
case BODY:
processBody();
pendingDeliveries--;
break;
default:
throw new AssertionError("Invalid state: " + state);
gRPC 請(qǐng)求消息(PB)的解碼由 PrototypeMarshaller 負(fù)責(zé)左驾,代碼如下 (ProtoLiteUtils 類):
public T parse(InputStream stream) {
if (stream instanceof ProtoInputStream) {
ProtoInputStream protoStream = (ProtoInputStream) stream;
if (protoStream.parser() == parser) {
try {
T message = (T) ((ProtoInputStream) stream).message();
gRPC 響應(yīng)消息發(fā)送
響應(yīng)消息分為兩部分發(fā)送:響應(yīng)消息頭和消息體,分別被封裝成不同的 WriteQueue.AbstractQueuedCommand极谊,插入到 WriteQueue 中诡右。
消息頭封裝代碼(NettyServerStream 類):
public void writeHeaders(Metadata headers) {
writeQueue.enqueue(new SendResponseHeadersCommand(transportState(),
Utils.convertServerHeaders(headers), false),
true);
}
消息體封裝代碼(NettyServerStream 類):
ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf();
final int numBytes = bytebuf.readableBytes();
onSendingBytes(numBytes);
writeQueue.enqueue(
new SendGrpcFrameCommand(transportState(), bytebuf, false),
channel.newPromise().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
transportState().onSentBytes(numBytes);
}
}), flush);
Netty 的 NioEventLoop 將響應(yīng)消息發(fā)送到 ChannelPipeline,最終被 NettyServerHandler 攔截并處理轻猖。
響應(yīng)消息頭處理代碼如下(NettyServerHandler 類):
private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
ChannelPromise promise) throws Http2Exception {
int streamId = cmd.stream().id();
Http2Stream stream = connection().stream(streamId);
if (stream == null) {
resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
return;
}
if (cmd.endOfStream()) {
closeStreamWhenDone(promise, streamId);
}
encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
}
響應(yīng)消息體處理代碼如下(NettyServerHandler 類):
private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
ChannelPromise promise) throws Http2Exception {
if (cmd.endStream()) {
closeStreamWhenDone(promise, cmd.streamId());
}
encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
}
服務(wù)接口實(shí)例調(diào)用: 經(jīng)過一系列預(yù)處理帆吻,最終由 ServerCalls 的 ServerCallHandler 調(diào)用服務(wù)接口實(shí)例,代碼如下(ServerCalls 類):
return new EmptyServerCallListener<ReqT>() {
ReqT request;
@Override
public void onMessage(ReqT request) {
this.request = request;
}
@Override
public void onHalfClose() {
if (request != null) {
method.invoke(request, responseObserver);
responseObserver.freeze();
if (call.isReady()) {
onReady();
}
最終的服務(wù)實(shí)現(xiàn)類調(diào)用如下(GreeterGrpc 類):
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_SAY_HELLO:
serviceImpl.sayHello((io.grpc.examples.helloworld.HelloRequest) request,
(io.grpc.stub.StreamObserver<io.grpc.examples.helloworld.HelloReply>) responseObserver);
break;
default:
throw new AssertionError();
}
服務(wù)端線程模型
gRPC 的線程由 Netty 線程 + gRPC 應(yīng)用線程組成咙边,它們之間的交互和切換比較復(fù)雜猜煮,下面做下詳細(xì)介紹。
Netty Server 線程模型
它的工作流程總結(jié)如下:
- 從主線程池(bossGroup)中隨機(jī)選擇一個(gè) Reactor 線程作為 Acceptor 線程败许,用于綁定監(jiān)聽端口友瘤,接收客戶端連接;
- Acceptor 線程接收客戶端連接請(qǐng)求之后創(chuàng)建新的 SocketChannel檐束,將其注冊(cè)到主線程池(bossGroup)的其它 Reactor 線程上,由其負(fù)責(zé)接入認(rèn)證束倍、握手等操作被丧;
- 步驟 2 完成之后盟戏,應(yīng)用層的鏈路正式建立,將 SocketChannel 從主線程池的 Reactor 線程的多路復(fù)用器上摘除甥桂,重新注冊(cè)到 Sub 線程池(workerGroup)的線程上柿究,用于處理 I/O 的讀寫操作。
Netty Server 使用的 NIO 線程實(shí)現(xiàn)是 NioEventLoop黄选,它的職責(zé)如下:
作為服務(wù)端 Acceptor 線程蝇摸,負(fù)責(zé)處理客戶端的請(qǐng)求接入;
作為客戶端 Connecor 線程办陷,負(fù)責(zé)注冊(cè)監(jiān)聽連接操作位貌夕,用于判斷異步連接結(jié)果;
作為 I/O 線程民镜,監(jiān)聽網(wǎng)絡(luò)讀操作位啡专,負(fù)責(zé)從 SocketChannel 中讀取報(bào)文;
作為 I/O 線程制圈,負(fù)責(zé)向 SocketChannel 寫入報(bào)文發(fā)送給對(duì)方们童,如果發(fā)生寫半包,會(huì)自動(dòng)注冊(cè)監(jiān)聽寫事件鲸鹦,用于后續(xù)繼續(xù)發(fā)送半包數(shù)據(jù)慧库,直到數(shù)據(jù)全部發(fā)送完成;
作為定時(shí)任務(wù)線程馋嗜,可以執(zhí)行定時(shí)任務(wù)齐板,例如鏈路空閑檢測和發(fā)送心跳消息等;
作為線程執(zhí)行器可以執(zhí)行普通的任務(wù) Task(Runnable)嵌戈。
gRPC service 線程模型
gRPC 服務(wù)端調(diào)度線程為 SerializingExecutor覆积,它實(shí)現(xiàn)了 Executor 和 Runnable 接口,通過外部傳入的 Executor 對(duì)象熟呛,調(diào)度和處理 Runnable宽档,同時(shí)內(nèi)部又維護(hù)了一個(gè)任務(wù)隊(duì)列 ConcurrentLinkedQueue,通過 run 方法循環(huán)處理隊(duì)列中存放的 Runnable 對(duì)象
線程調(diào)度和切換策略
Netty Server I/O 線程的職責(zé):
gRPC 請(qǐng)求消息的讀取庵朝、響應(yīng)消息的發(fā)送
HTTP/2 協(xié)議消息的編碼和解碼
NettyServerHandler 的調(diào)度
gRPC service 線程的職責(zé):將 gRPC 請(qǐng)求消息(PB 碼流)反序列化為接口的請(qǐng)求參數(shù)對(duì)象
將接口響應(yīng)對(duì)象序列化為 PB 碼流
gRPC 服務(wù)端接口實(shí)現(xiàn)類調(diào)用
gRPC 的線程模型遵循 Netty 的線程分工原則吗冤,即:協(xié)議層消息的接收和編解碼由 Netty 的 I/O(NioEventLoop) 線程負(fù)責(zé);后續(xù)應(yīng)用層的處理由應(yīng)用線程負(fù)責(zé)九府,防止由于應(yīng)用處理耗時(shí)而阻塞 Netty 的 I/O 線程椎瘟。
基于上述分工原則,在 gRPC 請(qǐng)求消息的接入和響應(yīng)發(fā)送過程中侄旬,系統(tǒng)不斷的在 Netty I/O 線程和 gRPC 應(yīng)用線程之間進(jìn)行切換肺蔚。明白了分工原則,也就能夠理解為什么要做頻繁的線程切換儡羔。
gRPC 線程模型存在的一個(gè)缺點(diǎn)宣羊,就是在一次 RPC 調(diào)用過程中璧诵,做了多次 I/O 線程到應(yīng)用線程之間的切換,頻繁切換會(huì)導(dǎo)致性能下降仇冯,這也是為什么 gRPC 性能比一些基于私有協(xié)議構(gòu)建的 RPC 框架性能低的一個(gè)原因之宿。盡管 gRPC 的性能已經(jīng)比較優(yōu)異,但是仍有一定的優(yōu)化空間苛坚。