Go 并發(fā)編程:利用通道創(chuàng)建并發(fā)安全的數(shù)據(jù)結(jié)構(gòu)

利用通道創(chuàng)建并發(fā)安全的映射或切片

創(chuàng)建一個(gè)并發(fā)安全的映射或切片恤浪,不需要使用鎖或者其他底層原語(yǔ)

我們之前講過(guò)值類型的數(shù)據(jù)在函數(shù)參數(shù)傳遞時(shí)是拷貝傳遞,所以沒有并發(fā)安全問(wèn)題肴楷,我們并不用擔(dān)心傳遞的值被多個(gè)協(xié)程操作的后果水由。然而當(dāng)我們使用切片、映射等引用類型參數(shù)赛蔫,甚至指針等參數(shù)時(shí)砂客,多個(gè)并發(fā)協(xié)程對(duì)該參數(shù)的操作會(huì)引起混亂,這樣的參數(shù)傳遞是數(shù)據(jù)不安全的呵恢。

我們知道對(duì)于引用或指針在協(xié)程內(nèi)的操作鞠值,如果它同時(shí)被多個(gè)協(xié)程使用,一般我們需要加同步鎖渗钉,以保證數(shù)據(jù)安全彤恶,但是加鎖會(huì)損失一些計(jì)算資源,如果鎖過(guò)多的話會(huì)影響整體效率鳄橘。

有沒有不加鎖的方式來(lái)保證引用類型或指針類型的并發(fā)安全的實(shí)現(xiàn)方案呢声离?這就要利用通道了,我們知道瘫怜,在Go中术徊,通道通信是并發(fā)安全的。下面以映射Map和切片Slice為例鲸湃,我們來(lái)創(chuàng)建并發(fā)安全的數(shù)據(jù)結(jié)構(gòu)赠涮。

并發(fā)安全映射

安全映射的實(shí)現(xiàn)其實(shí)就是在一個(gè)協(xié)程里執(zhí)行一個(gè)內(nèi)部方法以操作一個(gè)普通的map,外界只能通過(guò)通道來(lái)操作這個(gè)內(nèi)部映射暗挑,這樣就能保證對(duì)這個(gè)映射的所有訪問(wèn)都是串行的笋除。這種方法運(yùn)行著一個(gè)無(wú)限循環(huán),阻塞等待一個(gè)輸入通道的命令(即insert炸裆、remove等)垃它。

以下為具體實(shí)現(xiàn):

// 定義一個(gè)并發(fā)安全的映射,此處聲明該安全映射的接口
type SafeMap interface {
    Insert(string, interface{})      // 插入鍵值
    Delete(string)                   // 刪除鍵
    Find(string) (interface{}, bool) // 查找鍵值
    Len() int                        // map元素長(zhǎng)度
    Update(string, UpdateFunc)       // 更新鍵值
    Close() map[string]interface{}   // 關(guān)閉map,實(shí)質(zhì)關(guān)閉通道
}

type UpdateFunc func(interface{}, bool) interface{}

// 操作安全映射的命令數(shù)據(jù)結(jié)構(gòu)
type commandData struct {
    action  commandAction                 // 操作類型
    key     string                        // 操作的鍵
    value   interface{}                   // 操作的值
    result  chan<- interface{}            // 操作的結(jié)果嗤瞎,用于查找結(jié)果返回
    data    chan<- map[string]interface{} // map存儲(chǔ)的數(shù)據(jù),用于關(guān)閉通道時(shí)返回map數(shù)據(jù)
    updater UpdateFunc                    // 更新函數(shù)听系,用于更新時(shí)設(shè)置更新函數(shù)
}

// 定義安全映射的操作項(xiàng)
type commandAction int

const (
    remove commandAction = iota
    end
    find
    insert
    length
    update
)

// safeMap的實(shí)現(xiàn)基于一個(gè)可發(fā)送和接收的commandData類型值的通道
type safeMap chan commandData

