大家使用較多的生產(chǎn)消費(fèi)模式中間件县袱,大都是訂閱發(fā)布機(jī)制滴铅,反過來大家用這些中間件肯定都會用窘拯,一方subscribe(訂閱)一個topic烘苹,另一方publish(發(fā)布)消息向該topic躲株,訂閱了該topic的subsciber都會收到該消息,反之镣衡,則收不到霜定。那么如果讓大家手動實(shí)現(xiàn)個這個機(jī)制,大家向下是否能夠?qū)崿F(xiàn)出來廊鸥?小伙伴們不妨切到自己的開發(fā)工具上手動敲下代碼代碼試試望浩。如果沒問題的可以不用往下看了,浪費(fèi)大家的時間惰说,如果敲不出來的可以繼續(xù)往下看磨德,我?guī)Т蠹乙黄疬吳眠吺崂磉@個流程。
想下我們在使用訂閱發(fā)布組件的時候吆视,編寫demo的例程的邏輯步驟
1典挑、創(chuàng)建publisher對象,這樣就要有publisher類
?2啦吧、創(chuàng)建publisher 的client您觉,也就是subscriber
3、subscriber定于publisher的topic
4授滓、publisher發(fā)布消息
5琳水、subscriber可以收到publisher發(fā)布的消息
至此完成。
首先我們寫main般堆,因?yàn)槲抑耙苍噲D先寫子函數(shù)在孝,以及在看資料的時候,資料的例子往往從struct的定義寫起淮摔,這樣不利于自己思考浑玛,而且即使看完之后知識還是在那里安靜的躺著,并沒有被吸收到你的大腦里噩咪。好的顾彰,廢話不多說极阅。將上面的流程在main函數(shù)中體現(xiàn)出來:
func main() {
????p :=NewPublisher()? ? //對應(yīng)上面的第一步
? ? s := NewSubscriber()? ? //對應(yīng)上面的第二步
? ? p.addSubscriber(s)? ? //對應(yīng)上面的第三部
? ? ? ?})
????p.publish("Hello, golang")? ? ?//對應(yīng)上面的第4步
????go func() {? ? //對應(yīng)上面的第5步
????????if msg <-s {
????????????fmt.Println(msg)
????????}
????}()
????time.Sleep(time.Second *3)
}
就這么簡單,上面是一段偽代碼涨享,下面我們在一點(diǎn)點(diǎn)的填充豐富下代碼筋搏。
第一個publiser類,需要填充下信息厕隧,大家想下最基礎(chǔ)的publisher需要有哪些成員奔脐,1、超時時間吁讨,以便在publish消息時發(fā)生阻塞髓迎;2、存儲第三部添加進(jìn)來的subscriber建丧,這里可以使用slice存儲排龄;
第二步subscriber類,大家想下該類中應(yīng)該有哪些成員翎朱,1橄维、channel interface{},因?yàn)閜ublisher向subscriber發(fā)布消息的時候,只需向每個subsciber發(fā)布msg就可以了拴曲,subsciber監(jiān)聽這個channel争舞,有消息打印即可;2、既然有channel相應(yīng)的就要知道channel的buf大小澈灼,所有第二個成員buffer竞川。
就這么簡單的demo 我們實(shí)現(xiàn)下:
func main() {
????p :=newPublisher(100 *time.Millisecond)
????defer p.close()
????s :=newSubsciber(10)
????p.addSubscribe(*s)
????p.publish("Hello, golang")
????p.publish("Hello,php")
????p.publish("Hello,java")
????go func() {
????????for msg :=range s.ch {
????????????fmt.Println(msg)????
????????}
????}()
????sig :=make(chan os.Signal)
????signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
????select {
????????case <-sig:
????????????fmt.Println("finished!!!")
? ? ? ?}
}
//發(fā)布者對象
type publisher struct {
????timeout time.Duration
? ? subscribers []subscriber
}
type subscriber struct {
????buffer? ?int
? ? ch chan interface{}
}
func newPublisher(timeout time.Duration) *publisher {
????return &publisher{timeout: timeout}
}
func newSubsciber(buf int) *subscriber {
????return &subscriber{buffer: buf, ch:make(chan interface{}, buf)}
}
func (p *publisher)addSubscribe(sub subscriber) {
????p.subscribers =append(p.subscribers, sub)
}
func (p *publisher) close() {
????for i, _ :=range p.subscribers {
????????close(p.subscribers[i].ch)
????????p.subscribers =append(p.subscribers[0:i], p.subscribers[i+1:]...)
????}
}
func (p *publisher)publish(v interface{}) {
????var wg sync.WaitGroup
? ?for _, sub :=range p.subscribers {
????????wg.Add(1)
????????go p.sendTopic(sub, v, &wg)
????}
}
func (p *publisher)sendTopic(sub subscriber, v interface{}, wg *sync.WaitGroup) {
????defer wg.Done()
????select {
????????case sub.ch <- v:
????????case <-time.After(p.timeout):
????}
}
Ok,一個簡單的訂閱發(fā)布模型完成。接下來再增加點(diǎn)難度叁熔,根據(jù)發(fā)布的內(nèi)容委乌,某個subscriber訂閱包含特殊字符串消息的消息,如果沒有則不發(fā)送者疤,該怎么實(shí)現(xiàn)福澡?這里說的是不發(fā)送叠赦,很容易的想到就是在publish的時候做個filter驹马,設(shè)計(jì)成每個subscriber具有自己的filter這樣我們就修改下subsciber結(jié)構(gòu),內(nèi)置函數(shù)成員除秀。每個subsciber具有獨(dú)自的filter糯累,然后在publish的時候,走一下subsciber的filter內(nèi)置函數(shù)册踩,通過則發(fā)送泳姐,否則跳過。
首先修改下subscriber結(jié)構(gòu)
type (
????topicFilterFunc func(v interface{})bool
)
type subscriber struct {
????buffer? ?int
? ? ch chan interface{}
? ??tf topicFilterFunc
}
然后修改subscriber的new函數(shù)暂吉,如下
func newSubsciber(buf int, tf topicFilterFunc) *subscriber {
????return &subscriber{buffer: buf, ch:make(chan interface{}, buf), tf: tf}
}
然后修改publish里的sendTopic函數(shù)
func (p *publisher)sendTopic(sub subscriber, v interface{}, wg *sync.WaitGroup) {
????defer wg.Done(
? ? if sub.tf ==nil || !sub.tf(v) {
????????return
? ? }
????select {
????????case sub.ch <- v:
????????case <-time.After(p.timeout):
????}
}
修改下main還是里的new subscriber的方法胖秒。在實(shí)例化的時候增加topicFilter的具體實(shí)現(xiàn)缎患,如下:
func main() {
????p :=newPublisher(100 *time.Millisecond)
????defer p.close()
????s :=newSubsciber(10, func(v interface{})bool {
????????if s, ok := v.(string); ok {
????????????return strings.Contains(s, "golang")
????????}
????????return false
? ? ?})
????p.addSubscribe(*s)
????p.publish("Hello, golang")
????p.publish("Hello,php")
????p.publish("Hello,java")
????go func() {
????????for msg :=range s.ch {
????????????fmt.Println(msg)
????????}
????}()
????sig :=make(chan os.Signal)
????signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
????select {
????????case <-sig:
????????????fmt.Println("finished!!!")
????}
}
至此,整個例程已經(jīng)帶領(lǐng)大家完成阎肝。