Rabbit MQ 死信隊列、延遲隊列

什么是死信隊列

消息變成死信的三種情況:

  • 消息被拒絕(basic.reject / basic.nack),并且requeue = false
  • 消息TTL過期
  • 隊列達到最大長度

“死信”是RabbitMQ中的一種消息機制樟结,當(dāng)消息出現(xiàn)以上三種情況,就會變成死信易结,死信”消息會被RabbitMQ進行特殊處理枕荞,如果配置了死信隊列信息,那么該消息將會被丟進死信隊列中搞动,如果沒有配置躏精,則該消息將會被丟棄。


死信隊列示意圖

什么是延遲隊列

延遲隊列就是利用TTL和死信隊列來實現(xiàn)的鹦肿,在過期時間后將消息轉(zhuǎn)發(fā)到死信隊列矗烛,在死信隊列做邏輯處理。從上圖中可以看出箩溃,如果普通消費者收到消息后啥也不干瞭吃,等到了過期時間消息就會轉(zhuǎn)發(fā)到死信隊列中,相當(dāng)于涣旨,消息只是在普通消費者那里暫存了TTL時間歪架,然后交給了死信隊列,也就實現(xiàn)了延時效果霹陡。

延遲隊列使用場景

  • 訂單超時未支付自動關(guān)閉
  • 召回N天前注冊的用戶
  • 訂單購買成功N分鐘后檢查下游環(huán)節(jié)是否正常和蚪,如開通會員權(quán)益

如何配置死信隊列

在業(yè)務(wù)隊列中配置死信隊列信息

    'x-dead-letter-exchange' => '',//死信交換機
    'x-dead-letter-routing-key' => '',//死信路由key
    'x-message-ttl' => '',//超時時間,單位毫秒 
    'x-max-length' => '',//隊列最大長度  

如何讓消息過期

讓消息過期有2種方式烹棉,一種是在隊列中設(shè)置x-message-ttl

//聲明支付通知隊列
$channel->queue_declare($queue_pay_notice,false,false,false,false,false,new \PhpAmqpLib\Wire\AMQPTable([
    'x-dead-letter-exchange'=>$exchange_order_delay,
    'x-dead-letter-routing-key'=>$routing_key_order_delay,
    'x-message-ttl'=>10000,//過期時間10S
]));

這種設(shè)置方式會讓每個消息的過期時間都一樣攒霹,都是10S,如果想靈活的設(shè)置過期時間浆洗,可以再發(fā)送消息的時候設(shè)置expiration

$msg = new \PhpAmqpLib\Message\AMQPMessage('hello',['expiration'=>10000]) //10s后過期;
 $channel->basic_publish(msg, $exchange_name, $routing_key);

如何實現(xiàn)死信隊列

普通消費者

死信隊列在普通消費者中的代碼需要處理的工作是最多的催束,需要聲明普通隊列和死信隊列,普通交換機和死信交換機辅髓,并綁定普通交換機的路由鍵泣崩,死信隊列的路由鍵少梁,并在普通隊列中配置死信隊列的交換機和路由鍵。

生產(chǎn)者

生產(chǎn)者還是一如既往只要負責(zé)發(fā)消息即可矫付。生產(chǎn)者并不知道消息最終會進入什么流程凯沪,他只管把消息發(fā)送到對應(yīng)的普通交換機即可。

死信隊列消費者

死信隊列消費者也不需要關(guān)心之前的邏輯买优,只要將收到的消息做對應(yīng)的處理即可妨马。

PHP demo

生產(chǎn)者 send.php

<?php
require_once __DIR__ . '/../../vendor/autoload.php';

$queue_order = 'queue_order';
$queue_delay_order = 'queue_delay_order';
$exchange_delay_order = 'exchange_delay_order';
$routing_delay_order = 'routing_delay_order';
$delay_ttl = 10 * 1000;
//場景 關(guān)閉10分鐘后未支付的訂單
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $conn->channel();

$channel->queue_declare($queue_order, false, false, false, false, false, New \PhpAmqpLib\Wire\AMQPTable([
    'x-dead-letter-exchange' => $exchange_delay_order,
    'x-dead-letter-routing-key' => $routing_delay_order,
    'x-message-ttl' => $delay_ttl
]));

