Go語言與RabbitMQ

RabbitMQ 概述

RabbitMQ是采用Erlang編程語言實現(xiàn)了高級消息隊列協(xié)議AMQP (Advanced Message Queuing Protocol)的開源消息代理軟件(消息隊列中間件

市面上流行的消息隊列中間件有很多種,而RabbitMQ只是其中比較流行的一種

我們簡單說說消息隊列中間件的作用

  • 解耦
  • 削峰
  • 異步處理
  • 緩存存儲
  • 消息通信
  • 提高系統(tǒng)拓展性

RabbitMQ 特點

  1. 可靠性

    通過一些機制例如,持久化,傳輸確認等來確保消息傳遞的可靠性

  2. 拓展性

    多個RabbitMQ節(jié)點可以組成集群

  3. 高可用性

    隊列可以在RabbitMQ集群中設置鏡像,如此一來即使部分節(jié)點掛掉了,但是隊列仍然可以使用

  4. 多種協(xié)議

    原生的支持AMQP,也能支持STOMP,MQTT等協(xié)議

  5. 豐富的客戶端

    我們常用的編程語言都支持RabbitMQ

  6. 管理界面

    自帶提供一個WEB管理界面

  7. 插件機制

    RabbitMQ 自己提供了很多插件,可以按需要進行拓展 Plugins

RabbitMQ基礎概念

總體上看RabbitMQ是一個生產(chǎn)者和消費者的模型, 接收,存儲 ,轉發(fā)

RabbitMQ_model.jpg

我們看看在RabbitMQ中的幾個主要概念

  1. Producer (生產(chǎn)者) : 消息的生產(chǎn)者,投遞方

  2. Consumer (消費者) : 消息的消費者

  3. RabbitMQ Broker (RabbitMQ 代理) : RabbitMQ 服務節(jié)點(單機情況中,就是代表RabbitMQ服務器)

  4. Queue (隊列) : 在RabbitMQ中Queue是存儲消息數(shù)據(jù)的唯一形式

  5. Binding (綁定) : RabbitMQ中綁定(Binding)是交換機(exchange)將消息(message)路由給隊列(queue)所需遵循的規(guī)則。如果要指示交換機“E”將消息路由給隊列“Q”躁锡,那么“Q”就需要與“E”進行綁定午绳。綁定操作需要定義一個可選的路由鍵(routing key)屬性給某些類型的交換機。路由鍵的意義在于從發(fā)送給交換機的眾多消息中選擇出某些消息映之,將其路由給綁定的隊列拦焚。

  6. RoutingKey (路由鍵) : 消息投遞給交換器,通常會指定一個 RoutingKey ,通過這個路由鍵來明確消息的路由規(guī)則

    RoutingKey 通常是生產(chǎn)者和消費者有協(xié)商一致的key策略,消費者就可以合法從生產(chǎn)者手中獲取數(shù)據(jù)杠输。這個RoutingKey主要當Exchange交換機模式為設定為direct和topic模式的時候使用耕漱,fanout模式不使用RoutingKey

  7. Exchange (交換機) : 生產(chǎn)者將消息發(fā)送給交換器(交換機),再由交換器將消息路由導對應的隊列中

    交換機四種類型 : fanout,direct,topic,headers

    1. fanout (扇形交換機) :

      將發(fā)送到該類型交換機的消息(message)路由到所有的與該交換機綁定的隊列中,如同一個"扇"狀擴散給各個隊列

    fanout_exchange.jpg

    fanout類型的交換機會忽略RoutingKey的存在,將message直接"廣播"到綁定的所有隊列中

  1. direct (直連交換機) :

    根據(jù)消息攜帶的路由鍵(RoutingKey) 將消息投遞到對應的隊列中

direct_exchange.jpg

direct類型的交換機(exchange)是RabbitMQ Broker的默認類型,它有一個特別的屬性對一些簡單的應用來說是非常有用的,在使用這個類型的Exchange時当编,可以不必指定routing key的名字黎烈,在此類型下創(chuàng)建的Queue有一個默認的routing key,這個routing key一般同Queue同名.

  1. Topic (主題交換機) :

    topic類型交換機在RoutingKeyBindKey 匹配規(guī)則上更加的靈活. 同樣是將消息路由到RoutingKeyBindingKey 相匹配的隊列中,但是匹配規(guī)則有如下的特點 :

    規(guī)則1: RoutingKey 是一個使用. 的字符串 例如: "go.log.info" , "java.log.error"

    規(guī)則2: BingingKey 也會一個使用 . 分割的字符串, 但是在 BindingKey 中可以使用兩種特殊字符 *# ,其中 "*" 用于匹配一個單詞,"#"用于匹配多規(guī)格單詞(零個或者多個單詞)

topic_exchange.jpg

RoutingKey和BindingKey 是一種"模糊匹配" ,那么一個消息Message可能 會被發(fā)送到一個或者多個隊列中
無法匹配的消息將會被丟棄或者返回者生產(chǎn)者

  1. Headers (頭交換機):

    Headers類型的交換機使用的不是很多

    關于Headers Exchange 摘取一段比較容易理解的解釋 :

    有時消息的路由操作會涉及到多個屬性考婴,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是為此而生的。頭交換機使用多個消息屬性來代替路由鍵建立路由規(guī)則能岩。通過判斷消息頭的值能否與指定的綁定相匹配來確立路由規(guī)則。

    我們可以綁定一個隊列到頭交換機上萧福,并給他們之間的綁定使用多個用于匹配的頭(header)拉鹃。這個案例中,消息代理得從應用開發(fā)者那兒取到更多一段信息鲫忍,換句話說膏燕,它需要考慮某條消息(message)是需要部分匹配還是全部匹配。上邊說的“更多一段消息”就是"x-match"參數(shù)悟民。當"x-match"設置為“any”時坝辫,消息頭的任意一個值被匹配就可以滿足條件,而當"x-match"設置為“all”的時候射亏,就需要消息頭的所有值都匹配成功近忙。

    頭交換機可以視為直連交換機的另一種表現(xiàn)形式。頭交換機能夠像直連交換機一樣工作智润,不同之處在于頭交換機的路由規(guī)則是建立在頭屬性值之上及舍,而不是路由鍵。路由鍵必須是一個字符串窟绷,而頭屬性值則沒有這個約束锯玛,它們甚至可以是整數(shù)或者哈希值(字典)等。

RabbitMQ 工作流程

消息生產(chǎn)流程

  1. 消息生產(chǎn)者連與RabbitMQ Broker 建立一個連接,建立好了連接之后,開啟一個信道Channel
  2. 聲明一個交換機,并設置其相關的屬性(交換機類型,持久化等)
  3. 聲明一個隊列并設置其相關屬性(排他性,持久化自動刪除等)
  4. 通過路由鍵將交換機和隊列綁定起來
  5. 消息生產(chǎn)者發(fā)送消息給 RabbitMQ Broker , 消息中包含了路由鍵,交換機等信息,交換機根據(jù)接收的路由鍵查找匹配對應的隊列
  6. 查找匹配成功,則將消息存儲到隊列中
  7. 查找匹配失敗,根據(jù)生產(chǎn)者配置的屬性選擇丟棄或者回退給生產(chǎn)者
  8. 關閉信道Channel , 關閉連接

消息消費流程

  1. 消息消費者連與RabbitMQ Broker 建立一個連接,建立好了連接之后,開啟一個信道Channel
  2. 消費者向RabbitMQ Broker 請求消費者相應隊列中的消息
  3. 等待RabbitMQ Broker 回應并投遞相應隊列中的消息,消費者接收消息
  4. 消費者確認(ack) 接收消息, RabbitMQ Broker 消除已經(jīng)確認的消息
  5. 關閉信道Channel ,關閉連接

Golang 操作RabbitMQ

RabbitMQ 支持我們常見的編程語言,此處我們使用 Golang 來操作

Golang操作RabbitMQ的前提我們需要有個RabbitMQ的服務端,至于RabbitMQ的服務怎么搭建我們此處就不詳細描述了.

Golang操作RabbitMQ的客戶端包,網(wǎng)上已經(jīng)有一個很流行的了,而且也是RabbitMQ官網(wǎng)比較推薦的,不需要我們再從頭開始構建一個RabbitMQ的Go語言客戶端包. 詳情

go get github.com/streadway/amqp

項目目錄

___lib
______commonFunc.go
___producer.go
___comsumer.go

commonFunc.go

package lib

import (
    "github.com/streadway/amqp"
    "log"
)
// RabbitMQ連接函數(shù)
func RabbitMQConn() (conn *amqp.Connection,err error){
    // RabbitMQ分配的用戶名稱
    var user string = "admin"
    // RabbitMQ用戶的密碼
    var pwd string = "123456"
    // RabbitMQ Broker 的ip地址
    var host string = "192.168.230.132"
    // RabbitMQ Broker 監(jiān)聽的端口
    var port string = "5672"
    url := "amqp://"+user+":"+pwd+"@"+host+":"+port+"/"
    // 新建一個連接
    conn,err =amqp.Dial(url)
    // 返回連接和錯誤
    return
}
// 錯誤處理函數(shù)
func ErrorHanding(err error, msg string){
    if err != nil{
        log.Fatalf("%s: %s", msg, err)
    }
}

基礎隊列使用

簡單隊列模式是RabbitMQ的常規(guī)用法,簡單理解就是消息生產(chǎn)者發(fā)送消息給一個隊列,然后消息的消息的消費者從隊列中讀取消息

當多個消費者訂閱同一個隊列的時候,隊列中的消息是平均分攤給多個消費者處理

定義一個消息的生產(chǎn)者

producer.go

package main

import (
    "encoding/json"
    "log"
    "myDemo/rabbitmq_demo/lib"

    "github.com/streadway/amqp"
)
type simpleDemo struct {
    Name string `json:"name"`
    Addr string `json:"addr"`
}
func main() {
    // 連接RabbitMQ服務器
    conn, err := lib.RabbitMQConn()
    lib.ErrorHanding(err, "Failed to connect to RabbitMQ")
    // 關閉連接
    defer conn.Close()
    // 新建一個通道
    ch, err := conn.Channel()
    lib.ErrorHanding(err, "Failed to open a channel")
    // 關閉通道
    defer ch.Close()
    // 聲明或者創(chuàng)建一個隊列用來保存消息
    q, err := ch.QueueDeclare(
        // 隊列名稱
        "simple:queue", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    lib.ErrorHanding(err, "Failed to declare a queue")
    data := simpleDemo{
        Name: "Tom",
        Addr: "Beijing",
    }
    dataBytes,err := json.Marshal(data)
    if err != nil{
        lib.ErrorHanding(err,"struct to json failed")
    }
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        dataBytes,
        })
    log.Printf(" [x] Sent %s", dataBytes)
    lib.ErrorHanding(err, "Failed to publish a message")
}

