golang的io包中,稍微有點兒晦澀的就是Pipe方法,今天我們就一起來看一看這個Pipe材泄。
函數定義如下:
func Pipe() (*PipeReader, *PipeWriter)
它返回了一個Reader和一個Writer
起初一看是有點兒奇怪的,很少有這么用的哦版姑,它到底能干嘛呢?
其實它返回的不僅僅是簡單的一個Writer一個Reader,它返回的是息息相關的一個Writer和一個Reader。
下面我先用比較口語化的方式來講一下它們是如何工作的芬膝。
假設
先假設我們在工地上,有兩個工人形娇,一個叫w蔗候,一個叫r,w負責搬磚埂软,而r負責砌墻。
初始
w和r一起配合工作纫事,一開始啥都沒有勘畔,負責砌墻的r就沒法工作,于是它開始睡覺(Wait
)丽惶。而w只能去搬磚了炫七。
磚來了
w深知r懶惰的習性,當它把磚搬過來后钾唬,就把r叫醒(Signal
)万哪。然后w心想,反正你砌墻也要一會兒抡秆,那我也睡會兒奕巍。于是w叫醒r后它也開始睡覺(Wait
)。
砌墻
r被叫醒之后儒士,心想著睡了這么久害怕被包工頭責罵的止,自然就開始辛勤地砌墻了,很快就把w搬過來的磚用完了着撩。r心想诅福,這墻砌不完可怪不到我頭上匾委,因為沒磚了,于是r叫醒了w氓润,然后自己又去睡覺了赂乐。
繼續(xù)搬磚
w被叫醒后一看,哎喲我去咖气,這么快就沒磚了挨措?然后他又跑去搬了些轉過來,然后叫醒睡得跟死豬一樣的r起來砌墻采章,自己又開始睡覺……
周而復始运嗜,直到……
w和r兩人就這么周而復始地配合,直到r發(fā)現(xiàn)墻砌好了悯舟,或者w發(fā)現(xiàn)工地上已經沒有磚了担租。
以上大概就是Pipe的通俗的解釋。不過問題也來了抵怎,這倆人瞌睡怎么這么多呢奋救?w干活r就歇著,不能同時干嗎反惕?答案是——不能
為什么尝艘?因為Pipe就是為了某些特定場景而提出的∽巳荆看看官方文檔的說明:
Reads and Writes on the pipe are matched one to one except when multiple Reads are needed to consume a single Write
也就是說背亥,Pipe適用于,產生了一條數據悬赏,緊接著就要處理掉這條數據的場景狡汉。而且因為其內部是一把大鎖,因此是線程安全的闽颇。
內部實現(xiàn)
來看看內部實現(xiàn)盾戴,先看看read
func (p *pipe) read(b []byte) (n int, err error) {
// One reader at a time.
p.rl.Lock()
defer p.rl.Unlock()
p.l.Lock()
defer p.l.Unlock()
for {
if p.rerr != nil {
return 0, ErrClosedPipe
}
if p.data != nil {
break
}
if p.werr != nil {
return 0, p.werr
}
p.rwait.Wait()
}
n = copy(b, p.data)
p.data = p.data[n:]
if len(p.data) == 0 {
p.data = nil
p.wwait.Signal()
}
return
}
這段代碼,我用偽代碼簡化一下:
func (p *pipe) read(b []byte) (n int, err error) {
各種加鎖()
for {
if 有數據可以讀或者哪里有錯 {
break
}
讓出時間片等待被喚醒兵多,如果是被正常調度回來的依然不醒尖啡,必須是被指名點姓叫醒才醒()
}
copy(b, p.data)
通知writer可以繼續(xù)寫數據進來了()
}
write其實也是大同小異:
func (p *pipe) write(b []byte) (n int, err error) {
各種加鎖()
p.data = b
通知reader有數據了()
for {
if 數據被讀完了或者哪里有錯 {
break
}
讓出時間片等待被喚醒,如果是被正常調度回來的依然不醒剩膘,必須是被指名點姓叫醒才醒()
}
p.data = nil
}
看了偽代碼衅斩,再看看實際代碼,應該就很容易了怠褐。但是還有幾個地方需要細說矛渴,第一個就是鎖的問題。
在read中:
func (p *pipe) read(b []byte) (n int, err error) {
// One reader at a time.
p.rl.Lock()
defer p.rl.Unlock()
p.l.Lock()
defer p.l.Unlock()
// ...
而在write中:
func (p *pipe) write(b []byte) (n int, err error) {
// pipe uses nil to mean not available
if b == nil {
b = zero[:]
}
// One writer at a time.
p.wl.Lock()
defer p.wl.Unlock()
p.l.Lock()
defer p.l.Unlock()
if p.werr != nil {
err = ErrClosedPipe
return
}
// ...
可能你注意到了,read和write都會去取同一把鎖p.l
具温。
假設我們writer和reader在兩個不同的goroutine中執(zhí)行蚕涤,并且write先執(zhí)行,那么依照上面的代碼铣猩,write會先拿鎖揖铜,當執(zhí)行完
p.data = b
之后會通知reader,然后自己進入一個死循環(huán)里進行Wait达皿,直到reader把p.data讀完天吓。但是問題來了,writer進入死循環(huán)時并沒有釋放鎖p.l
峦椰,然后reader一直等待p.l釋放然后去讀取數據龄寞,而writer一直在等reader讀取完數據才能跳出去釋放鎖√拦Γ看起來這是一個死鎖物邑?
我只能說“Naive”,官方標準庫怎么會犯這么低級的錯誤呢滔金?但是代碼就這樣色解,該如何解釋?
其實餐茵,關鍵在于那個sync.Cond
type pipe struct {
rl sync.Mutex // gates readers one at a time
wl sync.Mutex // gates writers one at a time
l sync.Mutex // protects remaining fields
data []byte // data remaining in pending write
rwait sync.Cond // waiting reader
wwait sync.Cond // waiting writer
rerr error // if reader closed, error to give writes
werr error // if writer closed, error to give reads
}
rwait和wwait都是sync.Cond
科阎,這是什么東東呢?
看下它的文檔:
// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond can be created as part of other structures.
// A Cond must not be copied after first use.
type Cond struct {
noCopy noCopy
// L is held while observing or changing the condition
L Locker
notify notifyList
checker copyChecker
}
Cond如果要細說的話忿族,又得寫另一篇文章了锣笨。在這里你只要知道sync.Cond其內部依賴于一個Locker。
而且在初始化時:
func Pipe() (*PipeReader, *PipeWriter) {
p := new(pipe)
p.rwait.L = &p.l
p.wwait.L = &p.l
r := &PipeReader{p}
w := &PipeWriter{p}
return r, w
}
可以看到rwait和wwait都是依賴于用一把鎖道批,而且這把鎖就是p.l错英。可能有點兒繞屹徘,其實就是:
p.l.Lock()
p.rwait.Wait()
-
p.wwait.Wait()
都是依賴于同一把鎖。這有什么玄機嗎衅金?——有的噪伊!
如前所述,當writer拿到鎖p.l
氮唯,然后開始在死循環(huán)中p.wwait.Wait()
等著reader讀完數據時鉴吹,表面上看起來p.l鎖沒有被釋放,會發(fā)生死鎖惩琉。但是豆励,玄機就在p.wwait.Wait
上。
不賣關子了,p.wwait.Wait
被調用時良蒸,會在內部釋放鎖技扼,而由于p.l和p.wwait.L是同一把鎖,因此reader進去時是可以獲取到鎖的嫩痰。
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
Cond這個東西剿吻,要說起來比較復雜,它涉及到runtime串纺,下次會寫一篇文章具體講講丽旅。本文主要是講Pipe,所以就不擴展了纺棺。
例子
Pipe的使用場景榄笙,我覺得極少數場景可能才會需要用到,我目前沒有想到非常需要Pipe的場景祷蝌。因為每次Read需要等Write寫完茅撞,是串行的場景。不過Pipe的好處是杆逗,由于它把Write的slice放到p.data中乡翅,這是一次引用賦值。之后Read時罪郊,把p.data copy出去蠕蚜,本質上相當于copy了write的原始數據,并沒有用臨時slice存儲悔橄,減少了內存使用量靶累。
我感覺也就那么回事兒吧,為此你不得不再開個goroutine癣疟,gotoutine雖然輕量級挣柬,但也不是沒有開銷,當然它的開銷和分配內存比就小巫見大巫了睛挚。我個人感覺邪蛔,如果你的應用沒有對內存要求嚴苛到這種級別,Pipe也沒什么卵用扎狱。
如果你發(fā)現(xiàn)了Pipe比較合適的場景侧到,非常希望告訴我!
下面給出一個強行使用Pipe的代碼:起了多個goroutine作為writer淤击,每個writer內部隨機生成字符串寫進去匠抗。唯一的reader讀取數據并打印:
var r = rand.New(rand.NewSource(time.Now().UnixNano()))
func generate(writer *PipeWriter) {
arr := make([]byte, 32)
for {
for i := 0; i < 32; i++ {
arr[i] = byte(r.Uint32() >> 24)
}
n, err := writer.Write(arr)
if nil != err {
log.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
}
}
func main() {
rp, wp := Pipe()
for i := 0; i < 20; i++ {
go generate(wp)
}
time.Sleep(1 * time.Second)
data := make([]byte, 64)
for {
n, err := rp.Read(data)
if nil != err {
log.Fatal(err)
}
if 0 != n {
log.Println("main loop", n, string(data))
}
time.Sleep(1 * time.Second)
}
}