發(fā)送消息
send.py
#coding:utf-8
# 引入pika庫(kù)
import pika
#創(chuàng)建發(fā)送消息的類
class Message_Producer():
def __init__(self,ip):
''' 初始化發(fā)送生產(chǎn)消息類
:param ip: rabbitmq所在服務(wù)器
'''
self.ip = ip
# 創(chuàng)建連接
self.connection = pika.BlockingConnection(pika.ConnectionParameters(self.ip))
#創(chuàng)建通道
self.channel = self.__make_channel()
def __make_channel(self):
'''
創(chuàng)建通道
:return:返回通道實(shí)例
'''
__channel = self.connection.channel()
return __channel
def make_queue(self,queue_name):
'''
創(chuàng)建隊(duì)列
:param queue_name: 需要?jiǎng)?chuàng)建的隊(duì)列名稱
'''
self.channel.queue_declare(queue=queue_name)
def send_message(self,queue_name,message,exchange=''):
'''
發(fā)送消息
:param queue_name: 指定發(fā)送的隊(duì)列
:param message: 需要發(fā)送的消息
:param exchange: 交換機(jī)
'''
self.channel.basic_publish(exchange=exchange,
routing_key=queue_name,#指定隊(duì)列的名稱
body=message#消息內(nèi)容 )
print(" [x] Sent %s"%message)
def close_connection(self):
'''
關(guān)閉連接
'''
self.connection.close()
if __name__ == '__main__':
# 實(shí)例化生產(chǎn)消息類
message_producer = Message_Producer(ip='192.168.1.113')
# 創(chuàng)建隊(duì)列
message_producer.make_queue(queue_name='queue1')
# 發(fā)送消息
message_producer.send_message(queue_name='name1',message='hello,i am name1')
# 關(guān)閉連接
message_producer.close_connection()
接收消息
receive.py
#coding:utf-8
# 引入pika庫(kù)
import pika
#創(chuàng)建消費(fèi)消息的類
class Message_Consumer():
def __init__(self,ip):
'''
初始化消費(fèi)消息類
:param ip: rabbitmq所在服務(wù)器
'''
self.ip = ip
# 創(chuàng)建連接
self.connection = pika.BlockingConnection(pika.ConnectionParameters(self.ip))
#創(chuàng)建通道
self.channel = self.__make_channel()
def __make_channel(self):
'''
創(chuàng)建通道
:return:返回通道實(shí)例
'''
__channel = self.connection.channel()
return __channel
def receive(self,queue_name):
#接收到消息后的回調(diào)函數(shù)
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
#指定接收消息的隊(duì)列艘儒,并執(zhí)行回調(diào)函數(shù)
self.channel.basic_consume(callback, queue=queue_name, no_ack=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
#開(kāi)始接收信息锯岖,并進(jìn)入阻塞狀態(tài)罩抗,隊(duì)列里有信息才會(huì)調(diào)用callback進(jìn)行處理。按ctrl+c退出栅隐。
self.channel.start_consuming()
def close_connection(self):
'''
關(guān)閉連接
'''
self.connection.close()
if __name__ == '__main__':
message_consumer = Message_Consumer('192.168.1.106')
message_consumer.receive('name1')
使用過(guò)程
1.打開(kāi)一個(gè)命令行窗口清女,進(jìn)入到send.py目錄,執(zhí)行命令
python send.py
可以看到此時(shí)消息已經(jīng)被發(fā)送出去了
2.查看隊(duì)列是否增加了一個(gè)消息
3.再打開(kāi)一個(gè)cmd窗口谍失,進(jìn)入到receive.py目錄眶俩,執(zhí)行命名
python receive.py
此時(shí)消息已被接收
4.查看隊(duì)列的消息是否減少了