// 安全映射的新增操作
func (sm safeMap) Insert(key string, value interface{}) {
    // 向通道發(fā)送新增操作命令
    sm <- commandData{action: insert, key: key, value: value}
}

// 安全映射的刪除操作
func (sm safeMap) Delete(key string) {
    // 向通道發(fā)送刪除操作命令
    sm <- commandData{action: remove, key: key}
}

// 定義查找的返回結(jié)果
type findResult struct {
    value interface{} // 查找的值
    found bool        // 查找是否存在
}

// 安全映射的查找操作
func (sm safeMap) Find(key string) (value interface{}, found bool) {
    // 響應(yīng)通道,命令執(zhí)行完成后會(huì)在此通道接收結(jié)果數(shù)據(jù)
    reply := make(chan interface{})
    sm <- commandData{action: find, key: key, result: reply}
    result := (<-reply).(findResult)
    return result.value, result.found
}

// 安全映射的長(zhǎng)度計(jì)算操作
func (sm safeMap) Len() int {
    // 響應(yīng)通道,命令執(zhí)行完成后會(huì)在此通道接收結(jié)果數(shù)據(jù)
    reply := make(chan interface{})
    sm <- commandData{action: length, result: reply}
    return (<-reply).(int)
}

// 安全映射的更新操作
func (sm safeMap) Update(key string, updater UpdateFunc) {
    sm <- commandData{action: update, key: key, updater: updater}
}

// 安全映射的關(guān)閉操作贝奇,針對(duì)通道,返回map的底層數(shù)據(jù)
func (sm safeMap) Close() map[string]interface{} {
    reply := make(chan map[string]interface{})
    sm <- commandData{action: end, data: reply}
    return <-reply
}

// 運(yùn)行于協(xié)程中的safeMap,其創(chuàng)建一個(gè)底層映射來(lái)保存實(shí)際數(shù)據(jù)
func (sm safeMap) run() {
    store := make(map[string]interface{})
    // 不斷從safeMap通道接收操作命令
    for command := range sm {
        switch command.action {
        case insert: // 新增
            store[command.key] = command.value
        case remove: // 刪除
            delete(store, command.key)
        case find: // 查找
            value, found := store[command.key]
            command.result <- findResult{value, found}
        case length: // 計(jì)算長(zhǎng)度
            command.result <- len(store)
        case update: // 更新鍵值
            value, found := store[command.key]
            // 執(zhí)行更新函數(shù)靠胜,注意掉瞳,更新函數(shù)不能調(diào)用sm的其他方法,否則會(huì)導(dǎo)致死鎖
            store[command.key] = command.updater(value, found)
        case end: // 關(guān)閉操作
            close(sm)
            command.data <- store
        }
    }
}

// safeMap的工廠方法
func NewSafeMap() SafeMap {
    sm := make(safeMap)
    go sm.run()
    return sm
}

并發(fā)測(cè)試:


