OpenFalcon 源碼分析(Nodata組件)

Nodata版本

VERSION = "0.0.11"

Nodata組件功能

nodata用于檢測監(jiān)控數(shù)據(jù)的上報異常沸久。nodata和實(shí)時報警judge模塊協(xié)同工作,過程為: 配置了nodata的采集項(xiàng)超時未上報數(shù)據(jù)死相,nodata生成一條默認(rèn)的模擬數(shù)據(jù);用戶配置相應(yīng)的報警策略,收到mock數(shù)據(jù)就產(chǎn)生報警。采集項(xiàng)上報異常檢測轿偎,作為judge模塊的一個必要補(bǔ)充,能夠使judge的實(shí)時報警功能更加可靠甩挫、完善贴硫。【官方描述】

Nodata組件邏輯圖

系統(tǒng)流圖

官方系統(tǒng)流圖

模塊結(jié)構(gòu)

官方模塊結(jié)構(gòu)圖

main入口分析


func main() {
    //命令參數(shù)解析
    cfg := flag.String("c", "cfg.json", "configuration file")
    version := flag.Bool("v", false, "show version")
    versionGit := flag.Bool("vg", false, "show version")
    flag.Parse()
    //版本輸出
    if *version {
        fmt.Println(g.VERSION)
        os.Exit(0)
    }
    //gitcommit序列號輸出
    if *versionGit {
        fmt.Println(g.VERSION, g.COMMIT)
        os.Exit(0)
    }

    // 全局配置格式化
    g.ParseConfig(*cfg)
    // 統(tǒng)計
    g.StartProc()

    // 緩存Nodata配置
    config.Start()              // 【參考詳細(xì)分析】
    // 緩存Nodata配置主機(jī)的采集數(shù)據(jù)點(diǎn)
    collector.Start()           // 【參考詳細(xì)分析】
    // judge策略判斷 
    judge.Start()               // 【參考詳細(xì)分析】

    // http API服務(wù)
    http.Start()                // 【參考詳細(xì)分析】

    select {}
}


config.Start() 從DB加載Nodata配置(dashboard配置nodata策略寫入mysql)

# 加載nodata配置主函數(shù)
func Start() {
    if !g.Config().Config.Enabled {
        log.Println("config.Start warning, not enabled")
        return
    }

    service.InitDB()     //初始化DB
    StartNdConfigCron()  //加載nodata配置緩存至內(nèi)存 
    log.Println("config.Start ok")
}

## 初始化DB連接
func InitDB() {
    _, err := GetDbConn(dbBaseConnName)   //"db.base"連接conn初始化并保存至內(nèi)存Map
    if err != nil {
        log.Fatalln("config.InitDB error", err)
        return // never go here
    }

    log.Println("config.InitDB ok")
}

### GetDbConn實(shí)現(xiàn)函數(shù)
### makeDbConn函數(shù)實(shí)現(xiàn)sql客戶端連接dbconn
### 內(nèi)存map dbConnMap[connName]保存dbconn
func GetDbConn(connName string) (c *sql.DB, e error) {
    dbLock.Lock()
    defer dbLock.Unlock()

    var err error
    var dbConn *sql.DB
    dbConn = dbConnMap[connName]
    if dbConn == nil {
        dbConn, err = makeDbConn()  //創(chuàng)建sql客戶端連接
        if err != nil {
            closeDbConn(dbConn)
            return nil, err
        }
        dbConnMap[connName] = dbConn
    }

    err = dbConn.Ping()      //dbconn檢測伊者,conn.Ping()
    if err != nil {
        closeDbConn(dbConn)  //dbconn關(guān)閉英遭,conn.Close()
        delete(dbConnMap, connName)
        return nil, err
    }

    return dbConn, err
}

func makeDbConn() (conn *sql.DB, err error) {
    conn, err = sql.Open("mysql", g.Config().Config.Dsn)
    if err != nil {
        return nil, err
    }

    conn.SetMaxIdleConns(int(g.Config().Config.MaxIdle))
    err = conn.Ping()

    return conn, err
}

