直連型交換機(jī)(direct exchange)是根據(jù)消息攜帶的路由鍵(routing key)將消息投遞給對(duì)應(yīng)隊(duì)列的吵瞻。直連交換機(jī)用來(lái)處理消息的單播路由(unicast routing)(盡管它也可以處理多播路由)。下邊介紹它是如何工作的:
- 直連交換機(jī)經(jīng)常用來(lái)循環(huán)分發(fā)任務(wù)給多個(gè)工作者(workers)。當(dāng)這樣做的時(shí)候橱夭,我們需要明白一點(diǎn),在AMQP 0-9-1中,消息的負(fù)載均衡是發(fā)生在消費(fèi)者(consumer)之間的门粪,而不是隊(duì)列(queue)之間。
- 將一個(gè)隊(duì)列綁定到某個(gè)交換機(jī)上烹困,同時(shí)賦予該綁定一個(gè)路由鍵(routing key)
當(dāng)一個(gè)攜帶著路由鍵為R的消息被發(fā)送給直連交換機(jī)時(shí)玄妈,交換機(jī)會(huì)把它路由給綁定值同樣為R的隊(duì)列。
- 假設(shè)一個(gè)商城的場(chǎng)景,每支付一個(gè)訂單拟蜻,我們要通知業(yè)務(wù)A系統(tǒng)
A.send.php 消費(fèi)者
<?php
//生產(chǎn)者-業(yè)務(wù)A
require_once __DIR__ . '/../../../../vendor/autoload.php';
$queue = 'biz_A_queue';
$exchange = 'pay_order_exchange';
//連接
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
//建立通道
$channel = $conn->channel();
//試探性的聲明一個(gè)隊(duì)列
$channel->queue_declare($queue, false, false, false, false);
//試探性的聲明一個(gè)交換機(jī)
$channel->exchange_declare($exchange, 'direct', false, false, false);
//將隊(duì)列與交換機(jī)綁定
$channel->queue_bind($queue, $exchange);
$msg = new \PhpAmqpLib\Message\AMQPMessage(json_encode([
'id' => 1,
'msg' => '賣(mài)出A產(chǎn)品',
'time' => date('Ymd H:i:s')
],JSON_UNESCAPED_UNICODE));
$channel->basic_publish($msg, $exchange);
$channel->close();
$conn->close();
A.receive.1.php 消費(fèi)者demo
//消費(fèi)者-業(yè)務(wù)A
require_once __DIR__ . '/../../../../vendor/autoload.php';
$queue = 'biz_A_queue';
$exchange = 'pay_order_exchange';
//連接
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
//建立通道
$channel = $conn->channel();
//試探性的聲明一個(gè)隊(duì)列
$channel->queue_declare($queue, false, false, false, false);
$channel->basic_consume($queue,'',false,false,false,false,function ($msg){
echo $msg->body.PHP_EOL;
});
while ($channel->is_consuming()){
$channel->wait();
}
$channel->close();
$conn->close();
運(yùn)行消費(fèi)者文件
$ php ./A.receive.1.php
{"id":1,"msg":"賣(mài)出A產(chǎn)品","time":"20210520 14:52:00"}
再來(lái)看一下交換機(jī)循環(huán)分發(fā)任務(wù)給多個(gè)消費(fèi)者的demo,也就是負(fù)載均衡
把 A.send.php的代碼改成如下绎签,讓他發(fā)10條消息
<?php
//生產(chǎn)者-業(yè)務(wù)A
require_once __DIR__ . '/../../../../vendor/autoload.php';
$queue = 'biz_A_queue';
$exchange = 'pay_order_exchange';
//連接
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
//建立通道
$channel = $conn->channel();
//試探性的聲明一個(gè)隊(duì)列
$channel->queue_declare($queue, false, false, false, false);
//試探性的聲明一個(gè)交換機(jī)
$channel->exchange_declare($exchange, 'direct', false, false, false);
//將隊(duì)列與交換機(jī)綁定
$channel->queue_bind($queue, $exchange);
for ($i=1;$i<11;$i++){
$msg=json_encode([
'id' => $i,
'msg' => '賣(mài)出A產(chǎn)品',
'time' => date('Ymd H:i:s')
],JSON_UNESCAPED_UNICODE);
echo 'send message:'.$msg.PHP_EOL;
$msg = new \PhpAmqpLib\Message\AMQPMessage($msg);
$channel->basic_publish($msg, $exchange);
}
$channel->close();
$conn->close();
再將消費(fèi)者文件復(fù)制兩份,現(xiàn)在有了A.receive.2.php酝锅、A.receive.3.php加上之前的A.receive.1.php 一共有3個(gè)消費(fèi)者
這就是多個(gè)消費(fèi)者的工作模式诡必,接下來(lái)再看下用routing key路由的例子
如果A業(yè)務(wù)和B業(yè)務(wù)都有隊(duì)列綁定了賣(mài)出C產(chǎn)品的路由鍵(sell_product_c
),那么除了A隊(duì)列本就可以收到這個(gè)消息之外,B隊(duì)列也可以通過(guò)這個(gè)路由鍵收到消息搔扁。
A.rk.send.php
<?php
//生產(chǎn)者-業(yè)務(wù)A
require_once __DIR__ . '/../../../../vendor/autoload.php';
$queue = 'biz_A_queue';
$exchange = 'pay_order_exchange';
$routingKey = 'sell_product_c';
//連接
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
//建立通道
$channel = $conn->channel();
//試探性的聲明一個(gè)隊(duì)列
$channel->queue_declare($queue, false, false, false, false);
//試探性的聲明一個(gè)交換機(jī)
$channel->exchange_declare($exchange, 'direct', false, false, false);
//將隊(duì)列與交換機(jī)綁定
$channel->queue_bind($queue, $exchange, $routingKey);//交換機(jī)與routing key 的隊(duì)列綁定
for ($i = 1; $i < 11; $i++) {
$msg = json_encode([
'id' => $i,
'msg' => '賣(mài)出C產(chǎn)品',
'time' => date('Ymd H:i:s')
], JSON_UNESCAPED_UNICODE);
echo 'send message:' . $msg . PHP_EOL;
$msg = new \PhpAmqpLib\Message\AMQPMessage($msg);
$channel->basic_publish($msg, $exchange,$routingKey);//將消息發(fā)送到綁定routing key 的隊(duì)列
}
$channel->close();
$conn->close();
B.rk.receive.1.php
<?php
//消費(fèi)者-業(yè)務(wù)B
require_once __DIR__ . '/../../../../vendor/autoload.php';
$queue = 'biz_B_queue';//業(yè)務(wù)B的隊(duì)列名稱
$exchange = 'pay_order_exchange';
$routingKey = 'sell_product_c';
$conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('localhost', '5672', 'guest', 'guest');
$channel = $conn->channel();
$channel->queue_declare($queue, false, false, false, false);
$channel->queue_bind($queue, $exchange, $routingKey);//通過(guò)routing key綁定交換器和隊(duì)列
$channel->basic_consume($queue, false, false, true, false, false, function ($msg) {
echo '【隊(duì)列 biz_B_queue】 ' . $msg->body . PHP_EOL;
});
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$conn->close();
運(yùn)行B.rk.receive.1.php
爸舒、 A.receive.1.php
、 A.receive.2.php
三個(gè)消費(fèi)者進(jìn)程 和 A.rk.send.php
可以看出 B.rk.receive.1完整收到了10個(gè)消息稿蹲, A.receive.1 和A.receive.2 各自收到5個(gè)消息扭勉。也是上面說(shuō)的將消息發(fā)送到綁定了routing key的隊(duì)列上,除了A隊(duì)列本就可以收到這個(gè)消息之外苛聘,B隊(duì)列也可以通過(guò)這個(gè)路由鍵收到消息涂炎。
適用場(chǎng)景:有優(yōu)先級(jí)的任務(wù),根據(jù)任務(wù)的優(yōu)先級(jí)把消息發(fā)送到對(duì)應(yīng)的隊(duì)列焰盗,這樣可以指派更多的資源去處理高優(yōu)先級(jí)的隊(duì)列璧尸。