理解golang io.Pipe

golang的io包中,稍微有點兒晦澀的就是Pipe方法,今天我們就一起來看一看這個Pipe材泄。

函數定義如下:

func Pipe() (*PipeReader, *PipeWriter)

它返回了一個Reader和一個Writer

起初一看是有點兒奇怪的,很少有這么用的哦版姑,它到底能干嘛呢?
其實它返回的不僅僅是簡單的一個Writer一個Reader,它返回的是息息相關的一個Writer和一個Reader。
下面我先用比較口語化的方式來講一下它們是如何工作的芬膝。

假設

先假設我們在工地上,有兩個工人形娇,一個叫w蔗候,一個叫r,w負責搬磚埂软,而r負責砌墻。

初始

w和r一起配合工作纫事,一開始啥都沒有勘畔,負責砌墻的r就沒法工作,于是它開始睡覺(Wait)丽惶。而w只能去搬磚了炫七。

磚來了

w深知r懶惰的習性,當它把磚搬過來后钾唬,就把r叫醒(Signal)万哪。然后w心想,反正你砌墻也要一會兒抡秆,那我也睡會兒奕巍。于是w叫醒r后它也開始睡覺(Wait)。

砌墻

r被叫醒之后儒士,心想著睡了這么久害怕被包工頭責罵的止,自然就開始辛勤地砌墻了,很快就把w搬過來的磚用完了着撩。r心想诅福,這墻砌不完可怪不到我頭上匾委,因為沒磚了,于是r叫醒了w氓润,然后自己又去睡覺了赂乐。

繼續(xù)搬磚

w被叫醒后一看,哎喲我去咖气,這么快就沒磚了挨措?然后他又跑去搬了些轉過來,然后叫醒睡得跟死豬一樣的r起來砌墻采章,自己又開始睡覺……

周而復始运嗜,直到……

w和r兩人就這么周而復始地配合,直到r發(fā)現(xiàn)墻砌好了悯舟,或者w發(fā)現(xiàn)工地上已經沒有磚了担租。


以上大概就是Pipe的通俗的解釋。不過問題也來了抵怎,這倆人瞌睡怎么這么多呢奋救?w干活r就歇著,不能同時干嗎反惕?答案是——不能
為什么尝艘?因為Pipe就是為了某些特定場景而提出的∽巳荆看看官方文檔的說明:

Reads and Writes on the pipe are matched one to one except when multiple Reads are needed to consume a single Write

也就是說背亥,Pipe適用于,產生了一條數據悬赏,緊接著就要處理掉這條數據的場景狡汉。而且因為其內部是一把大鎖,因此是線程安全的闽颇。

內部實現(xiàn)

來看看內部實現(xiàn)盾戴,先看看read

func (p *pipe) read(b []byte) (n int, err error) {
    // One reader at a time.
    p.rl.Lock()
    defer p.rl.Unlock()

    p.l.Lock()
    defer p.l.Unlock()
    for {
        if p.rerr != nil {
            return 0, ErrClosedPipe
        }
        if p.data != nil {
            break
        }
        if p.werr != nil {
            return 0, p.werr
        }
        p.rwait.Wait()
    }
    n = copy(b, p.data)
    p.data = p.data[n:]
    if len(p.data) == 0 {
        p.data = nil
        p.wwait.Signal()
    }
    return
}

這段代碼,我用偽代碼簡化一下:

func (p *pipe) read(b []byte) (n int, err error) {
    各種加鎖()
    for {
        if 有數據可以讀或者哪里有錯 {
           break
        } 
        讓出時間片等待被喚醒兵多,如果是被正常調度回來的依然不醒尖啡,必須是被指名點姓叫醒才醒()
    }
    copy(b, p.data)
    通知writer可以繼續(xù)寫數據進來了()
}

write其實也是大同小異:

func (p *pipe) write(b []byte) (n int, err error) {
  各種加鎖()
  p.data = b
  通知reader有數據了()
  for {
    if 數據被讀完了或者哪里有錯 {
      break
    }
    讓出時間片等待被喚醒,如果是被正常調度回來的依然不醒剩膘,必須是被指名點姓叫醒才醒()
  }
  p.data = nil
}

