在web應(yīng)用業(yè)務(wù)中,經(jīng)常會(huì)遇到類(lèi)似異步處理芭挽,秒殺辜羊,排隊(duì)等邏輯,這時(shí)利用消息隊(duì)列來(lái)完成這樣的功能是一個(gè)明智的選擇斜友;
在業(yè)務(wù)規(guī)模較小的應(yīng)用中我們可以使用redis中的list數(shù)據(jù)類(lèi)型炸裆,在大規(guī)模業(yè)務(wù)中我們可以引入rocketmq等,尤其在業(yè)務(wù)重構(gòu)時(shí)需要將原有的redis消息隊(duì)列實(shí)現(xiàn)改成rocketmq鲜屏,為了保證既有業(yè)務(wù)不受影響又不引入新的BUG烹看,是一件非常苦惱的事情洛史;
今天就介紹一個(gè)包惯殊,可以方便的解決上述問(wèn)題,他是 Orange框架 中的一個(gè)子包也殖,對(duì)常用的mq操作進(jìn)行了封裝土思,即使沒(méi)有通過(guò)該框架開(kāi)發(fā)也能直接使用該子包;通過(guò) go moduls 特性進(jìn)行按需加載忆嗜;
讓我們更新關(guān)注業(yè)務(wù)本身浪漠,各種客戶端/sdk接入交給既有封裝即可;
準(zhǔn)備工作:
- 搭建好rocketmq服務(wù)霎褐,啟動(dòng) mqnamesrv和至少1個(gè)mqbroker節(jié)點(diǎn)址愿;
- 在 GOPATH 目錄下創(chuàng)建一個(gè)demo目錄并在目錄中創(chuàng)建一個(gè)main.go,寫(xiě)入如下demo代碼:
package main
import (
"gitee.com/zhucheer/orange/queue"
"time"
"fmt"
)
func main(){
// 注冊(cè)生產(chǎn)者 填入broker節(jié)點(diǎn),group名稱(chēng),重試次數(shù)信息
mqProducerClient := queue.RegisterRocketProducerMust([]string{"192.168.137.100:9876"}, "test", 1)
// 注冊(cè)消費(fèi)者 填入broker節(jié)點(diǎn),group名稱(chēng)信息
mqConsumerClient:= queue.RegisterRocketConsumerMust([]string{"192.168.137.100:9876"}, "test")
go func() {
for i:=0;i<10;i++{
// 向隊(duì)列發(fā)送一條消息 填入消息隊(duì)列topic和消息體信息
ret,_:=mqProducerClient.SendMsg("topicTest", "Hello mq~~")
fmt.Println("========producer push one message====", ret.MsgId)
time.Sleep(time.Second)
}
}()
// 執(zhí)行消費(fèi)者監(jiān)聽(tīng) 填入消息隊(duì)列topic
mqConsumerClient.ListenReceiveMsgDo("topicTest", func(mqMsg queue.MqMsg) {
// 收到一條消息
fmt.Println("receive====>",mqMsg.MsgId, mqMsg.BodyString())
})
time.Sleep(20*time.Second)
}
- 利用 go moduls 加載依賴并運(yùn)行(需要開(kāi)啟go moduls 或go1.13版本以上)
go mod init
go mod tidy
go run main.go
這樣我們就能看到該示例冻璃,每隔1s會(huì)生產(chǎn)一條消息并立刻消費(fèi)了該消息框架介紹??????
如果我們直接使用了 Orange框架 來(lái)開(kāi)發(fā)我們的web業(yè)務(wù)响谓,那么體驗(yàn)將會(huì)更加優(yōu)雅;
因該包使用了面向接口的模式省艳,我們能輕易的通過(guò)配置來(lái)對(duì)消息隊(duì)列驅(qū)動(dòng)進(jìn)行更改娘纷,能輕松的在redis和rocketmq直接進(jìn)行切換。
如果有興趣可以查看詳細(xì)文檔:Orange框架#消息隊(duì)列文檔