grpc+gprc-gateway+consul or etcd

golang里面最火的應(yīng)該就是微服務(wù)了, 所以最近研究了下grpc

先附上我的demo代碼:
https://github.com/Dragon-Zpl/gprc-grpc-gateway-etcdorconsul

記錄下自己在看源碼了解到的部分東西:

首先如果需要用到第三方的服務(wù)注冊中心的話谴麦,需要去調(diào)用“google.golang.org/grpc/resolver”該包下的resolver.Register,將自己實現(xiàn)了(Build,Scheme,ResolveNow,Close)的結(jié)構(gòu)體注冊進(jìn)去,以下是我監(jiān)聽服務(wù)改變的函數(shù)以下是使用consul,但沒使用他的服務(wù)注冊而是使用consul的key/value結(jié)構(gòu)簡單的模擬下:

// 監(jiān)聽剿吻,沒使用consul里面的服務(wù)注冊只使用到了key/value模仿,如使用服務(wù)的話可使用cc.consulClient.Health().severice獲取所有注冊的服務(wù)信息,并檢查健康狀態(tài)
func (cr *consulResolver) Watcher() {
    cr.wg.Add(1)
    go func() {
        defer cr.wg.Done()
        t := time.NewTimer(10 * time.Second)
        for   {
            select {
            case <- t.C:
                datas := consul.GetConsulDirData(cr.serverName)
                address := make([]resolver.Address, 0)
                for _, data := range datas {
                    address = append(address, resolver.Address{
                        Addr:       data.Ip + ":" + data.Port,
                    })
                } 
                                // 此方法是關(guān)鍵
                cr.cc.UpdateState(resolver.State{Addresses:address})
                t.Reset(10 * time.Second)
            case <- cr.closeCh:
                return
            }
        }
    }()
}

負(fù)載的方法重寫:

//gateway啟動時會把watch傳入的address在這里存儲起來
func (*roundRobinPickerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
    grpclog.Infof("roundrobinPicker: newPicker called with readySCs: %v", readySCs)
        
    if len(readySCs) == 0 {
        return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
    }
    var scs []balancer.SubConn
    for _, sc := range readySCs {
        scs = append(scs, sc)
    }

    return &roundRobinPicker{
        subConns: scs,
        next:     rand.Intn(len(scs)),
    }
}

// 每次有請求過來就會調(diào)用該方法獲取對應(yīng)的服務(wù)的鏈接 采用輪詢
func (p *roundRobinPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
    p.mu.Lock()
    sc := p.subConns[p.next]
    p.next = (p.next + 1) % len(p.subConns)
    p.mu.Unlock()
    return sc, nil, nil
}

gateway 啟動

    gwmux := runtime.NewServeMux()
    ctx := context.Background()
    RegisterConsul(serverName)
        // 與服務(wù)端建立起鏈接
    err := pb_test.RegisterMyTestHandlerFromEndpoint(ctx, gwmux, "consul:///", []grpc.DialOption{grpc.WithInsecure(), grpc.WithBalancerName(RoundRobin)})
    if err != nil {
        panic(err)
    }

    err = http.ListenAndServe(":8070", gwmux)
    if err != nil {
        panic(err)
    }

