【Go夜讀】grpc 開(kāi)發(fā)及 grpcp 的源碼分析

資源鏈接


B站視頻

grpcp 介紹


  • grpcp 是基于 grpc 開(kāi)發(fā)的闪幽,通過(guò)對(duì) grpc 的封裝和復(fù)用,來(lái)使得業(yè)務(wù)方能夠最快速地通過(guò) grpcp 找到相通的鏈路
  • grpcp 主要實(shí)現(xiàn)了對(duì) grpc 的連接的維護(hù),使得業(yè)務(wù)方能夠省去對(duì)鏈路的維護(hù)灰嫉,直接根據(jù)路由表進(jìn)行數(shù)據(jù)分發(fā)

源碼分析


結(jié)構(gòu)體和變量

  • ConnectionTracker
    ConnectionTracker 就是控制整個(gè)連接表煎源,維護(hù)所有的連接
// ConnectionTracker keep connections and maintain their status
type ConnectionTracker struct {
    sync.RWMutex  // 用來(lái)防止多個(gè)協(xié)程同時(shí)操作 connections 和 alives
    // 創(chuàng)建連接的函數(shù)合敦,可以自定義,也可以直接使用本代碼中的 dialF 變量定義的默認(rèn)函數(shù)
    dial              DialFunc
    // 進(jìn)行連接是否 ready 的檢查函數(shù)耐朴,可以自定義,不設(shè)定則使用默認(rèn)函數(shù)defaultReadyCheck()
    readyCheck        ReadyCheckFunc
    // 維護(hù)一個(gè) addr 到連接的映射表盹憎,斷連也會(huì)維護(hù)隔箍,之后會(huì)重連
    connections       map[string]*trackedConn
    // 維護(hù) connections 中處于 ready狀態(tài) 的連接
    alives            map[string]*trackedConn
    // 超時(shí)時(shí)間
    timeout           time.Duration  // 連接超時(shí)時(shí)間
    checkReadyTimeout time.Duration  // 在心跳檢測(cè)時(shí),如果連接不是ready脚乡,則等待 conn 的狀態(tài)改變超時(shí)時(shí)間
    heartbeatInterval time.Duration  // 心跳間隔
    // 上下文蜒滩,這個(gè)上下文傳給每一個(gè)連接,作為每個(gè)連接的 parent ctx奶稠,當(dāng) parent ctx cancel(cannel被調(diào)用)時(shí)俯艰,每個(gè)子ctx 均被 cancel
    ctx    context.Context
    // 上下文對(duì)應(yīng)的 cancel 函數(shù),未用到锌订,提供給業(yè)務(wù)方使用竹握,當(dāng)關(guān)閉所有連接時(shí),調(diào)用 cannel
    cannel context.CancelFunc
}
  • trackedCon
    trackedConn 用來(lái)控制單個(gè)連接
