golang nats[4] request reply模式

請(qǐng)求響應(yīng)模式

無(wú)論是發(fā)布訂閱模式還是queue模式,nats都不能保證消息一定發(fā)送到訂閱方玫膀,除非訂閱者發(fā)送一個(gè)響應(yīng)給發(fā)布者帖旨。
所以訂閱者發(fā)送一個(gè)回執(zhí)給發(fā)布者誉简,就是請(qǐng)求響應(yīng)模式闷串。

這種模式有什么用?

nats要求訂閱者一定要先完成訂閱,發(fā)布消息后碉熄,訂閱者才能收到消息锈津,類(lèi)似離線(xiàn)消息的模式nats不支持凉蜂。就算先完成訂閱,后發(fā)送消息茎杂,消息發(fā)送方也不知道是否有訂閱者收到了消息煌往,請(qǐng)求響應(yīng)模式就是應(yīng)對(duì)這種情況。

基本流程

A發(fā)送消息羞海,B收到消息却邓,發(fā)送回執(zhí)給A院水。這就是request reply的基本流程衙耕。

基本實(shí)現(xiàn)原理

  • A啟用request模式發(fā)送消息(消息中包含了回執(zhí)信息,replya主題)橙喘,同步等待回執(zhí)(有超時(shí)時(shí)間)厅瞎。
  • B收到消息初坠,在消息中取出回執(zhí)信息=replay主題,對(duì)replay主題碟刺,主動(dòng)發(fā)送普通消息(消息內(nèi)容可自定義,比如server A上的service1收到msgid=xxxx的消息爽柒。)。
  • A在超時(shí)內(nèi)收到消息白嘁,確認(rèn)結(jié)束酿矢。
  • A在超時(shí)內(nèi)未收到消息瘫筐,超時(shí)結(jié)束刺覆。

注意

  • 因?yàn)锳發(fā)送的消息中包裝了回執(zhí)測(cè)相關(guān)信息谦屑,訂閱者B收到消息后,也要主動(dòng)發(fā)送回執(zhí)酝枢,所以請(qǐng)求響應(yīng)模式帘睦,對(duì)雙方都有影響坦康。
  • A發(fā)送消息后滞欠,等待B的回執(zhí),需要給A設(shè)置超時(shí)時(shí)間逸绎,超時(shí)后棺牧,不在等待回執(zhí)颊乘,直接結(jié)束醉锄,效果和不需要回執(zhí)的消息發(fā)送一樣,不在關(guān)心是否有訂閱者收到消息纲爸。

兩種模式

request reply有兩種模式:

  • one to one 默認(rèn)模式

1條消息识啦,N個(gè)訂閱者,消息發(fā)送方家妆,僅會(huì)收到一條回執(zhí)記錄(因?yàn)橄l(fā)送方收到回執(zhí)消息后伤极,就自動(dòng)斷開(kāi)了對(duì)回執(zhí)消息的訂閱姨伤。)乍楚,即使N個(gè)訂閱都都收到了消息。注意:pub/sub和queue模式的不同

  • one to many 非默認(rèn)模式忿偷,需要自己實(shí)現(xiàn)

1條消息鲤桥,N個(gè)訂閱者芜壁,消息發(fā)送方高氮,可以自己設(shè)定一個(gè)數(shù)量限制N,接受到N個(gè)回執(zhí)消息后剪芍,斷開(kāi)對(duì)回執(zhí)消息的訂閱罪裹。

Server

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "flag"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc *nats.Conn

    encodeConn *nats.EncodedConn
    err        error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err) {
        //
        if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
            checkErr(err) {
        }
    }
}

func main() {
    var (
        servername = flag.String("servername", "Y", "name for server")
        queueGroup = flag.String("group", "", "group name for Subscribe")
        subj       = flag.String("subj", "yasenagat", "subject name")
    )
    flag.Parse()

    mode := "queue"
    if *queueGroup == "" {
        mode = "pub/sub"
    }
    log.Printf("Server[%v] Subscribe Subject[%v] in [%v]Mode", *servername, *subj, mode)

    startService(*subj, *servername+" worker1", *queueGroup)
    startService(*subj, *servername+" worker2", *queueGroup)
    startService(*subj, *servername+" worker3", *queueGroup)

    nc.Flush()
    select {}
}

//receive message
func startService(subj, name, queue string) {
    go async(nc, subj, name, queue)
}

