最近項目要使用grpc,但是關于grpc的超時和重連這一塊,很多文章都說得不夠詳細,無奈只能自己看代碼,順手記錄一下。
超時
建立連接
主要就2個函數:Dial和DialContext。
// Dial creates a client connection to the given target.
//
// It is a thin wrapper that forwards target and opts to DialContext with
// context.Background(), so the dial carries no deadline or cancellation of
// its own. Call DialContext directly with a timeout context to bound
// connection setup.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error){...}
DialContext 太長了,就不貼了。可以看到Dial實際上也是調用DialContext來實現的。如果你想在建立連接的時候使用超時控制,就使用DialContext并傳入一個帶Timeout的context,就像下面的例子:
// Build a context that expires after 3 seconds to bound connection setup.
ctx1, cel := context.WithTimeout(context.Background(), time.Second*3)
// Release the context's resources when the surrounding function returns.
defer cel()
// WithBlock makes DialContext wait for the connection to be established,
// so the call blocks until it connects or ctx1 expires.
// NOTE(review): err is not checked in this excerpt — real code must handle it.
conn, err := grpc.DialContext(ctx1, address, grpc.WithBlock(), grpc.WithInsecure())
另外,調用Dial建立連接默認只是返回一個ClientConn的指針,相當于new了一個ClientConn把指針返回給你,并不是一定要建立真實的h2連接。真實的連接建立實際上是一個異步的過程。當然了,如果你想等真實的鏈接完全建立再返回ClientConn,可以通過傳入WithBlock這個DialOption來實現;這樣的話,鏈接如果建立不成功就會一直阻塞直到Context超時。也就是說,只有配合WithBlock,傳給DialContext的超時才會真正作用于連接的建立。真正建立鏈接的代碼,后面介紹重試的時候會再詳細介紹。
調用超時
這個比較簡單:
// Per-call timeout: the context handed to the RPC expires after 3 seconds.
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*3)
// Release the context's resources when the surrounding function returns.
defer cancel()
// The RPC receives ctx as its first argument.
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
如上代碼,傳入一個帶timeout的context就可以。
重連
假設我們想這樣一個問題:剛才我們說Dial實際上是new了一個ClientConn,真實的連接建立在另外一個協程中,那這個協程是建立連接后就退出了呢,還是還在運行?另外,如果我們退出服務端然后啟動客戶端,會重新建立鏈接嗎?如果會,那又是如何重試的?
grpc調用的時候啟動的協程
要回答第一個問題很簡單,我們在client代碼中啟動pprof,看看有哪些協程在跑。
// Serve HTTP on localhost:6006 from a background goroutine so the client's
// goroutines can be inspected while it runs. ListenAndServe only returns on
// failure, and that error is logged.
// NOTE(review): the pprof handlers are only registered on the default mux if
// net/http/pprof is imported for its side effects — not shown here; confirm.
go func() {
log.Println(http.ListenAndServe("localhost:6006", nil))
}()
main.main.func1()
/Users/myonlyzzy/go/src/google.golang.org/grpc/examples/helloworld/greeter_client/main.go:41 +0x3e
created by main.main
/Users/myonlyzzy/go/src/google.golang.org/grpc/examples/helloworld/greeter_client/main.go:40 +0x47
goroutine 6 [select]:
google.golang.org/grpc.(*ccResolverWrapper).watcher(0xc4201941e0)
/Users/myonlyzzy/go/src/google.golang.org/grpc/resolver_conn_wrapper.go:110 +0x182
created by google.golang.org/grpc.(*ccResolverWrapper).start
/Users/myonlyzzy/go/src/google.golang.org/grpc/resolver_conn_wrapper.go:96 +0x3f
goroutine 7 [select]:
google.golang.org/grpc.(*ccBalancerWrapper).watcher(0xc42006e280)
/Users/myonlyzzy/go/src/google.golang.org/grpc/balancer_conn_wrappers.go:122 +0x14a
created by google.golang.org/grpc.newCCBalancerWrapper
/Users/myonlyzzy/go/src/google.golang.org/grpc/balancer_conn_wrappers.go:113 +0x14c
goroutine 8 [select]:
google.golang.org/grpc.(*addrConn).transportMonitor(0xc42019e280)
/Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:1240 +0x235
google.golang.org/grpc.(*addrConn).connect.func1(0xc42019e280)
/Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:839 +0x216
created by google.golang.org/grpc.(*addrConn).connect
/Users/myonlyzzy/go/src/google.golang.org/grpc/clientconn.go:829 +0xe1
我們看到有一個transportMonitor的協程一直阻塞在select中。代碼都在clientconn.go中。我們進去看看,其實有4個主要的方法:
func (ac *addrConn) connect() error
func (ac *addrConn) resetTransport() error
func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error)
func (ac *addrConn) transportMonitor()
connect
// Start a goroutine connecting to the server asynchronously.
go func() {
if err := ac.resetTransport(); err != nil {
log.Printf("resetTransport %v ",err)
grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
ac.transportMonitor()
}()
return nil
上面是connect的一部分。connect會調用resetTransport來建立鏈接,成功后再啟動transportMonitor來監控鏈接的情況。
resetTransport
for connectRetryNum := 0; ; connectRetryNum++ {
ac.mu.Lock()
if ac.backoffDeadline.IsZero() {
// This means either a successful HTTP2 connection was established
// or this is the first time this addrConn is trying to establish a
// connection.
backoffFor := ac.dopts.bs.backoff(connectRetryNum) // time.Duration.
// This will be the duration that dial gets to finish.
dialDuration := minConnectTimeout
if backoffFor > dialDuration {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
start := time.Now()
backoffDeadline = start.Add(backoffFor)
connectDeadline = start.Add(dialDuration)
ridx = 0 // Start connecting from the beginning.
} else {
// Continue trying to conect with the same deadlines.
connectRetryNum = ac.connectRetryNum
backoffDeadline = ac.backoffDeadline
connectDeadline = ac.connectDeadline
ac.backoffDeadline = time.Time{}
ac.connectDeadline = time.Time{}
ac.connectRetryNum = 0
}
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
ac.printf("connecting")
if ac.state != connectivity.Connecting {
ac.state = connectivity.Connecting
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
// copy ac.addrs in case of race
addrsIter := make([]resolver.Address, len(ac.addrs))
copy(addrsIter, ac.addrs)
copts := ac.dopts.copts
ac.mu.Unlock()
connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
if err != nil {
return err
}
if connected {
return nil
}
}
resetTransport 的主要內容就是一個for循環,可以看到在這個for循環中會嘗試建立鏈接。如果建立成功就返回nil;如果不成功會不斷重試下去。實際上不管是開頭的Dial,還是Dial完了關閉服務器后,都是由這段代碼來建立真實的鏈接。這也就是為什么如果你使用WithBlock但是不使用超時的話,會不斷地重試下去,中途斷掉也會不斷重連。當然了,重連的過程中使用了backoff算法,而且grpc的配置中有一個默認的最大重試間隔時間,默認是120秒:
// DefaultBackoffConfig is the backoff configuration used by default when
// retrying connection attempts.
var DefaultBackoffConfig = BackoffConfig{
// Largest delay between retries: 120 seconds.
MaxDelay: 120 * time.Second,
// NOTE(review): the fields below presumably are the initial delay, the
// per-retry growth multiplier, and the randomization fraction — the backoff
// implementation is not shown here; confirm before relying on this.
baseDelay: 1.0 * time.Second,
factor: 1.6,
jitter: 0.2,
}
transportMonitor
for {
var timer *time.Timer
var cdeadline <-chan time.Time
ac.mu.Lock()
t := ac.transport
if !ac.connectDeadline.IsZero() {
timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
cdeadline = timer.C
}
ac.mu.Unlock()
// Block until we receive a goaway or an error occurs.
select {
case <-t.GoAway():
case <-t.Error():
case <-cdeadline:
ac.mu.Lock()
// This implies that client received server preface.
if ac.backoffDeadline.IsZero() {
ac.mu.Unlock()
continue
}
ac.mu.Unlock()
timer = nil
// No server preface received until deadline.
// Kill the connection.
grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
t.Close()
}
if timer != nil {
timer.Stop()
}
// If a GoAway happened, regardless of error, adjust our keepalive
// parameters as appropriate.
select {
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
default:
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
// Set connectivity state to TransientFailure before calling
// resetTransport. Transition READY->CONNECTING is not valid.
ac.state = connectivity.TransientFailure
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.cc.resolveNow(resolver.ResolveNowOption{})
ac.curAddr = resolver.Address{}
ac.mu.Unlock()
if err := ac.resetTransport(); err != nil {
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
return
}
}
transportMonitor也是運行一個for循環,如果連接斷開就調用resetTransport重試。
其實我們使用etcd client的時候經常要使用一個DialTimeout參數,那個參數就是用來生成一個帶Timeout的Context,用來控制建立鏈接的超時。