OpenFalcon源碼分析(Judge組件)

Judge版本

VERSION = "2.0.2"

Judge組件功能

Judge是用于判斷是否觸發報警條件的組件。
Transfer的數據不但要轉送到Graph來存儲並繪圖，還要轉送到Judge用于報警判斷。Judge先從HBS獲取所有策略列表，靜等Transfer的數據轉發。
每收到一條Transfer轉發過來的數據，立即找到這條數據關聯的Strategy、Expression，然後做閾值判斷。【官方描述】

Judge組件邏輯圖

Judge邏輯圖

Portal關于報警策略與表達式定義操作說明書

操作文檔

main主入口分析

// main wires up the Judge process. It loads the configuration and
// initialises the shared Redis pool, the HBS client and the in-memory
// history store. It then launches the HTTP/RPC servers and the periodic
// background jobs in their own goroutines, and blocks forever.
func main() {
	cfgPath := flag.String("c", "cfg.json", "configuration file")
	showVersion := flag.Bool("v", false, "show version")
	flag.Parse()

	if *showVersion {
		fmt.Println(g.VERSION)
		os.Exit(0)
	}

	// The global configuration must be loaded before anything calls g.Config().
	g.ParseConfig(*cfgPath)

	g.InitRedisConnPool() // Redis pool used to publish alarm events
	g.InitHbsClient()     // RPC client used to pull strategies/expressions from HBS

	store.InitHistoryBigMap() // pre-create the history shards

	// Long-running services, started in this order.
	services := []func(){
		http.Start,          // HTTP API
		rpc.Start,           // RPC endpoint Transfer pushes samples to
		cron.SyncStrategies, // periodic HBS strategy/expression sync
		cron.CleanStale,     // periodic purge of stale history
	}
	for _, run := range services {
		go run()
	}

	select {}
}

g.ParseConfig(*cfg) 初始化全局配置文件

// GlobalConfig is the top-level JSON configuration of the Judge process.
// The http, rpc, hbs and alarm sections are pointers and are dereferenced
// without nil checks by the start-up code.
type GlobalConfig struct {
    // Debug enables dumping the strategies of DebugHost when they are synced.
    Debug     bool         `json:"debug"`
    DebugHost string       `json:"debugHost"`
    // Remain is the maximum number of history points kept per series.
    Remain    int          `json:"remain"`
    Http      *HttpConfig  `json:"http"`
    Rpc       *RpcConfig   `json:"rpc"`
    Hbs       *HbsConfig   `json:"hbs"`
    Alarm     *AlarmConfig `json:"alarm"`
}

// Config returns the configuration most recently published by ParseConfig.
// The pointer is read under configLock. ParseConfig swaps in a new value
// rather than mutating the old one, so callers should treat the result as
// read-only.
func Config() *GlobalConfig {
	configLock.RLock()
	current := config
	configLock.RUnlock()
	return current
}

# 解析全局配置
func ParseConfig(cfg string) {
    if cfg == "" {   //參數(shù)配置
        log.Fatalln("use -c to specify configuration file")
    }

    if !file.IsExist(cfg) {  //是否存在
        log.Fatalln("config file:", cfg, "is not existent")
    }

    ConfigFile = cfg

    configContent, err := file.ToTrimString(cfg) //字符串
    if err != nil {
        log.Fatalln("read config file:", cfg, "fail:", err)
    }

    var c GlobalConfig
    err = json.Unmarshal([]byte(configContent), &c) //反序列為結(jié)構(gòu)
    if err != nil {
        log.Fatalln("parse config file:", cfg, "fail:", err)
    }

    configLock.Lock()
    defer configLock.Unlock()

    config = &c

    log.Println("read config file:", cfg, "successfully")
}



g.InitRedisConnPool() 初始化Redis連接池

// InitRedisConnPool builds the Redis connection pool used to publish alarm
// events. When alarming is disabled in the configuration it returns without
// touching RedisConnPool.
//
// All timeouts in the redis section are configured in milliseconds. Idle
// pooled connections expire after 240s, and every connection is pinged
// before it is handed out (TestOnBorrow).
func InitRedisConnPool() {
	alarm := Config().Alarm
	if !alarm.Enabled {
		return
	}

	redisCfg := alarm.Redis
	dsn := redisCfg.Dsn
	var (
		connTimeout  = time.Duration(redisCfg.ConnTimeout) * time.Millisecond
		readTimeout  = time.Duration(redisCfg.ReadTimeout) * time.Millisecond
		writeTimeout = time.Duration(redisCfg.WriteTimeout) * time.Millisecond
	)

	RedisConnPool = &redis.Pool{
		MaxIdle:     redisCfg.MaxIdle,
		IdleTimeout: 240 * time.Second,
		Dial: func() (redis.Conn, error) {
			conn, err := redis.DialTimeout("tcp", dsn, connTimeout, readTimeout, writeTimeout)
			if err != nil {
				return nil, err
			}
			return conn, nil
		},
		TestOnBorrow: PingRedis,
	}
}

// PingRedis is the pool's TestOnBorrow hook. It sends a PING on a pooled
// connection before the connection is handed out, logs any failure and
// returns it. The idle timestamp t is not used.
func PingRedis(c redis.Conn, t time.Time) error {
	if _, err := c.Do("ping"); err != nil {
		log.Println("[ERROR] ping redis fail", err)
		return err
	}
	return nil
}

