參考文檔地址:
http://rabbitmq.mr-ping.com/tutorials_with_golang/[6]RPC.html
抽出來的包 rabbitmq.go
import (
"github.com/streadway/amqp"
"log"
"sync"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
var pool = sync.Pool{
New: func() interface{} {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
return conn
},
}
type MessageQueue struct {
conn *amqp.Connection // 持有連接對象
ch *amqp.Channel // 持有channel 對象
ExchangeName string // 持有交換機的名稱
QueueName string // 接受消息的隊列名稱
}
// 獲取到交換機
func (mq *MessageQueue) getExchange(exchange string) {
ch, err := mq.conn.Channel()
failOnError(err, "Failed to open a channel")
mq.ch = ch
err = ch.ExchangeDeclare(
exchange, // name
"direct", // type
true, // durable 持久化消息
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
}
// 發(fā)送消息
func (mq *MessageQueue)SendMessage(msgType, msg string) {
mq.conn = pool.Get().(* amqp.Connection)
// 獲取到交換機
mq.getExchange(mq.ExchangeName)
// 推送消息
err := mq.ch.Publish(
mq.ExchangeName, // exchange
msgType, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
})
failOnError(err, "Failed to publish a message")
}
// 關(guān)閉資源
func (mq *MessageQueue)Close(){
pool.Put(mq.conn) // 連接放回
mq.ch.Close()
}
// 獲取到消息
func (mq *MessageQueue)GetMessage(msgType string) <-chan string {
mq.conn = pool.Get().(* amqp.Connection)
// 獲取到交換機
mq.getExchange(mq.ExchangeName)
// 存儲 臨時交換隊列
q, err := mq.ch.QueueDeclare(
mq.QueueName, // name
true, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = mq.ch.QueueBind(
q.Name, // queue name
msgType, // routing key
mq.ExchangeName, // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
// 設(shè)置逐個消費消息
err = mq.ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := mq.ch.Consume(
q.Name, // queue
"", // consumer
false, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
ret := make(chan string)
println("進來循環(huán)處理消息")
go func() {
for {
select {
case d := <-msgs:
mess := string(d.Body)
println("獲得的消息:" + mess)
ret <- mess
d.Ack(false) // 標(biāo)記消息被消費掉了
}
}
}()
return ret
}
客戶端
func main() {
args := os.Args
message := rabbit.MessageQueue{ExchangeName:"messageQueue"}
defer message.Close()
message.SendMessage(args[1],args[2])
}
服務(wù)端
func main() {
// 接受消息并處理
message := rabbit.MessageQueue{ExchangeName:"messageQueue",QueueName:"messageQueueData"}
defer message.Close()
msg := message.GetMessage(os.Args[1])
sigs := make(chan os.Signal, 1)
// 接受用戶中斷和,ctrl+c 和 用戶中斷
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
ms := make([]string,0)
for {
select {
case m := <-msg:
println(" 開始處理消息: ")
println(m)
ms = append(ms, m)
time.Sleep( time.Second * 10 )
println(" 消息處理完畢: ")
case sig := <-sigs:
ioutil.WriteFile("a.txt", []byte(strings.Join(ms,"\n")) , os.ModePerm)
fmt.Println("%v", sig)
return
}
}
}