從grpc源碼講起(Client端的消息發(fā)送)

鋪墊

grpc有三個(gè)核心的抽象層:

Stub

絕大數(shù)開(kāi)發(fā)者會(huì)直接使用的一部分吃靠,proto文件編譯所生成的Stub就是在此層的基礎(chǔ)之上生成的危纫。它提供了一種調(diào)用方和服務(wù)之間類(lèi)型安全的綁定關(guān)系赊锚,對(duì)比http服務(wù):http服務(wù)就不提供類(lèi)型安全的綁定關(guān)系,調(diào)用方和服務(wù)方需要自行處理類(lèi)型轉(zhuǎn)化的相關(guān)工作醉顽。

Channel

channel層是對(duì)數(shù)據(jù)傳輸?shù)某橄蟛埃阌谟脩?hù)更方便的進(jìn)行 攔截器/裝飾者 等類(lèi)似的處理潮梯。它旨在使應(yīng)用程序框架易于使用此層來(lái)解決諸如日志記錄,監(jiān)視惨恭,身份驗(yàn)證等交叉問(wèn)題秉馏。流控制也暴露在此層,以允許更復(fù)雜的應(yīng)用程序直接與其交互脱羡。

Transport

這一層在絕大多數(shù)情況下是我們不需要關(guān)心的萝究,它的作用就是傳輸數(shù)據(jù),底層基于netty锉罐,或者ok http的實(shí)現(xiàn)

grpc請(qǐng)求創(chuàng)建

分析一個(gè)典型的grpc請(qǐng)求創(chuàng)建的代碼

請(qǐng)求從調(diào)用 grpcClient 的getFutureStub開(kāi)始

CrmConsumeServiceGrpc.CrmConsumeServiceFutureStub rechargeService =
        (CrmConsumeServiceGrpc.CrmConsumeServiceFutureStub) grpcClient.getFutureStub(CrmConsumeServiceGrpc.class);

getFutureStub 方法實(shí)際就是調(diào)用了 getStub

public Object getFutureStub(Class grpcClass) throws Exception {
  return getStub(grpcClass, "newFutureStub");
}

實(shí)際上就是通過(guò)反射的方式調(diào)用了了 CrmConsumeServiceGrpc 的newFutureStub 方法

private Object getStub(Class grpcClass, String stubMethodName) throws Exception {
  Method stubMethod = grpcClass.getMethod(stubMethodName, Channel.class);
  return stubMethod.invoke(null, channel);
}

繼續(xù)跟到具體方法

public static CrmConsumeServiceFutureStub newFutureStub(
    io.grpc.Channel channel) {
  return new CrmConsumeServiceFutureStub(channel);
}

類(lèi)的私有構(gòu)造方法

public static final class CrmConsumeServiceFutureStub extends io.grpc.stub.AbstractStub<CrmConsumeServiceFutureStub> {
  private CrmConsumeServiceFutureStub(io.grpc.Channel channel) {
    super(channel);
  }

最終跟到了AbstractStub 類(lèi)帆竹,這個(gè)類(lèi)是stub層的核心類(lèi)之一,實(shí)際上proto文件生成的Stub類(lèi)脓规,都是該類(lèi)的子類(lèi)馆揉。

protected AbstractStub(Channel channel) {
  this(channel, CallOptions.DEFAULT);
}

簡(jiǎn)單的說(shuō),獲取stub的過(guò)程抖拦,就是把GrpcClient 的channel屬性傳遞給Stub的過(guò)程升酣。那由此可以推斷出,channel是在GrpcClient創(chuàng)建的時(shí)候态罪,一起創(chuàng)建的噩茄。 下面看一下GrpcClient的源碼。

