消息隊(duì)列(MQ)籍滴,很多場(chǎng)景都有它的身影酪夷,MQ的主要功能包括應(yīng)用解耦、流量削峰孽惰、異步處理捶索。本文主要講解RabbitMq的原理及應(yīng)用實(shí)例,將參考官網(wǎng)文檔重點(diǎn)介紹RabbitMq基本概念灰瞻、work queue模式、fanout模式辅甥、direct模式酝润、topic模式、RPC實(shí)現(xiàn)璃弄、publisher confirms機(jī)制要销,從而達(dá)到快速入門的目的。
0.RabbitMq基本概念
- vhost夏块,虛擬主機(jī)疏咐,提供了完全隔離獨(dú)立的環(huán)境,包括exchange脐供、queue等浑塞,可通過(guò)插件web管理后臺(tái)或者rabbitmqctl命令設(shè)置user的vhost權(quán)限。
- connection政己,要使用rabbitmq必然要與服務(wù)器建立連接了酌壕,AMQP協(xié)議是基于TCP連接的應(yīng)用層協(xié)議。
- channel歇由,信道用于復(fù)用connection卵牍,減少TCP連接帶來(lái)的資源開銷,當(dāng)訪問(wèn)量大的時(shí)候則需要開辟多個(gè)connection沦泌,并分?jǐn)偟絚hennel糊昙。
- routing_key,路由鍵在pub/sub模式下作為exchange匹配binding到queue的條件谢谦;在work queue模式下释牺,可視為隊(duì)列名稱發(fā)送消息萝衩。
- exchange,交換機(jī)在信道內(nèi)船侧,負(fù)責(zé)接受并轉(zhuǎn)發(fā)消息欠气。根據(jù)交換機(jī)的類型,有不同的匹配方式镜撩。
- binding_key预柒,綁定值可視為exchange與queue之間的映射關(guān)系值,綁定值與queue之間的關(guān)系是n:n袁梗,當(dāng)一個(gè)queue對(duì)應(yīng)exchange的多個(gè)binding_key時(shí)宜鸯,exchange只會(huì)發(fā)送一次到該queue。
- queue遮怜,消息隊(duì)列淋袖。
- message,消息是要傳遞及處理的數(shù)據(jù)锯梁,通過(guò)RabbitMq指定的類來(lái)構(gòu)造即碗,可配置消息的參數(shù)屬性,如correlation_id(請(qǐng)求標(biāo)識(shí))陌凳,delivery_mode(投遞模式)等剥懒。
- producer/publisher,消息的生產(chǎn)者/發(fā)布者合敦,攜帶routing_key和msg初橘。
- consumer/subscriber,消息的消費(fèi)者/訂閱者充岛,按照不同的模式處理隊(duì)列中的消息保檐。
1.work queues模式
常規(guī)的消息隊(duì)列模式,不涉及交換機(jī)exchange和隊(duì)列綁定queue_binding崔梗,執(zhí)行過(guò)程:生產(chǎn)者發(fā)送消息至隊(duì)列夜只,消費(fèi)者從隊(duì)列中取數(shù)據(jù)消費(fèi)。
producer代碼示例(PHP)
//1.建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
//2.信道
$channel = $connection->channel();
//3.信道中聲明隊(duì)列
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
$message = "Hello Task";
//4.生成amqp消息
$msg = new AMQPMessage($message, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);//投遞模式設(shè)置為消息持久化
//5.發(fā)布消息
$channel->basic_publish($msg, '', $queue_name);
echo "publisher Sent '{$message}!'\n";
$channel->close();
$connection->close();
consumer代碼示例
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo "consumer received : " . $msg->body . PHP_EOL;
sleep(1);
echo "Done" . PHP_EOL;
//確認(rèn)消息
$msg->ack();
};
//公平調(diào)度, 設(shè)置預(yù)加載個(gè)數(shù)
$channel->basic_qos(null, 1, null);
//持續(xù)監(jiān)聽蒜魄,回調(diào)處理消息
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
下面介紹publish/subscribe模式盐肃,并引入exchange和queue_binding。該模式根據(jù)exchange的不同類型有不同的轉(zhuǎn)發(fā)規(guī)則权悟,exchange的類型主要有fanout砸王、direct、topic峦阁。
2.fanout模式
該模式引入exchange谦铃、queue_binding,但不涉及routing_key和binding_key榔昔,因?yàn)閜ublisher把消息投遞給exchange后驹闰,所有綁定在該交換機(jī)上的隊(duì)列都能接收到消息瘪菌。
publisher代碼
...
//通用連接部分參考上面,后面代碼同理嘹朗,只展示核心變更部分;完整代碼可看官網(wǎng)
//該模式不用聲明隊(duì)列师妙,只需聲明exchange
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.fanout交換機(jī)
..
//消息投遞到交換機(jī)
$channel->basic_publish($msg, 'fanout_logs');//2.fanout模式
subscriber代碼
...
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.聲明交換機(jī)
...
$channel->queue_bind($queue_name, 'fanout_logs');//2.隊(duì)列綁定交換機(jī)
...
比起work queue,該模式更靈活屹培,利用exchange可將消息轉(zhuǎn)發(fā)到多個(gè)queue中默穴。
3.direct模式
如果在pub/sub模式下,只想將交換機(jī)的消息轉(zhuǎn)發(fā)給指定的隊(duì)列褪秀,fanout模式顯然無(wú)法滿足蓄诽。此時(shí)可以利用direct模式,該模式將exchange和queue通過(guò)binding_key綁定在一起媒吗;exchange在接收publisher消息時(shí)依據(jù)routing_key和binding_key是否完全匹配仑氛,決定是否轉(zhuǎn)發(fā)到對(duì)應(yīng)queue。
publisher代碼
$channel->exchange_declare('direct_logs', 'direct', false, true, false);//1.direct交換機(jī)
$routing_key = 'black';
$channel->basic_publish($msg, 'direct_logs', $routing_key);//2.發(fā)布消息至交換機(jī)闸英,攜帶routing_key
subscriber代碼
...
$channel->exchange_declare('direct_logs', 'direct', false, true, false);
$bindingKey = 'black';
$channel->queue_bind($queue_name, 'direct_logs', $bindingKey);//隊(duì)列綁定交換機(jī),聲明binding_key
...
4.topic模式
topic模式在direct模式基礎(chǔ)上升級(jí)锯岖,routing_key和binding_key非完全匹配,支持更靈活的匹配規(guī)則甫何;routing_key/binding_key可以通過(guò)word1.word2.wordn方式進(jìn)行靈活擴(kuò)展出吹。【符號(hào)*代表1個(gè)word沛豌,符號(hào)#可代表0或n個(gè)words】
publisher代碼
$channel->exchange_declare('topic_logs', 'topic', false, false, false); //3.1.topics路由
$routing_key = 'black.tall.big';
$channel->basic_publish($msg, 'topic_logs', $routing_key);//2.發(fā)布消息至交換機(jī),攜帶routing_key
subscriber代碼
$channel->exchange_declare('topic_logs', 'topic', false, true, false);//topic模式
$bindingKey = '#';//相當(dāng)于全部消息都能接收
$channel->queue_bind($queue_name, 'topic_logs', $bindingKey);//隊(duì)列綁定交換機(jī),聲明binding_key
bindingKey的舉??
成功:black.#
赃额,自動(dòng)匹配2個(gè)words加派、'black.tall.*'匹配1個(gè)word,占位匹配時(shí)必須要有點(diǎn)號(hào).
失敗:black.short.*
失敗-錯(cuò)誤使用符號(hào):black#
5.RPC模式
RPC, 全稱remote procedure call即遠(yuǎn)程程序調(diào)用跳芳,比起常規(guī)的遠(yuǎn)程調(diào)用芍锦,基于RabbitMq的RPC優(yōu)點(diǎn)有:1.異步調(diào)用;2.方便擴(kuò)展提升服務(wù)端性能(開啟多個(gè)server)
5.1.實(shí)現(xiàn)原理?
- 服務(wù)端和客戶端飞盆,通過(guò)兩個(gè)隊(duì)列進(jìn)行通信娄琉,RPC隊(duì)列rpc_queue和回調(diào)隊(duì)列reply_to_queue。
- 客戶端攜帶請(qǐng)求標(biāo)識(shí)correlation_id和reply_to_queue回調(diào)隊(duì)列信息吓歇,發(fā)送請(qǐng)求至rpc_queue孽水,服務(wù)端監(jiān)聽rpc_queue,消費(fèi)消息并發(fā)送消息至指定回調(diào)隊(duì)列reply_to_queue城看。
- 客戶端監(jiān)聽回調(diào)隊(duì)列reply_to_queue并通過(guò)correlation_id獲取請(qǐng)求處理結(jié)果女气。
下面以計(jì)算斐波那契數(shù)為作為RPC示例。
client端代碼
class FibonacciRpcClient
{
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;
//構(gòu)造函數(shù)测柠,監(jiān)聽回調(diào)隊(duì)列炼鞠,處理
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'root',
'root'
);
$this->channel = $this->connection->channel();
//1.生成回調(diào)隊(duì)列
$this->callback_queue = 'reply_to';
$this->channel->queue_declare($this->callback_queue, false, true, false, false);
//2.1.輪訓(xùn)消費(fèi)
$this->channel->basic_consume(
$this->callback_queue,
'',
false,
true,
false,
false,
array(
$this,
'onResponse'
)
);
}
//2.1.2監(jiān)聽隊(duì)列的回調(diào)函數(shù)
public function onResponse($rep)
{
if ($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
//遠(yuǎn)程調(diào)用,發(fā)送消息至rpc隊(duì)列
public function call($n)
{
$this->response = null;
$this->corr_id = uniqid();//3.生成請(qǐng)求的唯一標(biāo)識(shí)
//4.1.創(chuàng)建消息缘滥,攜帶請(qǐng)求標(biāo)識(shí)、回調(diào)隊(duì)列名稱
$msg = new AMQPMessage(
(string)$n,
array(
'correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue
)
);
//4.2.發(fā)送消息至rpc隊(duì)列谒主,等待服務(wù)端消費(fèi)
$this->channel->basic_publish($msg, '', 'rpc_queue');
//5.循環(huán)判斷結(jié)果
while (!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
}
$fibonacci_rpc = new FibonacciRpcClient();//構(gòu)造函數(shù)朝扼,監(jiān)聽回調(diào)隊(duì)列reply_to
$response = $fibonacci_rpc->call(35);//發(fā)送消息至prc隊(duì)列,并循環(huán)判斷回調(diào)隊(duì)列的處理結(jié)果霎肯。
echo ' [.] Got ', $response, "\n";//回調(diào)隊(duì)列的處理結(jié)果
server端代碼
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
//聲明隊(duì)列
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n)
{
if ($n == 0) {
return 0;
}
if ($n == 1) {
return 1;
}
return fib($n-1) + fib($n-2);
}
echo " [x] Awaiting RPC requests\n";
$callback = function ($req) {
//1.1監(jiān)聽rpc隊(duì)列擎颖,處理client發(fā)送的消息
$n = intval($req->body);
echo ' [.] fib(', $n, ")\n";
//1.2.返回處理結(jié)果,并攜帶請(qǐng)求標(biāo)識(shí)
$msg = new AMQPMessage(
(string) fib($n),
array('correlation_id' => $req->get('correlation_id'))
);
//2.發(fā)送消息至同一信道的 回調(diào)隊(duì)列姿现, 由client監(jiān)聽消費(fèi)肠仪。
$req->delivery_info['channel']->basic_publish(
$msg,
'',
$req->get('reply_to')
);
//3.消息接受確認(rèn)
$req->ack();
};
//設(shè)置預(yù)加載數(shù)量,服務(wù)端worker公平調(diào)度
$channel->basic_qos(null, 1, null);
//輪訓(xùn)消費(fèi)备典,監(jiān)聽rpc隊(duì)列
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
調(diào)用結(jié)果
client:
[.] Got 9227465
server:
[x] Awaiting RPC requests
[.] fib(35)
6.publisher confirms模式
publisher confirms是RabbitMq實(shí)現(xiàn)可靠傳輸的擴(kuò)展异旧,用來(lái)判斷publisher是否成功把消息發(fā)送到RabbitMq的broker。RabbitMq實(shí)現(xiàn)可靠傳輸?shù)姆绞接袃煞N:事務(wù)(不推薦)提佣、publisher confirms吮蛹,這兩種方式互斥。publisher confirms的實(shí)現(xiàn)方式又可分為:同步拌屏、異步潮针。
- 6.1. 同步實(shí)現(xiàn)
該模式是基于信道的,所以只要增加兩個(gè)步驟即可:
6.1.1. 信道聲明為confirm模式
6.1.2. 聲明同步等待的超時(shí)時(shí)間
代碼如下:
...
$channel->confirm_select();//1.聲明信道為confirm模式
...
try {
$channel->wait_for_pending_acks($timeOut);//2.同步等待timeOut時(shí)間
}catch (Exception $exception){
echo "exception:" . $exception->getMessage() . PHP_EOL;
}
..
- 6.2. 異步實(shí)現(xiàn)
異步實(shí)現(xiàn)通過(guò)注冊(cè)回調(diào)的兩個(gè)方法set_ack_handler和set_nack_handler倚喂。
代碼如下
$channel->confirm_select();//1.聲明信道為confirm模式
//2.消息被ack后的回調(diào)
$channel->set_ack_handler(function (AMQPMessage $msg) {
echo "ack msg" . PHP_EOL;
file_put_contents('./ackfile.txt',json_encode($msg),FILE_APPEND);
});
//3.消息被nack'ed后的回調(diào)
$channel->set_nack_handler(function (AMQPMessage $msg) {
echo "nack msg" . PHP_EOL;
file_put_contents('./nackfile.txt',json_encode($msg),FILE_APPEND);
});
$channel->wait_for_pending_acks();
以上只是RabbitMq各種模式的基本使用每篷,其他很多特性(持久化、網(wǎng)絡(luò)分區(qū)端圈、集群等)并未涉及焦读,若要使用更多的特性請(qǐng)查閱官網(wǎng)文檔,然后手動(dòng)跑一下代碼才能理解得更好舱权。希望本文能幫助大家對(duì)RabbitMq的使用有個(gè)大致了解矗晃。