##  周期任務(wù)同步nodata配置
func StartNdConfigCron() {
    ndconfigCron.AddFuncCC(ndconfigCronSpec, func() {
        start := time.Now().Unix()
        cnt, _ := syncNdConfig()   // 同步nodata配置
        end := time.Now().Unix()
        if g.Config().Debug {
            log.Printf("config cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start))
        }

        // 統(tǒng)計
        g.ConfigCronCnt.Incr()
        g.ConfigLastTs.SetCnt(end - start)
        g.ConfigLastCnt.SetCnt(int64(cnt))
    }, 1)
    ndconfigCron.Start()          //start cron
} 

#### 獲取nodata配置、重新格式化及緩存配置全局公開Map(NdConfigMap)
func syncNdConfig() (cnt int, errt error) {
    // 獲取nodata配置函數(shù)調(diào)用
    configs := service.GetMockCfgFromDB()
    // 重新格式化配置NodateConfig結(jié)構(gòu)
    nm := nmap.NewSafeMap()
    for _, ndc := range configs {
        endpoint := ndc.Endpoint
        metric := ndc.Metric
        tags := ndc.Tags
        if endpoint == "" {
            log.Printf("bad config: %+v\n", ndc)
            continue
        }
        pk := cutils.PK(endpoint, metric, tags)
        nm.Put(pk, ndc)
    }

    // 緩存map
    SetNdConfigMap(nm)

    return nm.Size(), nil    //返回map長度
}

##### 底層獲取nodata配置實(shí)現(xiàn)函數(shù)
func GetMockCfgFromDB() map[string]*cmodel.NodataConfig {
    ret := make(map[string]*cmodel.NodataConfig)

    dbConn, err := GetDbConn("nodata.mockcfg") //獲取dbConn連接
    if err != nil {
        log.Println("db.get_conn error, mockcfg", err)
        return ret
    }

    q := fmt.Sprintf("SELECT id,name,obj,obj_type,metric,tags,dstype,step,mock FROM mockcfg")
    rows, err := dbConn.Query(q) //執(zhí)行mockcfg表查詢語句
    if err != nil {
        log.Println("db.query error, mockcfg", err)
        return ret
    }

    defer rows.Close()
    for rows.Next() {         //迭代查詢結(jié)果集
        t := MockCfg{}
        tags := ""
        err := rows.Scan(&t.Id, &t.Name, &t.Obj, &t.ObjType, &t.Metric, &tags, &t.Type, &t.Step, &t.Mock)
        if err != nil {
            log.Println("db.scan error, mockcfg", err)
            continue
        }
        t.Tags = cutils.DictedTagstring(tags) //"tagskey=value"格式化map[Key]Value

        err = checkMockCfg(&t) //檢測配置是否有效
        if err != nil {
            log.Println("check mockcfg, error:", err)
            continue
        }

        endpoints := getEndpoint(t.ObjType, t.Obj) //獲取endpoint列表(hosts slice),后面有objtype為"host/group/other"處理函數(shù)分析亦渗。
       if len(endpoints) < 1 {
            continue
        }

        for _, ep := range endpoints {
            uuid := cutils.PK(ep, t.Metric, t.Tags)  //UUID format 'endpoint/metric/k=v,k=v(tags)'
            ncfg := cmodel.NewNodataConfig(t.Id, t.Name, t.ObjType, ep, t.Metric, t.Tags, t.Type, t.Step, t.Mock)

            val, found := ret[uuid]
            if !found {   //如果不存在則新建
                ret[uuid] = ncfg
                continue
            }

            //如果存在挖诸,判斷配置類型
            if isSpuerNodataCfg(val, ncfg) {
                // val is spuer than ncfg, so drop ncfg
                log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, val.Name, ncfg.Name)
            } else {
                ret[uuid] = ncfg //如果原val配置類型為group,而新ncfg配置類型為host則覆蓋原有配置 
                log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, ncfg.Name, val.Name)
            }
        }
    }

    return ret   //返回map[UUID]*cmodel.NodataConfig
}