 private void init(String target, int maxMessageSize, Tracing tracing, LoadBalancer.Factory loadBalancerFactory, ClientInterceptor... interceptors) {
    LoggingClientInterceptor loggingClientInterceptor = new LoggingClientInterceptor();
    this.allClientInterceptors.add(loggingClientInterceptor);
    if (tracing != null) {
      GrpcTracing grpcTracing = GrpcTracing.create(tracing);
      allClientInterceptors.add(grpcTracing.newClientInterceptor());
    }
    if (interceptors != null) {
      allClientInterceptors.addAll(Arrays.asList(interceptors));
    }

    NettyChannelBuilder builder = NettyChannelBuilder
        .forTarget(target)
        .keepAliveTime(20, TimeUnit.SECONDS)
        .keepAliveTimeout(2, TimeUnit.SECONDS)
        .keepAliveWithoutCalls(true)
        .idleTimeout(24, TimeUnit.HOURS)
        .usePlaintext(true)
        .intercept(allClientInterceptors)
        .nameResolverFactory(NameResolverProvider.asFactory())
        .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance());
    if (loadBalancerFactory != null) {
      builder.loadBalancerFactory(loadBalancerFactory);
    }

    if (maxMessageSize > 0) {
      builder.maxInboundMessageSize(maxMessageSize);
    }
    channel = builder.build();
  }
  

調(diào)用 NettyChannelBuilder的build方法复颈,創(chuàng)建了channel绩聘。下面的注釋也驗(yàn)證了這點(diǎn),NettyChannelBuilder 創(chuàng)建了一個(gè)以Netty 作為傳輸層的 channel耗啦。其中除了必要的target凿菩,也就是服務(wù)地址以外,傳遞給channel的參數(shù)還包含了三個(gè)比較重要的屬性 intercept,namemResolverFactory,loadBalancerFactory帜讲。 衅谷。

/**
 * A builder to help simplify construction of channels using the Netty transport.
 */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1784")
@CanIgnoreReturnValue
public final class NettyChannelBuilder

先看一下channel能夠做什么

Channel的注釋?zhuān)?/p>

A virtual connection to a conceptual endpoint, to perform RPCs. A channel is free to have zero or many actual connections to the endpoint based on configuration, load, etc. A channel is also free to determine which actual endpoints to use and may change it every RPC, permitting client-side load balancing. Applications are generally expected to use stubs instead of calling this class directly.
Applications can add common cross-cutting behaviors to stubs by decorating Channel implementations using ClientInterceptor. It is expected that most application code will not use this class directly but rather work with stubs that have been bound to a Channel that was decorated during application initialization.

Channel是一個(gè)虛擬的鏈接,它可以維護(hù)任意數(shù)量的真實(shí)鏈接似将,也可以自主選擇具體使用的鏈接获黔。也就是說(shuō),在channel上在验,我們可以實(shí)現(xiàn)客戶(hù)端的 load balancing玷氏。另外,注釋中建議應(yīng)用使用stubs腋舌,而不是直接調(diào)用channel盏触。其實(shí)也就說(shuō),我們可以避開(kāi)Stub的創(chuàng)建,直接調(diào)用Channel赞辩。另外通過(guò)實(shí)現(xiàn)ClientInterceptor 可以實(shí)現(xiàn)對(duì)Channel的橫切(cross-cutting)雌芽,這個(gè)也是在Channel層做到的。

方法的調(diào)用

在看方法調(diào)用的源碼前诗宣,先思考一個(gè)問(wèn)題膘怕。grpc 實(shí)現(xiàn)遠(yuǎn)程調(diào)用想诅,需要哪些參數(shù)召庞。
首先遠(yuǎn)程調(diào)用首先是一個(gè)方法調(diào)用,所以需要定位一個(gè)方法来破,以及對(duì)應(yīng)的參數(shù)篮灼。
另外因?yàn)槭沁h(yuǎn)端請(qǐng)求,所以還需要一個(gè)遠(yuǎn)端服務(wù)地址(Channel)徘禁,遠(yuǎn)端的請(qǐng)求诅诱,必然涉及到序列化和反序列化的過(guò)程,另外grpc提供了類(lèi)型安全的方法調(diào)用送朱,所以序列化和反序列化的時(shí)候娘荡,也需要獲取到 request 和response 的具體類(lèi)型。

    public com.google.common.util.concurrent.ListenableFuture<com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentResponse> updateAgentInfo(
        com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentRequest request) {
      return futureUnaryCall(
          getChannel().newCall(METHOD_UPDATE_AGENT_INFO, getCallOptions()), request);
    }
  }