func async(nc *nats.Conn, subj, name, queue string) {
    replyMsg := name + " Received a msg"
    if queue == "" {
        nc.Subscribe(subj, func(msg *nats.Msg) {
            nc.Publish(msg.Reply, []byte(replyMsg))
            log.Println(name, "Received a message From Async : ", string(msg.Data))
        })
    } else {
        nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
            nc.Publish(msg.Reply, []byte(replyMsg))
            log.Println(name, "Received a message From Async : ", string(msg.Data))
        })
    }

}

func checkErr(err error) bool {
    if err != nil {
        log.Println(err)
        return false
    }
    return true
}

Client

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "github.com/pborman/uuid"
    "flag"
    "time"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc         *nats.Conn
    encodeConn *nats.EncodedConn
    err        error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err, func() {

    }) {
        //
        if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
            checkErr(err, func() {

            }) {

        }
    }
}

func main() {
    var (
        subj = flag.String("subj", "yasenagat", "subject name")
    )
    flag.Parse()
    log.Println(*subj)
    startClient(*subj)

    time.Sleep(time.Second)
}

//send message to server
func startClient(subj string) {
    for i := 0; i < 3; i++ {
        id := uuid.New()
        log.Println(id)
        if msg, err := nc.Request(subj, []byte(id+" hello"), time.Second); checkErr(err, func() {
            // handle err
        }) {
            log.Println(string(msg.Data))
        }
    }
}

func checkErr(err error, errFun func()) bool {
    if err != nil {
        log.Println(err)
        errFun()
        return false
    }
    return true
}

pub/sub模式啟動(dòng)

$ ./main
2018/08/18 18:54:10 Server[Y] Subscribe Subject[yasenagat] in [pub/sub]Mode
2018/08/18 18:54:26 Y worker2 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker2 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker2 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello

發(fā)送消息

$ ./main
2018/08/18 18:54:26 yasenagat
2018/08/18 18:54:26 b035d7c2-e7e9-4337-bb8a-a23ec85fc31a
2018/08/18 18:54:26 Y worker3 Received a msg
2018/08/18 18:54:26 2d8dfe75-8fee-4b4c-8599-1824638dfa8c
2018/08/18 18:54:26 Y worker2 Received a msg
2018/08/18 18:54:26 fe9f773a-129b-4919-9bc4-c8a4571fef6e
2018/08/18 18:54:26 Y worker2 Received a msg

queue模式啟動(dòng)

$ ./main -group=test
2018/08/18 19:14:31 Server[Y] Subscribe Subject[yasenagat] in [queue]Mode
2018/08/18 19:14:33 Y worker2 Received a message From Async :  4ecf2728-b3a7-4181-893a-aefde3bc8d2e hello Y worker2 Received a msg
2018/08/18 19:14:33 Y worker3 Received a message From Async :  4e7f1363-9a47-4705-b87a-4aaeb80164f0 hello Y worker3 Received a msg
2018/08/18 19:14:33 Y worker2 Received a message From Async :  38b1f74b-8a3b-46ba-a10e-62e50efbc127 hello Y worker2 Received a msg

發(fā)送消息

$ ./main
2018/08/18 19:14:33 yasenagat
2018/08/18 19:14:33 4ecf2728-b3a7-4181-893a-aefde3bc8d2e
2018/08/18 19:14:33 Y worker2 Received a msg
2018/08/18 19:14:33 4e7f1363-9a47-4705-b87a-4aaeb80164f0
2018/08/18 19:14:33 Y worker3 Received a msg
2018/08/18 19:14:33 38b1f74b-8a3b-46ba-a10e-62e50efbc127
2018/08/18 19:14:33 Y worker2 Received a msg

queue模式下谁帕,發(fā)送3條消息,3個(gè)訂閱者有相同的queue碾牌,每條消息只有一個(gè)訂閱者收到。

pub/sub模式下征冷,發(fā)送3條消息誓琼,3個(gè)訂閱者都收到3條消息腹侣,一共9條。

總結(jié):

回執(zhí)主要解決:訂閱者是否收到消息的問(wèn)題今穿、有多少個(gè)訂閱者收到消息的問(wèn)題伦籍。(不是具體業(yè)務(wù)是否執(zhí)行完成的回執(zhí)帖鸦!)
基于事件的架構(gòu)模式可以構(gòu)建于消息機(jī)制之上,依賴(lài)消息機(jī)制作儿。異步調(diào)用的其中一種實(shí)現(xiàn)方式攻锰,就是基于事件模式。異步調(diào)用又是分布式系統(tǒng)中常見(jiàn)的任務(wù)處理方式垒迂。

