1. 前言
轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!
源碼位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)
本文將繼續(xù)分析
pilot
中的內(nèi)容, 將分析pilot-agent
內(nèi)容, 該pilot-agent
將負責envoy
的生命周期.
2. 初識pilot-agent
[root@master ~]# kubectl get pods
NAME READY STATUS RESTARTS AGE
productpage-v1-8554d58bff-d7j8d 2/2 Running 2 13d
...
[root@master ~]# kubectl exec -it productpage-v1-8554d58bff-d7j8d -c istio-proxy bash
istio-proxy@productpage-v1-8554d58bff-d7j8d:/$ ps -ef
UID PID PPID C STIME TTY TIME CMD
istio-p+ 1 0 0 Feb02 ? 00:00:11 /usr/local/bin/pilot-agent proxy sidecar --domain default.svc.cluster.local --configPath /etc/istio/proxy --binaryPath /u
istio-p+ 11 1 0 Feb02 ? 00:00:36 /usr/local/bin/envoy -c /etc/istio/proxy/envoy-rev0.json --restart-epoch 0 --drain-time-s 45 --parent-shutdown-time-s 60
istio-p+ 24 0 0 05:53 pts/0 00:00:00 bash
istio-p+ 33 24 0 05:53 pts/0 00:00:00 ps -ef
istio-proxy@productpage-v1-8554d58bff-d7j8d:/$
1. 可以看到注入的容器
istio-proxy
的1
號進程是命令/usr/local/bin/pilot-agent proxy sidecar ...
執(zhí)行的.
2. 另外可以看到envoy
程序的父進程是1
號進程, 也就是說pilot-agent
進程負責啟動envoy
進程, 并且envoy
啟動的配置文件在/etc/istio/proxy/envoy-rev0.json
.
istio-proxy@productpage-v1-8554d58bff-d7j8d:/$ cat /etc/istio/proxy/envoy-rev0.json
{
"node": {
"id": "sidecar~10.0.12.9~productpage-v1-8554d58bff-d7j8d.default~default.svc.cluster.local",
"cluster": "productpage.default",
...
},
...
"dynamic_resources": {
"lds_config": {
"ads": {}
},
"cds_config": {
"ads": {}
},
"ads_config": {
"api_type": "GRPC",
"grpc_services": [
{
"envoy_grpc": {
"cluster_name": "xds-grpc"
}
}
]
}
},
"static_resources": {
"clusters": [
...
{
"name": "xds-grpc",
"type": "STRICT_DNS",
...
"hosts": [
{
"socket_address": {"address": "172.31.71.181", "port_value": 15010}
}
],
...
}
istio-proxy@productpage-v1-8554d58bff-d7j8d:/$
可以看到
xds
的配置為172.31.71.181:15010
, 是pilot
的地址.
3. pilot-agent
// pilot/cmd/pilot-agent/main.go
var (
...
rootCmd = &cobra.Command{
Use: "pilot-agent",
...
}
proxyCmd = &cobra.Command{
Use: "proxy",
...
RunE: func(c *cobra.Command, args []string) error {
...
tlsCertsToWatch = []string{
tlsServerCertChain, tlsServerKey, tlsServerRootCert,
tlsClientCertChain, tlsClientKey, tlsClientRootCert,
}
// envoy的配置信息
proxyConfig := mesh.DefaultProxyConfig()
...
// 生成envoy
envoyProxy := envoy.NewProxy(proxyConfig, role.ServiceNode(), proxyLogLevel, proxyComponentLogLevel, pilotSAN, role.IPAddresses, dnsRefreshRate, opts)
// 生成agent
agent := proxy.NewAgent(envoyProxy, proxy.DefaultRetry, features.TerminationDrainDuration())
// watch
watcher := envoy.NewWatcher(tlsCertsToWatch, agent.ConfigCh())
go waitForCompletion(ctx, agent.Run)
go waitForCompletion(ctx, watcher.Run)
cmd.WaitSignal(make(chan struct{}))
return nil
},
}
)
1.
envoy
的配置信息proxyConfig
.
2.envoyProxy
是運行envoy
的一個代理.
3.agent
負責envoy
的生命周期.
4.watcher
監(jiān)控文件certs
的變化.
5. 運行agent
和watcher
.
6. 等待``agent```結(jié)束.
3.1 agent
理解了
agent
, 對整個結(jié)構(gòu)就理解了.
// pilot/pkg/proxy/agent.go
type Agent interface {
// 返回一個config channel用于發(fā)送文件更新
// agent會與當前config比較來決定是否需要重啟envoy, 如果啟動失敗會以exponential back-off形式重試
ConfigCh() chan<- interface{}
Run(ctx context.Context)
}
type Proxy interface {
// 運行 傳入config, epoch, abort channel
Run(interface{}, int, <-chan error) error
// 清理某個epoch的信息
Cleanup(int)
// 重試了所有的次數(shù)后還無法成功 panic該epoch
Panic(interface{})
}
1.
Proxy
有三個方法, 分別需要實現(xiàn)Run
,Cleanup
和Panic
方法.1.1
Run
中傳入三個參數(shù), 第一個是proxy
運行的時候的config
文件. 第二個是epoch
, 相當于版本號. 第三個是傳入的一個channel
, 外部可以通過該channel
殺死proxy
.
1.2Cleanup
傳入的是epoch
, 對該版本做一些清理工作. 比如envoy
刪除對應的配置文件, 比如傳入0
, 就把/etc/istio/proxy/envoy-rev0.json
文件刪除.
2.
Agent
有兩個方法.2.1
ConfigCh
返回一個config channel用于發(fā)送文件更新.agent
會與當前config
比較來決定是否需要重啟envoy
, 如果啟動失敗會以exponential back-off
形式重試.
2.2Run
啟動agent
.
3.2 agent
agent
是Agent
的一個實現(xiàn)體.
// pilot/pkg/proxy/agent.go
func NewAgent(proxy Proxy, retry Retry, terminationDrainDuration time.Duration) Agent {
return &agent{
// 要運行的proxy
proxy: proxy,
retry: retry,
// 每個版本對應的config
epochs: make(map[int]interface{}),
configCh: make(chan interface{}),
statusCh: make(chan exitStatus),
// 每個版本對應的退出channel
abortCh: make(map[int]chan error),
terminationDrainDuration: terminationDrainDuration,
}
}
func (a *agent) ConfigCh() chan<- interface{} {
return a.configCh
}
reconcile
先了解一下
reconcile
再看Run
方法.
// cancel any scheduled restart
a.retry.restart = nil
log.Infof("Reconciling retry (budget %d)", a.retry.budget)
// check that the config is current
// 與當前config比較
if reflect.DeepEqual(a.desiredConfig, a.currentConfig) {
log.Infof("Desired configuration is already applied")
return
}
// discover and increment the latest running epoch
// 增加一個版本號
epoch := a.latestEpoch() + 1
// buffer aborts to prevent blocking on failing proxy
abortCh := make(chan error, maxAborts)
// 當前版本對應的config和abort channel
a.epochs[epoch] = a.desiredConfig
a.abortCh[epoch] = abortCh
// 更新當前agent的config
a.currentConfig = a.desiredConfig
go a.runWait(a.desiredConfig, epoch, abortCh)
}
func (a *agent) runWait(config interface{}, epoch int, abortCh <-chan error) {
log.Infof("Epoch %d starting", epoch)
// 同步運行proxy 返回結(jié)果為err
err := a.proxy.Run(config, epoch, abortCh)
// envoy proxy 運行完將結(jié)果組裝成一個exitStatus 發(fā)送到 a.statusCh
a.statusCh <- exitStatus{epoch: epoch, err: err}
}
1. 增加一個版本號
epoch
.
2. 添加新版本對應的config和abort channel.
3. 更新當前agent
的currentConfig
, 新版本的config
是該agent
要運行的內(nèi)容.
4.runWait
是個同步操作, 會等到envoy proxy
后返回結(jié)果了, 才會把結(jié)果寫入到a.statusCh
這個channel
中.
Run
Run
方法
func (a *agent) Run(ctx context.Context) {
...
for {
...
select {
// 有文件更新
case config := <-a.configCh:
if !reflect.DeepEqual(a.desiredConfig, config) {
// 如果有新文件
// 更新a.desiredConfig
a.desiredConfig = config
...
a.reconcile()
}
// envoy proxy運行結(jié)束
case status := <-a.statusCh:
// 刪除該版本內(nèi)存中內(nèi)容
delete(a.epochs, status.epoch)
delete(a.abortCh, status.epoch)
a.currentConfig = a.epochs[a.latestEpoch()]
if status.err == errAbort {
log.Infof("Epoch %d aborted", status.epoch)
} else if status.err != nil {
log.Warnf("Epoch %d terminated with an error: %v", status.epoch, status.err)
a.abortAll()
} else {
log.Infof("Epoch %d exited normally", status.epoch)
}
// cleanup for the epoch
// 為當前版本做清理工作 因為該版本的envoy proxy已經(jīng)運行結(jié)束
a.proxy.Cleanup(status.epoch)
if status.err != nil {
// skip retrying twice by checking retry restart delay
if a.retry.restart == nil {
if a.retry.budget > 0 {
delayDuration := a.retry.InitialInterval * (1 << uint(a.retry.MaxRetries-a.retry.budget))
restart := time.Now().Add(delayDuration)
a.retry.restart = &restart
a.retry.budget--
log.Infof("Epoch %d: set retry delay to %v, budget to %d", status.epoch, delayDuration, a.retry.budget)
} else {
log.Error("Permanent error: budget exhausted trying to fulfill the desired configuration")
// 已經(jīng)試過所有的次數(shù) panic該版本并且agent整體退出
a.proxy.Panic(status.epoch)
return
}
} else {
log.Debugf("Epoch %d: restart already scheduled", status.epoch)
}
}
// 定時操作
case <-reconcileTimer.C:
a.reconcile()
// 結(jié)束agent
case <-ctx.Done():
a.terminate()
log.Info("Agent has successfully terminated")
return
}
}
}
該
Run
是從這里控制整個envoy proxy
的生命周期.
1. 從a.configCh
中獲得config
信息, 如果與當前的config
不同, 則表明需要重新調(diào)整envoy proxy
. 調(diào)用reconcile
進行調(diào)整. 這個是供外部調(diào)用控制的, 比如watcher
.
2. 當envoy proxy
運行結(jié)束后, 會向a.statusCh
發(fā)送信息, 所以第二個分支是處理這類信息的, 首先刪除agent
內(nèi)存中關(guān)于該版本epoch
的信息包括a.epochs
,a.abortCh
等, 并且調(diào)用proxy
的清理方法(envoy proxy
會刪除對應的在磁盤上的配置文件). 如果proxy
是帶有錯誤信息退出的話, 則可能需要重試(定時重試的機制第三個case
).
3. 定時重試的機制.
3.2 proxy
proxy
在istio
指的就是envoy
, 所以接下來看一下envoy
是如何實現(xiàn)Proxy
的三個方法的.
Run
func (e *envoy) Run(config interface{}, epoch int, abort <-chan error) error {
var fname string
if len(e.config.CustomConfigFile) > 0 {
// there is a custom configuration. Don't write our own config - but keep watching the certs.
fname = e.config.CustomConfigFile
} else if _, ok := config.(proxy.DrainConfig); ok {
fname = drainFile
} else {
// 生成envoy的配置文件
out, err := bootstrap.WriteBootstrap(
&e.config, e.node, epoch, e.pilotSAN, e.opts, os.Environ(), e.nodeIPs, e.dnsRefreshRate)
...
fname = out
}
// 構(gòu)造envoy運行的配置文件
args := e.args(fname, epoch, istioBootstrapOverrideVar.Get())
log.Infof("Envoy command: %v", args)
// 運行
cmd := exec.Command(e.config.BinaryPath, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
return err
}
done := make(chan error, 1)
go func() {
// 等待運行結(jié)果
done <- cmd.Wait()
}()
select {
// 外部可以中斷該envoy的運行
case err := <-abort:
log.Warnf("Aborting epoch %d", epoch)
if errKill := cmd.Process.Kill(); errKill != nil {
log.Warnf("killing epoch %d caused an error %v", epoch, errKill)
}
return err
// envoy運行結(jié)束后返回的內(nèi)容
case err := <-done:
return err
}
}
Run
方法的操作可以總結(jié)成以下幾步:
1.bootstrap.WriteBootstrap
生成配置文件(envoy-rev%d.json
)寫在磁盤上, 主要是根據(jù)proxyConfig
.
2. 生成envoy
運行的args
參數(shù).
3. 運行envoy
, 并異步等待運行結(jié)果將其寫入done
這個channel
中.
4. 有兩種情況Run
方法會退出:4.1 外部(
agent
)向abort channel
中寫入信息使其主動殺死envoy
進程.
4.2envoy
運行結(jié)束. (錯誤或者無錯誤的運行結(jié)束)
5. 返回最終的錯誤信息
err
, 也有可能是nil
(沒有錯誤).
Cleanup 和 Panic
func (e *envoy) Cleanup(epoch int) {
filePath := configFile(e.config.ConfigPath, epoch)
// 刪除此版本的配置文件
if err := os.Remove(filePath); err != nil {
log.Warnf("Failed to delete config file %s for %d, %v", filePath, epoch, err)
}
}
func (e *envoy) Panic(epoch interface{}) {
log.Error("cannot start the e with the desired configuration")
if epochInt, ok := epoch.(int); ok {
// print the failed config file
// 打印所有失敗的配置文件
filePath := configFile(e.config.ConfigPath, epochInt)
b, _ := ioutil.ReadFile(filePath)
log.Errorf(string(b))
}
os.Exit(-1)
}
1.
Cleanup
就是刪除此版本在磁盤上的配置文件.
2.Panic
打印完所有失敗的配置文件后退出程序.
3.3 Watcher
// pilot/cmd/pilot-agent/main.go
...
tlsCertsToWatch = []string{
tlsServerCertChain, tlsServerKey, tlsServerRootCert,
tlsClientCertChain, tlsClientKey, tlsClientRootCert,
}
...
watcher := envoy.NewWatcher(tlsCertsToWatch, agent.ConfigCh())
...
// pilot/pkg/proxy/envoy/watcher.go
func NewWatcher(
certs []string,
updates chan<- interface{}) Watcher {
return &watcher{
certs: certs,
updates: updates,
}
}
func (w *watcher) Run(ctx context.Context) {
w.SendConfig()
go watchCerts(ctx, w.certs, watchFileEvents, defaultMinDelay, w.SendConfig)
<-ctx.Done()
log.Info("Watcher has successfully terminated")
}
func (w *watcher) SendConfig() {
h := sha256.New()
// 向agent.configCh發(fā)送信息
generateCertHash(h, w.certs)
w.updates <- h.Sum(nil)
}
這里不展開了, 主要是通過監(jiān)控
tlsCertsToWatch
幾個文件, 如果發(fā)生變化, 則向agent.configCh
發(fā)送信息.
4. 總結(jié)
conclusion.png
1. 通過
Watcher
結(jié)構(gòu)體監(jiān)控tlsCertsToWatch
文件的變化, 然后向a.configCh
發(fā)信息進而調(diào)用reconcile
來啟動envoy
.
2.agent
會定時調(diào)用reconcile
并判斷是否需要重啟envoy
.
3.envoy
運行結(jié)束(有錯誤或無錯誤運行結(jié)束)后會向a.statusCh
發(fā)送信息, 然后agent
會做一些此envoy
版本的清理工作.
5. 參考
1.
istio 1.3.6源碼
2. https://segmentfault.com/a/1190000015171622