更多精彩文章:https://deepzz.com
Desc:Go sync 包的使用方法错洁,sync.Mutex赊瞬,sync.RMutex,sync.Once椭岩,sync.Cond茅逮,sync.Waitgroup
盡管 Golang 推薦通過 channel 進(jìn)行通信和同步,但在實(shí)際開發(fā)中 sync 包用得也非常的多判哥。另外 sync 下還有一個(gè) atomic 包献雅,提供了一些底層的原子操作(這里不做介紹)。本篇文章主要介紹該包下的鎖的一些概念及使用方法塌计。
整個(gè)包都圍繞這 Locker 進(jìn)行挺身,這是一個(gè) interface:
type Locker interface {
Lock()
Unlock()
}
只有兩個(gè)方法,Lock()
和 Unlock()
锌仅。
另外該包下的對象章钾,在使用過之后,千萬不要復(fù)制技扼。
有許多同學(xué)不理解鎖的概念伍玖,下面會(huì)一一介紹到:
為什么需要鎖嫩痰?
在并發(fā)的情況下剿吻,多個(gè)線程或協(xié)程同時(shí)去修改一個(gè)變量,可能會(huì)出現(xiàn)如下情況:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var a = 0
// 啟動(dòng) 100 個(gè)協(xié)程串纺,需要足夠大
// var lock sync.Mutex
for i := 0; i < 100; i++ {
go func(idx int) {
// lock.Lock()
// defer lock.Unlock()
a += 1
fmt.Printf("goroutine %d, a=%d\n", idx, a)
}(i)
}
// 等待 1s 結(jié)束主程序
// 確保所有協(xié)程執(zhí)行完
time.Sleep(time.Second)
}
觀察打印結(jié)果丽旅,是否出現(xiàn) a 的值是相同的情況(未出現(xiàn)則重試或調(diào)大協(xié)程數(shù)),答案:是的纺棺。
顯然這不是我們想要的結(jié)果榄笙。出現(xiàn)這種情況的原因是,協(xié)程依次執(zhí)行:從寄存器讀取 a 的值 -> 然后做加法運(yùn)算 -> 最后寫會(huì)寄存器祷蝌。試想茅撞,此時(shí)一個(gè)協(xié)程取出 a 的值 3,正在做加法運(yùn)算(還未寫回寄存器)。同時(shí)另一個(gè)協(xié)程此時(shí)去取米丘,取出了同樣的 a 的值 3剑令。最終導(dǎo)致的結(jié)果是运敢,兩個(gè)協(xié)程產(chǎn)出的結(jié)果相同幽污,a 相當(dāng)于只增加了 1。
所以哟楷,鎖的概念就是堕扶,我正在處理 a(鎖定)碍脏,你們誰都別和我搶,等我處理完了(解鎖)稍算,你們再處理典尾。這樣就實(shí)現(xiàn)了,同時(shí)處理 a 的協(xié)程只有一個(gè)糊探,就實(shí)現(xiàn)了同步急黎。
把上面代碼里的注釋取消掉再試下。
什么是互斥鎖 Mutex侧到?
什么是互斥鎖勃教?它是鎖的一種具體實(shí)現(xiàn),有兩個(gè)方法:
func (m *Mutex) Lock()
func (m *Mutex) Unlock()
在首次使用后不要復(fù)制該互斥鎖匠抗。對一個(gè)未鎖定的互斥鎖解鎖將會(huì)產(chǎn)生運(yùn)行時(shí)錯(cuò)誤故源。
一個(gè)互斥鎖只能同時(shí)被一個(gè) goroutine 鎖定,其它 goroutine 將阻塞直到互斥鎖被解鎖(重新爭搶對互斥鎖的鎖定)汞贸。如:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
ch := make(chan struct{}, 2)
var l sync.Mutex
go func() {
l.Lock()
defer l.Unlock()
fmt.Println("goroutine1: 我會(huì)鎖定大概 2s")
time.Sleep(time.Second * 2)
fmt.Println("goroutine1: 我解鎖了绳军,你們?nèi)尠?)
ch <- struct{}{}
}()
go func() {
fmt.Println("groutine2: 等待解鎖")
l.Lock()
defer l.Unlock()
fmt.Println("goroutine2: 哈哈,我鎖定了")
ch <- struct{}{}
}()
// 等待 goroutine 執(zhí)行結(jié)束
for i := 0; i < 2; i++ {
<-ch
}
}
注意矢腻,平時(shí)所說的鎖定门驾,其實(shí)就是去鎖定互斥鎖,而不是說去鎖定一段代碼多柑。也就是說奶是,當(dāng)代碼執(zhí)行到有鎖的地方時(shí),它獲取不到互斥鎖的鎖定竣灌,會(huì)阻塞在那里聂沙,從而達(dá)到控制同步的目的。
什么是讀寫鎖 RWMutex?
那么什么是讀寫鎖呢初嘹?它是針對讀寫操作的互斥鎖及汉,讀寫鎖與互斥鎖最大的不同就是可以分別對 讀
、寫
進(jìn)行鎖定屯烦。一般用在大量讀操作坷随、少量寫操作的情況:
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
由于這里需要區(qū)分讀寫鎖定房铭,我們這樣定義:
- 讀鎖定(RLock),對讀操作進(jìn)行鎖定
- 讀解鎖(RUnlock)温眉,對讀鎖定進(jìn)行解鎖
- 寫鎖定(Lock)育叁,對寫操作進(jìn)行鎖定
- 寫解鎖(Unlock),對寫鎖定進(jìn)行解鎖
在首次使用之后芍殖,不要復(fù)制該讀寫鎖豪嗽。不要混用鎖定和解鎖,如:Lock 和 RUnlock豌骏、RLock 和 Unlock龟梦。因?yàn)閷ξ醋x鎖定的讀寫鎖進(jìn)行讀解鎖或?qū)ξ磳戞i定的讀寫鎖進(jìn)行寫解鎖將會(huì)引起運(yùn)行時(shí)錯(cuò)誤。
如何理解讀寫鎖呢窃躲?
- 同時(shí)只能有一個(gè) goroutine 能夠獲得寫鎖定计贰。
- 同時(shí)可以有任意多個(gè) gorouinte 獲得讀鎖定。
- 同時(shí)只能存在寫鎖定或讀鎖定(讀和寫互斥)蒂窒。
也就是說躁倒,當(dāng)有一個(gè) goroutine 獲得寫鎖定,其它無論是讀鎖定還是寫鎖定都將阻塞直到寫解鎖洒琢;當(dāng)有一個(gè) goroutine 獲得讀鎖定秧秉,其它讀鎖定任然可以繼續(xù);當(dāng)有一個(gè)或任意多個(gè)讀鎖定衰抑,寫鎖定將等待所有讀鎖定解鎖之后才能夠進(jìn)行寫鎖定象迎。所以說這里的讀鎖定(RLock)目的其實(shí)是告訴寫鎖定:有很多人正在讀取數(shù)據(jù),你給我站一邊去呛踊,等它們讀(讀解鎖)完你再來寫(寫鎖定)砾淌。
使用例子:
package main
import (
"fmt"
"math/rand"
"sync"
)
var count int
var rw sync.RWMutex
func main() {
ch := make(chan struct{}, 10)
for i := 0; i < 5; i++ {
go read(i, ch)
}
for i := 0; i < 5; i++ {
go write(i, ch)
}
for i := 0; i < 10; i++ {
<-ch
}
}
func read(n int, ch chan struct{}) {
rw.RLock()
fmt.Printf("goroutine %d 進(jìn)入讀操作...\n", n)
v := count
fmt.Printf("goroutine %d 讀取結(jié)束,值為:%d\n", n, v)
rw.RUnlock()
ch <- struct{}{}
}
func write(n int, ch chan struct{}) {
rw.Lock()
fmt.Printf("goroutine %d 進(jìn)入寫操作...\n", n)
v := rand.Intn(1000)
count = v
fmt.Printf("goroutine %d 寫入結(jié)束谭网,新值為:%d\n", n, v)
rw.Unlock()
ch <- struct{}{}
}
WaitGroup 例子
WaitGroup 用于等待一組 goroutine 結(jié)束汪厨,用法很簡單。它有三個(gè)方法:
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
Add 用來添加 goroutine 的個(gè)數(shù)愉择。Done 執(zhí)行一次數(shù)量減 1劫乱。Wait 用來等待結(jié)束:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i, s := range seconds {
// 計(jì)數(shù)加 1
wg.Add(1)
go func(i, s int) {
// 計(jì)數(shù)減 1
defer wg.Done()
fmt.Printf("goroutine%d 結(jié)束\n", i)
}(i, s)
}
// 等待執(zhí)行結(jié)束
wg.Wait()
fmt.Println("所有 goroutine 執(zhí)行結(jié)束")
}
注意,wg.Add()
方法一定要在 goroutine 開始前執(zhí)行哦薄辅。
Cond 條件變量
Cond 實(shí)現(xiàn)一個(gè)條件變量要拂,即等待或宣布事件發(fā)生的 goroutines 的會(huì)合點(diǎn)。
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
它會(huì)保存一個(gè)通知列表站楚。
func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
Wait 方法、Signal 方法和 Broadcast 方法搏嗡。它們分別代表了等待通知窿春、單發(fā)通知和廣播通知的操作拉一。
我們來看一下 Wait 方法:
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
它的操作為:加入到通知列表 -> 解鎖 L -> 等待通知 -> 鎖定 L。其使用方法是:
c.L.Lock()
for !condition() {
c.Wait()
}
... make use of condition ...
c.L.Unlock()
舉個(gè)例子:
// Package main provides ...
package main
import (
"fmt"
"sync"
"time"
)
var count int = 4
func main() {
ch := make(chan struct{}, 5)
// 新建 cond
var l sync.Mutex
cond := sync.NewCond(&l)
for i := 0; i < 5; i++ {
go func(i int) {
// 爭搶互斥鎖的鎖定
cond.L.Lock()
defer func() {
cond.L.Unlock()
ch <- struct{}{}
}()
// 條件是否達(dá)成
for count > i {
cond.Wait()
fmt.Printf("收到一個(gè)通知 goroutine%d\n", i)
}
fmt.Printf("goroutine%d 執(zhí)行結(jié)束\n", i)
}(i)
}
// 確保所有 goroutine 啟動(dòng)完成
time.Sleep(time.Millisecond * 20)
// 鎖定一下旧乞,我要改變 count 的值
fmt.Println("broadcast...")
cond.L.Lock()
count -= 1
cond.Broadcast()
cond.L.Unlock()
time.Sleep(time.Second)
fmt.Println("signal...")
cond.L.Lock()
count -= 2
cond.Signal()
cond.L.Unlock()
time.Sleep(time.Second)
fmt.Println("broadcast...")
cond.L.Lock()
count -= 1
cond.Broadcast()
cond.L.Unlock()
for i := 0; i < 5; i++ {
<-ch
}
}
Pool 臨時(shí)對象池
sync.Pool
可以作為臨時(shí)對象的保存和復(fù)用的集合蔚润。其結(jié)構(gòu)為:
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})
新鍵 Pool 需要提供一個(gè) New 方法,目的是當(dāng)獲取不到臨時(shí)對象時(shí)自動(dòng)創(chuàng)建一個(gè)(不會(huì)主動(dòng)加入到 Pool 中)尺栖,Get 和 Put 方法都很好理解嫡纠。
深入了解過 Go 的同學(xué)應(yīng)該知道,Go 的重要組成結(jié)構(gòu)為 M延赌、P除盏、G。Pool 實(shí)際上會(huì)為每一個(gè)操作它的 goroutine 相關(guān)聯(lián)的 P 都生成一個(gè)本地池挫以。如果從本地池 Get 對象的時(shí)候者蠕,本地池沒有,則會(huì)從其它的 P 本地池獲取掐松。因此踱侣,Pool 的一個(gè)特點(diǎn)就是:可以把由其中的對象值產(chǎn)生的存儲(chǔ)壓力進(jìn)行分?jǐn)偂?/p>
它有著以下特點(diǎn):
- Pool 中的對象在僅有 Pool 有著唯一索引的情況下可能會(huì)被自動(dòng)刪除(取決于下一次 GC 執(zhí)行的時(shí)間)。
- goroutines 協(xié)程安全大磺,可以同時(shí)被多個(gè)協(xié)程使用抡句。
GC 的執(zhí)行一般會(huì)使 Pool 中的對象全部移除。
那么 Pool 都適用于什么場景呢杠愧?從它的特點(diǎn)來說玉转,適用與無狀態(tài)的對象的復(fù)用,而不適用與如連接池之類的殴蹄。在 fmt 包中有一個(gè)很好的使用池的例子究抓,它維護(hù)一個(gè)動(dòng)態(tài)大小的臨時(shí)輸出緩沖區(qū)。
官方例子:
package main
import (
"bytes"
"io"
"os"
"sync"
"time"
)
var bufPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func timeNow() time.Time {
return time.Unix(1136214245, 0)
}
func Log(w io.Writer, key, val string) {
// 獲取臨時(shí)對象袭灯,沒有的話會(huì)自動(dòng)創(chuàng)建
b := bufPool.Get().(*bytes.Buffer)
b.Reset()
b.WriteString(timeNow().UTC().Format(time.RFC3339))
b.WriteByte(' ')
b.WriteString(key)
b.WriteByte('=')
b.WriteString(val)
w.Write(b.Bytes())
// 將臨時(shí)對象放回到 Pool 中
bufPool.Put(b)
}
func main() {
Log(os.Stdout, "path", "/search?q=flowers")
}
打印結(jié)果:
2006-01-02T15:04:05Z path=/search?q=flowers
Once 執(zhí)行一次
使用 sync.Once
對象可以使得函數(shù)多次調(diào)用只執(zhí)行一次刺下。其結(jié)構(gòu)為:
type Once struct {
m Mutex
done uint32
}
func (o *Once) Do(f func())
用 done 來記錄執(zhí)行次數(shù),用 m 來保證保證僅被執(zhí)行一次稽荧。只有一個(gè) Do 方法橘茉,調(diào)用執(zhí)行。
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
# 打印結(jié)果
Only once