go實(shí)例演示nsq消息隊(duì)列的使用

使用場(chǎng)景

我這兒用一個(gè)實(shí)例來說明nsq的使用卷谈,假設(shè)我有一個(gè)apiServer的程序,它負(fù)責(zé)面向用戶霞篡。有一個(gè)dataServer的程序世蔗,負(fù)責(zé)處理數(shù)據(jù)。apiServer這個(gè)進(jìn)程對(duì)用戶的請(qǐng)求進(jìn)行簡(jiǎn)單的處理朗兵,復(fù)雜的資源處理則去請(qǐng)求dataSever污淋,讓dataServer來處理。
dataServer部署在多臺(tái)主機(jī)余掖,apiServer需要隨機(jī)的向一個(gè)dataServer發(fā)送請(qǐng)求寸爆,來實(shí)現(xiàn)負(fù)載均衡。所有我們需要在apiServer的配置文件中寫入所有dataServer的服務(wù)器地址和端口盐欺。
如果這個(gè)時(shí)候有一個(gè)dataServer掛掉了怎么辦呢而昨?apiServer還是會(huì)隨機(jī)的請(qǐng)求到它,這就會(huì)造成用戶請(qǐng)求失敗找田。
所以我們就需要用到nsq消息隊(duì)列了,而不是將dataServer的所有地址寫到配置中着憨。每個(gè)dataServer進(jìn)程都需要定時(shí)的向apiServer發(fā)送消息,告訴自身的地址漆改,apiServer將地址進(jìn)行篩選,然后就可以用這些地址來請(qǐng)求dataServer了愉棱。

程序演示:

dataServer(生產(chǎn)者):

func HeartBeat(){
    var(
        producer *nsq.Producer
        err error
        ticker *time.Ticker
    )
    ticker = time.NewTicker(time.Duration(conf.HEART_BEAT_TIME)*time.Second)  //創(chuàng)建一個(gè)定時(shí)器顺少,這里的時(shí)間我都寫到配置文件里了梅猿,然后用conf包拿出來袱蚓。我這里是設(shè)置的5秒
    for {
        select {
        case <-ticker.C:
            //創(chuàng)建一個(gè)生產(chǎn)者入蛆,這里的RanDomGetServer()是自定義的一個(gè)工具枫甲,用來隨機(jī)獲取一個(gè)nsqd地址
            if producer, err = nsq.NewProducer(utils.RadomGetNsqd(conf.NSQ_TCP_ADDRS), nsq.NewConfig()); err != nil {
                panic(err)
            }
            //pulish()接的第一個(gè)參數(shù)是topic,這個(gè)也是自己定義(值為data_server_addr),第二個(gè)參數(shù)是要發(fā)送的消息,這里是本機(jī)服務(wù)器地址
            err = producer.Publish(conf.DATA_SERVER_TOPIC, []byte(conf.SERVER_ADDR))
            if err != nil {
                panic(err)
            }
        }
    }
}

utils下的RanDomGetNsq工具:

//從nsqd集群地址中隨機(jī)獲取一個(gè)nsqd地址用于publish,nsqd集群地址是事先寫在配置文件中的
func RadomGetNsqd([]string)string{
    rand.Seed(time.Now().UnixNano())
    return conf.NSQ_TCP_ADDRS[rand.Intn(len(conf.NSQ_TCP_ADDRS))]
}

main函數(shù):

var confPath string

func init(){
    flag.StringVar(&confPath,"c","","config file")
    runtime.GOMAXPROCS(4)
}

func main(){
    flag.Parse()
    conf.Init(confPath)  //加載配置
    gin.SetMode(gin.DebugMode)
    r := gin.New()
    router.Load(r)  //加載路由和中間件
    server := http.Server{
        Addr:conf.SERVER_ADDR,  
        ReadTimeout:time.Duration(conf.SERVER_READ_TIME_OUT)*time.Second,
        WriteTimeout:time.Duration(conf.SERVER_READ_TIME_OUT)*time.Second,
        Handler:r,
    }
    go middleWare.HeartBeat()   //這里開goroutine來發(fā)送心跳包
    if err := server.ListenAndServe();err != nil{
        panic(err)
    }
}

