生產(chǎn)者類:Publisher.class.php
classPublisher{
? ? ? ? ? private $config=array();
? ? ? ? ? private $conn=Null;
? ? ? ? ? private $channel=Null;
? ? ? ? ? private $exchange=Null;
? ? ? ? ? public $is_ready=False;
? ? ? ? ?/**
? ? ? ? ? * 創(chuàng)建連接侠讯,并指定交換機
? ? ? ? ? * @paramarray $config RabbitMQ服務(wù)器信息
? ? ? ? ? * @paramstring $e_name交換機名稱
? ? ? ? ? * @returnvoid
? ? ? ? ? */
? ? ? ? ? public function__construct($config,$e_name){
? ? ? ? ? ? ? ? ? if(!($config&&$e_name)) {
? ? ? ? ? ? ? ? ? ? ? ? ? ?return False;
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? shuffle($config);
? ? ? ? ? ? ? ? ? $this->config=$config;
? ? ? ? ? ? ? ? ? if(!self::connect()) {
? ? ? ? ? ? ? ? ? ? ? ? ? return False;
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? $this->channel=newAMQPChannel($this->conn);
? ? ? ? ? ? ? ? ? $this->establishExchange($e_name);
? ? ? ? ? ? ? ? ? $this->is_ready=True;
? ? ? ? ? }
? ? ? ? ? /**
? ? ? ? ? ?* 發(fā)送消息
? ? ? ? ? ?* @paramstring $msg消息體
? ? ? ? ? ?* @paramstring $k_route路由鍵
? ? ? ? ? ?* @returnint / False
? ? ? ? ? ?*/
? ? ? ? ? ?public functionsend($msg,$k_route){
? ? ? ? ? ? ? ? ? ?$msg=trim(strval($msg));
? ? ? ? ? ? ? ? ? ?if(!$this->exchange||$msg===''|| !$k_route) return False;
? ? ? ? ? ? ? ? ? ?$ret=$this->exchange->publish($msg,$k_route);
? ? ? ? ? ? ? ? ? ?return $ret;
? ? ? ? ? ?}
? ? ? ? ? ?/**
? ? ? ? ? ? * 創(chuàng)建鏈接
? ? ? ? ? ? * 無法鏈接時則會自動選擇下一個配置項(IP不通的情況下會有5秒等待)
? ? ? ? ? ? * @paramint $i配置項索引
? ? ? ? ? ? * @returnbool
? ? ? ? ? ? */
? ? ? ? ? ? private functionconnect($i=0){
? ? ? ? ? ? ? ? ? ? if(array_key_exists($i,$this->config)){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?try{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->conn=newAMQPConnection($this->config[$i]);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->conn->connect();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=True;
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }catch(AMQPConnectionException$e){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=$this->connect(++$i);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ?}else{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$ret=False;
? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ?return$ret;
? ? ? ? ? ? }
? ? ? ? ? ? /**
? ? ? ? ? ? ?* 創(chuàng)建交換機
? ? ? ? ? ? ?* @paramstring $name名稱
? ? ? ? ? ? ?* @returnvoid
? ? ? ? ? ? ?*/
? ? ? ? ? ? ?private functionestablishExchange($name){
? ? ? ? ? ? ? ? ? ? ? ?$this->exchange=newAMQPExchange($this->channel);
? ? ? ? ? ? ? ? ? ? ? ?$this->exchange->setName($name);
? ? ? ? ? ? ? }
? ? ? ? ? ? ? public function__destruct(){
? ? ? ? ? ? ? ? ? ? ? ?if($this->conn){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->conn->disconnect();
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?}
}
消費者類:Consumer.class.php
class Consumer {
? ? ? ? ? ?private$config=array();
? ? ? ? ? ?private$durable=True;
? ? ? ? ? ?private$mirror=False;
? ? ? ? ? ?private$autodelete=False;
? ? ? ? ? ?private$conn=Null;
? ? ? ? ? ?private$channel=Null;
? ? ? ? ? ?private$queue=Null;
? ? ? ? ? ?public$is_ready=False;
? ? ? ? ? ?/**
? ? ? ? ? ? * 創(chuàng)建連接挖藏、交換機、隊列厢漩,并綁定
? ? ? ? ? ? * @paramarray $config RabbitMQ服務(wù)器信息
? ? ? ? ? ? * @paramstring $e_name交換機名稱
? ? ? ? ? ? * @paramstring $k_route路由鍵
? ? ? ? ? ? * @paramstring $q_name隊列名稱
? ? ? ? ? ? * @parambool $durable隊列是否持久化
? ? ? ? ? ? * @parambool $mirror隊列是否鏡像
? ? ? ? ? ? * @returnvoid
? ? ? ? ? ? */
? ? ? ? ? ? public function__construct($config,$e_name,$k_route,$q_name,$durable=True,$mirror=False,$autodelete=False){
? ? ? ? ? ? ? ? ? ? ? ? ?if(!($config&&$e_name&&$q_name&&$k_route)){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?return False;
? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ? shuffle($config);
? ? ? ? ? ? ? ? ? ? ? ? ? $this->config=$config;
? ? ? ? ? ? ? ? ? ? ? ? ? if(!self::connect()){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? return False;
? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? ?$this->channel=newAMQPChannel($this->conn);
? ? ? ? ? ? ? ? ? ? ? ? ?$this->durable= (bool)$durable;
? ? ? ? ? ? ? ? ? ? ? ? ?$this->mirror= (bool)$mirror;
? ? ? ? ? ? ? ? ? ? ? ? ?$this->autodelete= (bool)$autodelete;
? ? ? ? ? ? ? ? ? ? ? ? $this->establishExchange($e_name);
? ? ? ? ? ? ? ? ? ? ? ? $this->establishQueue($q_name,$e_name,$k_route);
? ? ? ? ? ? ? ? ? ? ? ? $this->is_ready=True;
? ? ? ? ? ? ?}
? ? ? ? ? ? /**
? ? ? ? ? ? ?* 循環(huán)阻塞方式接收消息
? ? ? ? ? ? ?* @paramstring $fun_name自定義處理函數(shù)的函數(shù)名
? ? ? ? ? ? ? * @parambool $autoack是否自動發(fā)送ACK應(yīng)答膜眠,否則需要在自定義處理函數(shù)中手動發(fā)送
? ? ? ? ? ? ? * @returnbool
? ? ? ? ? ? ? */
? ? ? ? ? ? ? public functionrun($fun_name,$autoack=True){
? ? ? ? ? ? ? ? ? ? ? ? ?$fun_name=strval($fun_name);
? ? ? ? ? ? ? ? ? ? ? ? ?if(!$fun_name|| !$this->queue)return False;
? ? ? ? ? ? ? ? ? ? ? ? ?while(True){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if($autoack)$this->queue->consume($fun_name,AMQP_AUTOACK);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? else$this->queue->consume($fun_name);
? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? /**
? ? ? ? ? ? ? ? ?* 創(chuàng)建鏈接
? ? ? ? ? ? ? ? ?* 無法鏈接時則會自動選擇下一個配置項(IP不通的情況下會有5秒等待)
? ? ? ? ? ? ? ? ?* @paramint $i配置項索引
? ? ? ? ? ? ? ? ?* @returnbool
? ? ? ? ? ? ? ? ?*/
? ? ? ? ? ? ? ? ?private functionconnect($i=0){
? ? ? ? ? ? ? ? ? ? ? ? ? ? if(array_key_exists($i,$this->config)){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?try{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->conn=newAMQPConnection($this->config[$i]);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->conn->connect();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$ret=True;
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}catch(AMQPConnectionException$e){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=$this->connect(++$i);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ? ? ? ?}else{
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$ret=False;
? ? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ? ? ? ? return $ret;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?/**
? ? ? ? ? ? ? ? * 創(chuàng)建交換機
? ? ? ? ? ? ? ? * @paramstring $name名稱
? ? ? ? ? ? ? ? * @returnint
? ? ? ? ? ? ? ? */
? ? ? ? ? ? ? ? private functionestablishExchange($name){
? ? ? ? ? ? ? ? ? ? ? ? ? ?$ex=newAMQPExchange($this->channel);
? ? ? ? ? ? ? ? ? ? ? ? ? ?$ex->setName($name);
? ? ? ? ? ? ? ? ? ? ? ? ? ?$ex->setType(AMQP_EX_TYPE_DIRECT);//direct類型
? ? ? ? ? ? ? ? ? ? ? ? ? ?if($this->durable) $ex->setFlags(AMQP_DURABLE);//持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ?//return $ex->declareExchange();
? ? ? ? ? ? ? ? ? ? ? ? ? ?return true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? /**
? ? ? ? ? ? ? ? ?* 創(chuàng)建隊列
? ? ? ? ? ? ? ? ?* @paramstring $name名稱
? ? ? ? ? ? ? ? ?* @paramstring $e_name交換機名稱
? ? ? ? ? ? ? ? ?* @paramstring $k_route路由鍵
? ? ? ? ? ? ? ? ?* @returnint
? ? ? ? ? ? ? ? ?*/
? ? ? ? ? ? ? ? ?private functionestablishQueue($name,$e_name,$k_route){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->queue=newAMQPQueue($this->channel);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->queue->setName($name);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if($this->durable)$this->queue->setFlags(AMQP_DURABLE);//持久化
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?if($this->mirror)$this->queue->setArgument('x-ha-policy','all');//鏡像
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if($this->autodelete)$this->queue->setFlags(AMQP_AUTODELETE);//auto-delete
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $this->queue->declareQueue();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? $ret=$this->queue->bind($e_name,$k_route);
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? return$ret;
? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? public function__destruct(){
? ? ? ? ? ? ? ? ? ? ? ? ? ? if($this->conn){
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?$this->conn->disconnect();
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ?}
}
demoC.php
include_once'./Consumer.class.php';
functionlogResult($word='') {
? ? ? ? ? ?$fp=fopen("log.txt","a");
? ? ? ? ? ?flock($fp,LOCK_EX) ;
? ? ? ? ? ?fwrite($fp,"執(zhí)行日期:".strftime("%Y-%m-%d %H:%M:%S",time())."\n".$word."\n");
? ? ? ? ? ?flock($fp,LOCK_UN);
? ? ? ? ? ?fclose($fp);
}
$config=array(
? ? ? ? ?array(
? ? ? ? ? ? ? ? ? ? ? ? ?'host'=>'127.0.0.1',
? ? ? ? ? ? ? ? ? ? ? ? ?'port'=>'5672',
? ? ? ? ? ? ? ? ? ? ? ? ?'login'=>'ybl',
? ? ? ? ? ? ? ? ? ? ? ? ?'password'=>'ybl',
? ? ? ? ? ? ? ? ? ? ? ? ?'vhost'=>'/'
? ? ? ? ?),
? ? ? ? ?array(
? ? ? ? ? ? ? ? ? ? ? ? ?'host'=>'127.0.0.2',
? ? ? ? ? ? ? ? ? ? ? ? ?'port'=>'5672',
? ? ? ? ? ? ? ? ? ? ? ? ?'login'=>'ybl',
? ? ? ? ? ? ? ? ? ? ? ? ?'password'=>'ybl',
? ? ? ? ? ? ? ? ? ? ? ? ?'vhost'=>'/'
? ? ? ? ? )
);
$e_name='demo';//交換機名
$q_name='ybl';//隊列名
$k_route='hello';//路由key
if(!$cs=newConsumer($config,$e_name,$k_route,$q_name)){
? ? ? ? ? ? ?exit("error");
}
//第二個參數(shù)默認為true,自動發(fā)送ACK應(yīng)答
$cs->run('dealMessage');
//消費回調(diào)函數(shù),處理消息
functiondealMessage($envelope,$queue) {
$msg=$envelope->getBody();
//記錄log日記
logResult($msg);
$queue->ack($envelope->getDeliveryTag());//手動發(fā)送ACK應(yīng)答
}
demoP.php
include_once'./Publisher.class.php';
$config=array(
? ? ? ?array(
? ? ? ? ? ? ? ?'host'=>'127.0.0.1',
? ? ? ? ? ? ? ?'port'=>'5672',
? ? ? ? ? ? ? ?'login'=>'ybl',
? ? ? ? ? ? ? ?'password'=>'ybl',
? ? ? ? ? ? ? ?'vhost'=>'/'
? ? ? ),
? ? ? array(
? ? ? ? ? ? ? 'host'=>'127.0.0.2',
? ? ? ? ? ? ? 'port'=>'5672',
? ? ? ? ? ? ? 'login'=>'ybl',
? ? ? ? ? ? ? 'password'=>'ybl',
? ? ? ? ? ? ? 'vhost'=>'/'
? ? ? ? ?)
);
$e_name='demo';//交換機名
$k_route='hello';//路由key
if(!$conn=newPublisher($config,$e_name)){
? ? ? ? ? ?echo'error';
? ? ? ? ? ?exit;
}
$msg='hello RabbitMQ';
for($i=0;$i<10;$i++){
? ? ? ? ? ?$res=$conn->send($msg,$k_route);
? ? ? ? ? ?ob_flush();
? ? ? ? ? ?flush();
? ? ? ? ? ?echo $res;
? ? ? ? ? ?sleep(1);
}
運行腳本demoP.php ? demoC.php查看