###### 根據(jù)objType獲取Hosts slice
func getEndpoint(objType string, obj string) []string {
    switch objType {
    case "host":         //類型Host與處理函數(shù)
        return getEndpointFromHosts(obj)
    case "group":        //類型group與處理函數(shù)
        return getEndpointFromGroups(obj)
    case "other":        //類型other與處理函數(shù)
        return getEndpointFromOther(obj)
    default:
        return make([]string, 0)
    }
}
//類型Host與處理函數(shù)
func getEndpointFromHosts(hosts string) []string {
    ret := make([]string, 0)

    hlist := strings.Split(hosts, "\n")  //分隔處理
    for _, host := range hlist {
        nh := strings.TrimSpace(host)
        if nh != "" {
            ret = append(ret, nh)
        }
    }

    return ret
}
//類型group與處理函數(shù)
func getEndpointFromGroups(grps string) []string {
    grplist := strings.Split(grps, "\n")

    // get host map, avoid duplicating
    hosts := make(map[string]string)
    for _, grp := range grplist {
        ngrp := strings.TrimSpace(grp)
        if len(ngrp) < 1 {
            continue
        }

        hostmap := GetHostsFromGroup(grp) //根據(jù)Group名稱獲取主機(jī)MAP
        for hostname := range hostmap {
            if hostname != "" {
                hosts[hostname] = hostname
            }
        }
    }

    // get host slice
    ret := make([]string, 0)
    for key := range hosts {
        ret = append(ret, key)
    }

    return ret
}
//類型other與處理函數(shù),同類型Host與處理函數(shù)
func getEndpointFromOther(other string) []string {
    return getEndpointFromHosts(other)
}

collector.Start() 緩存Nodata配置主機(jī)的采集數(shù)據(jù)點(diǎn)


