上篇主要介紹了server端的流程,這篇的關注點是Client端的流程,同樣只列出核心主流程代碼。
// SayHello is the client-side entry point for the Greeter SayHello RPC.
// It invokes "/helloworld.Greeter/SayHello" on the client's connection with
// in as the request and the supplied call options, and returns the populated
// reply. If Invoke fails, it returns nil and that error.
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
	reply := new(HelloReply)
	if err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, reply, opts...); err != nil {
		return nil, err
	}
	return reply, nil
}
// invoke executes one unary call. It creates a client stream for method on
// cc, sends req on that stream, and then receives the response into reply.
// Per the article, the ClientStream is what sends and receives the messages
// over the HTTP/2 transport layer and takes care of serialization and
// deserialization. The first error from any of the three steps is returned.
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
	stream, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
	if err = stream.SendMsg(req); err != nil {
		return err
	}
	return stream.RecvMsg(reply)
}
// newClientStream builds the core clientStream object for a call.
// It defines a local newStream closure that forwards to
// newClientStreamWithParams, then calls that closure with the incoming ctx
// and a no-op done callback.
// NOTE(review): mc and onCommit are not declared anywhere in this excerpt —
// presumably elided from the full function; confirm against the real source.
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error){
var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
}
return newStream(ctx, func() {})
}
// newClientStreamWithParams assembles the transport call header (host,
// method, content subtype, done callback) and the clientStream that holds it,
// creates an attempt, and opens that attempt's stream through withRetry.
// If withRetry reports an error, the stream is finished with that error and
// (nil, err) is returned.
// NOTE(review): this is an abbreviated excerpt — c, sh and trInfo are not
// declared in view, and there is no return on the success path; presumably
// elided, confirm against the full source.
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
DoneFunc: doneFunc,
}
cs := &clientStream{
callHdr: callHdr,
// The "..." lines below are placeholders for fields omitted from this excerpt.
...
...
}
// Build the attempt. Per the article, the attempt is what performs the actual
// sendMsg inside the clientStream, using a stream at the transport layer, and
// it implements the retry mechanism.
cs.newAttemptLocked(sh, trInfo)
// op opens the attempt's stream; the second argument to withRetry buffers op
// with a size of 0.
op := func(a *csAttempt) error { return a.newStream() }
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
cs.finish(err)
return nil, err
}
}
// SendMsg prepares m for the wire and sends it through a csAttempt under
// withRetry. The second argument to withRetry buffers op together with the
// combined length of hdr and payload.
// NOTE(review): abbreviated excerpt — the error from prepareMsg is not checked
// before its results are used, and the function has no return statement;
// presumably elided, confirm against the full source.
func (cs *clientStream) SendMsg(m interface{}) (err error) {
// Pre-process the message: encode and compress it to obtain hdr and data,
// in preparation for building the HTTP/2 frame.
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
// Using the retry mechanism, actually send the message to the server through
// the csAttempt.
op := func(a *csAttempt) error {
err := a.sendMsg(m, hdr, payload, data)
// m and data are cleared after the first run, so any later run of op passes
// nil for both. NOTE(review): presumably they are only needed once — confirm.
m, data = nil, nil
return err
}
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
}
// sendMsg writes the prepared message to the attempt's transport stream via
// a.t.Write; per the article, this amounts to sending data to the server over
// HTTP/2. The write is flagged as the last one when the stream descriptor is
// not client-streaming (Last is the negation of cs.desc.ClientStreams).
// NOTE(review): abbreviated excerpt — cs is not declared in view, the result
// of Write is discarded, and there is no return statement; presumably elided.
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams})
}
// Write packages hdr and data into a dataFrame for stream s and queues it on
// the transport's control buffer; per the article, this is the HTTP/2 data
// frame being written to the stream. The frame's endStream flag is taken from
// opts.Last. The result of controlBuf.put is returned unchanged.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	frame := &dataFrame{streamID: s.id, endStream: opts.Last, h: hdr, d: data}
	return t.controlBuf.put(frame)
}
// RecvMsg receives a message into m by running the csAttempt's recvMsg under
// the retry mechanism (withRetry), passing cs.commitAttemptLocked as the
// second argument.
// NOTE(review): abbreviated excerpt — recvInfo is not declared in view, and
// err is assigned but never returned; presumably elided.
func (cs *clientStream) RecvMsg(m interface{}) error {
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
}
// recvMsg delegates to recv, handing it the attempt's parser (a.p), stream
// (a.s), a.dc and a.decomp, together with the codec and the maximum receive
// message size read from cs.callInfo, and stores the result in err.
// NOTE(review): abbreviated excerpt — cs is not declared in view and there is
// no return statement (err is a named result); presumably elided.
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
}
// recv receives and decompresses a message from the stream, then uses the
// supplied codec to unmarshal the bytes into m — per the article, m is the
// HelloReply struct in this example.
// NOTE(review): abbreviated excerpt — err from recvAndDecompress is never
// checked, the result of Unmarshal is discarded, and there is no return
// statement; presumably elided, confirm against the full source.
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
c.Unmarshal(d, m);
}
``
GRPC源碼實例解析(一)
http://www.reibang.com/p/8bbc6dc36859