1. 訪問(wèn)范圍約束
通過(guò)限制訪問(wèn)約束,減少不必要的同步帶來(lái)的性能損耗颖杏。例如纯陨,集中控制channel的寫入,對(duì)外提供channel的讀取留储,這樣本身便提供了對(duì)并發(fā)安全的支持翼抠。
// channel擁有者具有寫入權(quán)限
chanOwner := func() <-chan int {
results := make(chan int, 5) //1
go func() {
defer close(results)
for i := 0; i <= 5; i++ {
results <- i
}
}()
return results
}
// 消費(fèi)者只具備讀取權(quán)限
consumer := func(results <-chan int) { //3
for result := range results {
fmt.Printf("Received: %d\n", result)
}
fmt.Println("Done receiving!")
}
results := chanOwner() //2
consumer(results)
// 對(duì)于共享數(shù)據(jù)的不同數(shù)據(jù)段的并發(fā)訪問(wèn)同樣是安全的
printData := func(wg *sync.WaitGroup, data []byte) {
defer wg.Done()
var buff bytes.Buffer
for _, b := range data {
fmt.Fprintf(&buff, "%c", b)
}
fmt.Println(buff.String())
}
var wg sync.WaitGroup
wg.Add(2)
data := []byte("golang")
go printData(&wg, data[:3]) // 1
go printData(&wg, data[3:]) // 2
wg.Wait()
2. for-select
for-select循環(huán)模式如下所示:
for { // 無(wú)限循環(huán)或遍歷
select {
// 對(duì)通道進(jìn)行操作
}
}
常見(jiàn)的幾種for-select循環(huán)的用法
a. 在通道上發(fā)送迭代變量
for _, s := range []string{"a", "b", "c"} {
select {
case <-done:
return
case stringStream <- s: // slice數(shù)據(jù)循環(huán)迭代寫入channel
}
}
b. 無(wú)限循環(huán)等待停止
// 第一種方式
for {
select {
case <-done:
return // 停止返回
default:
}
// 執(zhí)行非搶占任務(wù)
}
//
for {
select {
case <-done:
return
default: // 將要執(zhí)行的任務(wù)放入default分支中
// 執(zhí)行非搶占任務(wù)
}
}
3. goroutine泄露
goroutine幾種終止方式:
- 完成任務(wù),終止
- 遇到不可恢復(fù)的錯(cuò)誤無(wú)法繼續(xù)它的任務(wù)欲鹏,終止映挂。
- 被告知停止當(dāng)前的任務(wù)。
一個(gè)常見(jiàn)的goroutine泄露的例子:
doWork := func(strings <-chan string) <-chan interface{} {
completed := make(chan interface{})
go func() {
defer fmt.Println("doWork exited.")
defer close(completed)
for s := range strings { // 對(duì)于channel的訪問(wèn)轨帜,將一直被阻塞
fmt.Println(s)
}
}()
return completed
}
doWork(nil)
// 這里還有其他任務(wù)執(zhí)行
fmt.Println("Done.")
解決goroutine泄露的一種方法转锈,即向子goroutine發(fā)送結(jié)束信號(hào),通知其退出尤误。
doWork := func(done <-chan interface{}, strings <-chan string) <-chan interface{} { //1
terminated := make(chan interface{})
go func() {
defer fmt.Println("doWork exited.")
defer close(terminated)
for { // for-select 處理一手終止信號(hào)
select {
case s := <-strings: // 該case分支將一直被阻塞
// Do something interesting
fmt.Println(s)
case <-done: //2 :接收到結(jié)束信號(hào)侠畔,退出當(dāng)前goroutine
return
}
}
}()
return terminated
}
done := make(chan interface{})
terminated := doWork(done, nil)
go func() { //3
// Cancel the operation after 1 second.
// 1s后close channel,向子goroutine廣播結(jié)束信號(hào)
time.Sleep(1 * time.Second)
fmt.Println("Canceling doWork goroutine...")
close(done)
}()
// 一直阻塞损晤,直到子goroutine結(jié)束
<-terminated //4
fmt.Println("Done.")
另外一個(gè)goroutine泄露的示例:
newRandStream := func() <-chan int {
randStream := make(chan int)
go func() {
defer fmt.Println("newRandStream closure exited.") // 1
defer close(randStream)
for {
randStream <- rand.Int() // 此處在讀取完第三個(gè)元素后软棺,將會(huì)永久阻塞,導(dǎo)致goroutine泄露
}
}()
return randStream
}
randStream := newRandStream()
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
fmt.Printf("%d: %d\n", i, <-randStream)
}
針對(duì)該goroutine泄露的解決方案:
newRandStream := func(done <-chan interface{}) <-chan int {
randStream := make(chan int)
go func() {
defer fmt.Println("newRandStream closure exited.")
defer close(randStream)
for {
select {
case randStream <- rand.Int():
case <-done: // 結(jié)束信號(hào)到達(dá)尤勋,立即結(jié)束
return
}
}
}()
return randStream
}
done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
fmt.Printf("%d: %d\n", i, <-randStream)
}
close(done) // close channel喘落,發(fā)出通知信號(hào)
//模擬正在進(jìn)行的工作
time.Sleep(1 * time.Second)
防止goroutine泄露遵循的一個(gè)原則:如果goroutine負(fù)責(zé)創(chuàng)建子goroutine,它也必須負(fù)責(zé)確保它可以停止子goroutine
4. or-channel
or-done-channel:將任意個(gè)數(shù)的done channel組合成為一個(gè)done channel最冰,即N個(gè)done channel中任意一個(gè)done了瘦棋,整個(gè)組合的done channnel就done了。
// or-channel的一種遞歸實(shí)現(xiàn)
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
var or func(chs ...<-chan interface{}) <-chan interface{}
or = func(chs ...<-chan interface{}) <-chan interface{} {
if len(chs) == 0 {
return nil
}
if len(chs) == 1 {
return chs[0]
}
chsLen := len(chs)
orDone := make(chan interface{}) // done channel
go func() {
defer close(orDone)
select {
case <-or(chs[:chsLen/2]...): // 0...chsLen/2-1 channel監(jiān)聽
case <-or(chs[chsLen/2:]...): // chsLen/2...chsLen-1 channel監(jiān)聽
}
}()
return orDone
}
var chs []chan interface{}
for i := 0; i < 5; i++ {
chs = append(chs, make(chan interface{}))
}
go func(chs ...chan interface{}) {
time.Sleep(1 * time.Second)
idx := rand.Intn(5)
fmt.Printf("close channel %d\n", idx)
close(chs[idx])
}(chs...)
//<-or(chs...)
<-or(chs[0], chs[1], chs[2], chs[3], chs[4])
fmt.Println("end test")
}
Tips:
Go的一個(gè)優(yōu)點(diǎn)是能夠快速創(chuàng)建暖哨,調(diào)度和運(yùn)行g(shù)oroutine赌朋,并且在Go中積極鼓勵(lì)使用goroutines來(lái)正確建模問(wèn)題。
5. 錯(cuò)誤處理
Go避開了流行的錯(cuò)誤異常模型,Go認(rèn)為錯(cuò)誤處理非常重要沛慢,并且在開發(fā)程序時(shí)赡若,我們應(yīng)該像關(guān)注算法一樣關(guān)注錯(cuò)誤處理。
// 并發(fā)情況下的錯(cuò)誤處理团甲,潛在的結(jié)果和潛在的錯(cuò)誤同時(shí)返回
type Result struct { //1
Error error
Response *http.Response
}
checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result { //2
results := make(chan Result)
go func() {
defer close(results)
for _, url := range urls {
var result Result
resp, err := http.Get(url)
result = Result{Error: err, Response: resp} //3:錯(cuò)誤和結(jié)果包裝在一起
select {
case <-done:
return
case results <- result: //4
}
}
}()
return results
}
done := make(chan interface{})
defer close(done)
urls := []string{"https://www.baidu.com", "https://badhost"}
for result := range checkStatus(done, urls...) { // 檢查錯(cuò)誤和結(jié)果
if result.Error != nil { //5
fmt.Printf("error: %v", result.Error)
continue
}
fmt.Printf("Response: %v\n", result.Response.Status)
}
6. 管道
利用channel來(lái)實(shí)現(xiàn)管道的功能:
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
}()
return intStream
}
multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
multipliedStream := make(chan int)
go func() {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i * multiplier: // 乘法結(jié)果塞入管道中
}
}
}()
return multipliedStream
}
add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i + additive: // 加法結(jié)果塞入管道中
}
}
}()
return addedStream
}
done := make(chan interface{})
defer close(done)
intStream := generator(done, 1, 2, 3, 4) // 生產(chǎn)數(shù)據(jù)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2) // 管道傳輸數(shù)據(jù)
// 通過(guò)關(guān)閉done channel逾冬,隨時(shí)終止管道數(shù)據(jù)的傳輸
for v := range pipeline {
fmt.Println(v)
}
利用channel實(shí)現(xiàn)的一些generator:
// repeat 會(huì)重復(fù)傳輸你給它的值,知道關(guān)閉done channel
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
// take: 從channel中拿取一定數(shù)量的數(shù)據(jù)伐庭,然后返回
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream: // 從傳入的管道中接收數(shù)據(jù)
}
}
}()
return takeStream
}
// 模擬管道兩端的生產(chǎn)方和消費(fèi)方
done := make(chan interface{})
defer close(done)
for num := range take(done, repeat(done, 1), 10) {
fmt.Printf("%v ", num)
}
一個(gè)指定函數(shù)規(guī)則的generator:
// fn即為函數(shù)產(chǎn)生器
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
done := make(chan interface{})
defer close(done)
// 隨機(jī)數(shù)生成器粉渠,用于產(chǎn)生隨機(jī)數(shù)
rand := func() interface{} {
return rand.Int()
}
for num := range take(done, repeatFn(done, rand), 10) {
fmt.Println(num)
}
7. 扇入/扇出
單個(gè)goroutine處理管道的輸入/輸出變成多個(gè)goroutine處理管道的輸入/輸出,稱之為扇入/扇出圾另。
扇出:描述啟動(dòng)多個(gè)goroutine以處理來(lái)自管道的輸入過(guò)程霸株。
扇入:描述將多個(gè)goroutine的處理結(jié)果組合到一個(gè)通道中。
啟用扇入/扇出操作的時(shí)機(jī):
- 模塊當(dāng)前計(jì)算結(jié)果不依賴于前面的計(jì)算結(jié)果(可并行)集乔。
- 整個(gè)管道流程的處理時(shí)間較長(zhǎng)去件。
相關(guān)扇入/扇出的示例可參考:https://www.kancloud.cn/mutouzhang/go/596844
可以創(chuàng)建一個(gè)function來(lái)同時(shí)處理close done channel和數(shù)據(jù)流接收操作
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done: // 處理done channel
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v: // 數(shù)據(jù)接收處理
case <-done:
// return
}
}
}
}()
return valStream
}
// 從返回的channel中讀取數(shù)據(jù),可隨時(shí)中斷數(shù)據(jù)的讀取扰路,close done channel
for val := range orDone(done, myChan) {
// Do something with val
}
8. tee-channel
// or-done channel + bridge-channel
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done: // 處理done channel
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <- v: // 數(shù)據(jù)接收處理
case <-done:
// return
}
}
}
}()
return valStream
}
// 從返回的channel中讀取數(shù)據(jù)尤溜,可隨時(shí)中斷數(shù)據(jù)的讀取,close done channel
for val := range orDone(done, myChan) {
// Do something with val
}
// bridge-channel : channel套channel
bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{}) // 1
go func() {
defer close(valStream)
for { // 2
var stream <-chan interface{}
select {
case maybeStream, ok := <-chanStream:
if ok == false {
return
}
stream = maybeStream
case <-done:
return
}
for val := range orDone(done, stream) { // 3
select {
case valStream <- val:
case <-done:
}
}
}
}()
return valStream
}
9. context package
context package提供的一些操作方法如下所示:
var Canceled = errors.New("context canceled")
var Canceled = errors.New("context canceled")
type CancelFunc // 取消函數(shù)
type Context
func Background() Context
func TODO() Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
Context interface定義:
type Context interface {
// Deadline 返回任務(wù)完成時(shí)(該 context 被取消)的時(shí)間汗唱。
// 如果deadline 未設(shè)置宫莱,則返回的ok值為false。
// 連續(xù)調(diào)用該函數(shù)將返回相同的結(jié)果哩罪。
Deadline() (deadline time.Time, ok bool)
// Done 返回任務(wù)完成時(shí)(該 context 被取消)一個(gè)已關(guān)閉的通道授霸。
// 如果該context無(wú)法被取消,Done 將返回nil际插。
// 連續(xù)調(diào)用該函數(shù)將返回相同的結(jié)果碘耳。
//
// 當(dāng)cancel被調(diào)用時(shí),WithCancel 遍歷 Done以執(zhí)行關(guān)閉框弛;
// 當(dāng)deadline即將到期時(shí)辛辨,WithDeadline 遍歷 Done以執(zhí)行關(guān)閉;
// 當(dāng)timeout時(shí)瑟枫,WithTimeout 遍歷 Done以執(zhí)行關(guān)閉斗搞。
//
// Done 主要被用于 select 語(yǔ)句:
//
// // Stream 使用DoSomething生成值,并將值發(fā)送出去
// // 直到 DoSomething 返回錯(cuò)誤或 ctx.Done 被關(guān)閉
// func Stream(ctx context.Context, out chan<- Value) error {
// for {
// v, err := DoSomething(ctx)
// if err != nil {
// return err
// }
// select {
// case <-ctx.Done():
// return ctx.Err()
// case out <- v:
// }
// }
// }
//
// 查看 https://blog.golang.org/pipelines更多示例以了解如何使用
// Done通道執(zhí)行取消操作慷妙。
Done() <-chan struct{}
// 如果 Done 尚未關(guān)閉, Err 返回 nil.
// 如果 Done 已關(guān)閉, Err 返回值不為nil的error以解釋為何關(guān)閉:
// 因 context 的關(guān)閉導(dǎo)致
// 或 context 的 deadline 執(zhí)行導(dǎo)致榜旦。
// 在 Err 返回值不為nil的error之后, 連續(xù)調(diào)用該函數(shù)將返回相同的結(jié)果。
Err() error
// Value 根據(jù) key 返回與 context 相關(guān)的結(jié)果景殷,
// 如果沒(méi)有與key對(duì)應(yīng)的結(jié)果,則返回nil。
// 連續(xù)調(diào)用該函數(shù)將返回相同的結(jié)果猿挚。
//
// 該方法僅用于傳輸進(jìn)程和API邊界的請(qǐng)求數(shù)據(jù)咐旧,
// 不可用于將可選參數(shù)傳遞給函數(shù)。
//
// 鍵標(biāo)識(shí)著上Context中的特定值绩蜻。
// 在Context中存儲(chǔ)值的函數(shù)通常在全局變量中分配一個(gè)鍵铣墨,
// 然后使用該鍵作為context.WithValue和Context.Value的參數(shù)。
// 鍵可以是系統(tǒng)支持的任何類型办绝;
// 程序中各包應(yīng)將鍵定義為未導(dǎo)出類型以避免沖突伊约。
//
// 定義Context鍵的程序包應(yīng)該為使用該鍵存儲(chǔ)的值提供類型安全的訪問(wèn)器:
//
// // user包 定義了一個(gè)User類型,該類型存儲(chǔ)在Context中孕蝉。
// package user
//
// import "context"
//
// // User 類型的值會(huì)存儲(chǔ)在 Context中屡律。
// type User struct {...}
//
// // key是位于包內(nèi)的非導(dǎo)出類型。
// // 這可以防止與其他包中定義的鍵的沖突降淮。
// type key int
//
// // userKey 是user.User類型的值存儲(chǔ)在Contexts中的鍵超埋。
// // 它是非導(dǎo)出的; clients use user.NewContext and user.FromContext
// // 使用 user.NewContext 和 user.FromContext來(lái)替代直接使用鍵佳鳖。
// var userKey key
//
// // NewContext 返回一個(gè)新的含有值 u 的 Context霍殴。
// func NewContext(ctx context.Context, u *User) context.Context {
// return context.WithValue(ctx, userKey, u)
// }
//
// // FromContext 返回存儲(chǔ)在 ctx中的 User類型的值(如果存在的話)。
// func FromContext(ctx context.Context) (*User, bool) {
// u, ok := ctx.Value(userKey).(*User)
// return u, ok
// }
Value(key interface{}) interface{}
一些終止信號(hào)的函數(shù):
// 取消
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// deadline
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
// 超時(shí)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
其中系吩,
WithCancel返回一個(gè)新的Context来庭,它在調(diào)用返回的cancel函數(shù)時(shí)關(guān)閉done通道。
WithDeadline返回一個(gè)新的Context穿挨,當(dāng)機(jī)器的時(shí)鐘超過(guò)給定的最后期限時(shí)月弛,它關(guān)閉done通道。
WithTimeout返回一個(gè)新的Context絮蒿,它在給定的超時(shí)時(shí)間后關(guān)閉done通道尊搬。
具體一些context包的使用示例可參考:https://www.kancloud.cn/mutouzhang/go/596849