簡介Rabbitmq的幾種消費模式

前言

在日常開發(fā)中,消息隊列能幫我們解決系統(tǒng)的異步問題涩赢,流量的控制和服務(wù)解耦咙轩,不同的消息隊列有不同的消費模型

思考

redis也可以實現(xiàn)消息隊列(list和stream)苞慢,也稱為輕量級消息隊列蟹演,list實現(xiàn)的缺點在哪里风钻?stream類型怎么用顷蟀?

RabbitMq

具體概念的東西網(wǎng)上很多酒请,文檔也有詳細描述這里不做過多闡述,本文主要以PHP代碼為主進行實驗鸣个,消息隊列之rabbitmq

docker run -d --name mq \
-p 5672:5672 -p 15672:15672 \
-v /home/docker/mq/data:/var/lib/rabbitmq --hostname myRabbit \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin ee045987e252

--hostname myRabbit 是因為rabbitmq是基于Node節(jié)點名的

官方講rabbitmq比喻為郵局羞反,queue比喻為郵局里的郵箱布朦,我們要寄信(producer發(fā)送message),就需要把信賽到(send)郵箱昼窗,或者你交給前臺窗口(exchange)讓他幫你寄是趴,但是不同的前臺窗口提供的服務(wù)不同,因為呀不同的郵箱他發(fā)往的地址不同澄惊,有的需要你指定投到哪些郵箱(exchange類型為direct類型時要求完全匹配routing-key)唆途,有的只需要你告訴他投到郵箱的大致有什么特點就行(exchange類型為topic時routing-key模糊匹配就行),還有的是把信件復(fù)制多個(魔法)往每個郵箱都塞一封(exchange類為fanout)掸驱。最后有的郵箱在派送員派送完信件后要求收信人(consumer)簽個字(ack驗證)才能把信給他

下面例子基于官方提供的包,
composer require php-amqplib/php-amqplib

例子來自小猴子喝牛奶的博客

Direct-Exchange

生產(chǎn)者

<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;


//獲取終端提示用戶輸入的數(shù)據(jù)
fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);

//建立生產(chǎn)者與mq之間的連接 -------動身前往郵局   
//參數(shù):地址肛搬,端口,賬號毕贼,密碼温赔,虛擬機名
//注意這個虛擬機名為綁定-e RABBITMQ_DEFAULT_VHOST=my_vhost參數(shù)時指定的
$connection = new AMQPStreamConnection('容器ip', 5672, 'admin', 'my_vhost');

//在已連接基礎(chǔ)上建立生產(chǎn)者與mq之間的通道-----進郵局門
$channel = $connection->channel();

//聲明初始化交換機,交換機不存在則創(chuàng)建----找前臺窗口   
//參數(shù):交換機名鬼癣,路由類型陶贼,是否檢測同名隊列,是否開啟隊列持久化待秃,通道關(guān)閉后是否刪除隊列
$channel->exchange_declare('ex_direct', 'direct', false, true, false); 

//聲明初始化一條隊列拜秧,隊列不存在則創(chuàng)建-----告訴前臺窗口要什么郵箱
//參數(shù):隊列名,是否檢測同名隊列章郁,是否開啟隊列持久化腹纳,是否能被其他隊列訪問,通道關(guān)閉后是否刪除隊列
$channel->queue_declare('ex_direct_queue', false, false, false, false);

//前臺窗口找你要的郵箱
//將隊列與某個交換機進行綁定驱犹,并使用路由關(guān)鍵字
//參數(shù):隊列名嘲恍,交換機名,路由鍵名
$channel->queue_bind('ex_direct_queue', 'ex_direct', 'hello'); 

//把信給封好
//生成消息
$msg = new AMQPMessage($msg_str);

//推送消息到某個交換機------把信給前臺
//參數(shù):消息雄驹,交換機名佃牛,路由鍵名
//就如同前面所講,你只需要把信給前臺医舆,并告訴他投給哪些指定的郵箱即可
$channel->basic_publish($msg, 'ex_direct', 'hello');
echo " [x] Sent: $msg_str \n";

$channel->close();
$connection->close();

消費者

<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//快遞員進門
$connection = new AMQPStreamConnection('容器ip', 5672, 'admin', 'my_vhost');
$channel = $connection->channel();
//快遞員先看看給自己任務(wù)的前臺在不在
$channel->exchange_declare('ex_direct', 'direct', false, true, false); 
//在看看自己負責的郵箱在不在
$channel->queue_declare('ex_direct_queue', false, false, false, false);
$channel->queue_bind('ex_direct_queue', 'ex_direct', 'hello'); 
//執(zhí)行上面的步驟主要是為保證這些目標交換機和隊列已經(jīng)存在
//這里是收信人的動作
$callback = function($msg) {
    //打印消息
    echo " [x] Received ", $msg->body, "\n";
    //消息確認
    $msg->ack();
};
//第三個參數(shù)為true表示了這個郵箱規(guī)定了收信人必須簽名
//參數(shù):隊列名俘侠,消費者標識符,不接收此使用者發(fā)布的消息蔬将,使用者是否使用自動確認模式爷速,請求獨占使用者訪問,不等待霞怀,消息回調(diào)函數(shù)
$channel->basic_consume('ex_direct_queue', 'consumer1', false, true, false, false, $callback);
//快遞員看有沒有信惫东,有就立馬寄
//監(jiān)聽通道消息
while(count($channel->callbacks)) {
    $channel->wait();
}

