消息隊(duì)列
此處使用window環(huán)境,Linux環(huán)境下,添加
資源URL
PHP的rabbitMQ擴(kuò)展
rabbitMQ官網(wǎng)
rabbitMQ官網(wǎng)下載地址
erlang官網(wǎng)
erlang下載
概念
幾個(gè)概念說(shuō)明:
Broker:簡(jiǎn)單來(lái)說(shuō)就是消息隊(duì)列服務(wù)器實(shí)體划滋。
- Exchange:消息交換機(jī)荐吵,它指定消息按什么規(guī)則仔蝌,路由到哪個(gè)隊(duì)列。
- Queue:消息隊(duì)列載體土居,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列。
- Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來(lái)为黎。
- Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞行您。
- vhost:虛擬主機(jī)铭乾,一個(gè)broker里可以開(kāi)設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離娃循。
- producer:消息生產(chǎn)者炕檩,就是投遞消息的程序。
- consumer:消息消費(fèi)者捌斧,就是接受消息的程序笛质。
- channel:消息通道,在客戶端的每個(gè)連接里捞蚂,可建立多個(gè)channel经瓷,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。
消息隊(duì)列的使用過(guò)程大概如下:
(1)客戶端連接到消息隊(duì)列服務(wù)器洞难,打開(kāi)一個(gè)channel舆吮。
(2)客戶端聲明一個(gè)exchange,并設(shè)置相關(guān)屬性队贱。
(3)客戶端聲明一個(gè)queue色冀,并設(shè)置相關(guān)屬性。
(4)客戶端使用routing key柱嫌,在exchange和queue之間建立好綁定關(guān)系锋恬。
(5)客戶端投遞消息到exchange。
exchange接收到消息后编丘,就根據(jù)消息的key和已經(jīng)設(shè)置的binding与学,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里嘉抓。
exchange也有幾個(gè)類型索守,完全根據(jù)key進(jìn)行投遞的叫做Direct交換機(jī),例如抑片,綁定時(shí)設(shè)置了routing key為”abc”卵佛,那么客戶端提交的消息,只有設(shè)置了key為”abc”的才會(huì)投遞到隊(duì)列。對(duì)key進(jìn)行模式匹配后進(jìn)行投遞的叫做Topic交換機(jī)截汪,符號(hào)”#”匹配一個(gè)或多個(gè)詞疾牲,符號(hào)””匹配正好一個(gè)詞。例如”abc.#”匹配”abc.def.ghi”衙解,”abc.”只匹配”abc.def”阳柔。還有一種不需要key的,叫做Fanout交換機(jī)蚓峦,它采取廣播模式盔沫,一個(gè)消息進(jìn)來(lái)時(shí),投遞到與該交換機(jī)綁定的所有隊(duì)列枫匾。
RabbitMQ支持消息的持久化架诞,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮干茉,我想大多數(shù)用戶都會(huì)選擇持久化谴忧。消息隊(duì)列持久化包括3個(gè)部分:
(1)exchange持久化角虫,在聲明時(shí)指定durable => 1
≌次健(2)queue持久化,在聲明時(shí)指定durable => 1
〈炼臁(3)消息持久化均驶,在投遞時(shí)指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的枫虏。如果exchange和queue兩者之間有一個(gè)持久化妇穴,一個(gè)非持久化,就不允許建立綁定隶债。
安裝環(huán)境
- 先安裝erlang包,然后添加環(huán)境變量 PATH erlang安裝路徑\bin
- 然后安裝rabbitMQ,然后添加環(huán)境變量 PATH rabbitMQ安裝路徑\sbin
- 以管理員身份進(jìn)入rabbitMQ安裝目錄/sbin依次執(zhí)行命令
- rabbitmq-plugins.bat enable rabbitmq_management
- rabbitmq-service.bat stop
- rabbitmq-service.bat install
- rabbitmq-service.bat start
執(zhí)行后,訪問(wèn)127.0.0.1:15672 查看rabbitMQ是否安裝成功, 登錄賬號(hào)密碼是 guest
- PHP環(huán)境搭建
- 配置 php.ini extension=php_amqp.dll; 并將rabbitmq.*.dll 放到和php.ini的同級(jí)目錄下
- 重啟php-fpm
- 打印phpinfo() 查看amqp擴(kuò)展
安裝END
customer.php
$conn_args = [
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/',
];
$exchangeName= 'exchange'; #交換機(jī)名
$queueName = 'queue'; #隊(duì)列名
$keyRoute = 'keyRoute'; #路由key
//創(chuàng)建鏈接
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die('連接失敗');
}
//創(chuàng)建頻道
$channel = new AMQPChannel($conn);
//創(chuàng)建交換機(jī)
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName); #交換機(jī)名
$exchange->setType(AMQP_EX_TYPE_DIRECT); #類型
$exchange->setFlags(AMQP_DURABLE); #持久化
echo "交換機(jī)狀態(tài):".$exchange->declareExchange()."\n";
//創(chuàng)建隊(duì)列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE); #持久化
echo "消息總數(shù):".$queue->declareQueue()."\n";
//綁定交換機(jī)與隊(duì)列腾它,并指定路由鍵
$queue->bind($exchangeName, $keyRoute);
//阻塞模式接收消息
echo "Message:\n";
while(True){
$queue->consume(function($envelope, $queue){
$msg = $envelope->getBody();
echo $msg."\n"; //處理消息
$queue->ack($envelope->getDeliveryTag()); //手動(dòng)發(fā)送ACK應(yīng)答
});
//$queue->consume('processMessage', AMQP_AUTOACK); //自動(dòng)ACK應(yīng)答
}
$conn->disconnect();
publisher.php
$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
);
$exchangeName = 'exchange'; //交換機(jī)名
$keyRoute = 'keyRoute'; //路由key
//創(chuàng)建連接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
date_default_timezone_set("Asia/Shanghai");
//創(chuàng)建交換機(jī)對(duì)象
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_DURABLE);
//發(fā)送消息
//$channel->startTransaction(); //開(kāi)始事務(wù)
for ($i = 0; $i < 5; ++$i) {
sleep(1);//休眠1秒
//消息內(nèi)容
$message = "測(cè)試數(shù)據(jù)!" . date("H:i:s");
echo "Send Message:" . $exchange->publish($message, $keyRoute) . "\n";
}
//$channel->commitTransaction(); //提交事務(wù)
$conn->disconnect();
命令行A下先執(zhí)行 php customer.php,然后新cmd下執(zhí)行 php publisher.php
命令行A將有輸出