這樣我們就實(shí)現(xiàn)了每五秒發(fā)送一次本機(jī)服務(wù)器地址給apiServer進(jìn)程,apiServer創(chuàng)建生產(chǎn)者就能接受到消息了忙迁。

apiServer(消費(fèi)者):

消費(fèi)者的創(chuàng)建比生產(chǎn)者的創(chuàng)建要麻煩些。
首先是自定義的工具utils.NewConsumer來創(chuàng)建一個(gè)消費(fèi)者:

//這里第一個(gè)參數(shù)是需要綁定topic(data_server_addr),
//第二個(gè)參數(shù)傳入一個(gè)string晨川,這就是創(chuàng)建的ch呀页,topic消息隊(duì)列中的消息會(huì)分發(fā)到每個(gè)ch中.
//每個(gè)消費(fèi)者可以創(chuàng)建不通的ch丸氛,也可以共用一個(gè)ch定续,共用一個(gè)ch, ch的消息會(huì)隨機(jī)發(fā)送給其中一個(gè)消費(fèi)者
//第三個(gè)參數(shù)是處理message的nsq.handler接口,需要實(shí)現(xiàn)一個(gè)HanddleMessage(*nsq.Message)error()方法禾锤。
func NewConsumer(topic string,chanName string,h nsq.Handler)(consumer *nsq.Consumer,err error){
    if consumer,err = nsq.NewConsumer(topic,chanName,nsq.NewConfig());err != nil{
        return nil,err
    }
    consumer.ChangeMaxInFlight(3)   //可以根據(jù)nsqds數(shù)量來配置
    consumer.AddHandler(h)
    err = consumer.ConnectToNSQLookupd(conf.NSQ_LOOKUPD_ADDR)     //todo:讀取配置地址
    if err != nil{
        return nil,err
    }
    return consumer,nil
}

接下來正式處理消息了:

package consumers
import("......")

//創(chuàng)建一個(gè)nsq.handler接口實(shí)例
var ServerConsumerHandler = &ServerConsumer{DataServerAddrs:make(map[string]time.Time)}

//ServerConsumer實(shí)現(xiàn)了nsq.handler接口
type ServerConsumer struct {
    DataServerAddrs  map[string]time.Time  //保存dataServer進(jìn)程發(fā)過來的服務(wù)器地址和接收時(shí)間
    rwLocker sync.RWMutex  //防止多線程同時(shí)讀寫
}

//HandleMessage是nsq,handler接口定義方法私股,必須實(shí)現(xiàn),主要是用來處理傳遞過來的消息
func(h *ServerConsumer)HandleMessage(message *nsq.Message)error{
    if dataServer := string(message.Body);dataServer != ""{
        h.rwLocker.Lock()
        h.DataServerAddrs[dataServer] = time.Now()
        h.rwLocker.Unlock()
    }
    return nil
}

//監(jiān)聽服務(wù)data_server_addr這個(gè)消息隊(duì)列
func MonitorDataServerAddrs(){
    consumer ,err:= utils.NewConsumer("data_server_addr","ch1",ServerConsumerHandler)
    if err != nil{
        panic(err)
    }
    //連接到NSQLookupd恩掷,它底層會(huì)創(chuàng)建連接到每個(gè)nsqd.這樣就可以監(jiān)聽每個(gè)nsqd中的消息
    err = consumer.ConnectToNSQLookupd(conf.NSQ_LOOKUPD_ADDR)   //方法本身會(huì)開一個(gè)goroutine來檢查消息隊(duì)列
    if err != nil{
        panic(err)
    }
}

//刪除過期的服務(wù)器地址
func(h *ServerConsumer)removeExpireDatasServers(){
    for  {
        h.rwLocker.Lock()
        for dataServer,t := range h.DataServerAddrs{
            //只保存10秒之內(nèi)發(fā)送過來的服務(wù)器地址
            if t.Add(10*time.Second).Before(time.Now()){
                delete(h.DataServerAddrs,dataServer)
            }
        }
        h.rwLocker.Unlock()
        time.Sleep(2*time.Second)
    }
}

