請(qǐng)求響應(yīng)模式
無(wú)論是發(fā)布訂閱模式還是queue模式,nats都不能保證消息一定發(fā)送到訂閱方玫膀,除非訂閱者發(fā)送一個(gè)響應(yīng)給發(fā)布者帖旨。
所以訂閱者發(fā)送一個(gè)回執(zhí)給發(fā)布者誉简,就是請(qǐng)求響應(yīng)模式闷串。
這種模式有什么用?
nats要求訂閱者一定要先完成訂閱,發(fā)布消息后碉熄,訂閱者才能收到消息锈津,類(lèi)似離線(xiàn)消息的模式nats不支持凉蜂。就算先完成訂閱,后發(fā)送消息茎杂,消息發(fā)送方也不知道是否有訂閱者收到了消息煌往,請(qǐng)求響應(yīng)模式就是應(yīng)對(duì)這種情況。
基本流程
A發(fā)送消息羞海,B收到消息却邓,發(fā)送回執(zhí)給A院水。這就是request reply的基本流程衙耕。
基本實(shí)現(xiàn)原理
- A啟用request模式發(fā)送消息(消息中包含了回執(zhí)信息,replya主題)橙喘,同步等待回執(zhí)(有超時(shí)時(shí)間)厅瞎。
- B收到消息初坠,在消息中取出回執(zhí)信息=replay主題,對(duì)replay主題碟刺,
主動(dòng)
發(fā)送普通消息(消息內(nèi)容可自定義,比如server A上的service1收到msgid=xxxx的消息爽柒。)。- A在超時(shí)內(nèi)收到消息白嘁,確認(rèn)結(jié)束酿矢。
- A在超時(shí)內(nèi)未收到消息瘫筐,超時(shí)結(jié)束刺覆。
注意
- 因?yàn)锳發(fā)送的消息中包裝了回執(zhí)測(cè)相關(guān)信息谦屑,訂閱者B收到消息后,也要主動(dòng)發(fā)送回執(zhí)酝枢,所以請(qǐng)求響應(yīng)模式帘睦,對(duì)雙方都有影響坦康。
- A發(fā)送消息后滞欠,等待B的回執(zhí),需要給A設(shè)置超時(shí)時(shí)間逸绎,超時(shí)后棺牧,不在等待回執(zhí)颊乘,直接結(jié)束醉锄,效果和不需要回執(zhí)的消息發(fā)送一樣,不在關(guān)心是否有訂閱者收到消息纲爸。
兩種模式
request reply有兩種模式:
- one to one 默認(rèn)模式
1條消息识啦,N個(gè)訂閱者,消息發(fā)送方家妆,僅會(huì)收到一條
回執(zhí)記錄
(因?yàn)橄l(fā)送方收到回執(zhí)消息后伤极,就自動(dòng)斷開(kāi)了對(duì)回執(zhí)消息的訂閱姨伤。)乍楚,即使N個(gè)訂閱都都收到了消息。注意:pub/sub和queue模式的不同
- one to many 非默認(rèn)模式忿偷,需要自己實(shí)現(xiàn)
1條消息鲤桥,N個(gè)訂閱者芜壁,消息發(fā)送方高氮,可以自己設(shè)定一個(gè)數(shù)量限制N,接受到N個(gè)回執(zhí)消息后剪芍,斷開(kāi)對(duì)回執(zhí)消息的訂閱罪裹。
Server
package main
import (
"github.com/nats-io/go-nats"
"log"
"flag"
)
const (
//url = "nats://192.168.3.125:4222"
url = nats.DefaultURL
)
var (
nc *nats.Conn
encodeConn *nats.EncodedConn
err error
)
func init() {
if nc, err = nats.Connect(url); checkErr(err) {
//
if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
checkErr(err) {
}
}
}
func main() {
var (
servername = flag.String("servername", "Y", "name for server")
queueGroup = flag.String("group", "", "group name for Subscribe")
subj = flag.String("subj", "yasenagat", "subject name")
)
flag.Parse()
mode := "queue"
if *queueGroup == "" {
mode = "pub/sub"
}
log.Printf("Server[%v] Subscribe Subject[%v] in [%v]Mode", *servername, *subj, mode)
startService(*subj, *servername+" worker1", *queueGroup)
startService(*subj, *servername+" worker2", *queueGroup)
startService(*subj, *servername+" worker3", *queueGroup)
nc.Flush()
select {}
}
//receive message
func startService(subj, name, queue string) {
go async(nc, subj, name, queue)
}
func async(nc *nats.Conn, subj, name, queue string) {
replyMsg := name + " Received a msg"
if queue == "" {
nc.Subscribe(subj, func(msg *nats.Msg) {
nc.Publish(msg.Reply, []byte(replyMsg))
log.Println(name, "Received a message From Async : ", string(msg.Data))
})
} else {
nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
nc.Publish(msg.Reply, []byte(replyMsg))
log.Println(name, "Received a message From Async : ", string(msg.Data))
})
}
}
func checkErr(err error) bool {
if err != nil {
log.Println(err)
return false
}
return true
}
Client
package main
import (
"github.com/nats-io/go-nats"
"log"
"github.com/pborman/uuid"
"flag"
"time"
)
const (
//url = "nats://192.168.3.125:4222"
url = nats.DefaultURL
)
var (
nc *nats.Conn
encodeConn *nats.EncodedConn
err error
)
func init() {
if nc, err = nats.Connect(url); checkErr(err, func() {
}) {
//
if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
checkErr(err, func() {
}) {
}
}
}
func main() {
var (
subj = flag.String("subj", "yasenagat", "subject name")
)
flag.Parse()
log.Println(*subj)
startClient(*subj)
time.Sleep(time.Second)
}
//send message to server
func startClient(subj string) {
for i := 0; i < 3; i++ {
id := uuid.New()
log.Println(id)
if msg, err := nc.Request(subj, []byte(id+" hello"), time.Second); checkErr(err, func() {
// handle err
}) {
log.Println(string(msg.Data))
}
}
}
func checkErr(err error, errFun func()) bool {
if err != nil {
log.Println(err)
errFun()
return false
}
return true
}
pub/sub模式啟動(dòng)
$ ./main
2018/08/18 18:54:10 Server[Y] Subscribe Subject[yasenagat] in [pub/sub]Mode
2018/08/18 18:54:26 Y worker2 Received a message From Async : b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker1 Received a message From Async : b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker3 Received a message From Async : b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker2 Received a message From Async : 2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker1 Received a message From Async : 2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker3 Received a message From Async : 2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker2 Received a message From Async : fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
2018/08/18 18:54:26 Y worker1 Received a message From Async : fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
2018/08/18 18:54:26 Y worker3 Received a message From Async : fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
發(fā)送消息
$ ./main
2018/08/18 18:54:26 yasenagat
2018/08/18 18:54:26 b035d7c2-e7e9-4337-bb8a-a23ec85fc31a
2018/08/18 18:54:26 Y worker3 Received a msg
2018/08/18 18:54:26 2d8dfe75-8fee-4b4c-8599-1824638dfa8c
2018/08/18 18:54:26 Y worker2 Received a msg
2018/08/18 18:54:26 fe9f773a-129b-4919-9bc4-c8a4571fef6e
2018/08/18 18:54:26 Y worker2 Received a msg
queue模式啟動(dòng)
$ ./main -group=test
2018/08/18 19:14:31 Server[Y] Subscribe Subject[yasenagat] in [queue]Mode
2018/08/18 19:14:33 Y worker2 Received a message From Async : 4ecf2728-b3a7-4181-893a-aefde3bc8d2e hello Y worker2 Received a msg
2018/08/18 19:14:33 Y worker3 Received a message From Async : 4e7f1363-9a47-4705-b87a-4aaeb80164f0 hello Y worker3 Received a msg
2018/08/18 19:14:33 Y worker2 Received a message From Async : 38b1f74b-8a3b-46ba-a10e-62e50efbc127 hello Y worker2 Received a msg
發(fā)送消息
$ ./main
2018/08/18 19:14:33 yasenagat
2018/08/18 19:14:33 4ecf2728-b3a7-4181-893a-aefde3bc8d2e
2018/08/18 19:14:33 Y worker2 Received a msg
2018/08/18 19:14:33 4e7f1363-9a47-4705-b87a-4aaeb80164f0
2018/08/18 19:14:33 Y worker3 Received a msg
2018/08/18 19:14:33 38b1f74b-8a3b-46ba-a10e-62e50efbc127
2018/08/18 19:14:33 Y worker2 Received a msg
queue模式下谁帕,發(fā)送3條消息,3個(gè)訂閱者有相同的queue碾牌,每條消息只有一個(gè)訂閱者收到。
pub/sub模式下征冷,發(fā)送3條消息誓琼,3個(gè)訂閱者都收到3條消息腹侣,一共9條。
總結(jié):
回執(zhí)主要解決:訂閱者是否收到消息的問(wèn)題今穿、有多少個(gè)訂閱者收到消息的問(wèn)題伦籍。(不是具體業(yè)務(wù)是否執(zhí)行完成的回執(zhí)帖鸦!)
基于事件的架構(gòu)模式可以構(gòu)建于消息機(jī)制之上,依賴(lài)消息機(jī)制作儿。異步調(diào)用的其中一種實(shí)現(xiàn)方式攻锰,就是基于事件模式。異步調(diào)用又是分布式系統(tǒng)中常見(jiàn)的任務(wù)處理方式垒迂。
業(yè)務(wù)模式
- 業(yè)務(wù)A發(fā)送eventA給事件中心妒蛇,
等待回執(zhí)
- 事件中心告知A收到了消息,開(kāi)始對(duì)外發(fā)送廣播
- 訂閱者B訂閱了eventA主題
- 事件中心對(duì)eventA主題發(fā)送廣播吏奸,
等待回執(zhí)
- B收到消息陶耍,告知事件中心,收到eventA旺拉,開(kāi)始執(zhí)行任務(wù)taskA
- B異步執(zhí)行完taskA,通知事件中心taskAComplete,
等待回執(zhí)
- 事件中心發(fā)送回執(zhí)給B晋涣,對(duì)外發(fā)送廣播,taskAComplete
- ........
如果超時(shí)谢鹊,未能收到回執(zhí),需要回執(zhí)信息的確認(rèn)方可以主動(dòng)調(diào)用相關(guān)接口佃扼,查詢(xún)?nèi)蝿?wù)執(zhí)行狀態(tài)兼耀,根據(jù)任務(wù)狀態(tài)做后續(xù)的處理瘤运。