Judge版本
VERSION = "2.0.2"
Judge組件功能
Judge是用于判斷是否觸發報警條件的組件。
Transfer的數據不但要轉送到Graph來存儲并繪圖,還要轉送到Judge用于報警判斷。Judge先從hbs獲取所有策略列表,靜等Transfer的數據轉發。
每收到一條Transfer轉發過來的數據,立即找到這條數據關聯的Strategy、Expression,然后做閾值判斷。【官方描述】
Judge組件邏輯圖
Judge邏輯圖
Portal關于報警策略與表達式定義操作說明書
main主入口分析
// main parses the command-line flags, loads the global configuration,
// initialises the shared clients and caches, starts the HTTP and RPC servers
// and the periodic jobs in their own goroutines, then blocks forever.
func main() {
cfg := flag.String("c", "cfg.json", "configuration file")
version := flag.Bool("v", false, "show version")
flag.Parse()
// -v prints the version and exits without starting any service.
if *version {
fmt.Println(g.VERSION)
os.Exit(0)
}
g.ParseConfig(*cfg) // parse the global configuration file
g.InitRedisConnPool() // initialise the Redis connection pool
g.InitHbsClient() // initialise the HBS client
store.InitHistoryBigMap() // set up the in-memory BigMap that caches collected data
go http.Start() // start the HTTP API service
go rpc.Start() // start the RPC service
go cron.SyncStrategies() // periodic job: sync strategies and expressions from HBS
go cron.CleanStale() // periodic job: clean up stale history data
// Block the main goroutine; all work happens in the goroutines above.
select {}
}
g.ParseConfig(*cfg) 初始化全局配置文件
// GlobalConfig is the Go representation of the JSON configuration file;
// every field is decoded from the key named in its json tag.
type GlobalConfig struct {
Debug bool `json:"debug"` // enables the debug dump in rebuildStrategyMap
DebugHost string `json:"debugHost"` // hostname whose strategies are dumped when Debug is on
Remain int `json:"remain"` // maximum number of history points kept per series
Http *HttpConfig `json:"http"`
Rpc *RpcConfig `json:"rpc"`
Hbs *HbsConfig `json:"hbs"`
Alarm *AlarmConfig `json:"alarm"`
}
// Config returns the currently loaded global configuration. The read lock
// protects the package-level pointer from a concurrent ParseConfig.
func Config() *GlobalConfig {
	configLock.RLock()
	current := config
	configLock.RUnlock()
	return current
}
# 解析全局配置
// ParseConfig loads the JSON configuration file at path cfg and publishes it
// as the package-wide configuration returned by Config.
//
// Any failure (empty path, missing file, read error, invalid JSON) terminates
// the process through log.Fatalln.
func ParseConfig(cfg string) {
	switch {
	case cfg == "":
		log.Fatalln("use -c to specify configuration file")
	case !file.IsExist(cfg):
		log.Fatalln("config file:", cfg, "is not existent")
	}

	ConfigFile = cfg

	raw, err := file.ToTrimString(cfg)
	if err != nil {
		log.Fatalln("read config file:", cfg, "fail:", err)
	}

	parsed := new(GlobalConfig)
	if err := json.Unmarshal([]byte(raw), parsed); err != nil {
		log.Fatalln("parse config file:", cfg, "fail:", err)
	}

	// Swap the pointer under the write lock so readers never observe a
	// half-initialised configuration.
	configLock.Lock()
	defer configLock.Unlock()
	config = parsed
	log.Println("read config file:", cfg, "successfully")
}
g.InitRedisConnPool() 初始化Redis連接池
// InitRedisConnPool builds the package-level Redis connection pool from the
// alarm section of the configuration. It does nothing (RedisConnPool stays
// unset) when alarming is disabled.
func InitRedisConnPool() {
	if !Config().Alarm.Enabled {
		return
	}

	redisCfg := Config().Alarm.Redis
	addr := redisCfg.Dsn
	// The timeouts are configured in milliseconds.
	connTimeout := time.Duration(redisCfg.ConnTimeout) * time.Millisecond
	readTimeout := time.Duration(redisCfg.ReadTimeout) * time.Millisecond
	writeTimeout := time.Duration(redisCfg.WriteTimeout) * time.Millisecond

	dial := func() (redis.Conn, error) {
		conn, err := redis.DialTimeout("tcp", addr, connTimeout, readTimeout, writeTimeout)
		if err != nil {
			return nil, err
		}
		return conn, nil
	}

	RedisConnPool = &redis.Pool{
		MaxIdle:      redisCfg.MaxIdle,
		IdleTimeout:  240 * time.Second,
		Dial:         dial,
		TestOnBorrow: PingRedis, // validate a pooled connection before handing it out
	}
}
// PingRedis issues a "ping" command on c and returns the resulting error, if
// any, after logging it. It is installed as the pool's TestOnBorrow hook; the
// t argument is unused.
func PingRedis(c redis.Conn, t time.Time) error {
	if _, err := c.Do("ping"); err != nil {
		log.Println("[ERROR] ping redis fail", err)
		return err
	}
	return nil
}
g.InitHbsClient() 實例化HBS客戶端對象
func InitHbsClient() {
HbsClient = &SingleConnRpcClient{
RpcServers: Config().Hbs.Servers,
Timeout: time.Duration(Config().Hbs.Timeout) * time.Millisecond,
}
}
store.InitHistoryBigMap() 初始化內存BigMap,存放采集歷史數據
# 創建BigMap([256]JudgeItemMap),存放采集的監控歷史數據
# [00..f] JudgeItemMap
# .
# .
# [f0..f] JudgeItemMap
// InitHistoryBigMap pre-populates HistoryBigMap with one JudgeItemMap for each
// of the 256 two-character lowercase hex prefixes "00" through "ff".
func InitHistoryBigMap() {
	const hexDigits = "0123456789abcdef"
	for hi := 0; hi < len(hexDigits); hi++ {
		for lo := 0; lo < len(hexDigits); lo++ {
			prefix := hexDigits[hi:hi+1] + hexDigits[lo:lo+1]
			HistoryBigMap[prefix] = NewJudgeItemMap()
		}
	}
}
## 創建與初始化JudgeItemMap -> map[string]*SafeLinkedList
// NewJudgeItemMap returns an empty JudgeItemMap whose inner map is already
// allocated and ready for writes.
func NewJudgeItemMap() *JudgeItemMap {
	jm := new(JudgeItemMap)
	jm.M = map[string]*SafeLinkedList{}
	return jm
}
// HistoryBigMap is a large map that is not safe for concurrent modification,
// so it must be fully initialised before it is used.
var HistoryBigMap = make(map[string]*JudgeItemMap)
// JudgeItemMap maps a key to its SafeLinkedList; the embedded RWMutex is
// available to guard M.
type JudgeItemMap struct {
sync.RWMutex
M map[string]*SafeLinkedList
}
// SafeLinkedList wraps a "container/list" list together with an RWMutex that
// is available to guard it.
type SafeLinkedList struct {
sync.RWMutex
L *list.List
}
http.Start() HTTP API服務監聽與處理
// init registers the HTTP routes when the package is loaded.
func init() {
configCommonRoutes() // common component API routes (see the HBS module)
configInfoRoutes() // information-query API routes
}
// Start runs the HTTP API server and blocks until it fails, at which point
// the process exits via log.Fatalln. It returns immediately when HTTP is
// disabled or no listen address is configured.
func Start() {
	httpCfg := g.Config().Http
	if !httpCfg.Enabled {
		return
	}

	addr := httpCfg.Listen
	if addr == "" {
		return
	}

	server := &http.Server{
		Addr:           addr,
		MaxHeaderBytes: 1 << 30,
	}

	log.Println("http listening", addr)
	log.Fatalln(server.ListenAndServe())
}
// configInfoRoutes registers the information-query routes through
// http.HandleFunc. The handler bodies are empty in this excerpt; the comments
// describe what each route is for.
func configInfoRoutes() {
// e.g. /strategy/lg-dinp-docker01.bj/cpu.idle
http.HandleFunc("/strategy/", func(w http.ResponseWriter, r *http.Request) {})
// e.g. /expression/net.port.listen/port=22
http.HandleFunc("/expression/", func(w http.ResponseWriter, r *http.Request) {})
// Report the total amount of data held in the BigMap.
http.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) {})
// Inspect specific history data held in the BigMap.
http.HandleFunc("/history/", func(w http.ResponseWriter, r *http.Request) {})
}
rpc.Start() RPC服務注冊與處理
// Start runs the RPC server: it listens on the configured TCP address,
// registers the Judge service and serves every accepted connection in its own
// goroutine. It returns immediately when RPC is disabled and terminates the
// process if the address cannot be resolved or bound.
func Start() {
	rpcCfg := g.Config().Rpc
	if !rpcCfg.Enabled {
		return
	}

	addr := rpcCfg.Listen
	tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
	if err != nil {
		log.Fatalf("net.ResolveTCPAddr fail: %s", err)
	}

	listener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		log.Fatalf("listen %s fail: %s", addr, err)
	}
	log.Println("rpc listening", addr)

	// Expose the Judge methods to RPC clients.
	rpc.Register(new(Judge))

	for {
		conn, acceptErr := listener.Accept()
		if acceptErr != nil {
			log.Printf("listener.Accept occur error: %s", acceptErr)
			continue
		}
		go rpc.ServeConn(conn)
	}
}
## RPC Judge.Ping方法
// Judge is the receiver type whose methods are exposed over RPC.
type Judge int
// Ping is a no-op RPC method; it always returns nil and leaves resp untouched.
func (this *Judge) Ping(req model.NullRpcRequest, resp *model.SimpleRpcResponse) error {
return nil
}
## RPC Judge.Send方法,Transfer使用此RPC方式上傳數據
// Send is the RPC method used to upload data points. Every item whose metric
// is present in g.FilterMap is pushed into the HistoryBigMap shard selected by
// the first two characters of its primary key; other items are ignored.
// It always returns nil and does not touch resp.
func (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error {
	// Maximum number of history points to keep, from the global config.
	keep := g.Config().Remain
	// Read the clock once for the whole batch rather than once per item.
	now := time.Now().Unix()

	for _, item := range items {
		// Items whose metric is not in the filter map are not cached.
		if !g.FilterMap.Exists(item.Metric) {
			continue
		}

		hash := item.PrimaryKey()
		shard := store.HistoryBigMap[hash[0:2]]
		shard.PushFrontAndMaintain(hash, item, keep, now)
	}
	return nil
}
// JudgeItem is the data structure of a single data point handled by Judge.
type JudgeItem struct {
Endpoint string `json:"endpoint"`
Metric string `json:"metric"`
Value float64 `json:"value"`
Timestamp int64 `json:"timestamp"`
JudgeType string `json:"judgeType"`
Tags map[string]string `json:"tags"`
}
- 歷史數據的緩存邏輯分析
#生成MD5 Hash值
// PrimaryKey returns the MD5 of the item's endpoint, metric and tags as
// combined by utils.PK.
func (this *JudgeItem) PrimaryKey() string {
	raw := utils.PK(this.Endpoint, this.Metric, this.Tags)
	return utils.Md5(raw)
}
#基于HASH前兩位做為索引KEY,存入HistoryBigMap且Judge計算處理
store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)
#JudgeItem Map緩存和Judge計算
// PushFrontAndMaintain stores val in the history list registered under key
// and then runs the alarm judgement on that list.
//
// If no list exists for key, a new one holding only val is created, stored
// and judged. Otherwise val is pushed into the existing list (keeping at most
// maxCount points) and judged only when the list accepted the point.
func (this *JudgeItemMap) PushFrontAndMaintain(key string, val *model.JudgeItem, maxCount int, now int64) {
	series, found := this.Get(key)
	if !found {
		fresh := list.New()
		fresh.PushFront(val)
		series = &SafeLinkedList{L: fresh}
		this.Set(key, series)
		Judge(series, val, now)
		return
	}

	if series.PushFrontAndMaintain(val, maxCount) {
		Judge(series, val, now)
	}
}
# @return needJudge 如果是false不需要做judge,因為新上來的數據不合法
// PushFrontAndMaintain inserts v at the front of the list and trims the tail
// so that at most maxCount points remain.
//
// It returns false, without modifying the list, when the list is non-empty
// and v is a duplicate or out-of-order point (timestamp not newer than the
// current front) or has a non-positive timestamp. It returns true otherwise,
// meaning the caller should judge the new point.
func (this *SafeLinkedList) PushFrontAndMaintain(v *model.JudgeItem, maxCount int) bool {
	this.Lock()
	defer this.Unlock()

	size := this.L.Len()
	if size > 0 {
		newest := this.L.Front().Value.(*model.JudgeItem)
		if v.Timestamp <= 0 || v.Timestamp <= newest.Timestamp {
			return false
		}
	}

	// The newest point always lives at the front of the list.
	this.L.PushFront(v)

	// Drop the oldest points once the retention limit is exceeded.
	for size++; size > maxCount; size-- {
		this.L.Remove(this.L.Back())
	}
	return true
}
- Judge報警判斷邏輯分析
# Judge入口函數
// Judge is the entry point of the alarm judgement: it evaluates the history
// list L first against the strategies and then against the expressions.
func Judge(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
CheckStrategy(L, firstItem, now) // strategy handling
CheckExpression(L, firstItem, now) // expression handling
}
## 策略檢測及發送事件
// CheckStrategy evaluates every strategy registered for the item's
// "endpoint/metric" key whose tags all match the item's tags.
func CheckStrategy(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
	key := fmt.Sprintf("%s/%s", firstItem.Endpoint, firstItem.Metric)
	candidates, found := g.StrategyMap.Get()[key]
	if !found {
		return
	}

	// The key is only endpoint and metric, so not every candidate applies to
	// this item: e.g. one host may carry two proc.num strategies, one tagged
	// name=docker and one name=agent. Skip those whose tags do not match.
nextStrategy:
	for _, s := range candidates {
		for tagKey, want := range s.Tags {
			got, has := firstItem.Tags[tagKey]
			if !has || got != want {
				continue nextStrategy
			}
		}
		judgeItemWithStrategy(L, s, firstItem, now)
	}
}
### 判斷采集數據,如果匹配策略計算條件則發送報警事件
// judgeItemWithStrategy evaluates one strategy against the history list L
// and hands the result to sendEventIfNeed. It returns early when the
// strategy's function cannot be parsed (logged) or when there is not enough
// history to compute it.
func judgeItemWithStrategy(L *SafeLinkedList, strategy model.Strategy, firstItem *model.JudgeItem, now int64) {
	// Turn the textual function into an evaluator.
	fn, err := ParseFuncFromString(strategy.Func, strategy.Operator, strategy.RightValue)
	if err != nil {
		log.Printf("[ERROR] parse func %s fail: %v. strategy id: %d", strategy.Func, err, strategy.Id)
		return
	}

	history, left, triggered, enough := fn.Compute(L)
	if !enough {
		return
	}

	// Describe the outcome as an event; the "s_" prefix marks a strategy event.
	event := new(model.Event)
	event.Id = fmt.Sprintf("s_%d_%s", strategy.Id, firstItem.PrimaryKey())
	event.Strategy = &strategy
	event.Endpoint = firstItem.Endpoint
	event.LeftValue = left
	event.EventTime = firstItem.Timestamp
	event.PushedTags = firstItem.Tags

	sendEventIfNeed(history, triggered, now, event, strategy.MaxStep)
}
## 表達式檢測及發送事件
// CheckExpression evaluates every expression related to the item. The lookup
// keys come from buildKeysFromMetricAndTags; each expression is judged at most
// once even when it is reachable through several keys.
func CheckExpression(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
	keys := buildKeysFromMetricAndTags(firstItem)
	if len(keys) == 0 {
		return
	}

	// Ids of expressions already judged for this item.
	done := make(map[int]struct{})
	byKey := g.ExpressionMap.Get()

	for _, key := range keys {
		candidates, found := byKey[key]
		if !found {
			continue
		}

		for _, exp := range filterRelatedExpressions(candidates, firstItem) {
			if _, seen := done[exp.Id]; seen {
				continue
			}
			done[exp.Id] = struct{}{}
			judgeItemWithExpression(L, exp, firstItem, now)
		}
	}
}
### 過濾與采集數據相關的Expression
// filterRelatedExpressions returns the expressions whose tags are all present
// in the item's tags with equal values. The result is never nil.
func filterRelatedExpressions(expressions []*model.Expression, firstItem *model.JudgeItem) []*model.Expression {
	if len(expressions) == 0 {
		return []*model.Expression{}
	}

	matched := make([]*model.Expression, 0, len(expressions))

nextExpression:
	for _, exp := range expressions {
		tags := firstItem.Tags
		// An expression may carry an endpoint=xxx tag. In that case match
		// against the tag set produced by copyItemTags instead.
		// NOTE(review): presumably that copy adds the item's endpoint as a
		// tag; confirm against copyItemTags.
		if _, hasEndpoint := exp.Tags["endpoint"]; hasEndpoint {
			tags = copyItemTags(firstItem)
		}

		for tagKey, want := range exp.Tags {
			if got, ok := tags[tagKey]; !ok || got != want {
				continue nextExpression
			}
		}
		matched = append(matched, exp)
	}
	return matched
}
### 判斷采集數據,如果匹配表達式計算條件則發送報警事件
// judgeItemWithExpression evaluates one expression against the history list
// L and hands the result to sendEventIfNeed. It returns early when the
// expression's function cannot be parsed (logged) or when there is not enough
// history to compute it.
func judgeItemWithExpression(L *SafeLinkedList, expression *model.Expression, firstItem *model.JudgeItem, now int64) {
	// Turn the textual function into an evaluator.
	fn, err := ParseFuncFromString(expression.Func, expression.Operator, expression.RightValue)
	if err != nil {
		log.Printf("[ERROR] parse func %s fail: %v. expression id: %d", expression.Func, err, expression.Id)
		return
	}

	history, left, triggered, enough := fn.Compute(L)
	if !enough {
		return
	}

	// Describe the outcome as an event; the "e_" prefix marks an expression event.
	event := new(model.Event)
	event.Id = fmt.Sprintf("e_%d_%s", expression.Id, firstItem.PrimaryKey())
	event.Expression = expression
	event.Endpoint = firstItem.Endpoint
	event.LeftValue = left
	event.EventTime = firstItem.Timestamp
	event.PushedTags = firstItem.Tags

	sendEventIfNeed(history, triggered, now, event, expression.MaxStep)
}
## 解析操作字符串,轉化為報警函數
// ParseFuncFromString turns a textual alarm function such as "max(#3)" or
// "lookup(#2,3)" into the matching Function implementation, configured with
// the given operator and threshold.
//
// The text before the character preceding '#' is the function name; the text
// between '#' and the final character is the comma-separated argument list
// parsed by atois.
//
// It returns an error, and a nil Function, when str is empty, is not of the
// form name(#args), carries fewer arguments than the function needs (two for
// lookup, one for all others), fails argument parsing, or names an unsupported
// function. Malformed input never panics.
func ParseFuncFromString(str string, operator string, rightValue float64) (fn Function, err error) {
	if str == "" {
		return nil, fmt.Errorf("func can not be null!")
	}

	// '#' separates the name from the arguments. It must leave room for the
	// opening bracket before it and the closing bracket after the arguments,
	// otherwise the slice expressions below would be out of range.
	idx := strings.Index(str, "#")
	if idx < 1 || idx+1 > len(str)-1 {
		return nil, fmt.Errorf("malformed func %q: expected name(#args)", str)
	}

	args, err := atois(str[idx+1 : len(str)-1])
	if err != nil {
		return nil, err
	}

	name := str[:idx-1]

	// Make sure every args[i] used below exists.
	required := 1
	if name == "lookup" {
		required = 2
	}
	if len(args) < required {
		return nil, fmt.Errorf("func %q needs %d argument(s), got %d", str, required, len(args))
	}

	switch name {
	case "max":
		fn = &MaxFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
	case "min":
		fn = &MinFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
	case "all":
		fn = &AllFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
	case "sum":
		fn = &SumFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
	case "avg":
		fn = &AvgFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
	case "diff":
		fn = &DiffFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
	case "pdiff":
		fn = &PDiffFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
	case "lookup":
		fn = &LookupFunction{Num: args[0], Limit: args[1], Operator: operator, RightValue: rightValue}
	default:
		return nil, fmt.Errorf("not_supported_method")
	}
	return fn, nil
}
## 判斷與發送事件
// sendEventIfNeed decides, from the current judgement and the last event
// recorded under the same id, whether a PROBLEM or OK event has to be sent.
//
// Not triggered: an OK event is sent only when the last event was a PROBLEM.
// Triggered: a first PROBLEM (step 1) is sent when there is no last event or
// it was OK, unless maxStep is 0. A repeated PROBLEM is sent only while the
// step count is below maxStep, the oldest history point is newer than the
// last event, and Alarm.MinInterval has elapsed since the last event.
func sendEventIfNeed(historyData []*model.HistoryData, isTriggered bool, now int64, event *model.Event, maxStep int) {
	lastEvent, exists := g.LastEvents.Get(event.Id)

	if !isTriggered {
		// Recovery: report OK only if the previous state was PROBLEM.
		if exists && lastEvent.Status[0] == 'P' {
			event.Status = "OK"
			event.CurrentStep = 1
			sendEvent(event)
		}
		return
	}

	event.Status = "PROBLEM"

	if !exists || lastEvent.Status[0] == 'O' {
		// Threshold crossed with no alarm outstanding: raise the first one.
		event.CurrentStep = 1
		// A maximum step count of 0 means the user muted this alarm.
		if maxStep == 0 {
			return
		}
		sendEvent(event)
		return
	}

	// From here on the previous event was already a PROBLEM.
	switch {
	case lastEvent.CurrentStep >= maxStep:
		// The maximum number of alarms has been reached.
		return
	case historyData[len(historyData)-1].Timestamp <= lastEvent.EventTime:
		// Points that already produced an alarm must not be reused, or the
		// alarm would repeat on every new point. The last history entry is
		// the oldest one, so checking it is sufficient.
		return
	case now-lastEvent.EventTime < g.Config().Alarm.MinInterval:
		// Two alarms must be at least MinInterval seconds apart.
		return
	}

	event.CurrentStep = lastEvent.CurrentStep + 1
	sendEvent(event)
}
### sendEvent將事件保存至Redis(預警的異步機制)
// sendEvent records event as the last event for its id and pushes its JSON
// encoding onto the Redis queue selected by the event's priority.
//
// The last-event cache is updated first, so it reflects the event even when
// delivery fails. Marshal and LPUSH failures are logged. When no Redis pool
// exists (InitRedisConnPool creates none while alarming is disabled) the
// event is only cached.
func sendEvent(event *model.Event) {
	// Update the last event.
	g.LastEvents.Set(event.Id, event)

	bs, err := json.Marshal(event)
	if err != nil {
		log.Printf("json marshal event %v fail: %v", event, err)
		return
	}

	// Without a pool there is nowhere to deliver to; calling Get on the nil
	// pool would panic.
	if g.RedisConnPool == nil {
		return
	}

	// Send to Redis; the queue name is derived from the event priority.
	redisKey := fmt.Sprintf(g.Config().Alarm.QueuePattern, event.Priority())
	rc := g.RedisConnPool.Get()
	defer rc.Close()

	if _, err := rc.Do("LPUSH", redisKey, string(bs)); err != nil {
		log.Printf("[ERROR] LPUSH event %s to redis key %s fail: %v", event.Id, redisKey, err)
	}
}
- 報警函數分析
# Max
# 例如: max(#3)
# 對于最新的3個點,其最大值滿足閾值條件則報警
// MaxFunction implements max(#N): it triggers when the maximum of the N most
// recent points satisfies the threshold condition.
type MaxFunction struct {
	Function
	Limit      int     // number of points
	Operator   string  // comparison operator
	RightValue float64 // threshold
}