main函數(shù):

flag.Parse()
    conf.Init(confPath)  //加載配置
    gin.SetMode(gin.DebugMode)
    r := gin.New()
    router.Load(r)
    server := http.Server{
        Addr:"192.168.50.250:8080",
        ReadTimeout:time.Duration(conf.SERVER_READ_TIME_OUT)*time.Second,
        WriteTimeout:time.Duration(conf.SERVER_READ_TIME_OUT)*time.Second,
        Handler:r,
    }
    consumers.MonitorDataServerAddrs()   //不用單獨(dú)開goroutine
    if err := server.ListenAndServe();err != nil{
        panic(err)
    }

這樣我們就實(shí)現(xiàn)了接收服務(wù)器地址倡鲸,并且將過期的地址刪掉,更大程度的保證保存的dataServer服務(wù)器地址是可用的黄娘。

通過nsqadmin來查看消息隊(duì)列情況:

image.png
image.png
image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末旦签,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子寸宏,更是在濱河造成了極大的恐慌,老刑警劉巖偿曙,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件氮凝,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡望忆,警方通過查閱死者的電腦和手機(jī)罩阵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門竿秆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人稿壁,你說我怎么就攤上這事幽钢。” “怎么了傅是?”我有些...
    開封第一講書人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵匪燕,是天一觀的道長。 經(jīng)常有香客問我喧笔,道長帽驯,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任茶没,我火速辦了婚禮瞒窒,結(jié)果婚禮上姥敛,老公的妹妹穿的比我還像新娘。我一直安慰自己嫌术,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開白布牌借。 她就那樣靜靜地躺著度气,像睡著了一般。 火紅的嫁衣襯著肌膚如雪走哺。 梳的紋絲不亂的頭發(fā)上蚯嫌,一...
    開封第一講書人閱讀 51,146評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音丙躏,去河邊找鬼择示。 笑死,一個(gè)胖子當(dāng)著我的面吹牛晒旅,可吹牛的內(nèi)容都是我干的栅盲。 我是一名探鬼主播,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼废恋,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼谈秫!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起鱼鼓,我...
    開封第一講書人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤拟烫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后迄本,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體硕淑,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了置媳。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片于樟。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖拇囊,靈堂內(nèi)的尸體忽然破棺而出迂曲,到底是詐尸還是另有隱情,我是刑警寧澤寥袭,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布路捧,位于F島的核電站,受9級(jí)特大地震影響纠永,放射性物質(zhì)發(fā)生泄漏鬓长。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一尝江、第九天 我趴在偏房一處隱蔽的房頂上張望涉波。 院中可真熱鬧,春花似錦炭序、人聲如沸啤覆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽窗声。三九已至,卻和暖如春辜纲,著一層夾襖步出監(jiān)牢的瞬間笨觅,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來泰國打工耕腾, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留见剩,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓扫俺,卻偏偏與公主長得像苍苞,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子狼纬,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理羹呵,服務(wù)發(fā)現(xiàn),斷路器疗琉,智...
    卡卡羅2017閱讀 134,651評(píng)論 18 139
  • 1. 組件版本和配置策略 組件版本: Kubernetes 1.10.4 Docker 18.03.1-ce Et...
    Anson前行閱讀 5,770評(píng)論 0 11
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對(duì)...
    cosWriter閱讀 11,097評(píng)論 1 32
  • 無數(shù)個(gè)黑夜里盈简,想起過去種種都會(huì)不寒而栗凑耻。 想起那個(gè)渣男無數(shù)次腳踩兩只船犯戏,無數(shù)次撒謊的嘴臉,還有無數(shù)次撒謊被...
    二拐吧閱讀 1,251評(píng)論 0 0
  • 中午乘公交車上班拳话,聽一老婦人喊:師傅,我到釘稱人那下种吸,停一下哦 啥 釘稱人那里下 啥弃衍,師傅還是沒聽清 我要去城關(guān)醫(yī)...
    金聲說今事閱讀 116評(píng)論 0 0