4椎咧、簡單封裝RabbitMQ的發(fā)布與訂閱模式

生產(chǎn)者類:Publisher.class.php

classPublisher{

? ? ? ? ? private $config=array();

? ? ? ? ? private $conn=Null;

? ? ? ? ? private $channel=Null;

? ? ? ? ? private $exchange=Null;

? ? ? ? ? public $is_ready=False;

? ? ? ? ?/**

? ? ? ? ? * 創(chuàng)建連接侠讯,并指定交換機

? ? ? ? ? * @paramarray $config RabbitMQ服務(wù)器信息

? ? ? ? ? * @paramstring $e_name交換機名稱

? ? ? ? ? * @returnvoid

? ? ? ? ? */

? ? ? ? ? public function__construct($config,$e_name){

? ? ? ? ? ? ? ? ? if(!($config&&$e_name)) {

? ? ? ? ? ? ? ? ? ? ? ? ? ?return False;

? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? shuffle($config);

? ? ? ? ? ? ? ? ? $this->config=$config;

? ? ? ? ? ? ? ? ? if(!self::connect()) {

? ? ? ? ? ? ? ? ? ? ? ? ? return False;

? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? $this->channel=newAMQPChannel($this->conn);

? ? ? ? ? ? ? ? ? $this->establishExchange($e_name);

? ? ? ? ? ? ? ? ? $this->is_ready=True;

? ? ? ? ? }

? ? ? ? ? /**

? ? ? ? ? ?* 發(fā)送消息

? ? ? ? ? ?* @paramstring $msg消息體

? ? ? ? ? ?* @paramstring $k_route路由鍵

? ? ? ? ? ?* @returnint / False

? ? ? ? ? ?*/

? ? ? ? ? ?public functionsend($msg,$k_route){

? ? ? ? ? ? ? ? ? ?$msg=trim(strval($msg));

? ? ? ? ? ? ? ? ? ?if(!$this->exchange||$msg===''|| !$k_route) return False;

? ? ? ? ? ? ? ? ? ?$ret=$this->exchange->publish($msg,$k_route);

? ? ? ? ? ? ? ? ? ?return $ret;

? ? ? ? ? ?}

? ? ? ? ? ?/**

? ? ? ? ? ? * 創(chuàng)建鏈接

? ? ? ? ? ? * 無法鏈接時則會自動選擇下一個配置項(IP不通的情況下會有5秒等待)

? ? ? ? ? ? * @paramint $i配置項索引

? ? ? ? ? ? * @returnbool

? ? ? ? ? ? */

? ? ? ? ? ? private functionconnect($i=0){

? ? ? ? ? ? ? ? ? ? if(array_key_exists($i,$this->config)){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ?try{

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->conn=newAMQPConnection($this->config[$i]);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->conn->connect();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=True;

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }catch(AMQPConnectionException$e){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=$this->connect(++$i);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ?}else{

? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$ret=False;

? ? ? ? ? ? ? ? ? ? ?}

? ? ? ? ? ? ? ? ? ? ?return$ret;

? ? ? ? ? ? }

? ? ? ? ? ? /**

? ? ? ? ? ? ?* 創(chuàng)建交換機

? ? ? ? ? ? ?* @paramstring $name名稱

? ? ? ? ? ? ?* @returnvoid

? ? ? ? ? ? ?*/

? ? ? ? ? ? ?private functionestablishExchange($name){

? ? ? ? ? ? ? ? ? ? ? ?$this->exchange=newAMQPExchange($this->channel);

? ? ? ? ? ? ? ? ? ? ? ?$this->exchange->setName($name);

? ? ? ? ? ? ? }

? ? ? ? ? ? ? public function__destruct(){

? ? ? ? ? ? ? ? ? ? ? ?if($this->conn){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->conn->disconnect();

? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ?}

}

消費者類:Consumer.class.php

class Consumer {

? ? ? ? ? ?private$config=array();

? ? ? ? ? ?private$durable=True;

? ? ? ? ? ?private$mirror=False;

? ? ? ? ? ?private$autodelete=False;

? ? ? ? ? ?private$conn=Null;

? ? ? ? ? ?private$channel=Null;

? ? ? ? ? ?private$queue=Null;

? ? ? ? ? ?public$is_ready=False;

? ? ? ? ? ?/**

? ? ? ? ? ? * 創(chuàng)建連接挖藏、交換機、隊列厢漩,并綁定

? ? ? ? ? ? * @paramarray $config RabbitMQ服務(wù)器信息

? ? ? ? ? ? * @paramstring $e_name交換機名稱

? ? ? ? ? ? * @paramstring $k_route路由鍵

? ? ? ? ? ? * @paramstring $q_name隊列名稱

? ? ? ? ? ? * @parambool $durable隊列是否持久化

? ? ? ? ? ? * @parambool $mirror隊列是否鏡像

? ? ? ? ? ? * @returnvoid

? ? ? ? ? ? */

? ? ? ? ? ? public function__construct($config,$e_name,$k_route,$q_name,$durable=True,$mirror=False,$autodelete=False){

? ? ? ? ? ? ? ? ? ? ? ? ?if(!($config&&$e_name&&$q_name&&$k_route)){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?return False;

? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ? shuffle($config);

? ? ? ? ? ? ? ? ? ? ? ? ? $this->config=$config;

? ? ? ? ? ? ? ? ? ? ? ? ? if(!self::connect()){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? return False;

? ? ? ? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? ? ? ?$this->channel=newAMQPChannel($this->conn);

? ? ? ? ? ? ? ? ? ? ? ? ?$this->durable= (bool)$durable;

? ? ? ? ? ? ? ? ? ? ? ? ?$this->mirror= (bool)$mirror;

? ? ? ? ? ? ? ? ? ? ? ? ?$this->autodelete= (bool)$autodelete;

? ? ? ? ? ? ? ? ? ? ? ? $this->establishExchange($e_name);

? ? ? ? ? ? ? ? ? ? ? ? $this->establishQueue($q_name,$e_name,$k_route);

? ? ? ? ? ? ? ? ? ? ? ? $this->is_ready=True;

? ? ? ? ? ? ?}

? ? ? ? ? ? /**

? ? ? ? ? ? ?* 循環(huán)阻塞方式接收消息

? ? ? ? ? ? ?* @paramstring $fun_name自定義處理函數(shù)的函數(shù)名

? ? ? ? ? ? ? * @parambool $autoack是否自動發(fā)送ACK應(yīng)答膜眠,否則需要在自定義處理函數(shù)中手動發(fā)送

? ? ? ? ? ? ? * @returnbool

? ? ? ? ? ? ? */

? ? ? ? ? ? ? public functionrun($fun_name,$autoack=True){

? ? ? ? ? ? ? ? ? ? ? ? ?$fun_name=strval($fun_name);

? ? ? ? ? ? ? ? ? ? ? ? ?if(!$fun_name|| !$this->queue)return False;

? ? ? ? ? ? ? ? ? ? ? ? ?while(True){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if($autoack)$this->queue->consume($fun_name,AMQP_AUTOACK);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? else$this->queue->consume($fun_name);

? ? ? ? ? ? ? ? ? ? ? ? ?}

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? /**

? ? ? ? ? ? ? ? ?* 創(chuàng)建鏈接

? ? ? ? ? ? ? ? ?* 無法鏈接時則會自動選擇下一個配置項(IP不通的情況下會有5秒等待)

? ? ? ? ? ? ? ? ?* @paramint $i配置項索引

? ? ? ? ? ? ? ? ?* @returnbool

? ? ? ? ? ? ? ? ?*/

? ? ? ? ? ? ? ? ?private functionconnect($i=0){

? ? ? ? ? ? ? ? ? ? ? ? ? ? if(array_key_exists($i,$this->config)){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?try{

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->conn=newAMQPConnection($this->config[$i]);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->conn->connect();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$ret=True;

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}catch(AMQPConnectionException$e){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=$this->connect(++$i);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}

? ? ? ? ? ? ? ? ? ? ? ? ? ?}else{

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$ret=False;

? ? ? ? ? ? ? ? ? ? ? ? ? ?}

? ? ? ? ? ? ? ? ? ? ? ? ? return $ret;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ?/**

? ? ? ? ? ? ? ? * 創(chuàng)建交換機

? ? ? ? ? ? ? ? * @paramstring $name名稱

? ? ? ? ? ? ? ? * @returnint

? ? ? ? ? ? ? ? */

? ? ? ? ? ? ? ? private functionestablishExchange($name){

? ? ? ? ? ? ? ? ? ? ? ? ? ?$ex=newAMQPExchange($this->channel);

? ? ? ? ? ? ? ? ? ? ? ? ? ?$ex->setName($name);

? ? ? ? ? ? ? ? ? ? ? ? ? ?$ex->setType(AMQP_EX_TYPE_DIRECT);//direct類型

? ? ? ? ? ? ? ? ? ? ? ? ? ?if($this->durable) $ex->setFlags(AMQP_DURABLE);//持久化

? ? ? ? ? ? ? ? ? ? ? ? ? ?//return $ex->declareExchange();

? ? ? ? ? ? ? ? ? ? ? ? ? ?return true;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? /**

? ? ? ? ? ? ? ? ?* 創(chuàng)建隊列

? ? ? ? ? ? ? ? ?* @paramstring $name名稱

? ? ? ? ? ? ? ? ?* @paramstring $e_name交換機名稱

? ? ? ? ? ? ? ? ?* @paramstring $k_route路由鍵

? ? ? ? ? ? ? ? ?* @returnint

? ? ? ? ? ? ? ? ?*/

? ? ? ? ? ? ? ? ?private functionestablishQueue($name,$e_name,$k_route){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->queue=newAMQPQueue($this->channel);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->queue->setName($name);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if($this->durable)$this->queue->setFlags(AMQP_DURABLE);//持久化

? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if($this->mirror)$this->queue->setArgument('x-ha-policy','all');//鏡像

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if($this->autodelete)$this->queue->setFlags(AMQP_AUTODELETE);//auto-delete

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->queue->declareQueue();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=$this->queue->bind($e_name,$k_route);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? return$ret;

? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? public function__destruct(){

? ? ? ? ? ? ? ? ? ? ? ? ? ? if($this->conn){

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->conn->disconnect();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}

? ? ? ? ? ? ? ? ? ?}

}

demoC.php

include_once'./Consumer.class.php';

functionlogResult($word='') {

? ? ? ? ? ?$fp=fopen("log.txt","a");

? ? ? ? ? ?flock($fp,LOCK_EX) ;

? ? ? ? ? ?fwrite($fp,"執(zhí)行日期:".strftime("%Y-%m-%d %H:%M:%S",time())."\n".$word."\n");

? ? ? ? ? ?flock($fp,LOCK_UN);

? ? ? ? ? ?fclose($fp);

}

$config=array(

? ? ? ? ?array(

? ? ? ? ? ? ? ? ? ? ? ? ?'host'=>'127.0.0.1',

? ? ? ? ? ? ? ? ? ? ? ? ?'port'=>'5672',

? ? ? ? ? ? ? ? ? ? ? ? ?'login'=>'ybl',

? ? ? ? ? ? ? ? ? ? ? ? ?'password'=>'ybl',

? ? ? ? ? ? ? ? ? ? ? ? ?'vhost'=>'/'

? ? ? ? ?),

? ? ? ? ?array(

? ? ? ? ? ? ? ? ? ? ? ? ?'host'=>'127.0.0.2',

? ? ? ? ? ? ? ? ? ? ? ? ?'port'=>'5672',

? ? ? ? ? ? ? ? ? ? ? ? ?'login'=>'ybl',

? ? ? ? ? ? ? ? ? ? ? ? ?'password'=>'ybl',

? ? ? ? ? ? ? ? ? ? ? ? ?'vhost'=>'/'

? ? ? ? ? )

);

$e_name='demo';//交換機名

$q_name='ybl';//隊列名

$k_route='hello';//路由key

if(!$cs=newConsumer($config,$e_name,$k_route,$q_name)){

? ? ? ? ? ? ?exit("error");

}

//第二個參數(shù)默認為true,自動發(fā)送ACK應(yīng)答

$cs->run('dealMessage');

//消費回調(diào)函數(shù),處理消息

functiondealMessage($envelope,$queue) {

$msg=$envelope->getBody();

//記錄log日記

logResult($msg);

$queue->ack($envelope->getDeliveryTag());//手動發(fā)送ACK應(yīng)答

}

demoP.php

include_once'./Publisher.class.php';

$config=array(

? ? ? ?array(

? ? ? ? ? ? ? ?'host'=>'127.0.0.1',

? ? ? ? ? ? ? ?'port'=>'5672',

? ? ? ? ? ? ? ?'login'=>'ybl',

? ? ? ? ? ? ? ?'password'=>'ybl',

? ? ? ? ? ? ? ?'vhost'=>'/'

? ? ? ),

? ? ? array(

? ? ? ? ? ? ? 'host'=>'127.0.0.2',

? ? ? ? ? ? ? 'port'=>'5672',

? ? ? ? ? ? ? 'login'=>'ybl',

? ? ? ? ? ? ? 'password'=>'ybl',

? ? ? ? ? ? ? 'vhost'=>'/'

? ? ? ? ?)

);

$e_name='demo';//交換機名

$k_route='hello';//路由key

if(!$conn=newPublisher($config,$e_name)){

? ? ? ? ? ?echo'error';

? ? ? ? ? ?exit;

}

$msg='hello RabbitMQ';

for($i=0;$i<10;$i++){

? ? ? ? ? ?$res=$conn->send($msg,$k_route);

? ? ? ? ? ?ob_flush();

? ? ? ? ? ?flush();

? ? ? ? ? ?echo $res;

? ? ? ? ? ?sleep(1);

}

運行腳本demoP.php ? demoC.php查看

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末溜嗜,一起剝皮案震驚了整個濱河市宵膨,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌炸宵,老刑警劉巖柄驻,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異焙压,居然都是意外死亡鸿脓,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門涯曲,熙熙樓的掌柜王于貴愁眉苦臉地迎上來野哭,“玉大人,你說我怎么就攤上這事幻件〔η” “怎么了?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵绰沥,是天一觀的道長篱蝇。 經(jīng)常有香客問我,道長徽曲,這世上最難降的妖魔是什么零截? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮秃臣,結(jié)果婚禮上涧衙,老公的妹妹穿的比我還像新娘哪工。我一直安慰自己,他們只是感情好弧哎,可當我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布雁比。 她就那樣靜靜地躺著,像睡著了一般撤嫩。 火紅的嫁衣襯著肌膚如雪偎捎。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天序攘,我揣著相機與錄音茴她,去河邊找鬼。 笑死两踏,一個胖子當著我的面吹牛败京,可吹牛的內(nèi)容都是我干的兜喻。 我是一名探鬼主播梦染,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼朴皆!你這毒婦竟也來了帕识?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤遂铡,失蹤者是張志新(化名)和其女友劉穎肮疗,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體扒接,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡伪货,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了钾怔。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片碱呼。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖宗侦,靈堂內(nèi)的尸體忽然破棺而出愚臀,到底是詐尸還是另有隱情,我是刑警寧澤矾利,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布姑裂,位于F島的核電站,受9級特大地震影響男旗,放射性物質(zhì)發(fā)生泄漏舶斧。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一察皇、第九天 我趴在偏房一處隱蔽的房頂上張望捧毛。 院中可真熱鬧,春花似錦呀忧、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽咐吼。三九已至,卻和暖如春厢塘,著一層夾襖步出監(jiān)牢的瞬間晚碾,已是汗流浹背喂急。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工廊移, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留狡孔,地道東北人。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓响禽,卻偏偏與公主長得像荚醒,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子界阁,可洞房花燭夜當晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理泡躯,服務(wù)發(fā)現(xiàn)丽焊,斷路器咕别,智...
    卡卡羅2017閱讀 134,657評論 18 139
  • 不支持上傳文件惰拱,所以就復(fù)制過來了偿短。作者信息什么的都沒刪。對前端基本屬于一竅不通降传,所以沒有任何修改婆排,反正用著沒問題就...
    全棧在路上閱讀 1,960評論 0 2
  • 背景 一年多以前我在知乎上答了有關(guān)LeetCode的問題, 分享了一些自己做題目的經(jīng)驗泽论。 張土汪:刷leetcod...
    土汪閱讀 12,745評論 0 33
  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語法缚够,類相關(guān)的語法幔妨,內(nèi)部類的語法误堡,繼承相關(guān)的語法,異常的語法雏吭,線程的語...
    子非魚_t_閱讀 31,631評論 18 399
  • 為什么說中國是基建狂魔姥饰? “基建狂魔”這個外號應(yīng)該是近年間才興起的孝治,源于近年來中國一系列大規(guī)牧蟹啵基建建設(shè)和超級工程审磁。...
    玉扳手閱讀 1,006評論 0 0