定義一個消息的消費者

comsumer.go

package main

import (
    "log"
    "myDemo/rabbitmq_demo/lib"
)

func main() {
    conn, err := lib.RabbitMQConn()
    lib.ErrorHanding(err,"failed to connect to RabbitMQ")
    defer conn.Close()
    ch, err := conn.Channel()
    lib.ErrorHanding(err,"failed to open a channel")
    defer ch.Close()
    q, err := ch.QueueDeclare(
        "simple:queue", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    lib.ErrorHanding(err,"Failed to declare a queue")
    // 定義一個消費者
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    lib.ErrorHanding(err,"Failed to register a consume")
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    select {}
}

工作隊列

工作隊列也稱為 任務隊列 任務隊列是為了避免等待執(zhí)行一些耗時的任務,而是將需要執(zhí)行的任務封裝為消息發(fā)送給工作隊列,后臺運行的工作進程將任務消息取出來并執(zhí)行相關任務 , 多個后臺工作進程同時間進行,那么任務在他們之間共享

work-queue.png

我們定義一個任務的生產(chǎn)者,用于生產(chǎn)任務消息

task.go

package main

import (
    "github.com/streadway/amqp"
    "log"
    "myDemo/rabbitmq_demo/lib"
    "os"
    "strings"
)

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "no task"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}
func main() {
    // 連接RabbitMQ服務器
    conn, err := lib.RabbitMQConn()
    lib.ErrorHanding(err, "Failed to connect to RabbitMQ")
    // 關閉連接
    defer conn.Close()
    // 新建一個通道
    ch, err := conn.Channel()
    lib.ErrorHanding(err, "Failed to open a channel")
    // 關閉通道
    defer ch.Close()
    // 聲明或者創(chuàng)建一個隊列用來保存消息
    q, err := ch.QueueDeclare(
        // 隊列名稱
        "task:queue", // name
        false,          // durable
        false,          // delete when unused
        false,          // exclusive
        false,          // no-wait
        nil,            // arguments
    )
    lib.ErrorHanding(err, "Failed to declare a queue")
    body := bodyFrom(os.Args)
    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            // 將消息標記為持久消息
            DeliveryMode: amqp.Persistent,
            Body:         []byte(body),
        })
    lib.ErrorHanding(err, "Failed to publish a message")
    log.Printf("sent %s", body)
}