g.InitHbsClient() 實例化HBS客戶端對象

func InitHbsClient() {
    HbsClient = &SingleConnRpcClient{
        RpcServers: Config().Hbs.Servers,
        Timeout:    time.Duration(Config().Hbs.Timeout) * time.Millisecond,
    }
}

store.InitHistoryBigMap() 初始化內存BigMap，存放采集的歷史數據

# 創(chuàng)建BigMap([256]JudgeItemMap),存放采集的監(jiān)控歷史數(shù)據(jù)
# [00..f] JudgeItemMap
#  .
#  .
# [f0..f] JudgeItemMap
func InitHistoryBigMap() {
    arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
    for i := 0; i < 16; i++ {
        for j := 0; j < 16; j++ {
            HistoryBigMap[arr[i]+arr[j]] = NewJudgeItemMap() 
        }
    }
}


## 創(chuàng)建與初始化JudgeItemMap -> map[string]*SafeLinkedList
func NewJudgeItemMap() *JudgeItemMap {
    return &JudgeItemMap{M: make(map[string]*SafeLinkedList)}
}

// HistoryBigMap holds the cached history of every monitored series, sharded
// by the two-character hex prefix of the series primary key ("00".."ff").
// The outer map itself is NOT protected by a lock. It must be fully
// populated (by InitHistoryBigMap) before concurrent use and only read
// afterwards. Each shard carries its own lock.
var HistoryBigMap = make(map[string]*JudgeItemMap)

// JudgeItemMap is one shard of HistoryBigMap. It maps a series primary key to
// that series' recent history. The embedded RWMutex guards M.
type JudgeItemMap struct {
    sync.RWMutex
    M map[string]*SafeLinkedList
}

// SafeLinkedList is a container/list.List guarded by an RWMutex. For a series
// history the newest JudgeItem is kept at the front of L.
type SafeLinkedList struct {
    sync.RWMutex
    L *list.List
}

http.Start() HTTP API服務監聽與處理

// init registers the HTTP routes served by Start.
func init() {
    configCommonRoutes()  // routes shared by all components (same as the HBS module)
    configInfoRoutes()    // Judge-specific information/query routes 
}

// Start serves the HTTP API on the configured listen address, using the
// routes registered in init.
//
// It returns immediately when HTTP is disabled or no address is configured.
// Otherwise it blocks, and any server error is fatal.
func Start() {
	if !g.Config().Http.Enabled {
		return
	}

	addr := g.Config().Http.Listen
	if addr == "" {
		return
	}

	s := &http.Server{
		Addr: addr,
		// The previous limit of 1<<30 let any client make the server buffer up
		// to 1 GiB of request headers per request. The API only takes short
		// paths, so the net/http default (1 MiB) is ample.
		MaxHeaderBytes: http.DefaultMaxHeaderBytes,
	}
	log.Println("http listening", addr)
	log.Fatalln(s.ListenAndServe())
}

// configInfoRoutes registers Judge's information/query endpoints on the
// default net/http mux.
// NOTE: the handler bodies appear elided ({}) in this excerpt.
func configInfoRoutes() {
    // e.g. /strategy/lg-dinp-docker01.bj/cpu.idle
    http.HandleFunc("/strategy/", func(w http.ResponseWriter, r *http.Request) {})

    // e.g. /expression/net.port.listen/port=22
    http.HandleFunc("/expression/", func(w http.ResponseWriter, r *http.Request) {})

    // total number of series cached in the history BigMap
    http.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) {})
    // cached history of a given series in the BigMap
    http.HandleFunc("/history/", func(w http.ResponseWriter, r *http.Request) {})

}


rpc.Start() RPC服務注冊與處理


// Start serves the Judge RPC service (Transfer pushes samples to it) on the
// configured TCP address.
//
// It returns immediately when RPC is disabled; otherwise it never returns.
// Failing to resolve the address, listen, or register the service is fatal.
// Each accepted connection is served in its own goroutine.
func Start() {
	if !g.Config().Rpc.Enabled {
		return
	}

	addr := g.Config().Rpc.Listen
	tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
	if err != nil {
		log.Fatalf("net.ResolveTCPAddr fail: %s", err)
	}

	listener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		log.Fatalf("listen %s fail: %s", addr, err)
	}
	log.Println("rpc listening", addr)

	// rpc.Register fails, for example, when the receiver exposes no suitable
	// methods or the service name is already registered. Ignoring that would
	// leave a listener that rejects every call.
	if err := rpc.Register(new(Judge)); err != nil {
		log.Fatalf("rpc.Register fail: %s", err)
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Printf("listener.Accept occur error: %s", err)
			continue
		}
		go rpc.ServeConn(conn)
	}
}

// Judge is the RPC receiver; rpc.Register exposes it under the service name "Judge".
type Judge int
// Ping is a liveness probe. It always succeeds and leaves resp untouched.
func (this *Judge) Ping(req model.NullRpcRequest, resp *model.SimpleRpcResponse) error {
    return nil
}
## RPC Judge.Send方法灸眼,Transfer使用此RPC方式上傳數(shù)據(jù)
func (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error {
    remain := g.Config().Remain   //最大保留多少次歷史記錄卧檐,由全局配置文件定義
    // 把當(dāng)前時(shí)間的計(jì)算放在最外層,是為了減少獲取時(shí)間時(shí)的系統(tǒng)調(diào)用開(kāi)銷
    now := time.Now().Unix()
    for _, item := range items {
        exists := g.FilterMap.Exists(item.Metric)  //判斷緩存filtermap是否存在匹配Metric相關(guān)策略焰宣,無(wú)相關(guān)策略將不緩存此數(shù)據(jù)
        if !exists {
            continue
        }
        pk := item.PrimaryKey() //生成HASH key
        store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)  //緩存歷史數(shù)據(jù)
    }
    return nil
}