其中我遇到的一個一開始一直困惑我的問題, 為什么RegisterMyTestHandlerFromEndpoint的第三個參數(shù)endpoint要使用consul:///",我看了下源碼

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    cc := &ClientConn{
        target:            target,
        csMgr:             &connectivityStateManager{},
        conns:             make(map[*addrConn]struct{}),
        dopts:             defaultDialOptions(),
        blockingpicker:    newPickerWrapper(),
        czData:            new(channelzData),
        firstResolveEvent: grpcsync.NewEvent(),
    }
    cc.retryThrottler.Store((*retryThrottler)(nil))
    cc.ctx, cc.cancel = context.WithCancel(context.Background())

    for _, opt := range opts {
        opt.apply(&cc.dopts)
    }

    chainUnaryClientInterceptors(cc)
    chainStreamClientInterceptors(cc)

    defer func() {
        if err != nil {
            cc.Close()
        }
    }()

    if channelz.IsOn() {
        if cc.dopts.channelzParentID != 0 {
            cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
            channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
                Desc:     "Channel Created",
                Severity: channelz.CtINFO,
                Parent: &channelz.TraceEventDesc{
                    Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
                    Severity: channelz.CtINFO,
                },
            })
        } else {
            cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
            channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
                Desc:     "Channel Created",
                Severity: channelz.CtINFO,
            })
        }
        cc.csMgr.channelzID = cc.channelzID
    }

    if !cc.dopts.insecure {
        if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
            return nil, errNoTransportSecurity
        }
        if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
            return nil, errTransportCredsAndBundle
        }
    } else {
        if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
            return nil, errCredentialsConflict
        }
        for _, cd := range cc.dopts.copts.PerRPCCredentials {
            if cd.RequireTransportSecurity() {
                return nil, errTransportCredentialsMissing
            }
        }
    }

    if cc.dopts.defaultServiceConfigRawJSON != nil {
        sc, err := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
        if err != nil {
            return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, err)
        }
        cc.dopts.defaultServiceConfig = sc
    }
    cc.mkp = cc.dopts.copts.KeepaliveParams

    if cc.dopts.copts.Dialer == nil {
        cc.dopts.copts.Dialer = newProxyDialer(
            func(ctx context.Context, addr string) (net.Conn, error) {
                network, addr := parseDialTarget(addr)
                return (&net.Dialer{}).DialContext(ctx, network, addr)
            },
        )
    }

    if cc.dopts.copts.UserAgent != "" {
        cc.dopts.copts.UserAgent += " " + grpcUA
    } else {
        cc.dopts.copts.UserAgent = grpcUA
    }

    if cc.dopts.timeout > 0 {
        var cancel context.CancelFunc
        ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
        defer cancel()
    }
    defer func() {
        select {
        case <-ctx.Done():
            conn, err = nil, ctx.Err()
        default:
        }
    }()

    scSet := false
    if cc.dopts.scChan != nil {
        // Try to get an initial service config.
        select {
        case sc, ok := <-cc.dopts.scChan:
            if ok {
                cc.sc = &sc
                scSet = true
            }
        default:
        }
    }
    if cc.dopts.bs == nil {
        cc.dopts.bs = backoff.Exponential{
            MaxDelay: DefaultBackoffConfig.MaxDelay,
        }
    }
    if cc.dopts.resolverBuilder == nil {
        // Only try to parse target when resolver builder is not already set.
        cc.parsedTarget = parseTarget(cc.target) //此方法是關(guān)鍵
        grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
        cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
        if cc.dopts.resolverBuilder == nil {
            // If resolver builder is still nil, the parsed target's scheme is
            // not registered. Fallback to default resolver and set Endpoint to
            // the original target.
            grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
            cc.parsedTarget = resolver.Target{
                Scheme:   resolver.GetDefaultScheme(),
                Endpoint: target,
            }
            cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
        }
    } else {
        cc.parsedTarget = resolver.Target{Endpoint: target}
    }
    creds := cc.dopts.copts.TransportCredentials
    if creds != nil && creds.Info().ServerName != "" {
        cc.authority = creds.Info().ServerName
    } else if cc.dopts.insecure && cc.dopts.authority != "" {
        cc.authority = cc.dopts.authority
    } else {
        // Use endpoint from "scheme://authority/endpoint" as the default
        // authority for ClientConn.
        cc.authority = cc.parsedTarget.Endpoint
    }

    if cc.dopts.scChan != nil && !scSet {
        // Blocking wait for the initial service config.
        select {
        case sc, ok := <-cc.dopts.scChan:
            if ok {
                cc.sc = &sc
            }
        case <-ctx.Done():
            return nil, ctx.Err()
        }
    }
    if cc.dopts.scChan != nil {
        go cc.scWatcher()
    }

    var credsClone credentials.TransportCredentials
    if creds := cc.dopts.copts.TransportCredentials; creds != nil {
        credsClone = creds.Clone()
    }
    cc.balancerBuildOpts = balancer.BuildOptions{
        DialCreds:        credsClone,
        CredsBundle:      cc.dopts.copts.CredsBundle,
        Dialer:           cc.dopts.copts.Dialer,
        ChannelzParentID: cc.channelzID,
        Target:           cc.parsedTarget,
    }

    // Build the resolver.
    rWrapper, err := newCCResolverWrapper(cc)
    if err != nil {
        return nil, fmt.Errorf("failed to build resolver: %v", err)
    }

    cc.mu.Lock()
    cc.resolverWrapper = rWrapper
    cc.mu.Unlock()
    // A blocking dial blocks until the clientConn is ready.
    if cc.dopts.block {
        for {
            s := cc.GetState()
            if s == connectivity.Ready {
                break
            } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
                if err = cc.blockingpicker.connectionError(); err != nil {
                    terr, ok := err.(interface {
                        Temporary() bool
                    })
                    if ok && !terr.Temporary() {
                        return nil, err
                    }
                }
            }
            if !cc.WaitForStateChange(ctx, s) {
                // ctx got timeout or canceled.
                return nil, ctx.Err()
            }
        }
    }

    return cc, nil
}

