Go語言使用NSQ消息隊列

1. 概述

NSQ 是一個基于Go語言的分布式實時消息平臺舶治,它基于MIT開源協(xié)議發(fā)布冬殃,由bitly公司開源出來的一款簡單易用的消息中間件

相關(guān)描述如下:

NSQ是一個實時分布式消息傳遞平臺,旨在大規(guī)模運行较坛,每天處理數(shù)十億條消息捐迫。 它促進了沒有單點故障的分布式和分散式拓撲卢厂,從而實現(xiàn)了容錯能力和高可用性,并提供了可靠的消息傳遞保證河劝。 查看功能和保證壁榕。 從操作上講,NSQ易于配置和部署(所有參數(shù)均在命令行上指定赎瞎,并且編譯的二進制文件不具有運行時相關(guān)性)牌里。 為了獲得最大的靈活性,它與數(shù)據(jù)格式無關(guān)(消息可以是JSON务甥,MsgPack牡辽,協(xié)議緩沖區(qū)或其他任何東西)。 官方提供了Go和Python庫(以及許多其他客戶端庫)敞临,并且态辛,如果您有興趣構(gòu)建自己的庫,則有一個協(xié)議規(guī)范挺尿。

NSQ的特點

  • 支持水平橫向拓展(無縫添加更多節(jié)點到集群中)
  • 部署配置容易,自帶集群管理界面(nsqadmin)
  • 提倡分布式拓撲,減少單點故障,提高容錯
  • 低延遲的消息傳遞
  • 可靠的消交付保障保障
    • 默認中消息都在內(nèi)存中, nsq 內(nèi)部機制保證在程序關(guān)閉時將隊列中的數(shù)據(jù)持久化到硬盤奏黑,重啟后就會恢復(fù)。
    • 消息最少被投遞一次

比較知名和常用的消息處理系統(tǒng)還有

RabbitMQ

KafKa

2. 基礎(chǔ)應(yīng)用場景

我們知道一般的消息隊列(Message Queue) 常用的場景有系統(tǒng)解耦 異步處理 流量削峰 消息通信

3. 相關(guān)文檔

  1. 項目地址 : https://github.com/nsqio/nsq
  2. 項目文檔 英文: https://nsq.io/overview/design.html
  3. 下載地址: https://nsq.io/deployment/installing.html
  4. 客戶端下載地址 : https://nsq.io/clients/client_libraries.html

4.安裝操作

根據(jù)自己的操作平臺下載解壓即可

  • 根據(jù)自己的操作系統(tǒng)下載對應(yīng)的壓縮包文件
  • 解壓壓縮文件
  • 進入解壓后 bin 目錄中

bin 目錄中我們能看到如下文件

-rwxr-xr-x 1 captain 197121 5515776 8月  28 13:46 nsq_stat.exe*
-rwxr-xr-x 1 captain 197121 5823488 8月  28 13:46 nsq_tail.exe*
-rwxr-xr-x 1 captain 197121 5997568 8月  28 13:46 nsq_to_file.exe*
-rwxr-xr-x 1 captain 197121 5923840 8月  28 13:46 nsq_to_http.exe*
-rwxr-xr-x 1 captain 197121 5903872 8月  28 13:46 nsq_to_nsq.exe*
-rwxr-xr-x 1 captain 197121 8787968 8月  28 13:46 nsqadmin.exe*
-rwxr-xr-x 1 captain 197121 9108992 8月  28 13:46 nsqd.exe*
-rwxr-xr-x 1 captain 197121 8384000 8月  28 13:46 nsqlookupd.exe*
-rwxr-xr-x 1 captain 197121 5639680 8月  28 13:46 to_nsq.exe*

5. NSQ服務(wù)端基礎(chǔ)組件介紹

5.1 nsqd

nsqd是一個守護進程負責(zé)接收,排隊,消息傳遞 到客戶端票髓。 它可以獨立運行攀涵,但通常由nsqlookupd實例的群集中配置(在這種情況下,它將能聲明topics和發(fā)現(xiàn)channel)洽沟。 它偵聽兩個TCP端口以故,一個偵聽客戶端,另一個偵聽HTTP API裆操。 它可以選擇在第三個端口上偵聽HTTPS怒详。

5.2 nsqlookupd