// JudgeItem is one sample forwarded by Transfer for alarm evaluation.
type JudgeItem struct {
	Endpoint  string            `json:"endpoint"`
	Metric    string            `json:"metric"`
	Value     float64           `json:"value"`
	Timestamp int64             `json:"timestamp"` // Unix time in seconds
	JudgeType string            `json:"judgeType"` // presumably GAUGE/COUNTER-style type — confirm
	Tags      map[string]string `json:"tags"`
}

  • 歷史數據的緩存邏輯分析
#生成MD5 Hash值
func (this *JudgeItem) PrimaryKey() string {
    return utils.Md5(utils.PK(this.Endpoint, this.Metric, this.Tags))
}

#基于HASH前兩位作為索引KEY，存入HistoryBigMap並進行Judge計算處理
store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)

#JudgeItem Map緩存和Judge計(jì)算
func (this *JudgeItemMap) PushFrontAndMaintain(key string, val *model.JudgeItem, maxCount int, now int64) {
    //JudgeItemMap.Get(HASH)找是否存在HASH KEY項(xiàng)列表,如果存在Push
    //數(shù)據(jù)和校驗(yàn)數(shù)據(jù)有效性后Judge計(jì)算匕积;如果不存在則基于HASH key創(chuàng)建
    //JudgeItemMap并Push數(shù)據(jù)盈罐。
    if linkedList, exists := this.Get(key); exists {
        needJudge := linkedList.PushFrontAndMaintain(val, maxCount)
        if needJudge {
            Judge(linkedList, val, now) //【參考Judge邏輯分析】
        }
    } else {
        NL := list.New()
        NL.PushFront(val)   //push into list
        safeList := &SafeLinkedList{L: NL}  //create safelist
        this.Set(key, safeList)    //save into JudgeItemMap[hash]
        Judge(safeList, val, now)  //【參考Judge邏輯分析】
    }
}

# @return needJudge 如果是false不需要做judge,因?yàn)樾律蟻?lái)的數(shù)據(jù)不合法
func (this *SafeLinkedList) PushFrontAndMaintain(v *model.JudgeItem, maxCount int) bool {
    this.Lock()
    defer this.Unlock()

    sz := this.L.Len()
    if sz > 0 {
        // 新push上來(lái)的數(shù)據(jù)有可能重復(fù)了(等于以前ts)闪唆,或者timestamp不對(duì)(小于以前ts)暖呕,這種數(shù)據(jù)要丟掉
        if v.Timestamp <= this.L.Front().Value.(*model.JudgeItem).Timestamp || v.Timestamp <= 0 {
            return false
        }
    }

    this.L.PushFront(v)  //最新數(shù)據(jù)放置在列表首

    sz++
    if sz <= maxCount {
        return true
    }
   
    //達(dá)到最大保存歷史數(shù)據(jù)項(xiàng)條數(shù)后則清除尾部
    del := sz - maxCount
    for i := 0; i < del; i++ {
        this.L.Remove(this.L.Back())   //刪除列表尾記錄
    }

    return true
}
  • Judge報警判斷邏輯分析
// Judge evaluates the newest sample firstItem of series history L, first
// against every matching strategy and then against every matching
// expression, and emits alarm events as needed. now is the server time,
// passed through to the alarm rate limiting in sendEventIfNeed.
func Judge(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
    CheckStrategy(L, firstItem, now)   // strategies indexed by endpoint/metric
    CheckExpression(L, firstItem, now) // expressions indexed by metric/tag
}

## 策略檢測(cè)及發(fā)送事件
func CheckStrategy(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
    key := fmt.Sprintf("%s/%s", firstItem.Endpoint, firstItem.Metric)
    strategyMap := g.StrategyMap.Get()
    strategies, exists := strategyMap[key]
    if !exists {
        return
    }

    for _, s := range strategies {
        // 因?yàn)閗ey僅僅是endpoint和metric,所以得到的strategies并不一定是與當(dāng)前judgeItem相關(guān)的
        // 比如lg-dinp-docker01.bj配置了兩個(gè)proc.num的策略苞氮,一個(gè)name=docker,一個(gè)name=agent
        // 所以此處要排除掉一部分
        related := true
        for tagKey, tagVal := range s.Tags {
            if myVal, exists := firstItem.Tags[tagKey]; !exists || myVal != tagVal {
                related = false
                break
            }
        }

        if !related {
            continue
        }

        judgeItemWithStrategy(L, s, firstItem, now)
    }
}