思考

1.萬一RabbitMQ崩潰了退出了怎么辦?里面的隊列和消息會不會消失,這需要我們在聲明交換機和隊列時候廉沮,讓他保證持久化

//第4個參數(shù)為true
$channel->exchange_declare('ex_direct', 'direct', false, true, false); 
//第三個參數(shù)為true颓遏,這里不能在已存在的隊列上加持久化
$channel->queue_declare('hello', false, true, false, false);
//里面的消息也保證持久化
//第二個參數(shù)為數(shù)組
$msg = new AMQPMessage('你的消息', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] );

2.Rabbitmq默認一旦發(fā)送消息給客戶端后就立即刪除,那萬一消費者收到消息后要執(zhí)行一個耗時任務(wù)滞时,但是中途異常退出了叁幢,那么這個消息不就丟了嗎(比如上面的回調(diào)函數(shù)中sleep(10)但是我們中涂把他kill掉導(dǎo)致沒發(fā)送ack碼)。
們希望消費者完成消息處理后發(fā)送ack確認坪稽,rabbitMQ收到后才能對消息刪除曼玩。

//即在消費者綁定隊列時第4個參數(shù)為false
$channel->basic_consume('ex_direct_queue', 'consumer1', false, false, false, false, $callback);

3.快遞員有多個,那么萬一有的快遞員要寄很多信窒百,有的在偷懶怎么辦演训?
利用函數(shù)進行公平調(diào)度

//消費者代碼添加,表示在等待消費者處理完消息后才能再接受消息贝咙,不堆積消息
$channel->basic_qos(null, 1, null);

Topic-Exchange

和Direct代碼基本相同不同的是綁定交換機是時的'direct'成了'topic'

注意样悟,routing-key是模糊匹配,這里并不是參考正則庭猩,*表示多個字符窟她,#表示一個字符如 .log. 匹配 aaa.log.aaa

Fanout-Exchange

又稱發(fā)布與訂閱,即向與交換機的所有隊列廣播消息蔼水,既然是廣播震糖,那么我們就不需要考慮消息的ack了
生產(chǎn)者

<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;


//獲取終端提示用戶輸入的數(shù)據(jù)
fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);

//建立生產(chǎn)者與mq之間的連接    
//參數(shù):地址,端口趴腋,賬號吊说,密碼
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

//在已連接基礎(chǔ)上建立生產(chǎn)者與mq之間的通道
$channel = $connection->channel();

//聲明初始化交換機   
//參數(shù):交換機名,路由類型优炬,是否檢測同名隊列颁井,是否開啟隊列持久化,通道關(guān)閉后是否刪除隊列
$channel->exchange_declare('mq_sms_send_ex3', AMQPExchangeType::FANOUT, false, false, false); 


//生成消息
$msg = new AMQPMessage($msg_str);

//推送消息到某個交換機
//參數(shù):消息蠢护,交換機名雅宾,路由鍵名
$channel->basic_publish($msg, 'mq_sms_send_ex3');
echo " [x] Sent: $msg_str \n";

$channel->close();
$connection->close();

消費者

<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

//聲明初始化交換機   
//參數(shù):交換機名,路由類型葵硕,是否檢測同名隊列眉抬,是否開啟隊列持久化,通道關(guān)閉后是否刪除隊列
$channel->exchange_declare('mq_sms_send_ex3', AMQPExchangeType::FANOUT, false, false, false);

//聲明初始化一條隊列
//參數(shù):隊列名懈凹,是否檢測同名隊列蜀变,是否開啟隊列持久化,是否能被其他隊列訪問介评,通道關(guān)閉后是否刪除隊列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

//將隊列與某個交換機進行綁定库北,并使用路由關(guān)鍵字
//參數(shù):隊列名,交換機名,路由鍵名
$channel->queue_bind($queue_name, 'mq_sms_send_ex3');

echo ' [*] Waiting for messages', "\n";

$callback = function($msg) {
  echo " [x] Received ", $msg->body, "\n";

  //判斷獲取到quit后
  if (trim($msg->body) == 'quit') { 
        $msg->getChannel()->basic_cancel($msg->getConsumerTag());
    }

};

$channel->basic_qos(null, 1, null);

