-
Nsq簡介
-
1.1. 是一個基于Go語言的分布式實時消息平臺蝗蛙,它基于MIT開源協(xié)議發(fā)布,由bitly公司開源出來的一款簡單易用的消息中間件记餐。
1.2. 去中心化,分布式易部署,易水平擴展,高可用,消除單點故障,內(nèi)置服務發(fā)現(xiàn),并提供相對友好的Web管理UI
1.3. 官方提供詳細架構(gòu)設(shè)計說明, 使用文檔.性能測試等
1.4. 多語言支持(Go,Python,Java等)
-
Nsq 三大組件
-
2.1 nsqd:
負責接收消息,存儲消息,分發(fā)消息給客戶端,nsqd可以單獨部署,也可以多節(jié)點部署,主要監(jiān)聽了兩個端口,一個用來服務客戶端(4150),一個用來提供api服務(4151).當然也可以配置監(jiān)聽https(4152)端口
2.2 nsqlookupd:
主要負責服務發(fā)現(xiàn),nsqd的心跳、狀態(tài)監(jiān)測洲守,給客戶端疑务、nsqadmin提供nsqd地址與狀態(tài),主要監(jiān)聽端口(4160)服務客戶端,端口(4161)提供api服務.
2.3 nsqadmin:
是nsq的web后臺管理,比如節(jié)點管理,topic管理,實時消息狀態(tài)等,主要監(jiān)聽端口(4171)提供web服務
-
一些注意的問題
-
3.1 Producer與nsqd是1:1關(guān)系,nsqd與topic是1:N關(guān)系,nsqlookupd與nqsd關(guān)系可以是M:N
3.2 Consumer可以指定單個nsqd,或者多個nsqd地址,也可以通過單個或者多個nsqlookupd去匹配對應的topic+channel的nsqd去消費消息
3.3 Consumer第一次通過nsqlookupd,會重試三次, 然后就是根據(jù)LookupdPollInterval(60s)時間輪詢或者防抖動算法LookupdPollJitter(0.3默認)去進行重試查詢
3.4 Producer發(fā)布消息沒有topic會新建,如果先Consumer使用nsqlookupd去尋找消費,也就是沒有創(chuàng)建topic之前會無法連接,重試3.3步驟
3.5 Consumer退出,channel不會自動刪除, 多個nsqd服務都有相同的topic的時候,需要修改默認的config.MaxInflight才能連接,代表一次性可以接受多少條消息.
3.6 channel和topic的命名都有限制,正則匹配如下^[\.a-zA-Z0-9_-]+(#ephemeral)?$
3.7 多個Consumer消費channel數(shù)據(jù)是隨機的,無序. 消息至少投遞一次,可能會重復投遞
3.8 nsqd之間不會擴散消息. 但是topic的消息會分發(fā)給下面所有channel,但一個channel如果有多個消費者,消息會隨機發(fā)送給其中一個消費者
3.9 nsq延時消息最長是一小時(60min)
-
有贊自研版Nsq
-
4.1 有贊的自研版 NSQ 在高可用性以及負載均衡方面進行了改造梗醇,自研版的 nsqd 中引入了數(shù)據(jù)分區(qū)以及副本知允,副本保存在不同的 nsqd 上,達到容災目的叙谨。此外温鸽,自研版 NSQ 在原有 Protocol Spec 基礎(chǔ)上進行了拓展,支持基于分區(qū)的消息生產(chǎn)手负、消費涤垫,以及基于消息分區(qū)的有序消費,以及消息追蹤功能竟终。
-
Nsq 在Go簡單使用
-
5.1 docker安裝nsq,具體使用以下docker-compose.yml,然后再
docker-compose up -d
啟動,不需要的話就是用docker-compose down
停止并刪除容器
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "4160:4160"
- "4161:4161"
nsqd:
image: nsqio/nsq
# 廣播地址不填的話默認就是oshostname, 那樣子在程序lookupd 連接不上
command: /nsqd --broadcast-address=192.168.1.103 --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
ports:
- "4150:4150"
- "4151:4151"
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "4171:4171"
5.2 Go的測試代碼如下
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"math"
"time"
)
// nsq demo
var topicName = "nsq_test"
func main() {
host := "localhost:4150"
discoverAddr := "localhost:4161"
go producer(host)
go consumer_nsqd(discoverAddr)
go consumer_nsqd2(host)
for{}
}
func producer(addr string){
// 1:1 nsqd:producer 一比一的關(guān)系
producer,err := nsq.NewProducer(addr,nsq.NewConfig())
if err != nil {
log.Fatal(err)
}
for i := 1; i< math.MaxInt64 ; i++ {
// 直接發(fā)布消息
_ = producer.Publish(topicName,[]byte(fmt.Sprintf("%s%d",topicName,i+1)))
// 發(fā)布延時消息
//_ = producer.DeferredPublish()
// 發(fā)送消息數(shù)組
//producer.MultiPublish()
// 還有異步發(fā)送, doneChan通知
//producer.DeferredPublishAsync()
time.Sleep(time.Second*10)
}
}
// 通過服務發(fā)現(xiàn), 根據(jù)topicName,Channel作為key去尋找對應的nsqd去連接
func consumer_nsqd(addr string){
consumer,err := nsq.NewConsumer(topicName,"default",nsq.NewConfig())
if err != nil {
log.Println(err)
}
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error{
log.Println("nsqd1",message.Timestamp,message.NSQDAddress,string(message.Body))
return nil
}))
err = consumer.ConnectToNSQLookupd(addr)
if err != nil {
log.Println(err)
}
}
// 直連方式
func consumer_nsqd2(addr string){
consumer,err := nsq.NewConsumer(topicName,"spec",nsq.NewConfig())
if err != nil {
log.Fatal(err)
}
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error{
log.Println("nsqd2",message.Timestamp,message.NSQDAddress,string(message.Body))
return nil
}))
err = consumer.ConnectToNSQD(addr)
if err != nil {
log.Fatal(err)
}
}
輸出結(jié)果:
2019/06/26 23:16:40 nsqd2 1561562080041909815 localhost:4150 nsq_test481
2019/06/26 23:16:40 nsqd2 1561562090046390841 localhost:4150 nsq_test482
2019/06/26 23:16:40 nsqd2 1561562100049180666 localhost:4150 nsq_test483
2019/06/26 23:16:40 nsqd2 1561562110055842132 localhost:4150 nsq_test484
2019/06/26 23:16:40 nsqd2 1561562120060210535 localhost:4150 nsq_test485
2019/06/26 23:16:40 nsqd2 1561562130066839086 localhost:4150 nsq_test486
2019/06/26 23:16:40 nsqd2 1561562140073121280 localhost:4150 nsq_test487
2019/06/26 23:16:40 nsqd2 1561562150076661111 localhost:4150 nsq_test488
2019/06/26 23:16:40 nsqd2 1561562160078242317 localhost:4150 nsq_test489
2019/06/26 23:16:40 nsqd2 1561562170081163554 localhost:4150 nsq_test490
2019/06/26 23:16:40 nsqd2 1561562180083818010 localhost:4150 nsq_test491
2019/06/26 23:16:40 nsqd2 1561562190088318866 localhost:4150 nsq_test492
2019/06/26 23:16:40 nsqd2 1561562200159782512 localhost:4150 nsq_test2