### 判斷采集數(shù)據(jù)瓤逼,如果匹配策略計(jì)算條件則發(fā)送報(bào)警事件
func judgeItemWithStrategy(L *SafeLinkedList, strategy model.Strategy, firstItem *model.JudgeItem, now int64) {
    fn, err := ParseFuncFromString(strategy.Func, strategy.Operator, strategy.RightValue)  //解析報(bào)警函數(shù)
    if err != nil {
        log.Printf("[ERROR] parse func %s fail: %v. strategy id: %d", strategy.Func, err, strategy.Id)
        return
    }

    historyData, leftValue, isTriggered, isEnough := fn.Compute(L)  //執(zhí)行判斷與計(jì)算
    if !isEnough {
        return
    }
  
   // 格式化事件信息
    event := &model.Event{
        Id:         fmt.Sprintf("s_%d_%s", strategy.Id, firstItem.PrimaryKey()),
        Strategy:   &strategy,
        Endpoint:   firstItem.Endpoint,
        LeftValue:  leftValue,
        EventTime:  firstItem.Timestamp,
        PushedTags: firstItem.Tags,
    }

    sendEventIfNeed(historyData, isTriggered, now, event, strategy.MaxStep)   //依據(jù)執(zhí)行判斷結(jié)果決定發(fā)送報(bào)警事件
}

## 表達(dá)式檢測(cè)及發(fā)送事件
func CheckExpression(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
    keys := buildKeysFromMetricAndTags(firstItem)
    if len(keys) == 0 {
        return
    }

    // expression可能會(huì)被多次重復(fù)處理笼吟,用此數(shù)據(jù)結(jié)構(gòu)保證只被處理一次
    handledExpression := make(map[int]struct{})

    expressionMap := g.ExpressionMap.Get()
    for _, key := range keys {
        expressions, exists := expressionMap[key]  //查詢是否存在采集數(shù)據(jù)相對(duì)應(yīng)的表達(dá)式
        if !exists {
            continue
        }

        related := filterRelatedExpressions(expressions, firstItem)   //過(guò)濾與采集數(shù)據(jù)相關(guān)的Expression 
        for _, exp := range related {
            if _, ok := handledExpression[exp.Id]; ok {
                continue
            }
            handledExpression[exp.Id] = struct{}{}
            judgeItemWithExpression(L, exp, firstItem, now)
        }
    }
}

### 過(guò)濾與采集數(shù)據(jù)相關(guān)的Expression
func filterRelatedExpressions(expressions []*model.Expression, firstItem *model.JudgeItem) []*model.Expression {
    size := len(expressions)
    if size == 0 {
        return []*model.Expression{}
    }

    exps := make([]*model.Expression, 0, size)

    for _, exp := range expressions {

        related := true
        
        itemTagsCopy := firstItem.Tags
        // 注意:exp.Tags 中可能會(huì)有一個(gè)endpoint=xxx的tag
        if _, ok := exp.Tags["endpoint"]; ok {
            itemTagsCopy = copyItemTags(firstItem)
        }

        for tagKey, tagVal := range exp.Tags {
            if myVal, exists := itemTagsCopy[tagKey]; !exists || myVal != tagVal {
                related = false
                break
            }
        }

        if !related {
            continue
        }
        exps = append(exps, exp)    //[]exps
    }
    return exps
}

### 判斷采集數(shù)據(jù),如果匹配表達(dá)式計(jì)算條件則發(fā)送報(bào)警事件
func judgeItemWithExpression(L *SafeLinkedList, expression *model.Expression, firstItem *model.JudgeItem, now int64) {
    fn, err := ParseFuncFromString(expression.Func, expression.Operator, expression.RightValue)  //解析報(bào)警函數(shù)
    if err != nil {
        log.Printf("[ERROR] parse func %s fail: %v. expression id: %d", expression.Func, err, expression.Id)
        return
    }

    historyData, leftValue, isTriggered, isEnough := fn.Compute(L)  //執(zhí)行判斷與計(jì)算
    if !isEnough {
        return
    }

    // 格式化事件信息
    event := &model.Event{
        Id:         fmt.Sprintf("e_%d_%s", expression.Id, firstItem.PrimaryKey()),
        Expression: expression,
        Endpoint:   firstItem.Endpoint,
        LeftValue:  leftValue,
        EventTime:  firstItem.Timestamp,
        PushedTags: firstItem.Tags,
    }

    sendEventIfNeed(historyData, isTriggered, now, event, expression.MaxStep) //依據(jù)執(zhí)行判斷結(jié)果決定發(fā)送報(bào)警事件

}

## 解析操作字符串霸旗,轉(zhuǎn)化為報(bào)警函數(shù)
func ParseFuncFromString(str string, operator string, rightValue float64) (fn Function, err error) {
    if str == "" {
        return nil, fmt.Errorf("func can not be null!")
    }
    idx := strings.Index(str, "#") //以#為定位符
    args, err := atois(str[idx+1 : len(str)-1])  //字位符后為參數(shù)
    if err != nil {
        return nil, err
    }

    switch str[:idx-1] {   //定位符前為函數(shù)名
    case "max":
        fn = &MaxFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "min":
        fn = &MinFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "all":
        fn = &AllFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "sum":
        fn = &SumFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "avg":
        fn = &AvgFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "diff":
        fn = &DiffFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "pdiff":
        fn = &PDiffFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "lookup":
        fn = &LookupFunction{Num: args[0], Limit: args[1], Operator: operator, RightValue: rightValue}
    default:
        err = fmt.Errorf("not_supported_method")
    }
    return
}

