并發(fā)模式
[TOC]
并發(fā)程序指同時進行多個任務的程序, Go程序一種支持并發(fā)的方式是通過goroutine和channel, 支持“順序通信進程”(communicating sequential processes)或被簡稱為CSP.
CSP是一種現代的并發(fā)編程模型,在這種編程模型中值會在不同的運行實例(goroutine)中傳遞,盡管大多數情況下仍然是被限制在單一實例中.
goroutines
- Golang中并發(fā)的執(zhí)行單元叫
goroutine
, 可類比為線程, 或協(xié)程. - 程序啟動時main函數在
main goroutine
中運行 -
go
語句創(chuàng)建新的goroutine, 多個goroutine并發(fā)執(zhí)行:
go f()
go func() {
}()
- 一個簡單的并發(fā)網絡模型:
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err) // e.g., connection aborted
continue
}
go handleConn(conn) // handle connections concurrently
}
func handleConn(c net.Conn) {
defer c.Close()
for {
_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
if err != nil {
return // e.g., client disconnected
}
time.Sleep(1 * time.Second)
}
}
- Echo 服務器
func echo(c net.Conn, shout string, delay time.Duration) {
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
echo(c, input.Text(), 1*time.Second)
}
// NOTE: ignoring potential errors from input.Err()
c.Close()
}
// client
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
go mustCopy(os.Stdout, conn)
mustCopy(conn, os.Stdin)
}
Goroutines和線程
- 量的區(qū)別, 動態(tài)棧: os線程固定大小(一般2M), goroutines小(一般2k)且不固定, 動態(tài)伸縮,最大1G
- 調度, os沒幾毫秒由scheduler內核函數調度, goroutines由go自己大調度器調度,如m:n調度方法在n個線程調度m個goroutines, 且不需要進入內核
- GOMAXPROCS大變量決定調度到多少個os線程
channels
- channels是goroutines之間通信機制, 通過channel從一個goroutine向另一個發(fā)送消息
- chan定義某種類型的channel, 類比C/C++的一個支持多線程的泛型queue
- 和map類似,channel也對應一個make創(chuàng)建的底層數據結構的引用:
make(chan int)
- 和make,slice等引用類型一樣, channel的零值也是
nil
- 發(fā)送和接收兩個操作都使用<-運算符
- 支持close操作, 向close后的channel發(fā)送會panic, 但仍然可接收chan里的數據, 如沒有數據將產生一個零值
// int型chan, 無緩沖阻塞型, 會阻塞發(fā)送者, 直到另一個goroutine接收數據
ch:=make(chan int)
ch<-1 //發(fā)送
go func() {
fmt.Println(<-ch) //接收
}
無緩沖channel
- 基于無緩存Channels將導致兩個goroutine做同步操作, 因此也叫同步channel
- 當通過一個無緩存Channels發(fā)送數據時,接收者收到數據發(fā)生在喚醒發(fā)送者goroutine之前
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // NOTE: ignoring errors
log.Println("done")
done <- struct{}{} // signal the main goroutine
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // wait for background goroutine to finish
}
串聯(lián)的Channels(Pipeline)
Channels也可以用于將多個goroutine連接在一起昧互,一個Channel的輸出作為下一個Channel的輸入。這種串聯(lián)的Channels就是所謂的管道(pipeline)
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
// Squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()
// Printer (in main goroutine)
for x := range squares {
fmt.Println(x)
}
}
單向channels
<-
在chan
左邊是一個只讀channel, 在右邊則只寫,形參傳參時可隱式轉換:
//改進版本pipeline示例
func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
帶緩沖channels
- 像一個可以容納指定個數元素的隊列, 在達到容量前寫入不會阻塞, 寫入讀取操作解耦
ch = make(chan string, 3)
Multiplexing with select
select多路復用,可以監(jiān)聽多個chan
- 當一個或多個case滿足條件(如chan可發(fā)送或接收消息, 類似于讀寫事件發(fā)生), 會隨機選擇其中一個分支執(zhí)行
- default是所有case都阻塞時執(zhí)行
- 每一個case代表一個通信操作(在channel上發(fā)送或接收)
- 無分支的
select{}
會永遠等待 - 因為對一個nil的channel發(fā)送和接收操作會永遠阻塞,select不會選擇到, 可以用nil來激活或者禁用case
select {
case <-ch1:
// ...
case x := <-ch2:
// ...use x...
case ch3 <- y:
// ...
default:
// ...
}
示例: 并發(fā)遍歷目錄
原始版本
// walkDir recursively walks the file tree rooted at dir
// and sends the size of each found file on fileSizes.
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
-
ioutil.ReadDir()
返回os.FileInfo
(os.Stat()
也返回)類型的slice -
walkDir()
遞歸調調用 -
walkDir()
會向fileSizes這個channel發(fā)送一條消息告知文件大小
main包不限制并發(fā)的版本(可能產生大量goroutines)
-
注意
for ... select...
慣用方式,未使用range循環(huán),用channel接收的二值形式顯式判斷channel是否close - 不用標簽break(
break loop
)的話只會退出內層select
package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
)
func main() {
// ...determine roots...
// Traverse each root of the file tree in parallel.
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes)
}
go func() {
n.Wait()
close(fileSizes)
}()
// ...select loop...
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes was closed
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
}
// walkDir修改為并發(fā)
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
for _, entry := range dirents(dir) {
if entry.IsDir() {
n.Add(1)
subdir := filepath.Join(dir, entry.Name())
go walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
通過token方式限制并發(fā)
// sema is a counting semaphore for limiting concurrency in dirents.
var sema = make(chan struct{}, 20)
// dirents returns the entries of directory dir.
func dirents(dir string) []os.FileInfo {
sema <- struct{}{} // acquire token
defer func() { <-sema }() // release token
// ...
并發(fā)的退出
Golang未提供一個goroutine終止另一個goroutine的方法, 一般可以通過close done channel的廣播方式退出goroutines
var done = make(chan struct{})
for {
select {
case <-done:
return
case v, ok:=<-workChan:
if !ok {
return
}
}
}
// Cancel traversal when input is detected.
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
close(done) // done
}()
基于共享變量的并發(fā)
goroutine與channel方式的并發(fā)免于處理許多細節(jié), 而基于共享數據的并發(fā)就不可避免地需要處理它們, 否則一些競爭條件導致程序產生非預期結果,如錯誤結果腕让、 死鎖(deadlock)裹纳、活鎖(livelock)和餓死(resource starvation),這就不是并發(fā)安全的程序了.
競爭條件
競爭條件指程序在多個goroutine交叉執(zhí)行時沒法給出確定預期的結果.數據競爭是一種特殊競爭條件, 產生于多個goroutines并發(fā)訪問共享數據,且至少有一個寫操作, 避免辦法:
- 不寫
- 避免多個goroutines訪問, 綁定到某個goroutine, 其他的都用channel, pipeline channel傳遞到下一個階段線性訪問這種叫串行綁定
- 互斥訪問, 做并發(fā)控制(加鎖等)
競爭條件檢測: 在go build,go run或者go test命令后加-race
sync.Mutex互斥鎖
類似于二元信號量:
var (
sema = make(chan struct{}, 1) // a binary semaphore guarding balance
balance int
)
func Deposit(amount int) {
sema <- struct{}{} // acquire token
balance = balance + amount
<-sema // release token
}
func Balance() int {
sema <- struct{}{} // acquire token
b := balance
<-sema // release token
return b
}
在Mutex中對應Lock,Unlock:
import "sync"
var (
mu sync.Mutex // guards balance
balance int
)
func Deposit(amount int) {
mu.Lock()
balance = balance + amount
mu.Unlock()
}
func Balance() int {
mu.Lock()
// defer mu.Unlock()
b := balance
mu.Unlock()
return b
}
- 調用
Lock()
方法來獲取一個互斥鎖 - 如果其它goroutine已經獲得了該鎖,Lock被阻塞直到其它goroutine調用了Unlock
- mutex會保護共享變量, 慣例來說挨务,被mutex所保護的變量是在mutex變量聲明之后立刻聲明
- Lock和Unlock之間的代碼段叫臨界區(qū)
- 用
defer
來調用Unlock
, panic也依然會執(zhí)行 - 不是可重入鎖(遞歸鎖), 無法對已鎖mutex再次上鎖, 一個通用的解決方案是將一個函數分離為多個函數(導出的加鎖調內部的, 內部的執(zhí)行實際動作不加鎖)
func Withdraw(amount int) bool {
mu.Lock()
defer mu.Unlock()
deposit(-amount)
if balance < 0 {
deposit(amount)
return false // insufficient funds
}
return true
}
func Deposit(amount int) {
mu.Lock()
defer mu.Unlock()
deposit(amount)
}
// This function requires that the lock be held.
func deposit(amount int) { balance += amount }
sync.RWMutex讀寫鎖
- 允許多個只讀操作并行執(zhí)行, 但寫操作完全互斥
- 或叫“多讀單寫”鎖(multiple readers, single writer lock)
- 讀鎖
RLock
, 寫鎖還是Lock
var mu sync.RWMutex
var balance int
func Balance() int {
mu.RLock() // readers lock
defer mu.RUnlock()
return balance
}
內存同步
寫入內存前可能cache, 如不互斥訪問, 并不能保證多個goroutines中語句執(zhí)行順序
sync.Once
初始化延遲執(zhí)行有加速啟動等好處, 但如果只能或只需執(zhí)行一次的init操作延遲到多個goroutines執(zhí)行, 那就需要保證只執(zhí)行一次, 這就是sync.Once的作用
var loadIconsOnce sync.Once
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
loadIconsOnce.Do(func(){
// initializing
})
return icons[name]
}