grpc 方法的調(diào)用就是調(diào)用proto自動(dòng)生成的Stub上的方法驶沼,所有的的方法都是調(diào)用了 形如futureUnaryCall的方法炮沐,只是 每個(gè)方法傳入的 METHOD_UPDATE_AGENT_INFO參數(shù)不一樣。

  public static final io.grpc.MethodDescriptor<com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentRequest,
      com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentResponse> METHOD_UPDATE_AGENT_INFO =
      io.grpc.MethodDescriptor.create(
          io.grpc.MethodDescriptor.MethodType.UNARY,
          generateFullMethodName(
              "UpdateAgentInfoInterface", "updateAgentInfo"),
          io.grpc.protobuf.ProtoUtils.marshaller(com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentRequest.getDefaultInstance()),
          io.grpc.protobuf.ProtoUtils.marshaller(com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentResponse.getDefaultInstance()));

METHOD_UPDATE_AGENT_INFO 保存了 FullMethodName回怜,還有兩個(gè)marshaller大年,marshaller保存著request和response的類(lèi)信息,用于序列化和反序列化玉雾。

到此為止翔试,grpc發(fā)起的一個(gè)遠(yuǎn)端請(qǐng)求需要的信息怎么處理的我們都已經(jīng)了解了。Channel通過(guò)GrpcClient創(chuàng)建复旬,調(diào)用的方法信息和序列化的信息在Stub中保存垦缅,request 是調(diào)用方創(chuàng)建的。

之后其實(shí)就是這些信息的加工和組合驹碍。

首先第一步獲取Channel失都,然后創(chuàng)建一個(gè)Call。Channel毫無(wú)疑問(wèn)就是剛剛創(chuàng)建的ManagedChannelImpl幸冻。

    public com.google.common.util.concurrent.ListenableFuture<com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentResponse> updateAgentInfo(
        com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentRequest request) {
      return futureUnaryCall(
          getChannel().newCall(METHOD_UPDATE_AGENT_INFO, getCallOptions()), request);
    }
  }
  

newCall創(chuàng)建了一個(gè) ClientCall粹庞,在看newCall的源碼前,我們先看一下ClientCall文檔

An instance of a call to a remote method. A call will send zero or more request messages to the server and receive zero or more response messages back.
Instances are created by a Channel and used by stubs to invoke their remote behavior.

clientCall是一個(gè)調(diào)用遠(yuǎn)程方法的實(shí)例洽损,由Channel創(chuàng)建庞溜,被stubs調(diào)用。所以clientCasll是真正的發(fā)送請(qǐng)求的對(duì)象。另外在clientCall在創(chuàng)建的時(shí)候流码,也會(huì)創(chuàng)建對(duì)應(yīng)的攔截器又官。

思考一個(gè)問(wèn)題,攔截器的功能實(shí)現(xiàn)漫试,需要考慮哪些東西六敬。攔截器的效果就是在請(qǐng)求真正執(zhí)行前,獲取到相關(guān)的信息驾荣,可以進(jìn)行鑒權(quán)外构,日志等等。那么就是說(shuō)播掷,攔截器主要功能又兩點(diǎn)审编,一個(gè)就是獲取到請(qǐng)求信息。另外一個(gè)就是對(duì)請(qǐng)求的轉(zhuǎn)發(fā)(包括轉(zhuǎn)發(fā)到另外的攔截器和真實(shí)請(qǐng)求)

了解到這之后歧匈,我們看一下newCall方法垒酬。

  private static class InterceptorChannel extends Channel {
    private final Channel channel;
    private final ClientInterceptor interceptor;

    private InterceptorChannel(Channel channel, ClientInterceptor interceptor) {
      this.channel = channel;
      this.interceptor = Preconditions.checkNotNull(interceptor, "interceptor");
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
      return interceptor.interceptCall(method, callOptions, channel);
    }

newCall方法設(shè)計(jì)的很巧妙,首先它本身繼承自Channel件炉,另外還有還有一個(gè)Channel的屬性勘究。在調(diào)用newCall方法的時(shí)候,Channel屬性被當(dāng)作參數(shù)傳入了interceptCall中斟冕。如果InterceptorChannel中的channel口糕,就同樣是一個(gè)InterceptorChannel,那么如果在interceptCall的實(shí)現(xiàn)中宫静,繼續(xù)調(diào)用channel的newCall走净,那就順序的創(chuàng)建了一批call

   this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);

