【RabbitMq】快速入門之work queue模式沈自、fanout模式、direct模式辜妓、topic模式枯途、RPC實(shí)現(xiàn)、publisher confirm機(jī)制

消息隊(duì)列(MQ)籍滴,很多場(chǎng)景都有它的身影酪夷,MQ的主要功能包括應(yīng)用解耦流量削峰孽惰、異步處理捶索。本文主要講解RabbitMq的原理及應(yīng)用實(shí)例,將參考官網(wǎng)文檔重點(diǎn)介紹RabbitMq基本概念灰瞻、work queue模式fanout模式辅甥、direct模式酝润、topic模式RPC實(shí)現(xiàn)璃弄、publisher confirms機(jī)制要销,從而達(dá)到快速入門的目的。

0.RabbitMq基本概念

  • vhost夏块,虛擬主機(jī)疏咐,提供了完全隔離獨(dú)立的環(huán)境,包括exchange脐供、queue等浑塞,可通過(guò)插件web管理后臺(tái)或者rabbitmqctl命令設(shè)置user的vhost權(quán)限。
  • connection政己,要使用rabbitmq必然要與服務(wù)器建立連接了酌壕,AMQP協(xié)議是基于TCP連接的應(yīng)用層協(xié)議
  • channel歇由,信道用于復(fù)用connection卵牍,減少TCP連接帶來(lái)的資源開銷,當(dāng)訪問(wèn)量大的時(shí)候則需要開辟多個(gè)connection沦泌,并分?jǐn)偟絚hennel糊昙。
  • routing_key,路由鍵在pub/sub模式下作為exchange匹配binding到queue的條件谢谦;在work queue模式下释牺,可視為隊(duì)列名稱發(fā)送消息萝衩。
  • exchange,交換機(jī)在信道內(nèi)船侧,負(fù)責(zé)接受并轉(zhuǎn)發(fā)消息欠气。根據(jù)交換機(jī)的類型,有不同的匹配方式镜撩。
  • binding_key预柒,綁定值可視為exchange與queue之間的映射關(guān)系值,綁定值與queue之間的關(guān)系是n:n袁梗,當(dāng)一個(gè)queue對(duì)應(yīng)exchange的多個(gè)binding_key時(shí)宜鸯,exchange只會(huì)發(fā)送一次到該queue。
  • queue遮怜,消息隊(duì)列淋袖。
  • message,消息是要傳遞及處理的數(shù)據(jù)锯梁,通過(guò)RabbitMq指定的類來(lái)構(gòu)造即碗,可配置消息的參數(shù)屬性,如correlation_id(請(qǐng)求標(biāo)識(shí))陌凳,delivery_mode(投遞模式)等剥懒。
  • producer/publisher,消息的生產(chǎn)者/發(fā)布者合敦,攜帶routing_key和msg初橘。
  • consumer/subscriber,消息的消費(fèi)者/訂閱者充岛,按照不同的模式處理隊(duì)列中的消息保檐。

1.work queues模式

1.work queues

常規(guī)的消息隊(duì)列模式,不涉及交換機(jī)exchange和隊(duì)列綁定queue_binding崔梗,執(zhí)行過(guò)程:生產(chǎn)者發(fā)送消息至隊(duì)列夜只,消費(fèi)者從隊(duì)列中取數(shù)據(jù)消費(fèi)。

producer代碼示例(PHP)

//1.建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
//2.信道
$channel = $connection->channel();
//3.信道中聲明隊(duì)列
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
$message = "Hello Task";
//4.生成amqp消息
$msg = new AMQPMessage($message, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);//投遞模式設(shè)置為消息持久化
//5.發(fā)布消息
$channel->basic_publish($msg, '', $queue_name);
echo "publisher  Sent '{$message}!'\n";
$channel->close();
$connection->close();

consumer代碼示例