// sendEventIfNeed decides whether event must be published. It uses the
// current judgement result and the last event recorded under event.Id.
//
// Triggered:
//   - If there is no previous event, or the previous one was OK, a new
//     PROBLEM episode starts at step 1. It is not sent when maxStep == 0.
//   - If the previous event was PROBLEM, it re-alarms with step+1 only when
//     all of these hold: fewer than maxStep alarms were sent, the oldest
//     point in historyData is newer than the last event, and at least
//     Alarm.MinInterval seconds have passed.
// Not triggered: an OK (recovery) event is sent only if the previous event
// was PROBLEM.
//
// NOTE(review): `now` is the server clock taken in Judge.Send, while
// lastEvent.EventTime is the sample timestamp. The MinInterval check
// therefore mixes two clocks — confirm this is intended.
func sendEventIfNeed(historyData []*model.HistoryData, isTriggered bool, now int64, event *model.Event, maxStep int) {
    lastEvent, exists := g.LastEvents.Get(event.Id)
    if isTriggered {
        event.Status = "PROBLEM"
        if !exists || lastEvent.Status[0] == 'O' {
            // Threshold crossed now and no alarm is outstanding: start a new alarm Event.
            event.CurrentStep = 1

            // Some users set the maximum alarm count to 0, which effectively mutes it; honour that.
            if maxStep == 0 {
                return
            }

            sendEvent(event)  // publish the event
            return
        }

        // Reaching here means the previous Event was in PROBLEM state.
        if lastEvent.CurrentStep >= maxStep {
            // Already alarmed the maximum number of times; stay silent.
            return
        }

        if historyData[len(historyData)-1].Timestamp <= lastEvent.EventTime {
            // Points that already produced an alarm must not be reused, otherwise the same
            // condition would alarm about once a minute. Checking the last historyData
            // entry suffices because it is the oldest one.
            return
        }

        if now-lastEvent.EventTime < g.Config().Alarm.MinInterval {
            // Rate limit: two alarms must be at least MinInterval seconds apart.
            return
        }

        event.CurrentStep = lastEvent.CurrentStep + 1
        sendEvent(event)
    } else {
        // Send OK only if the last event was PROBLEM; otherwise do nothing.
        if exists && lastEvent.Status[0] == 'P' {
            event.Status = "OK"          // transition to OK
            event.CurrentStep = 1
            sendEvent(event)  // publish the recovery event
        }
    }
}

### sendEvent將事件保存至Redis(預(yù)警的異步機(jī)制)
func sendEvent(event *model.Event) {
    // update last event
    g.LastEvents.Set(event.Id, event)  //事件緩存

    bs, err := json.Marshal(event)     //Json序列化事件
    if err != nil {
        log.Printf("json marshal event %v fail: %v", event, err)
        return
    }

    // send to redis
    redisKey := fmt.Sprintf(g.Config().Alarm.QueuePattern, event.Priority())                        //redis鍵名
    rc := g.RedisConnPool.Get()
    defer rc.Close()
    rc.Do("LPUSH", redisKey, string(bs))  //LPUSH存儲(chǔ)
}

  • 報警函數分析
# Max
# 例如: max(#3)
#    對(duì)于最新的3個(gè)點(diǎn),其最大值滿足閾值條件則報(bào)警
type MaxFunction struct {
    Function
    Limit      int     //點(diǎn)數(shù)
    Operator   string  //操作符
    RightValue float64 //閥值 
}
func (this MaxFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit) //取指定的歷史數(shù)據(jù)
    if !isEnough {
        return
    }
 
    max := vs[0].Value
    //取最大值 
    for i := 1; i < this.Limit; i++ {
        if max < vs[i].Value {
            max = vs[i].Value
        }
    }

    leftValue = max
    isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue) //操作符判斷返回true|false
    return
}

# Min
# 如: min(#3)
#     對(duì)于最新的3個(gè)點(diǎn)淤刃,其最小值滿足閾值條件則報(bào)警
type MinFunction struct {
    Function
    Limit      int
    Operator   string
    RightValue float64
}
func (this MinFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit)
    if !isEnough {
        return
    }

    min := vs[0].Value
    //取最小值
    for i := 1; i < this.Limit; i++ {
        if min > vs[i].Value {
            min = vs[i].Value
        }
    }

    leftValue = min
    isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue) //操作符判斷返回true|false
    return
}

# All
# 如:all(#3)
#    最新的3個(gè)點(diǎn)都滿足閾值條件則報(bào)警
type AllFunction struct {
    Function
    Limit      int
    Operator   string
    RightValue float64
}
func (this AllFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit)
    if !isEnough {
        return
    }

    isTriggered = true
    // 遁環(huán)判斷操作條件
    for i := 0; i < this.Limit; i++ {
        isTriggered = checkIsTriggered(vs[i].Value, this.Operator, this.RightValue) //操作符判斷返回true|false
        if !isTriggered {
            break
        }
    }

    leftValue = vs[0].Value
    return
}

# Lookup
# 如 lookup(#2,3)
#    最新的3個(gè)點(diǎn)中有2個(gè)滿足條件則報(bào)警
type LookupFunction struct {
    Function
    Num        int   //條件數(shù)2
    Limit      int   //點(diǎn)數(shù)3
    Operator   string
    RightValue float64
}
func (this LookupFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit)
    if !isEnough {
        return
    }

    leftValue = vs[0].Value

    for n, i := 0, 0; i < this.Limit; i++ {
        if checkIsTriggered(vs[i].Value, this.Operator, this.RightValue) {
            n++           //滿足條件則累計(jì)
            if n == this.Num {     //達(dá)到條件則觸發(fā)
                isTriggered = true
                return
            }
        }
    }

    return
}