下面看一下interceptorChannel到底是怎么初始化的,首先interceptorChannel是在ManagedChannelImpl出事的時(shí)候孤里,在構(gòu)造方法中初始化好的伏伯。傳入了兩個(gè)參數(shù),一個(gè)是RealChannel捌袜,一個(gè)是build的時(shí)候傳入的interceptorts(List<ClientInterceptor>)參數(shù)

該方法的注釋?zhuān)蟾乓馑季褪撬到粒@段邏輯是為了攔截器用的

Create a new Channel that will call interceptors before starting a call on the given channel. The last interceptor will have its ClientInterceptor.interceptCall called first.
  public static Channel intercept(Channel channel, List<? extends ClientInterceptor> interceptors) {
    Preconditions.checkNotNull(channel, "channel");
    for (ClientInterceptor interceptor : interceptors) {
      channel = new InterceptorChannel(channel, interceptor);
    }
    return channel;
  }

下面繼續(xù)看 newCall 方法本身,實(shí)際上就是inteceptor的inteceptorCall方法虏等。

       @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
        
            return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(methodDescriptor, callOptions)) {

                @Override
                public void start(Listener<RespT> responseListener, Metadata headers) {

                    super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
                        @Override

可以看到在這個(gè)方法內(nèi)部確實(shí)調(diào)用了channel的newCall方法弄唧,新創(chuàng)建的一個(gè)匿名類(lèi)SimpleForwardingClientCall,創(chuàng)建這個(gè)類(lèi)的過(guò)程中調(diào)用了 channel的newCall霍衫。注意一個(gè)細(xì)節(jié)候引,在重寫(xiě)start方法后,還調(diào)用了super的start的方法

public abstract class ForwardingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
  /**
   * Returns the delegated {@code ClientCall}.
   */
  protected abstract ClientCall<ReqT, RespT> delegate();

  @Override
  public void start(Listener<RespT> responseListener, Metadata headers) {
    delegate().start(responseListener, headers);
  }

ForwardingClientCall 就是典型的代理模式敦跌,在調(diào)用A方法的時(shí)候澄干,實(shí)際上調(diào)用的都是代理類(lèi)的方法。delegate 就是nextChannel創(chuàng)建的call, 注意到剛剛在攔截器中重寫(xiě)start的時(shí)候麸俘,最后調(diào)用了父類(lèi)的start辩稽,實(shí)際上就是通過(guò)ForwardingClientCall 轉(zhuǎn)發(fā)到了代理類(lèi)的start方法上

接下來(lái)就縷一下整個(gè)client的創(chuàng)建過(guò)程。

假設(shè):
項(xiàng)目里有兩個(gè)攔截器从媚,logging   auth
然后創(chuàng)建了兩個(gè)InterceptorChannel 
loggingChannel:channel realChannel inteceptor logging
authChannel:channel loggingChannel  inteceprot auth
返回authChannel
調(diào)用authChannel 的newCall方法逞泄。
先調(diào)用 auth 的interceptCall 方法創(chuàng)建call (authCall)
在authCall創(chuàng)建的時(shí)候
    調(diào)用authChannel的interceptCall 方法
    然后繼續(xù)調(diào)用loggingChanne的newCall方法(loggingCall)
    在loggingCall的創(chuàng)建過(guò)程中
        調(diào)用的realCall的new Call方法 
        創(chuàng)建一個(gè)ClientCallImpl
最后方法執(zhí)行結(jié)束,返回authCall

authCall的執(zhí)行(以start方法為例)

 先調(diào)用authCall的start方法
 然后調(diào)用super.start(),super.start()方法中的delegated 是logginCall
 執(zhí)行 loggingCall的start方法
 loggingCall也同樣調(diào)用 super的start拜效,最終開(kāi)始執(zhí)行realCall的start方法

創(chuàng)建

0B808183-B2A1-45B1-95D1-FBDE8B02B00D.png

調(diào)用

4C776402-3342-4C52-B487-A69D03039196.png

攔截器實(shí)際就是創(chuàng)建了一個(gè)clientCall喷众,也就是說(shuō),clientCall的所有方法都可以被攔截拂檩,到底是在方法執(zhí)行前攔截侮腹,還是方法執(zhí)行后攔截嘲碧,可以通過(guò)調(diào)用super方法的時(shí)機(jī)來(lái)確定稻励。ClientCall在創(chuàng)建的時(shí)候還可以創(chuàng)建對(duì)應(yīng)的Listener。下面會(huì)介紹Listener的作用

86140735-3D02-4727-B9D3-DEB8F1E5F100.png

剛剛說(shuō)了這么多愈涩,其實(shí)方法還在在說(shuō)Stub里方法望抽。下面繼續(xù)看futureUnaryCall的調(diào)用過(guò)程。futureUnaryCall方法是ClientCalls的一個(gè)方法履婉。

  public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
      ClientCall<ReqT, RespT> call,
      ReqT param) {
    GrpcFuture<RespT> responseFuture = new GrpcFuture<RespT>(call);
    asyncUnaryRequestCall(call, param, new UnaryStreamToFuture<RespT>(responseFuture), false);
    return responseFuture;
  }

futureUnaryCall方法中煤篙,創(chuàng)建了UnaryStreamToFuture。UnaryStreamToFuture是一個(gè)Listener毁腿,這就涉及到了設(shè)計(jì)模式里的觀察者模式辑奈,觀察者模式主要有兩個(gè)要點(diǎn),一個(gè)是事件和觀察者的綁定已烤,一個(gè)是事件觸發(fā)的時(shí)候鸠窗,會(huì)調(diào)用觀察者。

  private static final class UnaryStreamToFuture<RespT> extends ClientCall.Listener<RespT> {
    private final GrpcFuture<RespT> responseFuture;
    private RespT value;

開(kāi)始實(shí)際發(fā)送請(qǐng)求
startCall胯究,處理了call 和callListener的綁定稍计。
call.sendMessage(param) 方法開(kāi)始發(fā)送請(qǐng)求
call.halfClose() 是只關(guān)閉了這個(gè)call 發(fā)送 request message,但是不影響response的接收
實(shí)際上到此為止裕循,請(qǐng)求的發(fā)送就已經(jīng)結(jié)束了臣嚣。
但是除了發(fā)送請(qǐng)求以外,還要接收請(qǐng)求剥哑。攔截器有一個(gè)onMessage方法硅则,就是收到消息的時(shí)候,會(huì)調(diào)用的接口株婴。我們通過(guò)這個(gè)切入點(diǎn)來(lái)看一下client端是怎么接收消息的怎虫。

  private static <ReqT, RespT> void asyncUnaryRequestCall(
      ClientCall<ReqT, RespT> call,
      ReqT param,
      ClientCall.Listener<RespT> responseListener,
      boolean streamingResponse) {
    startCall(call, responseListener, streamingResponse);
    try {
      call.sendMessage(param);
      call.halfClose();
    } catch (RuntimeException e) {
      throw cancelThrow(call, e);
    } catch (Error e) {
      throw cancelThrow(call, e);
    }
  }

startCall方法中,調(diào)用了call的start方法。所以攔截器中也可以自定義的添加具體的listener揪垄。

     private static <ReqT, RespT> void startCall(ClientCall<ReqT, RespT> call,
      ClientCall.Listener<RespT> responseListener, boolean streamingResponse) {
    call.start(responseListener, new Metadata());
    if (streamingResponse) {
      call.request(1);
    } else {
      // Initially ask for two responses from flow-control so that if a misbehaving server sends
      // more than one responses, we can catch it and fail it in the listener.
      call.request(2);
    }
  }

所以攔截器在重寫(xiě)start方法的時(shí)候穷吮,也可以傳入對(duì)應(yīng)的listener,看一下listener的創(chuàng)建

    super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
        Override
        public void onMessage(RespT message) {
            //如果當(dāng)前在線(xiàn)饥努,則onMessage是從服務(wù)端返回捡鱼,這就相當(dāng)于一次服務(wù)可用性的檢測(cè)
            healthCheck.setProxyStatus(true);
            super.onMessage(message);
        }
    }, headers)