// Compute fetches Limit history points from L and compares their maximum
// with RightValue using Operator. It returns the points used, the maximum as
// leftValue, whether the condition holds, and whether enough history was
// available; when it was not, the other results are not meaningful.
func (this MaxFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
	if vs, isEnough = L.HistoryData(this.Limit); !isEnough {
		return
	}

	highest := vs[0].Value
	for i := 1; i < this.Limit; i++ {
		if candidate := vs[i].Value; highest < candidate {
			highest = candidate
		}
	}

	leftValue = highest
	isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)
	return
}
# Min
# 如: min(#3)
# 對于最新的3個點,其最小值滿足閾值條件則報警
// MinFunction implements min(#N): it triggers when the minimum of the N most
// recent points satisfies the threshold condition.
type MinFunction struct {
	Function
	Limit      int     // number of points
	Operator   string  // comparison operator
	RightValue float64 // threshold
}

// Compute fetches Limit history points from L and compares their minimum
// with RightValue using Operator. It returns the points used, the minimum as
// leftValue, whether the condition holds, and whether enough history was
// available; when it was not, the other results are not meaningful.
func (this MinFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
	if vs, isEnough = L.HistoryData(this.Limit); !isEnough {
		return
	}

	lowest := vs[0].Value
	for i := 1; i < this.Limit; i++ {
		if candidate := vs[i].Value; lowest > candidate {
			lowest = candidate
		}
	}

	leftValue = lowest
	isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)
	return
}
# All
# 如:all(#3)
# 最新的3個點都滿足閾值條件則報警
// AllFunction implements all(#N): it triggers when every one of the N most
// recent points satisfies the threshold condition.
type AllFunction struct {
	Function
	Limit      int     // number of points
	Operator   string  // comparison operator
	RightValue float64 // threshold
}