# Sum
# 如 sum(#3)
#    對(duì)于最新的3個(gè)點(diǎn)晒他,其和滿足閾值條件則報(bào)警
type SumFunction struct {
    Function
    Limit      int  
    Operator   string
    RightValue float64
}
func (this SumFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit)
    if !isEnough {
        return
    }

    sum := 0.0
    for i := 0; i < this.Limit; i++ {
        sum += vs[i].Value     //累計(jì)和
    }

    leftValue = sum
    isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)    //操作符判斷返回true|false
    return 
}

# Avg
# 如 avg(#3)
#    對(duì)于最新的3個(gè)點(diǎn),其平均值滿足閾值條件則報(bào)警
type AvgFunction struct {
    Function
    Limit      int
    Operator   string
    RightValue float64
}
func (this AvgFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit)
    if !isEnough {
        return
    }

    sum := 0.0
    for i := 0; i < this.Limit; i++ {
        sum += vs[i].Value              //累計(jì)和
    }

    leftValue = sum / float64(this.Limit)  //求平均
    isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)     //操作符判斷返回true|false
    return
}

# Diff
# 如 diff(#3)
#  拿最新push上來(lái)的點(diǎn)(被減數(shù))逸贾,與歷史最新的3個(gè)點(diǎn)(3個(gè)減數(shù))相減陨仅,得到3個(gè)差
#  只要有一個(gè)差滿足閾值條件則報(bào)警
type DiffFunction struct {
    Function
    Limit      int
    Operator   string
    RightValue float64
}
// 只要有一個(gè)點(diǎn)的diff觸發(fā)閾值,就報(bào)警
func (this DiffFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    // 此處this.Limit要+1耕陷,因?yàn)橥ǔUf(shuō)diff(#3)掂名,是當(dāng)前點(diǎn)與歷史的3個(gè)點(diǎn)相比較
    // 然而最新點(diǎn)已經(jīng)在linkedlist的第一個(gè)位置,所以……
    vs, isEnough = L.HistoryData(this.Limit + 1)
    if !isEnough {
        return
    }

    if len(vs) == 0 {
        isEnough = false
        return
    }

    first := vs[0].Value  //最新值

    isTriggered = false
    for i := 1; i < this.Limit+1; i++ {
        // diff是當(dāng)前值減去歷史值
        leftValue = first - vs[i].Value
        isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)  
        if isTriggered {
            break   //只要任何一次滿足判斷條件則返回True觸發(fā)
        }
    }
    return
}

# Pdiff 
# 如:pdiff(#3)
#    拿最新push上來(lái)的點(diǎn)哟沫,與歷史最新的3個(gè)點(diǎn)相減饺蔑,得到3個(gè)差
#    再將3個(gè)差值分別除以減數(shù),得到3個(gè)商值嗜诀,只要有一個(gè)商值滿足閾值則報(bào)警
type PDiffFunction struct {
    Function
    Limit      int
    Operator   string
    RightValue float64
}
func (this PDiffFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit + 1)
    if !isEnough {
        return
    }

    if len(vs) == 0 {
        isEnough = false
        return
    }

    first := vs[0].Value

    isTriggered = false
    for i := 1; i < this.Limit+1; i++ {
        if vs[i].Value == 0 {
            continue
        }
        
        // 差/Value*100
        leftValue = (first - vs[i].Value) / vs[i].Value * 100.0
        isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)
        if isTriggered {
            break
        }
    }
    return
}

// checkIsTriggered reports whether "leftValue operator rightValue" holds.
//
// Supported operators are "=", "==", "!=", "<", "<=", ">" and ">=".
// Equality uses an absolute tolerance of 0.0001 to absorb float rounding,
// and "!=" is its exact negation. An unknown operator never triggers.
func checkIsTriggered(leftValue float64, operator string, rightValue float64) (isTriggered bool) {
	const epsilon = 0.0001
	switch operator {
	case "=", "==":
		isTriggered = math.Abs(leftValue-rightValue) < epsilon
	case "!=":
		// Previously `> epsilon`, so a difference of exactly epsilon matched
		// neither "==" nor "!=".
		isTriggered = !(math.Abs(leftValue-rightValue) < epsilon)
	case "<":
		isTriggered = leftValue < rightValue
	case "<=":
		isTriggered = leftValue <= rightValue
	case ">":
		isTriggered = leftValue > rightValue
	case ">=":
		isTriggered = leftValue >= rightValue
	}
	return
}

cron.SyncStrategies() 同步HBS策略和表達式


# 同步策略配置入口函數(shù)
func SyncStrategies() {
    duration := time.Duration(g.Config().Hbs.Interval) * time.Second   //全局配置間隔
    for {
        syncStrategies()   //同步策略項(xiàng)配置函數(shù)調(diào)用
        syncExpression()   //同步表達(dá)式配置函數(shù)調(diào)用
        syncFilter()       //同步過(guò)濾器配置函數(shù)調(diào)用
        time.Sleep(duration)  //同步間隔
    }
}


