base方法
<?php
namespace core\utils;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
class BaseRabbitmqService
{
//死信隊(duì)列和交換機(jī)
public static $dlxQueue = 'dlx.queue';
public static $dlxExchange = 'dlx.exchange';
//死信之后的隊(duì)列和交換機(jī)
public static $normalQueue = 'normal.queue';
public static $normalExchange = 'normal.exchange';
//消息發(fā)布者的routing_key
public static $msgKey = 'msgkey';
private static function getConfig()
{
return [
'host' => '127.0.0.1',
'port' => 5672,
'name' => 'guest',
'password' => 'guest',
];
}
public static function getConnection()
{
$config = self::getConfig();
$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['name'], $config['password']);
self::init($connection);
return $connection;
}
//初始化一些隊(duì)列信息
private static function init(&$connection)
{
$channel = $connection->channel();
//定義交換機(jī)
$channel->exchange_declare(self::$dlxExchange, AMQPExchangeType::DIRECT, false, true);
$channel->exchange_declare(self::$normalExchange, AMQPExchangeType::FANOUT, false, true);
//定義隊(duì)列究西,在正常隊(duì)列超時(shí)之后就送去死信隊(duì)列
$args = new AMQPTable();
// 消息過期方式:設(shè)置 queue.normal 隊(duì)列中的消息5s之后過期抚笔,毫秒單位
$args->set('x-message-ttl', 5000);
// 設(shè)置隊(duì)列最大長度方式: x-max-length
//$args->set('x-max-length', 1);
$args->set('x-dead-letter-exchange', self::$dlxExchange);
$args->set('x-dead-letter-routing-key', self::$msgKey);
$channel->queue_declare(self::$normalQueue, false, true, false, false, false, $args);
$channel->queue_declare(self::$dlxQueue, false, true, false, false);
$channel->queue_bind(self::$normalQueue, self::$normalExchange);
$channel->queue_bind(self::$dlxQueue, self::$dlxExchange, self::$msgKey);
}
}
生產(chǎn)者
<?php
namespace app\api\controller;
use core\utils\BaseRabbitmqService;
use PhpAmqpLib\Message\AMQPMessage;
class ProducerController extends BaseRabbitmqService
{
public static function doTask()
{
$connection = self::getConnection();
$channel = $connection->channel();
$data = [];
//生成5條數(shù)數(shù)據(jù)
for ($i = 0; $i < 5; $i++) {
$data['user_id'] = mt_rand(1, 100);
$data['order_amount'] = mt_rand(10000, 99999);
$data['order_number'] = mt_rand(100, 999);
// $msg = new AMQPMessage(json_encode($data),
// array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) # 使消息持久化
// );
$msg = new AMQPMessage(json_encode($data));
echo " [x] Send ", date('Y-m-d H:i:s') . '--' . json_encode($data), "\n";
$channel->basic_publish($msg, self::$normalExchange);
}
$channel->close();
$connection->close();
}
}
消費(fèi)者
<?php
namespace app\api\controller;
use core\utils\BaseRabbitmqService;
class ConsumerController extends BaseRabbitmqService
{
public static function doTask()
{
$connection = self::getConnection();
$channel = $connection->channel();
$callback = function ($msg) {
echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "\n";
//主動(dòng)確認(rèn)信息處理完
// $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
//沒有確認(rèn)就手動(dòng)丟給死信隊(duì)列
sleep(10);
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag']);
};
//發(fā)送一個(gè)未處理完就不發(fā)送下一個(gè)
// $channel->basic_qos(null, 1, null);
$channel->basic_consume(self::$normalQueue, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}
死信消費(fèi)者
<?php
namespace app\api\controller;
use core\utils\BaseRabbitmqService;
class ConsumerDeadController extends BaseRabbitmqService {
public static function doTask() {
$connection = self::getConnection();
$channel = $connection->channel();
$callback = function($msg) {
echo " [x] Received ", date('Y-m-d H:i:s') . '--' . $msg->body, "\n";
//主動(dòng)確認(rèn)信息處理完
// $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//發(fā)送一個(gè)未處理完就不發(fā)送下一個(gè)
// $channel->basic_qos(null, 1, null);
$channel->basic_consume(self::$dlxQueue, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}