go| go并發(fā)實戰(zhàn): 搭配 influxdb + grafana 高性能實時日志監(jiān)控系統(tǒng)

date: 2018-4-27 16:22:14
title: go| go并發(fā)實戰(zhàn): 搭配 influxdb + grafana 高性能實時日志監(jiān)控系統(tǒng)
description: go并發(fā)實戰(zhàn): 搭配 influxdb + grafana 高性能實時日志監(jiān)控系統(tǒng)

Go并發(fā)編程案例解析

繼續(xù)好玩的并發(fā)編程實戰(zhàn), 上一篇 go| 感受并發(fā)編程的樂趣 前篇.

實戰(zhàn)內容: 實時處理讀取/解析日志文件, 搭配 influxdb(時序數據庫) 存儲, grafana 展示, 并提供系統(tǒng)的簡單監(jiān)控.

0x00: 初始化, 面向過程編程

用面向過程的方式, 對問題進行簡單的梳理, 代碼如下:

package main

func main() {
    // read log file

    // process log

    // write data
}

這里并沒有寫具體的實現, 因為到這里, 我們就可以開始考慮 封裝

0x01: 過程封裝, 使用 LogPorcess 結構體

引入 LogProcess 結構體, 將整個任務 面向對象 化, 偽代碼如下:

package main

import (
    "fmt"
    "strings"
)

type LogProcess struct {
    path string // 日志文件路徑
    dsn string // influxdb dsn
}

func (lp *LogProcess) Read() {
    path := lp.path
    fmt.Println(path)
}

func (lp *LogProcess) Process() {
    log := "hello world"
    fmt.Println(strings.ToUpper(log))
}

func (lp *LogProcess) Write()  {
    dsn := lp.dsn
    fmt.Println(dsn)
}

func main() {
    lp := &LogProcess{
        path: "test path",
        dsn: "test dsn",
    }

    // read log file
    lp.Read()

    // process log
    lp.Process()

    // write data
    lp.Write()
}

0x02: 加上 go 和 chan, 并發(fā)就是如此簡單

加上 go 關鍵字, 輕松實現協程:

func main() {
    lp := &LogProcess{
        path: "test path",
        dsn: "test dsn",
    }

    // read log file
    go lp.Read()

    // process log
    go lp.Process()

    // write data
    go lp.Write()

    time.Sleep(time.Second) // 新手必知: 保證程序退出前, 協程可以執(zhí)行完
}

加上 chan, 輕松實現協程間通信:

type LogProcess struct {
    path string // 日志文件路徑
    dsn string // influxdb dsn
    rc chan string // read chan
    wc chan string // write chan
}

func (lp *LogProcess) Read() {
    path := lp.path
    fmt.Println(path)

    lp.rc <- "test data"
}

func (lp *LogProcess) Process() {
    log := <- lp.rc
    lp.wc <- strings.ToUpper(log)
}

func (lp *LogProcess) Write()  {
    dsn := lp.dsn
    fmt.Println(dsn)

    data := <- lp.wc
    fmt.Println(data)
}

0x03: 引入 interface, 方便以后擴展

現在是從 文件 讀取, 如果以后要從 其他數據源 讀取呢? 這個時候就可以用上接口:

type Reader interface {
    Read(rc chan string)
}

type ReadFromFile struct {
    path string
}

func (r *ReadFromFile) Read(rc chan string) {
    // read from file
}

同理, 數據寫入到 influxdb 也可以加入接口, 方便以后擴展.

0x04: 讀取文件的細節(jié)

實時讀取日志文件要怎么實現呢? 直接上代碼, 細節(jié)有很多, 注意 注釋:

  • 實時 讀取怎么實現: 從文件末尾開始讀取
  • 怎么一行一行的讀取日志: buf.ReadBytes('\n')
  • 輸出怎么多了換行呢: 截取掉最后的換行符 line[:len(line)-1]
func (r *ReadFromFile) Read(rc chan []byte) {
    f, err := os.Open(r.path)
    if err != nil {
        panic(err)
    }
    defer f.Close()

    f.Seek(0, 2) // 文件末尾
    buf := bufio.NewReader(f) // []byte 數據類型, rc chan 的類型也相應進行了修改

    for {
        line, err := buf.ReadBytes('\n')
        // todo: 處理日志切割, inode 變化的情況
        if err == io.EOF {
            time.Sleep(500 * time.Millisecond)
        } else if err != nil {
            panic(err)
        } else { // 需要寫到這里
            rc <- line[:len(line)-1]
        }
    }
}

