grpc客戶端原理

gRPC 是在 HTTP/2 之上實現(xiàn)的 RPC 框架檩奠,HTTP/2 是第 7 層(應(yīng)用層)協(xié)議,它運行在 TCP(第 4 層 - 傳輸層)協(xié)議之上,相比于傳統(tǒng)的 REST/JSON 機制grpc有諸多的優(yōu)點:

  • 基于 HTTP/2 之上的二進制協(xié)議(Protobuf 序列化機制);
  • 一個連接上可以多路復(fù)用(stream)牛郑,并發(fā)處理多個請求和響應(yīng)喇聊;
  • 多種語言的類庫實現(xiàn)笑诅;
  • 服務(wù)定義文件和自動代碼生成(.proto 文件和 Protobuf 編譯工具)靠益。

一個完整的 RPC 調(diào)用流程示例如下:

image.png

gRPC 的 RPC 調(diào)用與上述流程相似,下面我們一起學習下 gRPC 的客戶端創(chuàng)建和服務(wù)調(diào)用流程穿挨。

demo

以 gRPC 入門級的 helloworld Demo 為例,客戶端發(fā)起 RPC 調(diào)用的代碼主要包括如下幾部分:

  • 根據(jù) hostname 和 port 創(chuàng)建 ManagedChannelImpl;
  • 根據(jù) helloworld.proto 文件生成的 GreeterGrpc 創(chuàng)建客戶端 Stub肴盏,用來發(fā)起 RPC 調(diào)用科盛;
  • 使用客戶端 Stub(GreeterBlockingStub)發(fā)起 RPC 調(diào)用,獲取響應(yīng)菜皂。

相關(guān)示例代碼如下所示(HelloWorldClient 類):

