Golang筆記-- 并發(fā)模式

并發(fā)模式

[TOC]

并發(fā)程序指同時進行多個任務的程序, Go程序一種支持并發(fā)的方式是通過goroutine和channel, 支持“順序通信進程”(communicating sequential processes)或被簡稱為CSP.

CSP是一種現代的并發(fā)編程模型,在這種編程模型中值會在不同的運行實例(goroutine)中傳遞,盡管大多數情況下仍然是被限制在單一實例中.

goroutines

  • Golang中并發(fā)的執(zhí)行單元叫goroutine, 可類比為線程, 或協(xié)程.
  • 程序啟動時main函數在main goroutine中運行
  • go語句創(chuàng)建新的goroutine, 多個goroutine并發(fā)執(zhí)行:
go f()
go func() {
   }()
  • 一個簡單的并發(fā)網絡模型:
for {
    conn, err := listener.Accept()
    if err != nil {
        log.Print(err) // e.g., connection aborted
        continue
    }
    go handleConn(conn) // handle connections concurrently
}

func handleConn(c net.Conn) {
    defer c.Close()
    for {
        _, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
        if err != nil {
            return // e.g., client disconnected
        }
        time.Sleep(1 * time.Second)
    }
}
  • Echo 服務器
func echo(c net.Conn, shout string, delay time.Duration) {
    fmt.Fprintln(c, "\t", strings.ToUpper(shout))
    time.Sleep(delay)
    fmt.Fprintln(c, "\t", shout)
    time.Sleep(delay)
    fmt.Fprintln(c, "\t", strings.ToLower(shout))
}

func handleConn(c net.Conn) {
    input := bufio.NewScanner(c)
    for input.Scan() {
        echo(c, input.Text(), 1*time.Second)
    }
    // NOTE: ignoring potential errors from input.Err()
    c.Close()
}
// client
func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    go mustCopy(os.Stdout, conn)
    mustCopy(conn, os.Stdin)
}

Goroutines和線程

  • 量的區(qū)別, 動態(tài)棧: os線程固定大小(一般2M), goroutines小(一般2k)且不固定, 動態(tài)伸縮,最大1G
  • 調度, os沒幾毫秒由scheduler內核函數調度, goroutines由go自己大調度器調度,如m:n調度方法在n個線程調度m個goroutines, 且不需要進入內核
  • GOMAXPROCS大變量決定調度到多少個os線程

channels

  • channels是goroutines之間通信機制, 通過channel從一個goroutine向另一個發(fā)送消息
  • chan定義某種類型的channel, 類比C/C++的一個支持多線程的泛型queue
  • 和map類似,channel也對應一個make創(chuàng)建的底層數據結構的引用: make(chan int)
  • 和make,slice等引用類型一樣, channel的零值也是nil
  • 發(fā)送和接收兩個操作都使用<-運算符
  • 支持close操作, 向close后的channel發(fā)送會panic, 但仍然可接收chan里的數據, 如沒有數據將產生一個零值
// int型chan, 無緩沖阻塞型, 會阻塞發(fā)送者, 直到另一個goroutine接收數據
ch:=make(chan int)
ch<-1 //發(fā)送
go func() {
    fmt.Println(<-ch) //接收
}

無緩沖channel

  • 基于無緩存Channels將導致兩個goroutine做同步操作, 因此也叫同步channel
  • 當通過一個無緩存Channels發(fā)送數據時,接收者收到數據發(fā)生在喚醒發(fā)送者goroutine之前
func main() {
    conn, err := net.Dial("tcp", "localhost:8000")
    if err != nil {
        log.Fatal(err)
    }
    done := make(chan struct{})
    go func() {
        io.Copy(os.Stdout, conn) // NOTE: ignoring errors
        log.Println("done")
        done <- struct{}{} // signal the main goroutine
    }()
    mustCopy(conn, os.Stdin)
    conn.Close()
    <-done // wait for background goroutine to finish
}

串聯(lián)的Channels(Pipeline)

Channels也可以用于將多個goroutine連接在一起昧互,一個Channel的輸出作為下一個Channel的輸入。這種串聯(lián)的Channels就是所謂的管道(pipeline)

func main() {
    naturals := make(chan int)
    squares := make(chan int)

    // Counter
    go func() {
        for x := 0; x < 100; x++ {
            naturals <- x
        }
        close(naturals)
    }()

    // Squarer
    go func() {
        for x := range naturals {
            squares <- x * x
        }
        close(squares)
    }()

    // Printer (in main goroutine)
    for x := range squares {
        fmt.Println(x)
    }
}

單向channels

<-chan左邊是一個只讀channel, 在右邊則只寫,形參傳參時可隱式轉換:

//改進版本pipeline示例
func counter(out chan<- int) {
    for x := 0; x < 100; x++ {
        out <- x
    }
    close(out)
}

func squarer(out chan<- int, in <-chan int) {
    for v := range in {
        out <- v * v
    }
    close(out)
}

func printer(in <-chan int) {
    for v := range in {
        fmt.Println(v)
    }
}

