1. 前言
轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!
源碼位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)
在前一篇文章 [istio源碼分析][pilot] pilot之DiscoveryServer 中已經(jīng)分析了
DiscoveryServer
拿到galley
和k8s
的數(shù)據(jù)后向每一個連接的client
端(其實是envoy
) 發(fā)送了一個XdsEvent
到client.pushChannel
.
本文將分析從
client
到discoveryServer
端之間的聯(lián)系. 主要是分析整體的流程, 關于細節(jié)方面的東西可以在具體問題中進行調(diào)試即可.
2. adsc
adsc
是istio
模擬的一個client
端, 真正的客戶端是sidecar
中的envoy
. 這里為了方便, 因此用adsc
進行分析.
// pkg/adsc/adsc.go
func (a *ADSC) Run() error {
var err error
if len(a.certDir) > 0 {
...
a.conn, err = grpc.Dial(a.url, opts...)
...
} else {
a.conn, err = grpc.Dial(a.url, grpc.WithInsecure())
...
}
// 建立連接
xds := ads.NewAggregatedDiscoveryServiceClient(a.conn)
edsstr, err := xds.StreamAggregatedResources(context.Background())
if err != nil {
return err
}
a.stream = edsstr
// 從discovery server接收信息
go a.handleRecv()
return nil
}
func (a *ADSC) handleRecv() {
for {
// 從server端獲得信息
msg, err := a.stream.Recv()
...
listeners := []*xdsapi.Listener{}
clusters := []*xdsapi.Cluster{}
routes := []*xdsapi.RouteConfiguration{}
eds := []*xdsapi.ClusterLoadAssignment{}
// 為獲得的數(shù)據(jù)分類
for _, rsc := range msg.Resources { // Any
a.VersionInfo[rsc.TypeUrl] = msg.VersionInfo
valBytes := rsc.Value
if rsc.TypeUrl == listenerType {
ll := &xdsapi.Listener{}
_ = proto.Unmarshal(valBytes, ll)
listeners = append(listeners, ll)
} ...
}
...
// 向server端發(fā)送ACK
a.ack(msg)
...
// 客戶端處理listener, cluster, endpoint, route
...
}
}
func (a *ADSC) ack(msg *xdsapi.DiscoveryResponse) {
_ = a.stream.Send(&xdsapi.DiscoveryRequest{
ResponseNonce: msg.Nonce,
TypeUrl: msg.TypeUrl,
Node: a.node(),
VersionInfo: msg.VersionInfo,
})
}
1. 與
discoveryServer
建立連接.
2. 通過handleRecv
方法與server
端交流.
3. 將獲得的數(shù)據(jù)分類(endpoint, listener, cluster, route
)
4. 往server
端發(fā)送ack
.
5. 客戶端自己處理數(shù)據(jù).
那
envoy
就是數(shù)據(jù)生成的配置文件了,xds
協(xié)議.
3. server端
// StreamAggregatedResources implements the ADS interface.
func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
peerInfo, ok := peer.FromContext(stream.Context())
peerAddr := "0.0.0.0"
if ok {
peerAddr = peerInfo.Addr.String()
}
t0 := time.Now()
err := s.globalPushContext().InitContext(s.Env)
...
// 創(chuàng)建一個XdsConnection
con := newXdsConnection(peerAddr, stream)
var receiveError error
reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
// 接收con對應的client的請求
go receiveThread(con, reqChannel, &receiveError)
node := &core.Node{}
for {
// Block until either a request is received or a push is triggered.
select {
case discReq, ok := <-reqChannel:
if !ok {
// Remote side closed connection.
return receiveError
}
// This should be only set for the first request. Guard with ID check regardless.
if discReq.Node != nil && discReq.Node.Id != "" {
node = discReq.Node
err = s.initConnectionNode(discReq.Node, con)
if err != nil {
return err
}
}
switch discReq.TypeUrl {
case ClusterType:
...
case ListenerType:
...
case RouteType:
...
case EndpointType:
...
default:
adsLog.Warnf("ADS: Unknown watched resources %s", discReq.String())
}
con.mu.Lock()
if !con.added {
con.added = true
con.mu.Unlock()
// 添加到xdsclient中
s.addCon(con.ConID, con)
defer s.removeCon(con.ConID, con)
} else {
con.mu.Unlock()
}
case pushEv := <-con.pushChannel:
err := s.pushConnection(con, pushEv)
pushEv.done()
if err != nil {
return nil
}
}
}
}
可以看到
select
中有兩個case
.
1.discReq, ok := <-reqChannel:
是從client
來的,receiveThread
方法會看到.
2.pushEv := <-con.pushChannel:
是從k8s
和galley
中來的, 具體可以參考上文 [istio源碼分析][pilot] pilot之DiscoveryServer .
3.1 第一個分支
這里先看一下第一個分支的操作.
receiveThread
func receiveThread(con *XdsConnection, reqChannel chan *xdsapi.DiscoveryRequest, errP *error) {
defer close(reqChannel) // indicates close of the remote side.
for {
// 從client端接收信息
req, err := con.stream.Recv()
...
select {
// 將req轉(zhuǎn)到reqChannel中
case reqChannel <- req:
case <-con.stream.Context().Done():
adsLog.Errorf("ADS: %q %s terminated with stream closed", con.PeerAddr, con.ConID)
return
}
}
}
1. 從
client
端接收的req
直接放入到reqChannel
中, 所以會進入到StreamAggregatedResources
中的第一個分支.
2. 以ClusterType
為例:
case ClusterType:
if con.CDSWatch {
// Already received a cluster watch request, this is an ACK
if discReq.ErrorDetail != nil {
adsLog.Warnf("ADS:CDS: ACK ERROR %v %s (%s) %v", peerAddr, con.ConID, con.modelNode.ID, discReq.String())
errCode := codes.Code(discReq.ErrorDetail.Code)
incrementXDSRejects(cdsReject, node.Id, errCode.String())
} else if discReq.ResponseNonce != "" {
con.ClusterNonceAcked = discReq.ResponseNonce
}
adsLog.Debugf("ADS:CDS: ACK %s %s (%s) %s %s", peerAddr, con.ConID, con.modelNode.ID, discReq.VersionInfo, discReq.ResponseNonce)
continue
}
// CDS REQ is the first request an envoy makes. This shows up
// immediately after connect. It is followed by EDS REQ as
// soon as the CDS push is returned.
adsLog.Infof("ADS:CDS: REQ %v %s %v version:%s", peerAddr, con.ConID, time.Since(t0), discReq.VersionInfo)
con.CDSWatch = true
err := s.pushCds(con, s.globalPushContext(), versionInfo())
if err != nil {
return err
}
1. 可以看到除了是第一次會調(diào)用
pushCds
方法, 后面都是打印ACK/NACK
信息. 通過一個變量con.CDSWatch
進行控制.
2. 另外通過s.addCon(con.ConID, con)
將此連接加入到adsClients
, 這個在startPush
方法需要分發(fā)給所有的client
端的時候用到的. 具體可以參考上文 [istio源碼分析][pilot] pilot之DiscoveryServer .
pushCds
// pilot/pkg/proxy/envoy/v2/cds.go
func (s *DiscoveryServer) pushCds(con *XdsConnection, push *model.PushContext, version string) error {
pushStart := time.Now()
// 發(fā)送給該envoy (con.modelNode)
// 內(nèi)容在push中
// 根據(jù)client信息和內(nèi)容生成要發(fā)送的clusters集合
rawClusters := s.generateRawClusters(con.modelNode, push)
...
// 構(gòu)造response并發(fā)送給該client(envoy)
response := con.clusters(rawClusters)
err := con.send(response)
...
return nil
}
func (s *DiscoveryServer) generateRawClusters(node *model.Proxy, push *model.PushContext) []*xdsapi.Cluster {
rawClusters := s.ConfigGenerator.BuildClusters(s.Env, node, push)
...
return rawClusters
}
1.
client
端(envoy
)信息是con.modelNode
, 內(nèi)容為push
, 根據(jù)此兩個信息傳入到generateRawClusters
中生成cluster
集合. 實際上是通過s.ConfigGenerator.BuildClusters
方法,
type ConfigGenerator interface {
BuildListeners(env *model.Environment, node *model.Proxy, push *model.PushContext) []*v2.Listener
BuildClusters(env *model.Environment, node *model.Proxy, push *model.PushContext) []*v2.Cluster
BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext, routeNames []string) []*v2.RouteConfiguration
}
這個部分的內(nèi)容就是根據(jù)當前的內(nèi)容生成
envoy
所接受的cluster
,endpoint
和route
.
2. 向
client
端(envoy
)發(fā)送數(shù)據(jù).
3.2 第二個分支
這里分析
pushEv := <-con.pushChannel:
, 這個分支是從configController
和ServiceController
過來的, 具體可以參考上文 [istio源碼分析][pilot] pilot之DiscoveryServer .
func (s *DiscoveryServer) pushConnection(con *XdsConnection, pushEv *XdsEvent) error {
...
if con.CDSWatch {
err := s.pushCds(con, pushEv.push, currentVersion)
...
}
if len(con.Clusters) > 0 {
err := s.pushEds(pushEv.push, con, currentVersion, nil)
...
}
if con.LDSWatch {
err := s.pushLds(con, pushEv.push, currentVersion)
...
}
if len(con.Routes) > 0 {
err := s.pushRoute(con, pushEv.push, currentVersion)
...
}
...
return nil
}
很多細節(jié)部分省略了很多, 可以看到最終是往此
client
端發(fā)送cluster
,endpoint
,route
和listener
.
4. 總結(jié)
5. 參考
1.
istio 1.3.6源碼