$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
    echo "consumer received : " . $msg->body . PHP_EOL;
    sleep(1);
    echo "Done" . PHP_EOL;
    //確認(rèn)消息
    $msg->ack();
};
//公平調(diào)度, 設(shè)置預(yù)加載個(gè)數(shù)
$channel->basic_qos(null, 1, null);
//持續(xù)監(jiān)聽蒜魄,回調(diào)處理消息
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
while ($channel->is_open()) {
    $channel->wait();
}

下面介紹publish/subscribe模式盐肃,并引入exchangequeue_binding。該模式根據(jù)exchange的不同類型有不同的轉(zhuǎn)發(fā)規(guī)則权悟,exchange的類型主要有fanout砸王、direct、topic峦阁。

2.fanout模式

2.fanout

該模式引入exchange谦铃、queue_binding,但不涉及routing_key和binding_key榔昔,因?yàn)閜ublisher把消息投遞給exchange后驹闰,所有綁定在該交換機(jī)上的隊(duì)列都能接收到消息瘪菌。

publisher代碼

...
//通用連接部分參考上面,后面代碼同理嘹朗,只展示核心變更部分;完整代碼可看官網(wǎng)
//該模式不用聲明隊(duì)列师妙,只需聲明exchange
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.fanout交換機(jī)
..
//消息投遞到交換機(jī)
$channel->basic_publish($msg, 'fanout_logs');//2.fanout模式

subscriber代碼

...
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.聲明交換機(jī)
...
$channel->queue_bind($queue_name, 'fanout_logs');//2.隊(duì)列綁定交換機(jī)
...

比起work queue,該模式更靈活屹培,利用exchange可將消息轉(zhuǎn)發(fā)到多個(gè)queue中默穴。

3.direct模式

3.direct

如果在pub/sub模式下,只想將交換機(jī)的消息轉(zhuǎn)發(fā)給指定的隊(duì)列褪秀,fanout模式顯然無(wú)法滿足蓄诽。此時(shí)可以利用direct模式,該模式將exchange和queue通過(guò)binding_key綁定在一起媒吗;exchange在接收publisher消息時(shí)依據(jù)routing_key和binding_key是否完全匹配仑氛,決定是否轉(zhuǎn)發(fā)到對(duì)應(yīng)queue。

publisher代碼

$channel->exchange_declare('direct_logs', 'direct', false, true, false);//1.direct交換機(jī)
$routing_key = 'black';
$channel->basic_publish($msg, 'direct_logs', $routing_key);//2.發(fā)布消息至交換機(jī)闸英,攜帶routing_key

subscriber代碼

...
$channel->exchange_declare('direct_logs', 'direct', false, true, false);
$bindingKey = 'black';
$channel->queue_bind($queue_name, 'direct_logs', $bindingKey);//隊(duì)列綁定交換機(jī),聲明binding_key
...

4.topic模式

4.topic

topic模式在direct模式基礎(chǔ)上升級(jí)锯岖,routing_key和binding_key非完全匹配,支持更靈活的匹配規(guī)則甫何;routing_key/binding_key可以通過(guò)word1.word2.wordn方式進(jìn)行靈活擴(kuò)展出吹。【符號(hào)*代表1個(gè)word沛豌,符號(hào)#可代表0或n個(gè)words】

publisher代碼

$channel->exchange_declare('topic_logs', 'topic', false, false, false); //3.1.topics路由
$routing_key = 'black.tall.big';
$channel->basic_publish($msg, 'topic_logs', $routing_key);//2.發(fā)布消息至交換機(jī),攜帶routing_key

subscriber代碼

$channel->exchange_declare('topic_logs', 'topic', false, true, false);//topic模式
$bindingKey = '#';//相當(dāng)于全部消息都能接收
$channel->queue_bind($queue_name, 'topic_logs', $bindingKey);//隊(duì)列綁定交換機(jī),聲明binding_key