nsqlookupd 是管理拓撲信息的守護程序炉媒。 客戶端查詢nsqlookupd以發(fā)現(xiàn)特定 topicnsqd 生產(chǎn)者和 nsqd 節(jié)點廣播topicchannel信息。
有兩個接口:

nsqd用于廣播的TCP接口

客戶端(nsqadmin)執(zhí)行發(fā)現(xiàn)和管理操作的HTTP接口

5.3 nsqadmin

nsqadmin 是一套 WEB管理界面,用來匯集集群的實時統(tǒng)計昆烁,并執(zhí)行不同的管理任務(wù)吊骤。

重點提示:

NSQ還有許多功能組件,我們只介紹這三個(nsqd nsqlookupd nsqadmin)最常用和主要的

NSQ的所有組件都可以通過參數(shù) -- help 查看相關(guān)配置

nsqdnsqlookupd 都有對應(yīng)的http API ,需要使用的時候查看文檔即可

6.操作NSQ

6.1 安裝客戶端

根據(jù)不同的開發(fā)語言選擇不同的客戶端

我們是使用Golang操作所以采用NSQ的官方提供客戶端 go-nsq

go get -u github.com/nsqio/go-nsq

6.1 單機啟動nsqd

默認啟動的nsqd 監(jiān)聽 HTTP對應(yīng)的4151端口和TCP對應(yīng)的4150端口

$ ./nsqd
[nsqd] 2019/11/10 13:41:29.575014 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2019/11/10 13:41:29.593002 INFO: ID: 825
[nsqd] 2019/11/10 13:41:29.597000 INFO: TOPIC(topic_demo): created
[nsqd] 2019/11/10 13:41:29.599998 INFO: TOPIC(topic_demo): new channel(aa)
[nsqd] 2019/11/10 13:41:29.599998 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2019/11/10 13:41:29.644973 INFO: HTTP: listening on [::]:4151
[nsqd] 2019/11/10 13:41:29.644973 INFO: TCP: listening on [::]:4150

我們同樣可以指定端口

$ ./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081"
[nsqd] 2019/11/10 14:05:40.726849 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2019/11/10 14:05:40.745838 INFO: ID: 825
[nsqd] 2019/11/10 14:05:40.747836 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2019/11/10 14:05:40.788814 INFO: TCP: listening on [::]:8081
[nsqd] 2019/11/10 14:05:40.788814 INFO: HTTP: listening on [::]:8080

這樣我們就啟動了一個nsqd 的實例

在NSQ中有兩個非常重要的概念 topicChannel

我們看一下文檔中的描述:

每個nsqd實例旨在一次處理多個數(shù)據(jù)流。這些數(shù)據(jù)流稱為“topics”静尼,一個topic具有1個或多個“channels”白粉。每個channel都會收到topic所有消息的副本,實際上下游的服務(wù)是通過對應(yīng)的channel來消費topic消息鼠渺。

topicchannel不是預(yù)先配置的鸭巴。topic在首次使用時創(chuàng)建,方法是將其發(fā)布到指定topic拦盹,或者訂閱指定topic上的channel鹃祖。channel是通過訂閱指定的channel在第一次使用時創(chuàng)建的。

topicchannel都相互獨立地緩沖數(shù)據(jù)普舆,防止緩慢的消費者導(dǎo)致其他chennel的積壓(同樣適用于topic級別)恬口。

channel可以并且通常會連接多個客戶端。假設(shè)所有連接的客戶端都處于準(zhǔn)備接收消息的狀態(tài)沼侣,則每條消息將被傳遞到隨機客戶端祖能。例如:

topic-channel.gif

總而言之,消息是從topic->channel多播的(每個channel都接收該topic的所有消息的副本)华临,但從channel-> 消息消費者 均勻分發(fā)(每個消費者都接收該頻道的一部分消息)芯杀。

6.1.1 單NSQ的使用

編寫一個消息生產(chǎn)者
nsq_single_product.go

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "time"
)
func main() {
    nsqAddr := "127.0.0.1:8081"
    conf :=nsq.NewConfig()
    p ,err := nsq.NewProducer(nsqAddr,conf)
    if err != nil {
        fmt.Println(err)
        return
    }
    for  {
        message := "message :"+ time.Now().Format("2006-01-02 15:04:05")
        fmt.Println(message)
        // 發(fā)送消息
        p.Publish("topic-demo1",[]byte(message))
        time.Sleep(time.Second)
    }

}