定義一個工作者,用于消費掉任務消息

worker.go

package main

import (
    "log"
    "myDemo/rabbitmq_demo/lib"
)

func main() {
    conn, err := lib.RabbitMQConn()
    lib.ErrorHanding(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    lib.ErrorHanding(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "task:queue", // name
        false,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    lib.ErrorHanding(err, "Failed to declare a queue")
    // 將預取計數(shù)器設置為1
    // 在并行處理中將消息分配給不同的工作進程
    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    lib.ErrorHanding(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    lib.ErrorHanding(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

測試

#shell1
go run task.go
#shell2
go run worker.go
#shell3
go run worker.go

RabbitMQ 的用法很多,詳情參看官網(wǎng)文檔

參考資料

https://www.rabbitmq.com/getstarted.html
http://rabbitmq.mr-ping.com/
https://github.com/streadway/amqp
https://blog.csdn.net/u013256816/category_6532725.html

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末钾麸,一起剝皮案震驚了整個濱河市更振,隨后出現(xiàn)的幾起案子炕桨,更是在濱河造成了極大的恐慌,老刑警劉巖肯腕,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件献宫,死亡現(xiàn)場離奇詭異,居然都是意外死亡实撒,警方通過查閱死者的電腦和手機姊途,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來知态,“玉大人捷兰,你說我怎么就攤上這事「好簦” “怎么了贡茅?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長其做。 經(jīng)常有香客問我顶考,道長,這世上最難降的妖魔是什么妖泄? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任驹沿,我火速辦了婚禮,結果婚禮上蹈胡,老公的妹妹穿的比我還像新娘渊季。我一直安慰自己,他們只是感情好罚渐,可當我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布却汉。 她就那樣靜靜地躺著,像睡著了一般搅轿。 火紅的嫁衣襯著肌膚如雪病涨。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天璧坟,我揣著相機與錄音既穆,去河邊找鬼。 笑死雀鹃,一個胖子當著我的面吹牛幻工,可吹牛的內容都是我干的。 我是一名探鬼主播黎茎,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼囊颅,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側響起踢代,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤盲憎,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后胳挎,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體饼疙,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年慕爬,在試婚紗的時候發(fā)現(xiàn)自己被綠了窑眯。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡医窿,死狀恐怖磅甩,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情姥卢,我是刑警寧澤卷要,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站独榴,受9級特大地震影響却妨,放射性物質發(fā)生泄漏。R本人自食惡果不足惜括眠,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望倍权。 院中可真熱鬧掷豺,春花似錦、人聲如沸薄声。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽默辨。三九已至德频,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間缩幸,已是汗流浹背壹置。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留表谊,地道東北人钞护。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像爆办,于是被迫代替她去往敵國和親难咕。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,037評論 2 355