bindingKey的舉??
成功:black.#赃额,自動(dòng)匹配2個(gè)words加派、'black.tall.*'匹配1個(gè)word,占位匹配時(shí)必須要有點(diǎn)號(hào).
失敗:black.short.*
失敗-錯(cuò)誤使用符號(hào):black#

5.RPC模式

5.RPC

RPC, 全稱remote procedure call即遠(yuǎn)程程序調(diào)用跳芳,比起常規(guī)的遠(yuǎn)程調(diào)用芍锦,基于RabbitMq的RPC優(yōu)點(diǎn)有:1.異步調(diào)用2.方便擴(kuò)展提升服務(wù)端性能(開啟多個(gè)server)

5.1.實(shí)現(xiàn)原理?

  • 服務(wù)端和客戶端飞盆,通過(guò)兩個(gè)隊(duì)列進(jìn)行通信娄琉,RPC隊(duì)列rpc_queue和回調(diào)隊(duì)列reply_to_queue
  • 客戶端攜帶請(qǐng)求標(biāo)識(shí)correlation_idreply_to_queue回調(diào)隊(duì)列信息吓歇,發(fā)送請(qǐng)求至rpc_queue孽水,服務(wù)端監(jiān)聽rpc_queue,消費(fèi)消息并發(fā)送消息至指定回調(diào)隊(duì)列reply_to_queue城看。
  • 客戶端監(jiān)聽回調(diào)隊(duì)列reply_to_queue并通過(guò)correlation_id獲取請(qǐng)求處理結(jié)果女气。

下面以計(jì)算斐波那契數(shù)為作為RPC示例。

client端代碼

class FibonacciRpcClient
{
    private $connection;
    private $channel;
    private $callback_queue;
    private $response;
    private $corr_id;

    //構(gòu)造函數(shù)测柠,監(jiān)聽回調(diào)隊(duì)列炼鞠,處理
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'root',
            'root'
        );
        $this->channel = $this->connection->channel();
        //1.生成回調(diào)隊(duì)列
        $this->callback_queue = 'reply_to';
        $this->channel->queue_declare($this->callback_queue, false, true, false, false);

        //2.1.輪訓(xùn)消費(fèi)
        $this->channel->basic_consume(
            $this->callback_queue,
            '',
            false,
            true,
            false,
            false,
            array(
                $this,
                'onResponse'
            )
        );


    }

    //2.1.2監(jiān)聽隊(duì)列的回調(diào)函數(shù)
    public function onResponse($rep)
    {
        if ($rep->get('correlation_id') == $this->corr_id) {
            $this->response = $rep->body;
        }
    }

    //遠(yuǎn)程調(diào)用,發(fā)送消息至rpc隊(duì)列
    public function call($n)
    {
        $this->response = null;
        $this->corr_id = uniqid();//3.生成請(qǐng)求的唯一標(biāo)識(shí)

        //4.1.創(chuàng)建消息缘滥,攜帶請(qǐng)求標(biāo)識(shí)、回調(diào)隊(duì)列名稱
        $msg = new AMQPMessage(
            (string)$n,
            array(
                'correlation_id' => $this->corr_id,
                'reply_to'       => $this->callback_queue
            )
        );
        //4.2.發(fā)送消息至rpc隊(duì)列谒主,等待服務(wù)端消費(fèi)
        $this->channel->basic_publish($msg, '', 'rpc_queue');
        //5.循環(huán)判斷結(jié)果
        while (!$this->response) {
            $this->channel->wait();
        }
        return intval($this->response);
    }
}

$fibonacci_rpc = new FibonacciRpcClient();//構(gòu)造函數(shù)朝扼,監(jiān)聽回調(diào)隊(duì)列reply_to
$response = $fibonacci_rpc->call(35);//發(fā)送消息至prc隊(duì)列,并循環(huán)判斷回調(diào)隊(duì)列的處理結(jié)果霎肯。
echo ' [.] Got ', $response, "\n";//回調(diào)隊(duì)列的處理結(jié)果