HelloWorldClient(ManagedChannelBuilder<?> channelBuilder) {
    channel = channelBuilder.build();
    blockingStub = GreeterGrpc.newBlockingStub(channel);
    futureStub = GreeterGrpc.newFutureStub(channel);
    stub = GreeterGrpc.newStub(channel);
  }
  public void blockingGreet(String name) {
    logger.info("Will try to greet " + name + " ...");
    HelloRequest request = HelloRequest.newBuilder().setName(name).build();
    try {
      HelloReply response = blockingStub
              .sayHello(request);
...

gRPC 的客戶端調(diào)用主要包括基于 Netty 的 HTTP/2 客戶端創(chuàng)建贞绵、客戶端負載均衡、請求消息的發(fā)送和響應(yīng)接收處理四個流程恍飘。

gRPC 的客戶端調(diào)用總體流程如下圖所示:

image.png

gRPC 的客戶端調(diào)用流程如下:

  • 客戶端 Stub(GreeterBlockingStub) 調(diào)用 sayHello(request)榨崩,發(fā)起 RPC 調(diào)用;
  • 通過 DnsNameResolver 進行域名解析章母,獲取服務(wù)端的地址信息(列表)母蛛,隨后使用默認的 LoadBalancer 策略,選擇一個具體的 gRPC 服務(wù)端實例乳怎;
  • 如果與路由選中的服務(wù)端之間沒有可用的連接彩郊,則創(chuàng)建 NettyClientTransport 和 NettyClientHandler,發(fā)起 HTTP/2 連接;
  • 對請求消息使用 PB(Protobuf)做序列化秫逝,通過 HTTP/2 Stream 發(fā)送給 gRPC 服務(wù)端恕出;
  • 接收到服務(wù)端響應(yīng)之后,使用 PB(Protobuf)做反序列化违帆;
  • 回調(diào) GrpcFuture 的 set(Response) 方法浙巫,喚醒阻塞的客戶端調(diào)用線程,獲取 RPC 響應(yīng)刷后。

需要指出的是的畴,客戶端同步阻塞 RPC 調(diào)用阻塞的是調(diào)用方線程(通常是業(yè)務(wù)線程),底層 Transport 的 I/O 線程(Netty 的 NioEventLoop)仍然是非阻塞的惠险。

ManagedChannel 創(chuàng)建流程
ManagedChannel 是對 Transport 層 SocketChannel 的抽象苗傅,Transport 層負責協(xié)議消息的序列化和反序列化,以及協(xié)議消息的發(fā)送和讀取班巩。

ManagedChannel 將處理后的請求和響應(yīng)傳遞給與之相關(guān)聯(lián)的 ClientCall 進行上層處理渣慕,同時,ManagedChannel 提供了對 Channel 的生命周期管理(鏈路創(chuàng)建抱慌、空閑逊桦、關(guān)閉等)。

ManagedChannel 提供了接口式的切面 ClientInterceptor抑进,它可以攔截 RPC 客戶端調(diào)用强经,注入擴展點,以及功能定制寺渗,方便框架的使用者對 gRPC 進行功能擴展匿情。

ManagedChannel 的主要實現(xiàn)類 ManagedChannelImpl 創(chuàng)建流程如下:

image.png
  • 使用 builder 模式創(chuàng)建 ManagedChannelBuilder 實現(xiàn)類 NettyChannelBuilder,NettyChannelBuilder 提供了 buildTransportFactory 工廠方法創(chuàng)建 NettyTransportFactory信殊,最終用于創(chuàng)建 NettyClientTransport炬称;
  • 初始化 HTTP/2 連接方式:采用 plaintext 協(xié)商模式還是默認的 TLS 模式,HTTP/2 的連接有兩種模式涡拘,h2(基于 TLS 之上構(gòu)建的 HTTP/2)和 h2c(直接在 TCP 之上構(gòu)建的 HTTP/2)玲躯;
  • 創(chuàng)建 NameResolver.Factory 工廠類,用于服務(wù)端 URI 的解析鳄乏,gRPC 默認采用 DNS 域名解析方式跷车。

ManagedChannel 實例構(gòu)造完成之后,即可創(chuàng)建 ClientCall橱野,發(fā)起 RPC 調(diào)用朽缴。

ClientCall 創(chuàng)建流程

完成 ManagedChannelImpl 創(chuàng)建之后,由 ManagedChannelImpl 發(fā)起創(chuàng)建一個新的 ClientCall 實例仲吏。ClientCall 的用途是業(yè)務(wù)應(yīng)用層的消息調(diào)度和處理不铆,它的典型用法如下:

 call = channel.newCall(unaryMethod, callOptions);
 call.start(listener, headers);
 call.sendMessage(message);
 call.halfClose();
 call.request(1);

ClientCall 實例的創(chuàng)建流程如下所示:


image.png
  • ClientCallImpl 的主要構(gòu)造參數(shù)是 MethodDescriptor 和 CallOptions蝌焚,其中 MethodDescriptor 存放了需要調(diào)用 RPC 服務(wù)的接口名、方法名誓斥、服務(wù)調(diào)用的方式(例如 UNARY 類型)以及請求和響應(yīng)的序列化和反序列化實現(xiàn)類只洒。
    CallOptions 則存放了 RPC 調(diào)用的其它附加信息,例如超時時間劳坑、鑒權(quán)信息毕谴、消息長度限制和執(zhí)行客戶端調(diào)用的線程池等。

  • 設(shè)置壓縮和解壓縮的注冊類(CompressorRegistry 和 DecompressorRegistry)距芬,以便可以按照指定的壓縮算法對 HTTP/2 消息做壓縮和解壓縮涝开。

ClientCallImpl 實例創(chuàng)建完成之后,就可以調(diào)用 ClientTransport框仔,創(chuàng)建 HTTP/2 Client舀武,向 gRPC 服務(wù)端發(fā)起遠程服務(wù)調(diào)用。

基于 Netty 的 HTTP/2 Client 創(chuàng)建流程

gRPC 客戶端底層基于 Netty4.1 的 HTTP/2 協(xié)議椑胝叮框架構(gòu)建银舱,以便可以使用 HTTP/2 協(xié)議來承載 RPC 消息,在滿足標準化規(guī)范的前提下跛梗,提升通信性能寻馏。

gRPC HTTP/2 協(xié)議棧(客戶端)的關(guān)鍵實現(xiàn)是 NettyClientTransport 和 NettyClientHandler,客戶端初始化流程如下所示:

image.png
  • NettyClientHandler 的創(chuàng)建:級聯(lián)創(chuàng)建 Netty 的 Http2FrameReader核偿、Http2FrameWriter 和 Http2Connection诚欠,用于構(gòu)建基于 Netty 的 gRPC HTTP/2 客戶端協(xié)議棧。
  • HTTP/2 Client 啟動:仍然基于 Netty 的 Bootstrap 來初始化并啟動客戶端漾岳,但是有兩個細節(jié)需要注意:
    NettyClientHandler(實際被包裝成 ProtocolNegotiator.Handler轰绵,用于 HTTP/2 的握手協(xié)商)創(chuàng)建之后,不是由傳統(tǒng)的 ChannelInitializer 在初始化 Channel 時將 NettyClientHandler 加入到 pipeline 中尼荆,而是直接通過 Bootstrap 的 handler 方法直接加入到 pipeline 中藏澳,以便可以立即接收發(fā)送任務(wù)。
    客戶端使用的 work 線程組并非通常意義的 EventLoopGroup耀找,而是一個 EventLoop:即 HTTP/2 客戶端使用的 work 線程并非一組線程(默認線程數(shù)為 CPU 內(nèi)核 * 2),而是一個 EventLoop 線程业崖。這個其實也很容易理解野芒,一個 NioEventLoop 線程可以同時處理多個 HTTP/2 客戶端連接,它是多路復(fù)用的双炕,對于單個 HTTP/2 客戶端狞悲,如果默認獨占一個 work 線程組,將造成極大的資源浪費妇斤,同時也可能會導(dǎo)致句柄溢出(并發(fā)啟動大量 HTTP/2 客戶端)摇锋。
  • WriteQueue 創(chuàng)建:Netty 的 NioSocketChannel 初始化并向 Selector 注冊之后(發(fā)起 HTTP 連接之前)丹拯,立即由 NettyClientHandler 創(chuàng)建 WriteQueue,用于接收并處理 gRPC 內(nèi)部的各種 Command荸恕,例如鏈路關(guān)閉指令乖酬、發(fā)送 Frame 指令、發(fā)送 Ping 指令等融求。

HTTP/2 Client 創(chuàng)建完成之后咬像,即可由客戶端根據(jù)協(xié)商策略發(fā)起 HTTP/2 連接。如果連接創(chuàng)建成功生宛,后續(xù)即可復(fù)用該 HTTP/2 連接县昂,進行 RPC 調(diào)用。

HTTP/2 連接創(chuàng)建流程

HTTP/2 在 TCP 連接之初通過協(xié)商的方式進行通信陷舅,只有協(xié)商成功倒彰,才能進行后續(xù)的業(yè)務(wù)層數(shù)據(jù)發(fā)送和接收。

HTTP/2 的版本標識分為兩類:

  • 基于 TLS 之上構(gòu)架的 HTTP/2, 即 HTTPS莱睁,使用 h2 表示(ALPN):0x68 與 0x32待讳;
  • 直接在 TCP 之上構(gòu)建的 HTTP/2, 即 HTTP,使用 h2c 表示缩赛。

HTTP/2 連接創(chuàng)建耙箍,分為兩種:通過協(xié)商升級協(xié)議方式和直接連接方式。

假如不知道服務(wù)端是否支持 HTTP/2酥馍,可以先使用 HTTP/1.1 進行協(xié)商辩昆,客戶端發(fā)送協(xié)商請求消息(只含消息頭),報文示例如下:

GET / HTTP/1.1
Host: 127.0.0.1
Connection: Upgrade, HTTP2-Settings
Upgrade: h2c
HTTP2-Settings: <base64url encoding of HTTP/2 SETTINGS payload>

服務(wù)端接收到協(xié)商請求之后旨袒,如果不支持 HTTP/2汁针,則直接按照 HTTP/1.1 響應(yīng)返回,雙方通過 HTTP/1.1 進行通信砚尽,報文示例如下:

HTTP/1.1 200 OK
Content-Length: 28
Content-Type: text/css

body...

如果服務(wù)端支持 HTTP/2, 則協(xié)商成功施无,返回 101 結(jié)果碼,通知客戶端一起升級到 HTTP/2 進行通信必孤,示例報文如下:

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Upgrade: h2c

[ HTTP/2 connection...

101 響應(yīng)之后猾骡,服務(wù)需要發(fā)送 SETTINGS 幀作為連接序言,客戶端接收到 101 響應(yīng)之后敷搪,也必須發(fā)送一個序言作為回應(yīng)兴想,示例如下:

PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n
SETTINGS 幀

客戶端序言發(fā)送完成之后,可以不需要等待服務(wù)端的 SETTINGS 幀赡勘,而直接發(fā)送業(yè)務(wù)請求 Frame嫂便。

假如客戶端和服務(wù)端已經(jīng)約定使用 HTTP/2, 則可以免去 101 協(xié)商和切換流程,直接發(fā)起 HTTP/2 連接闸与,具體流程如下所示:

image.png

幾個關(guān)鍵點:

  • 如果已經(jīng)明確知道服務(wù)端支持 HTTP/2毙替,則可免去通過 HTTP/1.1 101 協(xié)議切換方式進行升級岸售,TCP 連接建立之后即可發(fā)送序言,否則只能在接收到服務(wù)端 101 響應(yīng)之后發(fā)送序言厂画;
  • 針對一個連接凸丸,服務(wù)端第一個要發(fā)送的幀必須是 SETTINGS 幀,連接序言所包含的 SETTINGS 幀可以為空木羹;
  • 客戶端可以在發(fā)送完序言之后發(fā)送應(yīng)用幀數(shù)據(jù)甲雅,不用等待來自服務(wù)器端的序言 SETTINGS 幀。

gRPC 支持三種 Protocol Negotiator 策略:

  • PlaintextNegotiator:明確服務(wù)端支持 HTTP/2坑填,采用 HTTP 直接連接的方式與服務(wù)端建立 HTTP/2 連接抛人,省去 101 協(xié)議切換過程;
  • PlaintextUpgradeNegotiator:不清楚服務(wù)端是否支持 HTTP/2脐瑰,采用 HTTP/1.1 協(xié)商模式切換升級到 HTTP/2
  • TlsNegotiator:在 TLS 之上構(gòu)建 HTTP/2妖枚,協(xié)商采用 ALPN 擴展協(xié)議,以 “h2” 作為協(xié)議標識符苍在。

下面我們以 PlaintextNegotiator 為例绝页,了解下基于 Netty 的 HTTP/2 連接創(chuàng)建流程:


image.png
負載均衡策略

總體上看,RPC 的負載均衡策略有兩大類:

  • 服務(wù)端負載均衡(例如代理模式寂恬、外部負載均衡服務(wù))
  • 客戶端負載均衡(內(nèi)置負載均衡策略和算法续誉,客戶端實現(xiàn))

外部負載均衡模式如下所示:

image.png

以代理 LB 模式為例:RPC 客戶端向負載均衡代理發(fā)送請求,負載均衡代理按照指定的路由策略初肉,將請求消息轉(zhuǎn)發(fā)到后端可用的服務(wù)實例上酷鸦。負載均衡代理負責維護后端可用的服務(wù)列表,如果發(fā)現(xiàn)某個服務(wù)不可用牙咏,則將其剔除出路由表臼隔。

代理 LB 模式的優(yōu)點是客戶端不需要實現(xiàn)負載均衡策略算法,也不需要維護后端的服務(wù)列表信息妄壶,不直接跟后端的服務(wù)進行通信摔握,在做網(wǎng)絡(luò)安全邊界隔離時,非常實用丁寄。例如通過 Nginx 做 L7 層負載均衡氨淌,將互聯(lián)網(wǎng)前端的流量安全的接入到后端服務(wù)中。

代理 LB 模式通常支持 L4(Transport)和 L7(Application) 層負載均衡伊磺,兩者各有優(yōu)缺點宁舰,可以根據(jù) RPC 的協(xié)議特點靈活選擇。L4/L7 層負載均衡對應(yīng)場景如下:

  • L4 層:對時延要求苛刻奢浑、資源損耗少、RPC 本身采用私有 TCP 協(xié)議腋腮;
  • L7 層:有會話狀態(tài)的連接雀彼、HTTP 協(xié)議簇(例如 Restful)壤蚜。

客戶端負載均衡策略由客戶端內(nèi)置負載均衡能力,通過靜態(tài)配置徊哑、域名解析服務(wù)(例如 DNS 服務(wù))袜刷、訂閱發(fā)布(例如 Zookeeper 服務(wù)注冊中心)等方式獲取 RPC 服務(wù)端地址列表,并將地址列表緩存到客戶端內(nèi)存中莺丑。

每次 RPC 調(diào)用時著蟹,根據(jù)客戶端配置的負載均衡策略由負載均衡算法從緩存的服務(wù)地址列表中選擇一個服務(wù)實例,發(fā)起 RPC 調(diào)用梢莽。

客戶端負載均衡策略工作原理示例如下:


image.png

gRPC 默認采用客戶端負載均衡策略萧豆,同時提供了擴展機制,使用者通過自定義實現(xiàn) NameResolver 和 LoadBalancer昏名,即可覆蓋 gRPC 默認的負載均衡策略涮雷,實現(xiàn)自定義路由策略的擴展。

gRPC 提供的負載均衡策略實現(xiàn)類如下所示:

  • PickFirstBalancer:無負載均衡能力轻局,即使有多個服務(wù)端地址可用洪鸭,也只選擇第一個地址;
  • RoundRobinLoadBalancer:“RoundRobin” 負載均衡策略仑扑。

gRPC 負載均衡流程如下所示:

image.png

流程關(guān)鍵技術(shù)點解讀:

  • 負載均衡功能模塊的輸入是客戶端指定的 hostName览爵、需要調(diào)用的接口名和方法名等參數(shù),輸出是執(zhí)行負載均衡算法后獲得的 NettyClientTransport镇饮,通過 NettyClientTransport 可以創(chuàng)建基于 Netty HTTP/2 的 gRPC 客戶端蜓竹,發(fā)起 RPC 調(diào)用;
  • gRPC 系統(tǒng)默認提供的是 DnsNameResolver盒让,它通過 InetAddress.getAllByName(host) 獲取指定 host 的 IP 地址列表(本地 DNS 服務(wù))梅肤,對于擴展者而言,可以繼承 NameResolver 實現(xiàn)自定義的地址解析服務(wù)邑茄,例如使用 Zookeeper 替換 DnsNameResolver姨蝴,把 Zookeeper 作為動態(tài)的服務(wù)地址配置中心,它的偽代碼示例如下:

第一步:繼承 NameResolver肺缕,實現(xiàn) start(Listener listener) 方法:

void start(Listener listener)
{
 // 獲取 ZooKeeper 地址左医,并連接
 // 創(chuàng)建 Watcher,并實現(xiàn) process(WatchedEvent event)同木,監(jiān)聽地址變更
 // 根據(jù)接口名和方法名浮梢,調(diào)用 getChildren 方法,獲取發(fā)布該服務(wù)的地址列表
// 將地址列表加到 List 中
// 調(diào)用 NameResolver.Listener.onAddresses(), 通知地址解析完成

第二步:創(chuàng)建 ManagedChannelBuilder 時彤路,指定 Target 的地址為 Zookeeper 服務(wù)端地址秕硝,同時設(shè)置 nameResolver 為 Zookeeper NameResolver, 示例代碼如下所示:

this(ManagedChannelBuilder.forTarget(zookeeperAddr)
        .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
        .nameResolverFactory(new ZookeeperNameResolverProvider())
        .usePlaintext(false));
  • LoadBalancer 負責從 nameResolver 中解析獲得的服務(wù)端 URL 中按照指定路由策略,選擇一個目標服務(wù)端地址洲尊,并創(chuàng)建 ClientTransport远豺。同樣奈偏,可以通過覆蓋 handleResolvedAddressGroups 實現(xiàn)自定義負載均衡策略。

通過 LoadBalancer + NameResolver躯护,可以實現(xiàn)靈活的負載均衡策略擴展惊来。例如基于 Zookeeper、etcd 的分布式配置服務(wù)中心方案棺滞。

RPC 請求消息發(fā)送流程

gRPC 默認基于 Netty HTTP/2 + PB 進行 RPC 調(diào)用裁蚁,請求消息發(fā)送流程如下所示:

image.png
  • ClientCallImpl 的 sendMessage 調(diào)用,主要完成了請求對象的序列化(基于 PB)继准、HTTP/2 Frame 的初始化枉证;
  • ClientCallImpl 的 halfClose 調(diào)用將客戶端準備就緒的請求 Frame 封裝成自定義的 SendGrpcFrameCommand,寫入到 WriteQueue 中锰瘸;
  • WriteQueue 執(zhí)行 flush() 將 SendGrpcFrameCommand 寫入到 Netty 的 Channel 中刽严,調(diào)用 Channel 的 write 方法,被 NettyClientHandler 攔截到避凝,由 NettyClientHandler 負責具體的發(fā)送操作舞萄;
  • NettyClientHandler 調(diào)用 Http2ConnectionEncoder 的 writeData 方法,將 Frame 寫入到 HTTP/2 Stream 中管削,完成請求消息的發(fā)送倒脓。
RPC 響應(yīng)接收和處理流程

gRPC 客戶端響應(yīng)消息的接收入口是 NettyClientHandler,它的處理流程如下所示:


image.png

流程關(guān)鍵技術(shù)點解讀:

  • NettyClientHandler 的 onHeadersRead(int streamId, Http2Headers headers, boolean endStream) 方法會被調(diào)用兩次含思,根據(jù) endStream 判斷是否是 Stream 結(jié)尾崎弃;
  • 請求和響應(yīng)的關(guān)聯(lián):根據(jù) streamId 可以關(guān)聯(lián)同一個 HTTP/2 Stream,將 NettyClientStream 緩存到 Stream 中含潘,客戶端就可以在接收到響應(yīng)消息頭或消息體時還原出 NettyClientStream饲做,進行后續(xù)處理;
  • RPC 客戶端調(diào)用線程的阻塞和喚醒使用到了 GrpcFuture 的 wait 和 notify 機制遏弱,來實現(xiàn)客戶端調(diào)用線程的同步阻塞和喚醒盆均;
  • 客戶端和服務(wù)端的 HTTP/2 Header 和 Data Frame 解析共用同一個方法,即 MessageDeframer 的 deliver()漱逸。

客戶端源碼分析

gRPC 客戶端調(diào)用原理并不復(fù)雜泪姨,但是代碼卻相對比較繁雜。下面圍繞關(guān)鍵的類庫饰抒,對主要功能點進行源碼分析肮砾。

NettyClientTransport 功能和源碼分析
NettyClientTransport 的主要功能如下:

  • 通過 start(Listener transportListener) 創(chuàng)建 HTTP/2 Client,并連接 gRPC 服務(wù)端袋坑;
  • 通過 newStream(MethodDescriptor method, Metadata headers, CallOptions callOptions) 創(chuàng)建 ClientStream仗处;
  • 通過 shutdown() 關(guān)閉底層的 HTTP/2 連接。

以啟動 HTTP/2 客戶端為例進行講解(NettyClientTransport 類):

EventLoop eventLoop = group.next();
    if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
      keepAliveManager = new KeepAliveManager(
          new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
          keepAliveWithoutCalls);
    }
    handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow,
        maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
    HandlerSettings.setAutoWindow(handler);
    negotiationHandler = negotiator.newHandler(handler);

根據(jù)啟動時配置的 HTTP/2 協(xié)商策略,以 NettyClientHandler 為參數(shù)創(chuàng)建 ProtocolNegotiator.Handler婆誓。

創(chuàng)建 Bootstrap咒精,并設(shè)置 EventLoopGroup,需要指出的是旷档,此處并沒有使用 EventLoopGroup,而是它的一種實現(xiàn)類 EventLoop歇拆,原因在前文中已經(jīng)說明鞋屈,相關(guān)代碼示例如下(NettyClientTransport 類):

Bootstrap b = new Bootstrap();
    b.group(eventLoop);
    b.channel(channelType);
    if (NioSocketChannel.class.isAssignableFrom(channelType)) {
      b.option(SO_KEEPALIVE, true);
    }

創(chuàng)建 WriteQueue 并設(shè)置到 NettyClientHandler 中,用于接收內(nèi)部的各種 QueuedCommand故觅,初始化完成之后厂庇,發(fā)起 HTTP/2 連接,代碼如下(NettyClientTransport 類):

handler.startWriteQueue(channel);
    channel.connect(address).addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
          ChannelHandlerContext ctx = future.channel().pipeline().context(handler);
          if (ctx != null) {
            ctx.fireExceptionCaught(future.cause());
          }
          future.channel().pipeline().fireExceptionCaught(future.cause());
        }
NettyClientHandler 功能和源碼分析

NettyClientHandler 繼承自 Netty 的 Http2ConnectionHandler输吏,是 gRPC 接收和發(fā)送 HTTP/2 消息的關(guān)鍵實現(xiàn)類权旷,也是 gRPC 和 Netty 的交互橋梁,它的主要功能如下所示:

  • 發(fā)送各種協(xié)議消息給 gRPC 服務(wù)端贯溅;
  • 接收 gRPC 服務(wù)端返回的應(yīng)答消息頭拄氯、消息體和其它協(xié)議消息;
  • 處理 HTTP/2 協(xié)議相關(guān)的指令它浅,例如 StreamError译柏、ConnectionError 等。

協(xié)議消息的發(fā)送:無論是業(yè)務(wù)請求消息姐霍,還是協(xié)議指令消息鄙麦,都統(tǒng)一封裝成 QueuedCommand,由 NettyClientHandler 攔截并處理镊折,相關(guān)代碼如下所示(NettyClientHandler 類):

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
          throws Exception {
    if (msg instanceof CreateStreamCommand) {
      createStream((CreateStreamCommand) msg, promise);
    } else if (msg instanceof SendGrpcFrameCommand) {
      sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
    } else if (msg instanceof CancelClientStreamCommand) {
      cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
    } else if (msg instanceof SendPingCommand) {
      sendPingFrame(ctx, (SendPingCommand) msg, promise);
    } else if (msg instanceof GracefulCloseCommand) {
      gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
    } else if (msg instanceof ForcefulCloseCommand) {
      forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
    } else if (msg == NOOP_MESSAGE) {
      ctx.write(Unpooled.EMPTY_BUFFER, promise);
    } else {
      throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
    }

協(xié)議消息的接收:NettyClientHandler 通過向 Http2ConnectionDecoder 注冊 FrameListener 來監(jiān)聽 RPC 響應(yīng)消息和協(xié)議指令消息胯府,相關(guān)接口如下:


image.png

FrameListener 回調(diào) NettyClientHandler 的相關(guān)方法,實現(xiàn)協(xié)議消息的接收和處理:


image.png

需要指出的是恨胚,NettyClientHandler 并沒有實現(xiàn)所有的回調(diào)接口骂因,對于需要特殊處理的幾個方法進行了重載,例如 onDataRead 和 onHeadersRead与纽。

ProtocolNegotiator 功能和源碼分析

ProtocolNegotiator 用于 HTTP/2 連接創(chuàng)建的協(xié)商侣签,gRPC 支持三種策略并有三個實現(xiàn)子類:


image.png

gRPC 的 ProtocolNegotiator 實現(xiàn)類完全遵循 HTTP/2 相關(guān)規(guī)范,以 PlaintextUpgradeNegotiator 為例急迂,通過設(shè)置 Http2ClientUpgradeCodec影所,用于 101 協(xié)商和協(xié)議升級,相關(guān)代碼如下所示(PlaintextUpgradeNegotiator 類):

public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
      Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(handler);
      HttpClientCodec httpClientCodec = new HttpClientCodec();
      final HttpClientUpgradeHandler upgrader =
          new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
      return new BufferingHttp2UpgradeHandler(upgrader);
    }

LoadBalancer 功能和源碼分析

LoadBalancer 負責客戶端負載均衡僚碎,它是個抽象類猴娩,gRPC 框架的使用者可以通過繼承的方式進行擴展。

gRPC 當前已經(jīng)支持 PickFirstBalancer 和 RoundRobinLoadBalancer 兩種負載均衡策略,未來不排除會提供更多的策略卷中。

以 RoundRobinLoadBalancer 為例矛双,它的工作原理如下:根據(jù) PickSubchannelArgs 來選擇一個 Subchannel(RoundRobinLoadBalancerFactory 類):

public PickResult pickSubchannel(PickSubchannelArgs args) {
      if (size > 0) {
        return PickResult.withSubchannel(nextSubchannel());
      }
      if (status != null) {
        return PickResult.withError(status);
      }
      return PickResult.withNoResult();
    }

再看下 Subchannel 的選擇算法(Picker 類):

private Subchannel nextSubchannel() {
      if (size == 0) {
        throw new NoSuchElementException();
      }
      synchronized (this) {
        Subchannel val = list.get(index);
        index++;
        if (index >= size) {
          index = 0;
        }
        return val;
      }
    }

即通過順序的方式從服務(wù)端列表中獲取一個 Subchannel。
如果用戶需要定制負載均衡策略蟆豫,則可以在 RPC 調(diào)用時议忽,使用如下代碼(HelloWorldClient 類):

this(ManagedChannelBuilder.forAddress(host, port).loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()).nameResolverFactory(new ZkNameResolverProvider()) .usePlaintext(true));
ClientCalls 功能和源碼分析

ClientCalls 提供了各種 RPC 調(diào)用方式,包括同步十减、異步栈幸、Streaming 和 Unary 方式等,相關(guān)方法如下所示:


image.png

下面一起看下 RPC 請求消息的發(fā)送和應(yīng)答接收相關(guān)代碼帮辟。

RPC 請求調(diào)用源碼分析

請求調(diào)用主要有兩步:請求 Frame 構(gòu)造和 Frame 發(fā)送速址,請求 Frame 構(gòu)造代碼如下所示(ClientCallImpl 類):

public void sendMessage(ReqT message) {
    Preconditions.checkState(stream != null, "Not started");
    Preconditions.checkState(!cancelCalled, "call was cancelled");
    Preconditions.checkState(!halfCloseCalled, "call was half-closed");
    try {
      InputStream messageIs = method.streamRequest(message);
      stream.writeMessage(messageIs);
...

使用 PB 對請求消息做序列化,生成 InputStream由驹,構(gòu)造請求 Frame:

private int writeUncompressed(InputStream message, int messageLength) throws IOException {
    if (messageLength != -1) {
      statsTraceCtx.outboundWireSize(messageLength);
      return writeKnownLengthUncompressed(message, messageLength);
    }
    BufferChainOutputStream bufferChain = new BufferChainOutputStream();
    int written = writeToOutputStream(message, bufferChain);
    if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
      throw Status.INTERNAL
          .withDescription(
              String.format("message too large %d > %d", written , maxOutboundMessageSize))
          .asRuntimeException();
    }
    writeBufferChain(bufferChain, false);
    return written;
}

Frame 發(fā)送代碼如下所示:

public void writeFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
      ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
      final int numBytes = bytebuf.readableBytes();
      if (numBytes > 0) {
        onSendingBytes(numBytes);
        writeQueue.enqueue(
            new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream),
            channel.newPromise().addListener(new ChannelFutureListener() {
              @Override
              public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                  transportState().onSentBytes(numBytes);
                }
              }
            }), flush);