這個(gè)是項(xiàng)目里面accessTokenInterceptor里創(chuàng)建的Listener,可以看到也是創(chuàng)建里一個(gè)SimpleForwarding***酷愧,在onMessage的時(shí)候驾诈,也是調(diào)用了super的onMessage方法,所以listenter和clientCall類(lèi)似溶浴,也是通過(guò)代理模式實(shí)現(xiàn)的轉(zhuǎn)發(fā)乍迄。

看一下創(chuàng)建的真正實(shí)際工作的Call

      return new ClientCallImpl<ReqT, RespT>(
              method,
              executor,
              callOptions,
              transportProvider,
              terminated ? null : transportFactory.getScheduledExecutorService())
          .setFullStreamDecompression(fullStreamDecompression)
          .setDecompressorRegistry(decompressorRegistry)
          .setCompressorRegistry(compressorRegistry);

傳入的參數(shù)有

method :指定執(zhí)行的方法

transportProvider: 提供鏈路信息

真正執(zhí)行的start方法

 if (!deadlineExceeded) {
      updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(),
          context.getDeadline(), headers);
      ClientTransport transport = clientTransportProvider.get(
          new PickSubchannelArgsImpl(method, headers, callOptions));
      Context origContext = context.attach();
      try {
        stream = transport.newStream(method, headers, callOptions);
      } finally {
        context.detach(origContext);
      }
    } else {
      stream = new FailingClientStream(DEADLINE_EXCEEDED);
    }

    if (callOptions.getAuthority() != null) {
      stream.setAuthority(callOptions.getAuthority());
    }
    if (callOptions.getMaxInboundMessageSize() != null) {
      stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
    }
    if (callOptions.getMaxOutboundMessageSize() != null) {
      stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
    }
    stream.setCompressor(compressor);
    stream.setFullStreamDecompression(fullStreamDecompression);
    stream.setDecompressorRegistry(decompressorRegistry);
    stream.start(new ClientStreamListenerImpl(observer));

