使用場(chǎng)景
我這兒用一個(gè)實(shí)例來說明nsq的使用卷谈,假設(shè)我有一個(gè)apiServer的程序,它負(fù)責(zé)面向用戶霞篡。有一個(gè)dataServer的程序世蔗,負(fù)責(zé)處理數(shù)據(jù)。apiServer這個(gè)進(jìn)程對(duì)用戶的請(qǐng)求進(jìn)行簡(jiǎn)單的處理朗兵,復(fù)雜的資源處理則去請(qǐng)求dataSever污淋,讓dataServer來處理。
dataServer部署在多臺(tái)主機(jī)余掖,apiServer需要隨機(jī)的向一個(gè)dataServer發(fā)送請(qǐng)求寸爆,來實(shí)現(xiàn)負(fù)載均衡。所有我們需要在apiServer的配置文件中寫入所有dataServer的服務(wù)器地址和端口盐欺。
如果這個(gè)時(shí)候有一個(gè)dataServer掛掉了怎么辦呢而昨?apiServer還是會(huì)隨機(jī)的請(qǐng)求到它,這就會(huì)造成用戶請(qǐng)求失敗找田。
所以我們就需要用到nsq消息隊(duì)列了,而不是將dataServer的所有地址寫到配置中着憨。每個(gè)dataServer進(jìn)程都需要定時(shí)的向apiServer發(fā)送消息,告訴自身的地址漆改,apiServer將地址進(jìn)行篩選,然后就可以用這些地址來請(qǐng)求dataServer了愉棱。
程序演示:
dataServer(生產(chǎn)者):
func HeartBeat(){
var(
producer *nsq.Producer
err error
ticker *time.Ticker
)
ticker = time.NewTicker(time.Duration(conf.HEART_BEAT_TIME)*time.Second) //創(chuàng)建一個(gè)定時(shí)器顺少,這里的時(shí)間我都寫到配置文件里了梅猿,然后用conf包拿出來袱蚓。我這里是設(shè)置的5秒
for {
select {
case <-ticker.C:
//創(chuàng)建一個(gè)生產(chǎn)者入蛆,這里的RanDomGetServer()是自定義的一個(gè)工具枫甲,用來隨機(jī)獲取一個(gè)nsqd地址
if producer, err = nsq.NewProducer(utils.RadomGetNsqd(conf.NSQ_TCP_ADDRS), nsq.NewConfig()); err != nil {
panic(err)
}
//pulish()接的第一個(gè)參數(shù)是topic,這個(gè)也是自己定義(值為data_server_addr),第二個(gè)參數(shù)是要發(fā)送的消息,這里是本機(jī)服務(wù)器地址
err = producer.Publish(conf.DATA_SERVER_TOPIC, []byte(conf.SERVER_ADDR))
if err != nil {
panic(err)
}
}
}
}
utils下的RanDomGetNsq工具:
//從nsqd集群地址中隨機(jī)獲取一個(gè)nsqd地址用于publish,nsqd集群地址是事先寫在配置文件中的
func RadomGetNsqd([]string)string{
rand.Seed(time.Now().UnixNano())
return conf.NSQ_TCP_ADDRS[rand.Intn(len(conf.NSQ_TCP_ADDRS))]
}
main函數(shù):
var confPath string
func init(){
flag.StringVar(&confPath,"c","","config file")
runtime.GOMAXPROCS(4)
}
func main(){
flag.Parse()
conf.Init(confPath) //加載配置
gin.SetMode(gin.DebugMode)
r := gin.New()
router.Load(r) //加載路由和中間件
server := http.Server{
Addr:conf.SERVER_ADDR,
ReadTimeout:time.Duration(conf.SERVER_READ_TIME_OUT)*time.Second,
WriteTimeout:time.Duration(conf.SERVER_READ_TIME_OUT)*time.Second,
Handler:r,
}
go middleWare.HeartBeat() //這里開goroutine來發(fā)送心跳包
if err := server.ListenAndServe();err != nil{
panic(err)
}
}
這樣我們就實(shí)現(xiàn)了每五秒發(fā)送一次本機(jī)服務(wù)器地址給apiServer進(jìn)程,apiServer創(chuàng)建生產(chǎn)者就能接受到消息了忙迁。
apiServer(消費(fèi)者):
消費(fèi)者的創(chuàng)建比生產(chǎn)者的創(chuàng)建要麻煩些。
首先是自定義的工具utils.NewConsumer來創(chuàng)建一個(gè)消費(fèi)者:
//這里第一個(gè)參數(shù)是需要綁定topic(data_server_addr),
//第二個(gè)參數(shù)傳入一個(gè)string晨川,這就是創(chuàng)建的ch呀页,topic消息隊(duì)列中的消息會(huì)分發(fā)到每個(gè)ch中.
//每個(gè)消費(fèi)者可以創(chuàng)建不通的ch丸氛,也可以共用一個(gè)ch定续,共用一個(gè)ch, ch的消息會(huì)隨機(jī)發(fā)送給其中一個(gè)消費(fèi)者
//第三個(gè)參數(shù)是處理message的nsq.handler接口,需要實(shí)現(xiàn)一個(gè)HanddleMessage(*nsq.Message)error()方法禾锤。
func NewConsumer(topic string,chanName string,h nsq.Handler)(consumer *nsq.Consumer,err error){
if consumer,err = nsq.NewConsumer(topic,chanName,nsq.NewConfig());err != nil{
return nil,err
}
consumer.ChangeMaxInFlight(3) //可以根據(jù)nsqds數(shù)量來配置
consumer.AddHandler(h)
err = consumer.ConnectToNSQLookupd(conf.NSQ_LOOKUPD_ADDR) //todo:讀取配置地址
if err != nil{
return nil,err
}
return consumer,nil
}
接下來正式處理消息了:
package consumers
import("......")
//創(chuàng)建一個(gè)nsq.handler接口實(shí)例
var ServerConsumerHandler = &ServerConsumer{DataServerAddrs:make(map[string]time.Time)}
//ServerConsumer實(shí)現(xiàn)了nsq.handler接口
type ServerConsumer struct {
DataServerAddrs map[string]time.Time //保存dataServer進(jìn)程發(fā)過來的服務(wù)器地址和接收時(shí)間
rwLocker sync.RWMutex //防止多線程同時(shí)讀寫
}
//HandleMessage是nsq,handler接口定義方法私股,必須實(shí)現(xiàn),主要是用來處理傳遞過來的消息
func(h *ServerConsumer)HandleMessage(message *nsq.Message)error{
if dataServer := string(message.Body);dataServer != ""{
h.rwLocker.Lock()
h.DataServerAddrs[dataServer] = time.Now()
h.rwLocker.Unlock()
}
return nil
}
//監(jiān)聽服務(wù)data_server_addr這個(gè)消息隊(duì)列
func MonitorDataServerAddrs(){
consumer ,err:= utils.NewConsumer("data_server_addr","ch1",ServerConsumerHandler)
if err != nil{
panic(err)
}
//連接到NSQLookupd恩掷,它底層會(huì)創(chuàng)建連接到每個(gè)nsqd.這樣就可以監(jiān)聽每個(gè)nsqd中的消息
err = consumer.ConnectToNSQLookupd(conf.NSQ_LOOKUPD_ADDR) //方法本身會(huì)開一個(gè)goroutine來檢查消息隊(duì)列
if err != nil{
panic(err)
}
}
//刪除過期的服務(wù)器地址
func(h *ServerConsumer)removeExpireDatasServers(){
for {
h.rwLocker.Lock()
for dataServer,t := range h.DataServerAddrs{
//只保存10秒之內(nèi)發(fā)送過來的服務(wù)器地址
if t.Add(10*time.Second).Before(time.Now()){
delete(h.DataServerAddrs,dataServer)
}
}
h.rwLocker.Unlock()
time.Sleep(2*time.Second)
}
}
main函數(shù):
flag.Parse()
conf.Init(confPath) //加載配置
gin.SetMode(gin.DebugMode)
r := gin.New()
router.Load(r)
server := http.Server{
Addr:"192.168.50.250:8080",
ReadTimeout:time.Duration(conf.SERVER_READ_TIME_OUT)*time.Second,
WriteTimeout:time.Duration(conf.SERVER_READ_TIME_OUT)*time.Second,
Handler:r,
}
consumers.MonitorDataServerAddrs() //不用單獨(dú)開goroutine
if err := server.ListenAndServe();err != nil{
panic(err)
}
這樣我們就實(shí)現(xiàn)了接收服務(wù)器地址倡鲸,并且將過期的地址刪掉,更大程度的保證保存的dataServer服務(wù)器地址是可用的黄娘。