編寫一個消息消費者

nsq_single_consumer.go

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
)

type NewHandler struct{}

func (m *NewHandler) HandleMessage(msg *nsq.Message) (err error) {
    addr := msg.NSQDAddress
    message := string(msg.Body)
    fmt.Println(addr, message)
    return
}
func MyConsumers(topic, channel, addr string) {
    conf := nsq.NewConfig()
    new_consumer, err := nsq.NewConsumer(topic, channel, conf)
    if err != nil {

    }
    // 接收消息
    new_handler := &NewHandler{}
    new_consumer.AddHandler(new_handler)
    err = new_consumer.ConnectToNSQD(addr)
    if err != nil {

    }
}
func main() {
    addr := "127.0.0.1:8081"
    go MyConsumers("topic-demo1", "channel-aa", addr)
    // 模擬多個從多個channel去消息
    go MyConsumers("topic-demo1", "channel-bb", addr)
    select {}
}

6.1.2 通過nsqadmin查看

啟動nsqadmin

nsqadmin 的web界面默認監(jiān)聽了 4171端口

$ ./nsqadmin --nsqd-http-address="127.0.0.1:8080"
[nsqadmin] 2019/11/10 16:06:15.842033 INFO: nsqadmin v1.2.0 (built w/go1.12.9)
[nsqadmin] 2019/11/10 16:06:15.858026 INFO: HTTP: listening on [::]:4171

我們在地址欄中輸如

http://127.0.0.1:4171/

就能看看管理界面

nsqadmin
nsqadmin
6.1.3 NSQ的單點結(jié)構(gòu)
nsq.png

6.3 NSQ集群

6.3.1 啟動NSQ各組件

構(gòu)建一個NSQ的基礎(chǔ)拓撲結(jié)構(gòu)

我們可以簡單的說nsqlookupd 是用來管理nsqd實例節(jié)點的

第一步
啟動nsqlookupd

啟動的nsqlookupd 采用了默認配置 通過參數(shù) --help 查看配置項

$ ./nsqlookupd
[nsqlookupd] 2019/11/10 16:40:55.968588 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
[nsqlookupd] 2019/11/10 16:40:55.983580 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2019/11/10 16:40:55.984579 INFO: TCP: listening on [::]:4160

第二步

添加nsqd 實例

與前面的啟動不同,需要帶上參數(shù) -lookupd-tcp-address

添加第一個實例

./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081" -lookupd-tcp-address="127.0.0.1:4160"

添加第二個實例

 ./nsqd -http-address="0.0.0.0:8090" -tcp-address="0.0.0.0:8091" -lookupd-tcp-address="127.0.0.1:4160"

第三步

啟動nsqadmin

與前面的也不同了需要帶上參數(shù) -lookupd-http-address

$ ./nsqadmin -lookupd-http-address="127.0.0.1:4161"

在瀏覽器中訪問nsqadmin

nsqadmin-nodes
nsqadmin-lookup
6.3.2 NSQ的拓撲結(jié)構(gòu)
nsqlookupd
  1. 在集群模式中,消息生產(chǎn)方發(fā)送消息給任意一個nsqd 實例都不影響
  2. 消息的消費者需要通過nsqlookupd 查詢nsqd的地址后才能獲取消息
  3. 增加nsqd 節(jié)點完全不影響其他的節(jié)點
6.3.3 Golang使用NSQ代碼示例

消息生產(chǎn)者

nsq_cluster_product.go

package main

import (
    "bufio"
    "fmt"
    "github.com/nsqio/go-nsq"
    "log"
    "os"
    "strings"
)

var pro *nsq.Producer

