什么時(shí)候使用
用于系統(tǒng)的物理解耦和邏輯解耦
場(chǎng)景
- 削峰填谷
- 數(shù)據(jù)驅(qū)動(dòng)的任務(wù)依賴
- 多個(gè)接收方刁憋,上游不關(guān)心多下游執(zhí)行結(jié)果
- upsteam 關(guān)注結(jié)果但時(shí)間很長(zhǎng)
消息隊(duì)列的推送
- 推送 push
- 優(yōu)點(diǎn)
- 實(shí)時(shí)性好
- 中間件服務(wù)器做負(fù)載均衡
- 缺點(diǎn)
- 需要確認(rèn)收到
- 優(yōu)點(diǎn)
- 拉取 pull
- 優(yōu)點(diǎn)
- 可以拉取多條
- 服務(wù)端邏輯少
- 缺點(diǎn):
- 可能導(dǎo)致消息堆積
- 消費(fèi)端主動(dòng)輪訓(xùn)
- 優(yōu)點(diǎn)
rabbitmq的push和pull
- 拉
這種方式Api比較簡(jiǎn)單醇滥,但是需要自己控制拉取節(jié)奏佩谣,
func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error)
Get synchronously receives a single Delivery from the head of a queue from the server to the client. In almost all cases, using Channel.Consume will be preferred.
If there was a delivery waiting on the queue and that delivery was received, the second return value will be true. If there was no delivery waiting or an error occurred, the ok bool will be false.
- 推
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
c.deliveryChan, err = c.channel.Consume(c.queueName,
c.tag,
false,
false,
false,
false,
nil)
if err != nil {
c.RabbitMQClient.Close()
return err
}
// important: 在一個(gè)goroutine中同時(shí)對(duì)msgs和notifyClose兩個(gè)channel進(jìn)行讀取可能會(huì)導(dǎo)致死鎖凡怎。
// 因?yàn)閙sgs被關(guān)閉就會(huì)結(jié)束相應(yīng)的goroutine,
// 此時(shí)notifyClose因?yàn)闆](méi)有接收者宫蛆,而在amqp.channel關(guān)閉的過(guò)程中出現(xiàn)死鎖艘包。
go c.handle(c.deliveryChan) //另外啟動(dòng)一個(gè)攜程處理任務(wù)
消息不重復(fù)消費(fèi)
在消息中添加唯一的消息ID的猛;同時(shí)確保消息的冪等性;
消息不丟
消息發(fā)送的時(shí)候Producer要收到rabbitmq的Confirm消息想虎;消費(fèi)端收到消息后應(yīng)該給rabbitmq發(fā)送ACK卦尊;
保證送達(dá)
- 在保證消息不丟的前提下,在發(fā)送到rabbitmq之后寫入數(shù)據(jù)庫(kù)舌厨,當(dāng)消息被consumer處理之后更新數(shù)據(jù)庫(kù)中的狀態(tài)岂却;
- 啟動(dòng)一個(gè)異步認(rèn)為定時(shí)檢查數(shù)據(jù)庫(kù)中的任務(wù),如果狀態(tài)沒(méi)有被更新就取出來(lái)重新發(fā)送到消息隊(duì)列裙椭;
- 在保證消息冪等性的前提下躏哩,可以保證消息被送達(dá)
消息順序性
消息中只有一個(gè)接收者的情況下,可以保證消息的順序消費(fèi)
消息隊(duì)列的延時(shí)以及過(guò)期失效問(wèn)題
延時(shí)隊(duì)列可以通過(guò)以下2種方式實(shí)現(xiàn):
-
死信隊(duì)列
-
死信產(chǎn)生:
- 消息被拒絕(basic.reject / basic.nack)揉燃,并且requeue = false
- 消息TTL過(guò)期
- 隊(duì)列達(dá)到最大長(zhǎng)度
死信說(shuō)明
DLX也是一個(gè)正常的Exchange扫尺,和一般的Exchange沒(méi)有區(qū)別,它能在任 何的隊(duì)列上被指定炊汤,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性正驻。當(dāng)這個(gè)隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange上去抢腐,進(jìn)而被路由到另一個(gè)隊(duì)列拨拓。可以監(jiān)聽(tīng)這個(gè)隊(duì)列中的消息做相應(yīng)的處理氓栈。延時(shí)消息
基于上面的說(shuō)明,發(fā)送一條消息到一個(gè)沒(méi)有consumer的exchange 并設(shè)置ttl的過(guò)期時(shí)間為我們需要延時(shí)的時(shí)間比如30(秒),當(dāng)ttl過(guò)期之后就會(huì)根據(jù)私信dlx.exchange路由到指定的queue中婿着;然后再死信隊(duì)列中的consumer復(fù)制消費(fèi)這個(gè)消息
-
-
使用插件 delayed_exchange
rabbitmq-delayed-message-exchange
創(chuàng)建exchange的時(shí)候需要按照下圖所示指定類型arg := make(map[string]interface{}) arg["x-delayed-type"] = "topic" ex := mq.Exchange{ Name: "delay-task2", Kind: "x-delayed-message", Args: arg, }
發(fā)送message的時(shí)候指定消息的delay的時(shí)間
table := make(map[string]interface{}) table["x-delay"] = 3000 // 指定delay 3s err := p.Publish(ctx, d, "delay.order.abc", table);
限流
// 限定prefetch count prefetch_size, global
//Qos controls how many messages or how many bytes the server will try to keep on the network for consumers before receiving delivery acks. The intent of Qos is to make sure the network buffers stay full between the server and client.
if err := c.channel.Qos(1, 0, false); err != nil {
}
c.deliveryChan, err = c.channel.Consume(c.queueName,
c.tag,
false, // 關(guān)閉auto ack
false,
false,
false,
nil)
if err != nil {
c.RabbitMQClient.Close()
return err
}
斷線重連
單獨(dú)啟動(dòng)一個(gè)協(xié)程授瘦,執(zhí)行下面的操作
func (c *RabbitMQConsumer) reconnect() error {
// 重新連接成功之后,重新執(zhí)行consume竟宋;
for {
// 是否發(fā)生錯(cuò)誤
select {
case err := <-c.connNotify:
if err != nil {
log.Println("rabbitmq consumer - connection NotifyClose: ", err)
}
case err := <-c.channelNotify:
if err != nil {
log.Println("rabbitmq consumer - channel NotifyClose: ", err)
}
case <-c.quit:
return nil
}
// 連接未關(guān)閉
if !c.conn.IsClosed() {
var errConn, errChannel *amqp.Error
for errChannel = range c.channelNotify {
log.Println(errChannel)
}
for errConn = range c.connNotify {
log.Println(errConn)
}
// 關(guān)閉 SubMsg common delivery
if err := c.channel.Cancel(c.tag, true); err != nil {
log.Println("rabbitmq consumer - channel cancel failed: ", err)
}
if err := c.channel.Close(); err != nil {
}
if err := c.conn.Close(); err != nil {
log.Println("rabbitmq consumer - connection close failed: ", err)
}
} else {
log.Println("conn is closed")
}
// IMPORTANT: 必須清空 Notify提完,否則死連接不會(huì)釋放 如果還有error一起讀完否則連接不能釋放
retry:
for {
select {
case <-c.quit:
return nil
default:
log.Println("rabbitmq consumer - reconnect")
// 第二次重新連接
if err := c.Init(); err != nil {
// 等待;然后重試
time.Sleep(time.Second * 10)
log.Println("loop again continue")
continue
}
break retry
}
}
}
}
高可用
主備
只有主節(jié)點(diǎn)提供讀寫丘侠;備用節(jié)點(diǎn)只是在主節(jié)點(diǎn)掛掉的情況下服務(wù)徒欣;并發(fā)量并不大的情況可以使用haproxy做主備;-
鏡像模式
mirror鏡像隊(duì)列蜗字;保證rabbitmq數(shù)據(jù)的高可靠性打肝,實(shí)現(xiàn)數(shù)據(jù)同步2-3個(gè)節(jié)點(diǎn)的數(shù)據(jù)同步;前端需要自己做負(fù)載均衡
image.png Federation
在broker之間傳輸消息的插件
https://github.com/rabbitmq/rabbitmq-federation
如上圖所示挪捕,F(xiàn)ederation Exchanges,可以看成Downstream從Upstream主動(dòng)拉取消息粗梭,但并不是拉取所有消息,必須是在Downstream上已經(jīng)明確定義Bindings關(guān)系的Exchange级零,也就是有實(shí)際的物理Queue來(lái)接收消息断医,才會(huì)從Upstream拉取消息到Downstream。使用AMQP協(xié)議實(shí)施代理間通信,Downstream會(huì)將綁定關(guān)系組合在一起鉴嗤,綁定/解綁命令將會(huì)發(fā)送到Upstream交換機(jī)斩启。因此,F(xiàn)ederationExchange只接收具有訂閱的消息醉锅。