什么是死信隊列
消息變成死信的三種情況:
- 消息被拒絕(basic.reject / basic.nack),并且requeue = false
- 消息TTL過期
- 隊列達到最大長度
“死信”是RabbitMQ中的一種消息機制樟结,當(dāng)消息出現(xiàn)以上三種情況,就會變成死信易结,死信”消息會被RabbitMQ進行特殊處理枕荞,如果配置了死信隊列信息,那么該消息將會被丟進死信隊列中搞动,如果沒有配置躏精,則該消息將會被丟棄。
什么是延遲隊列
延遲隊列就是利用TTL和死信隊列來實現(xiàn)的鹦肿,在過期時間后將消息轉(zhuǎn)發(fā)到死信隊列矗烛,在死信隊列做邏輯處理。從上圖中可以看出箩溃,如果普通消費者收到消息后啥也不干瞭吃,等到了過期時間消息就會轉(zhuǎn)發(fā)到死信隊列中,相當(dāng)于涣旨,消息只是在普通消費者那里暫存了TTL時間歪架,然后交給了死信隊列,也就實現(xiàn)了延時效果霹陡。
延遲隊列使用場景
- 訂單超時未支付自動關(guān)閉
- 召回N天前注冊的用戶
- 訂單購買成功N分鐘后檢查下游環(huán)節(jié)是否正常和蚪,如開通會員權(quán)益
如何配置死信隊列
在業(yè)務(wù)隊列中配置死信隊列信息
'x-dead-letter-exchange' => '',//死信交換機
'x-dead-letter-routing-key' => '',//死信路由key
'x-message-ttl' => '',//超時時間,單位毫秒
'x-max-length' => '',//隊列最大長度
如何讓消息過期
讓消息過期有2種方式烹棉,一種是在隊列中設(shè)置x-message-ttl
//聲明支付通知隊列
$channel->queue_declare($queue_pay_notice,false,false,false,false,false,new \PhpAmqpLib\Wire\AMQPTable([
'x-dead-letter-exchange'=>$exchange_order_delay,
'x-dead-letter-routing-key'=>$routing_key_order_delay,
'x-message-ttl'=>10000,//過期時間10S
]));
這種設(shè)置方式會讓每個消息的過期時間都一樣攒霹,都是10S,如果想靈活的設(shè)置過期時間浆洗,可以再發(fā)送消息的時候設(shè)置expiration
$msg = new \PhpAmqpLib\Message\AMQPMessage('hello',['expiration'=>10000]) //10s后過期;
$channel->basic_publish(msg, $exchange_name, $routing_key);
如何實現(xiàn)死信隊列
普通消費者
死信隊列在普通消費者中的代碼需要處理的工作是最多的催束,需要聲明普通隊列和死信隊列,普通交換機和死信交換機辅髓,并綁定普通交換機的路由鍵泣崩,死信隊列的路由鍵少梁,并在普通隊列中配置死信隊列的交換機和路由鍵。
生產(chǎn)者
生產(chǎn)者還是一如既往只要負責(zé)發(fā)消息即可矫付。生產(chǎn)者并不知道消息最終會進入什么流程凯沪,他只管把消息發(fā)送到對應(yīng)的普通交換機即可。
死信隊列消費者
死信隊列消費者也不需要關(guān)心之前的邏輯买优,只要將收到的消息做對應(yīng)的處理即可妨马。
PHP demo
生產(chǎn)者 send.php
<?php
require_once __DIR__ . '/../../vendor/autoload.php';
$queue_order = 'queue_order';
$queue_delay_order = 'queue_delay_order';
$exchange_delay_order = 'exchange_delay_order';
$routing_delay_order = 'routing_delay_order';
$delay_ttl = 10 * 1000;
//場景 關(guān)閉10分鐘后未支付的訂單
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $conn->channel();
$channel->queue_declare($queue_order, false, false, false, false, false, New \PhpAmqpLib\Wire\AMQPTable([
'x-dead-letter-exchange' => $exchange_delay_order,
'x-dead-letter-routing-key' => $routing_delay_order,
'x-message-ttl' => $delay_ttl
]));
//綁定死信隊列
$channel->queue_declare($queue_delay_order, false, false, false, false);
$channel->exchange_declare($exchange_delay_order, 'direct', false, false, false);
$channel->queue_bind($queue_delay_order, $exchange_delay_order, $routing_delay_order);
for ($i = 1; $i <= 10; $i++) {
$msg = json_encode(['orderId' => $i,'time'=>date('H:i:s')]);
echo 'seng msg' . $msg . PHP_EOL;
$msg = new \PhpAmqpLib\Message\AMQPMessage($msg);
$channel->basic_publish($msg, '',$queue_order);
}
$channel->close();
$conn->close();
業(yè)務(wù)消費者 receive.php
<?php
require_once __DIR__ . '/../../vendor/autoload.php';
$queue_order = 'queue_order';
//場景 關(guān)閉10分鐘后未支付的訂單
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $conn->channel();
$channel->basic_consume($queue_order, '', false, false, false, false, function ($msg) {
$data = json_decode($msg->body, true);
if ($data['orderId'] % 3 == 0) {//3,6,9
//確認消息
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);//ack
echo 'get msg & ack orderId:' . $data['orderId'] . $msg->body . PHP_EOL;
} else if ($data['orderId'] % 3 == 1) {//1,4,7,10
//不確認消息
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);//nack
echo 'get msg & nack orderId:' . $data['orderId'] . $msg->body . PHP_EOL;
} else {//2,5,8
//拒絕消息
$msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
echo 'get msg & reject orderId:' . $data['orderId'] . $msg->body . PHP_EOL;
}
});
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$conn->close();
延遲隊列消費者 delay_receive.php
<?php
require_once __DIR__ . '/../../vendor/autoload.php';
$queue_delay_order = 'queue_delay_order';
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $conn->channel();
$channel->queue_declare($queue_delay_order, false, false, false, false);
$channel->basic_consume($queue_delay_order, '', false, false, false, false, function ($msg) {
echo date('H:i:s').' get msg ' . $msg->body . PHP_EOL;
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);//確認收到消息
});
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$conn->close();
可以看到 nack和reject的消息進入到了死信隊列中。如果沒有運行receive.php杀赢,那么消息將會在設(shè)置的$delay_ttl后進入死信隊列烘跺。