NSQ是一個(gè)基于Go語言的分布式實(shí)時(shí)消息平臺(tái),它基于MIT開源協(xié)議發(fā)布畜吊,代碼托管在GitHub步清。NSQ可用于大規(guī)模系統(tǒng)中的實(shí)時(shí)消息服務(wù),并且每天能夠處理數(shù)億級(jí)別的消息榜晦,其設(shè)計(jì)目標(biāo)是為在分布式環(huán)境下運(yùn)行的去中心化服務(wù)提供一個(gè)強(qiáng)大的基礎(chǔ)架構(gòu)冠蒋。NSQ具有分布式、去中心化的拓?fù)浣Y(jié)構(gòu)乾胶,該結(jié)構(gòu)具有無單點(diǎn)故障抖剿、故障容錯(cuò)灰追、高可用性以及能夠保證消息的可靠傳遞的特征棘捣。NSQ非常容易配置和部署介陶,且具有最大的靈活性看幼,支持眾多消息協(xié)議贸人。另外艘包,官方還提供了拆箱即用Go和Python庫褒颈。
部署
安裝步驟
# 下載
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
# 解壓
tar -zxvf nsq-1.1.0.linux-amd64.go1.10.3.tar.gz
# 啟動(dòng)服務(wù)
cd nsq-1.1.0.linux-amd64.go1.10.3/bin/
nohup ./nsqlookupd > /dev/null 2>&1 &
nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &
使用
1廓推、創(chuàng)建一個(gè)test主題甥温,并發(fā)送一個(gè)hello world消息
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
2锻煌、瀏覽器訪問NSQ的管理界面: http://127.0.0.1:4171/
image.png
3 消費(fèi)test主題的消息
$ ./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
2019/03/13 11:09:49 INF 1 [test/nsq_to_file] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2019/03/13 11:09:49 INF 1 [test/nsq_to_file] (jinchunguang-TM1701:4150) connecting to nsqd
2019/03/13 11:09:49 INFO: opening /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
2019/03/13 11:09:49 syncing 1 records to disk
$ cat /tmp/test.jinchunguang-TM1701.2019-03-13_11.log
hello world
客戶端
生產(chǎn)者可使用PHP curl 直接處理,github有許多現(xiàn)成的客戶端可以使用
<?php
$msg="最簡單的發(fā)送消息方式!";
$url= "http://127.0.0.1:4151/pub?topic=test";
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $url);
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, "POST");
curl_setopt($ch, CURLOPT_POSTFIELDS, $msg);
curl_setopt($ch, CURLOPT_HTTPHEADER, array(
'Content-Type: text/html; charset=utf-8',
'Content-Length: ' . strlen($msg))
);
$output = curl_exec($ch);
if($output === FALSE ){
echo "CURL Error:".curl_error($ch);
}
使用go來處理
代碼目錄結(jié)構(gòu)如下(示例項(xiàng)目,通過gin封裝的姻蚓,學(xué)習(xí)使用)
image.png
nsq.go 簡單封裝nsq操作
package servers
import (
"fmt"
"github.com/nsqio/go-nsq"
)
// 默認(rèn)配置
const HOST = "127.0.0.1:4150"
const TOPIC_NAME = "test"
const CHANNEL_NAME = "test-channel"
// 啟動(dòng)Nsq
func NsqRun() {
Consumer()
}
// nsq發(fā)布消息
func Producer(msgBody string) {
// 新建生產(chǎn)者
p, err := nsq.NewProducer(HOST, nsq.NewConfig())
if err != nil {
panic(err)
}
// 發(fā)布消息
if err := p.Publish(TOPIC_NAME, []byte(msgBody)); err != nil {
panic(err)
}
}
// nsq訂閱消息
type ConsumerT struct{}
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println(string(msg.Body))
return nil
}
func Consumer() {
c, err := nsq.NewConsumer(TOPIC_NAME, CHANNEL_NAME, nsq.NewConfig()) // 新建一個(gè)消費(fèi)者
if err != nil {
panic(err)
}
c.AddHandler(&ConsumerT{}) // 添加消息處理
if err := c.ConnectToNSQD(HOST); err != nil { // 建立連接
panic(err)
}
}
main.go 項(xiàng)目入口文件
package main
import (
"github.com/gin-gonic/gin"
"wages_service/servers"
"wages_service/tasks"
)
var GinEngine *gin.Engine
func main() {
// 運(yùn)行 task
tasks.SyncDataRun()
// 運(yùn)行 nsq
servers.NsqRun()
// 運(yùn)行server
servers.HttpRun(GinEngine)
}
nsq_producer.go 我們測試用來發(fā)送消息的
package main
import "wages_service/servers"
func main() {
// 發(fā)送消息到nsq
servers.Producer("hello world!!!")
}
運(yùn)行測試
- 啟動(dòng)項(xiàng)目
image.png
-
推送消息
image.png 查看結(jié)果
image.png