server端代碼

$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
//聲明隊(duì)列
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n)
{
    if ($n == 0) {
        return 0;
    }
    if ($n == 1) {
        return 1;
    }
    return fib($n-1) + fib($n-2);
}

echo " [x] Awaiting RPC requests\n";
$callback = function ($req) {
    //1.1監(jiān)聽rpc隊(duì)列擎颖,處理client發(fā)送的消息
    $n = intval($req->body);
    echo ' [.] fib(', $n, ")\n";

    //1.2.返回處理結(jié)果,并攜帶請(qǐng)求標(biāo)識(shí)
    $msg = new AMQPMessage(
        (string) fib($n),
        array('correlation_id' => $req->get('correlation_id'))
    );
    //2.發(fā)送消息至同一信道的 回調(diào)隊(duì)列姿现, 由client監(jiān)聽消費(fèi)肠仪。
    $req->delivery_info['channel']->basic_publish(
        $msg,
        '',
        $req->get('reply_to')
    );
    //3.消息接受確認(rèn)
    $req->ack();
};

//設(shè)置預(yù)加載數(shù)量,服務(wù)端worker公平調(diào)度
$channel->basic_qos(null, 1, null);
//輪訓(xùn)消費(fèi)备典,監(jiān)聽rpc隊(duì)列
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

調(diào)用結(jié)果

client:
 [.] Got 9227465
server:
 [x] Awaiting RPC requests
 [.] fib(35)

6.publisher confirms模式
publisher confirms是RabbitMq實(shí)現(xiàn)可靠傳輸的擴(kuò)展异旧,用來(lái)判斷publisher是否成功把消息發(fā)送到RabbitMq的broker。RabbitMq實(shí)現(xiàn)可靠傳輸?shù)姆绞接袃煞N:事務(wù)(不推薦)提佣、publisher confirms吮蛹,這兩種方式互斥。publisher confirms的實(shí)現(xiàn)方式又可分為:同步拌屏、異步潮针。

  • 6.1. 同步實(shí)現(xiàn)
    該模式是基于信道的,所以只要增加兩個(gè)步驟即可:
    6.1.1. 信道聲明為confirm模式
    6.1.2. 聲明同步等待的超時(shí)時(shí)間
    代碼如下:
...
$channel->confirm_select();//1.聲明信道為confirm模式
...
try {
    $channel->wait_for_pending_acks($timeOut);//2.同步等待timeOut時(shí)間
}catch (Exception $exception){
    echo "exception:" . $exception->getMessage() . PHP_EOL;
}

..

  • 6.2. 異步實(shí)現(xiàn)
    異步實(shí)現(xiàn)通過(guò)注冊(cè)回調(diào)的兩個(gè)方法set_ack_handler和set_nack_handler倚喂。
    代碼如下
$channel->confirm_select();//1.聲明信道為confirm模式

//2.消息被ack后的回調(diào)
$channel->set_ack_handler(function (AMQPMessage $msg) {
    echo "ack msg" . PHP_EOL;
    file_put_contents('./ackfile.txt',json_encode($msg),FILE_APPEND);
});

//3.消息被nack'ed后的回調(diào)
$channel->set_nack_handler(function (AMQPMessage $msg) {
    echo "nack msg" . PHP_EOL;
    file_put_contents('./nackfile.txt',json_encode($msg),FILE_APPEND);
});

$channel->wait_for_pending_acks();