type trackedConn struct {
    sync.RWMutex  // 用來(lái)防止多個(gè)協(xié)程同時(shí)操作本連接
    // 本連接對(duì)應(yīng)的 addr 地址辆飘,即 key啦辐,用來(lái)標(biāo)識(shí)一個(gè)連接
    addr    string
    // 本連接對(duì)應(yīng)的 grpc 連接
    conn    *grpc.ClientConn
    // 本連接對(duì)應(yīng)的父ConnectionTracker谓传,該連接存在于 tracker.connections 映射表中,可能存在于 tracker.alives 映射表中
    tracker *ConnectionTracker
    // 本連接對(duì)應(yīng)的 state芹关,將 grpc 中的五種狀態(tài)(Idle Connecting Ready TransientFailure Shutdown)減到三種(Idle Ready Shutdown)
    state   connectivity.State
    // 本連接對(duì)應(yīng)的超時(shí)時(shí)刻续挟,每次心跳檢測(cè)成功,如果是 ready 狀態(tài)就更新超時(shí)時(shí)間侥衬,心跳檢測(cè)一直失敗诗祸,超過(guò)超時(shí)時(shí)間 tracker.timeout,則 shutdown() 本連接
    expires time.Time
    // 本連接對(duì)應(yīng)的 retry 次數(shù)轴总,每次心跳檢測(cè)成功直颅,如果是 ready 狀態(tài)就重置為 0,否則 ++怀樟,代碼中未使用該變量
    retry   int
    // 本連接對(duì)應(yīng)的 CancelFunc功偿,在本連接 shutdown 時(shí),通知本連接對(duì)應(yīng)的心跳協(xié)程退出
    cannel  context.CancelFunc
}
  • Pool
    以下通過(guò)池子來(lái)實(shí)現(xiàn)復(fù)用同一個(gè)連接往堡,但是復(fù)用的時(shí)候不能將拿出來(lái)的 connection close 掉脖含,否則該 conn 會(huì)被置為 shutdown,并被踢出 ConnectionTracker.alives 記錄, 同時(shí)其他協(xié)程已經(jīng)拿出來(lái)的 conn 就無(wú)法使用了
var (
    // 默認(rèn)連接池投蝉,如果沒(méi)有自定義連接池养葵,則使用默認(rèn)連接池
    defaultPool *ConnectionTracker
    // 用于限定創(chuàng)建連接池的操作最多被僅僅執(zhí)行一次
    once        sync.Once
    // 連接函數(shù),可以自定義參數(shù)(類(lèi)似grpc.WithInsecure())
    dialF       = func(addr string) (*grpc.ClientConn, error) {
        return grpc.Dial(
            addr,
            grpc.WithInsecure(),
        )
    }
)
  • 其他默認(rèn)參數(shù)
// 超時(shí)參數(shù)瘩缆,可以自定義覆蓋掉
const (
    defaultTimeout    = 100 * time.Second
    checkReadyTimeout = 5 * time.Second
    heartbeatInterval = 20 * time.Second
)
// 如果嘗試獲取某個(gè)連接关拒,當(dāng)前連接不聯(lián)通,則返回的錯(cuò)誤定義
var (
    errNoReady = fmt.Errorf("no ready")
)

函數(shù)

ConnectionTracker

  • 可以被自定義的函數(shù)
// DialFunc dial function
// 用來(lái)自定義連接函數(shù)庸娱,在 New 時(shí)需要被傳入
// 如果業(yè)務(wù)方直接使用 ConnectionTracker着绊,則需要自己寫(xiě) dailF,參考 Pool 的 dialF 寫(xiě)法來(lái)自定義
// 如果業(yè)務(wù)方使用 pool() 函數(shù)熟尉,則利用 Pool 的默認(rèn) dailF 函數(shù)來(lái)創(chuàng)建
type DialFunc func(addr string) (*grpc.ClientConn, error)

// ReadyCheckFunc check conn is ready function
// 用來(lái)自定義檢測(cè)是否 ready 的函數(shù)归露,默認(rèn)使用 defaultReadyCheck(),可以根據(jù)業(yè)務(wù)場(chǎng)景自定義斤儿,用來(lái)得到連接的狀態(tài)剧包,直接操作 conn.GetState() 實(shí)現(xiàn)自己的封裝
type ReadyCheckFunc func(ctx context.Context, conn *grpc.ClientConn) connectivity.State
  • defaultReadyCheck()
    檢測(cè)連接的狀態(tài),并返回
    如果是 ready 或 shutdown往果,則直接返回
    如果是別的狀態(tài)疆液,則檢測(cè)是否有狀態(tài)改變,如果沒(méi)有狀態(tài)改變而 ConnectionTracker.checkReadyTimeout 超時(shí)陕贮,則返回 Idle堕油,如果狀態(tài)改變,則繼續(xù)循環(huán)
