資源鏈接
grpcp 介紹
- grpcp 是基于 grpc 開(kāi)發(fā)的闪幽,通過(guò)對(duì) grpc 的封裝和復(fù)用,來(lái)使得業(yè)務(wù)方能夠最快速地通過(guò) grpcp 找到相通的鏈路
- grpcp 主要實(shí)現(xiàn)了對(duì) grpc 的連接的維護(hù),使得業(yè)務(wù)方能夠省去對(duì)鏈路的維護(hù)灰嫉,直接根據(jù)路由表進(jìn)行數(shù)據(jù)分發(fā)
源碼分析
結(jié)構(gòu)體和變量
-
ConnectionTracker
ConnectionTracker 就是控制整個(gè)連接表煎源,維護(hù)所有的連接
// ConnectionTracker keep connections and maintain their status
type ConnectionTracker struct {
sync.RWMutex // 用來(lái)防止多個(gè)協(xié)程同時(shí)操作 connections 和 alives
// 創(chuàng)建連接的函數(shù)合敦,可以自定義,也可以直接使用本代碼中的 dialF 變量定義的默認(rèn)函數(shù)
dial DialFunc
// 進(jìn)行連接是否 ready 的檢查函數(shù)耐朴,可以自定義,不設(shè)定則使用默認(rèn)函數(shù)defaultReadyCheck()
readyCheck ReadyCheckFunc
// 維護(hù)一個(gè) addr 到連接的映射表盹憎,斷連也會(huì)維護(hù)隔箍,之后會(huì)重連
connections map[string]*trackedConn
// 維護(hù) connections 中處于 ready狀態(tài) 的連接
alives map[string]*trackedConn
// 超時(shí)時(shí)間
timeout time.Duration // 連接超時(shí)時(shí)間
checkReadyTimeout time.Duration // 在心跳檢測(cè)時(shí),如果連接不是ready脚乡,則等待 conn 的狀態(tài)改變超時(shí)時(shí)間
heartbeatInterval time.Duration // 心跳間隔
// 上下文蜒滩,這個(gè)上下文傳給每一個(gè)連接,作為每個(gè)連接的 parent ctx奶稠,當(dāng) parent ctx cancel(cannel被調(diào)用)時(shí)俯艰,每個(gè)子ctx 均被 cancel
ctx context.Context
// 上下文對(duì)應(yīng)的 cancel 函數(shù),未用到锌订,提供給業(yè)務(wù)方使用竹握,當(dāng)關(guān)閉所有連接時(shí),調(diào)用 cannel
cannel context.CancelFunc
}
-
trackedCon
trackedConn 用來(lái)控制單個(gè)連接
type trackedConn struct {
sync.RWMutex // 用來(lái)防止多個(gè)協(xié)程同時(shí)操作本連接
// 本連接對(duì)應(yīng)的 addr 地址辆飘,即 key啦辐,用來(lái)標(biāo)識(shí)一個(gè)連接
addr string
// 本連接對(duì)應(yīng)的 grpc 連接
conn *grpc.ClientConn
// 本連接對(duì)應(yīng)的父ConnectionTracker谓传,該連接存在于 tracker.connections 映射表中,可能存在于 tracker.alives 映射表中
tracker *ConnectionTracker
// 本連接對(duì)應(yīng)的 state芹关,將 grpc 中的五種狀態(tài)(Idle Connecting Ready TransientFailure Shutdown)減到三種(Idle Ready Shutdown)
state connectivity.State
// 本連接對(duì)應(yīng)的超時(shí)時(shí)刻续挟,每次心跳檢測(cè)成功,如果是 ready 狀態(tài)就更新超時(shí)時(shí)間侥衬,心跳檢測(cè)一直失敗诗祸,超過(guò)超時(shí)時(shí)間 tracker.timeout,則 shutdown() 本連接
expires time.Time
// 本連接對(duì)應(yīng)的 retry 次數(shù)轴总,每次心跳檢測(cè)成功直颅,如果是 ready 狀態(tài)就重置為 0,否則 ++怀樟,代碼中未使用該變量
retry int
// 本連接對(duì)應(yīng)的 CancelFunc功偿,在本連接 shutdown 時(shí),通知本連接對(duì)應(yīng)的心跳協(xié)程退出
cannel context.CancelFunc
}
-
Pool
以下通過(guò)池子來(lái)實(shí)現(xiàn)復(fù)用同一個(gè)連接往堡,但是復(fù)用的時(shí)候不能將拿出來(lái)的 connection close 掉脖含,否則該 conn 會(huì)被置為 shutdown,并被踢出 ConnectionTracker.alives 記錄, 同時(shí)其他協(xié)程已經(jīng)拿出來(lái)的 conn 就無(wú)法使用了
var (
// 默認(rèn)連接池投蝉,如果沒(méi)有自定義連接池养葵,則使用默認(rèn)連接池
defaultPool *ConnectionTracker
// 用于限定創(chuàng)建連接池的操作最多被僅僅執(zhí)行一次
once sync.Once
// 連接函數(shù),可以自定義參數(shù)(類(lèi)似grpc.WithInsecure())
dialF = func(addr string) (*grpc.ClientConn, error) {
return grpc.Dial(
addr,
grpc.WithInsecure(),
)
}
)
- 其他默認(rèn)參數(shù)
// 超時(shí)參數(shù)瘩缆,可以自定義覆蓋掉
const (
defaultTimeout = 100 * time.Second
checkReadyTimeout = 5 * time.Second
heartbeatInterval = 20 * time.Second
)
// 如果嘗試獲取某個(gè)連接关拒,當(dāng)前連接不聯(lián)通,則返回的錯(cuò)誤定義
var (
errNoReady = fmt.Errorf("no ready")
)
函數(shù)
ConnectionTracker
- 可以被自定義的函數(shù)
// DialFunc dial function
// 用來(lái)自定義連接函數(shù)庸娱,在 New 時(shí)需要被傳入
// 如果業(yè)務(wù)方直接使用 ConnectionTracker着绊,則需要自己寫(xiě) dailF,參考 Pool 的 dialF 寫(xiě)法來(lái)自定義
// 如果業(yè)務(wù)方使用 pool() 函數(shù)熟尉,則利用 Pool 的默認(rèn) dailF 函數(shù)來(lái)創(chuàng)建
type DialFunc func(addr string) (*grpc.ClientConn, error)
// ReadyCheckFunc check conn is ready function
// 用來(lái)自定義檢測(cè)是否 ready 的函數(shù)归露,默認(rèn)使用 defaultReadyCheck(),可以根據(jù)業(yè)務(wù)場(chǎng)景自定義斤儿,用來(lái)得到連接的狀態(tài)剧包,直接操作 conn.GetState() 實(shí)現(xiàn)自己的封裝
type ReadyCheckFunc func(ctx context.Context, conn *grpc.ClientConn) connectivity.State
-
defaultReadyCheck()
檢測(cè)連接的狀態(tài),并返回
如果是 ready 或 shutdown往果,則直接返回
如果是別的狀態(tài)疆液,則檢測(cè)是否有狀態(tài)改變,如果沒(méi)有狀態(tài)改變而 ConnectionTracker.checkReadyTimeout 超時(shí)陕贮,則返回 Idle堕油,如果狀態(tài)改變,則繼續(xù)循環(huán)
// 這里簡(jiǎn)化了 grpc 的狀態(tài),從五種簡(jiǎn)化為三種掉缺,這個(gè)默認(rèn)的檢查狀態(tài)的方法可以被重寫(xiě)覆蓋卜录,來(lái)自定義狀態(tài)
func defaultReadyCheck(ctx context.Context, conn *grpc.ClientConn) connectivity.State {
for {
s := conn.GetState()
if s == connectivity.Ready || s == connectivity.Shutdown {
return s
}
if !conn.WaitForStateChange(ctx, s) {
return connectivity.Idle
}
}
}
- New 函數(shù)和 New 函數(shù)實(shí)現(xiàn)動(dòng)態(tài)參數(shù)傳入
// New initialization ConnectionTracker
func New(dial DialFunc, opts ...TrackerOption) *ConnectionTracker {
// 創(chuàng)建默認(rèn)的 ConnectionTracker
ctx, cannel := context.WithCancel(context.Background())
ct := &ConnectionTracker{
dial: dial,
readyCheck: defaultReadyCheck,
connections: make(map[string]*trackedConn),
alives: make(map[string]*trackedConn),
timeout: defaultTimeout,
checkReadyTimeout: checkReadyTimeout,
heartbeatInterval: heartbeatInterval,
ctx: ctx,
cannel: cannel,
}
// 通過(guò)傳入的參數(shù)來(lái)覆蓋 ct 的默認(rèn)參數(shù)
for _, opt := range opts {
opt(ct)
}
return ct
}
// TrackerOption initialization options
// 用來(lái)操作選項(xiàng),實(shí)現(xiàn) New() 的動(dòng)態(tài)參數(shù)
type TrackerOption func(*ConnectionTracker)
/**************************以下函數(shù)的作用為更新設(shè)定的參數(shù)眶明,業(yè)務(wù)方可以自定義函數(shù)并傳入 New() *****************************/
// SetTimeout custom timeout
func SetTimeout(timeout time.Duration) TrackerOption {
return func(o *ConnectionTracker) {
o.timeout = timeout
}
}
// SetCheckReadyTimeout custom checkReadyTimeout
func SetCheckReadyTimeout(timeout time.Duration) TrackerOption {
return func(o *ConnectionTracker) {
o.checkReadyTimeout = timeout
}
}
// SetHeartbeatInterval custom heartbeatInterval
func SetHeartbeatInterval(interval time.Duration) TrackerOption {
return func(o *ConnectionTracker) {
o.heartbeatInterval = interval
}
}
// CustomReadyCheck custom ready check function
func CustomReadyCheck(f ReadyCheckFunc) TrackerOption {
return func(o *ConnectionTracker) {
o.readyCheck = f
}
}
- 對(duì)外接口
// GetConn create or get an existing connection
// 獲取一個(gè)存在的連接艰毒,如果連接之前已經(jīng)被創(chuàng)建,則直接返回赘来;否則創(chuàng)建連接
func (ct *ConnectionTracker) GetConn(addr string) (*grpc.ClientConn, error) {
return ct.getConn(addr, false)
}
// Dial force to create new connection, this operation will close old connection!
// 強(qiáng)制重新創(chuàng)建一個(gè)新連接现喳,如果連接之前已經(jīng)被創(chuàng)建凯傲,則關(guān)閉后再次重新創(chuàng)建
func (ct *ConnectionTracker) Dial(addr string) (*grpc.ClientConn, error) {
return ct.getConn(addr, true)
}
// Alives current live connections
// 返回當(dāng)前處于 ready 狀態(tài)的連接的 addr 數(shù)組
func (ct *ConnectionTracker) Alives() []string {
// ConnectionTracker 的鎖犬辰,用來(lái)防止多個(gè) goroutine 同時(shí)操作 alives(比如在處理過(guò)程中加入或者刪除 alives 中的元素)
ct.RLock()
defer ct.RUnlock()
alives := []string{}
for addr := range ct.alives {
alives = append(alives, addr)
}
return alives
}
- 其他內(nèi)部函數(shù)
// 獲取和 addr 對(duì)應(yīng)的連接,不存在則創(chuàng)建冰单;force 用來(lái)指定是否強(qiáng)制重新創(chuàng)建
func (ct *ConnectionTracker) getConn(addr string, force bool) (*grpc.ClientConn, error) {
// ConnectionTracker 的鎖幌缝,用來(lái)防止多個(gè) goroutine 同時(shí)操作 connections(唯一的場(chǎng)景就是多個(gè) goroutine 獲取 addr,發(fā)現(xiàn)沒(méi)有诫欠,然后并行創(chuàng)建 trackedConn 并設(shè)定 connections[addr])
ct.Lock()
tc, ok := ct.connections[addr]
if !ok {
tc = &trackedConn{
addr: addr,
tracker: ct,
}
ct.connections[addr] = tc
}
// 在此處 Unlock涵卵,而不是在上面 defer ct.Unlock() 是因?yàn)橐崆搬尫沛i,讓其他的協(xié)程能夠讀取 connections荒叼,因?yàn)橄旅娴?tryconn 需要嘗試連接轿偎,可能會(huì)耗費(fèi)較長(zhǎng)時(shí)間,因此需要提前把鎖釋放掉
ct.Unlock()
// 嘗試連接被廓,如果連接已經(jīng)創(chuàng)建且 force 為 false(不強(qiáng)制重新創(chuàng)建)坏晦,則會(huì)立即返回(Ready 返回 nil,Idle 返回 errNoReady嫁乘,Shutdown 進(jìn)行重新創(chuàng)建連接)
err := tc.tryconn(ct.ctx, force)
if err != nil {
return nil, err
}
return tc.conn, nil
}
// 當(dāng)連接 ready 后昆婿,加入 alives
func (ct *ConnectionTracker) connReady(tc *trackedConn) {
ct.Lock()
defer ct.Unlock()
ct.alives[tc.addr] = tc
}
// 當(dāng)連接 unready 后,從 alives 移除
func (ct *ConnectionTracker) connUnReady(addr string) {
ct.Lock()
defer ct.Unlock()
delete(ct.alives, addr)
}
trackedConn
-
連接 tryconn
如果已經(jīng)創(chuàng)建連接且不強(qiáng)制重新創(chuàng)建蜓斧,連接處于 Ready 狀態(tài)則返回 nil仓蛆,處于 Idle 狀態(tài)返回 errNoReady,處于 Shutdown 狀態(tài)進(jìn)行重新創(chuàng)建連接
如果強(qiáng)制重新創(chuàng)建連接挎春,則關(guān)閉原來(lái)的連接(如果存在)看疙,并重新連接
創(chuàng)建連接過(guò)程:1. 關(guān)閉可能存在的連接,重新創(chuàng)建連接直奋;2. 開(kāi)啟 ready 狀態(tài)判斷函數(shù)狼荞,超時(shí)則返回 Idle 狀態(tài);3. 開(kāi)啟心跳函數(shù)帮碰;4. 更新連接的參數(shù)并更新 tracker 中該連接的記錄
func (tc *trackedConn) tryconn(ctx context.Context, force bool) error {
// 對(duì)本連接加鎖相味,防止其他 goroutine 同時(shí)連接
tc.Lock()
defer tc.Unlock()
if !force && tc.conn != nil { // 其他的協(xié)程可能建立了連接,所以如果建立好了殉挽,則直接返回并復(fù)用
if tc.state == connectivity.Ready {
return nil
}
if tc.state == connectivity.Idle {
return errNoReady
}
}
/******************************下面是創(chuàng)建連接的部分**********************************/
// 如果有連接丰涉,則先關(guān)閉原來(lái)的連接
if tc.conn != nil { // close shutdown conn
tc.conn.Close()
}
// 根據(jù)設(shè)定的 dialF 函數(shù)創(chuàng)建連接
conn, err := tc.tracker.dial(tc.addr)
if err != nil {
return err
}
tc.conn = conn
// ready 超時(shí)上下文拓巧,設(shè)定超時(shí)時(shí)間,如果超時(shí)時(shí)間內(nèi)沒(méi)有變?yōu)?Ready 或者 Shutdown 狀態(tài)一死,則返回 Idle
readyCtx, cancel := context.WithTimeout(ctx, tc.tracker.checkReadyTimeout)
// 本函數(shù)結(jié)束并完成連接的 ready 后肛度,清除資源
defer cancel()
// 調(diào)動(dòng) ConnectionTracker.readyCheck 函數(shù)(可能是自定義 可能是默認(rèn))。默認(rèn)邏輯:如果狀態(tài)
// 是 Ready 或者 Shutdown投慈,則直接返回承耿,否則等待狀態(tài)的變化,超時(shí)則返回 Idle 狀態(tài)
checkStatus := tc.tracker.readyCheck(readyCtx, tc.conn)
// heartbeat 心跳上下文
hbCtx, hbCancel := context.WithCancel(ctx)
tc.cannel = hbCancel
// 開(kāi)啟心跳協(xié)程函數(shù)伪煤,每個(gè)心跳間隔加袋,探測(cè)一次,當(dāng)連接被 Shutdown 時(shí)或者ConnectionTracker的cannel被調(diào)用時(shí)抱既,tc.cannel 被調(diào)用职烧,從而退出心跳協(xié)程
go tc.heartbeat(hbCtx)
if checkStatus != connectivity.Ready {
return errNoReady
}
// 調(diào)用連接準(zhǔn)備好的函數(shù),更改連接狀態(tài)為 Ready防泵,更新超時(shí)時(shí)刻蚀之,重置 retry 參數(shù),并將該連接加入 ConnectionTracker.Alives 數(shù)組
tc.ready()
return nil
}
這里解決一個(gè)疑問(wèn):
當(dāng)?shù)谝淮握?qǐng)求 addr 連接捷泞,如果調(diào)用 GetConn() 足删,tc, ok := ct.connections[addr] 返回空,則創(chuàng)建連接锁右,此時(shí)連接狀態(tài)默認(rèn)為 0失受,即 Idle 狀態(tài)。同時(shí) tc.conn == nil
然后進(jìn)入 tryconn()骡湖,由于 tc.conn == nil 因此不會(huì)進(jìn)入“直接判斷連接狀態(tài)為 Idle 而返回 errNoReady ”的邏輯贱纠,而是進(jìn)入后面創(chuàng)建連接的邏輯
-
心跳檢測(cè)
每次新創(chuàng)建一個(gè)連接,會(huì)創(chuàng)建一個(gè)協(xié)程响蕴,開(kāi)啟這個(gè)連接的心跳檢測(cè)谆焊,根據(jù)檢測(cè)結(jié)果進(jìn)行狀態(tài)調(diào)整
被 Shutdown 的連接不會(huì)從映射表中刪除,在下次請(qǐng)求該 addr 連接的時(shí)候浦夷,會(huì)進(jìn)入 tryconn()辖试,重新創(chuàng)建一個(gè)連接,開(kāi)啟新連接的心跳函數(shù)
// 如果連接不是 Shutdown 則每隔 tracker.heartbeatInterval 時(shí)間進(jìn)行一次心跳檢測(cè)劈狐;如果父 ctx 被取消或者其他原因?qū)е逻B接的 ctx 被取消罐孝,則關(guān)閉連接,結(jié)束心跳
func (tc *trackedConn) heartbeat(ctx context.Context) {
ticker := time.NewTicker(tc.tracker.heartbeatInterval)
for tc.getState() != connectivity.Shutdown {
select {
case <-ctx.Done():
tc.shutdown()
break
case <-ticker.C:
tc.healthCheck(ctx)
}
}
}
// 檢測(cè)當(dāng)前連接的狀態(tài)肥缔,并返回莲兢,期間連接被鎖定,tryconn 和 healthCheck 不能同時(shí)進(jìn)行
func (tc *trackedConn) healthCheck(ctx context.Context) {
tc.Lock()
defer tc.Unlock()
ctx, cancel := context.WithTimeout(ctx, tc.tracker.checkReadyTimeout)
defer cancel()
// 如果是 Ready 或者 Shutdown 則直接返回,否則等待狀態(tài)轉(zhuǎn)變改艇,如果 checkReadyTimeout 超時(shí)收班,則返回 Idle
switch tc.tracker.readyCheck(ctx, tc.conn) {
case connectivity.Ready:
tc.ready()
case connectivity.Shutdown:
tc.shutdown()
case connectivity.Idle:
// 判斷是否連接 timeout 超時(shí),如果超時(shí)則關(guān)閉谒兄,否則置為 Idle狀態(tài)
if tc.expired() {
tc.shutdown()
} else {
tc.idle()
}
}
}
- 其他內(nèi)部函數(shù)
/**************************獲取狀態(tài)*****************************/
func (tc *trackedConn) getState() connectivity.State {
tc.RLock()
defer tc.RUnlock()
return tc.state
}
/**************************根據(jù)狀態(tài)進(jìn)行參數(shù)設(shè)定*****************************/
func (tc *trackedConn) ready() {
tc.state = connectivity.Ready
tc.expires = time.Now().Add(tc.tracker.timeout)
tc.retry = 0
tc.tracker.connReady(tc)
}
func (tc *trackedConn) idle() {
tc.state = connectivity.Idle
tc.retry++
tc.tracker.connUnReady(tc.addr)
}
func (tc *trackedConn) shutdown() {
tc.state = connectivity.Shutdown
tc.conn.Close()
tc.cannel()
tc.tracker.connUnReady(tc.addr)
}
func (tc *trackedConn) expired() bool {
return tc.expires.Before(time.Now())
}
Pool
pool 其實(shí)就是一個(gè) ConnectionTracker摔桦,因此在業(yè)務(wù)方 New ConnectionTracker 時(shí),業(yè)務(wù)方可以直接定義為 pool 變量名承疲,然后調(diào)用 pool 的 GetConn / Dial / Alives邻耕,如下示例:
/***********************以下使用方法其實(shí)是在用 ConnectionTracker **************************/
pool := New(dialF, opts)
conn := pool.GetConn(addr)
conn := pool.Dail(addr)
alives := pool.Alives()
/***********************以下使用方法才是在用 pool **************************/
conn := GetConn(addr)
conn := Dail(addr)
alives := Alives()
-
函數(shù)不多,全部列在一起
這里的 pool 是可以方便業(yè)務(wù)方直接使用 pool() 得到一個(gè)默認(rèn)的 ConnectionTracker燕鸽,直接開(kāi)始業(yè)務(wù)工作
直接調(diào)用 GetConn() / Dail() / Alives() 即可兄世,在第一次 pool() 函數(shù)被調(diào)用時(shí),創(chuàng)建一個(gè) ConnectionTracker绵咱,之后可以直接復(fù)用碘饼,不會(huì)重新創(chuàng)建 ConnectionTracker
func pool() *ConnectionTracker {
once.Do(func() {
defaultPool = New(dialF)
})
return defaultPool
}
// GetConn create or get an existing connection from default pool
func GetConn(addr string) (*grpc.ClientConn, error) {
return pool().GetConn(addr)
}
// Dial force to create new connection from default pool, this operation will close old connection!
func Dial(addr string) (*grpc.ClientConn, error) {
return pool().Dial(addr)
}
// Alives current live connections from default pool
func Alives() []string {
return pool().Alives()
}
以上就是整個(gè) grpcp 的源碼
開(kāi)發(fā)此工具的背景
- 當(dāng) server 面臨業(yè)務(wù)的多種請(qǐng)求時(shí)熙兔,需要將請(qǐng)求轉(zhuǎn)移給不同的物理機(jī)或者 BGP悲伶,那么如果所有的連接都加在 server 上,會(huì)導(dǎo)致連接數(shù)過(guò)多住涉,以及維護(hù)連接的開(kāi)銷(xiāo)很大麸锉,比如網(wǎng)絡(luò)不穩(wěn)定的時(shí)候,會(huì)使得傳輸效率極大下降
- 為了解決這個(gè)問(wèn)題舆声,利用 grpcp花沉,來(lái)維護(hù)連接狀態(tài),而業(yè)務(wù)方可以快速得到一個(gè)可用的 Ready 狀態(tài)的連接媳握,從而遞交數(shù)據(jù)包碱屁。
- 同時(shí),存在大量的連接時(shí)蛾找,可以分級(jí)娩脾,從而減少 server 的維護(hù)連接的壓力,分擔(dān)壓力給 agent
- server 和 每一個(gè) agent 都維護(hù)著自己到別人的連接打毛。
- 初始啟動(dòng)時(shí)柿赊,由于連接均未建立,則短時(shí)間會(huì)大量建立連接幻枉,之后則可以穩(wěn)定下來(lái)碰声,滿(mǎn)足連接復(fù)用的功能需求。
場(chǎng)景示例:
- 初始所有的連接都沒(méi)有熬甫。
- 業(yè)務(wù)方維護(hù)一個(gè)路由表胰挑,比如從 server -> agent-d,有兩條匹配規(guī)則:1. server -> agent-a -> agent-d 2. server -> agent-b -> agent-d
- 業(yè)務(wù)方根據(jù)優(yōu)先級(jí),優(yōu)先選取 1 規(guī)則瞻颂,從 server 取 agent-a 的連接脚粟,server 沒(méi)有,則創(chuàng)建連接蘸朋;然后得到連接核无,再向 agent-a 請(qǐng)求 agent-d 的連接,agent-a 創(chuàng)建連接藕坯,得到 conn 之后团南,發(fā)送數(shù)據(jù)包。
然后其他業(yè)務(wù)方也要發(fā)送數(shù)據(jù)到 agent-d炼彪,此時(shí)吐根,1 鏈路已經(jīng)被創(chuàng)建好,因此辐马,直接獲取得到連接拷橘,并發(fā)送包。 - 這時(shí)喜爷, agent-a 到 agent-d 連接開(kāi)始不穩(wěn)定冗疮,那么因?yàn)榇嬖谛奶B接狀態(tài)會(huì)被置為 Idle
- 另一個(gè)業(yè)務(wù)請(qǐng)求想要發(fā)送至 agent-d檩帐,請(qǐng)求連接术幔,由于 Idel 狀態(tài),直接能夠收返回 errNoReady湃密,因此開(kāi)始選用規(guī)則 2诅挑,server 創(chuàng)建 agent-b連接,agent-b 創(chuàng)建 agent-d 連接泛源,然后返回
- 其他業(yè)務(wù)請(qǐng)求時(shí)拔妥,也會(huì)復(fù)用 2 規(guī)則的鏈路
- 這時(shí),agent-a 到 agent-d 因?yàn)樾奶S護(hù)达箍,重新連接没龙,到達(dá) Ready 狀態(tài)
- 這時(shí)再來(lái)一個(gè)請(qǐng)求發(fā)送往 agent-d,就會(huì)直接匹配到規(guī)則 1幻梯,并發(fā)送兜畸。
注:
- 路由表是維護(hù)在業(yè)務(wù)方,由業(yè)務(wù)來(lái)定義路由的使用情況
- agent 做兩件事碘梢,如果是直連咬摇,數(shù)據(jù)是發(fā)給自己的,則直接處理煞躬;如果是轉(zhuǎn)發(fā)肛鹏,則查找相應(yīng)的連接并發(fā)送逸邦,或者創(chuàng)建連接并發(fā)送,或者失敗返回 err在扰,讓業(yè)務(wù)方迅速進(jìn)行下一條規(guī)則匹配并發(fā)送
其他資源補(bǔ)充
- grpc 的使用一般先定義 proto 文件(定義 rpc 接口 以及 rpc 的輸入結(jié)構(gòu)和傳輸數(shù)據(jù)的格式等)缕减,然后根據(jù) protoc-gen-go 工具來(lái)生成服務(wù)端和客戶(hù)端代碼(一個(gè)第三方的更快捷的包 golang/protobuf)
- grpc-go 連接語(yǔ)義和 API
- ↑ 大佬的 blog 主頁(yè)