func NewPro(addr string) (err error) {
    conf := nsq.NewConfig()
    pro, err = nsq.NewProducer(addr, conf)
    if err != nil {
        log.Println(err)
        return err
    }
    return nil
}
func main() {
    nsqAddr := "127.0.0.1:8091"
    err := NewPro(nsqAddr)
    if err != nil {
        fmt.Println(err)
        return
    }else{
        fmt.Println("connect 127.0.0.1:8091 success")
    }
    // 讀取標(biāo)準(zhǔn)輸入
    reader := bufio.NewReader(os.Stdin)
    for {
        // 讀取所有內(nèi)容直到遇見回車(\n)
        data, err := reader.ReadString('\n')
        if err != nil {
            fmt.Println("read data from stdin is field : ", err)
            continue
        }
        // 當(dāng)輸入q的時候退出
        data = strings.TrimSpace(data)
        if strings.ToUpper(data) == "Q" {
            break
        }
        err = pro.Publish("topic-demo1", []byte(data))
        if err != nil {
            fmt.Println("nsq publish is field ", err)
            continue
        }
    }
    fmt.Println("exit !")
}

消息消費者

nsq_cluster_consumer.go

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
)

type Handler struct{}

func (m *Handler) HandleMessage(msg *nsq.Message) (err error) {
    addr := msg.NSQDAddress
    message := string(msg.Body)
    fmt.Println(addr, message)
    return
}
func NewConsumers(t string, c string, addr string) error {
    conf := nsq.NewConfig()
    nc, err := nsq.NewConsumer(t, c, conf)
    if err != nil {
        fmt.Println("create consumer failed err ", err)
        return err
    }
    consumer := &Handler{}
    nc.AddHandler(consumer)
    // 連接nsqlookupd
    if err:= nc.ConnectToNSQLookupd(addr);err!=nil{
        fmt.Println("connect nsqlookupd failed ", err)
        return err
    }
    return nil
}
func main() {
    // 這是nsqlookupd的地址
    addr := "127.0.0.1:4161"
    err := NewConsumers("topic-demo1", "channel-aa", addr)
    if err != nil {
        fmt.Println("new nsq consumer failed", err)
        return
    }
    select {}
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市雅潭,隨后出現(xiàn)的幾起案子揭厚,更是在濱河造成了極大的恐慌,老刑警劉巖扶供,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件筛圆,死亡現(xiàn)場離奇詭異,居然都是意外死亡椿浓,警方通過查閱死者的電腦和手機太援,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來扳碍,“玉大人提岔,你說我怎么就攤上這事∷癯ǎ” “怎么了碱蒙?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我赛惩,道長哀墓,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任喷兼,我火速辦了婚禮篮绰,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘季惯。我一直安慰自己吠各,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布勉抓。 她就那樣靜靜地躺著走孽,像睡著了一般。 火紅的嫁衣襯著肌膚如雪琳状。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天盒齿,我揣著相機與錄音念逞,去河邊找鬼。 笑死边翁,一個胖子當(dāng)著我的面吹牛翎承,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播符匾,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼叨咖,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了啊胶?” 一聲冷哼從身側(cè)響起甸各,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎焰坪,沒想到半個月后趣倾,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡某饰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年儒恋,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片黔漂。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡诫尽,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出炬守,到底是詐尸還是另有隱情牧嫉,我是刑警寧澤,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布劳较,位于F島的核電站驹止,受9級特大地震影響浩聋,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜臊恋,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一衣洁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧抖仅,春花似錦坊夫、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至放吩,卻和暖如春智听,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背渡紫。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工到推, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人惕澎。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓莉测,卻偏偏與公主長得像,于是被迫代替她去往敵國和親唧喉。 傳聞我的和親對象是個殘疾皇子捣卤,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容

  • 1. 介紹 最近在研究一些消息中間件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等八孝。NSQ是一個基...
    aoho閱讀 8,943評論 1 16
  • 1.安裝 根據(jù)官方安裝指引頁面下載最新穩(wěn)定版的二進制包https://nsq.io/deployment/inst...
    渺小Y閱讀 7,028評論 1 4
  • 前言 好久不見董朝。 從這篇文章開始,我將帶大家走進消息中間件的世界干跛。 消息中間件本質(zhì)上就是一種很簡單的數(shù)據(jù)結(jié)構(gòu)——隊...
    柳樹之閱讀 3,505評論 3 23
  • 上一篇: Go消息中間件Nsq系列(四)------apps/nsq_to_file源碼閱讀 1. Topic/C...
    Yangwenliu閱讀 803評論 0 3
  • 在城市車水馬龍 人潮擁擠中遇見你 我腦子迷糊得像鄉(xiāng)野的泥潭 你跟所有人都是不一樣的 或許用一些詞語才能形容你 王子...
    江小昨閱讀 289評論 1 4