鋪墊
grpc有三個(gè)核心的抽象層:
Stub
絕大數(shù)開(kāi)發(fā)者會(huì)直接使用的一部分吃靠,proto文件編譯所生成的Stub就是在此層的基礎(chǔ)之上生成的危纫。它提供了一種調(diào)用方和服務(wù)之間類(lèi)型安全的綁定關(guān)系赊锚,對(duì)比http服務(wù):http服務(wù)就不提供類(lèi)型安全的綁定關(guān)系,調(diào)用方和服務(wù)方需要自行處理類(lèi)型轉(zhuǎn)化的相關(guān)工作醉顽。
Channel
channel層是對(duì)數(shù)據(jù)傳輸?shù)某橄蟛埃阌谟脩?hù)更方便的進(jìn)行 攔截器/裝飾者 等類(lèi)似的處理潮梯。它旨在使應(yīng)用程序框架易于使用此層來(lái)解決諸如日志記錄,監(jiān)視惨恭,身份驗(yàn)證等交叉問(wèn)題秉馏。流控制也暴露在此層,以允許更復(fù)雜的應(yīng)用程序直接與其交互脱羡。
Transport
這一層在絕大多數(shù)情況下是我們不需要關(guān)心的萝究,它的作用就是傳輸數(shù)據(jù),底層基于netty锉罐,或者ok http的實(shí)現(xiàn)
grpc請(qǐng)求創(chuàng)建
分析一個(gè)典型的grpc請(qǐng)求創(chuàng)建的代碼
請(qǐng)求從調(diào)用 grpcClient 的getFutureStub開(kāi)始
CrmConsumeServiceGrpc.CrmConsumeServiceFutureStub rechargeService =
(CrmConsumeServiceGrpc.CrmConsumeServiceFutureStub) grpcClient.getFutureStub(CrmConsumeServiceGrpc.class);
getFutureStub 方法實(shí)際就是調(diào)用了 getStub
public Object getFutureStub(Class grpcClass) throws Exception {
return getStub(grpcClass, "newFutureStub");
}
實(shí)際上就是通過(guò)反射的方式調(diào)用了了 CrmConsumeServiceGrpc 的newFutureStub 方法
private Object getStub(Class grpcClass, String stubMethodName) throws Exception {
Method stubMethod = grpcClass.getMethod(stubMethodName, Channel.class);
return stubMethod.invoke(null, channel);
}
繼續(xù)跟到具體方法
public static CrmConsumeServiceFutureStub newFutureStub(
io.grpc.Channel channel) {
return new CrmConsumeServiceFutureStub(channel);
}
類(lèi)的私有構(gòu)造方法
public static final class CrmConsumeServiceFutureStub extends io.grpc.stub.AbstractStub<CrmConsumeServiceFutureStub> {
private CrmConsumeServiceFutureStub(io.grpc.Channel channel) {
super(channel);
}
最終跟到了AbstractStub 類(lèi)帆竹,這個(gè)類(lèi)是stub層的核心類(lèi)之一,實(shí)際上proto文件生成的Stub類(lèi)脓规,都是該類(lèi)的子類(lèi)馆揉。
protected AbstractStub(Channel channel) {
this(channel, CallOptions.DEFAULT);
}
簡(jiǎn)單的說(shuō),獲取stub的過(guò)程抖拦,就是把GrpcClient 的channel屬性傳遞給Stub的過(guò)程升酣。那由此可以推斷出,channel是在GrpcClient創(chuàng)建的時(shí)候态罪,一起創(chuàng)建的噩茄。 下面看一下GrpcClient的源碼。
private void init(String target, int maxMessageSize, Tracing tracing, LoadBalancer.Factory loadBalancerFactory, ClientInterceptor... interceptors) {
LoggingClientInterceptor loggingClientInterceptor = new LoggingClientInterceptor();
this.allClientInterceptors.add(loggingClientInterceptor);
if (tracing != null) {
GrpcTracing grpcTracing = GrpcTracing.create(tracing);
allClientInterceptors.add(grpcTracing.newClientInterceptor());
}
if (interceptors != null) {
allClientInterceptors.addAll(Arrays.asList(interceptors));
}
NettyChannelBuilder builder = NettyChannelBuilder
.forTarget(target)
.keepAliveTime(20, TimeUnit.SECONDS)
.keepAliveTimeout(2, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true)
.idleTimeout(24, TimeUnit.HOURS)
.usePlaintext(true)
.intercept(allClientInterceptors)
.nameResolverFactory(NameResolverProvider.asFactory())
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance());
if (loadBalancerFactory != null) {
builder.loadBalancerFactory(loadBalancerFactory);
}
if (maxMessageSize > 0) {
builder.maxInboundMessageSize(maxMessageSize);
}
channel = builder.build();
}
調(diào)用 NettyChannelBuilder的build方法复颈,創(chuàng)建了channel绩聘。下面的注釋也驗(yàn)證了這點(diǎn),NettyChannelBuilder 創(chuàng)建了一個(gè)以Netty 作為傳輸層的 channel耗啦。其中除了必要的target凿菩,也就是服務(wù)地址以外,傳遞給channel的參數(shù)還包含了三個(gè)比較重要的屬性 intercept,namemResolverFactory,loadBalancerFactory帜讲。 衅谷。
/**
* A builder to help simplify construction of channels using the Netty transport.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1784")
@CanIgnoreReturnValue
public final class NettyChannelBuilder
先看一下channel能夠做什么
Channel的注釋?zhuān)?/p>
A virtual connection to a conceptual endpoint, to perform RPCs. A channel is free to have zero or many actual connections to the endpoint based on configuration, load, etc. A channel is also free to determine which actual endpoints to use and may change it every RPC, permitting client-side load balancing. Applications are generally expected to use stubs instead of calling this class directly.
Applications can add common cross-cutting behaviors to stubs by decorating Channel implementations using ClientInterceptor. It is expected that most application code will not use this class directly but rather work with stubs that have been bound to a Channel that was decorated during application initialization.
Channel是一個(gè)虛擬的鏈接,它可以維護(hù)任意數(shù)量的真實(shí)鏈接似将,也可以自主選擇具體使用的鏈接获黔。也就是說(shuō),在channel上在验,我們可以實(shí)現(xiàn)客戶(hù)端的 load balancing玷氏。另外,注釋中建議應(yīng)用使用stubs腋舌,而不是直接調(diào)用channel盏触。其實(shí)也就說(shuō),我們可以避開(kāi)Stub的創(chuàng)建,直接調(diào)用Channel赞辩。另外通過(guò)實(shí)現(xiàn)ClientInterceptor 可以實(shí)現(xiàn)對(duì)Channel的橫切(cross-cutting)雌芽,這個(gè)也是在Channel層做到的。
方法的調(diào)用
在看方法調(diào)用的源碼前诗宣,先思考一個(gè)問(wèn)題膘怕。grpc 實(shí)現(xiàn)遠(yuǎn)程調(diào)用想诅,需要哪些參數(shù)召庞。
首先遠(yuǎn)程調(diào)用首先是一個(gè)方法調(diào)用,所以需要定位一個(gè)方法来破,以及對(duì)應(yīng)的參數(shù)篮灼。
另外因?yàn)槭沁h(yuǎn)端請(qǐng)求,所以還需要一個(gè)遠(yuǎn)端服務(wù)地址(Channel)徘禁,遠(yuǎn)端的請(qǐng)求诅诱,必然涉及到序列化和反序列化的過(guò)程,另外grpc提供了類(lèi)型安全的方法調(diào)用送朱,所以序列化和反序列化的時(shí)候娘荡,也需要獲取到 request 和response 的具體類(lèi)型。
public com.google.common.util.concurrent.ListenableFuture<com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentResponse> updateAgentInfo(
com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentRequest request) {
return futureUnaryCall(
getChannel().newCall(METHOD_UPDATE_AGENT_INFO, getCallOptions()), request);
}
}
grpc 方法的調(diào)用就是調(diào)用proto自動(dòng)生成的Stub上的方法驶沼,所有的的方法都是調(diào)用了 形如futureUnaryCall的方法炮沐,只是 每個(gè)方法傳入的 METHOD_UPDATE_AGENT_INFO參數(shù)不一樣。
public static final io.grpc.MethodDescriptor<com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentRequest,
com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentResponse> METHOD_UPDATE_AGENT_INFO =
io.grpc.MethodDescriptor.create(
io.grpc.MethodDescriptor.MethodType.UNARY,
generateFullMethodName(
"UpdateAgentInfoInterface", "updateAgentInfo"),
io.grpc.protobuf.ProtoUtils.marshaller(com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentRequest.getDefaultInstance()),
io.grpc.protobuf.ProtoUtils.marshaller(com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentResponse.getDefaultInstance()));
METHOD_UPDATE_AGENT_INFO 保存了 FullMethodName回怜,還有兩個(gè)marshaller大年,marshaller保存著request和response的類(lèi)信息,用于序列化和反序列化玉雾。
到此為止翔试,grpc發(fā)起的一個(gè)遠(yuǎn)端請(qǐng)求需要的信息怎么處理的我們都已經(jīng)了解了。Channel通過(guò)GrpcClient創(chuàng)建复旬,調(diào)用的方法信息和序列化的信息在Stub中保存垦缅,request 是調(diào)用方創(chuàng)建的。
之后其實(shí)就是這些信息的加工和組合驹碍。
首先第一步獲取Channel失都,然后創(chuàng)建一個(gè)Call。Channel毫無(wú)疑問(wèn)就是剛剛創(chuàng)建的ManagedChannelImpl幸冻。
public com.google.common.util.concurrent.ListenableFuture<com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentResponse> updateAgentInfo(
com.hualala.app.shop.pos.grpc.UpdateAgentInfoInterfaceData.UpdateAgentRequest request) {
return futureUnaryCall(
getChannel().newCall(METHOD_UPDATE_AGENT_INFO, getCallOptions()), request);
}
}
newCall創(chuàng)建了一個(gè) ClientCall粹庞,在看newCall的源碼前,我們先看一下ClientCall文檔
An instance of a call to a remote method. A call will send zero or more request messages to the server and receive zero or more response messages back.
Instances are created by a Channel and used by stubs to invoke their remote behavior.
clientCall是一個(gè)調(diào)用遠(yuǎn)程方法的實(shí)例洽损,由Channel創(chuàng)建庞溜,被stubs調(diào)用。所以clientCasll是真正的發(fā)送請(qǐng)求的對(duì)象。另外在clientCall在創(chuàng)建的時(shí)候流码,也會(huì)創(chuàng)建對(duì)應(yīng)的攔截器又官。
思考一個(gè)問(wèn)題,攔截器的功能實(shí)現(xiàn)漫试,需要考慮哪些東西六敬。攔截器的效果就是在請(qǐng)求真正執(zhí)行前,獲取到相關(guān)的信息驾荣,可以進(jìn)行鑒權(quán)外构,日志等等。那么就是說(shuō)播掷,攔截器主要功能又兩點(diǎn)审编,一個(gè)就是獲取到請(qǐng)求信息。另外一個(gè)就是對(duì)請(qǐng)求的轉(zhuǎn)發(fā)(包括轉(zhuǎn)發(fā)到另外的攔截器和真實(shí)請(qǐng)求)
了解到這之后歧匈,我們看一下newCall方法垒酬。
private static class InterceptorChannel extends Channel {
private final Channel channel;
private final ClientInterceptor interceptor;
private InterceptorChannel(Channel channel, ClientInterceptor interceptor) {
this.channel = channel;
this.interceptor = Preconditions.checkNotNull(interceptor, "interceptor");
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions) {
return interceptor.interceptCall(method, callOptions, channel);
}
newCall方法設(shè)計(jì)的很巧妙,首先它本身繼承自Channel件炉,另外還有還有一個(gè)Channel的屬性勘究。在調(diào)用newCall方法的時(shí)候,Channel屬性被當(dāng)作參數(shù)傳入了interceptCall中斟冕。如果InterceptorChannel中的channel口糕,就同樣是一個(gè)InterceptorChannel,那么如果在interceptCall的實(shí)現(xiàn)中宫静,繼續(xù)調(diào)用channel的newCall走净,那就順序的創(chuàng)建了一批call
this.interceptorChannel = ClientInterceptors.intercept(new RealChannel(), interceptors);
下面看一下interceptorChannel到底是怎么初始化的,首先interceptorChannel是在ManagedChannelImpl出事的時(shí)候孤里,在構(gòu)造方法中初始化好的伏伯。傳入了兩個(gè)參數(shù),一個(gè)是RealChannel捌袜,一個(gè)是build的時(shí)候傳入的interceptorts(List<ClientInterceptor>)參數(shù)
該方法的注釋?zhuān)蟾乓馑季褪撬到粒@段邏輯是為了攔截器用的
Create a new Channel that will call interceptors before starting a call on the given channel. The last interceptor will have its ClientInterceptor.interceptCall called first.
public static Channel intercept(Channel channel, List<? extends ClientInterceptor> interceptors) {
Preconditions.checkNotNull(channel, "channel");
for (ClientInterceptor interceptor : interceptors) {
channel = new InterceptorChannel(channel, interceptor);
}
return channel;
}
下面繼續(xù)看 newCall 方法本身,實(shí)際上就是inteceptor的inteceptorCall方法虏等。
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
可以看到在這個(gè)方法內(nèi)部確實(shí)調(diào)用了channel的newCall方法弄唧,新創(chuàng)建的一個(gè)匿名類(lèi)SimpleForwardingClientCall,創(chuàng)建這個(gè)類(lèi)的過(guò)程中調(diào)用了 channel的newCall霍衫。注意一個(gè)細(xì)節(jié)候引,在重寫(xiě)start方法后,還調(diào)用了super的start的方法
public abstract class ForwardingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
/**
* Returns the delegated {@code ClientCall}.
*/
protected abstract ClientCall<ReqT, RespT> delegate();
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
delegate().start(responseListener, headers);
}
ForwardingClientCall 就是典型的代理模式敦跌,在調(diào)用A方法的時(shí)候澄干,實(shí)際上調(diào)用的都是代理類(lèi)的方法。delegate 就是nextChannel創(chuàng)建的call, 注意到剛剛在攔截器中重寫(xiě)start的時(shí)候麸俘,最后調(diào)用了父類(lèi)的start辩稽,實(shí)際上就是通過(guò)ForwardingClientCall 轉(zhuǎn)發(fā)到了代理類(lèi)的start方法上
接下來(lái)就縷一下整個(gè)client的創(chuàng)建過(guò)程。
假設(shè):
項(xiàng)目里有兩個(gè)攔截器从媚,logging auth
然后創(chuàng)建了兩個(gè)InterceptorChannel
loggingChannel:channel realChannel inteceptor logging
authChannel:channel loggingChannel inteceprot auth
返回authChannel
調(diào)用authChannel 的newCall方法逞泄。
先調(diào)用 auth 的interceptCall 方法創(chuàng)建call (authCall)
在authCall創(chuàng)建的時(shí)候
調(diào)用authChannel的interceptCall 方法
然后繼續(xù)調(diào)用loggingChanne的newCall方法(loggingCall)
在loggingCall的創(chuàng)建過(guò)程中
調(diào)用的realCall的new Call方法
創(chuàng)建一個(gè)ClientCallImpl
最后方法執(zhí)行結(jié)束,返回authCall
authCall的執(zhí)行(以start方法為例)
先調(diào)用authCall的start方法
然后調(diào)用super.start(),super.start()方法中的delegated 是logginCall
執(zhí)行 loggingCall的start方法
loggingCall也同樣調(diào)用 super的start拜效,最終開(kāi)始執(zhí)行realCall的start方法
創(chuàng)建
調(diào)用
攔截器實(shí)際就是創(chuàng)建了一個(gè)clientCall喷众,也就是說(shuō),clientCall的所有方法都可以被攔截拂檩,到底是在方法執(zhí)行前攔截侮腹,還是方法執(zhí)行后攔截嘲碧,可以通過(guò)調(diào)用super方法的時(shí)機(jī)來(lái)確定稻励。ClientCall在創(chuàng)建的時(shí)候還可以創(chuàng)建對(duì)應(yīng)的Listener。下面會(huì)介紹Listener的作用
剛剛說(shuō)了這么多愈涩,其實(shí)方法還在在說(shuō)Stub里方法望抽。下面繼續(xù)看futureUnaryCall的調(diào)用過(guò)程。futureUnaryCall方法是ClientCalls的一個(gè)方法履婉。
public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
ClientCall<ReqT, RespT> call,
ReqT param) {
GrpcFuture<RespT> responseFuture = new GrpcFuture<RespT>(call);
asyncUnaryRequestCall(call, param, new UnaryStreamToFuture<RespT>(responseFuture), false);
return responseFuture;
}
futureUnaryCall方法中煤篙,創(chuàng)建了UnaryStreamToFuture。UnaryStreamToFuture是一個(gè)Listener毁腿,這就涉及到了設(shè)計(jì)模式里的觀察者模式辑奈,觀察者模式主要有兩個(gè)要點(diǎn),一個(gè)是事件和觀察者的綁定已烤,一個(gè)是事件觸發(fā)的時(shí)候鸠窗,會(huì)調(diào)用觀察者。
private static final class UnaryStreamToFuture<RespT> extends ClientCall.Listener<RespT> {
private final GrpcFuture<RespT> responseFuture;
private RespT value;
開(kāi)始實(shí)際發(fā)送請(qǐng)求
startCall胯究,處理了call 和callListener的綁定稍计。
call.sendMessage(param) 方法開(kāi)始發(fā)送請(qǐng)求
call.halfClose() 是只關(guān)閉了這個(gè)call 發(fā)送 request message,但是不影響response的接收
實(shí)際上到此為止裕循,請(qǐng)求的發(fā)送就已經(jīng)結(jié)束了臣嚣。
但是除了發(fā)送請(qǐng)求以外,還要接收請(qǐng)求剥哑。攔截器有一個(gè)onMessage方法硅则,就是收到消息的時(shí)候,會(huì)調(diào)用的接口株婴。我們通過(guò)這個(gè)切入點(diǎn)來(lái)看一下client端是怎么接收消息的怎虫。
private static <ReqT, RespT> void asyncUnaryRequestCall(
ClientCall<ReqT, RespT> call,
ReqT param,
ClientCall.Listener<RespT> responseListener,
boolean streamingResponse) {
startCall(call, responseListener, streamingResponse);
try {
call.sendMessage(param);
call.halfClose();
} catch (RuntimeException e) {
throw cancelThrow(call, e);
} catch (Error e) {
throw cancelThrow(call, e);
}
}
startCall方法中,調(diào)用了call的start方法。所以攔截器中也可以自定義的添加具體的listener揪垄。
private static <ReqT, RespT> void startCall(ClientCall<ReqT, RespT> call,
ClientCall.Listener<RespT> responseListener, boolean streamingResponse) {
call.start(responseListener, new Metadata());
if (streamingResponse) {
call.request(1);
} else {
// Initially ask for two responses from flow-control so that if a misbehaving server sends
// more than one responses, we can catch it and fail it in the listener.
call.request(2);
}
}
所以攔截器在重寫(xiě)start方法的時(shí)候穷吮,也可以傳入對(duì)應(yīng)的listener,看一下listener的創(chuàng)建
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
Override
public void onMessage(RespT message) {
//如果當(dāng)前在線(xiàn)饥努,則onMessage是從服務(wù)端返回捡鱼,這就相當(dāng)于一次服務(wù)可用性的檢測(cè)
healthCheck.setProxyStatus(true);
super.onMessage(message);
}
}, headers)
這個(gè)是項(xiàng)目里面accessTokenInterceptor里創(chuàng)建的Listener,可以看到也是創(chuàng)建里一個(gè)SimpleForwarding***酷愧,在onMessage的時(shí)候驾诈,也是調(diào)用了super的onMessage方法,所以listenter和clientCall類(lèi)似溶浴,也是通過(guò)代理模式實(shí)現(xiàn)的轉(zhuǎn)發(fā)乍迄。
看一下創(chuàng)建的真正實(shí)際工作的Call
return new ClientCallImpl<ReqT, RespT>(
method,
executor,
callOptions,
transportProvider,
terminated ? null : transportFactory.getScheduledExecutorService())
.setFullStreamDecompression(fullStreamDecompression)
.setDecompressorRegistry(decompressorRegistry)
.setCompressorRegistry(compressorRegistry);
傳入的參數(shù)有
method :指定執(zhí)行的方法
transportProvider: 提供鏈路信息
真正執(zhí)行的start方法
if (!deadlineExceeded) {
updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(),
context.getDeadline(), headers);
ClientTransport transport = clientTransportProvider.get(
new PickSubchannelArgsImpl(method, headers, callOptions));
Context origContext = context.attach();
try {
stream = transport.newStream(method, headers, callOptions);
} finally {
context.detach(origContext);
}
} else {
stream = new FailingClientStream(DEADLINE_EXCEEDED);
}
if (callOptions.getAuthority() != null) {
stream.setAuthority(callOptions.getAuthority());
}
if (callOptions.getMaxInboundMessageSize() != null) {
stream.setMaxInboundMessageSize(callOptions.getMaxInboundMessageSize());
}
if (callOptions.getMaxOutboundMessageSize() != null) {
stream.setMaxOutboundMessageSize(callOptions.getMaxOutboundMessageSize());
}
stream.setCompressor(compressor);
stream.setFullStreamDecompression(fullStreamDecompression);
stream.setDecompressorRegistry(decompressorRegistry);
stream.start(new ClientStreamListenerImpl(observer));
創(chuàng)建了一個(gè)stream ,并且執(zhí)行了stream 的start方法士败。
在創(chuàng)建的時(shí)候闯两,傳入了method信息,header信息谅将。并且執(zhí)行了stream 的start方法漾狼。
看一下stream的作用
A single stream of communication between two end-points within a transport.
在transport基礎(chǔ)上,用作兩端通信的流
看一下這個(gè)stream的具體作用
public final void start(ClientStreamListener listener) {
transportState().setListener(listener);
if (!useGet) {
abstractClientStreamSink().writeHeaders(headers, null);
headers = null;
}
}
在Sink 里寫(xiě)入header
@Override
public void writeHeaders(Metadata headers, byte[] requestPayload) {
// Convert the headers into Netty HTTP/2 headers.
AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method);
if (defaultPath == null) {
defaultPath = new AsciiString("/" + method.getFullMethodName());
methodDescriptorAccessor.setRawMethodName(method, defaultPath);
}
boolean get = (requestPayload != null);
AsciiString httpMethod;
if (get) {
// Forge the query string
// TODO(ericgribkoff) Add the key back to the query string
defaultPath =
new AsciiString(defaultPath + "?" + BaseEncoding.base64().encode(requestPayload));
httpMethod = Utils.HTTP_GET_METHOD;
} else {
httpMethod = Utils.HTTP_METHOD;
}
Http2Headers http2Headers = Utils.convertClientHeaders(headers, scheme, defaultPath,
authority, httpMethod, userAgent);
ChannelFutureListener failureListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// Stream creation failed. Close the stream if not already closed.
// When the channel is shutdown, the lifecycle manager has a better view of the failure,
// especially before negotiation completes (because the negotiator commonly doesn't
// receive the execeptionCaught because NettyClientHandler does not propagate it).
Status s = transportState().handler.getLifecycleManager().getShutdownStatus();
if (s == null) {
s = transportState().statusFromFailedFuture(future);
}
transportState().transportReportStatus(s, true, new Metadata());
}
}
};
// Write the command requesting the creation of the stream.
writeQueue.enqueue(new CreateStreamCommand(http2Headers, transportState(), get),
!method.getType().clientSendsOneMessage() || get).addListener(failureListener);
}
主要做了兩件事
1:在header前添加上method饥臂,構(gòu)建一個(gè)http2.0 的header
2: 把這個(gè)header 寫(xiě)入寫(xiě)隊(duì)列逊躁。
后面再詳細(xì)的調(diào)用就是netty框架的寫(xiě)入執(zhí)行過(guò)程。