NettyClientHandler 接收到發(fā)送事件之后芍锚,調(diào)用 Http2ConnectionEncoder 將 Frame 寫入 Netty HTTP/2 協(xié)議棧(NettyClientHandler 類):

private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
      ChannelPromise promise) {
    encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
  }
RPC 響應(yīng)接收和處理源碼分析

響應(yīng)消息的接收入口是 NettyClientHandler,包括 HTTP/2 Header 和 HTTP/2 DATA Frame 兩部分蔓榄,代碼如下(NettyClientHandler 類):

private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
    NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
    stream.transportHeadersReceived(headers, endStream);
    if (keepAliveManager != null) {
      keepAliveManager.onDataReceived();
    }
  }

如果參數(shù) endStream 為 True忘朝,說明 Stream 已經(jīng)結(jié)束方淤,調(diào)用 transportTrailersReceived颁督,通知 Listener close爸舒,代碼如下所示(AbstractClientStream2 類):

if (stopDelivery || isDeframerStalled()) {
        deliveryStalledTask = null;
        closeListener(status, trailers);
      } else {
        deliveryStalledTask = new Runnable() {
          @Override
          public void run() {
            closeListener(status, trailers);
          }
        };
      }

讀取到 HTTP/2 DATA Frame 之后,調(diào)用 MessageDeframer 的 deliver 對 Frame 進行解析壹若,代碼如下(MessageDeframer 類):

