1. 前言
轉載請說明原文出處, 尊重他人勞動成果!
源碼位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)
前面兩篇文章 [istio源碼分析][citadel] citadel之istio_ca 和 [istio源碼分析][citadel] citadel之istio_ca(grpc server) 分析了
istio_ca
的serviceaccount controller
和 自定義簽名, 以及istio_ca
提供的一個grpc server
服務.
本文將分析
node_agent_k8s
組件的一個使用場景ingressgateway sds
.
2. 例子
例子參考官網的 Secure Gateways(SDS) .
創建完mygateway
和httpbin-credential
之後, 運行以下腳本查看配置文件:
podname=istio-ingressgateway-85bb5b4c57-l2pcz
ns=istio-system
rm lds.json rds.json cds.json eds.json sds.json
istioctl proxy-config listener $podname -n $ns -o json > lds.json
istioctl proxy-config route $podname -n $ns -o json > rds.json
istioctl proxy-config cluster $podname -n $ns -o json > cds.json
istioctl proxy-config endpoint $podname -n $ns -o json > eds.json
istioctl proxy-config secret $podname -n $ns -o json > sds.json
查看
lds.json
{
"name": "0.0.0.0_443",
"address": {
"socketAddress": {
"address": "0.0.0.0",
"portValue": 443
}
},
"filterChains": [
{
"filterChainMatch": {
"serverNames": [
"httpbin.example.com"
]
},
"tlsContext": {
"commonTlsContext": {
"tlsCertificateSdsSecretConfigs": [
{
"name": "httpbin-credential",
"sdsConfig": {
"apiConfigSource": {
"apiType": "GRPC",
"grpcServices": [
{
"googleGrpc": {
"targetUri": "unix:/var/run/ingress_gateway/sds",
"statPrefix": "sdsstat"
}
}
]
}
}
}
],
...
},
"requireClientCertificate": false
},
...
}
然后查看
sds.json
{
"dynamicActiveSecrets": [
{
"name": "httpbin-credential",
"versionInfo": "2020-02-07 06:35:36.286120317 +0000 UTC m=+69968.244929345",
"lastUpdated": "2020-02-07T06:35:36.287Z",
"secret": {
"name": "httpbin-credential",
"tlsCertificate": {
"certificateChain": {
"inlineBytes": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUZVekNDQXp1Z0F3SUJBZ0lERUFJU01BMEdDU3FHU0liM0RRRUJDd1VBTUVveEN6QUpCZ05WQkFZVEFsVlQKTVE4d0RRWURWUVFJREFaRVpXNXBZV3d4RERBS0JnTlZCQW9NQTBScGN6RWNNQm9HQTFVRUF3d1RhSFIwY0dKcApiaTVsZUdGdGNHeGxMbU52YlRBZUZ3MHlNREF5TURjd05UUTRNRFZhRncweU1UQXlNVFl3TlRRNE1EVmFNR0F4CkN6QUpCZ05WQkFZVEFsVlRNUTh3RFFZRFZRUUlEQVpFWlc1cFlXd3hGREFTQmdOVkJBY01DMU53Y21sdVoyWnAKWld4a01Rd3dDZ1lEVlFRS0RBTkVhWE14SERBYUJnTlZCQU1NRTJoMGRIQmlhVzR1WlhoaGJYQnNaUzVqYjIwdwpnZ0VpTUEwR0NTcUdTSWIzRFFFQkFRVUFBNElCRHdBd2dnRUtBb0lCQVFDcTVySC9CYnZwZXlacFNmdUU2d09TCi83ZDFIYU1Dek1mQ3E2NmxyYmZVSUhueG5xd0kzemh0OExMVzU1OTVKNk1wZ1FGdEZoNFA4KzhkQVF1TkxQa2IKNkpTZjdIOFpCWnY1NlRKS2pvNEYrVG1aTTFHTExmMzdBZXBsSnQwandFMUxXK3BmODliWHhvYVVSMHg5K2o5ZQpObncyK3RjUEdHSFZNdndEWVVzbGYyM3Z1RnpKckpFZWpudWRTK0FSTTRTL1krY1IreXF3aDVueTJRcjFZS3E4CkVNeWNyS0NGT3JpaElCT3Y1bERRSmRya05ZMFdMRG5QZWNIYTlvd0Y1Nk5BSnJhM2dGQWJuTHpiZ2xOa2NWWjEKTC9rUjF0NGMzV3FURHJhRXRwRHpBTXlMT0RHWkN1N1JBaGdCM2g2b1dkVW9KSGZ3N0hUWXltWWY1VHVtU3F3dApBZ01CQUFHamdnRXFNSUlCSmpBSkJnTlZIUk1FQWpBQU1CRUdDV0NHU0FHRytFSUJBUVFFQXdJR1FEQXpCZ2xnCmhrZ0JodmhDQVEwRUpoWWtUM0JsYmxOVFRDQkhaVzVsY21GMFpXUWdVMlZ5ZG1WeUlFTmxjblJwWm1sallYUmwKTUIwR0ExVWREZ1FXQkJRYmxqaDJaRlZwbWMxRTkzQnpLZS9NdjFMSnNEQ0JqQVlEVlIwakJJR0VNSUdCZ0JRVQpoYzdkdVdrbkdsRUYrdEN4RXFqWU1VNVZBYUZrcEdJd1lERUxNQWtHQTFVRUJoTUNWVk14RHpBTkJnTlZCQWdNCkJrUmxibWxoYkRFVU1CSUdBMVVFQnd3TFUzQnlhVzVuWm1sbGJHUXhEREFLQmdOVkJBb01BMFJwY3pFY01Cb0cKQTFVRUF3d1RhSFIwY0dKcGJpNWxlR0Z0Y0d4bExtTnZiWUlERUFJU01BNEdBMVVkRHdFQi93UUVBd0lGb0RBVApCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBVEFOQmdrcWhraUc5dzBCQVFzRkFBT0NBZ0VBZlVQSjJGYk9vY3h1CmFFQ0tBbVNoeTB4eWJ1RXBCeXV4UGdVWkhnaG1wRmluNVJwUmJoOGMxcTlwd3duVGthcGEyb2lscTBqSmMrZmcKL3Q0TnFVYTZER2RHOHAxZnNlcnB5eXNaQlVXU1JNMktJOWJWTVpzNmNValVIekJ2MVZFekxKdmdUT0k4QjRjUwpyak5jcUc3TXcyVkNRMDl4TE00MVQrOC91R0FkT1IvSEpQOGNKdUp1L0huTjFyOFU5N1JqUnRjMUlHMXlOMlowClBMdGpCd1pGdXpBVGlwN0lnWDFqKzZkaXdiV3VFZjQ4bWNuQ3BNbExjQ2Y1S1o3Z0gzejEwWkcvcFoxRnNURHgKeWlBZndHZDUyMzNmMk5xL1Q3dXIxbWxWY1prSy8z
c2QrMjgxRUU1cWpqY05GVDdWY2hDL21FYVNHK2F5UGpJdApLNzdBenlzcnNnOGlyMHhLU2VuSVF2NHRHbytHL09aNzhtSU4vWmQreEVOVnNDazVRN3NQNFloYkZKKzc4WFIvCkJNWTNFaUNEbXlZMm1sbytlL2hjN0ZIM3Uxb3VJc0s0UnJrLzJSRWtYSCtsUk1RYjNCbzJhYkFrY09RaU93ZHkKb2lvVnFOOVZUdkEwMDh6eEp6Mm11Qmp5MzJobnYyRUV4eDRLMUplKzVtZ3lwYkp4MnNOQ2xoMCtTTDE0OWVkcApPMUt3bXNsdGhLTEZhT2RrMEF4b2dxVEpnbjZpYUsrRFRFRk9IMjIxSzUwQld5ZTUvbDJsZHozL0hXd3VMZlMzCndlN0h6dWxIRGV5aXdJdCtqVkhUTkF0Z0VLOGMyTFJkTS9xR3lwTEwrN3AxdjhZYUdJeG1Hc3pYRnFxOGdhREwKZG5IWEY4U3czT2NSRUhxVlA4ai9sbTlXWFY1QVRKUT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo="
},
"privateKey": {
"inlineString": "[redacted]"
}
}
}
}
]
}
3. 分析
查看
istio-system/ingressgateway
的詳細pod
信息, 可以看到其運行參數如下:
apiVersion: apps/v1
kind: Deployment
...
name: istio-ingressgateway
...
spec:
...
containers:
- env:
- name: ENABLE_WORKLOAD_SDS
value: "false"
- name: ENABLE_INGRESS_GATEWAY_SDS
value: "true"
- name: INGRESS_GATEWAY_NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
image: docker.io/istio/node-agent-k8s:1.4.3
name: ingress-sds
...
volumeMounts:
- mountPath: /var/run/ingress_gateway
name: ingressgatewaysdsudspath
...
volumes:
- emptyDir: {}
name: ingressgatewaysdsudspath
可以看一下這兩個參數的意義是什麼:
// The workload SDS mode allows node agent to provision credentials to workload proxy by sending
// CSR to CA.
enableWorkloadSDS = "ENABLE_WORKLOAD_SDS"
enableWorkloadSDSFlag = "enableWorkloadSDS"
// The ingress gateway SDS mode allows node agent to provision credentials to ingress gateway
// proxy by watching kubernetes secrets.
enableIngressGatewaySDS = "ENABLE_INGRESS_GATEWAY_SDS"
enableIngressGatewaySDSFlag = "enableIngressGatewaySDS"
1.
enableWorkloadSDS
是表示agent
可以通過向CA
發送CSR
獲得簽名從而給workload
身份(key and cert
). 簡明的意思是自己生成key
, 然后向citadel
發送簽名請求獲得簽名證書.
2.enableIngressGatewaySDS
是表示agent
可以通過監控secret
從而給ingress gateway
身份(key and cert
). 簡明意思是key and cert
需要從k8s
中獲得.
本文分析
enableIngressGatewaySDS
的情況.
3.1 sds service
此處和 [istio源碼分析][pilot] pilot之ads 中內容類似.
有一個測試的客戶端
security/pkg/testing/sdsc/sdsclient.go
, 所以這里就以客戶端為切入點.
// security/pkg/testing/sdsc/sdsclient.go
func constructSDSRequestContext() (context.Context, error) {
// Read from the designated location for Kubernetes JWT.
content, err := ioutil.ReadFile(authn_model.K8sSATrustworthyJwtFileName)
...
// 注意是/var/run/secrets/tokens/istio-token里的內(nèi)容
md := metadata.New(map[string]string{
authn_model.K8sSAJwtTokenHeaderKey: string(content),
})
return metadata.NewOutgoingContext(context.Background(), md), nil
}
func NewClient(opt ClientOptions) (*Client, error) {
conn, err := grpc.Dial(opt.ServerAddress, grpc.WithInsecure())
...
client := sds.NewSecretDiscoveryServiceClient(conn)
ctx, err := constructSDSRequestContext()
...
stream, err := client.StreamSecrets(ctx)
...
return &Client{
stream: stream,
conn: conn,
updateChan: make(chan xdsapi.DiscoveryResponse, 1),
serverAddress: opt.ServerAddress,
}, nil
}
看
server
端:
func (s *sdsservice) StreamSecrets(stream sds.SecretDiscoveryService_StreamSecretsServer) error {
...
reqChannel := make(chan *xdsapi.DiscoveryRequest, 1)
con := newSDSConnection(stream)
// 從客戶端接收信息
go receiveThread(con, reqChannel, &receiveError)
var node *core.Node
for {
// Block until a request is received.
select {
case discReq, ok := <-reqChannel:
...
if con.conID == "" {
// first request
...
// 添加到sdsclient
addConn(key, con)
}
...
// 如果nodeagent的cache secret可以匹配請求中的內容<token, resourceName, Version> 那表明這是一個ACK請求
if discReq.VersionInfo != "" && s.st.SecretExist(conID, resourceName, token, discReq.VersionInfo) {
sdsServiceLog.Debugf("%s received SDS ACK from proxy %q, version info %q, "+
"error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo,
discReq.ErrorDetail.GoString())
continue
}
...
// 在ingress gateway agent模式, 如果第一個sds請求已經收到但是k8s中還沒有對應的secret, 在發送response之前先等待一下這個secret
if s.st.ShouldWaitForIngressGatewaySecret(conID, resourceName, token) {
...
continue
}
// 獲取secret 如果是ingress gateway agent模式, 那該secret是從k8s中獲得
// 如果不是 則自己生成并請求簽名
secret, err := s.st.GenerateSecret(ctx, conID, resourceName, token)
...
con.mutex.Lock()
con.secret = secret
con.mutex.Unlock()
// 向客戶端發送response
if err := pushSDS(con); err != nil {
...
return err
}
case <-con.pushChannel:
// server端主動向client端push的信號
...
// 向客戶端發送response
if err := pushSDS(con); err != nil {
...
return err
}
}
}
}
這里主要有兩個分支:
1.<-reqChannel:
這里的來源是通過receiveThread
得到client
端的請求并放入到reqChannel
中.
1.1 如果是第一次請求, 則通過
addConn
方法加入到sdsclient
中.
1.2 如果nodeagent
的cache secret
可以匹配請求中的內容<token, resourceName, Version>
那表明這是一個ACK
請求.
1.3 在ingress gateway agent
模式, 如果sds
請求已經收到但是k8s
中還沒有對應的secret
或者被刪除, 在發送response
給client
端之前先等待一下這個secret
被創(chuàng)建.
1.4 通過s.st.GenerateSecret
獲得對應的secret
, 如果是ingress gateway agent
模式, 那該secret
是從k8s
中獲得. 如果不是, 則自己生成并請求簽名.
1.5 調用pushSDS
向客戶端發送response
.
2.
<-con.pushChannel:
代表的是server
端主動向client
端push
的信號, 然后直接通過pushSDS
發送當前保存在con
里的內(nèi)容.
func NotifyProxy(connKey cache.ConnKey, secret *model.SecretItem) error {
conIDresourceNamePrefix := sdsLogPrefix(connKey.ConnectionID, connKey.ResourceName)
sdsClientsMutex.Lock()
conn := sdsClients[connKey]
...
conn.secret = secret
conn.pushChannel <- &sdsEvent{}
return nil
}
可以看到
NotifyProxy
方法傳入了secret
, 所以上游可以根據secret
的變化來調用NotifyProxy
方法來主動push
信息到client
端.
4. secretFetcher 和 secretCache
4.1 secretFetcher
func NewSecretFetcher(ingressGatewayAgent bool, endpoint, caProviderName string, tlsFlag bool,
tlsRootCert []byte, vaultAddr, vaultRole, vaultAuthPath, vaultSignCsrPath string) (*SecretFetcher, error) {
ret := &SecretFetcher{}
if ingressGatewayAgent {
// 如果是ingress gateway模式, 則監控k8s中的secret并從中獲取信息
ret.UseCaClient = false
cs, err := kube.CreateClientset("", "")
...
ret.FallbackSecretName = ingressFallbackSecret
secretFetcherLog.Debugf("SecretFetcher set fallback secret name %s", ret.FallbackSecretName)
ret.InitWithKubeClient(cs.CoreV1())
} else {
// 如果是workload agent模式, 則創建ca client 從citadel中獲得簽名證書等
caClient, err := ca.NewCAClient(endpoint, caProviderName, tlsFlag, tlsRootCert,
vaultAddr, vaultRole, vaultAuthPath, vaultSignCsrPath)
...
ret.UseCaClient = true
ret.CaClient = caClient
}
return ret, nil
}
func (sf *SecretFetcher) InitWithKubeClient(core corev1.CoreV1Interface) { // nolint:interfacer
...
sf.scrtStore, sf.scrtController =
cache.NewInformer(scrtLW, &v1.Secret{}, resyncPeriod, cache.ResourceEventHandlerFuncs{
AddFunc: sf.scrtAdded,
DeleteFunc: sf.scrtDeleted,
UpdateFunc: sf.scrtUpdated,
})
...
}
scrtAdded
,scrtDeleted
和scrtUpdated
在獲取secret
的key and cert
信息后會通過sf.AddCache
,sf.DeleteCache
和sf.UpdateCache
來保存到cache
中.
var (
...
rootCmd = &cobra.Command{
Use: "nodeagent",
Short: "Citadel agent",
RunE: func(c *cobra.Command, args []string) error {
...
workloadSecretCache, gatewaySecretCache := newSecretCache(serverOptions)
...
server, err := sds.NewServer(serverOptions, workloadSecretCache, gatewaySecretCache)
defer server.Stop()
...
},
}
)
func newSecretCache(serverOptions sds.Options) (workloadSecretCache, gatewaySecretCache *cache.SecretCache) {
...
if serverOptions.EnableIngressGatewaySDS {
gSecretFetcher, err := secretfetcher.NewSecretFetcher(true, "", "", false, nil, "", "", "", "")
...
gatewaySecretChan = make(chan struct{})
gSecretFetcher.Run(gatewaySecretChan)
gatewaySecretCache = cache.NewSecretCache(gSecretFetcher, sds.NotifyProxy, gatewaySdsCacheOptions)
} else {
gatewaySecretCache = nil
}
return workloadSecretCache, gatewaySecretCache
}
func NewSecretCache(fetcher *secretfetcher.SecretFetcher, notifyCb func(ConnKey, *model.SecretItem) error, options Options) *SecretCache {
...
fetcher.AddCache = ret.UpdateK8sSecret
fetcher.DeleteCache = ret.DeleteK8sSecret
fetcher.UpdateCache = ret.UpdateK8sSecret
...
return ret
}
func (sc *SecretCache) UpdateK8sSecret(secretName string, ns model.SecretItem) {
...
sc.secrets.Range(func(k interface{}, v interface{}) bool {
...
if connKey.ResourceName == secretName {
...
go func() {
...
sc.callbackWithTimeout(connKey, newSecret)
}()
return false
}
return true
})
...
}
func (sc *SecretCache) callbackWithTimeout(connKey ConnKey, secret *model.SecretItem) {
...
go func() {
...
if sc.notifyCallback != nil {
if err := sc.notifyCallback(connKey, secret); err != nil {
...
}
} ...
}()
select {
...
}
}
k8s-apiserver
->SecretFetcher.scrtAdded
->SecretCache.UpdateK8sSecret(SecretFetcher.AddCache)
->sc.callbackWithTimeout
->sc.notifyCallback(NotifyProxy)
.