//參數(shù):隊列名贤惯,消費者標識符,不接收此使用者發(fā)布的消息棒掠,使用者是否使用自動確認模式孵构,請求獨占使用者訪問,不等待烟很,消息回調(diào)函數(shù)
$channel->basic_consume($queue_name, 'consumer1', false, true, false, false, $callback);

死信隊列

即延遲隊列颈墅,講消息發(fā)送到指定的隊列,消息要在隊列中待到指定時間(ttl)后才能被發(fā)送給消費者

RabbtMq實現(xiàn)大致示意圖
image.png

如何保證死信隊列在消息過去后才把消息發(fā)給業(yè)務(wù)交換機---不設(shè)置消費者(快遞員)不就行了
生產(chǎn)者

require_once '../vendor/autoload.php';


use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Wire\AMQPWriter;

fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);

$connection = new AMQPStreamConnection(
    '172.17.0.5','5672','admin','admin','my_vhost'
);
$channel = $connection->channel();
//業(yè)務(wù)交換機雾袱,負責處理過期消息
$channel->exchange_declare(
    'ex_dl','direct',false,true
);
//死信交換機
$channel->exchange_declare(
    'ex_normal','fanout',false,true
);
//因此創(chuàng)建死信隊列的配置參數(shù)要求是AMQPTable類型
$args=new AMQPTable();
//設(shè)置消息過期時間
$args->set('x-message-ttl',120000);
//過期后發(fā)送給哪個交換機
$args->set('x-dead-letter-exchange','ex_dl');
//設(shè)置路由鍵
$args->set('x-dead-letter-routing-key','ex_qu');
//也就是說normal隊列上的消息存活時間都是2分組
//死信隊列
$channel->queue_declare('queue_normal',false,true,
false,false,false,$args
);
//業(yè)務(wù)隊列
$channel->queue_declare('queue_dlx',false,true,
false,false
);
$channel->queue_bind('queue_normal','ex_normal');
$channel->queue_bind('queue_dlx','ex_dl','ex_qu');
$message =new AMQPMessage($msg_str);
//只發(fā)送消息給死信交換機恤筛,因為業(yè)務(wù)交換機的消息是死信隊列給的
$channel->basic_publish($message,'ex_normal','ex_qu');

$channel->close();
$connection->close();

消費者

require_once '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection(
    '172.17.0.5','5672','admin','admin','my_vhost'
);
$channel= $connection->channel();
//我們只要保證業(yè)務(wù)交換機和業(yè)務(wù)隊列在就行了
//死信隊列不給消費者消費消息
$channel->exchange_declare(
    'ex_dl','direct',false,true
);
$channel->queue_declare('queue_dlx',false,true,
false,false
);

$channel->queue_bind('queue_dlx','ex_dl','ex_qu');
echo '[*]Waiting for message';

$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    $msg->ack();
    if(trim($msg->body)=='quit'){
        echo 2;
        $msg->getChannel()->basic_cancel($msg->getConsumerTag());
    }
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('queue_dlx','c1',false,false,false,false,$callback);
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市芹橡,隨后出現(xiàn)的幾起案子毒坛,更是在濱河造成了極大的恐慌,老刑警劉巖林说,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件煎殷,死亡現(xiàn)場離奇詭異,居然都是意外死亡腿箩,警方通過查閱死者的電腦和手機豪直,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來珠移,“玉大人弓乙,你說我怎么就攤上這事【澹” “怎么了暇韧?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀的道長浓瞪。 經(jīng)常有香客問我锨咙,道長,這世上最難降的妖魔是什么追逮? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任酪刀,我火速辦了婚禮,結(jié)果婚禮上钮孵,老公的妹妹穿的比我還像新娘骂倘。我一直安慰自己,他們只是感情好巴席,可當我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布历涝。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪荧库。 梳的紋絲不亂的頭發(fā)上堰塌,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天,我揣著相機與錄音分衫,去河邊找鬼场刑。 笑死,一個胖子當著我的面吹牛牵现,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播邀桑,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼瞎疼,長吁一口氣:“原來是場噩夢啊……” “哼捏萍!你這毒婦竟也來了照弥?” 一聲冷哼從身側(cè)響起给赞,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤柑蛇,失蹤者是張志新(化名)和其女友劉穎空免,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體墨坚,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年磷蛹,在試婚紗的時候發(fā)現(xiàn)自己被綠了溪烤。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡碗旅,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出镜悉,到底是詐尸還是另有隱情祟辟,我是刑警寧澤,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布侣肄,位于F島的核電站旧困,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏稼锅。R本人自食惡果不足惜吼具,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望矩距。 院中可真熱鬧拗盒,春花似錦、人聲如沸锥债。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽哮肚。三九已至登夫,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間允趟,已是汗流浹背恼策。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留潮剪,地道東北人涣楷。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像鲁纠,于是被迫代替她去往敵國和親总棵。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容