// 這里簡(jiǎn)化了 grpc 的狀態(tài),從五種簡(jiǎn)化為三種掉缺,這個(gè)默認(rèn)的檢查狀態(tài)的方法可以被重寫(xiě)覆蓋卜录,來(lái)自定義狀態(tài)
func defaultReadyCheck(ctx context.Context, conn *grpc.ClientConn) connectivity.State {
    for {
        s := conn.GetState()
        if s == connectivity.Ready || s == connectivity.Shutdown {
            return s
        }
        if !conn.WaitForStateChange(ctx, s) {
            return connectivity.Idle
        }
    }
}
  • New 函數(shù)和 New 函數(shù)實(shí)現(xiàn)動(dòng)態(tài)參數(shù)傳入
// New initialization ConnectionTracker
func New(dial DialFunc, opts ...TrackerOption) *ConnectionTracker {
    // 創(chuàng)建默認(rèn)的 ConnectionTracker 
    ctx, cannel := context.WithCancel(context.Background())
    ct := &ConnectionTracker{
        dial:              dial,
        readyCheck:        defaultReadyCheck,
        connections:       make(map[string]*trackedConn),
        alives:            make(map[string]*trackedConn),
        timeout:           defaultTimeout,
        checkReadyTimeout: checkReadyTimeout,
        heartbeatInterval: heartbeatInterval,

        ctx:    ctx,
        cannel: cannel,
    }

    // 通過(guò)傳入的參數(shù)來(lái)覆蓋 ct 的默認(rèn)參數(shù)
    for _, opt := range opts {
        opt(ct)
    }

    return ct
}

// TrackerOption initialization options
// 用來(lái)操作選項(xiàng),實(shí)現(xiàn) New() 的動(dòng)態(tài)參數(shù)
type TrackerOption func(*ConnectionTracker)

/**************************以下函數(shù)的作用為更新設(shè)定的參數(shù)眶明,業(yè)務(wù)方可以自定義函數(shù)并傳入 New() *****************************/ 

// SetTimeout custom timeout
func SetTimeout(timeout time.Duration) TrackerOption {
    return func(o *ConnectionTracker) {
        o.timeout = timeout
    }
}

// SetCheckReadyTimeout custom checkReadyTimeout
func SetCheckReadyTimeout(timeout time.Duration) TrackerOption {
    return func(o *ConnectionTracker) {
        o.checkReadyTimeout = timeout
    }
}

// SetHeartbeatInterval custom heartbeatInterval
func SetHeartbeatInterval(interval time.Duration) TrackerOption {
    return func(o *ConnectionTracker) {
        o.heartbeatInterval = interval
    }
}

// CustomReadyCheck custom ready check function
func CustomReadyCheck(f ReadyCheckFunc) TrackerOption {
    return func(o *ConnectionTracker) {
        o.readyCheck = f
    }
}
  • 對(duì)外接口
// GetConn create or get an existing connection
// 獲取一個(gè)存在的連接艰毒,如果連接之前已經(jīng)被創(chuàng)建,則直接返回赘来;否則創(chuàng)建連接
func (ct *ConnectionTracker) GetConn(addr string) (*grpc.ClientConn, error) {
    return ct.getConn(addr, false)
}

// Dial force to create new connection, this operation will close old connection!
// 強(qiáng)制重新創(chuàng)建一個(gè)新連接现喳,如果連接之前已經(jīng)被創(chuàng)建凯傲,則關(guān)閉后再次重新創(chuàng)建
func (ct *ConnectionTracker) Dial(addr string) (*grpc.ClientConn, error) {
    return ct.getConn(addr, true)
}

// Alives current live connections
// 返回當(dāng)前處于 ready 狀態(tài)的連接的 addr 數(shù)組
func (ct *ConnectionTracker) Alives() []string {
    // ConnectionTracker 的鎖犬辰,用來(lái)防止多個(gè) goroutine 同時(shí)操作 alives(比如在處理過(guò)程中加入或者刪除 alives 中的元素)
    ct.RLock()
    defer ct.RUnlock()
    alives := []string{}
    for addr := range ct.alives {
        alives = append(alives, addr)
    }
    return alives
}
  • 其他內(nèi)部函數(shù)
