實(shí)現(xiàn)步驟:
- 創(chuàng)建兩個(gè)交換機(jī)和隊(duì)列:死信隊(duì)列和死信隊(duì)列上游隊(duì)列
- 死信隊(duì)列的特性:
- 插入一個(gè)沒有過期時(shí)間的消息會導(dǎo)致 丁逝,隊(duì)列永遠(yuǎn)不會被消費(fèi)
- 前面的過期時(shí)間太長了蚓耽,后面的消息就算過期也不會被消費(fèi)的
- 按隨機(jī)串的方式來生成 交換機(jī)和隊(duì)列,保證一個(gè)交換機(jī)只有一條消息,消費(fèi)完以后刪除掉
- 這里是設(shè)置消息的過期時(shí)間方式來實(shí)現(xiàn)的延時(shí)任務(wù)
- 只需要監(jiān)聽死信隊(duì)列的上游隊(duì)列就可以實(shí)現(xiàn)想要的效果
<?php
namespace app\index\controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\Log;
class Index
{
const consumerTag = 'consumer'; // 消費(fèi)者標(biāo)簽
const exchange = 'router'; // 交換機(jī)名
const queue = 'msgs';// 的隊(duì)列名
/**
* 推入消息到隊(duì)列中
*/
public static function pushMessage()
{
// 連接rabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'dome', 'dome', '/');
// 開啟一個(gè)信道
$channel = $connection->channel();
// 動(dòng)態(tài)的創(chuàng)建交換機(jī)和隊(duì)列
// 以免在死信隊(duì)列中出現(xiàn)時(shí)間太長的消息,造成隊(duì)列阻塞
// 過期時(shí)長(毫秒)
$timeout = 10000;
$rand_num = time() . rand(100000, 999999);
$exchange_name = "cache_exchange_" . $rand_num;
$queue_name = "queue_" . $rand_num;
// exchange 交換機(jī)名稱
// type 交換器類型
// passive 檢測交換機(jī)是否存在 true 只檢測不創(chuàng)建 false 創(chuàng)建
// durable 是否持久化隊(duì)列 true 為持久化
// auto_delete 當(dāng)所有綁定隊(duì)列都不在使用時(shí)椅您,是否自動(dòng)刪除交換器 true:刪除false:不刪除
/**死信隊(duì)列交換機(jī)*/
$channel->exchange_declare($exchange_name, 'direct', false, false, false);
/**死信隊(duì)列上游交換機(jī), 隊(duì)列內(nèi)的消息過期以后會把消息推到這個(gè)隊(duì)列中*/
$channel->exchange_declare('delay_exchange', 'direct',false,false,false);
// 兩個(gè)參數(shù)寡键,來控制隊(duì)列出現(xiàn) dead letter 的時(shí)候掀泳,重新發(fā)送消息的目的地
$tale = new AMQPTable();
$tale->set('x-dead-letter-exchange', 'delay_exchange');// 表示過期后由哪個(gè)exchange處理
$tale->set('x-dead-letter-routing-key', 'delay_exchange');
$tale->set('x-message-ttl', $timeout); //存活時(shí)長(毫秒), 下面的過期時(shí)間不能超過
// 綁定隊(duì)列和交換機(jī) --- 死信隊(duì)列
// queue 隊(duì)列名
// passive 檢測隊(duì)列是否存在 true 只檢測不創(chuàng)建 false 創(chuàng)建
// durable 是否持久化隊(duì)列 true 為持久化
// exclusive 私有隊(duì)列 不允許其它用戶訪問 設(shè)置true 將會變成私有
// auto_delete 當(dāng)所有消費(fèi)客戶端連接斷開后西轩,是否自動(dòng)刪除隊(duì)列
$channel->queue_declare($queue_name, false,true,false,false,false, $tale);
$channel->queue_bind($queue_name, $exchange_name, $exchange_name);
// 綁定隊(duì)列和交換機(jī) --- 死信隊(duì)列 上游隊(duì)列
// 消息過期會交給這個(gè)隊(duì)列進(jìn)行處理
$channel->queue_declare('delay_queue', false, true, false, false);
$channel->queue_bind('delay_queue', 'delay_exchange', 'delay_exchange');
// 寫入隊(duì)列的消息
$data['message'] = [
'name' => '隊(duì)列的消息體'
];
$data['exchange'] = $exchange_name; // 死信隊(duì)列交換機(jī)
$data['queue'] = $queue_name; // 死信隊(duì)列
$messageBody = json_encode($data) ;
// 消息內(nèi)容
// delivery_mode 投遞模式 delivery mode 設(shè)置為 2標(biāo)記持久化
$message = new AMQPMessage(
$messageBody,
array(
'expiration' => intval($timeout),
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
));
// $message 消息內(nèi)容
// $exchange 交換器名稱
// routing_key 路由鍵 (routing key) 主題交換機(jī)會用到
$channel->basic_publish($message, $exchange_name, $exchange_name);
// 關(guān)閉信道
$channel->close();
//關(guān)閉 amqp 連接
$connection->close();
return "ok";
}
function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
Log::info("closed",3);
}
function process_message($message)
{
// *****這里寫自己的邏輯员舵,我這里測試方便寫了一個(gè)獲取消息寫到日志中
Log::info("error data111111111111111:" . json_encode($message) , 2);
// *****這里寫自己的邏輯,我這里測試方便寫了一個(gè)獲取消息寫到日志中
$body = $body = json_decode($message->body, true) ;
$exchange_name = $body['exchange'];
$queue_name = $body['queue'];
// 一個(gè)隊(duì)列只有一條消息藕畔,消費(fèi)完就直接刪除马僻,交換機(jī)和隊(duì)列
$message->delivery_info['channel']->exchange_delete($exchange_name);
$message->delivery_info['channel']->queue_delete($queue_name);
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
}
}
/**
* 啟動(dòng)
*
* @return \think\Response
*/
public function start()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'dome', 'dome', '/');
$channel = $connection->channel();
$channel->queue_declare('delay_queue', false, true, false, false);
$channel->exchange_declare('delay_exchange', 'direct', false, false, false);
$channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
// queue 隊(duì)列名稱
// consumer_tag 消費(fèi)者標(biāo)簽
// no_ack 在設(shè)置了 no_ack=false 的情況下)只要 consumer 手動(dòng)應(yīng)答了 Basic.Ack ,就算其“成功”處理了
// no_ack=true (此時(shí)為自動(dòng)應(yīng)答)
// exclusive 是否是私有隊(duì)列 設(shè)置true 將會變成私有
// callback = null, 回調(diào)函數(shù)
$channel->basic_consume('delay_queue', self::consumerTag, false, false, false, false, array($this, 'process_message'));
// 不管你的php代碼執(zhí)行是否成功注服,最后都會執(zhí)行 shutdown方法韭邓,關(guān)閉信道和連接
register_shutdown_function(array($this, 'shutdown'), $channel, $connection);
while (count($channel->callbacks)) {
$channel->wait();
}
Log::info ("starting",3);
}
}