封裝 golang 使用路由模式調(diào)用 rabbitmq

參考文檔地址:
http://rabbitmq.mr-ping.com/tutorials_with_golang/[6]RPC.html

抽出來的包 rabbitmq.go


import (
    "github.com/streadway/amqp"
    "log"
    "sync"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

var pool  = sync.Pool{
    New: func() interface{} {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
            failOnError(err, "Failed to connect to RabbitMQ")
            return conn
        },
}

type MessageQueue struct {
    conn *amqp.Connection // 持有連接對象
    ch *amqp.Channel      // 持有channel 對象
    ExchangeName string   // 持有交換機的名稱
    QueueName string      // 接受消息的隊列名稱
}

// 獲取到交換機
func (mq *MessageQueue) getExchange(exchange string)   {

    ch, err := mq.conn.Channel()
    failOnError(err, "Failed to open a channel")
    mq.ch = ch

    err = ch.ExchangeDeclare(
        exchange, // name
        "direct",      // type
        true,          // durable 持久化消息
        false,         // auto-deleted
        false,         // internal
        false,         // no-wait
        nil,           // arguments
    )

    failOnError(err, "Failed to declare an exchange")
}

// 發(fā)送消息
func (mq *MessageQueue)SendMessage(msgType, msg string)  {
    mq.conn = pool.Get().(* amqp.Connection)

    // 獲取到交換機
    mq.getExchange(mq.ExchangeName)

    // 推送消息
    err := mq.ch.Publish(
        mq.ExchangeName,         // exchange
        msgType,        // routing key
        false, // mandatory
        false, // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(msg),
        })
    failOnError(err, "Failed to publish a message")
}

// 關(guān)閉資源
func (mq *MessageQueue)Close(){
    pool.Put(mq.conn) // 連接放回
    mq.ch.Close()
}

// 獲取到消息
func (mq *MessageQueue)GetMessage(msgType string) <-chan string {
    mq.conn = pool.Get().(* amqp.Connection)

    // 獲取到交換機
    mq.getExchange(mq.ExchangeName)

    // 存儲 臨時交換隊列
    q, err := mq.ch.QueueDeclare(
        mq.QueueName,    // name
        true, // durable
        false, // delete when usused
        false,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = mq.ch.QueueBind(
        q.Name,        // queue name
        msgType,             // routing key
        mq.ExchangeName, // exchange
        false,
        nil)
    failOnError(err, "Failed to bind a queue")

    // 設(shè)置逐個消費消息
    err = mq.ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := mq.ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,   // auto ack
        false,  // exclusive
        false,  // no local
        false,  // no wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    ret := make(chan string)

    println("進來循環(huán)處理消息")
    go func() {
        for {
            select {
            case d := <-msgs:
                mess := string(d.Body)
                println("獲得的消息:" + mess)
                ret <- mess
                d.Ack(false) // 標(biāo)記消息被消費掉了
            }
        }
    }()

    return ret
}

客戶端

func main()  {
    args := os.Args
    message := rabbit.MessageQueue{ExchangeName:"messageQueue"}
    defer message.Close()

    message.SendMessage(args[1],args[2])

}

服務(wù)端

func main()  {
    // 接受消息并處理
    message := rabbit.MessageQueue{ExchangeName:"messageQueue",QueueName:"messageQueueData"}
    defer message.Close()

    msg := message.GetMessage(os.Args[1])
    sigs := make(chan os.Signal, 1)

    // 接受用戶中斷和,ctrl+c 和 用戶中斷
    signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)

    ms := make([]string,0)

    for {
        select {
        case m := <-msg:
            println(" 開始處理消息: ")
            println(m)

            ms = append(ms, m)

            time.Sleep( time.Second * 10 )
            println(" 消息處理完畢: ")
        case sig := <-sigs:
            ioutil.WriteFile("a.txt", []byte(strings.Join(ms,"\n")) , os.ModePerm)
            fmt.Println("%v", sig)
            return
        }
    }

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末孵延,一起剝皮案震驚了整個濱河市寒瓦,隨后出現(xiàn)的幾起案子心包,更是在濱河造成了極大的恐慌琉苇,老刑警劉巖做鹰,帶你破解...
    沈念sama閱讀 218,284評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件雏门,死亡現(xiàn)場離奇詭異,居然都是意外死亡馍管,警方通過查閱死者的電腦和手機郭赐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來确沸,“玉大人捌锭,你說我怎么就攤上這事÷奚樱” “怎么了观谦?”我有些...
    開封第一講書人閱讀 164,614評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長桨菜。 經(jīng)常有香客問我豁状,道長,這世上最難降的妖魔是什么倒得? 我笑而不...
    開封第一講書人閱讀 58,671評論 1 293
  • 正文 為了忘掉前任泻红,我火速辦了婚禮,結(jié)果婚禮上霞掺,老公的妹妹穿的比我還像新娘谊路。我一直安慰自己,他們只是感情好菩彬,可當(dāng)我...
    茶點故事閱讀 67,699評論 6 392
  • 文/花漫 我一把揭開白布缠劝。 她就那樣靜靜地躺著,像睡著了一般骗灶。 火紅的嫁衣襯著肌膚如雪惨恭。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,562評論 1 305
  • 那天矿卑,我揣著相機與錄音喉恋,去河邊找鬼。 笑死母廷,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的糊肤。 我是一名探鬼主播琴昆,決...
    沈念sama閱讀 40,309評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼馆揉!你這毒婦竟也來了业舍?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,223評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎舷暮,沒想到半個月后态罪,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,668評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡下面,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,859評論 3 336
  • 正文 我和宋清朗相戀三年复颈,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沥割。...
    茶點故事閱讀 39,981評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡耗啦,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出机杜,到底是詐尸還是另有隱情帜讲,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評論 5 347
  • 正文 年R本政府宣布椒拗,位于F島的核電站似将,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏蚀苛。R本人自食惡果不足惜在验,卻給世界環(huán)境...
    茶點故事閱讀 41,310評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望枉阵。 院中可真熱鬧译红,春花似錦、人聲如沸兴溜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拙徽。三九已至刨沦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間膘怕,已是汗流浹背想诅。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留岛心,地道東北人来破。 一個月前我還...
    沈念sama閱讀 48,146評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像忘古,于是被迫代替她去往敵國和親徘禁。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,933評論 2 355

推薦閱讀更多精彩內(nèi)容