// 獲取和 addr 對(duì)應(yīng)的連接,不存在則創(chuàng)建冰单;force 用來(lái)指定是否強(qiáng)制重新創(chuàng)建
func (ct *ConnectionTracker) getConn(addr string, force bool) (*grpc.ClientConn, error) {
    // ConnectionTracker 的鎖幌缝,用來(lái)防止多個(gè) goroutine 同時(shí)操作 connections(唯一的場(chǎng)景就是多個(gè) goroutine 獲取 addr,發(fā)現(xiàn)沒(méi)有诫欠,然后并行創(chuàng)建 trackedConn 并設(shè)定 connections[addr])
    ct.Lock()
    tc, ok := ct.connections[addr]
    if !ok {
        tc = &trackedConn{
            addr:    addr,
            tracker: ct,
        }
        ct.connections[addr] = tc
    }
    // 在此處 Unlock涵卵,而不是在上面 defer ct.Unlock() 是因?yàn)橐崆搬尫沛i,讓其他的協(xié)程能夠讀取 connections荒叼,因?yàn)橄旅娴?tryconn 需要嘗試連接轿偎,可能會(huì)耗費(fèi)較長(zhǎng)時(shí)間,因此需要提前把鎖釋放掉
    ct.Unlock()

    // 嘗試連接被廓,如果連接已經(jīng)創(chuàng)建且 force 為 false(不強(qiáng)制重新創(chuàng)建)坏晦,則會(huì)立即返回(Ready 返回 nil,Idle 返回 errNoReady嫁乘,Shutdown 進(jìn)行重新創(chuàng)建連接) 
    err := tc.tryconn(ct.ctx, force)
    if err != nil {
        return nil, err
    }
    return tc.conn, nil
}

// 當(dāng)連接 ready 后昆婿,加入 alives
func (ct *ConnectionTracker) connReady(tc *trackedConn) {
    ct.Lock()
    defer ct.Unlock()
    ct.alives[tc.addr] = tc
}

// 當(dāng)連接 unready 后,從 alives 移除
func (ct *ConnectionTracker) connUnReady(addr string) {
    ct.Lock()
    defer ct.Unlock()
    delete(ct.alives, addr)
}

trackedConn

  • 連接 tryconn
    如果已經(jīng)創(chuàng)建連接且不強(qiáng)制重新創(chuàng)建蜓斧,連接處于 Ready 狀態(tài)則返回 nil仓蛆,處于 Idle 狀態(tài)返回 errNoReady,處于 Shutdown 狀態(tài)進(jìn)行重新創(chuàng)建連接
    如果強(qiáng)制重新創(chuàng)建連接挎春,則關(guān)閉原來(lái)的連接(如果存在)看疙,并重新連接
    創(chuàng)建連接過(guò)程:1. 關(guān)閉可能存在的連接,重新創(chuàng)建連接直奋;2. 開(kāi)啟 ready 狀態(tài)判斷函數(shù)狼荞,超時(shí)則返回 Idle 狀態(tài);3. 開(kāi)啟心跳函數(shù)帮碰;4. 更新連接的參數(shù)并更新 tracker 中該連接的記錄