func TestSafeMap(t *testing.T) {
    smap := NewSafeMap()

    wg := sync.WaitGroup{}

    // 并發(fā)新增
    for i := 0; i < 100; i++ {
        key := fmt.Sprintf("key%d", i)
        wg.Add(1)
        go func(sm SafeMap, k, v string) {
            sm.Insert(k, v)
            wg.Done()
        }(smap, key, key)
    }

    // 并發(fā)查找
    go func() {
        for i := 0; i < 40; i += 2 {
            key := fmt.Sprintf("key%d", i)
            wg.Add(1)
            go func(sm SafeMap, k string) {
                value, found := sm.Find(key)
                fmt.Printf("Find Key:%s,result:%s,%v\n", k, value, found)
                wg.Done()
            }(smap, key)
        }
    }()

    // 并發(fā)更新
    go func() {
        for i := 0; i < 60; i += 3 {
            key := fmt.Sprintf("key%d", i)
            wg.Add(1)
            go func(sm SafeMap, k string) {
                sm.Update(key, func(oldValue interface{}, found bool) (newValue interface{}) {
                    if found {
                        newValue = oldValue.(string) + "update"
                        return
                    }
                    return oldValue
                })
                wg.Done()
            }(smap, key)
        }
    }()

    // 并發(fā)刪除
    go func() {
        for i := 0; i < 80; i += 4 {
            key := fmt.Sprintf("key%d", i)
            wg.Add(1)
            go func(sm SafeMap, k string) {
                sm.Delete(k)
                wg.Done()
            }(smap, key)
        }
    }()

    // 并發(fā)計(jì)算長(zhǎng)度
    wg.Add(1)
    go func(sm SafeMap) {
        l := sm.Len()
        fmt.Println("SafeMap Len:", l)
        wg.Done()
    }(smap)

    wg.Wait()

    // 關(guān)閉管到輸出結(jié)果
    sm := smap.Close()
    fmt.Println("Print SafeMap:")
    for k, v := range sm {
        fmt.Printf("key:%s,value:%v\n", k, v)

    }

}

并發(fā)安全切片

與并發(fā)安全映射類似,以下實(shí)現(xiàn)的安全切片數(shù)據(jù)結(jié)構(gòu)都是基于在協(xié)程通信中通道的同步屬性:

package ss

// 安全切片接口
type SafeSlice interface {
    Append(interface{})     // 添加指定元素
    At(int) interface{}     // 返回指定位置的元素
    Close() []interface{}   // 關(guān)閉通道并返回切片
    Delete(int)             // 刪除指定位置元素
    Len() int               // 返回元素個(gè)數(shù)
    Update(int, UpdateFunc) // 更新指定位置元素
}

// 更新函數(shù)
type UpdateFunc func(int, interface{}) interface{}

// 命令數(shù)據(jù)結(jié)構(gòu)
type commandData struct {
    action  commandAction
    index   int
    value   interface{}
    result  chan<- interface{}
    data    chan<- []interface{}
    updater UpdateFunc
}

// 命令類型值
type commandAction int

const (
    insert commandAction = iota
    at
    remove
    update
    size
    end
)

// 安全切片實(shí)現(xiàn),由通道作為外部輸入财破,內(nèi)部運(yùn)行于一個(gè)協(xié)程中缀遍,協(xié)程函數(shù)維護(hù)一個(gè)底層切片
type safeSlice chan commandData

func (ss safeSlice) Append(value interface{}) {
    ss <- commandData{action: insert, value: value}
}

func (ss safeSlice) At(index int) interface{} {
    reply := make(chan interface{})
    ss <- commandData{action: at, index: index, result: reply}
    return <-reply
}

func (ss safeSlice) Close() []interface{} {
    reply := make(chan []interface{})
    ss <- commandData{action: end, data: reply}
    return <-reply
}

func (ss safeSlice) Delete(index int) {
    ss <- commandData{action: remove, index: index}
}

func (ss safeSlice) Len() int {
    reply := make(chan interface{})
    ss <- commandData{action: size, result: reply}
    return (<-reply).(int)
}

func (ss safeSlice) Update(index int, updater UpdateFunc) {
    ss <- commandData{action: update, index: index, updater: updater}
}

// 協(xié)程運(yùn)行
func (ss safeSlice) run(length, cap int) {
    slice := make([]interface{}, length, cap)
    for command := range ss {
        switch command.action {
        case insert:
            slice = append(slice, command.value)
        case at:
            var value interface{}
            if len(slice)-1 > command.index {
                value = slice[command.index]
            }
            command.result <- value

        case remove:
            if len(slice)-1 > command.index {
                slice[command.index] = nil
            }

        case update:
            if len(slice)-1 > command.index {
                oldValue := slice[command.index]
                slice[command.index] = command.updater(command.index, oldValue)
            }
        case size:
            command.result <- len(slice)
        case end:
            close(ss)
            command.data <- slice
        }
    }

}

