date: 2018-4-27 16:22:14
title: go| go并發(fā)實戰(zhàn): 搭配 influxdb + grafana 高性能實時日志監(jiān)控系統(tǒng)
description: go并發(fā)實戰(zhàn): 搭配 influxdb + grafana 高性能實時日志監(jiān)控系統(tǒng)
繼續(xù)好玩的并發(fā)編程實戰(zhàn), 上一篇 go| 感受并發(fā)編程的樂趣 前篇.
實戰(zhàn)內容: 實時處理讀取/解析日志文件, 搭配 influxdb(時序數據庫) 存儲, grafana 展示, 并提供系統(tǒng)的簡單監(jiān)控.
0x00: 初始化, 面向過程編程
用面向過程的方式, 對問題進行簡單的梳理, 代碼如下:
package main
func main() {
// read log file
// process log
// write data
}
這里并沒有寫具體的實現, 因為到這里, 我們就可以開始考慮 封裝 了
0x01: 過程封裝, 使用 LogPorcess 結構體
引入 LogProcess 結構體, 將整個任務 面向對象 化, 偽代碼如下:
package main
import (
"fmt"
"strings"
)
type LogProcess struct {
path string // 日志文件路徑
dsn string // influxdb dsn
}
func (lp *LogProcess) Read() {
path := lp.path
fmt.Println(path)
}
func (lp *LogProcess) Process() {
log := "hello world"
fmt.Println(strings.ToUpper(log))
}
func (lp *LogProcess) Write() {
dsn := lp.dsn
fmt.Println(dsn)
}
func main() {
lp := &LogProcess{
path: "test path",
dsn: "test dsn",
}
// read log file
lp.Read()
// process log
lp.Process()
// write data
lp.Write()
}
0x02: 加上 go 和 chan, 并發(fā)就是如此簡單
加上 go 關鍵字, 輕松實現協程:
func main() {
lp := &LogProcess{
path: "test path",
dsn: "test dsn",
}
// read log file
go lp.Read()
// process log
go lp.Process()
// write data
go lp.Write()
time.Sleep(time.Second) // 新手必知: 保證程序退出前, 協程可以執(zhí)行完
}
加上 chan, 輕松實現協程間通信:
type LogProcess struct {
path string // 日志文件路徑
dsn string // influxdb dsn
rc chan string // read chan
wc chan string // write chan
}
func (lp *LogProcess) Read() {
path := lp.path
fmt.Println(path)
lp.rc <- "test data"
}
func (lp *LogProcess) Process() {
log := <- lp.rc
lp.wc <- strings.ToUpper(log)
}
func (lp *LogProcess) Write() {
dsn := lp.dsn
fmt.Println(dsn)
data := <- lp.wc
fmt.Println(data)
}
0x03: 引入 interface, 方便以后擴展
現在是從 文件 讀取, 如果以后要從 其他數據源 讀取呢? 這個時候就可以用上接口:
type Reader interface {
Read(rc chan string)
}
type ReadFromFile struct {
path string
}
func (r *ReadFromFile) Read(rc chan string) {
// read from file
}
同理, 數據寫入到 influxdb 也可以加入接口, 方便以后擴展.
0x04: 讀取文件的細節(jié)
實時讀取日志文件要怎么實現呢? 直接上代碼, 細節(jié)有很多, 注意 注釋:
- 實時 讀取怎么實現: 從文件末尾開始讀取
- 怎么一行一行的讀取日志:
buf.ReadBytes('\n')
- 輸出怎么多了換行呢: 截取掉最后的換行符
line[:len(line)-1]
func (r *ReadFromFile) Read(rc chan []byte) {
f, err := os.Open(r.path)
if err != nil {
panic(err)
}
defer f.Close()
f.Seek(0, 2) // 文件末尾
buf := bufio.NewReader(f) // []byte 數據類型, rc chan 的類型也相應進行了修改
for {
line, err := buf.ReadBytes('\n')
// todo: 處理日志切割, inode 變化的情況
if err == io.EOF {
time.Sleep(500 * time.Millisecond)
} else if err != nil {
panic(err)
} else { // 需要寫到這里
rc <- line[:len(line)-1]
}
}
}
還有一個需要優(yōu)化的地方, 一般日志文件都會采取 輪轉 策略(詳見上篇blog devops| 日志服務實踐), 文件可能更新了, 所以讀取文件時, 還需要加一個判斷.
0x05: 日志解析, 又見正則
日志的解析比較簡單, 按照日志的格式正則匹配即可:
// 使用結構體來記錄匹配到的日志數據
type Log struct {
TimeLocal time.Time
BytesSent int
Path, Method, Scheme, Status string
UpstreamTime, RequestTime float64
}
func (l *LogProcess) Process() {
// 正則
re := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(
\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([d\.-]+)`)
loc, _ := time.LoadLocation("PRC")
for v := range l.rc {
str := string(v)
ret := re.FindStringSubmatch(str)
if len(ret) != 14 {
log.Println(str)
continue
}
msg := &Log{}
t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
if err != nil {
log.Println(ret[4])
}
msg.TimeLocal = t
byteSent, _ := strconv.Atoi(ret[8])
msg.BytesSent = byteSent
// Get /for?query=t HTTP/1.0
reqSli := strings.Split(ret[6], " ")
if len(reqSli) != 3 {
log.Println(ret[6])
continue
}
msg.Method = reqSli[0]
msg.Scheme = reqSli[2]
// url parse
u, err := url.Parse(reqSli[1])
if err != nil {
log.Println(reqSli[1])
continue
}
msg.Path = u.Path
msg.Status = ret[7]
upTime, _ := strconv.ParseFloat(ret[12], 64)
reqTime, _ := strconv.ParseFloat(ret[13], 64)
msg.UpstreamTime = upTime
msg.RequestTime = reqTime
l.wc <- msg
}
}
0x06: 上手 influxdb
influxdb 是時序數據庫的一種, 包含如下基礎概念:
- database: 數據庫
- measurement: 數據庫中的表
- points: 表里的一行數據
其中 points 包含以下內容:
- tags: 有索引的屬性
- fields: 值
- time: 時間戳, 也是自動生成的主索引
使用 docker 快速開啟 InfluxDb Server:
influxdb:
image: influxdb:1.4.3-alpine
ports:
- "8086:8086"
# - "8083:8083" # admin
# - "2003:2003" # graphite
environment:
INFLUXDB_DB: log
INFLUXDB_USER: log
INFLUXDB_USER_PASSWORD: logpass
# INFLUXDB_GRAPHITE_ENABLED: 1
# INFLUXDB_ADMIN_ENABLED: 1
# volumes:
# - ./data/influxdb:/var/lib/influxdb
influxdb 使用 go 語言實現, 稍微修改一下官方文檔中示例, 就可以使用 client:
InfluxDB Client: https://github.com/influxdata/influxdb/tree/master/client
// 寫入也使用接口
type Writer interface {
Write(wc chan *Log)
}
type WriteToInfluxdb struct {
dsn string
}
// 只在官方示例代碼上做了一點修改
func (w *WriteToInfluxdb) Write(wc chan *Log) {
// dsn 示例: http://localhost:8086@log@logpass@log@s
dsnSli := strings.Split(w.dsn, "@")
// Create a new HTTPClient
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: dsnSli[0],
Username: dsnSli[1],
Password: dsnSli[2],
})
if err != nil {
log.Fatal(err)
}
defer c.Close()
// Create a new point batch
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: dsnSli[3],
Precision: dsnSli[4],
})
if err != nil {
log.Fatal(err)
}
for v := range wc {
// Create a point and add to batch
tags := map[string]string{
"Path": v.Path,
"Method": v.Method,
"Scheme": v.Scheme,
"Status": v.Status,
}
fields := map[string]interface{}{
"bytesSent": v.BytesSent,
"upstreamTime": v.UpstreamTime,
"RequestTime": v.RequestTime,
}
pt, err := client.NewPoint("log", tags, fields, v.TimeLocal)
if err != nil {
log.Fatal(err)
}
bp.AddPoint(pt)
// Write the batch
if err := c.Write(bp); err != nil {
log.Fatal(err)
}
// Close client resources
if err := c.Close(); err != nil {
log.Fatal(err)
}
}
}
0x07: 使用 Grafana 接入 InfluxDB 數據源
Grafana 使用 docker 也可以輕松部署:
grafana:
image: grafana/grafana:5.1.0-beta1
ports:
- "3000:3000"
environment:
GF_SERVER_ROOT_URL: http://grafana.server.name
GF_SECURITY_ADMIN_PASSWORD: secret
官網效果圖:
0x08: 簡單監(jiān)控系統(tǒng)實現
作為一個 實時 系統(tǒng), 需要后臺常駐運行, 怎么查看系統(tǒng)的運行狀態(tài)的呢?
加入一個簡單的監(jiān)控系統(tǒng), 通過 http 請求查看系統(tǒng)實時運行狀態(tài):
// 需要監(jiān)控的系統(tǒng)狀態(tài)
type SystemInfo struct {
LogLine int `json:"logline"` // 總日志處理數
Tps float64 `json:"tps"`
ReadChanLen int `json:"readchanlen"` // read chan 長度
WriteChanLen int `json:"writechanlen"` // write chan 長度
RunTime string `json:"runtime"` // 運行總時間
ErrNum int `json:"errnum"` // 錯誤數
}
// 監(jiān)控類
type Monitor struct {
startTime time.Time
data SystemInfo
}
// 啟動監(jiān)控, 其實就是一個簡單的 http server
func (m *Monitor) start(lp *LogProcess) {
http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
m.data.RunTime = time.Now().Sub(m.startTime).String()
m.data.ReadChanLen = len(lp.rc)
m.data.WriteChanLen = len(lp.wc)
ret, _ := json.MarshalIndent(m.data, "", "\t")
io.WriteString(writer, string(ret))
})
http.ListenAndServe(":9091", nil)
}
func main() {
...
// 運行監(jiān)控
m := &Monitor{
startTime: time.Now(),
data: SystemInfo{},
}
m.start(l)
}
監(jiān)控數據中的 TPS 稍微有點難處理:
- 啟動一個定時器, 比如 5s
- 記錄下時間間隔內的
LogLine
(日志處理行數)
這樣我們就可以用 LogLine
來估算系統(tǒng)的 TPS 了
0x09: 寫在最后
并發(fā)編程實戰(zhàn), 總會給人帶來 又完成了了不起的任務 的感覺, 特別是會了解更多的細節(jié).
能夠相遇, 也是一種快樂吧.
完整代碼: https://gitee.com/daydaygo/codes/sc6tyr2odf58k39npub0v72