// Compute fetches Limit history points from L and checks each against
// RightValue using Operator, stopping at the first point that fails.
// leftValue is the value of the first returned point. isEnough reports
// whether enough history was available; when it was not, the other results
// are not meaningful.
func (this AllFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
	if vs, isEnough = L.HistoryData(this.Limit); !isEnough {
		return
	}

	// Start optimistic; the loop ends as soon as one point fails the check.
	isTriggered = true
	for i := 0; i < this.Limit && isTriggered; i++ {
		isTriggered = checkIsTriggered(vs[i].Value, this.Operator, this.RightValue)
	}

	leftValue = vs[0].Value
	return
}
# Lookup
# 如 lookup(#2,3)
# 最新的3個點中有2個滿足條件則報警
// LookupFunction implements lookup(#Num,Limit): it triggers when Num of the
// Limit most recent points satisfy the threshold condition.
type LookupFunction struct {
	Function
	Num        int     // how many points must match (2 in lookup(#2,3))
	Limit      int     // how many points to inspect (3 in lookup(#2,3))
	Operator   string  // comparison operator
	RightValue float64 // threshold
}

// Compute fetches Limit history points from L and counts those satisfying
// the condition, triggering as soon as the count equals Num. leftValue is the
// value of the first returned point. isEnough reports whether enough history
// was available; when it was not, the other results are not meaningful.
func (this LookupFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
	if vs, isEnough = L.HistoryData(this.Limit); !isEnough {
		return
	}

	leftValue = vs[0].Value

	matches := 0
	for i := 0; i < this.Limit; i++ {
		if !checkIsTriggered(vs[i].Value, this.Operator, this.RightValue) {
			continue
		}
		if matches++; matches == this.Num {
			isTriggered = true
			break
		}
	}
	return
}
# Sum
# 如 sum(#3)
# 對于最新的3個點,其和滿足閾值條件則報警
// SumFunction implements sum(#N): it triggers when the sum of the N most
// recent points satisfies the threshold condition.
type SumFunction struct {
	Function
	Limit      int     // number of points
	Operator   string  // comparison operator
	RightValue float64 // threshold
}

