golang 訂閱發(fā)布機(jī)制實(shí)現(xiàn)

大家使用較多的生產(chǎn)消費(fèi)模式中間件县袱,大都是訂閱發(fā)布機(jī)制滴铅,反過來大家用這些中間件肯定都會用窘拯,一方subscribe(訂閱)一個topic烘苹,另一方publish(發(fā)布)消息向該topic躲株,訂閱了該topic的subsciber都會收到該消息,反之镣衡,則收不到霜定。那么如果讓大家手動實(shí)現(xiàn)個這個機(jī)制,大家向下是否能夠?qū)崿F(xiàn)出來廊鸥?小伙伴們不妨切到自己的開發(fā)工具上手動敲下代碼代碼試試望浩。如果沒問題的可以不用往下看了,浪費(fèi)大家的時間惰说,如果敲不出來的可以繼續(xù)往下看磨德,我?guī)Т蠹乙黄疬吳眠吺崂磉@個流程。

想下我們在使用訂閱發(fā)布組件的時候吆视,編寫demo的例程的邏輯步驟

1典挑、創(chuàng)建publisher對象,這樣就要有publisher類

?2啦吧、創(chuàng)建publisher 的client您觉,也就是subscriber

3、subscriber定于publisher的topic

4授滓、publisher發(fā)布消息

5琳水、subscriber可以收到publisher發(fā)布的消息

至此完成。

首先我們寫main般堆,因?yàn)槲抑耙苍噲D先寫子函數(shù)在孝,以及在看資料的時候,資料的例子往往從struct的定義寫起淮摔,這樣不利于自己思考浑玛,而且即使看完之后知識還是在那里安靜的躺著,并沒有被吸收到你的大腦里噩咪。好的顾彰,廢話不多說极阅。將上面的流程在main函數(shù)中體現(xiàn)出來:

func main() {

????p :=NewPublisher()? ? //對應(yīng)上面的第一步

? ? s := NewSubscriber()? ? //對應(yīng)上面的第二步

? ? p.addSubscriber(s)? ? //對應(yīng)上面的第三部

? ? ? ?})

????p.publish("Hello, golang")? ? ?//對應(yīng)上面的第4步

????go func() {? ? //對應(yīng)上面的第5步

????????if msg <-s {

????????????fmt.Println(msg)

????????}

????}()

????time.Sleep(time.Second *3)

}

就這么簡單,上面是一段偽代碼涨享,下面我們在一點(diǎn)點(diǎn)的填充豐富下代碼筋搏。

第一個publiser類,需要填充下信息厕隧,大家想下最基礎(chǔ)的publisher需要有哪些成員奔脐,1、超時時間吁讨,以便在publish消息時發(fā)生阻塞髓迎;2、存儲第三部添加進(jìn)來的subscriber建丧,這里可以使用slice存儲排龄;

第二步subscriber類,大家想下該類中應(yīng)該有哪些成員翎朱,1橄维、channel interface{},因?yàn)閜ublisher向subscriber發(fā)布消息的時候,只需向每個subsciber發(fā)布msg就可以了拴曲,subsciber監(jiān)聽這個channel争舞,有消息打印即可;2、既然有channel相應(yīng)的就要知道channel的buf大小澈灼,所有第二個成員buffer竞川。

就這么簡單的demo 我們實(shí)現(xiàn)下:

func main() {

????p :=newPublisher(100 *time.Millisecond)

????defer p.close()

????s :=newSubsciber(10)

????p.addSubscribe(*s)

????p.publish("Hello, golang")

????p.publish("Hello,php")

????p.publish("Hello,java")

????go func() {

????????for msg :=range s.ch {

????????????fmt.Println(msg)????

????????}

????}()

????sig :=make(chan os.Signal)

????signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

????select {

????????case <-sig:

????????????fmt.Println("finished!!!")

? ? ? ?}

}


//發(fā)布者對象

type publisher struct {

????timeout time.Duration

? ? subscribers []subscriber

}

type subscriber struct {

????buffer? ?int

? ? ch chan interface{}

}

func newPublisher(timeout time.Duration) *publisher {

????return &publisher{timeout: timeout}

}

func newSubsciber(buf int) *subscriber {

????return &subscriber{buffer: buf, ch:make(chan interface{}, buf)}

}

func (p *publisher)addSubscribe(sub subscriber) {

????p.subscribers =append(p.subscribers, sub)

}

func (p *publisher) close() {

????for i, _ :=range p.subscribers {

????????close(p.subscribers[i].ch)

????????p.subscribers =append(p.subscribers[0:i], p.subscribers[i+1:]...)

????}

}