func main() {
    naturals := make(chan int)
    squares := make(chan int)
    go counter(naturals)
    go squarer(squares, naturals)
    printer(squares)
}

帶緩沖channels

  • 像一個可以容納指定個數元素的隊列, 在達到容量前寫入不會阻塞, 寫入讀取操作解耦
ch = make(chan string, 3)

Multiplexing with select

select多路復用,可以監(jiān)聽多個chan

  • 當一個或多個case滿足條件(如chan可發(fā)送或接收消息, 類似于讀寫事件發(fā)生), 會隨機選擇其中一個分支執(zhí)行
  • default是所有case都阻塞時執(zhí)行
  • 每一個case代表一個通信操作(在channel上發(fā)送或接收)
  • 無分支的select{}會永遠等待
  • 因為對一個nil的channel發(fā)送和接收操作會永遠阻塞,select不會選擇到, 可以用nil來激活或者禁用case
select {
case <-ch1:
    // ...
case x := <-ch2:
    // ...use x...
case ch3 <- y:
    // ...
default:
    // ...
}

示例: 并發(fā)遍歷目錄

原始版本

// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            subdir := filepath.Join(dir, entry.Name())
            walkDir(subdir, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
    entries, err := ioutil.ReadDir(dir)
    if err != nil {
        fmt.Fprintf(os.Stderr, "du1: %v\n", err)
        return nil
    }
    return entries
}
  • ioutil.ReadDir()返回os.FileInfo(os.Stat()也返回)類型的slice
  • walkDir()遞歸調調用
  • walkDir()會向fileSizes這個channel發(fā)送一條消息告知文件大小

main包不限制并發(fā)的版本(可能產生大量goroutines)

  • 注意for ... select...慣用方式,未使用range循環(huán),用channel接收的二值形式顯式判斷channel是否close
  • 不用標簽break(break loop)的話只會退出內層select
package main

import (
    "flag"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
)

func main() {
    // ...determine roots...
    // Traverse each root of the file tree in parallel.
    fileSizes := make(chan int64)
    var n sync.WaitGroup
    for _, root := range roots {
        n.Add(1)
        go walkDir(root, &n, fileSizes)
    }
    go func() {
        n.Wait()
        close(fileSizes)
    }()
    // ...select loop...
    loop:
    for {
        select {
        case size, ok := <-fileSizes:
            if !ok {
                break loop // fileSizes was closed
            }
            nfiles++
            nbytes += size
        case <-tick:
            printDiskUsage(nfiles, nbytes)
        }
    }
}

// walkDir修改為并發(fā)
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
    defer n.Done()
    for _, entry := range dirents(dir) {
        if entry.IsDir() {
            n.Add(1)
            subdir := filepath.Join(dir, entry.Name())
            go walkDir(subdir, n, fileSizes)
        } else {
            fileSizes <- entry.Size()
        }
    }
}

通過token方式限制并發(fā)

// sema is a counting semaphore for limiting concurrency in dirents.
var sema = make(chan struct{}, 20)

// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
    sema <- struct{}{}        // acquire token
    defer func() { <-sema }() // release token
    // ...

并發(fā)的退出

Golang未提供一個goroutine終止另一個goroutine的方法, 一般可以通過close done channel的廣播方式退出goroutines

var done = make(chan struct{})

for {
    select {
    case <-done:
        return 
    case v, ok:=<-workChan:
        if !ok {
            return 
        }
    }
}

// Cancel traversal when input is detected.
go func() {
    os.Stdin.Read(make([]byte, 1)) // read a single byte
    close(done) // done
}()

基于共享變量的并發(fā)

goroutine與channel方式的并發(fā)免于處理許多細節(jié), 而基于共享數據的并發(fā)就不可避免地需要處理它們, 否則一些競爭條件導致程序產生非預期結果,如錯誤結果腕让、 死鎖(deadlock)裹纳、活鎖(livelock)和餓死(resource starvation),這就不是并發(fā)安全的程序了.

競爭條件

競爭條件指程序在多個goroutine交叉執(zhí)行時沒法給出確定預期的結果.數據競爭是一種特殊競爭條件, 產生于多個goroutines并發(fā)訪問共享數據,且至少有一個寫操作, 避免辦法:

  • 不寫
  • 避免多個goroutines訪問, 綁定到某個goroutine, 其他的都用channel, pipeline channel傳遞到下一個階段線性訪問這種叫串行綁定
  • 互斥訪問, 做并發(fā)控制(加鎖等)

競爭條件檢測: 在go build,go run或者go test命令后加-race

sync.Mutex互斥鎖

類似于二元信號量:

var (
    sema    = make(chan struct{}, 1) // a binary semaphore guarding balance
    balance int
)

func Deposit(amount int) {
    sema <- struct{}{} // acquire token
    balance = balance + amount
    <-sema // release token
}

func Balance() int {
    sema <- struct{}{} // acquire token
    b := balance
    <-sema // release token
    return b
}

