RabbitMQ 功能實現(xiàn)
引入php-amqplib類庫,類庫地址為https://github.com/php-amqplib/php-amqplib
簡單的示例代碼實現(xiàn)
- 生產(chǎn)者(發(fā)送消息者)
public function producer(){
//創(chuàng)建連接實例
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
//創(chuàng)建一個連接通道
$channel = $connection->channel();
//聲明隊列,如果該隊列不存在會創(chuàng)建
$channel->queue_declare('hello', false, false, false, false);
//創(chuàng)建消息實例
$msg = new AMQPMessage('Hello World1!');
//通過通道,推送消息到隊列中
$channel->basic_publish($msg, '', 'hello');
//關(guān)閉通道
$channel->close();
//關(guān)閉連接
$connection->close();
echo " [x] Sent 'Hello World!'\n";
}
- 消費者(獲取消息者)
public function consumer(){
//創(chuàng)建連接實例
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
//創(chuàng)建一個連接通道
$channel = $connection->channel();
//聲明隊列,如果該隊列不存在會創(chuàng)建
$channel->queue_declare('hello', false, false, false, false);
//創(chuàng)建一個實例(這里用于回調(diào))
$callback_model=new Callback();
//通過通道消費隊列中的信息,并執(zhí)行回調(diào)(這里為array($callback_model,'getQueueInfo'))
$channel->basic_consume('hello', '', false, false, false, false,array($callback_model,'getQueueInfo'));
//當存在回調(diào)時,這里將進入無限循環(huán),每當隊列中被推送新值,就會執(zhí)行回調(diào)
while(count($channel->callbacks)) {
$channel->wait();
}
//關(guān)閉通道
$channel->close();
//關(guān)閉連接
$connection->close();
}
- 回調(diào)方法
class Callback extends Model{
//$channel->basic_consume 執(zhí)行回調(diào)方法時,會傳入$msg對象
public static function getQueueInfo($msg){
//我這里將$msg中的主體(隊列中的消息值) 和 當前進程號 存入表中
$test_model=new Table();
$test_model->content=$msg->body;
$test_model->num=getmypid();
$test_model->save();
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
}
隊列及消息的持久化設(shè)置
- 首先設(shè)置隊列的持久化
//設(shè)置第三個參數(shù)durable 為true
//注意:如果這里hello隊列已存在,RabbitMQ不允許重新定義現(xiàn)有隊列,并且會返回錯誤,這里你可以聲明一個新隊列
$channel->queue_declare('hello', false, true, false, false);
- 設(shè)置消息的持久化
$msg = new AMQPMessage($data,
array('delivery_mode' => 2) //使消息持久化
);
雖然設(shè)置了隊列和消息的持久化,但RabbitMQ可能有時只是存入緩存不是磁盤中,如果需要更強力的保障,請使用 publisher confirms
合理調(diào)度實現(xiàn)
如果你想讓工人處理并確認了當前任務(wù)后再接受新任務(wù),需在消耗信息時設(shè)置
//設(shè)置prefetch_count =1
$channel->basic_qos(null, 1, null); //參數(shù)為1 表示工人當前任務(wù)最多1個
$channel->basic_consume('hello', '', false, false, false, false);
消息確認機制
$channel->basic_consume('hello', '', false, false, false, false,array($callback_model,'getQueueInfo'));
//注意:$channel->basic_consume 的第四個參數(shù)為true時(即 no ack),則為關(guān)閉消息確認
//$channel->basic_consume 的第四個參數(shù)為false時,則為開啟消息確認
//開啟消息確認機制后,回調(diào)方法執(zhí)行消息確認后,該信息才會被消耗. 當該工人或服務(wù)死后,未確認的信息會被再次放入到隊列中
//回調(diào)方法中執(zhí)行
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);