介紹
當(dāng)我解決問題時(shí),尤其是新問題磨德,我不會(huì)一上來就想著能不能使用并發(fā)來處理缘回。我會(huì)首先想出一個(gè)順序執(zhí)行的解決方案。然后在可讀性和技術(shù)評(píng)審后典挑,我會(huì)開始考慮并發(fā)處理能不能讓問題得到更快的解決酥宴。有時(shí)很顯然并發(fā)是很合適的,但有時(shí)候并不那么明顯您觉。
第一篇文章拙寡,我講解了系統(tǒng)調(diào)度器的機(jī)制和原理,我相信這對(duì)寫一個(gè)多線程代碼是重要的琳水。第二篇文章我講解了 Go 語言調(diào)度器的機(jī)制肆糕,對(duì)如何 Go 語言寫出并發(fā)代碼是重要的。這篇文章炫刷,我會(huì)開始把操作系統(tǒng)和 Go 調(diào)度器統(tǒng)一起來擎宝,提供深入理解下什么并發(fā),什么不是浑玛。
這篇文章有兩個(gè)目的:
- 提供幾個(gè)在考慮你的服務(wù)是否適合用并發(fā)來解決時(shí)绍申,需要思考的關(guān)鍵點(diǎn)。
- 像你展示不同類型的工作顾彰,需要的工程決策也是不同的极阅。
什么是并發(fā)
并發(fā)意味著不按順序執(zhí)行。給定一組指令涨享,可以按順序執(zhí)行筋搏,也可以找一種方式不按順序執(zhí)行,但仍能得到同樣的結(jié)果厕隧。對(duì)于你眼前的問題奔脐,顯然亂序執(zhí)行能夠增加價(jià)值俄周。我所說的價(jià)值,意思是以復(fù)雜性為成本髓迎,而得到足夠的性能增益峦朗。關(guān)鍵還是要看你的問題,它可能無法或甚至無法進(jìn)行無序執(zhí)行排龄。
理解并發(fā)并不等于并行也是重要的波势。并行意味著 2 個(gè)或者 2 個(gè)以上的指令可以在同一時(shí)間一起執(zhí)行。這和并發(fā)的概念不同橄维。只有你的機(jī)器有至少 2 個(gè) hardware threads 才能使你的程序達(dá)到并行效果尺铣。
上圖中,你看到有兩個(gè)邏輯處理器(P)争舞,每個(gè)都分配了一個(gè)獨(dú)立的系統(tǒng)線程(M)凛忿,線程被關(guān)聯(lián)到了獨(dú)立的 Core 上。你可以看到 2 個(gè) Goroutine 正在并行執(zhí)行兑障,他們同時(shí)在不同的 Core 上執(zhí)行各自的指令侄非。對(duì)于每一個(gè)邏輯處理器蕉汪,3 個(gè) Groutine 正在輪流共享他們的系統(tǒng)線程流译。所有的 Goroutine 正在并發(fā)執(zhí)行,沒有按照特定的順序執(zhí)行他們的指令者疤。
這里有個(gè)難點(diǎn)福澡,就是有時(shí)候在沒有并行能力機(jī)器上使用并發(fā),實(shí)際上會(huì)降低你的性能驹马。更有趣的是革砸,有時(shí)候利用并行來達(dá)并發(fā)效果,并不會(huì)如你期望的那樣得到更好的性能糯累。
工作負(fù)荷
首先算利,你最好是先搞懂你的問題屬于哪種負(fù)荷類型。是 CPU 密集型的還是 IO 密集型的泳姐。之前的文章有描述效拭,這里不再重復(fù)卷仑。
CPU 密集型的工作你需要通過并行來達(dá)到并發(fā)火架。單個(gè) hardware thread 處理多個(gè) Goroutine 并不高效,因?yàn)檫@些 Goroutine 不會(huì)進(jìn)入阻塞狀態(tài)箩张。使用比機(jī)器 hardware thread 數(shù)量更多的 Goroutine 會(huì)減慢工作阎肝,因?yàn)榘?Goroutine 不斷從系統(tǒng)線程上調(diào)來調(diào)去也是有成本挤渔。上下文切換會(huì)觸發(fā) Stop The World(簡稱STW)事件,因?yàn)樵谇袚Q過程中你的工作不會(huì)被執(zhí)行风题。
對(duì)于 IO 密集型的工作判导,你不需要并行來保證并發(fā)嫉父。單個(gè) hardware thread 可以處理高效的處理多個(gè) Goroutine,因?yàn)檫@些 Goroutine 會(huì)自動(dòng)進(jìn)出于阻塞態(tài)眼刃。擁有比 hardware thread 更多的 Goroutine 可以加速工作的完成熔号,因?yàn)樽?Goroutine 從系統(tǒng)線程上調(diào)來調(diào)去不會(huì)觸發(fā) STW 事件。你的工作會(huì)自動(dòng)暫停鸟整,這允許其他不同的 Goroutine 使用同一個(gè) hardware thread 來更高效的完成工作引镊,而不是讓它處于空閑狀態(tài)。
你如何能知道篮条,一個(gè) hardware thread 上跑多少個(gè) Goroutine 才能得到最大程度的吞吐量呢弟头?太少的 Goroutine 導(dǎo)致你有太多的空閑資源,太多的 Goroutine 導(dǎo)致你有太多的延遲時(shí)間涉茧。這是您需要考慮的赴恨,但是超出了這篇文章的范圍。
是時(shí)候該看一些代碼了伴栓,可以鞏固一下你判斷工作適不適合并發(fā)伦连,是否需要并行的能力。
加法
我們不需要復(fù)雜的代碼來搞懂這個(gè)機(jī)制钳垮』蟠荆看下面的 add
函數(shù),功能就是對(duì)一組數(shù)字求和饺窿。
func add(numbers []int) int {
var v int
for _, n := range numbers {
v += n
}
return v
}
問題:這個(gè) add
函數(shù)可不可以亂序執(zhí)行歧焦?答案可以的。一個(gè)數(shù)字集合可以被拆成更小的一些集合肚医,這些小集合是可以并發(fā)處理的绢馍。一旦小集合求和完了,所有的結(jié)果再相加求和肠套,能得到同樣的答案舰涌。
但是,還有另外一個(gè)問題你稚。應(yīng)該把集合拆成多小瓷耙,才能讓速度最快呢?
為了回答這個(gè)問題你就必須知道 add
屬于哪種工作入宦。add
方法屬于 CPU 密集型哺徊,因?yàn)樗惴ň褪遣粩鄨?zhí)行數(shù)學(xué)運(yùn)算,不會(huì)導(dǎo)致 Goroutine 進(jìn)入阻塞態(tài)乾闰。這意味著每一個(gè) hardware thread 跑一個(gè) Goroutine 會(huì)讓速度最快落追。
下面有一個(gè)并發(fā)版本的 add
。
44 func addConcurrent(goroutines int, numbers []int) int {
45 var v int64
46 totalNumbers := len(numbers)
47 lastGoroutine := goroutines - 1
48 stride := totalNumbers / goroutines
49
50 var wg sync.WaitGroup
51 wg.Add(goroutines)
52
53 for g := 0; g < goroutines; g++ {
54 go func(g int) {
55 start := g * stride
56 end := start + stride
57 if g == lastGoroutine {
58 end = totalNumbers
59 }
60
61 var lv int
62 for _, n := range numbers[start:end] {
63 lv += n
64 }
65
66 atomic.AddInt64(&v, int64(lv))
67 wg.Done()
68 }(g)
69 }
70
71 wg.Wait()
72
73 return int(v)
74 }
上面代碼中涯肩,addConcurrent
函數(shù)就是并發(fā)版本的 add
方法轿钠。解釋下這里面比較重要的幾行代碼巢钓。
48行:每個(gè) Goroutine 獲得一個(gè)獨(dú)立的但是更小的數(shù)字集合進(jìn)行相加。每個(gè)集合的大小是總集合的大小除以 Goroutine 的數(shù)量疗垛。
53行: 一堆 Goroutine 開始執(zhí)行加法運(yùn)算症汹。
57-59行:最后一個(gè) Goroutine 要把剩下的所有數(shù)字相加,這有可能使得它的集合要比其他集合大贷腕。
66行: 將小集合的求和結(jié)果背镇,加在一起得到最終求和結(jié)果。
并發(fā)版本的實(shí)現(xiàn)泽裳,要比順序版本的更復(fù)雜瞒斩,但這到底值不值呢?最好的辦法就是做壓測涮总。壓測時(shí)我使用一千萬數(shù)字的集合胸囱,并關(guān)閉掉 GC。
func BenchmarkSequential(b *testing.B) {
for i := 0; i < b.N; i++ {
add(numbers)
}
}
func BenchmarkConcurrent(b *testing.B) {
for i := 0; i < b.N; i++ {
addConcurrent(runtime.NumCPU(), numbers)
}
}
上面代碼就是壓測函數(shù)瀑梗。這里我只分配一個(gè) CPU Core 給程序烹笔。順序版本的使用 1 個(gè) Goroutine,并發(fā)版本的使用 runtie.NumCPU()
個(gè)(也就是 8 個(gè))Goroutine抛丽。這種情況下并發(fā)版本因?yàn)闊o法使用并行來進(jìn)行并發(fā)谤职。
10 Million Numbers using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential 1000 5720764 ns/op : ~10% Faster
BenchmarkConcurrent 1000 6387344 ns/op
BenchmarkSequentialAgain 1000 5614666 ns/op : ~13% Faster
BenchmarkConcurrentAgain 1000 6482612 ns/op
上面的壓測結(jié)果顯示了,在只有一個(gè) CPU Core 的情況下铺纽,順序版本的要比并發(fā)版本的快 10%~13%柬帕。這與我估計(jì)的一樣哟忍,因?yàn)椴l(fā)版本的那幾個(gè) Goroutine 在同一個(gè) CPU Core 上不斷的做切換狡门。
下面我們用多核進(jìn)行在此測試。順序版本還是只使用 1 個(gè)核锅很,而并發(fā)版本使用 8 個(gè)核其馏。這種情況下,并發(fā)版本可以同步并行來并發(fā)爆安。
10 Million Numbers using 8 goroutines with 8 cores
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 8 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/cpu-bound
BenchmarkSequential-8 1000 5910799 ns/op
BenchmarkConcurrent-8 2000 3362643 ns/op : ~43% Faster
BenchmarkSequentialAgain-8 1000 5933444 ns/op
BenchmarkConcurrentAgain-8 2000 3477253 ns/op : ~41% Faster
上面的壓測結(jié)果顯示了叛复,在多核的情況下并發(fā)版本要比順序版本快 41%~43%。這也是符合預(yù)期的扔仓,因?yàn)樗械?Goroutine 都是并行執(zhí)行的褐奥,8 個(gè) Goroutine 是同時(shí)一起工作的。
排序
我們一定要知道翘簇,不是所有的 CPU密集型工作都可以使用并發(fā)完成的撬码。尤其是在把工作拆分再把結(jié)果合并所帶來的開銷很大時(shí)。比如冒泡排序版保∥匦Γ看看下面冒泡排序的實(shí)現(xiàn)夫否。
01 package main
02
03 import "fmt"
04
05 func bubbleSort(numbers []int) {
06 n := len(numbers)
07 for i := 0; i < n; i++ {
08 if !sweep(numbers, i) {
09 return
10 }
11 }
12 }
13
14 func sweep(numbers []int, currentPass int) bool {
15 var idx int
16 idxNext := idx + 1
17 n := len(numbers)
18 var swap bool
19
20 for idxNext < (n - currentPass) {
21 a := numbers[idx]
22 b := numbers[idxNext]
23 if a > b {
24 numbers[idx] = b
25 numbers[idxNext] = a
26 swap = true
27 }
28 idx++
29 idxNext = idx + 1
30 }
31 return swap
32 }
33
34 func main() {
35 org := []int{1, 3, 2, 4, 8, 6, 7, 2, 3, 0}
36 fmt.Println(org)
37
38 bubbleSort(org)
39 fmt.Println(org)
40 }
冒泡排序通過將數(shù)組中數(shù)字交換位置來達(dá)到排序的效果。在某些情況中叫胁,有些數(shù)字可能要交換好幾次凰慈。
問題:bubboSort
函數(shù)是否可以亂序執(zhí)行?答案是不可以驼鹅。這組數(shù)字是可以拆成一堆小數(shù)組并并發(fā)的執(zhí)行排序微谓。但是,當(dāng)所有并發(fā)工作完成后输钩,沒有一個(gè)高效的方法將這些小結(jié)果合并成最終的結(jié)果堰酿。我們來看一下并發(fā)版本的冒泡排序。
01 func bubbleSortConcurrent(goroutines int, numbers []int) {
02 totalNumbers := len(numbers)
03 lastGoroutine := goroutines - 1
04 stride := totalNumbers / goroutines
05
06 var wg sync.WaitGroup
07 wg.Add(goroutines)
08
09 for g := 0; g < goroutines; g++ {
10 go func(g int) {
11 start := g * stride
12 end := start + stride
13 if g == lastGoroutine {
14 end = totalNumbers
15 }
16
17 bubbleSort(numbers[start:end])
18 wg.Done()
19 }(g)
20 }
21
22 wg.Wait()
23
24 // Ugh, we have to sort the entire list again.
25 bubbleSort(numbers)
26 }
bubbleSortConcurrent
函數(shù)是 bubbleSort
的并發(fā)實(shí)現(xiàn)张足。它使用多個(gè) Goroutine 并發(fā)的對(duì)數(shù)組自己進(jìn)行排序触创。但是,你最后在結(jié)果合并時(shí)为牍,相當(dāng)于把整個(gè)數(shù)組又重新排了一遍哼绑。
讀者可以思考下歸并排序適不適合并發(fā),最好親自動(dòng)手測試一下
讀文件
上面舉了 2 個(gè) CPU 密集型例子碉咆,但是 IO 密集型的呢抖韩?在 Goroutine 自然的不斷頻繁的進(jìn)出阻塞態(tài)時(shí),情況有什么不同疫铜?我們看一個(gè) IO 密集的例子茂浮,就是讀取一些文件然后執(zhí)行查找操作。
第一個(gè)版本是順序方式實(shí)現(xiàn)壳咕,函數(shù)名是 find
42 func find(topic string, docs []string) int {
43 var found int
44 for _, doc := range docs {
45 items, err := read(doc)
46 if err != nil {
47 continue
48 }
49 for _, item := range items {
50 if strings.Contains(item.Description, topic) {
51 found++
52 }
53 }
54 }
55 return found
56 }
上面代碼就是 find
方法的實(shí)現(xiàn)席揽。功能就是從一組字符串中找到
下面是 read
函數(shù)的實(shí)現(xiàn)
33 func read(doc string) ([]item, error) {
34 time.Sleep(time.Millisecond) // Simulate blocking disk read.
35 var d document
36 if err := xml.Unmarshal([]byte(file), &d); err != nil {
37 return nil, err
38 }
39 return d.Channel.Items, nil
40 }
read
函數(shù)在使用了 time.Sleep
把自己掛起 1 毫秒。這個(gè)主要是為了模擬 IO 操作延遲谓厘。因?yàn)槟銓?shí)際去讀文件幌羞,Goroutine 也是一樣會(huì)被掛起一段時(shí)間。這 1 毫秒的延遲對(duì)于后面的壓測結(jié)果至關(guān)重要竟稳。
下面是并發(fā)版本的實(shí)現(xiàn)属桦。
58 func findConcurrent(goroutines int, topic string, docs []string) int {
59 var found int64
60
61 ch := make(chan string, len(docs))
62 for _, doc := range docs {
63 ch <- doc
64 }
65 close(ch)
66
67 var wg sync.WaitGroup
68 wg.Add(goroutines)
69
70 for g := 0; g < goroutines; g++ {
71 go func() {
72 var lFound int64
73 for doc := range ch {
74 items, err := read(doc)
75 if err != nil {
76 continue
77 }
78 for _, item := range items {
79 if strings.Contains(item.Description, topic) {
80 lFound++
81 }
82 }
83 }
84 atomic.AddInt64(&found, lFound)
85 wg.Done()
86 }()
87 }
88
89 wg.Wait()
90
91 return int(found)
92 }
上面代碼中 findConcurrent
函數(shù)就是 find
函數(shù)的并發(fā)實(shí)現(xiàn)版本。并發(fā)版本中使用適量的 Goroutine 完成不定數(shù)量的文檔查詢他爸。代碼太多聂宾,這里只解釋幾個(gè)重要的地方。
61-64行:創(chuàng)建了一個(gè) Channel诊笤,用來發(fā)送文檔系谐。
65行: 關(guān)閉了 channel,這樣當(dāng)所有文檔都處理完了后盏混,所有的 Goroutine 就都自動(dòng)退出了蔚鸥。
70行: Goroutine 被創(chuàng)建
73-83行:每個(gè) Goroutine 從 Channel 中獲取文檔惜论,把文檔讀到內(nèi)存并查找 topic。當(dāng)找到了止喷,將本地變量 lFound
加一馆类。
84行:每個(gè) Goroutine 都將自己查找到的文檔個(gè)數(shù),加到全局變量found
上弹谁。
下面我使用一千個(gè)文檔進(jìn)行壓測乾巧,并關(guān)閉 GC。
func BenchmarkSequential(b *testing.B) {
for i := 0; i < b.N; i++ {
find("test", docs)
}
}
func BenchmarkConcurrent(b *testing.B) {
for i := 0; i < b.N; i++ {
findConcurrent(runtime.NumCPU(), "test", docs)
}
}
上面是壓測代碼预愤。下面是僅使用 1 個(gè) CPU Core 的壓測結(jié)果沟于。此時(shí)并發(fā)版本也只能使用一個(gè) CPU Core 來執(zhí)行 8 個(gè) Goroutine。
10 Thousand Documents using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITHOUT Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -cpu 1 -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential 3 1483458120 ns/op
BenchmarkConcurrent 20 188941855 ns/op : ~87% Faster
BenchmarkSequentialAgain 2 1502682536 ns/op
BenchmarkConcurrentAgain 20 184037843 ns/op : ~88% Faster
發(fā)現(xiàn)在單核的情況下植康。并發(fā)版本要比順序版本快 87%~88%旷太。這和我預(yù)期一致。因?yàn)樗械?Goroutine 共用一個(gè) CPU Core销睁,而 Goroutine 在調(diào)用 read
時(shí)候會(huì)自動(dòng)進(jìn)入阻塞態(tài)供璧,這時(shí)會(huì)將 CPU Core 出讓給其他 Goroutine 使用,這使得這個(gè) CPU Core 更加繁忙冻记。
下面看下使用多核進(jìn)行壓測睡毒。
10 Thousand Documents using 8 goroutines with 1 core
2.9 GHz Intel 4 Core i7
Concurrency WITH Parallelism
-----------------------------------------------------------------------------
$ GOGC=off go test -run none -bench . -benchtime 3s
goos: darwin
goarch: amd64
pkg: github.com/ardanlabs/gotraining/topics/go/testing/benchmarks/io-bound
BenchmarkSequential-8 3 1490947198 ns/op
BenchmarkConcurrent-8 20 187382200 ns/op : ~88% Faster
BenchmarkSequentialAgain-8 3 1416126029 ns/op
BenchmarkConcurrentAgain-8 20 185965460 ns/op : ~87% Faster
這個(gè)壓測結(jié)果說明,更多的 CPU Core 并不會(huì)對(duì)性能有多大的提升冗栗。
結(jié)論
這篇文章的目標(biāo)就是用具體的例子像你說明演顾,你一定要考慮你的場景到底適不適合并發(fā)。這樣才能做出更好的工程決策隅居。
你現(xiàn)在清晰的看到了钠至,對(duì)于 IO 密集型的工作,并行對(duì)提升性能沒有多大的幫助军浆,這與 CPU 密集型的正好相反棕洋。但是并不是所有 CPU 密集型的工作都適合并發(fā)的,比如冒泡排序乒融。搞明白你所應(yīng)對(duì)的場景的特點(diǎn)非常重要。