HBS (HeartBeat Server)版本
VERSION = "1.1.0"
HBS組件功能
連接后端MySQL數(shù)據(jù)庫,獲取數(shù)據(jù)庫數(shù)據(jù)及緩存至內(nèi)存(緩存配置)供agent和judge組件提供服務(wù)嘶是。
HBS組件邏輯圖
Main入口分析
func main() {
#命令參數(shù)配置(配置文件/版本)
cfg := flag.String("c", "cfg.json", "configuration file")
version := flag.Bool("v", false, "show version")
flag.Parse()
#版本顯示
if *version {
fmt.Println(g.VERSION)
os.Exit(0)
}
#配置文件解析
g.ParseConfig(*cfg) // 【參考詳細(xì)分析】
#初始化
db.Init() //Mysql DB對象初始化 【參考詳細(xì)分析】
cache.Init() //從DB獲取數(shù)據(jù)加載到內(nèi)存 【參考詳細(xì)分析】
#清理無心跳Agent
go cache.DeleteStaleAgents() // 【參考詳細(xì)分析】
#后臺線程岳悟,HTTP病毡、RPC服務(wù)監(jiān)聽與處理
go http.Start() // 【參考詳細(xì)分析】
go rpc.Start() // 【參考詳細(xì)分析】
#終止信號注冊與接收Chan,完成程序退出清理工作
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
fmt.Println()
db.DB.Close() //DB對象資源關(guān)閉
os.Exit(0)
}()
select {} //阻塞主進(jìn)程
}
g.ParseConfig(*cfg) 配置文件解析與格式化
# 解析指定的配置文件位置,默認(rèn)為同級目錄下"cfg.json"
func ParseConfig(cfg string) {
if cfg == "" {
log.Fatalln("use -c to specify configuration file")
}
//判斷文件是否存在
if !file.IsExist(cfg) {
log.Fatalln("config file:", cfg, "is not existent")
}
ConfigFile = cfg
//配置文件轉(zhuǎn)字符串
configContent, err := file.ToTrimString(cfg)
if err != nil {
log.Fatalln("read config file:", cfg, "fail:", err)
}
//Json反序列化為全局變量結(jié)構(gòu)體
var c GlobalConfig
err = json.Unmarshal([]byte(configContent), &c)
if err != nil {
log.Fatalln("parse config file:", cfg, "fail:", err)
}
configLock.Lock()
defer configLock.Unlock()
//附值私有全局變量config
config = &c
log.Println("read config file:", cfg, "successfully")
}
#公開方法獲取全局配置氯析,常用g.Config().XXX獲取某項配置
func Config() *GlobalConfig {
configLock.RLock()
defer configLock.RUnlock()
return config
}
#全局配置結(jié)構(gòu)定義
type GlobalConfig struct {
Debug bool `json:"debug"`
Hosts string `json:"hosts"`
Database string `json:"database"`
MaxConns int `json:"maxConns"`
MaxIdle int `json:"maxIdle"`
Listen string `json:"listen"`
Trustable []string `json:"trustable"`
Http *HttpConfig `json:"http"`
}
db.Init() 初始化MySQL連接對象
# 公開全局變量DB對象
var DB *sql.DB
# 引用"github.com/go-sql-driver/mysql"Driver庫
func Init() {
var err error
DB, err = sql.Open("mysql", g.Config().Database)
if err != nil {
log.Fatalln("open db fail:", err)
}
//最大打開連接數(shù)
DB.SetMaxOpenConns(g.Config().MaxConns)
//最大空閑連接數(shù)
DB.SetMaxIdleConns(g.Config().MaxIdle)
//DB健康檢測
err = DB.Ping()
if err != nil {
log.Fatalln("ping db fail:", err)
}
}
cache.Init() 從DB獲取數(shù)據(jù)加載到內(nèi)存
func Init() {
log.Println("cache begin")
log.Println("#1 GroupPlugins...")
GroupPlugins.Init() //從DB獲取插件目錄列表緩存內(nèi)存【參考詳細(xì)分析】
log.Println("#2 GroupTemplates...")
GroupTemplates.Init() //從DB獲取組策略模板緩存內(nèi)存【參考詳細(xì)分析】
log.Println("#3 HostGroupsMap...")
HostGroupsMap.Init() //從DB獲取主機與策略組對應(yīng)表信息緩存內(nèi)存 【參考詳細(xì)分析】
log.Println("#4 HostMap...")
HostMap.Init() //從DB獲取主機名和主機ID對應(yīng)表緩存內(nèi)存【參考詳細(xì)分析】
log.Println("#5 TemplateCache...")
TemplateCache.Init() //從DB獲取與緩存內(nèi)存所有策略模板列表 【參考詳細(xì)分析】
log.Println("#6 Strategies...")
Strategies.Init(TemplateCache.GetMap()) //從DB獲取所有Strategy列表與緩存 【參考詳細(xì)分析】
log.Println("#7 HostTemplateIds...")
HostTemplateIds.Init() //從DB獲取及緩存主機ID和模板列表對應(yīng)表信息 【參考詳細(xì)分析】
log.Println("#8 ExpressionCache...")
ExpressionCache.Init() //從DB獲取與緩存Expression表達(dá)式信息【參考詳細(xì)分析】
log.Println("#9 MonitoredHosts...")
MonitoredHosts.Init() //從DB獲取所有不處于維護(hù)狀態(tài)的主機列表緩存 【參考詳細(xì)分析】
log.Println("cache done")
go LoopInit() //后臺線程運行同步數(shù)據(jù)
}
# 每分鐘周期性同步數(shù)據(jù)
func LoopInit() {
for {
time.Sleep(time.Minute)
GroupPlugins.Init()
GroupTemplates.Init()
HostGroupsMap.Init()
HostMap.Init()
TemplateCache.Init()
Strategies.Init(TemplateCache.GetMap())
HostTemplateIds.Init()
ExpressionCache.Init()
MonitoredHosts.Init()
}
}
- GroupPlugins.Init() 從DB內(nèi)獲取插件目錄列表加載到內(nèi)存
#GroupPlugins 全局對象實例化,一個HostGroup可以綁定多個Plugin
#key: Groupid value:[]plugins
var GroupPlugins = &SafeGroupPlugins{M: make(map[int][]string)}
#從DB獲取插件目錄列表加載到內(nèi)存
func (this *SafeGroupPlugins) Init() {
m, err := db.QueryPlugins() //db查詢插件目錄條目
if err != nil {
return
}
this.Lock()
defer this.Unlock()
this.M = m
}
#db->plugin 查詢MySQL plugin_dir表中內(nèi)插件目錄記錄條目
func QueryPlugins() (map[int][]string, error) {
m := make(map[int][]string)
sql := "select grp_id, dir from plugin_dir" //執(zhí)行SQL語句
rows, err := DB.Query(sql)
if err != nil {
log.Println("ERROR:", err)
return m, err
}
defer rows.Close()
for rows.Next() { //迭代所有條目,組ID和目錄
var (
id int
dir string
)
err = rows.Scan(&id, &dir)
if err != nil {
log.Println("ERROR:", err)
continue
}
if _, exists := m[id]; exists {
m[id] = append(m[id], dir)
} else {
m[id] = []string{dir}
}
}
return m, nil
}
- GroupTemplates.Init() 從DB獲取組策略模板加載到內(nèi)存
#GroupTemplates 全局對象實例化嗡害,一個HostGroup對應(yīng)多個Template
#key: gid value:[]tid
var GroupTemplates = &SafeGroupTemplates{M: make(map[int][]int)}
#從DB獲取組策略模板加載到內(nèi)存
func (this *SafeGroupTemplates) Init() {
m, err := db.QueryGroupTemplates() //db查詢組策略模板
if err != nil {
return
}
this.Lock()
defer this.Unlock()
this.M = m
}
# db->template 從MySQL grp_tpl表中查詢DB組策略模板
func QueryGroupTemplates() (map[int][]int, error) {
m := make(map[int][]int)
sql := "select grp_id, tpl_id from grp_tpl" //DB查詢語句
rows, err := DB.Query(sql)
if err != nil {
log.Println("ERROR:", err)
return m, err
}
defer rows.Close()
//迭代獲取策略組ID和模板ID
for rows.Next() {
var gid, tid int
err = rows.Scan(&gid, &tid)
if err != nil {
log.Println("ERROR:", err)
continue
}
if _, exists := m[gid]; exists {
m[gid] = append(m[gid], tid)
} else {
m[gid] = []int{tid}
}
}
return m, nil
}
- HostGroupsMap.Init() 主機與策略組對應(yīng)表信息加載內(nèi)存
#HostGroupsMap全局對象實例化琢歇,一個機器可能在多個group下兰怠,做一個map緩存hostid與groupid的對應(yīng)關(guān)系
#key: hid value:[]gid
var HostGroupsMap = &SafeHostGroupsMap{M: make(map[int][]int)}
#從DB獲取主機與組信息加載到內(nèi)存
func (this *SafeHostGroupsMap) Init() {
m, err := db.QueryHostGroups() //db查詢主機與組信息
if err != nil {
return
}
this.Lock()
defer this.Unlock()
this.M = m
}
#db->group 從MySQ Lgrp_host表中查詢主機與組信息
func QueryHostGroups() (map[int][]int, error) {
m := make(map[int][]int)
sql := "select grp_id, host_id from grp_host" //DB查詢語句
rows, err := DB.Query(sql)
if err != nil {
log.Println("ERROR:", err)
return m, err
}
defer rows.Close()
//迭代獲取主機ID梦鉴、策略組ID信息列表
for rows.Next() {
var gid, hid int
err = rows.Scan(&gid, &hid)
if err != nil {
log.Println("ERROR:", err)
continue
}
if _, exists := m[hid]; exists {
m[hid] = append(m[hid], gid)
} else {
m[hid] = []int{gid}
}
}
return m, nil
}
- HostMap.Init() 主機名和主機ID對應(yīng)表加載到內(nèi)存
#HostMap全局對象實例化
#每次心跳的時候agent把hostname匯報上來,經(jīng)常要知道這個機器的hostid痕慢,把此信息緩存
#key: hostname value: hostid
var HostMap = &SafeHostMap{M: make(map[string]int)}
#從DB獲取主機名與hid信息緩存內(nèi)存
func (this *SafeHostMap) Init() {
m, err := db.QueryHosts() //db查詢主機名與hid信息
if err != nil {
return
}
this.Lock()
defer this.Unlock()
this.M = m
}
#db->host 從MySQ host表中查詢主機名與hid條目
func QueryHosts() (map[string]int, error) {
m := make(map[string]int)
sql := "select id, hostname from host" //db查詢語句
rows, err := DB.Query(sql)
if err != nil {
log.Println("ERROR:", err)
return m, err
}
defer rows.Close()
//迭代獲取主機名尚揣、hid信息
for rows.Next() {
var (
id int
hostname string
)
err = rows.Scan(&id, &hostname)
if err != nil {
log.Println("ERROR:", err)
continue
}
m[hostname] = id
}
return m, nil
}
- TemplateCache.Init() 獲取與緩存所有策略模板列表
# TemplateCache全局對象實例化,所有templates列表
# key: tid value: template
var TemplateCache = &SafeTemplateCache{M: make(map[int]*model.Template)}
# 獲取與緩存所有策略模板列表
func (this *SafeTemplateCache) Init() {
ts, err := db.QueryTemplates() //獲取與緩存
if err != nil {
return
}
this.Lock()
defer this.Unlock()
this.M = ts
}
#db->template 從DB tpl表獲取所有的策略模板信息并緩存內(nèi)存
func QueryTemplates() (map[int]*model.Template, error) {
templates := make(map[int]*model.Template)
sql := "select id, tpl_name, parent_id, action_id, create_user from tpl" // db查詢語句
rows, err := DB.Query(sql)
if err != nil {
log.Println("ERROR:", err)
return templates, err
}
defer rows.Close()
//迭代查詢與緩存策略模板列表
for rows.Next() {
t := model.Template{}
err = rows.Scan(&t.Id, &t.Name, &t.ParentId, &t.ActionId, &t.Creator)
if err != nil {
log.Println("ERROR:", err)
continue
}
templates[t.Id] = &t
}
return templates, nil
}
- Strategies.Init(TemplateCache.GetMap()) 獲取所有Strategy列表與緩存
#Strategies全局對象實例化
#Strategy和Expression在功能上有類似的地方掖举,都是對于某些主機的某些采集指標(biāo)進(jìn)
#行判斷快骗,符合條件就執(zhí)行相應(yīng)的動作。不過塔次,Strategy需要依附于模(Teamplate)
#存在方篮,即模板是一個Strategy組,包含多個Strategy励负,模板可以與主機進(jìn)行映射藕溅,也
#可以和主機組進(jìn)行映射。另外继榆,模板和模板之前也可以有繼承關(guān)系巾表。比如,模板T1包含
#判斷CPU異常Strategy略吨,模板T2包含判斷網(wǎng)絡(luò)異常Strategy集币,模板T3集成了T1和
#T2,這樣T3就包含了T1和T2的所有異常判斷Strategy翠忠。 而Expression則存在去全
#局的范圍鞠苟,也不需要劃分為組的形式
var Strategies = &SafeStrategies{M: make(map[int]*model.Strategy)}
func (this *SafeStrategies) Init(tpls map[int]*model.Template) {
m, err := db.QueryStrategies(tpls) //獲取與緩存
if err != nil {
return
}
this.Lock()
defer this.Unlock()
this.M = m
}
#獲取所有的Strategy列表
func QueryStrategies(tpls map[int]*model.Template) (map[int]*model.Strategy, error) {
ret := make(map[int]*model.Strategy)
if tpls == nil || len(tpls) == 0 {
return ret, fmt.Errorf("illegal argument")
}
now := time.Now().Format("15:04")
sql := fmt.Sprintf(
"select %s from strategy as s where (s.run_begin='' and s.run_end='') "+
"or (s.run_begin <= '%s' and s.run_end >= '%s')"+
"or (s.run_begin > s.run_end and !(s.run_begin > '%s' and s.run_end < '%s'))",
"s.id, s.metric, s.tags, s.func, s.op, s.right_value, s.max_step, s.priority, s.note, s.tpl_id",
now,
now,
now,
now,
) //db查詢語句
rows, err := DB.Query(sql)
if err != nil {
log.Println("ERROR:", err)
return ret, err
}
defer rows.Close()
//迭代Strategy條目
for rows.Next() {
s := model.Strategy{}
var tags string
var tid int
err = rows.Scan(&s.Id, &s.Metric, &tags, &s.Func, &s.Operator, &s.RightValue, &s.MaxStep, &s.Priority, &s.Note, &tid)
if err != nil {
log.Println("ERROR:", err)
continue
}
tt := make(map[string]string)
if tags != "" {
arr := strings.Split(tags, ",")
for _, tag := range arr {
kv := strings.SplitN(tag, "=", 2)
if len(kv) != 2 {
continue
}
tt[kv[0]] = kv[1]
}
}
s.Tags = tt
s.Tpl = tpls[tid]
if s.Tpl == nil {
log.Printf("WARN: tpl is nil. strategy id=%d, tpl id=%d", s.Id, tid)
// 如果Strategy沒有對應(yīng)的Tpl,那就沒有action秽之,就沒法報警当娱,無需往后傳遞了
continue
}
ret[s.Id] = &s
}
return ret, nil
}
type Strategy struct {
Id int `json:"id"`
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
Func string `json:"func"` // e.g. max(#3) all(#3)
Operator string `json:"operator"` // e.g. < !=
RightValue float64 `json:"rightValue"` // critical value
MaxStep int `json:"maxStep"`
Priority int `json:"priority"`
Note string `json:"note"`
Tpl *Template `json:"tpl"`
}
- HostTemplateIds.Init() 查詢及緩存主機ID和模板列表對應(yīng)表信息
# HostTemplateIds全局對象實例化
# 一個機器ID對應(yīng)了多個模板ID
# key: hid value:[]tid
var HostTemplateIds = &SafeHostTemplateIds{M: make(map[int][]int)}
#
func (this *SafeHostTemplateIds) Init() {
m, err := db.QueryHostTemplateIds() //
if err != nil {
return
}
this.Lock()
defer this.Unlock()
this.M = m
}
#db->template 查詢及緩存主機ID和模板對應(yīng)表信息
func QueryHostTemplateIds() (map[int][]int, error) {
ret := make(map[int][]int)
rows, err := DB.Query("select a.tpl_id, b.host_id from grp_tpl as a inner join grp_host as b on a.grp_id=b.grp_id") //db查詢語句
if err != nil {
log.Println("ERROR:", err)
return ret, err
}
defer rows.Close()
//迭代查詢及緩存主機ID和模板對應(yīng)表信息
for rows.Next() {
var tid, hid int
err = rows.Scan(&tid, &hid)
if err != nil {
log.Println("ERROR:", err)
continue
}
if _, ok := ret[hid]; ok {
ret[hid] = append(ret[hid], tid)
} else {
ret[hid] = []int{tid}
}
}
return ret, nil
}
- ExpressionCache.Init() 獲取與緩存Expression表達(dá)式信息
# ExpressionCache全局對象實例化,緩存所有正常的Expression
#
var ExpressionCache = &SafeExpressionCache{}
# 查詢與緩存所有正常的Expression
func (this *SafeExpressionCache) Init() {
es, err := db.QueryExpressions() //db查詢與緩存
if err != nil {
return
}
this.Lock()
defer this.Unlock()
this.L = es
}
# db->express 從MySQL expression表查詢所有正常的expression信息并緩存內(nèi)存
func QueryExpressions() (ret []*model.Expression, err error) {
sql := "select id, expression, func, op, right_value, max_step, priority, note, action_id from expression where action_id>0 and pause=0"
rows, err := DB.Query(sql)
if err != nil {
log.Println("ERROR:", err)
return ret, err
}
defer rows.Close()
for rows.Next() {
e := model.Expression{}
var exp string
err = rows.Scan(
&e.Id,
&exp, //expression->進(jìn)一步解析出Metric,Tags
&e.Func,
&e.Operator,
&e.RightValue,
&e.MaxStep,
&e.Priority,
&e.Note,
&e.ActionId,
)
if err != nil {
log.Println("WARN:", err)
continue
}
//解析expression內(nèi)的Metric和Tags值
e.Metric, e.Tags, err = parseExpression(exp)
if err != nil {
log.Println("ERROR:", err)
continue
}
ret = append(ret, &e)
}
return ret, nil
}
#解析Expression
func parseExpression(exp string) (metric string, tags map[string]string, err error) {
//獲取字串"()"內(nèi)字符
left := strings.Index(exp, "(")
right := strings.Index(exp, ")")
tagStrs := strings.TrimSpace(exp[left+1 : right])
//將tags以空格分隔截為Slice
arr := strings.Fields(tagStrs)
if len(arr) < 2 {
err = fmt.Errorf("tag not enough. exp: %s", exp)
return
}
tags = make(map[string]string)
//迭代slice考榨,將每個Item再以"="為分隔成[2]Slice再解析為Map
for _, item := range arr {
kv := strings.Split(item, "=")
if len(kv) != 2 {
err = fmt.Errorf("parse %s fail", exp)
return
}
tags[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
}
//獲取Key:"metric"的值
metric, exists := tags["metric"]
if !exists {
err = fmt.Errorf("no metric give of %s", exp)
return
}
delete(tags, "metric")
return
}
type Expression struct {
Id int `json:"id"`
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
Func string `json:"func"` // e.g. max(#3) all(#3)
Operator string `json:"operator"` // e.g. < !=
RightValue float64 `json:"rightValue"` // critical value
MaxStep int `json:"maxStep"`
Priority int `json:"priority"`
Note string `json:"note"`
ActionId int `json:"actionId"`
}
- MonitoredHosts.Init() 所有不處于維護(hù)狀態(tài)的主機列表緩存
#MonitoredHosts全局對象實例化跨细,所有不處于維護(hù)狀態(tài)的主機列表
#key: hid value: Host
var MonitoredHosts = &SafeMonitoredHosts{M: make(map[int]*model.Host)}
#所有不處于維護(hù)狀態(tài)的主機列表緩存
func (this *SafeMonitoredHosts) Init() {
m, err := db.QueryMonitoredHosts() //查詢所有不處于維護(hù)狀態(tài)的主機列表
if err != nil {
return
}
this.Lock()
defer this.Unlock()
this.M = m
}
#db->host 從MySQL host表獲取所有不處于維護(hù)狀態(tài)的主機列表加載
func QueryMonitoredHosts() (map[int]*model.Host, error) {
hosts := make(map[int]*model.Host)
now := time.Now().Unix()
sql := fmt.Sprintf("select id, hostname from host where maintain_begin > %d or maintain_end < %d", now, now) //db查詢語句
rows, err := DB.Query(sql)
if err != nil {
log.Println("ERROR:", err)
return hosts, err
}
defer rows.Close()
//迭代獲取hid,hostname
for rows.Next() {
t := model.Host{}
err = rows.Scan(&t.Id, &t.Name)
if err != nil {
log.Println("WARN:", err)
continue
}
hosts[t.Id] = &t
}
return hosts, nil
}
cache.DeleteStaleAgents() 清理無心跳Agent
func deleteStaleAgents() {
// 一天都沒有心跳的Agent,從內(nèi)存中干掉
before := time.Now().Unix() - 3600*24
keys := Agents.Keys()
count := len(keys)
if count == 0 {
return
}
for i := 0; i < count; i++ {
curr, _ := Agents.Get(keys[i])
if curr.LastUpdate < before {
// 從Agent列表Map M中刪除
Agents.Delete(curr.ReportRequest.Hostname)
}
}
}
http.Start() HTTP API接口服務(wù)運行與監(jiān)聽處理
# 初始化路由
func init() {
configCommonRoutes() // 公共API接口路由
configProcRoutes() //
}
# 運行Http Server
func Start() {
if !g.Config().Http.Enabled {
return
}
addr := g.Config().Http.Listen
if addr == "" {
return
}
s := &http.Server{
Addr: addr,
MaxHeaderBytes: 1 << 30, //限制最大頭字節(jié)(2^30)
}
log.Println("http listening", addr)
log.Fatalln(s.ListenAndServe())
}
# 公共API接口路由/health河质、/version扼鞋、/workdir、/config/reload
func configCommonRoutes() {
//健康檢測
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
})
//版本
http.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(g.VERSION))
})
//工作目錄
http.HandleFunc("/workdir", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, file.SelfDir())
})
//重載配置
http.HandleFunc("/config/reload", func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.RemoteAddr, "127.0.0.1") {
g.ParseConfig(g.ConfigFile)
RenderDataJson(w, g.Config())
} else {
w.Write([]byte("no privilege"))
}
})
}
# 配置緩存信息API接口路由/expressions愤诱、/agents云头、/hosts、/strategies
# /templates淫半、/plugins/
func configProcRoutes() {
//expression緩存信息獲取
http.HandleFunc("/expressions", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, cache.ExpressionCache.Get())
})
//agents主機名列表緩存信息獲取
http.HandleFunc("/agents", func(w http.ResponseWriter, r *http.Request) {
RenderDataJson(w, cache.Agents.Keys())
})
//主機名與hid對應(yīng)表緩存信息獲取
http.HandleFunc("/hosts", func(w http.ResponseWriter, r *http.Request) {
data := make(map[string]*model.Host, len(cache.MonitoredHosts.Get()))
for k, v := range cache.MonitoredHosts.Get() {
data[fmt.Sprint(k)] = v
}
RenderDataJson(w, data)
})
//所有strategie緩存信息獲取
http.HandleFunc("/strategies", func(w http.ResponseWriter, r *http.Request) {
data := make(map[string]*model.Strategy, len(cache.Strategies.GetMap()))
for k, v := range cache.Strategies.GetMap() {
data[fmt.Sprint(k)] = v
}
RenderDataJson(w, data)
})
//所有template緩存信息獲取
http.HandleFunc("/templates", func(w http.ResponseWriter, r *http.Request) {
data := make(map[string]*model.Template, len(cache.TemplateCache.GetMap()))
for k, v := range cache.TemplateCache.GetMap() {
data[fmt.Sprint(k)] = v
}
RenderDataJson(w, data)
})
//"/plugins/XXXXX" 根據(jù)hostname獲取關(guān)聯(lián)的插件
http.HandleFunc("/plugins/", func(w http.ResponseWriter, r *http.Request) {
hostname := r.URL.Path[len("/plugins/"):]
RenderDataJson(w, cache.GetPlugins(hostname))
})
}
rpc.Start() 啟動RPC服務(wù)及監(jiān)聽處理
# RPC服務(wù)
type Hbs int
type Agent int
func Start() {
addr := g.Config().Listen
server := rpc.NewServer()
//注冊Agent/Hbs模塊
server.Register(new(Agent))
server.Register(new(Hbs))
l, e := net.Listen("tcp", addr)
if e != nil {
log.Fatalln("listen error:", e)
} else {
log.Println("listening", addr)
}
for {
conn, err := l.Accept()
if err != nil {
log.Println("listener accept fail:", err)
time.Sleep(time.Duration(100) * time.Millisecond)
continue
}
go server.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}
//Hbs.GetExpressions 返回全局緩存的expression表達(dá)式
func (t *Hbs) GetExpressions(req model.NullRpcRequest, reply *model.ExpressionResponse) error {}
//Hbs.GetStrategies 返回緩存的主機與策略映射列表
func (t *Hbs) GetStrategies(req model.NullRpcRequest, reply *model.StrategiesResponse) error {}
//Agent.MinePlugins 返回指定主機名對應(yīng)的插件列表
func (t *Agent) MinePlugins(args model.AgentHeartbeatRequest, reply *model.AgentPluginsResponse) error {}
//Agent.ReportStatus 報告agent狀態(tài) "1 or nil"
func (t *Agent) ReportStatus(args *model.AgentReportRequest, reply *model.SimpleRpcResponse) error {}
//Agent.TrustableIps 返回可信IP列表
func (t *Agent) TrustableIps(args *model.NullRpcRequest, ips *string) error {}
//Agent.BuiltinMetrics agent按照server端的配置溃槐,按需采集的metric,比如net.port.listen port=22 或者 proc.num name=zabbix_agentd
func (t *Agent) BuiltinMetrics(args *model.AgentHeartbeatRequest, reply *model.BuiltinMetricResponse) error {}
思考與查證
- 查看源碼科吭,找出多長時間從DB刷新一次配置項緩存至內(nèi)存昏滴?
- 如修改了cfg.json配置文件猴鲫,除重啟服務(wù)外還有其它方法重新加載配置嗎?
- 詳細(xì)分析RPC "Hbs.GetStrategies"內(nèi)CalcInheritStrategies方法谣殊。