func NewSafeSlice(length, cap int) SafeSlice {
    ss := make(safeSlice)
    ss.run(length, cap)
    return ss
}

小結(jié)

可以看到,使用一個(gè)通道解決并發(fā)安全的數(shù)據(jù)結(jié)構(gòu)比一個(gè)普通的映射會(huì)消耗更大的內(nèi)存開銷彻采,每條命令都需要?jiǎng)?chuàng)建一個(gè)commandData,利用通道來(lái)達(dá)到多個(gè)協(xié)程串行訪問(wèn)一個(gè)SafeMap或SafeSlice的目的。對(duì)于引用類型或指針類型的并發(fā)安全解決方案有很多,最簡(jiǎn)單的就是同步鎖sync.Mutex损合,在協(xié)程里操作map時(shí)記得加解鎖便可;此外娘纷,你也可以創(chuàng)建一個(gè)包含同步鎖的數(shù)據(jù)結(jié)構(gòu)嫁审,當(dāng)然這只是寫法不同而已。

我們?nèi)粘J褂脮r(shí)最好避免傳遞引用類型或指針類型到不同協(xié)程并發(fā)執(zhí)行赖晶,如需有這個(gè)需求律适,最好每個(gè)協(xié)程各維護(hù)一個(gè)map,然后在父協(xié)程合并即可遏插。但如果特殊場(chǎng)景你仍需要一個(gè)并發(fā)安全的映射捂贿,那么本文的并發(fā)安全映射結(jié)構(gòu)不失為一種解決方案。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末涩堤,一起剝皮案震驚了整個(gè)濱河市眷蜓,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌胎围,老刑警劉巖吁系,帶你破解...
    沈念sama閱讀 219,490評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異白魂,居然都是意外死亡汽纤,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門福荸,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)蕴坪,“玉大人,你說(shuō)我怎么就攤上這事”炒” “怎么了呆瞻?”我有些...
    開封第一講書人閱讀 165,830評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)径玖。 經(jīng)常有香客問(wèn)我痴脾,道長(zhǎng),這世上最難降的妖魔是什么梳星? 我笑而不...
    開封第一講書人閱讀 58,957評(píng)論 1 295
  • 正文 為了忘掉前任赞赖,我火速辦了婚禮,結(jié)果婚禮上冤灾,老公的妹妹穿的比我還像新娘前域。我一直安慰自己,他們只是感情好韵吨,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,974評(píng)論 6 393
  • 文/花漫 我一把揭開白布匿垄。 她就那樣靜靜地躺著,像睡著了一般归粉。 火紅的嫁衣襯著肌膚如雪年堆。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,754評(píng)論 1 307
  • 那天盏浇,我揣著相機(jī)與錄音变丧,去河邊找鬼。 笑死绢掰,一個(gè)胖子當(dāng)著我的面吹牛痒蓬,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播滴劲,決...
    沈念sama閱讀 40,464評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼攻晒,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了班挖?” 一聲冷哼從身側(cè)響起鲁捏,我...
    開封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎萧芙,沒想到半個(gè)月后给梅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,847評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡双揪,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,995評(píng)論 3 338
  • 正文 我和宋清朗相戀三年动羽,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片渔期。...
    茶點(diǎn)故事閱讀 40,137評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡运吓,死狀恐怖渴邦,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情拘哨,我是刑警寧澤谋梭,帶...
    沈念sama閱讀 35,819評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站倦青,受9級(jí)特大地震影響章蚣,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜姨夹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,482評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望矾策。 院中可真熱鬧磷账,春花似錦、人聲如沸贾虽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)蓬豁。三九已至绰咽,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間地粪,已是汗流浹背取募。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蟆技,地道東北人玩敏。 一個(gè)月前我還...
    沈念sama閱讀 48,409評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像质礼,于是被迫代替她去往敵國(guó)和親旺聚。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,086評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容