//綁定死信隊列
$channel->queue_declare($queue_delay_order, false, false, false, false);
$channel->exchange_declare($exchange_delay_order, 'direct', false, false, false);
$channel->queue_bind($queue_delay_order, $exchange_delay_order, $routing_delay_order);
for ($i = 1; $i <= 10; $i++) {
    $msg = json_encode(['orderId' => $i,'time'=>date('H:i:s')]);
    echo 'seng msg' . $msg . PHP_EOL;
    $msg = new \PhpAmqpLib\Message\AMQPMessage($msg);
    $channel->basic_publish($msg, '',$queue_order);
}

$channel->close();
$conn->close();

業(yè)務(wù)消費者 receive.php

<?php
require_once __DIR__ . '/../../vendor/autoload.php';

$queue_order = 'queue_order';
//場景 關(guān)閉10分鐘后未支付的訂單
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $conn->channel();

$channel->basic_consume($queue_order, '', false, false, false, false, function ($msg) {
    $data = json_decode($msg->body, true);

    if ($data['orderId'] % 3 == 0) {//3,6,9
       //確認消息
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);//ack
        echo 'get msg & ack orderId:' . $data['orderId'] . $msg->body . PHP_EOL;
    } else if ($data['orderId'] % 3 == 1) {//1,4,7,10
       //不確認消息
        $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);//nack 
        echo 'get msg & nack orderId:' . $data['orderId'] . $msg->body . PHP_EOL;
    } else {//2,5,8
       //拒絕消息
        $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
        echo 'get msg & reject orderId:' . $data['orderId'] . $msg->body . PHP_EOL;
    }

});

while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$conn->close();

延遲隊列消費者 delay_receive.php

<?php
require_once __DIR__ . '/../../vendor/autoload.php';

$queue_delay_order = 'queue_delay_order';
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $conn->channel();

$channel->queue_declare($queue_delay_order, false, false, false, false);
$channel->basic_consume($queue_delay_order, '', false, false, false, false, function ($msg) {
    echo date('H:i:s').' get msg ' . $msg->body . PHP_EOL;
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);//確認收到消息
});

while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$conn->close();
receive.php執(zhí)行結(jié)果
delay_receive.php執(zhí)行結(jié)果

可以看到 nack和reject的消息進入到了死信隊列中。如果沒有運行receive.php杀赢,那么消息將會在設(shè)置的$delay_ttl后進入死信隊列烘跺。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市脂崔,隨后出現(xiàn)的幾起案子滤淳,更是在濱河造成了極大的恐慌,老刑警劉巖砌左,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件脖咐,死亡現(xiàn)場離奇詭異,居然都是意外死亡汇歹,警方通過查閱死者的電腦和手機屁擅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來产弹,“玉大人派歌,你說我怎么就攤上這事√瞪冢” “怎么了胶果?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長作谭。 經(jīng)常有香客問我稽物,道長,這世上最難降的妖魔是什么折欠? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任贝或,我火速辦了婚禮,結(jié)果婚禮上锐秦,老公的妹妹穿的比我還像新娘咪奖。我一直安慰自己,他們只是感情好酱床,可當(dāng)我...
    茶點故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布羊赵。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪昧捷。 梳的紋絲不亂的頭發(fā)上闲昭,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天,我揣著相機與錄音靡挥,去河邊找鬼序矩。 笑死,一個胖子當(dāng)著我的面吹牛跋破,可吹牛的內(nèi)容都是我干的簸淀。 我是一名探鬼主播,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼毒返,長吁一口氣:“原來是場噩夢啊……” “哼租幕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起拧簸,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤劲绪,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后盆赤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體珠叔,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年弟劲,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片姥芥。...
    茶點故事閱讀 39,902評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡兔乞,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出凉唐,到底是詐尸還是另有隱情庸追,我是刑警寧澤,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布台囱,位于F島的核電站淡溯,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏簿训。R本人自食惡果不足惜咱娶,卻給世界環(huán)境...
    茶點故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望强品。 院中可真熱鬧膘侮,春花似錦、人聲如沸的榛。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽夫晌。三九已至雕薪,卻和暖如春昧诱,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背所袁。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工盏档, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人纲熏。 一個月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓妆丘,卻偏偏與公主長得像,于是被迫代替她去往敵國和親局劲。 傳聞我的和親對象是個殘疾皇子勺拣,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容