以上只是RabbitMq各種模式的基本使用每篷,其他很多特性(持久化、網(wǎng)絡(luò)分區(qū)端圈、集群等)并未涉及焦读,若要使用更多的特性請(qǐng)查閱官網(wǎng)文檔,然后手動(dòng)跑一下代碼才能理解得更好舱权。希望本文能幫助大家對(duì)RabbitMq的使用有個(gè)大致了解矗晃。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市宴倍,隨后出現(xiàn)的幾起案子张症,更是在濱河造成了極大的恐慌,老刑警劉巖鸵贬,帶你破解...
    沈念sama閱讀 218,607評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件俗他,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡阔逼,警方通過(guò)查閱死者的電腦和手機(jī)拯辙,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人涯保,你說(shuō)我怎么就攤上這事诉濒。” “怎么了夕春?”我有些...
    開封第一講書人閱讀 164,960評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵未荒,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我及志,道長(zhǎng)片排,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,750評(píng)論 1 294
  • 正文 為了忘掉前任速侈,我火速辦了婚禮率寡,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘倚搬。我一直安慰自己冶共,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,764評(píng)論 6 392
  • 文/花漫 我一把揭開白布每界。 她就那樣靜靜地躺著捅僵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪眨层。 梳的紋絲不亂的頭發(fā)上庙楚,一...
    開封第一講書人閱讀 51,604評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音趴樱,去河邊找鬼馒闷。 笑死,一個(gè)胖子當(dāng)著我的面吹牛叁征,可吹牛的內(nèi)容都是我干的纳账。 我是一名探鬼主播,決...
    沈念sama閱讀 40,347評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼航揉,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼塞祈!你這毒婦竟也來(lái)了金刁?” 一聲冷哼從身側(cè)響起帅涂,我...
    開封第一講書人閱讀 39,253評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎尤蛮,沒(méi)想到半個(gè)月后媳友,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,702評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡产捞,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,893評(píng)論 3 336
  • 正文 我和宋清朗相戀三年醇锚,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,015評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡焊唬,死狀恐怖恋昼,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情赶促,我是刑警寧澤液肌,帶...
    沈念sama閱讀 35,734評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站鸥滨,受9級(jí)特大地震影響嗦哆,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜婿滓,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,352評(píng)論 3 330
  • 文/蒙蒙 一老速、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧凸主,春花似錦橘券、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,934評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至但两,卻和暖如春鬓梅,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谨湘。 一陣腳步聲響...
    開封第一講書人閱讀 33,052評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工绽快, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人紧阔。 一個(gè)月前我還...
    沈念sama閱讀 48,216評(píng)論 3 371
  • 正文 我出身青樓坊罢,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親擅耽。 傳聞我的和親對(duì)象是個(gè)殘疾皇子活孩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,969評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 1,摘要 從安裝環(huán)境乖仇,配置入門憾儒,到HelloWorld實(shí)操,各種類型消息傳遞的演示代碼乃沙,原理介紹起趾,答疑解惑,面試題...
    筆名輝哥閱讀 1,953評(píng)論 0 3
  • 問(wèn)題一:RabbitMQ中的 broker 是指什么警儒?cluster 又是指什么训裆? 答:broker是指一個(gè)或多個(gè)...
    Leslie_Lee閱讀 178評(píng)論 0 0
  • # HelloWorld 簡(jiǎn)介 RabbitMQ:接受消息再傳遞消息,可以視為一個(gè)“郵局”。發(fā)送者和接受者通過(guò)隊(duì)列...
    xncode閱讀 1,442評(píng)論 0 1
  • 1.什么是消息隊(duì)列 消息隊(duì)列允許應(yīng)用間通過(guò)消息的發(fā)送與接收的方式進(jìn)行通信边琉,當(dāng)消息接收方服務(wù)忙或不可用時(shí)属百,其提供了一...
    zhuke閱讀 4,469評(píng)論 0 12
  • RabbitMQ是一個(gè)分布式系統(tǒng) 一、使用rabbitmq時(shí)的系統(tǒng)架構(gòu)圖 通過(guò)路由鍵將交換機(jī)和隊(duì)列進(jìn)行綁定变姨,從而實(shí)...
    Gem_kaili閱讀 4,990評(píng)論 0 0