我是 LEE堆生,老李妇穴,一個在 IT 行業(yè)摸爬滾打 16 年的技術(shù)老兵。
事件背景
現(xiàn)網(wǎng)上運行著一個自己開發(fā)的 metrics exporter卖怜,它是專門來捕獲后端資源的運行狀態(tài)渔工,并生成對應(yīng)的 prometheus metrics 供監(jiān)控報警系統(tǒng)使用隶校。當(dāng)然這個 exporter 只是負(fù)責(zé)遠(yuǎn)程監(jiān)控資源,并不能實際控制后端的資源,也不能實時動態(tài)獲得被監(jiān)控的資源的變動事件蹲堂。當(dāng)我們的運維小伙伴手動錯誤刪除后端被監(jiān)控的資源,導(dǎo)致業(yè)務(wù)流量異常贝淤。此時也沒有報警出來柒竞,而這個報警卻是依賴這個 metrics exporter 所采集的數(shù)據(jù),導(dǎo)致了一次小型事件霹娄。因為這個事件,才有今天寫文章的動力鲫骗,同時也分享下解決這個問題的方法犬耻。
現(xiàn)象獲取
架構(gòu)圖
問題定位
通過跟小伙伴們一起復(fù)盤,以及追查可能出現(xiàn)問題的位置后执泰,大家都覺得沒有任何問題枕磁。在運維刪除對應(yīng)的監(jiān)控資源后,同時沒有關(guān)閉報警規(guī)則的情況下术吝,應(yīng)該有大量的任何異常報警產(chǎn)生计济。但實際情況,沒有任何報警發(fā)出來排苍。
當(dāng)大家一籌莫展的時候沦寂,我突然說了一句,會不會是數(shù)據(jù)采集出現(xiàn)了問題淘衙?大家眼前一亮传藏,趕緊拿出 metrics exporter 的代碼檢查。通過反復(fù)檢查彤守,也沒有發(fā)現(xiàn)可疑的地方毯侦,于是大家又開始了思考。這時我打開了 metrics exporter 調(diào)試模式具垫,打上斷點侈离,然后請運維小伙伴刪除一個測試資源,觀察監(jiān)控數(shù)據(jù)的變化筝蚕。果不其然卦碾,資源刪除了,對應(yīng)監(jiān)控的 metrics 條目的值沒有變化(也就是說起宽,還是上次資源的狀態(tài))蔗坯。
這下破案了,搞了半天是因為 metrics 條目內(nèi)容沒有跟隨資源的刪除而被自動的刪除燎含。導(dǎo)致了報警系統(tǒng)一直認(rèn)為被刪除的資源還在運行宾濒,而且狀態(tài)正常。
原理分析
既然知道了原因屏箍,再回過頭看 metrics exporter 的代碼绘梦,代碼中有 prometheus.MustRegister橘忱、prometheus.Unregister 和相關(guān)的 MetricsVec 值變更的實現(xiàn)和調(diào)用。就是沒有判斷監(jiān)控資源在下線或者刪除的情況下卸奉,如何刪除和清理創(chuàng)建出來的 MetricsVec钝诚。
在我的印象中 MetricsVec 會根據(jù) labels 會自動創(chuàng)建相關(guān)的條目,從來沒有手動的添加和創(chuàng)建榄棵。根據(jù)這個邏輯我也認(rèn)為凝颇,MetricsVec 中如果 labels 對應(yīng)的值不更新或者處于不活躍的狀態(tài),應(yīng)該自動刪除才是疹鳄。
最后還是把 golang 的 github.com/prometheus/client_golang 這個庫想太完美了拧略。沒有花時間對 github.com/prometheus/client_golang 內(nèi)部結(jié)構(gòu)、原理瘪弓、處理機(jī)制充分理解垫蛆,才導(dǎo)致這個事件的發(fā)生。
github.com/prometheus/client_golang 中的 metrics 主要是 4 個種類腺怯,這個可以 baidu 上搜索袱饭,很多介紹,我這里不詳細(xì)展開呛占。這些種類的 metrics 又可以分為:一次性使用和多次使用虑乖。
- 一次性使用:當(dāng)請求到達(dá)了 http 服務(wù)器,被 promhttp 中的 handler 處理后晾虑,返回數(shù)據(jù)給請求方决左。隨后 metrics 數(shù)據(jù)就失效了,不保存走贪。下次再有請求到 http 接口查詢 metrics佛猛,數(shù)據(jù)重新計算生成,返回給請求方坠狡。
- 多次性使用:當(dāng)請求到達(dá)了 http 服務(wù)器继找,被 promhttp 中的 handler 處理后,返回數(shù)據(jù)給請求方逃沿。隨后 metrics 保存婴渡,并不會刪除,需要手動清理和刪除凯亮。 下次再有請求到 http 接口查詢 metrics边臼,直接返回之前存儲過的數(shù)據(jù)給請求方。
注意這兩者的區(qū)別假消,他們有不同的應(yīng)用場景柠并。
- 一次性使用:一次請求一次新數(shù)據(jù),數(shù)據(jù)與數(shù)據(jù)間隔時間由數(shù)據(jù)讀取者決定。 如果有多個數(shù)據(jù)讀取者臼予,每一個讀取者讀取到的數(shù)據(jù)可能不會相同鸣戴。每一個請求計算一次,如果采集請求量比較大粘拾,或者內(nèi)部計算壓力比較大窄锅,都會導(dǎo)致負(fù)載壓力很高。 計算和輸出是同步邏輯缰雇。 例如:k8s 上的很多 exporter 是這樣的方式入偷。
- 多次性使用:每次請求都是從 map 中獲得,數(shù)據(jù)與數(shù)據(jù)間隔時間由數(shù)據(jù)寫入者決定械哟。如果有多個數(shù)據(jù)讀取者疏之,每一個讀取者采集的數(shù)據(jù)相同(讀取的過程中沒有更新數(shù)據(jù)寫入)。每一個請求獲得都是相同的計算結(jié)果戒良,1 次計算多數(shù)讀取体捏。計算和輸出是異步邏輯冠摄。例如:http server 上 http 請求狀態(tài)統(tǒng)計糯崎,延遲統(tǒng)計,轉(zhuǎn)發(fā)字節(jié)匯總河泳,并發(fā)量等等沃呢。
這次項目中寫的 metrics exporter 本應(yīng)該是采用 “一次性使用” 這樣的模型來開發(fā),但是內(nèi)部結(jié)構(gòu)模型采用了 “多次性使用” 模型拆挥,因為指標(biāo)數(shù)據(jù)寫入者和數(shù)據(jù)讀取者之間沒有必然聯(lián)系薄霜,不屬于一個會話系統(tǒng),所以之間是異步結(jié)構(gòu)纸兔。具體我們看下圖:
從圖中有 2 個身份說明下:
- 數(shù)據(jù)讀取者:主要是 Prometheus 系統(tǒng)的采集器惰瓜,根據(jù)配置的規(guī)則周期性的來 metrics 接口讀取數(shù)據(jù)。
- 數(shù)據(jù)寫入者:開發(fā)的 scanner 汉矿,通過接口去讀遠(yuǎn)程資源狀態(tài)信息和相關(guān)數(shù)據(jù)崎坊,通過計算得到最后的結(jié)果,寫入指定的 metrics 條目內(nèi)洲拇。
在此次項目中 metrics 條目是用 prometheus.GaugeVec 作為采集數(shù)據(jù)計算后結(jié)果的存儲類型奈揍。
說了這么多,想要分析真正的原因赋续,就必須深入 github.com/prometheus/client_golang 代碼中 GaugeVec 這個具體代碼實現(xiàn)男翰。
// GaugeVec is a Collector that bundles a set of Gauges that all share the same
// Desc, but have different values for their variable labels. This is used if
// you want to count the same thing partitioned by various dimensions
// (e.g. number of operations queued, partitioned by user and operation
// type). Create instances with NewGaugeVec.
type GaugeVec struct {
*MetricVec
}
type MetricVec struct {
*metricMap
curry []curriedLabelValue
// hashAdd and hashAddByte can be replaced for testing collision handling.
hashAdd func(h uint64, s string) uint64
hashAddByte func(h uint64, b byte) uint64
}
// metricMap is a helper for metricVec and shared between differently curried
// metricVecs.
type metricMap struct {
mtx sync.RWMutex // Protects metrics.
metrics map[uint64][]metricWithLabelValues // 真正的數(shù)據(jù)存儲位置
desc *Desc
newMetric func(labelValues ...string) Metric
}
通過上面的代碼,一條 metric 條目是保存在 metricMap.metrics 下纽乱。 我們繼續(xù)往下看:
讀取數(shù)據(jù)
// Collect implements Collector.
func (m *metricMap) Collect(ch chan<- Metric) {
m.mtx.RLock()
defer m.mtx.RUnlock()
// 遍歷 map
for _, metrics := range m.metrics {
for _, metric := range metrics {
ch <- metric.metric // 讀取數(shù)據(jù)到通道
}
}
}
寫入數(shù)據(jù)
// To create Gauge instances, use NewGauge.
type Gauge interface {
Metric
Collector
// Set sets the Gauge to an arbitrary value.
Set(float64)
// Inc increments the Gauge by 1. Use Add to increment it by arbitrary
// values.
Inc()
// Dec decrements the Gauge by 1. Use Sub to decrement it by arbitrary
// values.
Dec()
// Add adds the given value to the Gauge. (The value can be negative,
// resulting in a decrease of the Gauge.)
Add(float64)
// Sub subtracts the given value from the Gauge. (The value can be
// negative, resulting in an increase of the Gauge.)
Sub(float64)
// SetToCurrentTime sets the Gauge to the current Unix time in seconds.
SetToCurrentTime()
}
func NewGauge(opts GaugeOpts) Gauge {
desc := NewDesc(
BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
opts.Help,
nil,
opts.ConstLabels,
)
result := &gauge{desc: desc, labelPairs: desc.constLabelPairs}
result.init(result) // Init self-collection.
return result
}
type gauge struct {
// valBits contains the bits of the represented float64 value. It has
// to go first in the struct to guarantee alignment for atomic
// operations. http://golang.org/pkg/sync/atomic/#pkg-note-BUG
valBits uint64
selfCollector
desc *Desc
labelPairs []*dto.LabelPair
}
func (g *gauge) Set(val float64) {
atomic.StoreUint64(&g.valBits, math.Float64bits(val)) // 寫入數(shù)據(jù)到變量
}
看到上面的代碼蛾绎,有的小伙伴就會說讀取和寫入的位置不一樣啊,沒有找到真正的位置。不要著急秘通,后面還有为严。
// getOrCreateMetricWithLabelValues retrieves the metric by hash and label value
// or creates it and returns the new one.
//
// This function holds the mutex.
func (m *metricMap) getOrCreateMetricWithLabelValues(hash uint64, lvs []string, curry []curriedLabelValue,) Metric { // 返回了一個接口
m.mtx.RLock()
metric, ok := m.getMetricWithHashAndLabelValues(hash, lvs, curry)
m.mtx.RUnlock()
if ok {
return metric
}
m.mtx.Lock()
defer m.mtx.Unlock()
metric, ok = m.getMetricWithHashAndLabelValues(hash, lvs, curry)
if !ok {
inlinedLVs := inlineLabelValues(lvs, curry)
metric = m.newMetric(inlinedLVs...)
m.metrics[hash] = append(m.metrics[hash], metricWithLabelValues{values: inlinedLVs, metric: metric}) // 這里寫入 metricMap.metrics
}
return metric
}
// A Metric models a single sample value with its meta data being exported to
// Prometheus. Implementations of Metric in this package are Gauge, Counter,
// Histogram, Summary, and Untyped.
type Metric interface { // 哦哦哦哦,是接口啊肺稀。Gauge 實現(xiàn)這個接口
// Desc returns the descriptor for the Metric. This method idempotently
// returns the same descriptor throughout the lifetime of the
// Metric. The returned descriptor is immutable by contract. A Metric
// unable to describe itself must return an invalid descriptor (created
// with NewInvalidDesc).
Desc() *Desc
// Write encodes the Metric into a "Metric" Protocol Buffer data
// transmission object.
//
// Metric implementations must observe concurrency safety as reads of
// this metric may occur at any time, and any blocking occurs at the
// expense of total performance of rendering all registered
// metrics. Ideally, Metric implementations should support concurrent
// readers.
//
// While populating dto.Metric, it is the responsibility of the
// implementation to ensure validity of the Metric protobuf (like valid
// UTF-8 strings or syntactically valid metric and label names). It is
// recommended to sort labels lexicographically. Callers of Write should
// still make sure of sorting if they depend on it.
Write(*dto.Metric) error
// TODO(beorn7): The original rationale of passing in a pre-allocated
// dto.Metric protobuf to save allocations has disappeared. The
// signature of this method should be changed to "Write() (*dto.Metric,
// error)".
}
看到這里就知道了寫入第股、存儲、讀取已經(jīng)連接到了一起话原。 同時如果沒有顯式的調(diào)用方法刪除 metricMap.metrics 的內(nèi)容夕吻,那么記錄的 metrics 條目的值就會一直存在,而原生代碼中只是創(chuàng)建和變更內(nèi)部值繁仁。正是因為這個邏輯才導(dǎo)致上面說的事情涉馅。
處理方法
既然找到原因,也找到對應(yīng)的代碼以及對應(yīng)的內(nèi)部邏輯黄虱,就清楚了 prometheus.GaugeVec 這個變量真正的使用方法稚矿。到此解決方案也就有了,找到合適的位置添加代碼捻浦,顯式調(diào)用 DeleteLabelValues 這個方法來刪除無效 metrics 條目晤揣。
為了最后實現(xiàn)整體效果,我總結(jié)下有幾個關(guān)鍵詞:“異步”朱灿、“多次性使用”昧识、“自動回收”。
最后的改造思路:
- 創(chuàng)建一個 scanner 掃描結(jié)果存儲的狀態(tài)機(jī) (status)
- 每次 scanner 掃描結(jié)果會向這個狀態(tài)機(jī)做更新動作盗扒,并記錄對應(yīng)的更新時間
- 啟動一個 goroutine (cleaner) 定期掃描狀態(tài)機(jī)跪楞,然后遍歷分析記錄數(shù)據(jù)的更新時間。如果遍歷到對應(yīng)數(shù)據(jù)的更新時間跟現(xiàn)在的時間差值超過一個固定的閾值侣灶,就主動刪除狀態(tài)機(jī)中對應(yīng)的信息甸祭,同時刪除對應(yīng)的 metrics 條目
通過這個動作就可以實現(xiàn)自動回收和清理無效的 metrics 條目,最后驗證下來確實有效褥影。
最終效果
通過測試代碼來驗證這個方案的效果池户,具體如下演示:
package main
import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"strconv"
"sync"
"time"
)
type metricsMetaData struct {
UpdatedAt int64
Labels []string
}
func main() {
var wg sync.WaitGroup
var status sync.Map
vec := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "app",
Name: "running_status",
}, []string{"id"},
)
prometheus.MustRegister(vec)
defer prometheus.Unregister(vec)
// 寫入數(shù)據(jù)
for i := 0; i < 10; i++ {
labels := strconv.Itoa(i)
vec.WithLabelValues(labels).Set(1) // 寫入 metric 條目
status.Store(labels, metricsMetaData{UpdatedAt: time.Now().Unix(), Labels: []string{labels}}) // 寫入狀態(tài)
}
// 創(chuàng)建退出 ctx
stopCtx, stopCancel := context.WithCancel(context.Background())
// 啟動清理器
go func(ctx *context.Context, g *sync.WaitGroup) {
defer g.Done()
ticker := time.NewTicker(time.Second * 2)
for {
select {
case <-ticker.C:
now := time.Now().Unix()
status.Range(func(key, value interface{}) bool {
if now-value.(metricsMetaData).UpdatedAt > 5 {
vec.DeleteLabelValues(value.(metricsMetaData).Labels...) // 刪除 metrics 條目
status.Delete(key) // 刪除 map 中的記錄
}
return true
})
break
case <-(*ctx).Done():
return
}
}
}(&stopCtx, &wg)
wg.Add(1)
// 創(chuàng)建 http
http.Handle("/metrics", promhttp.Handler())
srv := http.Server{Addr: "0.0.0.0:8080"}
// 啟動 http server
go func(srv *http.Server, g *sync.WaitGroup) {
defer g.Done()
_ = srv.ListenAndServe()
}(&srv, &wg)
wg.Add(1)
// 退出
time.Sleep(time.Second * 10)
stopCancel()
_ = srv.Shutdown(context.Background())
wg.Wait()
}
結(jié)果動畫: