What is RabbitMQ####
RabbitMQ是一個消息代理、一個消息系統(tǒng)的媒介祟滴。他可以提供一個通用的消息發(fā)送和接收平臺,并且保證消息在傳輸?shù)倪^程中的安全。
注:使用pika Python客戶端
核心理念####
RabbitMQ核心就是接收和發(fā)送消息瘤睹,我們可以將它想象成一個郵局,我們把信件放在郵箱中答倡,郵遞員會將信件投遞到你填寫的收件人哪里轰传。
在這個比喻中,RabbitMQ就扮演著郵箱瘪撇、郵局和郵遞員的角色获茬。
但是區(qū)別就是RabbitMQ處理的是消息(message)這種二進制數(shù)據。
RabbitMQ中專有名詞####
-
生產者(Producing):就是消息產生的一方倔既。發(fā)送消息的程序就是一個生產者恕曲。一般使用"P"表示。
-
隊列(Queue)就是郵箱的名稱渤涌。消息通過應用程序和RabbitMQ進行傳輸佩谣,這些消息能夠存儲在一個隊列中,隊列沒有限制实蓬,基本上是一個無限的緩沖茸俭,但是過多的消息積累也會讓RabbitMQ的性能下降。
多個生產者能將消息發(fā)送到一個隊列中安皱,多個消費者也能從同一個隊列中取消息调鬓。
-
消費者(Consuming)和獲取消息是一個意思。一個消費者就是一個等待獲取消息的程序酌伊。通常把它繪制成"C"腾窝。
Hello World!####
我們需要編寫兩個程序居砖,一個程序將"Hello World!"發(fā)送到隊列中(這句話說的不夠嚴謹虹脯,生產者是不能直接將消息發(fā)送到隊列中的),然后另一個程序從隊列中取出后并打印消息內容奏候。
RabbitMQ庫#####
RabbitMQ使用的是AMQP協(xié)議循集。要使用它就不洗使用一個使用相同協(xié)議的庫。python中有一個庫叫做pika鼻由。
安裝pika庫(Linux)
$ sudo pip install pika
發(fā)送消息#####
file_name: send.py
第一個程序send.py會發(fā)送一個消息到隊列中。首先建立一個到RabbitMQ服務器的連接connection厚棵。
# coding:utf-8
import pika
# 創(chuàng)建一個與RabbitMQ服務器的連接
connection=pika.BlockingConnection(pika.ConnectionParam(
'localhost'))
channel = connection.channel()
現(xiàn)在我們已經連接上服務器蕉世,在發(fā)送消息前我們需要確認隊列是存在的,如果我們把消息發(fā)送一個不存在的隊列婆硬,RabbitMQ會丟棄消息狠轻,我們先創(chuàng)建一個名字為hello的隊列,然后將消息發(fā)送到這個隊列彬犯。
# 創(chuàng)建了一個名字為hello的隊列
channel.queue_declare(queue='hello')
已經創(chuàng)建了一個隊列向楼,現(xiàn)在可以將第一條消息發(fā)送到隊列了查吊,我們發(fā)送一個字符串。
# 發(fā)送消息的函數(shù)湖蜕,excheng參數(shù)是交換機逻卖;
# routing_key是隊列的名字;body是發(fā)送消息的內容昭抒。
channel.basic_publish(exchange=' ',
routing_key = 'hello',
body = 'Hello World!')
print ' [x] Sent 'Hello World!'
退出程序之前评也,我們需要確認網絡緩沖已經被刷寫,消息已經投遞到RabbitMQ中灭返,完成這件事情可以這樣
connection.close()
獲取消息#####
我們第二個程序是獲取隊列中的消息盗迟,并打印消息。
在從隊列中獲取消息時熙含,首先也是先連接到RabbitMQ服務器還要確定要獲取消息的隊列是已經存在的隊列罚缕。
# 前面創(chuàng)建連接的代碼和上面是一樣的
channel.queue_declare(queue='hello')
為什么要重復聲明隊列呢?剛在發(fā)送的時候就已經創(chuàng)建隊列了怎静,再次創(chuàng)建會不會是兩個隊列邮弹?之所以創(chuàng)建兩次是因為我們的發(fā)送和接收程序不知道哪一個先運行,如果接收程序中沒有創(chuàng)建隊列消约,先于發(fā)送程序運行肠鲫,就會出現(xiàn)接收消息的隊列不存在。
列出所有隊列#####
$ sudo rabbitmqctl list_queues
在windows中進入rabbitMQ Server的sbin目錄下打開cmd運行rabbitmqctl list_queues
從隊列中獲取消息需要為隊列定義一個回調函數(shù)(callback)或粮。當我們獲取消息的時候就會調用這個回調函數(shù)导饲。這個回掉函數(shù)將接收到的消息輸出到屏幕上。
def callback(ch, method, properties, body):
print " [x] Received %r" % (body, )
下一步氯材,我們需要告訴RabbitMQ這個回調函數(shù)將會從名字為'hello'的隊列中接收消息渣锦。
# queue參數(shù)是隊列的名字;no_ack參數(shù)暫時不知何用
channel.basic_consume(callback,
queue='hello',
no_ack=True)
最后氢哮,我們輸入一個用來等待消息數(shù)據并且在需要的時候運行回調函數(shù)的無限循環(huán)袋毙。
print ' [*] Waiting for message. To exit press CRTL+C'
channel.start_consuming() # 開始循環(huán)
整合#####
send.py
# codin:utf-8
import pika
connection=pika.BlockingConnection(pika.ConnectionParam(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
receiver.py
# coding:utf-8
import pika
connection=pika.BlockingConnection(pika.ConnectionParaer(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
運行#####
從上面的運行結果看,生產者發(fā)送的消息成功被消費者接收冗尤。
待續(xù)听盖。。裂七。####
參考文章:http://wiki.jikexueyuan.com/project/rabbitmq/hello-world.html