前言
在日常開發(fā)中,消息隊列能幫我們解決系統(tǒng)的異步問題涩赢,流量的控制和服務(wù)解耦咙轩,不同的消息隊列有不同的消費模型
思考
redis也可以實現(xiàn)消息隊列(list和stream)苞慢,也稱為輕量級消息隊列蟹演,list實現(xiàn)的缺點在哪里风钻?stream類型怎么用顷蟀?
RabbitMq
具體概念的東西網(wǎng)上很多酒请,文檔也有詳細描述這里不做過多闡述,本文主要以PHP代碼為主進行實驗鸣个,消息隊列之rabbitmq
docker run -d --name mq \
-p 5672:5672 -p 15672:15672 \
-v /home/docker/mq/data:/var/lib/rabbitmq --hostname myRabbit \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin ee045987e252
--hostname myRabbit 是因為rabbitmq是基于Node節(jié)點名的
官方講rabbitmq比喻為郵局羞反,queue比喻為郵局里的郵箱布朦,我們要寄信(producer發(fā)送message),就需要把信賽到(send)郵箱昼窗,或者你交給前臺窗口(exchange)讓他幫你寄是趴,但是不同的前臺窗口提供的服務(wù)不同,因為呀不同的郵箱他發(fā)往的地址不同澄惊,有的需要你指定投到哪些郵箱(exchange類型為direct類型時要求完全匹配routing-key)唆途,有的只需要你告訴他投到郵箱的大致有什么特點就行(exchange類型為topic時routing-key模糊匹配就行),還有的是把信件復(fù)制多個(魔法)往每個郵箱都塞一封(exchange類為fanout)掸驱。最后有的郵箱在派送員派送完信件后要求收信人(consumer)簽個字(ack驗證)才能把信給他
下面例子基于官方提供的包,
composer require php-amqplib/php-amqplib
例子來自小猴子喝牛奶的博客
Direct-Exchange
生產(chǎn)者
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//獲取終端提示用戶輸入的數(shù)據(jù)
fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);
//建立生產(chǎn)者與mq之間的連接 -------動身前往郵局
//參數(shù):地址肛搬,端口,賬號毕贼,密碼温赔,虛擬機名
//注意這個虛擬機名為綁定-e RABBITMQ_DEFAULT_VHOST=my_vhost參數(shù)時指定的
$connection = new AMQPStreamConnection('容器ip', 5672, 'admin', 'my_vhost');
//在已連接基礎(chǔ)上建立生產(chǎn)者與mq之間的通道-----進郵局門
$channel = $connection->channel();
//聲明初始化交換機,交換機不存在則創(chuàng)建----找前臺窗口
//參數(shù):交換機名鬼癣,路由類型陶贼,是否檢測同名隊列,是否開啟隊列持久化待秃,通道關(guān)閉后是否刪除隊列
$channel->exchange_declare('ex_direct', 'direct', false, true, false);
//聲明初始化一條隊列拜秧,隊列不存在則創(chuàng)建-----告訴前臺窗口要什么郵箱
//參數(shù):隊列名,是否檢測同名隊列章郁,是否開啟隊列持久化腹纳,是否能被其他隊列訪問,通道關(guān)閉后是否刪除隊列
$channel->queue_declare('ex_direct_queue', false, false, false, false);
//前臺窗口找你要的郵箱
//將隊列與某個交換機進行綁定驱犹,并使用路由關(guān)鍵字
//參數(shù):隊列名嘲恍,交換機名,路由鍵名
$channel->queue_bind('ex_direct_queue', 'ex_direct', 'hello');
//把信給封好
//生成消息
$msg = new AMQPMessage($msg_str);
//推送消息到某個交換機------把信給前臺
//參數(shù):消息雄驹,交換機名佃牛,路由鍵名
//就如同前面所講,你只需要把信給前臺医舆,并告訴他投給哪些指定的郵箱即可
$channel->basic_publish($msg, 'ex_direct', 'hello');
echo " [x] Sent: $msg_str \n";
$channel->close();
$connection->close();
消費者
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//快遞員進門
$connection = new AMQPStreamConnection('容器ip', 5672, 'admin', 'my_vhost');
$channel = $connection->channel();
//快遞員先看看給自己任務(wù)的前臺在不在
$channel->exchange_declare('ex_direct', 'direct', false, true, false);
//在看看自己負責的郵箱在不在
$channel->queue_declare('ex_direct_queue', false, false, false, false);
$channel->queue_bind('ex_direct_queue', 'ex_direct', 'hello');
//執(zhí)行上面的步驟主要是為保證這些目標交換機和隊列已經(jīng)存在
//這里是收信人的動作
$callback = function($msg) {
//打印消息
echo " [x] Received ", $msg->body, "\n";
//消息確認
$msg->ack();
};
//第三個參數(shù)為true表示了這個郵箱規(guī)定了收信人必須簽名
//參數(shù):隊列名俘侠,消費者標識符,不接收此使用者發(fā)布的消息蔬将,使用者是否使用自動確認模式爷速,請求獨占使用者訪問,不等待霞怀,消息回調(diào)函數(shù)
$channel->basic_consume('ex_direct_queue', 'consumer1', false, true, false, false, $callback);
//快遞員看有沒有信惫东,有就立馬寄
//監(jiān)聽通道消息
while(count($channel->callbacks)) {
$channel->wait();
}
思考
1.萬一RabbitMQ崩潰了退出了怎么辦?里面的隊列和消息會不會消失,這需要我們在聲明交換機和隊列時候廉沮,讓他保證持久化
//第4個參數(shù)為true
$channel->exchange_declare('ex_direct', 'direct', false, true, false);
//第三個參數(shù)為true颓遏,這里不能在已存在的隊列上加持久化
$channel->queue_declare('hello', false, true, false, false);
//里面的消息也保證持久化
//第二個參數(shù)為數(shù)組
$msg = new AMQPMessage('你的消息', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] );
2.Rabbitmq默認一旦發(fā)送消息給客戶端后就立即刪除,那萬一消費者收到消息后要執(zhí)行一個耗時任務(wù)滞时,但是中途異常退出了叁幢,那么這個消息不就丟了嗎(比如上面的回調(diào)函數(shù)中sleep(10)但是我們中涂把他kill掉導(dǎo)致沒發(fā)送ack碼)。
們希望消費者完成消息處理后發(fā)送ack確認坪稽,rabbitMQ收到后才能對消息刪除曼玩。
//即在消費者綁定隊列時第4個參數(shù)為false
$channel->basic_consume('ex_direct_queue', 'consumer1', false, false, false, false, $callback);
3.快遞員有多個,那么萬一有的快遞員要寄很多信窒百,有的在偷懶怎么辦演训?
利用函數(shù)進行公平調(diào)度
//消費者代碼添加,表示在等待消費者處理完消息后才能再接受消息贝咙,不堆積消息
$channel->basic_qos(null, 1, null);
Topic-Exchange
和Direct代碼基本相同不同的是綁定交換機是時的'direct'成了'topic'
注意样悟,routing-key是模糊匹配,這里并不是參考正則庭猩,*表示多個字符窟她,#表示一個字符如 .log. 匹配 aaa.log.aaa
Fanout-Exchange
又稱發(fā)布與訂閱,即向與交換機的所有隊列廣播消息蔼水,既然是廣播震糖,那么我們就不需要考慮消息的ack了
生產(chǎn)者
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
//獲取終端提示用戶輸入的數(shù)據(jù)
fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);
//建立生產(chǎn)者與mq之間的連接
//參數(shù):地址,端口趴腋,賬號吊说,密碼
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
//在已連接基礎(chǔ)上建立生產(chǎn)者與mq之間的通道
$channel = $connection->channel();
//聲明初始化交換機
//參數(shù):交換機名,路由類型优炬,是否檢測同名隊列颁井,是否開啟隊列持久化,通道關(guān)閉后是否刪除隊列
$channel->exchange_declare('mq_sms_send_ex3', AMQPExchangeType::FANOUT, false, false, false);
//生成消息
$msg = new AMQPMessage($msg_str);
//推送消息到某個交換機
//參數(shù):消息蠢护,交換機名雅宾,路由鍵名
$channel->basic_publish($msg, 'mq_sms_send_ex3');
echo " [x] Sent: $msg_str \n";
$channel->close();
$connection->close();
消費者
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
//聲明初始化交換機
//參數(shù):交換機名,路由類型葵硕,是否檢測同名隊列眉抬,是否開啟隊列持久化,通道關(guān)閉后是否刪除隊列
$channel->exchange_declare('mq_sms_send_ex3', AMQPExchangeType::FANOUT, false, false, false);
//聲明初始化一條隊列
//參數(shù):隊列名懈凹,是否檢測同名隊列蜀变,是否開啟隊列持久化,是否能被其他隊列訪問介评,通道關(guān)閉后是否刪除隊列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
//將隊列與某個交換機進行綁定库北,并使用路由關(guān)鍵字
//參數(shù):隊列名,交換機名,路由鍵名
$channel->queue_bind($queue_name, 'mq_sms_send_ex3');
echo ' [*] Waiting for messages', "\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
//判斷獲取到quit后
if (trim($msg->body) == 'quit') {
$msg->getChannel()->basic_cancel($msg->getConsumerTag());
}
};
$channel->basic_qos(null, 1, null);
//參數(shù):隊列名贤惯,消費者標識符,不接收此使用者發(fā)布的消息棒掠,使用者是否使用自動確認模式孵构,請求獨占使用者訪問,不等待烟很,消息回調(diào)函數(shù)
$channel->basic_consume($queue_name, 'consumer1', false, true, false, false, $callback);
死信隊列
即延遲隊列颈墅,講消息發(fā)送到指定的隊列,消息要在隊列中待到指定時間(ttl)后才能被發(fā)送給消費者
如何保證死信隊列在消息過去后才把消息發(fā)給業(yè)務(wù)交換機---不設(shè)置消費者(快遞員)不就行了
生產(chǎn)者
require_once '../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Wire\AMQPWriter;
fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);
$connection = new AMQPStreamConnection(
'172.17.0.5','5672','admin','admin','my_vhost'
);
$channel = $connection->channel();
//業(yè)務(wù)交換機雾袱,負責處理過期消息
$channel->exchange_declare(
'ex_dl','direct',false,true
);
//死信交換機
$channel->exchange_declare(
'ex_normal','fanout',false,true
);
//因此創(chuàng)建死信隊列的配置參數(shù)要求是AMQPTable類型
$args=new AMQPTable();
//設(shè)置消息過期時間
$args->set('x-message-ttl',120000);
//過期后發(fā)送給哪個交換機
$args->set('x-dead-letter-exchange','ex_dl');
//設(shè)置路由鍵
$args->set('x-dead-letter-routing-key','ex_qu');
//也就是說normal隊列上的消息存活時間都是2分組
//死信隊列
$channel->queue_declare('queue_normal',false,true,
false,false,false,$args
);
//業(yè)務(wù)隊列
$channel->queue_declare('queue_dlx',false,true,
false,false
);
$channel->queue_bind('queue_normal','ex_normal');
$channel->queue_bind('queue_dlx','ex_dl','ex_qu');
$message =new AMQPMessage($msg_str);
//只發(fā)送消息給死信交換機恤筛,因為業(yè)務(wù)交換機的消息是死信隊列給的
$channel->basic_publish($message,'ex_normal','ex_qu');
$channel->close();
$connection->close();
消費者
require_once '../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection(
'172.17.0.5','5672','admin','admin','my_vhost'
);
$channel= $connection->channel();
//我們只要保證業(yè)務(wù)交換機和業(yè)務(wù)隊列在就行了
//死信隊列不給消費者消費消息
$channel->exchange_declare(
'ex_dl','direct',false,true
);
$channel->queue_declare('queue_dlx',false,true,
false,false
);
$channel->queue_bind('queue_dlx','ex_dl','ex_qu');
echo '[*]Waiting for message';
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
$msg->ack();
if(trim($msg->body)=='quit'){
echo 2;
$msg->getChannel()->basic_cancel($msg->getConsumerTag());
}
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('queue_dlx','c1',false,false,false,false,$callback);