在Mutex中對應Lock,Unlock:

import "sync"

var (
    mu      sync.Mutex // guards balance
    balance int
)

func Deposit(amount int) {
    mu.Lock()
    balance = balance + amount
    mu.Unlock()
}

func Balance() int {
    mu.Lock()
    // defer mu.Unlock()
    b := balance
    mu.Unlock() 
    return b
}
  • 調用Lock()方法來獲取一個互斥鎖
  • 如果其它goroutine已經獲得了該鎖,Lock被阻塞直到其它goroutine調用了Unlock
  • mutex會保護共享變量, 慣例來說挨务,被mutex所保護的變量是在mutex變量聲明之后立刻聲明
  • Lock和Unlock之間的代碼段叫臨界區(qū)
  • defer來調用Unlock, panic也依然會執(zhí)行
  • 不是可重入鎖(遞歸鎖), 無法對已鎖mutex再次上鎖, 一個通用的解決方案是將一個函數分離為多個函數(導出的加鎖調內部的, 內部的執(zhí)行實際動作不加鎖)
func Withdraw(amount int) bool {
    mu.Lock()
    defer mu.Unlock()
    deposit(-amount)
    if balance < 0 {
        deposit(amount)
        return false // insufficient funds
    }
    return true
}

func Deposit(amount int) {
    mu.Lock()
    defer mu.Unlock()
    deposit(amount)
}

// This function requires that the lock be held.
func deposit(amount int) { balance += amount }

sync.RWMutex讀寫鎖

  • 允許多個只讀操作并行執(zhí)行, 但寫操作完全互斥
  • 或叫“多讀單寫”鎖(multiple readers, single writer lock)
  • 讀鎖RLock, 寫鎖還是Lock
var mu sync.RWMutex
var balance int
func Balance() int {
    mu.RLock() // readers lock
    defer mu.RUnlock()
    return balance
}

內存同步

寫入內存前可能cache, 如不互斥訪問, 并不能保證多個goroutines中語句執(zhí)行順序

sync.Once

初始化延遲執(zhí)行有加速啟動等好處, 但如果只能或只需執(zhí)行一次的init操作延遲到多個goroutines執(zhí)行, 那就需要保證只執(zhí)行一次, 這就是sync.Once的作用

var loadIconsOnce sync.Once
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
    loadIconsOnce.Do(func(){
    // initializing
    })
    return icons[name]
}

Reference

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末击你,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子谎柄,更是在濱河造成了極大的恐慌丁侄,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,378評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件朝巫,死亡現場離奇詭異鸿摇,居然都是意外死亡,警方通過查閱死者的電腦和手機劈猿,發(fā)現死者居然都...
    沈念sama閱讀 88,356評論 2 382
  • 文/潘曉璐 我一進店門拙吉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人揪荣,你說我怎么就攤上這事筷黔。” “怎么了仗颈?”我有些...
    開封第一講書人閱讀 152,702評論 0 342
  • 文/不壞的土叔 我叫張陵佛舱,是天一觀的道長。 經常有香客問我,道長请祖,這世上最難降的妖魔是什么订歪? 我笑而不...
    開封第一講書人閱讀 55,259評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮肆捕,結果婚禮上陌粹,老公的妹妹穿的比我還像新娘。我一直安慰自己福压,他們只是感情好掏秩,可當我...
    茶點故事閱讀 64,263評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著荆姆,像睡著了一般蒙幻。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上胆筒,一...
    開封第一講書人閱讀 49,036評論 1 285
  • 那天邮破,我揣著相機與錄音,去河邊找鬼仆救。 笑死抒和,一個胖子當著我的面吹牛,可吹牛的內容都是我干的彤蔽。 我是一名探鬼主播摧莽,決...
    沈念sama閱讀 38,349評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼顿痪!你這毒婦竟也來了镊辕?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,979評論 0 259
  • 序言:老撾萬榮一對情侶失蹤蚁袭,失蹤者是張志新(化名)和其女友劉穎征懈,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體揩悄,經...
    沈念sama閱讀 43,469評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡卖哎,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,938評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了删性。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片亏娜。...
    茶點故事閱讀 38,059評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖镇匀,靈堂內的尸體忽然破棺而出照藻,到底是詐尸還是另有隱情,我是刑警寧澤汗侵,帶...
    沈念sama閱讀 33,703評論 4 323
  • 正文 年R本政府宣布幸缕,位于F島的核電站群发,受9級特大地震影響,放射性物質發(fā)生泄漏发乔。R本人自食惡果不足惜熟妓,卻給世界環(huán)境...
    茶點故事閱讀 39,257評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望栏尚。 院中可真熱鬧起愈,春花似錦、人聲如沸译仗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,262評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽纵菌。三九已至阐污,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間咱圆,已是汗流浹背笛辟。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留序苏,地道東北人手幢。 一個月前我還...
    沈念sama閱讀 45,501評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像忱详,于是被迫代替她去往敵國和親围来。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,792評論 2 345

推薦閱讀更多精彩內容