func (tc *trackedConn) tryconn(ctx context.Context, force bool) error {
    // 對(duì)本連接加鎖相味,防止其他 goroutine 同時(shí)連接
    tc.Lock()
    defer tc.Unlock()
    if !force && tc.conn != nil { // 其他的協(xié)程可能建立了連接,所以如果建立好了殉挽,則直接返回并復(fù)用
        if tc.state == connectivity.Ready {
            return nil
        }
        if tc.state == connectivity.Idle {
            return errNoReady
        }
    }
    /******************************下面是創(chuàng)建連接的部分**********************************/
    // 如果有連接丰涉,則先關(guān)閉原來(lái)的連接
    if tc.conn != nil { // close shutdown conn
        tc.conn.Close()
    }
    // 根據(jù)設(shè)定的 dialF 函數(shù)創(chuàng)建連接
    conn, err := tc.tracker.dial(tc.addr)
    if err != nil {
        return err
    }
    tc.conn = conn
    
    // ready 超時(shí)上下文拓巧,設(shè)定超時(shí)時(shí)間,如果超時(shí)時(shí)間內(nèi)沒(méi)有變?yōu)?Ready 或者 Shutdown 狀態(tài)一死,則返回 Idle
    readyCtx, cancel := context.WithTimeout(ctx, tc.tracker.checkReadyTimeout)
    // 本函數(shù)結(jié)束并完成連接的 ready 后肛度,清除資源
    defer cancel()
    // 調(diào)動(dòng) ConnectionTracker.readyCheck 函數(shù)(可能是自定義 可能是默認(rèn))。默認(rèn)邏輯:如果狀態(tài)
    // 是 Ready 或者 Shutdown投慈,則直接返回承耿,否則等待狀態(tài)的變化,超時(shí)則返回 Idle 狀態(tài)
    checkStatus := tc.tracker.readyCheck(readyCtx, tc.conn)

    // heartbeat 心跳上下文
    hbCtx, hbCancel := context.WithCancel(ctx)
    tc.cannel = hbCancel
    // 開(kāi)啟心跳協(xié)程函數(shù)伪煤,每個(gè)心跳間隔加袋,探測(cè)一次,當(dāng)連接被 Shutdown 時(shí)或者ConnectionTracker的cannel被調(diào)用時(shí)抱既,tc.cannel 被調(diào)用职烧,從而退出心跳協(xié)程
    go tc.heartbeat(hbCtx)

    if checkStatus != connectivity.Ready {
        return errNoReady
    }
    // 調(diào)用連接準(zhǔn)備好的函數(shù),更改連接狀態(tài)為 Ready防泵,更新超時(shí)時(shí)刻蚀之,重置 retry 參數(shù),并將該連接加入 ConnectionTracker.Alives 數(shù)組
    tc.ready()
    return nil
}

這里解決一個(gè)疑問(wèn):
當(dāng)?shù)谝淮握?qǐng)求 addr 連接捷泞,如果調(diào)用 GetConn() 足删,tc, ok := ct.connections[addr] 返回空,則創(chuàng)建連接锁右,此時(shí)連接狀態(tài)默認(rèn)為 0失受,即 Idle 狀態(tài)。同時(shí) tc.conn == nil
然后進(jìn)入 tryconn()骡湖,由于 tc.conn == nil 因此不會(huì)進(jìn)入“直接判斷連接狀態(tài)為 Idle 而返回 errNoReady ”的邏輯贱纠,而是進(jìn)入后面創(chuàng)建連接的邏輯

  • 心跳檢測(cè)
    每次新創(chuàng)建一個(gè)連接,會(huì)創(chuàng)建一個(gè)協(xié)程响蕴,開(kāi)啟這個(gè)連接的心跳檢測(cè)谆焊,根據(jù)檢測(cè)結(jié)果進(jìn)行狀態(tài)調(diào)整
    被 Shutdown 的連接不會(huì)從映射表中刪除,在下次請(qǐng)求該 addr 連接的時(shí)候浦夷,會(huì)進(jìn)入 tryconn()辖试,重新創(chuàng)建一個(gè)連接,開(kāi)啟新連接的心跳函數(shù)