## RPC調(diào)用"Hbs.GetStrategies"HBS同步策略
func syncStrategies() {
    var strategiesResponse model.StrategiesResponse
    err := g.HbsClient.Call("Hbs.GetStrategies", model.NullRpcRequest{}, &strategiesResponse) //RPC調(diào)用HBS
    if err != nil {
        log.Println("[ERROR] Hbs.GetStrategies:", err)
        return
    }
    rebuildStrategyMap(&strategiesResponse) //緩存
}
### 歸整策略數(shù)據(jù)和緩存
func rebuildStrategyMap(strategiesResponse *model.StrategiesResponse) {
    //緩存MAP格式 Key    =>  Value
    //            ||         ||          
    //  endpoint/metric => [strategy1, strategy2 ...]
    m := make(map[string][]model.Strategy)
    for _, hs := range strategiesResponse.HostStrategies {
        hostname := hs.Hostname
        //debug打印
        if g.Config().Debug && hostname == g.Config().DebugHost {
            log.Println(hostname, "strategies:")
            bs, _ := json.Marshal(hs.Strategies)
            fmt.Println(string(bs))
        }
        //數(shù)據(jù)歸整至Map
        for _, strategy := range hs.Strategies {
            key := fmt.Sprintf("%s/%s", hostname, strategy.Metric) 
            if _, exists := m[key]; exists {
                m[key] = append(m[key], strategy)
            } else {
                m[key] = []model.Strategy{strategy}
            }
        }
    }

    g.StrategyMap.ReInit(m)  //初始化全局變量
}


## RPC調(diào)用"Hbs.GetExpressions"HBS同步表達(dá)式
func syncExpression() {
    var expressionResponse model.ExpressionResponse
    err := g.HbsClient.Call("Hbs.GetExpressions", model.NullRpcRequest{}, &expressionResponse)  //RPC調(diào)用HBS
    if err != nil {
        log.Println("[ERROR] Hbs.GetExpressions:", err)
        return
    }

    rebuildExpressionMap(&expressionResponse)  //緩存
}
### 歸整表達(dá)式數(shù)據(jù)和緩存

func rebuildExpressionMap(expressionResponse *model.ExpressionResponse) {
    m := make(map[string][]*model.Expression)
         //緩存MAP格式 Key    =>    Value
         //          ||            ||          
         //      metric/k=v  => [expression1, expression2 ...]
    for _, exp := range expressionResponse.Expressions {
        for k, v := range exp.Tags {
            key := fmt.Sprintf("%s/%s=%s", exp.Metric, k, v)
            if _, exists := m[key]; exists {
                m[key] = append(m[key], exp)
            } else {
                m[key] = []*model.Expression{exp}
            }
        }
    }

    g.ExpressionMap.ReInit(m)  //初始化全局變量
}


## 構(gòu)建同步過(guò)濾器map猾警,以Metric為查詢Key
func syncFilter() {
    m := make(map[string]string)  //緩存map

    //M map[string][]model.Strategy
    strategyMap := g.StrategyMap.Get() //獲取同步的strategyMap
    for _, strategies := range strategyMap {
        for _, strategy := range strategies {
            m[strategy.Metric] = strategy.Metric
        }
    }  //迭代Metric

    //M map[string][]*model.Expression 
    expressionMap := g.ExpressionMap.Get() //獲取同步的expressionMap
    for _, expressions := range expressionMap {
        for _, expression := range expressions {
            m[expression.Metric] = expression.Metric
        }
    }  //迭代Metric
    g.FilterMap.ReInit(m)   //初始化全局變量
}

// Process-wide caches (StrategyMap, ExpressionMap, FilterMap), refreshed
// periodically by the HBS sync job. Each ReInit swaps the whole map under
// the cache's own lock.
//   StrategyMap:   "endpoint/metric"         -> strategies
//   ExpressionMap: "metric/tagKey=tagValue"  -> expressions
//   FilterMap:     metric name               -> metric name (set of judged metrics)
var (
    StrategyMap   = &SafeStrategyMap{M: make(map[string][]model.Strategy)}
    ExpressionMap = &SafeExpressionMap{M: make(map[string][]*model.Expression)}
    FilterMap     = &SafeFilterMap{M: make(map[string]string)}
)

// ReInit replaces the whole strategy index with m while holding the write lock.
func (this *SafeStrategyMap) ReInit(m map[string][]model.Strategy) {
	this.Lock()
	this.M = m
	this.Unlock()
}

// ReInit replaces the whole expression index with m while holding the write lock.
func (this *SafeExpressionMap) ReInit(m map[string][]*model.Expression) {
	this.Lock()
	this.M = m
	this.Unlock()
}

// ReInit replaces the whole metric filter set with m while holding the write lock.
func (this *SafeFilterMap) ReInit(m map[string]string) {
	this.Lock()
	this.M = m
	this.Unlock()
}


// Strategy is a host-bound alarm rule as delivered by HBS. Judge indexes
// strategies by "endpoint/metric".
type Strategy struct {
    Id         int               `json:"id"`
    Metric     string            `json:"metric"`
    // Every tag here must be present, with an equal value, on a sample for the strategy to apply.
    Tags       map[string]string `json:"tags"`
    Func       string            `json:"func"`       // e.g. max(#3) all(#3)
    Operator   string            `json:"operator"`   // e.g. < !=
    RightValue float64           `json:"rightValue"` // critical value
    // Maximum number of alarms sent per PROBLEM episode; 0 mutes the strategy.
    MaxStep    int               `json:"maxStep"`
    Priority   int               `json:"priority"`
    Note       string            `json:"note"`
    Tpl        *Template         `json:"tpl"`        // template (presumably the Portal template it comes from — confirm)
}

