消息系統(tǒng)允許軟件應(yīng)用相互連接和擴(kuò)展.這些應(yīng)用可以相互鏈接起來組成一個(gè)更大的應(yīng)用,或者將用戶設(shè)備和數(shù)據(jù)進(jìn)行連接.消息系統(tǒng)通過將消息的發(fā)送和接收分離來實(shí)現(xiàn)應(yīng)用程序的異步和解偶.
數(shù)據(jù)投遞痰哨,非阻塞操作或推送通知俊柔、發(fā)布/訂閱框往,異步處理匙赞,或者工作隊(duì)列屡穗。所有這些都屬于消息系統(tǒng)的模式厅目。
消息系統(tǒng)的幾個(gè)有點(diǎn)
- 異步(asynchronous):耗時(shí)的工作可以直接丟給消費(fèi)者番枚,不會(huì)阻塞生產(chǎn)者
- 可擴(kuò)展(scale):消息機(jī)制的工作模式法严,在理論上可以讓無限的消費(fèi)者接入進(jìn)來,使得橫向擴(kuò)展變得異常簡(jiǎn)單
- 模塊化(modulize):生產(chǎn)者和消費(fèi)者不需要知道雙方的存在葫笼,而且可以使用不同的語言和框架深啤,在物理上位于不同的地方。
Exchanges
:生產(chǎn)者把消息發(fā)送到某個(gè) exchange渔欢,exchange 的主要工具就是根據(jù)一定的規(guī)則把消息分發(fā)到不同的 queuesQueues
:消息最終被發(fā)送到的地方墓塌,消費(fèi)者也是從這里拿消息進(jìn)行處理Routing key
:queue 要綁定(binding)到某個(gè) exchange 才有可能接受這個(gè) exchange 轉(zhuǎn)發(fā)過來的消息,它們之間的綁定要有 binding key(當(dāng) exchange 是 fanout 類型的時(shí)候并不需要)奥额。然后每個(gè)消息發(fā)過來的時(shí)候苫幢,都會(huì)帶著 routing key,根據(jù) exchange 的類型垫挨,binding key 和 routing key韩肝,消息才能正確被轉(zhuǎn)發(fā)。Connection
:生產(chǎn)者和消費(fèi)者需要連接到 rabbitmq 才能發(fā)送和讀取消息九榔,這個(gè)連接就是 connection哀峻,本質(zhì)上就是 TCP 連接。Channel
:為了減少網(wǎng)絡(luò)負(fù)載哲泊,和減少 TCP 鏈接數(shù)剩蟀。多個(gè)不同的 生產(chǎn)者可以在同一個(gè) TCP 發(fā)送消息,只不過在這個(gè) connection 下面獨(dú)自的 channel 里切威。你可以把 channel 想象成一根網(wǎng)線里獨(dú)立的細(xì)線育特。每個(gè) channel 都有自己的 id,用來標(biāo)識(shí)自己先朦。雖然有 connection缰冤,不過所有的通信都是在對(duì)應(yīng)的 channel 里進(jìn)行的。vhost
:Virtual host喳魏,是起到隔離作用的棉浸。每一個(gè) vhost 都有自己的 exchanges 和 queues,它們互不影響刺彩。不同的應(yīng)用可以跑在相同的 rabbitmq 上迷郑,使用 vhost 把它們隔離開就行。默認(rèn)情況下创倔,rabbitmq 安裝后三热,默認(rèn)的 vhost 是 /。
1.簡(jiǎn)單的生產(chǎn)者與消費(fèi)者
send.py
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
receive.py
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
```
####2.工作隊(duì)列
![8.png](http://upload-images.jianshu.io/upload_images/2061490-eb0848fab54e7a0d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
new_task.py
```python
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print " [x] Sent %r" % (message,)
connection.close()
```
worker.py
channel.basic_qos(prefetch_count=1) 保證每次取一個(gè)
channel.queue_declare(queue='task_queue', durable=True) 隊(duì)列持久化
```python
\#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
```
####3.發(fā)布/訂閱
fanout 它把消息發(fā)送給它所知道的所有隊(duì)列
emit_log.py
```python
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print " [x] Sent %r" % (message,)
connection.close()
```
receive_logs.py
當(dāng)與消費(fèi)者(consumer)斷開連接的時(shí)候三幻,這個(gè)隊(duì)列應(yīng)當(dāng)被立即刪除。exclusive 標(biāo)識(shí)符即可達(dá)到此目的呐能。
```
result = channel.queue_declare(exclusive=True)
```
```
\#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r" % (body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
```
===============================================================
3.路由(Routing)
在前面的教程中念搬,我們實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的日志系統(tǒng)抑堡。可以把日志消息廣播給多個(gè)接收者朗徊。
本篇教程中我們打算新增一個(gè)功能 —— 使得它能夠只訂閱消息的一個(gè)字集首妖。例如,我們只需要把嚴(yán)重的錯(cuò)誤日志信息寫入日志文件(存儲(chǔ)到磁盤)爷恳,但同時(shí)仍然把所有的日志信息輸出到控制臺(tái)中
#####綁定(Bindings)
隊(duì)列綁定到路由
```
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
```
綁定的時(shí)候可以帶上一個(gè)額外的 routing_key 參數(shù)有缆。
```
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
```
直連交換機(jī)(Direct exchange)
![Paste_Image.png](http://upload-images.jianshu.io/upload_images/2061490-aa685553d5d00401.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
emit_log_direct.py 的代碼
```
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
```
receive_logs_direct.py 的代碼:
```
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
(sys.argv[0],)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
```
==============================================================
####4.主題交換機(jī)
![Paste_Image.png](http://upload-images.jianshu.io/upload_images/2061490-cfe5558b62a179da.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
當(dāng)一個(gè)隊(duì)列的綁定鍵為 "#"(井號(hào)) 的時(shí)候,這個(gè)隊(duì)列將會(huì)無視消息的路由鍵温亲,接收所有的消息棚壁。
當(dāng) * (星號(hào)) 和 # (井號(hào)) 這兩個(gè)特殊字符都未在綁定鍵中出現(xiàn)的時(shí)候,此時(shí)主題交換機(jī)就擁有的直連交換機(jī)的行為栈虚。
emit_log_topic.py 的代碼:
```
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()
```
receive_logs_topic.py 的代碼:
```
\#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
```