// 如果連接不是 Shutdown 則每隔 tracker.heartbeatInterval 時(shí)間進(jìn)行一次心跳檢測(cè)劈狐;如果父 ctx 被取消或者其他原因?qū)е逻B接的 ctx 被取消罐孝,則關(guān)閉連接,結(jié)束心跳
func (tc *trackedConn) heartbeat(ctx context.Context) {
    ticker := time.NewTicker(tc.tracker.heartbeatInterval)
    for tc.getState() != connectivity.Shutdown {
        select {
        case <-ctx.Done():
            tc.shutdown()
            break
        case <-ticker.C:
            tc.healthCheck(ctx)
        }
    }
}

// 檢測(cè)當(dāng)前連接的狀態(tài)肥缔,并返回莲兢,期間連接被鎖定,tryconn 和 healthCheck 不能同時(shí)進(jìn)行
func (tc *trackedConn) healthCheck(ctx context.Context) {
    tc.Lock()
    defer tc.Unlock()
    ctx, cancel := context.WithTimeout(ctx, tc.tracker.checkReadyTimeout)
    defer cancel()

    // 如果是 Ready 或者 Shutdown 則直接返回,否則等待狀態(tài)轉(zhuǎn)變改艇,如果 checkReadyTimeout 超時(shí)收班,則返回 Idle
    switch tc.tracker.readyCheck(ctx, tc.conn) {
    case connectivity.Ready:
        tc.ready()
    case connectivity.Shutdown:
        tc.shutdown()
    case connectivity.Idle:
        // 判斷是否連接 timeout 超時(shí),如果超時(shí)則關(guān)閉谒兄,否則置為 Idle狀態(tài)
        if tc.expired() {
            tc.shutdown()
        } else {
            tc.idle()
        }
    }
}
  • 其他內(nèi)部函數(shù)
/**************************獲取狀態(tài)*****************************/ 

func (tc *trackedConn) getState() connectivity.State {
    tc.RLock()
    defer tc.RUnlock()
    return tc.state
}

/**************************根據(jù)狀態(tài)進(jìn)行參數(shù)設(shè)定*****************************/ 

func (tc *trackedConn) ready() {
    tc.state = connectivity.Ready
    tc.expires = time.Now().Add(tc.tracker.timeout)
    tc.retry = 0
    tc.tracker.connReady(tc)
}

func (tc *trackedConn) idle() {
    tc.state = connectivity.Idle
    tc.retry++
    tc.tracker.connUnReady(tc.addr)
}

func (tc *trackedConn) shutdown() {
    tc.state = connectivity.Shutdown
    tc.conn.Close()
    tc.cannel()
    tc.tracker.connUnReady(tc.addr)
}

func (tc *trackedConn) expired() bool {
    return tc.expires.Before(time.Now())
}

Pool

pool 其實(shí)就是一個(gè) ConnectionTracker摔桦,因此在業(yè)務(wù)方 New ConnectionTracker 時(shí),業(yè)務(wù)方可以直接定義為 pool 變量名承疲,然后調(diào)用 pool 的 GetConn / Dial / Alives邻耕,如下示例:

/***********************以下使用方法其實(shí)是在用 ConnectionTracker **************************/
pool := New(dialF, opts)
conn := pool.GetConn(addr)
conn := pool.Dail(addr)
alives := pool.Alives()
/***********************以下使用方法才是在用 pool **************************/
conn := GetConn(addr)
conn := Dail(addr)
alives := Alives()
  • 函數(shù)不多,全部列在一起
    這里的 pool 是可以方便業(yè)務(wù)方直接使用 pool() 得到一個(gè)默認(rèn)的 ConnectionTracker燕鸽,直接開(kāi)始業(yè)務(wù)工作
    直接調(diào)用 GetConn() / Dail() / Alives() 即可兄世,在第一次 pool() 函數(shù)被調(diào)用時(shí),創(chuàng)建一個(gè) ConnectionTracker绵咱,之后可以直接復(fù)用碘饼,不會(huì)重新創(chuàng)建 ConnectionTracker
func pool() *ConnectionTracker {
    once.Do(func() {
        defaultPool = New(dialF)
    })
    return defaultPool
}