創(chuàng)建了一個(gè)stream ,并且執(zhí)行了stream 的start方法士败。
在創(chuàng)建的時(shí)候闯两,傳入了method信息,header信息谅将。并且執(zhí)行了stream 的start方法漾狼。

看一下stream的作用

A single stream of communication between two end-points within a transport.

在transport基礎(chǔ)上,用作兩端通信的流

看一下這個(gè)stream的具體作用

  public final void start(ClientStreamListener listener) {
    transportState().setListener(listener);
    if (!useGet) {
      abstractClientStreamSink().writeHeaders(headers, null);
      headers = null;
    }
  }

在Sink 里寫(xiě)入header

@Override
    public void writeHeaders(Metadata headers, byte[] requestPayload) {
      // Convert the headers into Netty HTTP/2 headers.
      AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method);
      if (defaultPath == null) {
        defaultPath = new AsciiString("/" + method.getFullMethodName());
        methodDescriptorAccessor.setRawMethodName(method, defaultPath);
      }
      boolean get = (requestPayload != null);
      AsciiString httpMethod;
      if (get) {
        // Forge the query string
        // TODO(ericgribkoff) Add the key back to the query string
        defaultPath =
            new AsciiString(defaultPath + "?" + BaseEncoding.base64().encode(requestPayload));
        httpMethod = Utils.HTTP_GET_METHOD;
      } else {
        httpMethod = Utils.HTTP_METHOD;
      }
      Http2Headers http2Headers = Utils.convertClientHeaders(headers, scheme, defaultPath,
          authority, httpMethod, userAgent);

      ChannelFutureListener failureListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (!future.isSuccess()) {
            // Stream creation failed. Close the stream if not already closed.
            // When the channel is shutdown, the lifecycle manager has a better view of the failure,
            // especially before negotiation completes (because the negotiator commonly doesn't
            // receive the execeptionCaught because NettyClientHandler does not propagate it).
            Status s = transportState().handler.getLifecycleManager().getShutdownStatus();
            if (s == null) {
              s = transportState().statusFromFailedFuture(future);
            }
            transportState().transportReportStatus(s, true, new Metadata());
          }
        }
      };

      // Write the command requesting the creation of the stream.
      writeQueue.enqueue(new CreateStreamCommand(http2Headers, transportState(), get),
          !method.getType().clientSendsOneMessage() || get).addListener(failureListener);
    }

主要做了兩件事