# 運(yùn)行收集nodata數(shù)據(jù)主函數(shù)
func Start() {
    if !g.Config().Collector.Enabled {
        log.Println("collector.Start warning, not enabled")
        return
    }

    StartCollectorCron()   //周期任務(wù)法精,收集nodata數(shù)據(jù)
    log.Println("collector.Start ok")
}
## 定時任務(wù)執(zhí)行多律,收集函數(shù)調(diào)用與任務(wù)運(yùn)行 
func StartCollectorCron() {
    collectorCron.AddFuncCC("*/20 * * * * ?", func() {
        start := time.Now().Unix()
        cnt := collectDataOnce()   //收集函數(shù)調(diào)用
        end := time.Now().Unix()
        if g.Config().Debug {
            log.Printf("collect cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start))
        }

        //統(tǒng)計
        g.CollectorCronCnt.Incr()
        g.CollectorLastTs.SetCnt(end - start)
        g.CollectorLastCnt.SetCnt(int64(cnt))
        g.CollectorCnt.IncrBy(int64(cnt))
    }, 1)
    collectorCron.Start()
}

### 收集功能實(shí)現(xiàn)函數(shù)
func collectDataOnce() int {
    keys := config.Keys()
    keysLen := len(keys)

    // 并發(fā)+同步控制
    cfg := g.Config().Collector    //collector全局配置
    concurrent := int(cfg.Concurrent) //并發(fā)數(shù)
    if concurrent < 1 || concurrent > 50 {
        concurrent = 10
    }
    sema := tsema.NewSemaphore(concurrent)   //創(chuàng)建并發(fā)同步

    batch := int(cfg.Batch)  //全局批量處理數(shù)
    if batch < 100 || batch > 1000 {
        batch = 200 //batch不能太小, 否則channel將會很大
    }

    //根據(jù)nodata配置數(shù)長度和批量處理數(shù)創(chuàng)建channel長度
    batchCnt := (keysLen + batch - 1) / batch
    rch := make(chan int, batchCnt+1)

    i := 0
    for i < keysLen {     
        leftLen := keysLen - i
        fetchSize := batch // 每次處理batch個配置
        if leftLen < fetchSize {
            fetchSize = leftLen
        }
        fetchKeys := keys[i : i+fetchSize]

        // 并發(fā)collect數(shù)據(jù)
        sema.Acquire()
        //線程批量處理(fetchKeys, fetchSize)
        go func(keys []string, keySize int) {
            defer sema.Release()
            size, err := fetchItemsAndStore(keys, keySize)//批查獲取函數(shù)調(diào)用
            if err != nil {
                log.Printf("fetchItemAndStore fail, size:%v, error:%v", size, err)
            }
            if g.Config().Debug {
                log.Printf("fetchItemAndStore keys:%v, key_size:%v, ret_size:%v", keys, keySize, size)
            }
            rch <- size
        }(fetchKeys, fetchSize) 

        i += fetchSize
    }

    collectCnt := 0
    //計數(shù)
    for i := 0; i < batchCnt; i++ {
        select {
        case cnt := <-rch:
            collectCnt += cnt
        }
    }

    return collectCnt
}

#### 獲取數(shù)據(jù)實(shí)現(xiàn)函數(shù)
func fetchItemsAndStore(fetchKeys []string, fetchSize int) (size int, errt error) {
    if fetchSize < 1 {
        return
    }

    // form request args
    args := make([]*cmodel.GraphLastParam, 0)
    for _, key := range fetchKeys {
        ndcfg, found := config.GetNdConfig(key) //根據(jù)hostname返回nodata配置
        if !found {
            continue
        }

        endpoint := ndcfg.Endpoint //endpoint主機(jī)對象
        counter := cutils.Counter(ndcfg.Metric, ndcfg.Tags) // 格式metric/tags(k=v,k=v)
        arg := &cmodel.GraphLastParam{endpoint, counter} //請求參數(shù)
        args = append(args, arg)
    }
    if len(args) < 1 {
        return
    }

    resp, err := queryLastPoints(args)//API調(diào)用查詢endpoint最近一次采集數(shù)據(jù)。(POST請求API組件api調(diào)用搂蜓,查看后面函數(shù)分析)
    if err != nil {
        return 0, err
    }

    // store items
    fts := time.Now().Unix()   //存儲Items時間float time,Judge邏輯用到
    for _, glr := range resp {
        //log.Printf("collect:%v\n", glr)
        if glr == nil || glr.Value == nil {
            continue
        }
        AddItem(cutils.PK2(glr.Endpoint, glr.Counter), NewDataItem(glr.Value.Timestamp, float64(glr.Value.Value), "OK", fts))  //緩存收集到的監(jiān)控數(shù)據(jù)(ItemMap)狼荞。Value.Timestamp數(shù)據(jù)項(xiàng)時間戳,Value.Value數(shù)據(jù)項(xiàng)值帮碰,"OK"數(shù)據(jù)項(xiàng)狀態(tài)相味,fts數(shù)據(jù)項(xiàng)存儲時間。
    }

    return len(resp), nil
}

##### config.GetNdConfig 根據(jù)hostname返回NodataConfig配置
func GetNdConfig(key string) (*cmodel.NodataConfig, bool) {
    rwlock.RLock()
    defer rwlock.RUnlock()

    val, found := NdConfigMap.Get(key)//map操作
    if found && val != nil {
        return val.(*cmodel.NodataConfig), true
    }
    return &cmodel.NodataConfig{}, false
}

##### API組件POST請求實(shí)現(xiàn)函數(shù)
func queryLastPoints(param []*cmodel.GraphLastParam) (resp []*cmodel.GraphLastResp, err error) {
    cfg := g.Config()
    uri := fmt.Sprintf("%s/api/v1/graph/lastpoint", cfg.PlusApi.Addr) //接口定義

    var req *httplib.BeegoHttpRequest
    headers := map[string]string{"Content-type": "application/json"}
    req, err = requests.CurlPlus(uri, "POST", "nodata", cfg.PlusApi.Token, 
        headers, map[string]string{}) //Curl請求頭與方法

    if err != nil {
        return
    }
    req.SetTimeout(time.Duration(cfg.PlusApi.ConnectTimeout)*time.Millisecond,
        time.Duration(cfg.PlusApi.RequestTimeout)*time.Millisecond)

    b, err := json.Marshal(param)
    if err != nil {
        return
    }

    req.Body(b)   //請求體

    err = req.ToJson(&resp)   //請求執(zhí)行與response json
    if err != nil {
        return
    }

    return resp, nil
}

##### 緩存數(shù)據(jù)點(diǎn)
func AddItem(key string, val *DataItem) {
    listv, found := ItemMap.Get(key)
    if !found {
        ll := tlist.NewSafeListLimited(3) //每個采集指標(biāo),緩存最新的3個數(shù)據(jù)點(diǎn)
        ll.PushFrontViolently(val)
        ItemMap.Put(key, ll)
        return
    }

    listv.(*tlist.SafeListLimited).PushFrontViolently(val)
}


func NewDataItem(ts int64, val float64, fstatus string, fts int64) *DataItem {
    return &DataItem{Ts: ts, Value: val, FStatus: fstatus, FTs: fts}
}


type GraphLastResp struct {
        Endpoint    string        `json:"endpoint"`
        Counter     string        `json:"counter"`
        Value       *RRDData      `json:"value"`
}

judge.Start()

# Judge運(yùn)行入口函數(shù)
func Start() {
    StartJudgeCron()  //運(yùn)行調(diào)用
    log.Println("judge.Start ok")
}

# 定時任務(wù)Judge執(zhí)行函數(shù)
func StartJudgeCron() {
    judgeCron.AddFuncCC(judgeCronSpec, func() {
        start := time.Now().Unix()
        judge()    //執(zhí)行judge函數(shù)調(diào)用
        end := time.Now().Unix()
        if g.Config().Debug {
            log.Printf("judge cron, time %ds, start %s\n", end-start, ttime.FormatTs(start))
        }

        // 統(tǒng)計
        g.JudgeCronCnt.Incr()
        g.JudgeLastTs.SetCnt(end - start)

        // 觸發(fā)Mock列表數(shù)據(jù)發(fā)送
        sender.SendMockOnceAsync()  
    }, 1)
    judgeCron.Start()    //執(zhí)行Cron
}

## judge實(shí)現(xiàn)函數(shù)
func judge() {
    now := time.Now().Unix()
    keys := config.Keys()
    for _, key := range keys {
        ndcfg, found := config.GetNdConfig(key) //根據(jù)hostname返回nodata配置
        if !found { //策略不存在,不處理
            continue
        }
        step := ndcfg.Step
        mock := ndcfg.Mock

        item, found := collector.GetFirstItem(key) //根據(jù)hostname返回collector最近一次配置
        if !found { //沒有數(shù)據(jù),未開始采集,不處理
            continue
        }
        
        //3*step(or 180)超時時間
        lastTs := now - getTimeout(step) 
        if item.FStatus != "OK" || item.FTs < lastTs { //數(shù)據(jù)采集失敗,不處理
            continue
        }
        //采集到的數(shù)據(jù)為mock數(shù)據(jù),則認(rèn)為上報超時了
        if fCompare(mock, item.Value) == 0 { 
            //判斷采集到的最后一次數(shù)據(jù)項(xiàng)值timestamp+step(上報周期)
            //如果小于當(dāng)前時間則認(rèn)為已經(jīng)上報超時了殉挽,過了應(yīng)該上報的周期
            //如果大于當(dāng)前時間則表示還在有效的上報周期內(nèi)
            if LastTs(key)+step <= now {
                TurnNodata(key, now)  //設(shè)置nodata狀態(tài)
                genMock(genTs(now, step), key, ndcfg) //添加到Mock列表
            }
            continue
        }
          
        //判斷采集到的最后一次數(shù)據(jù)項(xiàng)值timestamp丰涉,如果小于
        //3*step(or 180)超時時間則認(rèn)為數(shù)據(jù)過期,則認(rèn)為上報超時
        if item.Ts < lastTs {
            if LastTs(key)+step <= now {
                TurnNodata(key, now)
                genMock(genTs(now, step), key, ndcfg)
            }
            continue
        }

        TurnOk(key, now)
    }
}

### 返回最后一次采集數(shù)據(jù)的timestamp
func LastTs(key string) int64 {
    statusLock.RLock()

    var ts int64 = 0
    v, found := StatusMap.Get(key)
    if !found {
        statusLock.RUnlock()
        return ts
    }

    ts = v.(*NodataStatus).Ts

    statusLock.RUnlock()
    return ts
}

// NoData Data Item Struct
type DataItem struct {
    Ts      int64    //timestamp
    Value   float64  
    FStatus string  // OK|ERR
    FTs     int64   //存儲時間float
}

// Nodata Status Struct
type NodataStatus struct {
    Key    string
    Status string // OK|NODATA
    Cnt    int
    Ts     int64
}


### 設(shè)置為Nodata狀態(tài)
func TurnNodata(key string, ts int64) {
    statusLock.Lock()

    v, found := StatusMap.Get(key)
    if !found {
        // create new status
        ns := NewNodataStatus(key, "NODATA", 1, ts)
        StatusMap.Put(key, ns)
        statusLock.Unlock()
        return
    }

    // update status
    ns := v.(*NodataStatus)
    ns.Status = "NODATA"
    ns.Cnt += 1
    ns.Ts = ts

    statusLock.Unlock()
    return
}

### 添加Item至Mock列表
func genMock(ts int64, key string, ndcfg *cmodel.NodataConfig) {
    sender.AddMock(key, ndcfg.Endpoint, ndcfg.Metric, cutils.SortedTags(ndcfg.Tags), ts, ndcfg.Type, ndcfg.Step, ndcfg.Mock)
}

func AddMock(key string, endpoint string, metric string, tags string, ts int64, dstype string, step int64, value interface{}) {
    item := &cmodel.JsonMetaData{metric, endpoint, ts, step, value, dstype, tags}
    MockMap.Put(key, item)    //put into map
}

sender.SendMockOnceAsync() 發(fā)送模擬數(shù)據(jù)

# 發(fā)送Mock數(shù)據(jù)入口函數(shù)
func SendMockOnceAsync() {
    go SendMockOnce()
}

## 并發(fā)發(fā)送和統(tǒng)計
func SendMockOnce() int {
    if !sema.TryAcquire() {
        return -1
    }
    defer sema.Release()

    // not enabled
    if !g.Config().Sender.Enabled {
        return 0
    }

    start := time.Now().Unix()
    cnt, _ := sendMock()   //調(diào)用發(fā)送功能模塊
    end := time.Now().Unix()
    if g.Config().Debug {
        log.Printf("sender cron, cnt %d, time %ds, start %s", cnt, end-start, ttime.FormatTs(start))
    }

    // 統(tǒng)計
    g.SenderCronCnt.Incr()
    g.SenderLastTs.SetCnt(end - start)
    g.SenderCnt.IncrBy(int64(cnt))

    return cnt
}

### 發(fā)送模擬數(shù)據(jù)
func sendMock() (cnt int, errt error) {

    cfg := g.Config().Sender  //全局配置加載
    batch := int(cfg.Batch)   //批量值
    connTimeout := cfg.ConnectTimeout //連接超時
    requTimeout := cfg.RequestTimeout  //API請求超時

    // 發(fā)送至transfer組件
    mocks := MockMap.Slice()  //獲取Mock列表
    MockMap.Clear()        //清空列表空間
    mockSize := len(mocks)
    i := 0
    for i < mockSize {
        leftLen := mockSize - i
        sendSize := batch
        if leftLen < sendSize {
            sendSize = leftLen
        }
        fetchMocks := mocks[i : i+sendSize]  //取一批量
        i += sendSize

        items := make([]*cmodel.JsonMetaData, 0)
        //整理為slice
        for _, val := range fetchMocks {
            if val == nil {
                continue
            }
            items = append(items, val.(*cmodel.JsonMetaData))
        }
        cntonce, err := sendItemsToTransfer(items, len(items), "nodata.mock",
            time.Millisecond*time.Duration(connTimeout),
            time.Millisecond*time.Duration(requTimeout)) //API POT調(diào)用斯碌,發(fā)送Mock至Transfer
        if err == nil {
            if g.Config().Debug {
                log.Println("send items:", items)
            }
            cnt += cntonce
        }
    }

    return cnt, nil
}

// API接口調(diào)用發(fā)送Mock至Transfer 
func sendItemsToTransfer(items []*cmodel.JsonMetaData, size int, httpcliname string,
    connT, reqT time.Duration) (cnt int, errt error) {
    if size < 1 {
        return
    }

    cfg := g.Config()
    transUlr := fmt.Sprintf("http://%s/api/push", cfg.Sender.TransferAddr) //請求接口API
    hcli := thttpclient.GetHttpClient(httpcliname, connT, reqT)

    // 請求體
    itemsBody, err := json.Marshal(items)
    if err != nil {
        log.Println(transUlr+", format body error,", err)
        errt = err
        return
    }

    // 構(gòu)造與執(zhí)行API
    req, err := http.NewRequest("POST", transUlr, bytes.NewBuffer(itemsBody))   
    req.Header.Set("Content-Type", "application/json; charset=UTF-8")    //請求內(nèi)容類型
    req.Header.Set("Connection", "close")  
    postResp, err := hcli.Do(req)  //執(zhí)行POST
    if err != nil {
        log.Println(transUlr+", post to dest error,", err)
        errt = err
        return
    }
    defer postResp.Body.Close()
    
    //響應(yīng)狀態(tài)200判斷
    if postResp.StatusCode/100 != 2 {
        log.Println(transUlr+", post to dest, bad response,", postResp.Body)
        errt = fmt.Errorf("request failed, %s", postResp.Body)
        return
    }

    return size, nil
}

http.Start() http API服務(wù)

func Start() {
    go startHttpServer()
}

func configRoutes() {
    configCommonRoutes()    //公共API路由一死,可參考Agent模塊
    configProcHttpRoutes()   //
    configDebugHttpRoutes()  //Debug API路由
}

func startHttpServer() {
    if !g.Config().Http.Enabled {
        return
    }

    addr := g.Config().Http.Listen
    if addr == "" {
        return
    }

    configRoutes()        //配置路由

    s := &http.Server{       //httpServer實(shí)例
        Addr:           addr,
        MaxHeaderBytes: 1 << 30,
    }

    log.Println("http.startHttpServer ok, listening", addr)
    log.Fatalln(s.ListenAndServe())  //監(jiān)聽與服務(wù)
}

## Proc統(tǒng)計API模塊
func configProcHttpRoutes() {
    // counters
    http.HandleFunc("/proc/counters", func(w http.ResponseWriter, r *http.Request) {
    })
    http.HandleFunc("/statistics/all", func(w http.ResponseWriter, r *http.Request) {
    })
    // judge.status, /proc/status/$endpoint/$metric/$tags-pairs
    http.HandleFunc("/proc/status/", func(w http.ResponseWriter, r *http.Request) {
    })
    // collector.last.item, /proc/collect/$endpoint/$metric/$tags-pairs
    http.HandleFunc("/proc/collect/", func(w http.ResponseWriter, r *http.Request) {
    })
    // config.mockcfg
    http.HandleFunc("/proc/config", func(w http.ResponseWriter, r *http.Request) {
    })
    // config.mockcfg /proc/config/$endpoint/$metric/$tags-pairs
    http.HandleFunc("/proc/config/", func(w http.ResponseWriter, r *http.Request) {
    })
    // config.hostgroup, /group/$grpname
    http.HandleFunc("/proc/group/", func(w http.ResponseWriter, r *http.Request) {
        urlParam := r.URL.Path[len("/proc/group/"):]
        RenderDataJson(w, service.GetHostsFromGroup(urlParam))
    })
}

## Debug API
func configDebugHttpRoutes() {
    http.HandleFunc("/debug/collector/collect", func(w http.ResponseWriter, r *http.Request) {
    })
    http.HandleFunc("/debug/config/sync", func(w http.ResponseWriter, r *http.Request) {
    })
    http.HandleFunc("/debug/sender/send", func(w http.ResponseWriter, r *http.Request) {
    })
}

思考與查證

  • nodata config、collector 傻唾、judge各模塊執(zhí)行周期是多少投慈?
  • Judge主要判斷Nodata主機(jī)的兩點(diǎn)要素是?

擴(kuò)展學(xué)習(xí)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末冠骄,一起剝皮案震驚了整個濱河市逛裤,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌猴抹,老刑警劉巖带族,帶你破解...
    沈念sama閱讀 211,639評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蟀给,居然都是意外死亡蝙砌,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,277評論 3 385
  • 文/潘曉璐 我一進(jìn)店門跋理,熙熙樓的掌柜王于貴愁眉苦臉地迎上來择克,“玉大人,你說我怎么就攤上這事前普《切希” “怎么了?”我有些...
    開封第一講書人閱讀 157,221評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長骡湖。 經(jīng)常有香客問我贱纠,道長,這世上最難降的妖魔是什么响蕴? 我笑而不...
    開封第一講書人閱讀 56,474評論 1 283
  • 正文 為了忘掉前任谆焊,我火速辦了婚禮,結(jié)果婚禮上浦夷,老公的妹妹穿的比我還像新娘辖试。我一直安慰自己,他們只是感情好劈狐,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,570評論 6 386
  • 文/花漫 我一把揭開白布罐孝。 她就那樣靜靜地躺著,像睡著了一般肥缔。 火紅的嫁衣襯著肌膚如雪莲兢。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,816評論 1 290
  • 那天辫继,我揣著相機(jī)與錄音怒见,去河邊找鬼。 笑死姑宽,一個胖子當(dāng)著我的面吹牛遣耍,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播炮车,決...
    沈念sama閱讀 38,957評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼舵变,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了瘦穆?” 一聲冷哼從身側(cè)響起纪隙,我...
    開封第一講書人閱讀 37,718評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎扛或,沒想到半個月后绵咱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,176評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡熙兔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,511評論 2 327
  • 正文 我和宋清朗相戀三年悲伶,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片住涉。...
    茶點(diǎn)故事閱讀 38,646評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡麸锉,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出舆声,到底是詐尸還是另有隱情花沉,我是刑警寧澤,帶...
    沈念sama閱讀 34,322評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站碱屁,受9級特大地震影響磷脯,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜忽媒,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,934評論 3 313
  • 文/蒙蒙 一争拐、第九天 我趴在偏房一處隱蔽的房頂上張望腋粥。 院中可真熱鬧晦雨,春花似錦、人聲如沸隘冲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,755評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽展辞。三九已至奥邮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間罗珍,已是汗流浹背洽腺。 一陣腳步聲響...
    開封第一講書人閱讀 31,987評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留覆旱,地道東北人蘸朋。 一個月前我還...
    沈念sama閱讀 46,358評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像扣唱,于是被迫代替她去往敵國和親藕坯。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,514評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 國家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報批稿:20170802 前言: 排版 ...
    庭說閱讀 10,930評論 6 13
  • 文章圖片上傳不正常噪沙,如需文檔炼彪,可聯(lián)系微信:1017429387 目錄 1 安裝... 4 1.1 配置探針... ...
    Mrhappy_a7eb閱讀 6,287評論 0 5
  • Open-falcon是小米運(yùn)維團(tuán)隊(duì)從互聯(lián)網(wǎng)公司的需求出發(fā),根據(jù)多年的運(yùn)維經(jīng)驗(yàn)正歼,結(jié)合SRE辐马、SA、DEVS的使用經(jīng)...
    猴子精h閱讀 5,134評論 1 5
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理局义,服務(wù)發(fā)現(xiàn)喜爷,斷路器,智...
    卡卡羅2017閱讀 134,633評論 18 139
  • 現(xiàn)在的表嫂已經(jīng)不是我的表嫂,因?yàn)楸砀缫呀?jīng)死去勾缭,謀害他的人正是他的媳婦揍障,我的表嫂。 十四年過去了俩由,清...
    久尾閱讀 7,910評論 4 3