// Expression is a tag-based alarm rule as delivered by HBS. Judge indexes
// expressions by "metric/tagKey=tagValue".
type Expression struct {
    Id         int               `json:"id"`
    Metric     string            `json:"metric"`
    // Every tag here must match the sample; an "endpoint" tag matches the sample's endpoint.
    Tags       map[string]string `json:"tags"`
    Func       string            `json:"func"`       // e.g. max(#3) all(#3)
    Operator   string            `json:"operator"`   // e.g. < !=
    RightValue float64           `json:"rightValue"` // critical value
    // Maximum number of alarms sent per PROBLEM episode; 0 mutes the expression.
    MaxStep    int               `json:"maxStep"`
    Priority   int               `json:"priority"`
    Note       string            `json:"note"`
    ActionId   int               `json:"actionId"`   // ID of the action to execute
}

cron.CleanStale() 定期清理過期的歷史數據

# 定期清理任務(wù)運(yùn)行入口
func CleanStale() {
    for {
        time.Sleep(time.Hour * 5)
        cleanStale()  //調(diào)用清理
    }
}

##清理7天之前的歷史過(guò)期數(shù)據(jù)
func cleanStale() {
    before := time.Now().Unix() - 3600*24*7

    arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
    for i := 0; i < 16; i++ {
        for j := 0; j < 16; j++ {
            store.HistoryBigMap[arr[i]+arr[j]].CleanStale(before) //清理BigMap數(shù)據(jù)
        }
    }
}

#清理實(shí)現(xiàn)
func (this *JudgeItemMap) CleanStale(before int64) {
    keys := []string{}

    this.RLock()
    for key, L := range this.M {
        front := L.Front()
        if front == nil {
            continue
        }
        //迭代匹配時(shí)間戳发皿,小于則過(guò)期
        if front.Value.(*model.JudgeItem).Timestamp < before {
            keys = append(keys, key)
        }
    }
    this.RUnlock()
   
    //批量清理
    this.BatchDelete(keys)
}

// BatchDelete removes all given keys from the shard under a single write
// lock. It returns without locking when keys is empty.
func (this *JudgeItemMap) BatchDelete(keys []string) {
	if len(keys) == 0 {
		return
	}

	this.Lock()
	defer this.Unlock()
	for _, key := range keys {
		delete(this.M, key)
	}
}


技術經驗借鑒

  • BigMap內存構造緩存歷史數據的機制與應用
  • 報警計算function設計中「策略模式」的應用
  • SafeLinkedList並發安全的鏈表（list）操作實現

擴展學習

最後編輯于
©著作權歸作者所有，轉載或內容合作請聯繫作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市拂蝎,隨后出現(xiàn)的幾起案子穴墅,更是在濱河造成了極大的恐慌,老刑警劉巖温自,帶你破解...
    沈念sama閱讀 217,826評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件玄货,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡悼泌,警方通過(guò)查閱死者的電腦和手機(jī)松捉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)馆里,“玉大人隘世,你說(shuō)我怎么就攤上這事可柿。” “怎么了丙者?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,234評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵复斥,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我蔓钟,道長(zhǎng)永票,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,562評(píng)論 1 293
  • 正文 為了忘掉前任滥沫,我火速辦了婚禮侣集,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘兰绣。我一直安慰自己世分,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,611評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布缀辩。 她就那樣靜靜地躺著臭埋,像睡著了一般。 火紅的嫁衣襯著肌膚如雪臀玄。 梳的紋絲不亂的頭發(fā)上瓢阴,一...
    開(kāi)封第一講書(shū)人閱讀 51,482評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音健无,去河邊找鬼荣恐。 笑死,一個(gè)胖子當(dāng)著我的面吹牛累贤,可吹牛的內(nèi)容都是我干的叠穆。 我是一名探鬼主播,決...
    沈念sama閱讀 40,271評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼臼膏,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼硼被!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起渗磅,我...
    開(kāi)封第一講書(shū)人閱讀 39,166評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤嚷硫,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后始鱼,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體论巍,經(jīng)...
    沈念sama閱讀 45,608評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,814評(píng)論 3 336
  • 正文 我和宋清朗相戀三年风响,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片丹禀。...
    茶點(diǎn)故事閱讀 39,926評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡状勤,死狀恐怖鞋怀,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情持搜,我是刑警寧澤密似,帶...
    沈念sama閱讀 35,644評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站葫盼,受9級(jí)特大地震影響残腌,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜贫导,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,249評(píng)論 3 329
  • 文/蒙蒙 一抛猫、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧孩灯,春花似錦闺金、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,866評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至讥巡,卻和暖如春掀亩,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背欢顷。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,991評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工槽棍, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人吱涉。 一個(gè)月前我還...
    沈念sama閱讀 48,063評(píng)論 3 370
  • 正文 我出身青樓刹泄,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親怎爵。 傳聞我的和親對(duì)象是個(gè)殘疾皇子特石,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,871評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容