0育苟、前面的話
為什么需要消息隊列抖僵?下面通過幾個場景來簡單解釋,具體可以參看其他資料此處不再展開镰绎。
- 應(yīng)用解耦
- 流量削峰
- 異步處理
1脓斩、NSQ介紹
1.1 NSQ是什么?
一句話NSQ是一種消息隊列畴栖,和常見的消息隊列如Kafka随静,RocketMQ所扮演的角色一致,NSQ是golang生態(tài)中經(jīng)常使用的消息隊列組件吗讶。常被用在應(yīng)用解耦燎猛,流量削峰等場景中。
1.2 名詞解釋:
Topic主題: 同一主題的消息歸為一類照皆,功能類似于大部分消息隊列中的含義重绷。
Channel: 同一主題下可以再細分為不同的channel通道,每個channel通道中存放的消息都是相同的相互獨立膜毁。
Node節(jié)點: 一個nsqd啟動實例為一個node節(jié)點昭卓。
Message: 具體業(yè)務(wù)數(shù)據(jù)
2、NSQ消息隊列優(yōu)缺點
2.1 優(yōu)點:
NSQ相比起其他消息隊列組件部署非常簡單瘟滨,極簡的配置候醒。
NSQ支持橫向擴展,可以動態(tài)增加node節(jié)點杂瘸。
官方給的結(jié)論-高性能火焰。
消息最少被消費一次不丟失。
提供內(nèi)存和磁盤的組合消息隊列 --mem-queue-size 來設(shè)置消息隊列存儲在內(nèi)存胧沫,磁盤或者兩者昌简。
2.2 缺點:
NSQ不保證消息的有序性,這點和Kafka類的有序消息隊列不同绒怨。
Node節(jié)點之間相互獨立纯赎,沒有消息的復(fù)制。這點既是優(yōu)點也是缺點吧南蹂。
3犬金、NSQ組件構(gòu)成
3.1 nsqlookupd:
主要負責(zé)管理整體拓撲信息。
客戶端可以通過向nsqlookupd守護進程請求查詢信息,來獲取topic晚顷,channel所在的nsqd服務(wù)節(jié)點地址峰伙。
nsqd服務(wù)節(jié)點向nsqlookupd服務(wù)廣播節(jié)點本身所包含的地址topic,channel信息该默。
nsqadmin界面服務(wù)向nsqlookupd服務(wù)請求展示所需的信息瞳氓。
特點:
唯一性:在一個NSQ消息隊列組件中可以只有一個nsqlookupd服務(wù),也可以有多個栓袖,相互之間獨立匣摘。
去中心化:nsqlookupd服務(wù)異常時,不影響真正的消息服務(wù)(nsqd)裹刮。
3.2 nsqd:
真正的負責(zé)接收消息音榜,入隊列,投遞消息的服務(wù)捧弃。
特點:
訂閱同一個topic赠叼,同一個channel的消費者使用負載均衡策略,見下圖违霞。
確保消息隊列中的消息至少被消費一次嘴办。
內(nèi)存+磁盤的組合使用,nsqd可以通過--mem-queue-size來限定消息隊列在內(nèi)存中的大小葛家,一旦消息隊列中的message數(shù)量超出限定大小户辞,則會序列化到磁盤中泌类。
topic這channel無需事先創(chuàng)建癞谒,在客戶端中的生產(chǎn)者和消費者首次使用時創(chuàng)建。
3.3 nsqadmin:
整體webUI界面刃榨,可以通過該界面來管理NSQ集群弹砚,和顯示NSQ組件內(nèi)部topic,channel枢希,node桌吃,messege的數(shù)量狀態(tài)等信息。
3.4 其他工具:
nsq_to_file苞轿、nsq_tail茅诱、nsq_to_http、nsq_to_nsq等搬卒,具體工具使用功能可以自行百度瑟俭。
4、NSQ部署
兩種部署方式:一種是直連模式契邀,客戶端直接連接nsqd服務(wù)進行message的消費摆寄。另外一種是nsqlookupd模式,客戶端通過連接nsqlookupd獲取topic所在nsqd服務(wù)的地址進行message的消費。
4.1 直連模式:
消費者和生產(chǎn)者直接連接nsqd服務(wù)進行生產(chǎn)消費微饥,nsqd服務(wù)之間相互獨立逗扒。
4.2 nsqlookupd模式:
生產(chǎn)則直連nsqd進行生產(chǎn),消費者通過連接nsqlookupd服務(wù)查詢nsqd服務(wù)地址之后連接nsqd服務(wù)進行消費欠橘。
4.3 案例
這里選用nsqlookupd模式來實際部署案例
4.3.1 下載部署包
這里為了演示方便矩肩,直接下載的Windows平臺的二進制包,開箱即用简软。
4.3.2 nsqlookupd
部署nsqlookupd 直接運行nsqlookupd即可蛮拔,默認監(jiān)聽tcp端口4160,http端口4161
4.3.3 nsqd
部署nsqd 參數(shù)中傳入nsqlookupd的tcp監(jiān)聽IP地址和端口痹升,服務(wù)默認監(jiān)聽tcp端口4150建炫,http端口4151
4.3.4 nsqadmin
部署nsqadmin webUI界面,服務(wù)默認監(jiān)聽http端口4171
5疼蛾、NSQ生產(chǎn)者消費者使用
5.1 現(xiàn)有工具
生產(chǎn)者可以直接發(fā)送http請求到nsqd服務(wù)來生產(chǎn)消息
curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
消費者請求nsqlookupd查找topic所在的nsqd節(jié)點肛跌,并進行消費
nsq_tail --topic=test --lookupd-http-address=127.0.0.1:4161
5.2 go-nsq
golang官方提供了go-nsq包進行客戶端生產(chǎn)消費,這里簡單實用
生產(chǎn)者代碼:
var (
tcpnsqaddr = "127.0.0.1:4150"
)
func main() {
config := nsq.NewConfig()
producer,err := nsq.NewProducer(tcpnsqaddr,config)
if err != nil{
fmt.Println("nsq.NewProducer")
return
}
for i:= 0;i<10;i++{
producer.Publish("kkk",[]byte("this is first " + strconv.Itoa(i)))
time.Sleep(time.Second)
}
producer.Ping()
producer.Stop()
}
消費者代碼:
type MyHandle int
func (c MyHandle)HandleMessage(message *nsq.Message) error{
fmt.Println(string(message.Body))
return nil
}
func main() {
config := nsq.NewConfig()
com,_ := nsq.NewConsumer("kkk","ccc",config)
var tmp MyHandle
com.AddHandler(tmp)
com.ConnectToNSQLookupd("127.0.0.1:4161")
time.Sleep(time.Second * 100)
}