// Compute fetches Limit history points from L and compares their sum with
// RightValue using Operator. It returns the points used, the sum as
// leftValue, whether the condition holds, and whether enough history was
// available; when it was not, the other results are not meaningful.
func (this SumFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
	if vs, isEnough = L.HistoryData(this.Limit); !isEnough {
		return
	}

	total := 0.0
	for i := 0; i < this.Limit; i++ {
		total += vs[i].Value
	}

	leftValue = total
	isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)
	return
}
# Avg
# 如 avg(#3)
# 對于最新的3個點,其平均值滿足閾值條件則報警
// AvgFunction implements avg(#N): it triggers when the average of the N most
// recent points satisfies the threshold condition.
type AvgFunction struct {
	Function
	Limit      int     // number of points
	Operator   string  // comparison operator
	RightValue float64 // threshold
}

// Compute fetches Limit history points from L and compares their arithmetic
// mean with RightValue using Operator. It returns the points used, the mean
// as leftValue, whether the condition holds, and whether enough history was
// available; when it was not, the other results are not meaningful.
func (this AvgFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
	if vs, isEnough = L.HistoryData(this.Limit); !isEnough {
		return
	}

	total := 0.0
	for i := 0; i < this.Limit; i++ {
		total += vs[i].Value
	}

	leftValue = total / float64(this.Limit)
	isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)
	return
}
# Diff
# 如 diff(#3)
# 拿最新push上來的點(被減數),與歷史最新的3個點(3個減數)相減,得到3個差
# 只要有一個差滿足閾值條件則報警
// DiffFunction implements diff(#N): the newest point is compared with each of
// the N points before it, and the function triggers as soon as one of the
// differences satisfies the threshold condition.
type DiffFunction struct {
	Function
	Limit      int     // number of historical points to compare against
	Operator   string  // comparison operator
	RightValue float64 // threshold
}

