利用通道創(chuàng)建并發(fā)安全的映射或切片
創(chuàng)建一個(gè)并發(fā)安全的映射或切片恤浪,不需要使用鎖或者其他底層原語(yǔ)
我們之前講過(guò)值類型的數(shù)據(jù)在函數(shù)參數(shù)傳遞時(shí)是拷貝傳遞,所以沒有并發(fā)安全問(wèn)題肴楷,我們并不用擔(dān)心傳遞的值被多個(gè)協(xié)程操作的后果水由。然而當(dāng)我們使用切片、映射等引用類型參數(shù)赛蔫,甚至指針等參數(shù)時(shí)砂客,多個(gè)并發(fā)協(xié)程對(duì)該參數(shù)的操作會(huì)引起混亂,這樣的參數(shù)傳遞是數(shù)據(jù)不安全的呵恢。
我們知道對(duì)于引用或指針在協(xié)程內(nèi)的操作鞠值,如果它同時(shí)被多個(gè)協(xié)程使用,一般我們需要加同步鎖渗钉,以保證數(shù)據(jù)安全彤恶,但是加鎖會(huì)損失一些計(jì)算資源,如果鎖過(guò)多的話會(huì)影響整體效率鳄橘。
有沒有不加鎖的方式來(lái)保證引用類型或指針類型的并發(fā)安全的實(shí)現(xiàn)方案呢声离?這就要利用通道了,我們知道瘫怜,在Go中术徊,通道通信是并發(fā)安全的。下面以映射Map和切片Slice為例鲸湃,我們來(lái)創(chuàng)建并發(fā)安全的數(shù)據(jù)結(jié)構(gòu)赠涮。
并發(fā)安全映射
安全映射的實(shí)現(xiàn)其實(shí)就是在一個(gè)協(xié)程里執(zhí)行一個(gè)內(nèi)部方法以操作一個(gè)普通的map,外界只能通過(guò)通道來(lái)操作這個(gè)內(nèi)部映射暗挑,這樣就能保證對(duì)這個(gè)映射的所有訪問(wèn)都是串行的笋除。這種方法運(yùn)行著一個(gè)無(wú)限循環(huán),阻塞等待一個(gè)輸入通道的命令(即insert炸裆、remove等)垃它。
以下為具體實(shí)現(xiàn):
// 定義一個(gè)并發(fā)安全的映射,此處聲明該安全映射的接口
type SafeMap interface {
Insert(string, interface{}) // 插入鍵值
Delete(string) // 刪除鍵
Find(string) (interface{}, bool) // 查找鍵值
Len() int // map元素長(zhǎng)度
Update(string, UpdateFunc) // 更新鍵值
Close() map[string]interface{} // 關(guān)閉map,實(shí)質(zhì)關(guān)閉通道
}
type UpdateFunc func(interface{}, bool) interface{}
// 操作安全映射的命令數(shù)據(jù)結(jié)構(gòu)
type commandData struct {
action commandAction // 操作類型
key string // 操作的鍵
value interface{} // 操作的值
result chan<- interface{} // 操作的結(jié)果嗤瞎,用于查找結(jié)果返回
data chan<- map[string]interface{} // map存儲(chǔ)的數(shù)據(jù),用于關(guān)閉通道時(shí)返回map數(shù)據(jù)
updater UpdateFunc // 更新函數(shù)听系,用于更新時(shí)設(shè)置更新函數(shù)
}
// 定義安全映射的操作項(xiàng)
type commandAction int
const (
remove commandAction = iota
end
find
insert
length
update
)
// safeMap的實(shí)現(xiàn)基于一個(gè)可發(fā)送和接收的commandData類型值的通道
type safeMap chan commandData
// 安全映射的新增操作
func (sm safeMap) Insert(key string, value interface{}) {
// 向通道發(fā)送新增操作命令
sm <- commandData{action: insert, key: key, value: value}
}
// 安全映射的刪除操作
func (sm safeMap) Delete(key string) {
// 向通道發(fā)送刪除操作命令
sm <- commandData{action: remove, key: key}
}
// 定義查找的返回結(jié)果
type findResult struct {
value interface{} // 查找的值
found bool // 查找是否存在
}
// 安全映射的查找操作
func (sm safeMap) Find(key string) (value interface{}, found bool) {
// 響應(yīng)通道,命令執(zhí)行完成后會(huì)在此通道接收結(jié)果數(shù)據(jù)
reply := make(chan interface{})
sm <- commandData{action: find, key: key, result: reply}
result := (<-reply).(findResult)
return result.value, result.found
}
// 安全映射的長(zhǎng)度計(jì)算操作
func (sm safeMap) Len() int {
// 響應(yīng)通道,命令執(zhí)行完成后會(huì)在此通道接收結(jié)果數(shù)據(jù)
reply := make(chan interface{})
sm <- commandData{action: length, result: reply}
return (<-reply).(int)
}
// 安全映射的更新操作
func (sm safeMap) Update(key string, updater UpdateFunc) {
sm <- commandData{action: update, key: key, updater: updater}
}
// 安全映射的關(guān)閉操作贝奇,針對(duì)通道,返回map的底層數(shù)據(jù)
func (sm safeMap) Close() map[string]interface{} {
reply := make(chan map[string]interface{})
sm <- commandData{action: end, data: reply}
return <-reply
}
// 運(yùn)行于協(xié)程中的safeMap,其創(chuàng)建一個(gè)底層映射來(lái)保存實(shí)際數(shù)據(jù)
func (sm safeMap) run() {
store := make(map[string]interface{})
// 不斷從safeMap通道接收操作命令
for command := range sm {
switch command.action {
case insert: // 新增
store[command.key] = command.value
case remove: // 刪除
delete(store, command.key)
case find: // 查找
value, found := store[command.key]
command.result <- findResult{value, found}
case length: // 計(jì)算長(zhǎng)度
command.result <- len(store)
case update: // 更新鍵值
value, found := store[command.key]
// 執(zhí)行更新函數(shù)靠胜,注意掉瞳,更新函數(shù)不能調(diào)用sm的其他方法,否則會(huì)導(dǎo)致死鎖
store[command.key] = command.updater(value, found)
case end: // 關(guān)閉操作
close(sm)
command.data <- store
}
}
}
// safeMap的工廠方法
func NewSafeMap() SafeMap {
sm := make(safeMap)
go sm.run()
return sm
}
并發(fā)測(cè)試:
func TestSafeMap(t *testing.T) {
smap := NewSafeMap()
wg := sync.WaitGroup{}
// 并發(fā)新增
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key%d", i)
wg.Add(1)
go func(sm SafeMap, k, v string) {
sm.Insert(k, v)
wg.Done()
}(smap, key, key)
}
// 并發(fā)查找
go func() {
for i := 0; i < 40; i += 2 {
key := fmt.Sprintf("key%d", i)
wg.Add(1)
go func(sm SafeMap, k string) {
value, found := sm.Find(key)
fmt.Printf("Find Key:%s,result:%s,%v\n", k, value, found)
wg.Done()
}(smap, key)
}
}()
// 并發(fā)更新
go func() {
for i := 0; i < 60; i += 3 {
key := fmt.Sprintf("key%d", i)
wg.Add(1)
go func(sm SafeMap, k string) {
sm.Update(key, func(oldValue interface{}, found bool) (newValue interface{}) {
if found {
newValue = oldValue.(string) + "update"
return
}
return oldValue
})
wg.Done()
}(smap, key)
}
}()
// 并發(fā)刪除
go func() {
for i := 0; i < 80; i += 4 {
key := fmt.Sprintf("key%d", i)
wg.Add(1)
go func(sm SafeMap, k string) {
sm.Delete(k)
wg.Done()
}(smap, key)
}
}()
// 并發(fā)計(jì)算長(zhǎng)度
wg.Add(1)
go func(sm SafeMap) {
l := sm.Len()
fmt.Println("SafeMap Len:", l)
wg.Done()
}(smap)
wg.Wait()
// 關(guān)閉管到輸出結(jié)果
sm := smap.Close()
fmt.Println("Print SafeMap:")
for k, v := range sm {
fmt.Printf("key:%s,value:%v\n", k, v)
}
}
并發(fā)安全切片
與并發(fā)安全映射類似,以下實(shí)現(xiàn)的安全切片數(shù)據(jù)結(jié)構(gòu)都是基于在協(xié)程通信中通道的同步屬性:
package ss
// 安全切片接口
type SafeSlice interface {
Append(interface{}) // 添加指定元素
At(int) interface{} // 返回指定位置的元素
Close() []interface{} // 關(guān)閉通道并返回切片
Delete(int) // 刪除指定位置元素
Len() int // 返回元素個(gè)數(shù)
Update(int, UpdateFunc) // 更新指定位置元素
}
// 更新函數(shù)
type UpdateFunc func(int, interface{}) interface{}
// 命令數(shù)據(jù)結(jié)構(gòu)
type commandData struct {
action commandAction
index int
value interface{}
result chan<- interface{}
data chan<- []interface{}
updater UpdateFunc
}
// 命令類型值
type commandAction int
const (
insert commandAction = iota
at
remove
update
size
end
)
// 安全切片實(shí)現(xiàn),由通道作為外部輸入财破,內(nèi)部運(yùn)行于一個(gè)協(xié)程中缀遍,協(xié)程函數(shù)維護(hù)一個(gè)底層切片
type safeSlice chan commandData
func (ss safeSlice) Append(value interface{}) {
ss <- commandData{action: insert, value: value}
}
func (ss safeSlice) At(index int) interface{} {
reply := make(chan interface{})
ss <- commandData{action: at, index: index, result: reply}
return <-reply
}
func (ss safeSlice) Close() []interface{} {
reply := make(chan []interface{})
ss <- commandData{action: end, data: reply}
return <-reply
}
func (ss safeSlice) Delete(index int) {
ss <- commandData{action: remove, index: index}
}
func (ss safeSlice) Len() int {
reply := make(chan interface{})
ss <- commandData{action: size, result: reply}
return (<-reply).(int)
}
func (ss safeSlice) Update(index int, updater UpdateFunc) {
ss <- commandData{action: update, index: index, updater: updater}
}
// 協(xié)程運(yùn)行
func (ss safeSlice) run(length, cap int) {
slice := make([]interface{}, length, cap)
for command := range ss {
switch command.action {
case insert:
slice = append(slice, command.value)
case at:
var value interface{}
if len(slice)-1 > command.index {
value = slice[command.index]
}
command.result <- value
case remove:
if len(slice)-1 > command.index {
slice[command.index] = nil
}
case update:
if len(slice)-1 > command.index {
oldValue := slice[command.index]
slice[command.index] = command.updater(command.index, oldValue)
}
case size:
command.result <- len(slice)
case end:
close(ss)
command.data <- slice
}
}
}
func NewSafeSlice(length, cap int) SafeSlice {
ss := make(safeSlice)
ss.run(length, cap)
return ss
}
小結(jié)
可以看到,使用一個(gè)通道解決并發(fā)安全的數(shù)據(jù)結(jié)構(gòu)比一個(gè)普通的映射會(huì)消耗更大的內(nèi)存開銷彻采,每條命令都需要?jiǎng)?chuàng)建一個(gè)commandData,利用通道來(lái)達(dá)到多個(gè)協(xié)程串行訪問(wèn)一個(gè)SafeMap或SafeSlice的目的。對(duì)于引用類型或指針類型的并發(fā)安全解決方案有很多,最簡(jiǎn)單的就是同步鎖sync.Mutex损合,在協(xié)程里操作map時(shí)記得加解鎖便可;此外娘纷,你也可以創(chuàng)建一個(gè)包含同步鎖的數(shù)據(jù)結(jié)構(gòu)嫁审,當(dāng)然這只是寫法不同而已。
我們?nèi)粘J褂脮r(shí)最好避免傳遞引用類型或指針類型到不同協(xié)程并發(fā)執(zhí)行赖晶,如需有這個(gè)需求律适,最好每個(gè)協(xié)程各維護(hù)一個(gè)map,然后在父協(xié)程合并即可遏插。但如果特殊場(chǎng)景你仍需要一個(gè)并發(fā)安全的映射捂贿,那么本文的并發(fā)安全映射結(jié)構(gòu)不失為一種解決方案。