private void deliver() {
    if (inDelivery) {
      return;
    }
    inDelivery = true;
    try {
          while (pendingDeliveries > 0 && readRequiredBytes()) {
        switch (state) {
          case HEADER:
            processHeader();
            break;
          case BODY:
            processBody();
...

將 Frame 轉(zhuǎn)換成 InputStream 之后嗅钻,通知 ClientStreamListenerImpl,調(diào)用 messageRead(final InputStream message)店展,將 InputStream 反序列化為響應(yīng)對象养篓,相關(guān)代碼如下所示(ClientStreamListenerImpl 類):

public void messageRead(final InputStream message) {
      class MessageRead extends ContextRunnable {
        MessageRead() {
          super(context);
        }
        @Override
        public final void runInContext() {
          try {
            if (closed) {
              return;
            }
            try {
              observer.onMessage(method.parseResponse(message));
            } finally {
              message.close();
            }

當接收到 endOfStream 之后,通知 ClientStreamListenerImpl赂蕴,調(diào)用它的 close 方法柳弄,如下所示(ClientStreamListenerImpl 類):

private void close(Status status, Metadata trailers) {
      closed = true;
      cancelListenersShouldBeRemoved = true;
      try {
        closeObserver(observer, status, trailers);
      } finally {
        removeContextListenerAndCancelDeadlineFuture();
      }
    }

最終調(diào)用 UnaryStreamToFuture 的 onClose 方法,set 響應(yīng)對象概说,喚醒阻塞的調(diào)用方線程碧注,完成 RPC 調(diào)用,代碼如下(UnaryStreamToFuture 類):

public void onClose(Status status, Metadata trailers) {
      if (status.isOk()) {
        if (value == null) {
          responseFuture.setException(
              Status.INTERNAL.withDescription("No value received for unary call")
                  .asRuntimeException(trailers));
        }
        responseFuture.set(value);
      } else {
        responseFuture.setException(status.asRuntimeException(trailers));
      }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末糖赔,一起剝皮案震驚了整個濱河市萍丐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌放典,老刑警劉巖逝变,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件基茵,死亡現(xiàn)場離奇詭異,居然都是意外死亡壳影,警方通過查閱死者的電腦和手機拱层,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來宴咧,“玉大人根灯,你說我怎么就攤上這事〔粽ぃ” “怎么了箱吕?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長柿冲。 經(jīng)常有香客問我,道長兆旬,這世上最難降的妖魔是什么假抄? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮丽猬,結(jié)果婚禮上宿饱,老公的妹妹穿的比我還像新娘。我一直安慰自己脚祟,他們只是感情好谬以,可當我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著由桌,像睡著了一般为黎。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上行您,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天铭乾,我揣著相機與錄音,去河邊找鬼娃循。 笑死炕檩,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的捌斧。 我是一名探鬼主播笛质,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼捞蚂!你這毒婦竟也來了妇押?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤洞难,失蹤者是張志新(化名)和其女友劉穎舆吮,沒想到半個月后揭朝,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡色冀,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年潭袱,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锋恬。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡屯换,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出与学,到底是詐尸還是另有隱情彤悔,我是刑警寧澤,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布索守,位于F島的核電站晕窑,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏卵佛。R本人自食惡果不足惜杨赤,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望截汪。 院中可真熱鬧疾牲,春花似錦、人聲如沸衙解。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蚓峦。三九已至舌剂,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間暑椰,已是汗流浹背架诞。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留干茉,地道東北人谴忧。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像角虫,于是被迫代替她去往敵國和親沾谓。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 原文出處:gRPC gRPC分享 概述 gRPC 一開始由 google 開發(fā)戳鹅,是一款語言中立均驶、平臺中立、開源的遠...
    小波同學閱讀 7,219評論 0 18
  • gRPC 是一個高性能枫虏、通用的開源RPC框架妇穴,基于HTTP/2協(xié)議標準和Protobuf序列化協(xié)議開發(fā)爬虱,支持眾多的...
    小波同學閱讀 19,492評論 6 19
  • 1.簡介 在gRPC中,客戶端應(yīng)用程序可以直接調(diào)用不同計算機上的服務(wù)器應(yīng)用程序上的方法腾它,就像它是本地對象一樣跑筝,使您...
    第八共同體閱讀 1,879評論 0 6
  • 1)簡介 gRPC負載平衡的主要實現(xiàn)機制是外部負載平衡,即通過外部負載平衡器來向客戶端提供更新后的服務(wù)器列表瞒滴。 g...
    Jay_Guo閱讀 13,271評論 6 22
  • 一曲梗、官方文檔 以下內(nèi)容為官方文檔的中文翻譯,源文檔地址為:https://github.com/grpc/grpc...
    HRocky閱讀 2,546評論 0 0