// Compute fetches Limit+1 history points from L, because the newest point is
// already the first entry of the list, and checks newest - older for each
// older point. leftValue is the last difference evaluated. isEnough is false
// when too little history is available or no points were returned.
func (this DiffFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
	if vs, isEnough = L.HistoryData(this.Limit + 1); !isEnough {
		return
	}
	if len(vs) == 0 {
		isEnough = false
		return
	}

	newest := vs[0].Value
	isTriggered = false
	for i := 1; i <= this.Limit; i++ {
		// The difference is the current value minus the historical value.
		leftValue = newest - vs[i].Value
		if isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue); isTriggered {
			break
		}
	}
	return
}
# Pdiff
# 如:pdiff(#3)
# 拿最新push上來的點,與歷史最新的3個點相減,得到3個差
# 再將3個差值分別除以減數,得到3個商值,只要有一個商值滿足閾值則報警
// PDiffFunction implements pdiff(#N): the percentage difference between the
// newest point and each of the N points before it is computed, and the
// function triggers as soon as one of them satisfies the threshold condition.
type PDiffFunction struct {
	Function
	Limit      int     // number of historical points to compare against
	Operator   string  // comparison operator
	RightValue float64 // threshold
}

// Compute fetches Limit+1 history points from L and checks
// (newest - older) / older * 100 for each older point, skipping points whose
// value is 0. leftValue is the last percentage evaluated. isEnough is false
// when too little history is available or no points were returned.
func (this PDiffFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
	if vs, isEnough = L.HistoryData(this.Limit + 1); !isEnough {
		return
	}
	if len(vs) == 0 {
		isEnough = false
		return
	}

	newest := vs[0].Value
	isTriggered = false
	for i := 1; i <= this.Limit; i++ {
		older := vs[i].Value
		// A zero baseline cannot be used as a divisor.
		if older == 0 {
			continue
		}

		leftValue = (newest - older) / older * 100.0
		if isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue); isTriggered {
			break
		}
	}
	return
}
# 操作符解析與判斷
// checkIsTriggered reports whether "leftValue operator rightValue" holds.
//
// Equality ("=", "==") and inequality ("!=") use an absolute tolerance of
// 0.0001. The ordering operators compare the values directly. An
// unrecognised operator yields false.
func checkIsTriggered(leftValue float64, operator string, rightValue float64) (isTriggered bool) {
	const tolerance = 0.0001

	switch operator {
	case "=", "==":
		return math.Abs(leftValue-rightValue) < tolerance
	case "!=":
		return math.Abs(leftValue-rightValue) > tolerance
	case "<":
		return leftValue < rightValue
	case "<=":
		return leftValue <= rightValue
	case ">":
		return leftValue > rightValue
	case ">=":
		return leftValue >= rightValue
	}
	return false
}
cron.SyncStrategies() 同步HBS策略和表達式
# 同步策略配置入口函數
// SyncStrategies refreshes the strategy, expression and filter caches in an
// endless loop, sleeping for the configured Hbs.Interval (in seconds) after
// each round. It never returns. The interval is read once, before the loop.
func SyncStrategies() {
	interval := time.Duration(g.Config().Hbs.Interval) * time.Second

	// The post statement sleeps after every completed round.
	for ; ; time.Sleep(interval) {
		syncStrategies()
		syncExpression()
		syncFilter()
	}
}
## RPC調用"Hbs.GetStrategies"HBS同步策略
// syncStrategies fetches the strategies through the "Hbs.GetStrategies" RPC
// and rebuilds the strategy cache from the response. On an RPC error it logs
// the error and leaves the cache untouched.
func syncStrategies() {
	var resp model.StrategiesResponse

	if err := g.HbsClient.Call("Hbs.GetStrategies", model.NullRpcRequest{}, &resp); err != nil {
		log.Println("[ERROR] Hbs.GetStrategies:", err)
		return
	}

	rebuildStrategyMap(&resp)
}
### 歸整策略數據和緩存
// rebuildStrategyMap regroups the HBS response by "hostname/metric" and
// installs the result as the new global strategy cache:
//
//	hostname/metric => [strategy1, strategy2 ...]
//
// When debugging is enabled, the strategies of the configured debug host are
// also printed as JSON.
func rebuildStrategyMap(strategiesResponse *model.StrategiesResponse) {
	grouped := make(map[string][]model.Strategy)

	for _, hs := range strategiesResponse.HostStrategies {
		host := hs.Hostname

		// Debug dump for the configured host only.
		if g.Config().Debug && host == g.Config().DebugHost {
			log.Println(host, "strategies:")
			bs, _ := json.Marshal(hs.Strategies)
			fmt.Println(string(bs))
		}

		// Appending to a missing key starts from a nil slice, so no separate
		// existence check is needed.
		for _, strategy := range hs.Strategies {
			key := fmt.Sprintf("%s/%s", host, strategy.Metric)
			grouped[key] = append(grouped[key], strategy)
		}
	}

	g.StrategyMap.ReInit(grouped)
}
## RPC調用"Hbs.GetExpressions"HBS同步表達式
// syncExpression fetches the expressions through the "Hbs.GetExpressions"
// RPC and rebuilds the expression cache from the response. On an RPC error it
// logs the error and leaves the cache untouched.
func syncExpression() {
	var resp model.ExpressionResponse

	if err := g.HbsClient.Call("Hbs.GetExpressions", model.NullRpcRequest{}, &resp); err != nil {
		log.Println("[ERROR] Hbs.GetExpressions:", err)
		return
	}

	rebuildExpressionMap(&resp)
}
### 歸整表達式數據和緩存
// rebuildExpressionMap indexes every expression under one key per tag and
// installs the result as the new global expression cache:
//
//	metric/tagKey=tagValue => [expression1, expression2 ...]
//
// An expression without tags is therefore not indexed at all.
func rebuildExpressionMap(expressionResponse *model.ExpressionResponse) {
	index := make(map[string][]*model.Expression)

	for _, exp := range expressionResponse.Expressions {
		for tagKey, tagVal := range exp.Tags {
			key := fmt.Sprintf("%s/%s=%s", exp.Metric, tagKey, tagVal)
			// Appending to a missing key starts from a nil slice.
			index[key] = append(index[key], exp)
		}
	}

	g.ExpressionMap.ReInit(index)
}
## 構建同步過濾器map,以Metric為查詢Key
// syncFilter rebuilds the global filter map: the set of metric names that
// appear in any cached strategy or expression, stored as metric => metric.
func syncFilter() {
	metrics := make(map[string]string)

	// Metrics referenced by strategies.
	for _, group := range g.StrategyMap.Get() {
		for _, strategy := range group {
			metrics[strategy.Metric] = strategy.Metric
		}
	}

	// Metrics referenced by expressions.
	for _, group := range g.ExpressionMap.Get() {
		for _, expression := range group {
			metrics[expression.Metric] = expression.Metric
		}
	}

	g.FilterMap.ReInit(metrics)
}
#### 全局(StrategyMap、ExpressionMap、FilterMap)變量和緩存初始化
// Package-level caches, each created with an empty inner map and replaced
// wholesale through the corresponding ReInit method.
var (
StrategyMap = &SafeStrategyMap{M: make(map[string][]model.Strategy)}
ExpressionMap = &SafeExpressionMap{M: make(map[string][]*model.Expression)}
FilterMap = &SafeFilterMap{M: make(map[string]string)}
)
// ReInit replaces the whole strategy map with m while holding the write lock.
func (this *SafeStrategyMap) ReInit(m map[string][]model.Strategy) {
	this.Lock()
	this.M = m
	this.Unlock()
}
// ReInit replaces the whole expression map with m while holding the write lock.
func (this *SafeExpressionMap) ReInit(m map[string][]*model.Expression) {
	this.Lock()
	this.M = m
	this.Unlock()
}
// ReInit replaces the whole filter map with m while holding the write lock.
func (this *SafeFilterMap) ReInit(m map[string]string) {
	this.Lock()
	this.M = m
	this.Unlock()
}
####策略結構體定義
// Strategy is one alarm strategy; each field is decoded from the key named
// in its json tag.
type Strategy struct {
Id int `json:"id"`
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
Func string `json:"func"` // e.g. max(#3) all(#3)
Operator string `json:"operator"` // e.g. < !=
RightValue float64 `json:"rightValue"` // critical value
MaxStep int `json:"maxStep"`
Priority int `json:"priority"`
Note string `json:"note"`
Tpl *Template `json:"tpl"` // template
}
####表達式結構體定義
// Expression is one alarm expression; each field is decoded from the key
// named in its json tag.
type Expression struct {
Id int `json:"id"`
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
Func string `json:"func"` // e.g. max(#3) all(#3)
Operator string `json:"operator"` // e.g. < !=
RightValue float64 `json:"rightValue"` // critical value
MaxStep int `json:"maxStep"`
Priority int `json:"priority"`
Note string `json:"note"`
ActionId int `json:"actionId"` // ID of the action to execute
}
cron.CleanStale()
# 定期清理任務運行入口
// CleanStale is the entry point of the periodic clean-up job: it sleeps five
// hours, runs cleanStale, and repeats forever. The first clean-up therefore
// happens five hours after start.
func CleanStale() {
for {
time.Sleep(time.Hour * 5)
cleanStale() // run the clean-up
}
}
##清理7天之前的歷史過期數據
// cleanStale asks each of the 256 HistoryBigMap shards ("00" through "ff")
// to drop the series whose newest point is more than seven days old.
func cleanStale() {
	const hexDigits = "0123456789abcdef"

	cutoff := time.Now().Unix() - 3600*24*7

	for hi := 0; hi < len(hexDigits); hi++ {
		for lo := 0; lo < len(hexDigits); lo++ {
			prefix := hexDigits[hi:hi+1] + hexDigits[lo:lo+1]
			store.HistoryBigMap[prefix].CleanStale(cutoff)
		}
	}
}
#清理實現
// CleanStale removes every series whose front (newest) point has a timestamp
// older than before. Stale keys are collected under the read lock and then
// deleted in one batch through BatchDelete.
func (this *JudgeItemMap) CleanStale(before int64) {
	var stale []string

	this.RLock()
	for key, series := range this.M {
		newest := series.Front()
		// A series with no front element is left alone.
		if newest == nil {
			continue
		}
		if newest.Value.(*model.JudgeItem).Timestamp < before {
			stale = append(stale, key)
		}
	}
	this.RUnlock()

	this.BatchDelete(stale)
}
// BatchDelete removes all the given keys from the map under a single write
// lock. It returns without locking when keys is empty.
func (this *JudgeItemMap) BatchDelete(keys []string) {
	if len(keys) == 0 {
		return
	}

	this.Lock()
	defer this.Unlock()

	for _, key := range keys {
		delete(this.M, key)
	}
}
技術經驗借鑒
- BigMap內存構造緩存歷史數據機制與應用
- 針對報警計算function的設計模式之 "策略模式"應用
- SafeLinkedList并發安全的鏈表LIST操作實現
擴展學習
- github.com/garyburd/redigo/redis redis客戶端