1、golang語言Redis客戶端簡介
通常我們在選擇某一組件的客戶端包時(shí),優(yōu)先選擇官方提供的包。redis 本身雖然并沒有提供 go 語言的 client 包,但是提供了一份 client 包列表陌知,并對(duì)部分包做了推薦標(biāo)識(shí),具體參考:https://redis.io/clients#go
接下來的全部使用示例掖肋,都是在 "github.com/gomodule/redigo/redis" 包的基礎(chǔ)上實(shí)現(xiàn)仆葡。選擇這個(gè)包的原因在于,這個(gè)包只有一個(gè) Do 函數(shù)執(zhí)行 Redis 命令,使用方法更接近Redis的原生命令沿盅,這無疑會(huì)降低我們的學(xué)習(xí)成本把篓,同時(shí)該包對(duì)Print-alike API, Pipelining (including transactions), Pub/Sub, Connection pooling, scripting 等我們常用的功能也有良好的支持。
在開始之前腰涧,首先下載該第三方包
go get "github.com/gomodule/redigo/redis"
2韧掩、Redis連接池對(duì)象的構(gòu)建和關(guān)閉
通常在初始化階段創(chuàng)建 Redis 客戶端連接池
var RedisClientPool *redis.Pool
func InitJimDb() {
RedisClientPool = &redis.Pool{
MaxIdle: 100,
MaxActive: 12000,
IdleTimeout: time.Duration(180),
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("xxxxxx"),redis.DialReadTimeout(time.Second),redis.DialWriteTimeout(time.Second))
if err != nil {
logger.Errorf("redisClient dial host: %s, auth: %s err: %s", "127.0.0.1:6379", "xxxxxx", err.Error())
return nil, err
}
return c, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
if err != nil {
logger.Errorf("redisClient ping err: %s", err.Error())
}
return err
},
}
logger.Infof("init jimDb ok")
}
func CloseJimDb() {
if RedisClientPool != nil {
err := RedisClientPool.Close()
if err != nil {
logger.Errorf("do CloseJimDb error:%s", err.Error())
}
}
}
客戶端初始化參數(shù)含義:
- MaxIdle:連接池中最大的空閑連接數(shù)
- MaxActive:允許的最大連接 Redis 的連接數(shù),設(shè)置為0則沒有限制
- IdleTimeout:空閑超時(shí)時(shí)間窖铡,超過此時(shí)間后疗锐,則會(huì)關(guān)閉連接。若此值設(shè)置為0费彼,則不會(huì)關(guān)閉連接滑臊,應(yīng)用應(yīng)設(shè)置一個(gè)小于服務(wù)超時(shí)的值
- Wait:若為 true,則當(dāng)連接數(shù)達(dá)到 MaxActive 時(shí)箍铲,使用 Get() 獲取新的連接時(shí)將會(huì)等待简珠,直到有連接釋放連接
- MaxConnLifetime:最大連接生命時(shí)長,當(dāng)連接存活時(shí)間超過改值虹钮,則會(huì)被關(guān)閉,若設(shè)置為0膘融,則不會(huì)因?yàn)榇婊顣r(shí)間關(guān)閉連接
- Dial:用于創(chuàng)建和配置連接的支持方法芙粱,通常用于 DB 選擇/連接超時(shí)時(shí)間/讀超時(shí)時(shí)間/寫超時(shí)時(shí)間/密碼認(rèn)證等的初始化
在執(zhí)行 Redis 數(shù)據(jù)庫操作時(shí),需要首先獲取連接氧映,同時(shí)記得在操作結(jié)束后釋放連接
rc := g.RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
logger.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
注:如果不執(zhí)行 rc.Close() 釋放連接,服務(wù)將很快打滿內(nèi)存春畔,通過 pprof 將發(fā)現(xiàn)連接數(shù)非常大。
系統(tǒng)退出時(shí)岛都,記得手動(dòng) Close() 客戶端連接律姨。
3、Redis客戶端基本用法
(1)臼疫、redis基本數(shù)據(jù)類型
數(shù)據(jù)類型 | 可以存儲(chǔ)的值 | 操作 |
---|---|---|
STRING | 字符串择份、整數(shù)或者浮點(diǎn)數(shù) | 對(duì)整個(gè)字符串或者字符串的其中一部分執(zhí)行操作 對(duì)整數(shù)和浮點(diǎn)數(shù)執(zhí)行自增或者自減操作 |
LIST | 列表 | 從兩端壓入或者彈出元素 對(duì)單個(gè)或者多個(gè)元素進(jìn)行修剪, 只保留一個(gè)范圍內(nèi)的元素 |
SET | 無序集合 | 添加烫堤、獲取荣赶、移除單個(gè)元素 檢查一個(gè)元素是否存在于集合中 計(jì)算交集、并集鸽斟、差集 從集合里面隨機(jī)獲取元素 |
HASH | 包含鍵值對(duì)的無序散列表 | 添加拔创、獲取、移除單個(gè)鍵值對(duì) 獲取所有鍵值對(duì) 檢查某個(gè)鍵是否存在 |
ZSET | 有序集合 | 添加富蓄、獲取剩燥、刪除元素 根據(jù)分值范圍或者成員來獲取元素 計(jì)算一個(gè)鍵的排名 |
各類型的基本操作預(yù)發(fā)可以參考Redis,本文不做過多介紹立倍。
注:對(duì)于 Redis 鍵值對(duì)的 Key 值來說灭红,key 只有唯一的類型 String侣滩。
示例:
127.0.0.1:6379> set 1 "value1"
OK
127.0.0.1:6379> set "1" "value2"
OK
127.0.0.1:6379> get 1
"value2"
127.0.0.1:6379> get "1"
"value2"
(2)、Redigo客戶端支持的操作函數(shù)
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value when the connection is not usable.
Err() error
// Do sends a command to the server and returns the received reply.
Do(commandName string, args ...interface{}) (reply interface{}, err error)
// Send writes the command to the client's output buffer.
Send(commandName string, args ...interface{}) error
// Flush flushes the output buffer to the Redis server.
Flush() error
// Receive receives a single reply from the Redis server
Receive() (reply interface{}, err error)
}
Redigo 客戶端支持的與數(shù)據(jù)操作相關(guān)的方法主要有4個(gè):Do()比伏、Send()胜卤、Flush() 和 Receive()。
其中赁项,最多用到的方法是 Do(),其功能是單次執(zhí)行某一命令葛躏,并返回執(zhí)行結(jié)果。Send()悠菜、Flush() 和 Receive() 是一組操作舰攒,通常同時(shí)出現(xiàn),主要實(shí)現(xiàn) pipline 方式的數(shù)據(jù)寫入悔醋,也可以實(shí)現(xiàn)消息的訂閱通知功能摩窃。
(3)、Do()方法示例
Do(commandName string, args ...interface{}) (reply interface{}, err error)
根據(jù)源碼可知芬骄,Do() 的入?yún)⒒福谝粋€(gè)為 commandName,也就是Redis 本身支持指令的大寫字符串账阻,arg 根據(jù)命令本身的參數(shù)按順序填入即可蒂秘。由于該客戶端支持的 commandName 與 Redis cli 本身的指令名一致,因此學(xué)習(xí)成本較低淘太。具體我們以 SET 命令為例姻僧,其余的操作一依次類推,只要命令和所需參數(shù)一致即可:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
reply, err := rc.Do("SET", "1", "value1")
if err != nil {
t.Errorf("do set %s,%s error:%s", "1", "valuea", err.Error())
return
}
reply, err := rc.Do("MSET", "1", "value1","2","value2")
if err != nil {
t.Errorf("do set %s,%s error:%s", "1", "valuea", err.Error())
return
}
上面示例在操作參數(shù)很少時(shí)還是十分方便的蒲牧,但是當(dāng)我們想批量寫入大量數(shù)據(jù)時(shí)撇贺,就會(huì)顯得異常繁瑣,那么有沒有簡單的方式執(zhí)行數(shù)據(jù)的批量寫入呢冰抢?
我們可以通過自己構(gòu)建 []interface{} 類型的參數(shù)數(shù)組實(shí)現(xiàn)松嘶,具體入如下所示:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
var values = []string{"1", "2", "3", "4", "5", "6", "7", "8", "8", "10"}
args := []interface{}{"redis-list"}
for _, v := range values {
args = append(args, v)
}
reply, err := rc.Do("RPUSH", args...)
if err != nil {
t.Errorf("do set %s,%s error:%s", GetKey("test"), "valuea", err.Error())
return
}
此外,redigo 客戶端其實(shí)已經(jīng)給我們提供了一個(gè)參數(shù)轉(zhuǎn)化的方法挎扰,而無需我們自己去手動(dòng)將參數(shù)轉(zhuǎn)化為 []interface{}
具體的源碼如下:
// Args is a helper for constructing command arguments from structured values.
type Args []interface{}
// Add returns the result of appending value to args.
func (args Args) Add(value ...interface{}) Args {
return append(args, value...)
}
// AddFlat returns the result of appending the flattened value of v to args.
//
// Maps are flattened by appending the alternating keys and map values to args.
//
// Slices are flattened by appending the slice elements to args.
//
// Structs are flattened by appending the alternating names and values of
// exported fields to args. If v is a nil struct pointer, then nothing is
// appended. The 'redis' field tag overrides struct field names. See ScanStruct
// for more information on the use of the 'redis' field tag.
//
// Other types are appended to args as is.
func (args Args) AddFlat(v interface{}) Args {
rv := reflect.ValueOf(v)
switch rv.Kind() {
case reflect.Struct:
args = flattenStruct(args, rv)
case reflect.Slice:
for i := 0; i < rv.Len(); i++ {
args = append(args, rv.Index(i).Interface())
}
case reflect.Map:
for _, k := range rv.MapKeys() {
args = append(args, k.Interface(), rv.MapIndex(k).Interface())
}
case reflect.Ptr:
if rv.Type().Elem().Kind() == reflect.Struct {
if !rv.IsNil() {
args = flattenStruct(args, rv.Elem())
}
} else {
args = append(args, v)
}
default:
args = append(args, v)
}
return args
}
Add() 方法用于添加基本類型參數(shù)喘蟆,例如 string、integer鼓鲁、float蕴轨、interface 等。
AddFlat() 用于添加復(fù)雜類型參數(shù)骇吭,例如 Struce橙弱、slice、map、str 等棘脐。
根據(jù)源碼可知斜筐,其基本原理都是幫助我們構(gòu)建一個(gè) []interface{}類型的參數(shù)組。只是簡化了我們的代碼量蛀缝。具體實(shí)例如下:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
fmt.Printf("Cache init redis client close err: %s", rcErr.Error())
}
}()
var tempMap = map[int64][]byte{
1:bytes,
2:bytes,
3:bytes,
4:bytes,
5:bytes,
6:bytes,
}
_,err := rc.Do("HMSET",redis.Args{}.Add("redis-hash").AddFlat(tempMap)...)
if err != nil{
fmt.Println("hmset error:",err.Error())
}
(4)顷链、pipline數(shù)據(jù)寫入方式
pipline 的數(shù)據(jù)寫入方式與 Do() 這種單次寫入方式最大的區(qū)別在于批量數(shù)據(jù)處理的寫入速度。由于 Do() 這種單次寫入需要等返回結(jié)果之后才能進(jìn)行第二次操作屈梁,而 pipline 可以在第一次操作結(jié)果返回之前嗤练,繼續(xù)發(fā)送后續(xù)的 Request,直到把所有的 Request 都發(fā)送完畢在讶。因此煞抬,當(dāng)操作很多時(shí),pipeline 更高效构哺。
Redigo客戶端的pipline處理方式依賴于三個(gè)方法:Send()革答、Flush()和Receive()。
Send() 發(fā)送命令到輸出緩沖區(qū)曙强。
Flush() 寫入命令并刷新輸出緩沖區(qū)残拐。
Receive()接收服務(wù)器的返回值。
調(diào)用 Receive() 的次數(shù)必須對(duì)應(yīng)使用 Send() 發(fā)送命令的次數(shù)碟嘴。具體示例如大下:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
rc.Send("SET", "1","1")
rc.Send("SET", "2","2")
rc.Flush()
res,_ :=rc.Receive()
fmt.Println(res)
res,_ = rc.Receive()
fmt.Println(res)
注:雖然多個(gè) Send() 的指令是在執(zhí)行 Flush() 時(shí)才真正將輸出緩沖區(qū)數(shù)據(jù)刷新到 Redis 服務(wù)器,但是 send()中的多個(gè)指令并不是事務(wù)蹦骑。如果需要保證多個(gè)操作的原子性,還是需要使用事務(wù)來實(shí)現(xiàn)臀防。
(5)、redis 客戶端的并發(fā)
redigo 支持 Receive 方法的一個(gè)并發(fā)調(diào)用者和 Send 和 Flush 方法的一個(gè)并發(fā)調(diào)用者边败,但是不支持 Do() 方法的并發(fā)調(diào)用袱衷。若需要完全并發(fā)訪問 redis,需要參考第2節(jié)內(nèi)容笑窜,創(chuàng)建redis 客戶端連接池致燥,通過多個(gè)連接實(shí)現(xiàn)真正意義上的并發(fā)。Redis-Doc
理解起來也很容易排截,Do()嫌蚤、Send()、Flush() 和 Receive()断傲,都必須依賴某一個(gè)連接實(shí)現(xiàn),具體的:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
reply, err := rc.Do("SET", "1", "value1")
無論擁有幾個(gè)并發(fā)協(xié)程脱吱,由于所有操作都基于同一個(gè)連接,那么 Do() 操作一定需要執(zhí)行完一個(gè)完整的 Request-Response才可以執(zhí)行下一個(gè)請求认罩,因此 Do() 無法支持并發(fā)調(diào)用箱蝠。而 Send()、Flush() 和 Receive() 方法是異步操作,無需等待Rsponse返回宦搬,因此可以支持并發(fā)調(diào)用牙瓢。具體的:一個(gè)協(xié)程執(zhí)行 Send() 和 Flush() 方法用于寫入操作,另一個(gè)協(xié)程執(zhí)行 Receive() 方法用于異步接收操作結(jié)果间校。具體如下:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
var wg sync.WaitGroup
wg.Add(1)
go func(){
defer wg.Done()
for i:=0;i<4;i++{
rc.Send("SET", i+1,i+1)
}
rc.Flush()
}()
wg.Add(1)
go func(){
defer wg.Done()
for i:=0;i<4;i++{
res,_ := rc.Receive()
fmt.Println(res)
}
}()
wg.Wait()
fmt.Println("exec success")
如果要實(shí)現(xiàn)真正意義上對(duì) Redis 的并發(fā)訪問矾克,只能依賴連接池實(shí)現(xiàn)。
(6)憔足、事務(wù)操作
單個(gè) Redis 命令的執(zhí)行是原子性的胁附,但 Redis 沒有在事務(wù)上增加任何維持原子性的機(jī)制,所以 Redis 事務(wù)的執(zhí)行并不是原子性的四瘫。事務(wù)可以理解為一個(gè)打包的批量執(zhí)行腳本汉嗽,但批量指令并非原子化的操作,中間某條指令的失敗不會(huì)導(dǎo)致前面已做指令的回滾找蜜,也不會(huì)造成后續(xù)的指令不做饼暑。
Redis 事務(wù)
命令 | 描述 |
---|---|
MULTI | 標(biāo)記一個(gè)事務(wù)塊的開始 |
EXEC | 執(zhí)行所有事務(wù)塊內(nèi)的命令 |
DISCARD | 取消事務(wù),放棄執(zhí)行事務(wù)塊內(nèi)的所有命令 |
WATCH | 監(jiān)視一個(gè)(或多個(gè))key洗做,如果在事務(wù)執(zhí)行之前這個(gè)(或多個(gè))key被其他命令所改動(dòng)弓叛,那么事務(wù)將被打斷 |
UNWATCH | 取消 WATCH 命令對(duì)所有 keys 的監(jiān)視 |
其中我們常用的命令是前三個(gè),例如诚纸,我們需要對(duì) userCount 和 productCount同時(shí)進(jìn)行自增處理撰筷,為保證兩個(gè)字段一致,基于事務(wù)來實(shí)現(xiàn)畦徘,具體示例如下:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
rc.Send("MULTI")
rc.Send("INCR", "userCount")
rc.Send("INCR", "produceCount")
rc.Send("EXEC")
rc.Flush()
r,_ := rc.Receive()
fmt.Println(r)
r,_ = rc.Receive()
fmt.Println(r)
r,_ = rc.Receive()
fmt.Println(r)
r,_ = rc.Receive()
fmt.Println(r)
對(duì)一個(gè)的結(jié)果如下:
OK //事務(wù)開啟的返回
QUEUED //操作入隊(duì)列
QUEUED //操作入隊(duì)列
[5 5] //執(zhí)行操作并返回結(jié)果
實(shí)際上毕籽,大多數(shù)情況下,我們并不關(guān)心中間過程的返回結(jié)果井辆,只關(guān)注最終的結(jié)果关筒,這種情況下,我們只需將最后一條 Send() 指令替換成 Do()即可杯缺。
Do方法結(jié)合了Send蒸播,F(xiàn)lush和Receive方法的功能。
Do方法首先寫入命令并刷新輸出緩沖區(qū)萍肆。接下來袍榆,
Do方法接收所有待處理的回復(fù),包括Do執(zhí)行的命令的回復(fù)塘揣。
如果收到的任何回復(fù)都是錯(cuò)誤包雀,則Do返回錯(cuò)誤。 如果沒有錯(cuò)誤亲铡,則Do返回最后一個(gè)返回值馏艾。
如果Do方法的命令參數(shù)是“”劳曹,則Do方法將刷新輸出緩沖區(qū)并接收掛起的回復(fù)而不發(fā)送命令。
具體示例如下:
rc := RedisClientPool.Get()
defer func() {
rcErr := rc.Close()
if rcErr != nil {
t.Errorf("Cache init redis client close err: %s", rcErr.Error())
}
}()
rc.Send("MULTI")
rc.Send("INCR", "userCount")
rc.Send("INCR", "produceCount")
r, _ := rc.Do("EXEC")
fmt.Println(r) // [1,1]
返回結(jié)果為:
[6 6]
通過 Do() 命令完成最終事務(wù)的執(zhí)行琅摩,將為我們忽略中間結(jié)果铁孵,更好的關(guān)注事務(wù)整體的運(yùn)行結(jié)果。