參考:https://github.com/liangzhiyang/annotate-grpc-go
正常啟動一個grpcClient連接如下:
// main dials the Greeter server, issues a single SayHello RPC and logs the
// reply. Any dial or RPC failure terminates the process via log.Fatalf.
func main() {
	// Open the client connection (WithInsecure: no transport credentials).
	conn, dialErr := grpc.Dial(address, grpc.WithInsecure())
	if dialErr != nil {
		log.Fatalf("did not connect: %v", dialErr)
	}
	defer conn.Close()

	// The name to greet is defaultName unless a first command-line argument
	// is supplied.
	who := defaultName
	if args := os.Args; len(args) > 1 {
		who = args[1]
	}

	// Bound the RPC with a one-second deadline.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Contact the server and print out its response.
	greeter := pb.NewGreeterClient(conn)
	reply, rpcErr := greeter.SayHello(ctx, &pb.HelloRequest{Name: who})
	if rpcErr != nil {
		log.Fatalf("could not greet: %v", rpcErr)
	}
	log.Printf("Greeting: %s", reply.Message)
}
1.grpc.Dial
// Dial creates a client connection to the given target.
// target is the server address and opts are the DialOption values applied to
// the connection. It delegates to DialContext with a background context.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	ctx := context.Background()
	return DialContext(ctx, target, opts...)
}
DialOption 參數，grpcClient 連接時傳入
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
//
// NOTE(review): the ServiceConfig declaration at the end of this struct is
// inlined by the annotator to illustrate the element type of scChan. A type
// declaration cannot appear inside a struct body, so it has to be moved out
// before this snippet compiles.
type dialOptions struct {
	unaryInt UnaryClientInterceptor // UnaryClientInterceptor intercepts the execution of a unary RPC on the client.
	streamInt StreamClientInterceptor // StreamClientInterceptor intercepts the creation of ClientStream
	codec Codec // Message encoding; DialContext falls back to protoCodec when this is nil.
	cp Compressor // Compressor defines the interface gRPC uses to compress a message.
	dc Decompressor // Decompressor defines the interface gRPC uses to decompress a message.
	bs backoffStrategy // Backoff strategy used between reconnect attempts.
	balancer Balancer // Load balancer; per the annotator, the built-in RoundRobin returns a round-robin balancer.
	block bool // When true, connecting blocks (even with several cluster addresses) until the connections succeed.
	insecure bool // Whether transport security is disabled (see the credential checks in resetAddrConn).
	timeout time.Duration // Dial timeout; DialContext applies it with context.WithTimeout when > 0.
	copts transport.ConnectOptions // ConnectOptions covers all relevant options for communicating with the server.
	// scChan is set via WithServiceConfig and may be fed asynchronously. It
	// supplies the load-balancing choice and the per-method configuration of
	// the service, shaped as follows:
	scChan <-chan ServiceConfig
	type ServiceConfig struct {
		// LB is the load balancer the service providers recommends. The balancer specified
		// via grpc.WithBalancer will override this.
		LB Balancer
		// Methods contains a map for the methods in this service.
		Methods map[string]MethodConfig
	}
}
2.DialContext
// DialContext creates a client connection to the given target. ctx can be used to
// cancel or expire the pending connecting. Once this function returns, the
// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
// to terminate all the pending operations after this function returns.
// This is the EXPERIMENTAL API.
//
// On failure it returns a nil connection and the error; the deferred closure
// below also closes the partially built ClientConn in that case.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target: target,
		conns: make(map[Address]*addrConn),
	}
	// cc.ctx is a cancellable context rooted at context.Background() rather
	// than at the caller's ctx; calling cc.cancel actively aborts the dial.
	cc.ctx, cc.cancel = context.WithCancel(context.Background())
	for _, opt := range opts {
		opt(&cc.dopts) // apply every DialOption to initialize cc.dopts
	}
	if cc.dopts.timeout > 0 { // this value can be set via WithTimeout
		var cancel context.CancelFunc
		// WithTimeout closes the channel returned by Done() once the timeout
		// has elapsed, which makes Done() readable.
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) // context with a deadline
		// Release the timer resources promptly; cancel also closes the
		// channel returned by Done().
		defer cancel()
	}
	defer func() {
		select {
		case <-ctx.Done(): // ctx is done, e.g. the WithTimeout deadline set above has expired
			conn, err = nil, ctx.Err()
		default:
		}
		if err != nil {
			cc.Close()
		}
	}()
	// scChan is set via WithServiceConfig and lets the service config be
	// delivered asynchronously.
	if cc.dopts.scChan != nil {
		// Wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = sc
			}
		case <-ctx.Done(): // ctx finished while waiting (see the deferred closure above)
			return nil, ctx.Err()
		}
	}
	// Set defaults.
	if cc.dopts.codec == nil {
		cc.dopts.codec = protoCodec{} // default codec
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = DefaultBackoffConfig // default backoff retry strategy
	}
	// Choose the authority: the credentials' ServerName if present, else the
	// explicitly configured Authority (insecure mode only), else the target
	// with everything from the last ':' onwards stripped.
	creds := cc.dopts.copts.TransportCredentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
		cc.authority = cc.dopts.copts.Authority
	} else {
		colonPos := strings.LastIndex(target, ":")
		if colonPos == -1 {
			colonPos = len(target)
		}
		cc.authority = target[:colonPos]
	}
	var ok bool
	waitC := make(chan error, 1)
	// This goroutine collects the addresses of the service cluster and sets
	// up a connection for each of them; any error is written to waitC.
	// Although it runs in its own goroutine, the select further down still
	// waits for it to finish, so the dial is effectively blocking here.
	go func() {
		var addrs []Address
		// Load-balancer selection: cc.dopts.balancer (set via WithBalancer)
		// and cc.sc.LB (set via WithServiceConfig) are both Balancers; the
		// former takes precedence over the latter.
		if cc.dopts.balancer == nil && cc.sc.LB != nil {
			cc.dopts.balancer = cc.sc.LB
		}
		if cc.dopts.balancer == nil {
			// Connect to target directly if balancer is nil.
			// Without a balancer the address list contains only target.
			addrs = append(addrs, Address{Addr: target})
		} else {
			var credsClone credentials.TransportCredentials
			if creds != nil {
				credsClone = creds.Clone()
			}
			config := BalancerConfig{
				DialCreds: credsClone,
			}
			// Initialize the balancer (etcd-backed or similar).
			if err := cc.dopts.balancer.Start(target, config); err != nil {
				waitC <- err
				return
			}
			// Notify returns a channel; per the annotator each element is the
			// complete address list after an update (full set, not a delta).
			ch := cc.dopts.balancer.Notify()
			if ch == nil {
				// There is no name resolver installed.
				addrs = append(addrs, Address{Addr: target})
			} else {
				// ok is false when the channel has been closed; in that case
				// no lbWatcher (address-change monitor) is needed.
				addrs, ok = <-ch
				if !ok || len(addrs) == 0 {
					waitC <- errNoAddr // the balancer produced no usable address
					return
				}
			}
		}
		// Set up a connection for every address.
		// With WithBlock this step blocks until the connections succeed (the
		// annotator advises against that unless you know what you are doing);
		// otherwise resetAddrConn connects asynchronously in a goroutine and
		// does not wait for the connections to succeed.
		for _, a := range addrs {
			if err := cc.resetAddrConn(a, false, nil); err != nil {
				waitC <- err
				return
			}
		}
		close(waitC) // closing waitC makes the receive below yield err == nil
	}()
	select {
	case <-ctx.Done(): // ctx finished before the goroutine above completed
		return nil, ctx.Err()
	case err := <-waitC: // waits for the goroutine above to finish
		if err != nil {
			return nil, err
		}
	}
	// If balancer is nil or balancer.Notify() is nil, ok will be false here.
	// The lbWatcher goroutine will not be created.
	if ok { // only reached when a balancer supplied addresses: watch the cluster address list for changes
		go cc.lbWatcher()
	}
	if cc.dopts.scChan != nil {
		// Watch for ServiceConfig changes so the client's service config can
		// be modified dynamically after the dial.
		go cc.scWatcher()
	}
	return cc, nil
}
3.resetAddrConn
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
//
// skipWait forces the asynchronous connect path even when the block dial
// option is set. It returns an error when the credential configuration is
// inconsistent, when cc has already been closed, or when the blocking
// connect fails.
func (cc *ClientConn) resetAddrConn(addr Address, skipWait bool, tearDownErr error) error {
	ac := &addrConn{
		cc: cc,
		addr: addr,
		dopts: cc.dopts,
	}
	// ac.ctx is derived from cc.ctx, so cancelling cc.ctx also cancels ac.ctx.
	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
	ac.stateCV = sync.NewCond(&ac.mu)
	if EnableTracing {
		ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
	}
	// Validate the security configuration: secure mode needs transport
	// credentials; insecure mode must not have them, nor any per-RPC
	// credentials that require transport security.
	if !ac.dopts.insecure {
		if ac.dopts.copts.TransportCredentials == nil {
			return errNoTransportSecurity
		}
	} else {
		if ac.dopts.copts.TransportCredentials != nil {
			return errCredentialsConflict
		}
		for _, cd := range ac.dopts.copts.PerRPCCredentials {
			if cd.RequireTransportSecurity() {
				return errTransportCredentialsMissing
			}
		}
	}
	// Track ac in cc. This needs to be done before any getTransport(...) is called.
	cc.mu.Lock()
	if cc.conns == nil {
		cc.mu.Unlock()
		return ErrClientConnClosing
	}
	stale := cc.conns[ac.addr] // the previous addrConn for this address, nil if there is none
	cc.conns[ac.addr] = ac // replace it with the new addrConn
	cc.mu.Unlock()
	if stale != nil {
		// There is an addrConn alive on ac.addr already. This could be due to
		// 1) a buggy Balancer notifies duplicated Addresses;
		// 2) goaway was received, a new ac will replace the old ac.
		// The old ac should be deleted from cc.conns, but the
		// underlying transport should drain rather than close.
		// (Per the annotator, GOAWAY means the server accepts no new requests
		// but still finishes the ones in flight; that replacement is started
		// from transportMonitor.)
		if tearDownErr == nil {
			// tearDownErr is nil if resetAddrConn is called by
			// 1) Dial
			// 2) lbWatcher
			// In both cases, the stale ac should drain, not close.
			stale.tearDown(errConnDrain)
			// With errConnDrain, tearDown calls transport.GracefulClose()
			// instead of Close(), so the transport is not closed right away.
		} else {
			stale.tearDown(tearDownErr)
		}
	}
	// When block was set to true via WithBlock, the call blocks here until the
	// connection has been established.
	// skipWait may overwrite the decision in ac.dopts.block.
	if ac.dopts.block && !skipWait {
		if err := ac.resetTransport(false); err != nil {
			if err != errConnClosing { // a real failure, not "ac has already been closed"
				// Tear down ac and delete it from cc.conns.
				cc.mu.Lock()
				delete(cc.conns, ac.addr) // stop tracking this addrConn
				cc.mu.Unlock()
				ac.tearDown(err) // shut it down
			}
			if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
				return e.Origin()
			}
			return err
		}
		// Start to monitor the error status of transport.
		go ac.transportMonitor()
	} else { // non-blocking: the connection is established asynchronously
		// Start a goroutine connecting to the server asynchronously.
		go func() {
			if err := ac.resetTransport(false); err != nil {
				grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
				if err != errConnClosing {
					// Keep this ac in cc.conns, to get the reason it's torn down.
					// Unlike the blocking branch above, ac is torn down but not
					// removed from cc.conns, so tearDownErr stays retrievable.
					ac.tearDown(err)
				}
				return
			}
			ac.transportMonitor()
		}()
	}
	return nil
}
tearDown方法(處理返回的address已存在的情況)
// tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop.
// tearDown doesn't remove ac from ac.cc.conns.
//
// err is recorded as the tear-down reason. When err is errConnDrain the
// transport is closed gracefully instead of being closed outright.
func (ac *addrConn) tearDown(err error) {
	// Cancel first so that anything selecting on ac.ctx.Done() is woken up.
	ac.cancel()

	ac.mu.Lock()
	defer ac.mu.Unlock()

	// Invoke the connection-down handler (if one is registered) exactly once.
	if notifyDown := ac.down; notifyDown != nil {
		notifyDown(downErrorf(false, false, "%v", err))
		ac.down = nil
	}

	draining := err == errConnDrain
	if draining && ac.transport != nil {
		// GracefulClose(...) may be executed multiple times when
		// i) receiving multiple GoAway frames from the server; or
		// ii) there are concurrent name resolver/Balancer triggered
		// address removal and GoAway.
		ac.transport.GracefulClose()
	}

	// Everything below runs only on the first tear-down.
	if ac.state == Shutdown {
		return
	}
	ac.state = Shutdown
	ac.tearDownErr = err
	ac.stateCV.Broadcast()

	if evLog := ac.events; evLog != nil {
		evLog.Finish()
		ac.events = nil
	}
	if ac.ready != nil {
		close(ac.ready)
		ac.ready = nil
	}
	// A draining transport has already been handed to GracefulClose above.
	if !draining && ac.transport != nil {
		ac.transport.Close()
	}
}
4.transportMonitor
// transportMonitor runs in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
//
// Each loop iteration snapshots ac.transport and then waits for one of:
// ac.ctx being done, a GoAway from the transport, or a transport error.
func (ac *addrConn) transportMonitor() {
	for {
		ac.mu.Lock()
		t := ac.transport
		ac.mu.Unlock()
		select {
		// This is needed to detect the teardown when
		// the addrConn is idle (i.e., no RPC in flight).
		case <-ac.ctx.Done(): // ac has been torn down; nothing left to maintain, so return
			select {
			case <-t.Error():
				t.Close()
			default:
			}
			return
		case <-t.GoAway(): // resetAddrConn below starts a new transportMonitor for the replacement ac, so this one returns
			// If GoAway happens without any network I/O error, ac is closed without shutting down the
			// underlying transport (the transport will be closed when all the pending RPCs finished or
			// failed.).
			// If GoAway and some network I/O error happen concurrently, ac and its underlying transport
			// are closed.
			// In both cases, a new ac is created.
			select {
			case <-t.Error():
				ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
			default:
				ac.cc.resetAddrConn(ac.addr, true, errConnDrain)
			}
			return
		case <-t.Error(): // the transport reported an error
			select {
			case <-ac.ctx.Done(): // ac was torn down (e.g. its ctx was cancelled); nothing left to maintain
				t.Close()
				return
			case <-t.GoAway(): // GoAway together with an error: replace ac, as in the GoAway case above
				ac.cc.resetAddrConn(ac.addr, true, errNetworkIO)
				return
			default: // plain transport error: do not return, fall through to reconnect below
			}
			ac.mu.Lock()
			// The annotator asks when ac can be Shutdown here without the
			// ctx.Done() case above having fired.
			// NOTE(review): presumably tearDown ran between the non-blocking
			// select above and this lock acquisition — confirm.
			if ac.state == Shutdown {
				// ac has been shutdown.
				ac.mu.Unlock()
				return
			}
			ac.state = TransientFailure // mark as temporarily failed so it is not used for now
			ac.stateCV.Broadcast()
			ac.mu.Unlock()
			if err := ac.resetTransport(true); err != nil {
				ac.mu.Lock()
				ac.printf("transport exiting: %v", err)
				ac.mu.Unlock()
				grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
				if err != errConnClosing {
					// Keep this ac in cc.conns, to get the reason it's torn down.
					ac.tearDown(err)
				}
				return
			}
		}
	}
}
5.resetTransport
// resetTransport keeps trying to create a new client transport for ac.addr.
// It returns nil once a transport is established and ac is marked Ready. It
// returns errConnClosing if ac has been shut down, the ConnectionError itself
// if the failure is not temporary, or ac.ctx.Err() if ac.ctx finishes while
// waiting between attempts. If closeTransport is true, the current transport
// (if any) is closed before the first attempt.
func (ac *addrConn) resetTransport(closeTransport bool) error {
	for retries := 0; ; retries++ { // retry until a connection succeeds, unless one of the return conditions below is hit
		ac.mu.Lock()
		ac.printf("connecting")
		if ac.state == Shutdown {
			// Shutdown means this connection has been closed and no longer
			// needs maintaining. Per the annotator this typically happens when
			// the server went away and the address was then removed from the
			// balancer (etcd or similar).
			// ac.tearDown(...) has been invoked.
			ac.mu.Unlock()
			return errConnClosing
		}
		if ac.down != nil {
			// ac.down is the handler returned by the balancer's Up method (see
			// the end of this function). Calling it tells the balancer that
			// this address must not be used, because the transport is about to
			// be reset.
			ac.down(downErrorf(false, true, "%v", errNetworkIO))
			ac.down = nil
		}
		ac.state = Connecting // state becomes "connecting"
		ac.stateCV.Broadcast() // annotator's marker: "lzy todo"
		t := ac.transport
		ac.mu.Unlock()
		if closeTransport && t != nil { // the old transport has to be closed
			t.Close()
		}
		// sleepTime is the interval between two attempts according to the
		// backoff strategy: if this attempt fails, the loop waits until
		// sleepTime has passed before the next iteration. Per the annotator,
		// sleepTime grows with retries.
		sleepTime := ac.dopts.bs.backoff(retries)
		timeout := minConnectTimeout
		if timeout < sleepTime {
			timeout = sleepTime
		}
		// The connect timeout is at least minConnectTimeout (20s according to
		// the annotator; the constant is defined outside this snippet).
		// NOTE(review): cancel is not called on the success path below, while
		// the usual advice is to defer cancel() immediately. The annotator
		// raised https://github.com/grpc/grpc-go/issues/1099 about this;
		// whether it is intentional cannot be told from this snippet.
		ctx, cancel := context.WithTimeout(ac.ctx, timeout)
		connectTime := time.Now()
		sinfo := transport.TargetInfo{
			Addr: ac.addr.Addr,
			Metadata: ac.addr.Metadata,
		}
		newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
		if err != nil {
			cancel()
			// A non-temporary error is returned immediately; anything else is retried.
			if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
				return err
			}
			grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
			ac.mu.Lock()
			if ac.state == Shutdown { // as above: this address is no longer needed
				// ac.tearDown(...) has been invoked.
				ac.mu.Unlock()
				return errConnClosing
			}
			ac.errorf("transient failure: %v", err)
			ac.state = TransientFailure // state becomes "transient failure"
			ac.stateCV.Broadcast()
			// Per the annotator, ac.ready is only non-nil after ac.wait() was
			// entered, i.e. a request is waiting to learn whether connecting
			// to this address succeeds or fails.
			if ac.ready != nil {
				close(ac.ready) // connecting failed: close ready
				ac.ready = nil
			}
			ac.mu.Unlock()
			closeTransport = false
			select {
			case <-time.After(sleepTime - time.Since(connectTime)): // wait out the remainder of sleepTime before the next iteration
			case <-ac.ctx.Done(): // ac.ctx was cancelled
				return ac.ctx.Err()
			}
			continue
		}
		ac.mu.Lock()
		ac.printf("ready")
		// ac was shut down while connecting, so the freshly created transport
		// must be closed. Per the annotator, manually deleting the address
		// from etcd leads here.
		if ac.state == Shutdown {
			// ac.tearDown(...) has been invoked.
			ac.mu.Unlock()
			newTransport.Close()
			return errConnClosing
		}
		ac.state = Ready // the connection is usable
		ac.stateCV.Broadcast()
		ac.transport = newTransport
		// Same waiter notification as in the failure path above.
		if ac.ready != nil {
			close(ac.ready) // connecting succeeded: close ready
			ac.ready = nil
		}
		// If a balancer is configured, tell it this address is connected and
		// usable; the returned func is kept as the down handler.
		if ac.cc.dopts.balancer != nil {
			ac.down = ac.cc.dopts.balancer.Up(ac.addr)
		}
		ac.mu.Unlock()
		return nil
	}
}
以下基于默認配置情況下(還有其它沒有提到的配置都取默認值)：
設置了 balancer(etcd 等)
沒有設置 WithBlock，即 dialOptions.block = false
沒有設置 FailOnNonTempDialError，即 dialOptions.copts.FailOnNonTempDialError = false
grpc.Dial 正常的執行流程如下，第一次進入時走不到或者不太重要的邏輯都舍去不表
A. grpc.Dial() 返回一個 *ClientConn
A1，從 balancer(etcd 等)返回一批地址，但是這批地址暫時還是不能用的，需要等待 A225(即 A22 的第 5 步：狀態改為 Ready 并通知 balancer)
A2，對于每一個地址依次建立連接，循環調用 cc.resetAddrConn(詳見下文 A2)
A3，單獨 goroutine 監控 balancer(etcd 等)的變化(cc.lbWatcher())，實時更新服務集群地址(詳見下文 A3)
A4，單獨 goroutine 監控 ServiceConfig 的變化(cc.scWatcher)，可以在服務啟動后動態更新調用服務的配置
A2，cc.resetAddrConn 針對一個地址建立連接，創建一個 addrConn 加入到 ClientConn.conns 中去，主流程分為：
A21，如果這個地址已經存在連接了，先用 ac.tearDown 關閉掉
A22，ac.resetTransport，建立一個底層連接(http2)，這一步默認是 goroutine 出去的，不會阻塞，除非調用 WithBlock；
A23，單獨 goroutine 監控底層連接的狀態變化(ac.transportMonitor)，進行重連或者放棄
A22，ac.resetTransport 里面是一個大循環，重試建立連接直到成功，除非某些條件下返回(默認情況下只有被 ac.tearDown 了，即在 balancer 中刪除)，每個循環里面：
A221，如果 ac.state == Shutdown，直接返回
A222，將狀態改為正在連接中：ac.state = Connecting
A223，計算 sleepTime，這里是根據重試的超時策略，返回兩次重試的間隔時間；即如果這次重連還是失敗，會等待 sleepTime 才會進入下一次循環
A224，建立一個底層的 http2 連接(transport.NewClientTransport)；如果是臨時錯誤，將狀態改為短暫的失敗 ac.state = TransientFailure，等待 sleepTime；如果是非臨時錯誤，直接返回；默認情況下可以認為都是臨時錯誤；
A225，將狀態改為 ready：ac.state = Ready，通知 balancer(etcd 等)這個地址連接 ok 了(up)；這樣下次就能從 balancer 中讀取到這個地址了
A224，建立一個底層的 http2 連接(newHTTP2Client)
A2241，dial 一個 tcp 連接，失敗的話，默認返回一個臨時錯誤
A2242，單獨 goroutine，循環的讀取所有的幀，并且分發到相應的流中去，如果有錯誤了，會通知到 A233
A2243，初始化 http2 相關的操作(發送 setting 幀等)
A23，transportMonitor 是一個單獨的 goroutine，里面是一個循環，會監控這個連接以下幾種情況：
A231，如果這個連接被 ac.tearDown 了，直接退出，不需要維護了
A232，如果收到 http2 的 goaway 幀，再重新 cc.resetAddrConn，即 A2，然后當前直接退出；相當于用一個新的連接來替換
A233，如果這個連接出錯了，置為臨時失敗 ac.state = TransientFailure，暫時不讓用，然后重試連接 ac.resetTransport，即 A22
A3，cc.lbWatcher 監控 balancer(etcd 等)的變化，實時更新集群服務地址
A31，balancer.Notify() 是一個 channel，每當有更新的時候，從這里讀取到所有的地址(全量而非增量)
A32，判斷有哪些地址是新增的，哪些地址是刪除掉的
A33，對于新增的地址執行 cc.resetAddrConn，即 A2
A34，對于刪掉的地址直接 ac.tearDown，通知 balancer(etcd 等)這個地址 down 了，這樣可能會影響到 A231，A221
對于 ac.tearDown，里面會關閉底層的連接，修改狀態為 ac.state = Shutdown，然后通知 balancer(etcd 等)這個地址 down 了，在下次輪詢的時候，就不會有這個地址了；
如果這個 balancer(etcd 等)收到這個地址的 UP 的通知，表示這個地址又 OK 了