看了偽代碼衅斩,再看看實際代碼,應該就很容易了怠褐。但是還有幾個地方需要細說矛渴,第一個就是鎖的問題。

在read中:

func (p *pipe) read(b []byte) (n int, err error) {
    // One reader at a time.
    p.rl.Lock()
    defer p.rl.Unlock()

    p.l.Lock()
    defer p.l.Unlock()
        // ...

而在write中:

func (p *pipe) write(b []byte) (n int, err error) {
    // pipe uses nil to mean not available
    if b == nil {
        b = zero[:]
    }

    // One writer at a time.
    p.wl.Lock()
    defer p.wl.Unlock()

    p.l.Lock()
    defer p.l.Unlock()
    if p.werr != nil {
        err = ErrClosedPipe
        return
    }
        // ...

可能你注意到了,read和write都會去取同一把鎖p.l具温。
假設我們writer和reader在兩個不同的goroutine中執(zhí)行蚕涤,并且write先執(zhí)行,那么依照上面的代碼铣猩,write會先拿鎖揖铜,當執(zhí)行完

p.data = b

之后會通知reader,然后自己進入一個死循環(huán)里進行Wait达皿,直到reader把p.data讀完天吓。但是問題來了,writer進入死循環(huán)時并沒有釋放鎖p.l峦椰,然后reader一直等待p.l釋放然后去讀取數據龄寞,而writer一直在等reader讀取完數據才能跳出去釋放鎖√拦Γ看起來這是一個死鎖物邑?
我只能說“Naive”,官方標準庫怎么會犯這么低級的錯誤呢滔金?但是代碼就這樣色解,該如何解釋?
其實餐茵,關鍵在于那個sync.Cond

type pipe struct {
    rl    sync.Mutex // gates readers one at a time
    wl    sync.Mutex // gates writers one at a time
    l     sync.Mutex // protects remaining fields
    data  []byte     // data remaining in pending write
    rwait sync.Cond  // waiting reader
    wwait sync.Cond  // waiting writer
    rerr  error      // if reader closed, error to give writes
    werr  error      // if writer closed, error to give reads
}

rwait和wwait都是sync.Cond科阎,這是什么東東呢?
看下它的文檔:

// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
//
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond can be created as part of other structures.
// A Cond must not be copied after first use.
type Cond struct {
    noCopy noCopy

    // L is held while observing or changing the condition
    L Locker

    notify  notifyList
    checker copyChecker
}

Cond如果要細說的話忿族,又得寫另一篇文章了锣笨。在這里你只要知道sync.Cond其內部依賴于一個Locker。
而且在初始化時:

func Pipe() (*PipeReader, *PipeWriter) {
    p := new(pipe)
    p.rwait.L = &p.l
    p.wwait.L = &p.l
    r := &PipeReader{p}
    w := &PipeWriter{p}
    return r, w
}

可以看到rwait和wwait都是依賴于用一把鎖道批,而且這把鎖就是p.l错英。可能有點兒繞屹徘,其實就是:

  • p.l.Lock()
  • p.rwait.Wait()
  • p.wwait.Wait()
    都是依賴于同一把鎖。這有什么玄機嗎衅金?——有的噪伊!
    如前所述,當writer拿到鎖p.l氮唯,然后開始在死循環(huán)中p.wwait.Wait()等著reader讀完數據時鉴吹,表面上看起來p.l鎖沒有被釋放,會發(fā)生死鎖惩琉。但是豆励,玄機就在p.wwait.Wait上。
    不賣關子了,p.wwait.Wait被調用時良蒸,會在內部釋放鎖技扼,而由于p.l和p.wwait.L是同一把鎖,因此reader進去時是可以獲取到鎖的嫩痰。
func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

Cond這個東西剿吻,要說起來比較復雜,它涉及到runtime串纺,下次會寫一篇文章具體講講丽旅。本文主要是講Pipe,所以就不擴展了纺棺。

例子

Pipe的使用場景榄笙,我覺得極少數場景可能才會需要用到,我目前沒有想到非常需要Pipe的場景祷蝌。因為每次Read需要等Write寫完茅撞,是串行的場景。不過Pipe的好處是杆逗,由于它把Write的slice放到p.data中乡翅,這是一次引用賦值。之后Read時罪郊,把p.data copy出去蠕蚜,本質上相當于copy了write的原始數據,并沒有用臨時slice存儲悔橄,減少了內存使用量靶累。
我感覺也就那么回事兒吧,為此你不得不再開個goroutine癣疟,gotoutine雖然輕量級挣柬,但也不是沒有開銷,當然它的開銷和分配內存比就小巫見大巫了睛挚。我個人感覺邪蛔,如果你的應用沒有對內存要求嚴苛到這種級別,Pipe也沒什么卵用扎狱。
如果你發(fā)現(xiàn)了Pipe比較合適的場景侧到,非常希望告訴我!
下面給出一個強行使用Pipe的代碼:起了多個goroutine作為writer淤击,每個writer內部隨機生成字符串寫進去匠抗。唯一的reader讀取數據并打印:

var r = rand.New(rand.NewSource(time.Now().UnixNano()))

func generate(writer *PipeWriter) {
    arr := make([]byte, 32)
    for {
        for i := 0; i < 32; i++ {
            arr[i] = byte(r.Uint32() >> 24)
        }
        n, err := writer.Write(arr)
        if nil != err {
            log.Fatal(err)
        }
        time.Sleep(200 * time.Millisecond)
    }
}

func main() {
    rp, wp := Pipe()
    for i := 0; i < 20; i++ {
        go generate(wp)
    }
    time.Sleep(1 * time.Second)
    data := make([]byte, 64)
    for {
        n, err := rp.Read(data)
        if nil != err {
            log.Fatal(err)
        }
        if 0 != n {
            log.Println("main loop", n, string(data))
        }
        time.Sleep(1 * time.Second)
    }
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末污抬,一起剝皮案震驚了整個濱河市汞贸,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖矢腻,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件门驾,死亡現(xiàn)場離奇詭異,居然都是意外死亡踏堡,警方通過查閱死者的電腦和手機猎唁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來顷蟆,“玉大人诫隅,你說我怎么就攤上這事≌寿耍” “怎么了逐纬?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀的道長削樊。 經常有香客問我豁生,道長,這世上最難降的妖魔是什么漫贞? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任甸箱,我火速辦了婚禮,結果婚禮上迅脐,老公的妹妹穿的比我還像新娘芍殖。我一直安慰自己,他們只是感情好谴蔑,可當我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布豌骏。 她就那樣靜靜地躺著,像睡著了一般隐锭。 火紅的嫁衣襯著肌膚如雪窃躲。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天钦睡,我揣著相機與錄音蒂窒,去河邊找鬼。 笑死荞怒,一個胖子當著我的面吹牛洒琢,可吹牛的內容都是我干的。 我是一名探鬼主播挣输,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼纬凤,長吁一口氣:“原來是場噩夢啊……” “哼福贞!你這毒婦竟也來了撩嚼?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎完丽,沒想到半個月后恋技,有當地人在樹林里發(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡逻族,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年蜻底,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片聘鳞。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡薄辅,死狀恐怖,靈堂內的尸體忽然破棺而出抠璃,到底是詐尸還是另有隱情站楚,我是刑警寧澤,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布搏嗡,位于F島的核電站窿春,受9級特大地震影響,放射性物質發(fā)生泄漏采盒。R本人自食惡果不足惜旧乞,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望磅氨。 院中可真熱鬧尺栖,春花似錦、人聲如沸悍赢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽左权。三九已至皮胡,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間赏迟,已是汗流浹背屡贺。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留锌杀,地道東北人甩栈。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像糕再,于是被迫代替她去往敵國和親量没。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內容