// GetConn create or get an existing connection from default pool
func GetConn(addr string) (*grpc.ClientConn, error) {
    return pool().GetConn(addr)
}

// Dial force to create new connection from default pool, this operation will close old connection!
func Dial(addr string) (*grpc.ClientConn, error) {
    return pool().Dial(addr)
}

// Alives current live connections from default pool
func Alives() []string {
    return pool().Alives()
}

以上就是整個(gè) grpcp 的源碼

開(kāi)發(fā)此工具的背景


網(wǎng)絡(luò)物理視圖
  1. 當(dāng) server 面臨業(yè)務(wù)的多種請(qǐng)求時(shí)熙兔,需要將請(qǐng)求轉(zhuǎn)移給不同的物理機(jī)或者 BGP悲伶,那么如果所有的連接都加在 server 上,會(huì)導(dǎo)致連接數(shù)過(guò)多住涉,以及維護(hù)連接的開(kāi)銷(xiāo)很大麸锉,比如網(wǎng)絡(luò)不穩(wěn)定的時(shí)候,會(huì)使得傳輸效率極大下降
  2. 為了解決這個(gè)問(wèn)題舆声,利用 grpcp花沉,來(lái)維護(hù)連接狀態(tài),而業(yè)務(wù)方可以快速得到一個(gè)可用的 Ready 狀態(tài)的連接媳握,從而遞交數(shù)據(jù)包碱屁。
  3. 同時(shí),存在大量的連接時(shí)蛾找,可以分級(jí)娩脾,從而減少 server 的維護(hù)連接的壓力,分擔(dān)壓力給 agent
  4. server 和 每一個(gè) agent 都維護(hù)著自己到別人的連接打毛。
  5. 初始啟動(dòng)時(shí)柿赊,由于連接均未建立,則短時(shí)間會(huì)大量建立連接幻枉,之后則可以穩(wěn)定下來(lái)碰声,滿(mǎn)足連接復(fù)用的功能需求。

場(chǎng)景示例:

  • 初始所有的連接都沒(méi)有熬甫。
  • 業(yè)務(wù)方維護(hù)一個(gè)路由表胰挑,比如從 server -> agent-d,有兩條匹配規(guī)則:1. server -> agent-a -> agent-d 2. server -> agent-b -> agent-d
  • 業(yè)務(wù)方根據(jù)優(yōu)先級(jí),優(yōu)先選取 1 規(guī)則瞻颂,從 server 取 agent-a 的連接脚粟,server 沒(méi)有,則創(chuàng)建連接蘸朋;然后得到連接核无,再向 agent-a 請(qǐng)求 agent-d 的連接,agent-a 創(chuàng)建連接藕坯,得到 conn 之后团南,發(fā)送數(shù)據(jù)包。
    然后其他業(yè)務(wù)方也要發(fā)送數(shù)據(jù)到 agent-d炼彪,此時(shí)吐根,1 鏈路已經(jīng)被創(chuàng)建好,因此辐马,直接獲取得到連接拷橘,并發(fā)送包。
  • 這時(shí)喜爷, agent-a 到 agent-d 連接開(kāi)始不穩(wěn)定冗疮,那么因?yàn)榇嬖谛奶B接狀態(tài)會(huì)被置為 Idle
  • 另一個(gè)業(yè)務(wù)請(qǐng)求想要發(fā)送至 agent-d檩帐,請(qǐng)求連接术幔,由于 Idel 狀態(tài),直接能夠收返回 errNoReady湃密,因此開(kāi)始選用規(guī)則 2诅挑,server 創(chuàng)建 agent-b連接,agent-b 創(chuàng)建 agent-d 連接泛源,然后返回
  • 其他業(yè)務(wù)請(qǐng)求時(shí)拔妥,也會(huì)復(fù)用 2 規(guī)則的鏈路
  • 這時(shí),agent-a 到 agent-d 因?yàn)樾奶S護(hù)达箍,重新連接没龙,到達(dá) Ready 狀態(tài)
  • 這時(shí)再來(lái)一個(gè)請(qǐng)求發(fā)送往 agent-d,就會(huì)直接匹配到規(guī)則 1幻梯,并發(fā)送兜畸。

