問題
需求是統(tǒng)計平臺內(nèi)包括用戶, 訂單, 網(wǎng)站訪問量等某時間段的各種數(shù)據(jù), 第一想法是使用更熟悉的Mysql做數(shù)據(jù)統(tǒng)計, 那么有什么問題呢?
之前使用過的設計方案有兩種
1. 從源表查詢, 聚合使用group和count
優(yōu)點: 簡單
缺點:
- 速度慢, 由于數(shù)據(jù)庫存儲的時間是時間戳, 在group的時候還需要轉化為正常的字符串時間才能按天, 月, 小時聚合.
- 性能損耗高, 可能會影響到線上業(yè)務.
2. 建立聚合表, 按天, 月, 小時
優(yōu)點: 查詢速度快
缺點:
- 邏輯麻煩, 難得開發(fā).
- 數(shù)據(jù)量大, 往往造成統(tǒng)計數(shù)據(jù)比源數(shù)據(jù)還多的尷尬局面.
選擇
有沒有什么方案能解決速度慢 數(shù)據(jù)量大的問題呢?
HBase? 太重, 聽說是搞大數(shù)據(jù)的, 我們小公司, 沒必要了.
想到在之前有用過Graylog就是使用Elasticsearch做日志存儲和統(tǒng)計, 那么Elasticsearch能不能做我想要的數(shù)據(jù)統(tǒng)計呢? 肯定是可以的.
同時ES(Elasticsearch)還能做訂單和用戶等數(shù)據(jù)的全文搜索, 一舉多得.
當然選擇ES還有一個原因就是之前了解過一點ES.
什么是Elasticsearch
Elasticsearch is a highly scalable open-source full-text search and analytics engine. It allows you to store, search, and analyze big volumes of data quickly and in near real time. It is generally used as the underlying engine/technology that powers applications that have complex search features and requirements.
Elasticsearch能近乎實時的全文搜索(如模糊搜索)和分析(如聚合)大量的數(shù)據(jù).
使用方法
跟著官方文檔先上手吧, 我選擇的是5.5.1版本的Es, 使用Docker安裝, 安裝方法就不多說了, 按文檔來就好.
這里簡單講一下Es中基本概念
index(索引)
可以理解為mysql的表
type(類型)
索引下的類型, 可以理解為分組. 在相同index的不同的type應該使用相同的數(shù)據(jù)結構.
不建議使用, 非要使用的話建議和索引同名, 因為在ES 7.x版本type會被廢棄.
document(文檔)
一個文檔就是一條數(shù)據(jù), 多個document組成了index
aggregation(聚合)
做統(tǒng)計就需要使用聚合.
可理解為mysql的group by語句, 但聚合更強大的是aggregations can be nested!(聚合可以被嵌套), 并且沒有層數(shù)限制.
bucket(桶)
理解為mysql group by之后的每個條目, aggregation之后會返回多個bucket.
Date Histogram Aggregation(日期直方圖聚合)
為了讓按時間聚合更簡單, ES內(nèi)置了一個日期直方圖的聚合類型, 可以方便的實現(xiàn)按小時(h),天(d),月(month),年(year)等時間段聚合數(shù)據(jù).
為了使用Date Histogram Aggregation, 需要時間字段是long型(精確到毫秒的時間戳),或者是RFC3339格式的字符串(如2006-01-02T15:04:05Z07:00).
這里值得注意的是時區(qū), 如果要設置時區(qū), 那么時間字段必須為RFC3339格式, 所以在ES中推薦統(tǒng)一使用RFC3339作為時間字段類型.
由于我們的時區(qū)是+08:00, 而ES默認使用UTC, 所以如果按天聚合的話聚合時間是從每一天的8:00到下一天的8:00, 如果要重置到0:00則需要設置時區(qū) +08:00.
{
"aggs": {
"by_day": {
"date_histogram": {
"field": "date",
"interval": "day",
"time_zone": "+08:00"
}
}
}
}
排序
假如有一個需求: 獲取這個月的訪問量最高的10個站點. 這時就需要使用到排序.
排序使用order
{
"aggregations": {
"pv": {
"sum": {
"field": "pv"
}
}
},
"terms": {
"field": "site_id",
"min_doc_count": 0,
"order": [
{
"pv": "desc"
}
],
"size": 10
}
}
可以指定size表示返回幾個.
但不支持類似mysql的offset, 因為es不支持聚合后分頁, 詳見https://blog.csdn.net/laoyang360/article/details/79112946.
實戰(zhàn)
可以用Restful接口操作ES, 但這樣比較麻煩, 所以我在github上找了一個go語言的Client: gopkg.in/olivere/elastic.v5.
筆者也看了下官方包:github.com/elastic/go-elasticsearch, 但官方支持的功能太簡單, 為了節(jié)約開發(fā)時間, 還是使用前者吧.
案例中使用到的結構體如下:
// 用戶
type User struct {
Id int64 `json:"id"`
Nickname string `json:"nickname"`
Mobile string `json:"mobile"`
Email string `json:"email"`
CreatedAt string `json:"created_at"`
LastLoginAt int64 `json:"last_login_at"`
}
// 每天的站點訪問記錄, 用來聚合出每月的訪問記錄
type SiteVisitLog struct {
Id string `json:"id"` // es隨機id
SiteId int64 `json:"site_id"` // 站點id
Day string `json:"day"` // 每天一條記錄, 用每天的凌晨代表一天
Pv int64 `json:"pv"` // 每訪問一次就記錄
Uv int64 `json:"uv"` // 使用cookie去重, 每天(0-24h)內(nèi)同一個cookie算一個
}
案例1: 統(tǒng)計某時間段內(nèi)新用戶的注冊數(shù)量
使用Search即可, 與mysql不同, 查詢結果中會帶有命中(hit)的文檔個數(shù)(TotalHits)而不需要count操作.
// getUserCount 返回start到end之間的文檔數(shù)量.
func getUserCount(start int64, end int64) (item *pb.EsUserCountItem) {
// 范圍查詢
query := elastic.NewRangeQuery("created_at")
if start != 0 {
query.Gte(start)
}
if end != 0 {
query.Lte(end)
}
ctx := context.Background()
result, err := client.Search().
Index(user.IndexName()).
Type(user.IndexName()).
Query(query).
Size(0). // 只是統(tǒng)計數(shù)量, 不需要返回文檔
Do(ctx)
if err != nil {
return
}
item = &pb.EsUserCountItem{All: result.Hits.TotalHits}
return
}
案例2: 聚合統(tǒng)計新用戶數(shù)量
支持秒級別的聚合, 威力巨大.
// AggsUser聚合start到end的新增用戶, interval為聚合顆粒度, 如year, quarter, month, week, day, hour, minute, second, 支持的單位詳情看文檔
func AggsUser(start, end int64, interval string) (items []*pb.AggsUserItem, err error) {
ctx := context.Background()
da := elastic.NewDateHistogramAggregation().
Interval(interval).
Field("created_at").
TimeZone("+08:00"). // 時區(qū)問題
MinDocCount(0) // 顯示沒有數(shù)據(jù)(文檔數(shù)為0)的bucket(桶)
// 條件
query := elastic.NewRangeQuery("created_at")
if start != 0 {
query.Gte(start)
}
if end != 0 {
query.Lte(end)
}
result, err := client.Search().
Index(user.IndexName()).
Type(user.IndexName()).
Query(query).
Size(0).
Aggregation("a", da).
Do(ctx)
if err != nil {
return
}
bs := &structs.DateHistogramBuckets{}
err = bs.UnMarshal(result.Aggregations["a"])
if err != nil {
return
}
items = make([]*pb.AggsUserItem, len(bs.Buckets))
for i, v := range bs.Buckets {
items[i] = &pb.AggsUserItem{
Time: v.Key,
All: v.DocCount,
}
}
return
}
案例3: 聚合統(tǒng)計站點訪問量(pv, uv)
體驗嵌套聚合的威力吧.
// AggsSiteVisitLog 聚合統(tǒng)計站點訪問量, 得到每月的總pv,uv
func AggsSiteVisitLog(siteId, start, end int64, interval string) (items []*pb.AggsSiteVisitLogItem, err error) {
ctx := context.Background()
pvAgg := elastic.NewSumAggregation().Field("pv") // 將pv求和
uvAgg := elastic.NewSumAggregation().Field("uv")
dAgg := elastic.NewDateHistogramAggregation().
Interval(interval).
Field("day").
TimeZone("+08:00").
MinDocCount(0).
SubAggregation("pv", pvAgg). // 在`時間聚合`中再嵌套`求和聚合`
SubAggregation("uv", uvAgg)
rQuery := elastic.NewRangeQuery("day")
if start != 0 {
rQuery.Gte(start)
}
if end != 0 {
rQuery.Lte(end)
}
query := elastic.NewBoolQuery()
query.Must(rQuery)
s := client.Search().
Index(siteVisitLog.IndexName()).
Type(siteVisitLog.IndexName())
if siteId != 0 {
query.Must(elastic.NewTermQuery("site_id", siteId))
}
result, err := s.
Query(elastic.NewConstantScoreQuery(query)).
Size(0).
Aggregation("a", dAgg).
Do(ctx)
if err != nil {
return
}
// 結構體有點復雜, 使用log先打印出結果再寫結構體解析吧
buckets := &struct {
Buckets []*struct {
Key int64 `json:"key"`
DocCount int64 `json:"doc_count"`
Uv struct {
Value float64 `json:"value"`
} `json:"uv"`
Pv struct {
Value float64 `json:"value"`
} `json:"pv"`
} `json:"buckets"`
}{}
bs, _ := result.Aggregations["a"].MarshalJSON()
err = json.Unmarshal(bs, buckets)
if err != nil {
return
}
items = make([]*pb.AggsSiteVisitLogItem, len(buckets.Buckets))
for i, v := range buckets.Buckets {
items[i] = &pb.AggsSiteVisitLogItem{
Time: v.Key,
Uv: int64(v.Uv.Value),
Pv: int64(v.Pv.Value),
}
}
return
}
案例三有點復雜, 要知道的知識點還挺多
布爾過濾器
待續(xù)
constant_score
待續(xù)