1:在header前添加上method饥臂,構(gòu)建一個(gè)http2.0 的header

2: 把這個(gè)header 寫(xiě)入寫(xiě)隊(duì)列逊躁。

后面再詳細(xì)的調(diào)用就是netty框架的寫(xiě)入執(zhí)行過(guò)程。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末隅熙,一起剝皮案震驚了整個(gè)濱河市稽煤,隨后出現(xiàn)的幾起案子爹谭,更是在濱河造成了極大的恐慌谜喊,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,084評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件窖维,死亡現(xiàn)場(chǎng)離奇詭異弯淘,居然都是意外死亡绿店,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)庐橙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)假勿,“玉大人,你說(shuō)我怎么就攤上這事态鳖∽啵” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 163,450評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵浆竭,是天一觀的道長(zhǎng)浸须。 經(jīng)常有香客問(wèn)我惨寿,道長(zhǎng),這世上最難降的妖魔是什么删窒? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,322評(píng)論 1 293
  • 正文 為了忘掉前任裂垦,我火速辦了婚禮,結(jié)果婚禮上肌索,老公的妹妹穿的比我還像新娘蕉拢。我一直安慰自己,他們只是感情好诚亚,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,370評(píng)論 6 390
  • 文/花漫 我一把揭開(kāi)白布晕换。 她就那樣靜靜地躺著,像睡著了一般站宗。 火紅的嫁衣襯著肌膚如雪闸准。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,274評(píng)論 1 300
  • 那天梢灭,我揣著相機(jī)與錄音夷家,去河邊找鬼。 笑死或辖,一個(gè)胖子當(dāng)著我的面吹牛瘾英,可吹牛的內(nèi)容都是我干的枣接。 我是一名探鬼主播颂暇,決...
    沈念sama閱讀 40,126評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼但惶!你這毒婦竟也來(lái)了耳鸯?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,980評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤膀曾,失蹤者是張志新(化名)和其女友劉穎县爬,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體添谊,經(jīng)...
    沈念sama閱讀 45,414評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡财喳,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,599評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了斩狱。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片耳高。...
    茶點(diǎn)故事閱讀 39,773評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖所踊,靈堂內(nèi)的尸體忽然破棺而出泌枪,到底是詐尸還是另有隱情,我是刑警寧澤秕岛,帶...
    沈念sama閱讀 35,470評(píng)論 5 344
  • 正文 年R本政府宣布碌燕,位于F島的核電站误证,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏修壕。R本人自食惡果不足惜愈捅,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,080評(píng)論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望慈鸠。 院中可真熱鬧改鲫,春花似錦、人聲如沸林束。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,713評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)壶冒。三九已至缕题,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間胖腾,已是汗流浹背烟零。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,852評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留咸作,地道東北人锨阿。 一個(gè)月前我還...
    沈念sama閱讀 47,865評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像记罚,于是被迫代替她去往敵國(guó)和親墅诡。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,689評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理桐智,服務(wù)發(fā)現(xiàn)末早,斷路器,智...
    卡卡羅2017閱讀 134,654評(píng)論 18 139
  • (目前有點(diǎn)亂说庭,先貼上來(lái)然磷,等以后有時(shí)間在整理吧。這個(gè)問(wèn)題一直想拿出來(lái)分享刊驴,還有兩個(gè)博客姿搜,都是相關(guān)的,一點(diǎn)點(diǎn)發(fā)出來(lái)) ...
    kamiSDY閱讀 4,371評(píng)論 0 2
  • 兩個(gè)人在一起吵吵鬧鬧走過(guò)8年捆憎,要不是8月發(fā)生那個(gè)事情舅柜,我不會(huì)停下來(lái)思考,思緒萬(wàn)千地把點(diǎn)點(diǎn)滴滴都想著攻礼,或許經(jīng)歷一些事...
    伊志如此閱讀 337評(píng)論 0 0
  • 昨天晚上半夜還在談工作室的事情业踢,雖然說(shuō)今天8點(diǎn)才起來(lái),還是沒(méi)有能成功早起礁扮,但是起來(lái)后買(mǎi)了早餐吃了早餐就去將昨天平面...
    Jennifer_怡閱讀 59評(píng)論 0 0