其中的parseTarget是關(guān)鍵,他對這個target進(jìn)行了切割獲取到Scheme,而該方法下根據(jù)scheme去獲取resolverBuilder:cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme),其實這個就是我們在開啟gateway時調(diào)用RegisterConsul會將我們自定義的服務(wù)注冊的Builder(自定義的結(jié)構(gòu)體)賦到map[string]Builder這樣的map中凌彬,而相應(yīng)的負(fù)載方法我們重寫后也是注冊到一個map中,在創(chuàng)建服務(wù)端連接的時候就會從中獲取.

func parseTarget(target string) (ret resolver.Target) {
    var ok bool
    ret.Scheme, ret.Endpoint, ok = split2(target, "://")
    if !ok {
        return resolver.Target{Endpoint: target}
    }
    ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
    if !ok {
        return resolver.Target{Endpoint: target}
    }
    return ret
}

寫的可能有點亂窑滞,可參考此文章:https://segmentfault.com/a/1190000018424798?utm_source=tag-newest

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末力穗,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子睦番,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件托嚣,死亡現(xiàn)場離奇詭異巩检,居然都是意外死亡,警方通過查閱死者的電腦和手機示启,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進(jìn)店門兢哭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人丑搔,你說我怎么就攤上這事厦瓢。” “怎么了啤月?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵煮仇,是天一觀的道長。 經(jīng)常有香客問我谎仲,道長浙垫,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任郑诺,我火速辦了婚禮夹姥,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘辙诞。我一直安慰自己辙售,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布飞涂。 她就那樣靜靜地躺著旦部,像睡著了一般。 火紅的嫁衣襯著肌膚如雪较店。 梳的紋絲不亂的頭發(fā)上士八,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天,我揣著相機與錄音梁呈,去河邊找鬼婚度。 笑死,一個胖子當(dāng)著我的面吹牛官卡,可吹牛的內(nèi)容都是我干的蝗茁。 我是一名探鬼主播,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼寻咒,長吁一口氣:“原來是場噩夢啊……” “哼评甜!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起仔涩,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎粘舟,沒想到半個月后熔脂,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體佩研,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年霞揉,在試婚紗的時候發(fā)現(xiàn)自己被綠了旬薯。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,110評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡适秩,死狀恐怖绊序,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情秽荞,我是刑警寧澤骤公,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站扬跋,受9級特大地震影響阶捆,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜钦听,卻給世界環(huán)境...
    茶點故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一洒试、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧朴上,春花似錦垒棋、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至酵镜,卻和暖如春碉碉,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背淮韭。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工垢粮, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人靠粪。 一個月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓蜡吧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親占键。 傳聞我的和親對象是個殘疾皇子昔善,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容