golang里面最火的應(yīng)該就是微服務(wù)了, 所以最近研究了下grpc
先附上我的demo代碼:
https://github.com/Dragon-Zpl/gprc-grpc-gateway-etcdorconsul
記錄下自己在看源碼了解到的部分東西:
首先如果需要用到第三方的服務(wù)注冊中心的話谴麦,需要去調(diào)用“google.golang.org/grpc/resolver”該包下的resolver.Register,將自己實現(xiàn)了(Build,Scheme,ResolveNow,Close)的結(jié)構(gòu)體注冊進(jìn)去,以下是我監(jiān)聽服務(wù)改變的函數(shù)以下是使用consul,但沒使用他的服務(wù)注冊而是使用consul的key/value結(jié)構(gòu)簡單的模擬下:
// 監(jiān)聽剿吻,沒使用consul里面的服務(wù)注冊只使用到了key/value模仿,如使用服務(wù)的話可使用cc.consulClient.Health().severice獲取所有注冊的服務(wù)信息,并檢查健康狀態(tài)
func (cr *consulResolver) Watcher() {
cr.wg.Add(1)
go func() {
defer cr.wg.Done()
t := time.NewTimer(10 * time.Second)
for {
select {
case <- t.C:
datas := consul.GetConsulDirData(cr.serverName)
address := make([]resolver.Address, 0)
for _, data := range datas {
address = append(address, resolver.Address{
Addr: data.Ip + ":" + data.Port,
})
}
// 此方法是關(guān)鍵
cr.cc.UpdateState(resolver.State{Addresses:address})
t.Reset(10 * time.Second)
case <- cr.closeCh:
return
}
}
}()
}
負(fù)載的方法重寫:
//gateway啟動時會把watch傳入的address在這里存儲起來
func (*roundRobinPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
if len(readySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
var scs []balancer.SubConn
for _, sc := range readySCs {
scs = append(scs, sc)
}
return &roundRobinPicker{
subConns: scs,
next: rand.Intn(len(scs)),
}
}
// 每次有請求過來就會調(diào)用該方法獲取對應(yīng)的服務(wù)的鏈接 采用輪詢
func (p *roundRobinPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
p.mu.Lock()
sc := p.subConns[p.next]
p.next = (p.next + 1) % len(p.subConns)
p.mu.Unlock()
return sc, nil, nil
}
gateway 啟動
gwmux := runtime.NewServeMux()
ctx := context.Background()
RegisterConsul(serverName)
// 與服務(wù)端建立起鏈接
err := pb_test.RegisterMyTestHandlerFromEndpoint(ctx, gwmux, "consul:///", []grpc.DialOption{grpc.WithInsecure(), grpc.WithBalancerName(RoundRobin)})
if err != nil {
panic(err)
}
err = http.ListenAndServe(":8070", gwmux)
if err != nil {
panic(err)
}
其中我遇到的一個一開始一直困惑我的問題, 為什么RegisterMyTestHandlerFromEndpoint的第三個參數(shù)endpoint要使用consul:///",我看了下源碼
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts {
opt.apply(&cc.dopts)
}
chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)
defer func() {
if err != nil {
cc.Close()
}
}()
if channelz.IsOn() {
if cc.dopts.channelzParentID != 0 {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
Parent: &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
Severity: channelz.CtINFO,
},
})
} else {
cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
Desc: "Channel Created",
Severity: channelz.CtINFO,
})
}
cc.csMgr.channelzID = cc.channelzID
}
if !cc.dopts.insecure {
if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
return nil, errNoTransportSecurity
}
if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
return nil, errTransportCredsAndBundle
}
} else {
if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
return nil, errCredentialsConflict
}
for _, cd := range cc.dopts.copts.PerRPCCredentials {
if cd.RequireTransportSecurity() {
return nil, errTransportCredentialsMissing
}
}
}
if cc.dopts.defaultServiceConfigRawJSON != nil {
sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
if err != nil {
return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
}
cc.dopts.defaultServiceConfig = sc
}
cc.mkp = cc.dopts.copts.KeepaliveParams
if cc.dopts.copts.Dialer == nil {
cc.dopts.copts.Dialer = newProxyDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
network, addr := parseDialTarget(addr)
return (&net.Dialer{}).DialContext(ctx, network, addr)
},
)
}
if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
cc.dopts.copts.UserAgent = grpcUA
}
if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
defer cancel()
}
defer func() {
select {
case <-ctx.Done():
conn, err = nil, ctx.Err()
default:
}
}()
scSet := false
if cc.dopts.scChan != nil {
// Try to get an initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
scSet = true
}
default:
}
}
if cc.dopts.bs == nil {
cc.dopts.bs = backoff.Exponential{
MaxDelay: DefaultBackoffConfig.MaxDelay,
}
}
if cc.dopts.resolverBuilder == nil {
// Only try to parse target when resolver builder is not already set.
cc.parsedTarget = parseTarget(cc.target) //此方法是關(guān)鍵
grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
if cc.dopts.resolverBuilder == nil {
// If resolver builder is still nil, the parsed target's scheme is
// not registered. Fallback to default resolver and set Endpoint to
// the original target.
grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
cc.parsedTarget = resolver.Target{
Scheme: resolver.GetDefaultScheme(),
Endpoint: target,
}
cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
}
} else {
cc.parsedTarget = resolver.Target{Endpoint: target}
}
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.authority != "" {
cc.authority = cc.dopts.authority
} else {
// Use endpoint from "scheme://authority/endpoint" as the default
// authority for ClientConn.
cc.authority = cc.parsedTarget.Endpoint
}
if cc.dopts.scChan != nil && !scSet {
// Blocking wait for the initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = &sc
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
if cc.dopts.scChan != nil {
go cc.scWatcher()
}
var credsClone credentials.TransportCredentials
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
cc.balancerBuildOpts = balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
// Build the resolver.
rWrapper, err := newCCResolverWrapper(cc)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
cc.mu.Lock()
cc.resolverWrapper = rWrapper
cc.mu.Unlock()
// A blocking dial blocks until the clientConn is ready.
if cc.dopts.block {
for {
s := cc.GetState()
if s == connectivity.Ready {
break
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
if err = cc.blockingpicker.connectionError(); err != nil {
terr, ok := err.(interface {
Temporary() bool
})
if ok && !terr.Temporary() {
return nil, err
}
}
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
return nil, ctx.Err()
}
}
}
return cc, nil
}
其中的parseTarget是關(guān)鍵,他對這個target進(jìn)行了切割獲取到Scheme,而該方法下根據(jù)scheme去獲取resolverBuilder:cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme),其實這個就是我們在開啟gateway時調(diào)用RegisterConsul會將我們自定義的服務(wù)注冊的Builder(自定義的結(jié)構(gòu)體)賦到map[string]Builder這樣的map中凌彬,而相應(yīng)的負(fù)載方法我們重寫后也是注冊到一個map中,在創(chuàng)建服務(wù)端連接的時候就會從中獲取.
func parseTarget(target string) (ret resolver.Target) {
var ok bool
ret.Scheme, ret.Endpoint, ok = split2(target, "://")
if !ok {
return resolver.Target{Endpoint: target}
}
ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
if !ok {
return resolver.Target{Endpoint: target}
}
return ret
}
寫的可能有點亂窑滞,可參考此文章:https://segmentfault.com/a/1190000018424798?utm_source=tag-newest