業(yè)務(wù)模式

  • 業(yè)務(wù)A發(fā)送eventA給事件中心妒蛇,等待回執(zhí)
  • 事件中心告知A收到了消息,開(kāi)始對(duì)外發(fā)送廣播
  • 訂閱者B訂閱了eventA主題
  • 事件中心對(duì)eventA主題發(fā)送廣播吏奸,等待回執(zhí)
  • B收到消息陶耍,告知事件中心,收到eventA旺拉,開(kāi)始執(zhí)行任務(wù)taskA
  • B異步執(zhí)行完taskA,通知事件中心taskAComplete,等待回執(zhí)
  • 事件中心發(fā)送回執(zhí)給B晋涣,對(duì)外發(fā)送廣播,taskAComplete
  • ........

如果超時(shí)谢鹊,未能收到回執(zhí),需要回執(zhí)信息的確認(rèn)方可以主動(dòng)調(diào)用相關(guān)接口佃扼,查詢(xún)?nèi)蝿?wù)執(zhí)行狀態(tài)兼耀,根據(jù)任務(wù)狀態(tài)做后續(xù)的處理瘤运。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末匠题,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子郁季,更是在濱河造成了極大的恐慌梦裂,老刑警劉巖盖淡,帶你破解...
    沈念sama閱讀 210,978評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件禁舷,死亡現(xiàn)場(chǎng)離奇詭異牵咙,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)洁桌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門(mén)另凌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人吠谢,你說(shuō)我怎么就攤上這事工坊。” “怎么了罢吃?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,623評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵尿招,是天一觀的道長(zhǎng)阱驾。 經(jīng)常有香客問(wèn)我,道長(zhǎng)吁伺,這世上最難降的妖魔是什么租谈? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,324評(píng)論 1 282
  • 正文 為了忘掉前任割去,我火速辦了婚禮,結(jié)果婚禮上呻逆,老公的妹妹穿的比我還像新娘咖城。我一直安慰自己,他們只是感情好宜雀,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,390評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布悴品。 她就那樣靜靜地躺著苔严,像睡著了一般。 火紅的嫁衣襯著肌膚如雪届氢。 梳的紋絲不亂的頭發(fā)上悼沈,一...
    開(kāi)封第一講書(shū)人閱讀 49,741評(píng)論 1 289
  • 那天絮供,我揣著相機(jī)與錄音,去河邊找鬼缚俏。 笑死忧换,一個(gè)胖子當(dāng)著我的面吹牛向拆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播刹缝,決...
    沈念sama閱讀 38,892評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼梢夯,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼颂砸!你這毒婦竟也來(lái)了死姚?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,655評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎温鸽,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體姑尺,經(jīng)...
    沈念sama閱讀 44,104評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡切蟋,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年柄粹,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了驻右。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,569評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖拣凹,靈堂內(nèi)的尸體忽然破棺而出嚣镜,到底是詐尸還是另有隱情,我是刑警寧澤菊匿,帶...
    沈念sama閱讀 34,254評(píng)論 4 328
  • 正文 年R本政府宣布凡涩,位于F島的核電站疹蛉,受9級(jí)特大地震影響可款,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜筋讨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,834評(píng)論 3 312
  • 文/蒙蒙 一赤屋、第九天 我趴在偏房一處隱蔽的房頂上張望壁袄。 院中可真熱鬧嗜逻,春花似錦、人聲如沸栈顷。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,725評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至靡努,卻和暖如春狠半,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背颤难。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,950評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工神年, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人行嗤。 一個(gè)月前我還...
    沈念sama閱讀 46,260評(píng)論 2 360
  • 正文 我出身青樓已日,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親栅屏。 傳聞我的和親對(duì)象是個(gè)殘疾皇子飘千,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,446評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 點(diǎn)擊查看原文 Web SDK 開(kāi)發(fā)手冊(cè) SDK 概述 網(wǎng)易云信 SDK 為 Web 應(yīng)用提供一個(gè)完善的 IM 系統(tǒng)...
    layjoy閱讀 13,692評(píng)論 0 15
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)护奈,斷路器,智...
    卡卡羅2017閱讀 134,628評(píng)論 18 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,889評(píng)論 2 11
  • 今天早晨鸵闪,處在霧霾中的北京朋友發(fā)來(lái)一張照片: 是褚時(shí)健的橙子蚌讼,於是我們有了以下的對(duì)話(huà): "褚橙啊伞矩,我還沒(méi)吃過(guò)苛让,好吃...
    MZ_梅枝閱讀 408評(píng)論 0 0
  • ——舍與得,珍惜現(xiàn)在擁有 電影《人在囧途泰囧》有三位主...
    行走在學(xué)習(xí)的路上閱讀 1,009評(píng)論 4 51