注:

  1. 路由表是維護(hù)在業(yè)務(wù)方,由業(yè)務(wù)來(lái)定義路由的使用情況
  2. agent 做兩件事碘梢,如果是直連咬摇,數(shù)據(jù)是發(fā)給自己的,則直接處理煞躬;如果是轉(zhuǎn)發(fā)肛鹏,則查找相應(yīng)的連接并發(fā)送逸邦,或者創(chuàng)建連接并發(fā)送,或者失敗返回 err在扰,讓業(yè)務(wù)方迅速進(jìn)行下一條規(guī)則匹配并發(fā)送

其他資源補(bǔ)充


  1. grpc 的使用一般先定義 proto 文件(定義 rpc 接口 以及 rpc 的輸入結(jié)構(gòu)和傳輸數(shù)據(jù)的格式等)缕减,然后根據(jù) protoc-gen-go 工具來(lái)生成服務(wù)端和客戶(hù)端代碼(一個(gè)第三方的更快捷的包 golang/protobuf
  2. grpc-go 連接語(yǔ)義和 API
  3. ↑ 大佬的 blog 主頁(yè)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市芒珠,隨后出現(xiàn)的幾起案子桥狡,更是在濱河造成了極大的恐慌,老刑警劉巖皱卓,帶你破解...
    沈念sama閱讀 212,599評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件裹芝,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡娜汁,警方通過(guò)查閱死者的電腦和手機(jī)嫂易,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)掐禁,“玉大人怜械,你說(shuō)我怎么就攤上這事「凳拢” “怎么了缕允?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,084評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)享完。 經(jīng)常有香客問(wèn)我灼芭,道長(zhǎng)有额,這世上最難降的妖魔是什么般又? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,708評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮巍佑,結(jié)果婚禮上茴迁,老公的妹妹穿的比我還像新娘。我一直安慰自己萤衰,他們只是感情好堕义,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,813評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著脆栋,像睡著了一般倦卖。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上椿争,一...
    開(kāi)封第一講書(shū)人閱讀 50,021評(píng)論 1 291
  • 那天怕膛,我揣著相機(jī)與錄音,去河邊找鬼秦踪。 笑死褐捻,一個(gè)胖子當(dāng)著我的面吹牛掸茅,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播柠逞,決...
    沈念sama閱讀 39,120評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼昧狮,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了板壮?” 一聲冷哼從身側(cè)響起逗鸣,我...
    開(kāi)封第一講書(shū)人閱讀 37,866評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎绰精,沒(méi)想到半個(gè)月后慕购,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,308評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡茬底,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,633評(píng)論 2 327
  • 正文 我和宋清朗相戀三年沪悲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片阱表。...
    茶點(diǎn)故事閱讀 38,768評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡殿如,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出最爬,到底是詐尸還是另有隱情涉馁,我是刑警寧澤,帶...
    沈念sama閱讀 34,461評(píng)論 4 333
  • 正文 年R本政府宣布爱致,位于F島的核電站烤送,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏糠悯。R本人自食惡果不足惜帮坚,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,094評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望互艾。 院中可真熱鬧试和,春花似錦、人聲如沸纫普。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,850評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)昨稼。三九已至节视,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間假栓,已是汗流浹背寻行。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,082評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留但指,地道東北人寡痰。 一個(gè)月前我還...
    沈念sama閱讀 46,571評(píng)論 2 362
  • 正文 我出身青樓抗楔,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親拦坠。 傳聞我的和親對(duì)象是個(gè)殘疾皇子连躏,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,666評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容