還有一個需要優(yōu)化的地方, 一般日志文件都會采取 輪轉 策略(詳見上篇blog devops| 日志服務實踐), 文件可能更新了, 所以讀取文件時, 還需要加一個判斷.

0x05: 日志解析, 又見正則

日志的解析比較簡單, 按照日志的格式正則匹配即可:

// 使用結構體來記錄匹配到的日志數據
type Log struct {
    TimeLocal                    time.Time
    BytesSent                    int
    Path, Method, Scheme, Status string
    UpstreamTime, RequestTime    float64
}

func (l *LogProcess) Process() {
    // 正則
    re := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(
\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([d\.-]+)`)

    loc, _ := time.LoadLocation("PRC")
    for v := range l.rc {
        str := string(v)
        ret := re.FindStringSubmatch(str)
        if len(ret) != 14 {
            log.Println(str)
            continue
        }

        msg := &Log{}
        t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
        if err != nil {
            log.Println(ret[4])
        }
        msg.TimeLocal = t

        byteSent, _ := strconv.Atoi(ret[8])
        msg.BytesSent = byteSent

        // Get /for?query=t HTTP/1.0
        reqSli := strings.Split(ret[6], " ")
        if len(reqSli) != 3 {
            log.Println(ret[6])
            continue
        }
        msg.Method = reqSli[0]
        msg.Scheme = reqSli[2]
        // url parse
        u, err := url.Parse(reqSli[1])
        if err != nil {
            log.Println(reqSli[1])
            continue
        }
        msg.Path = u.Path
        msg.Status = ret[7]
        upTime, _ := strconv.ParseFloat(ret[12], 64)
        reqTime, _ := strconv.ParseFloat(ret[13], 64)
        msg.UpstreamTime = upTime
        msg.RequestTime = reqTime

        l.wc <- msg
    }
}

0x06: 上手 influxdb

influxdb 是時序數據庫的一種, 包含如下基礎概念:

  • database: 數據庫
  • measurement: 數據庫中的表
  • points: 表里的一行數據

其中 points 包含以下內容:

  • tags: 有索引的屬性
  • fields: 值
  • time: 時間戳, 也是自動生成的主索引

使用 docker 快速開啟 InfluxDb Server:

    influxdb:
        image: influxdb:1.4.3-alpine
        ports:
            - "8086:8086"
        #     - "8083:8083" # admin
        #     - "2003:2003" # graphite
        environment:
            INFLUXDB_DB: log
            INFLUXDB_USER: log
            INFLUXDB_USER_PASSWORD: logpass
        #     INFLUXDB_GRAPHITE_ENABLED: 1
        #     INFLUXDB_ADMIN_ENABLED: 1
        # volumes:
        #     - ./data/influxdb:/var/lib/influxdb

influxdb 使用 go 語言實現, 稍微修改一下官方文檔中示例, 就可以使用 client:

InfluxDB Client: https://github.com/influxdata/influxdb/tree/master/client

// 寫入也使用接口
type Writer interface {
    Write(wc chan *Log)
}

type WriteToInfluxdb struct {
    dsn string
}

// 只在官方示例代碼上做了一點修改
func (w *WriteToInfluxdb) Write(wc chan *Log) {
    // dsn 示例: http://localhost:8086@log@logpass@log@s
    dsnSli := strings.Split(w.dsn, "@")

    // Create a new HTTPClient
    c, err := client.NewHTTPClient(client.HTTPConfig{
        Addr:     dsnSli[0],
        Username: dsnSli[1],
        Password: dsnSli[2],
    })
    if err != nil {
        log.Fatal(err)
    }
    defer c.Close()

    // Create a new point batch
    bp, err := client.NewBatchPoints(client.BatchPointsConfig{
        Database:  dsnSli[3],
        Precision: dsnSli[4],
    })
    if err != nil {
        log.Fatal(err)
    }

    for v := range wc {
        // Create a point and add to batch
        tags := map[string]string{
            "Path": v.Path,
            "Method": v.Method,
            "Scheme": v.Scheme,
            "Status": v.Status,
        }
        fields := map[string]interface{}{
            "bytesSent":   v.BytesSent,
            "upstreamTime": v.UpstreamTime,
            "RequestTime":   v.RequestTime,
        }

        pt, err := client.NewPoint("log", tags, fields, v.TimeLocal)
        if err != nil {
            log.Fatal(err)
        }
        bp.AddPoint(pt)

        // Write the batch
        if err := c.Write(bp); err != nil {
            log.Fatal(err)
        }

        // Close client resources
        if err := c.Close(); err != nil {
            log.Fatal(err)
        }
    }
}

0x07: 使用 Grafana 接入 InfluxDB 數據源

Grafana 使用 docker 也可以輕松部署:

    grafana:
        image: grafana/grafana:5.1.0-beta1
        ports:
            - "3000:3000"
        environment:
            GF_SERVER_ROOT_URL: http://grafana.server.name
            GF_SECURITY_ADMIN_PASSWORD: secret

官網效果圖:

Grafana

0x08: 簡單監(jiān)控系統(tǒng)實現

作為一個 實時 系統(tǒng), 需要后臺常駐運行, 怎么查看系統(tǒng)的運行狀態(tài)的呢?

加入一個簡單的監(jiān)控系統(tǒng), 通過 http 請求查看系統(tǒng)實時運行狀態(tài):

// 需要監(jiān)控的系統(tǒng)狀態(tài)
type SystemInfo struct {
    LogLine int `json:"logline"` // 總日志處理數
    Tps float64 `json:"tps"`
    ReadChanLen int `json:"readchanlen"` // read chan 長度
    WriteChanLen int `json:"writechanlen"` // write chan 長度
    RunTime string `json:"runtime"` // 運行總時間
    ErrNum int `json:"errnum"` // 錯誤數
}

// 監(jiān)控類
type Monitor struct {
    startTime time.Time
    data SystemInfo
}

// 啟動監(jiān)控, 其實就是一個簡單的 http server
func (m *Monitor) start(lp *LogProcess) {
    http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
        m.data.RunTime = time.Now().Sub(m.startTime).String()
        m.data.ReadChanLen = len(lp.rc)
        m.data.WriteChanLen = len(lp.wc)

        ret, _ := json.MarshalIndent(m.data, "", "\t")

        io.WriteString(writer, string(ret))
    })

    http.ListenAndServe(":9091", nil)
}

func main() {
    ...

    // 運行監(jiān)控
    m := &Monitor{
        startTime: time.Now(),
        data: SystemInfo{},
    }
    m.start(l)
}

監(jiān)控數據中的 TPS 稍微有點難處理:

  • 啟動一個定時器, 比如 5s
  • 記錄下時間間隔內的 LogLine(日志處理行數)

這樣我們就可以用 LogLine 來估算系統(tǒng)的 TPS 了

0x09: 寫在最后

并發(fā)編程實戰(zhàn), 總會給人帶來 又完成了了不起的任務 的感覺, 特別是會了解更多的細節(jié).

能夠相遇, 也是一種快樂吧.

完整代碼: https://gitee.com/daydaygo/codes/sc6tyr2odf58k39npub0v72

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市寸五,隨后出現的幾起案子璃诀,更是在濱河造成了極大的恐慌闽烙,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異叶眉,居然都是意外死亡址儒,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門衅疙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來莲趣,“玉大人,你說我怎么就攤上這事饱溢⌒。” “怎么了?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵绩郎,是天一觀的道長潘鲫。 經常有香客問我,道長嗽上,這世上最難降的妖魔是什么次舌? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮兽愤,結果婚禮上彼念,老公的妹妹穿的比我還像新娘。我一直安慰自己浅萧,他們只是感情好逐沙,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著洼畅,像睡著了一般吩案。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上帝簇,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天徘郭,我揣著相機與錄音,去河邊找鬼丧肴。 笑死残揉,一個胖子當著我的面吹牛,可吹牛的內容都是我干的芋浮。 我是一名探鬼主播抱环,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼纸巷!你這毒婦竟也來了镇草?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤瘤旨,失蹤者是張志新(化名)和其女友劉穎梯啤,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體裆站,經...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡条辟,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年黔夭,在試婚紗的時候發(fā)現自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片羽嫡。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡本姥,死狀恐怖,靈堂內的尸體忽然破棺而出杭棵,到底是詐尸還是另有隱情婚惫,我是刑警寧澤,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布魂爪,位于F島的核電站先舷,受9級特大地震影響,放射性物質發(fā)生泄漏滓侍。R本人自食惡果不足惜蒋川,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望撩笆。 院中可真熱鬧捺球,春花似錦、人聲如沸夕冲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽歹鱼。三九已至泣栈,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間弥姻,已是汗流浹背南片。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留庭敦,地道東北人铃绒。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像螺捐,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子矮燎,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內容