func (p *publisher)publish(v interface{}) {

????var wg sync.WaitGroup

? ?for _, sub :=range p.subscribers {

????????wg.Add(1)

????????go p.sendTopic(sub, v, &wg)

????}

}

func (p *publisher)sendTopic(sub subscriber, v interface{}, wg *sync.WaitGroup) {

????defer wg.Done()

????select {

????????case sub.ch <- v:

????????case <-time.After(p.timeout):

????}

}

Ok,一個簡單的訂閱發(fā)布模型完成。接下來再增加點(diǎn)難度叁熔,根據(jù)發(fā)布的內(nèi)容委乌,某個subscriber訂閱包含特殊字符串消息的消息,如果沒有則不發(fā)送者疤,該怎么實(shí)現(xiàn)福澡?這里說的是不發(fā)送叠赦,很容易的想到就是在publish的時候做個filter驹马,設(shè)計(jì)成每個subscriber具有自己的filter這樣我們就修改下subsciber結(jié)構(gòu),內(nèi)置函數(shù)成員除秀。每個subsciber具有獨(dú)自的filter糯累,然后在publish的時候,走一下subsciber的filter內(nèi)置函數(shù)册踩,通過則發(fā)送泳姐,否則跳過。

首先修改下subscriber結(jié)構(gòu)

type (

????topicFilterFunc func(v interface{})bool

type subscriber struct {

????buffer? ?int

? ? ch chan interface{}

? ??tf topicFilterFunc

}

然后修改subscriber的new函數(shù)暂吉,如下

func newSubsciber(buf int, tf topicFilterFunc) *subscriber {

????return &subscriber{buffer: buf, ch:make(chan interface{}, buf), tf: tf}

}

然后修改publish里的sendTopic函數(shù)

func (p *publisher)sendTopic(sub subscriber, v interface{}, wg *sync.WaitGroup) {

????defer wg.Done(

? ? if sub.tf ==nil || !sub.tf(v) {

????????return

? ? }

????select {

????????case sub.ch <- v:

????????case <-time.After(p.timeout):

????}

}

修改下main還是里的new subscriber的方法胖秒。在實(shí)例化的時候增加topicFilter的具體實(shí)現(xiàn)缎患,如下:

func main() {

????p :=newPublisher(100 *time.Millisecond)

????defer p.close()

????s :=newSubsciber(10, func(v interface{})bool {

????????if s, ok := v.(string); ok {

????????????return strings.Contains(s, "golang")

????????}

????????return false

? ? ?})

????p.addSubscribe(*s)

????p.publish("Hello, golang")

????p.publish("Hello,php")

????p.publish("Hello,java")

????go func() {

????????for msg :=range s.ch {

????????????fmt.Println(msg)

????????}

????}()

????sig :=make(chan os.Signal)

????signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

????select {

????????case <-sig:

????????????fmt.Println("finished!!!")

????}

}

至此,整個例程已經(jīng)帶領(lǐng)大家完成阎肝。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末挤渔,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子风题,更是在濱河造成了極大的恐慌判导,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件沛硅,死亡現(xiàn)場離奇詭異眼刃,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)摇肌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門擂红,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人朦蕴,你說我怎么就攤上這事篮条。” “怎么了吩抓?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵涉茧,是天一觀的道長。 經(jīng)常有香客問我疹娶,道長伴栓,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任雨饺,我火速辦了婚禮钳垮,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘额港。我一直安慰自己饺窿,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布移斩。 她就那樣靜靜地躺著肚医,像睡著了一般。 火紅的嫁衣襯著肌膚如雪向瓷。 梳的紋絲不亂的頭發(fā)上肠套,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天,我揣著相機(jī)與錄音猖任,去河邊找鬼你稚。 笑死,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的刁赖。 我是一名探鬼主播搁痛,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼宇弛!你這毒婦竟也來了落追?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤涯肩,失蹤者是張志新(化名)和其女友劉穎轿钠,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體病苗,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡疗垛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了硫朦。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贷腕。...
    茶點(diǎn)故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖咬展,靈堂內(nèi)的尸體忽然破棺而出泽裳,到底是詐尸還是另有隱情,我是刑警寧澤破婆,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布涮总,位于F島的核電站,受9級特大地震影響祷舀,放射性物質(zhì)發(fā)生泄漏瀑梗。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一裳扯、第九天 我趴在偏房一處隱蔽的房頂上張望抛丽。 院中可真熱鬧,春花似錦饰豺、人聲如沸亿鲜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蒿柳。三九已至,卻和暖如春锅很,著一層夾襖步出監(jiān)牢的瞬間其馏,已是汗流浹背凤跑。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工爆安, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人仔引。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓扔仓,卻偏偏與公主長得像褐奥,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子翘簇,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,700評論 2 354