rabbitmq簡(jiǎn)述

消息系統(tǒng)允許軟件應(yīng)用相互連接和擴(kuò)展.這些應(yīng)用可以相互鏈接起來組成一個(gè)更大的應(yīng)用,或者將用戶設(shè)備和數(shù)據(jù)進(jìn)行連接.消息系統(tǒng)通過將消息的發(fā)送和接收分離來實(shí)現(xiàn)應(yīng)用程序的異步和解偶.
數(shù)據(jù)投遞痰哨,非阻塞操作或推送通知俊柔、發(fā)布/訂閱框往,異步處理匙赞,或者工作隊(duì)列屡穗。所有這些都屬于消息系統(tǒng)的模式厅目。

消息系統(tǒng)的幾個(gè)有點(diǎn)
  • 異步(asynchronous):耗時(shí)的工作可以直接丟給消費(fèi)者番枚,不會(huì)阻塞生產(chǎn)者
  • 可擴(kuò)展(scale):消息機(jī)制的工作模式法严,在理論上可以讓無限的消費(fèi)者接入進(jìn)來,使得橫向擴(kuò)展變得異常簡(jiǎn)單
  • 模塊化(modulize):生產(chǎn)者和消費(fèi)者不需要知道雙方的存在葫笼,而且可以使用不同的語言和框架深啤,在物理上位于不同的地方。
Paste_Image.png
  • Exchanges:生產(chǎn)者把消息發(fā)送到某個(gè) exchange渔欢,exchange 的主要工具就是根據(jù)一定的規(guī)則把消息分發(fā)到不同的 queues

  • Queues:消息最終被發(fā)送到的地方墓塌,消費(fèi)者也是從這里拿消息進(jìn)行處理

  • Routing key:queue 要綁定(binding)到某個(gè) exchange 才有可能接受這個(gè) exchange 轉(zhuǎn)發(fā)過來的消息,它們之間的綁定要有 binding key(當(dāng) exchange 是 fanout 類型的時(shí)候并不需要)奥额。然后每個(gè)消息發(fā)過來的時(shí)候苫幢,都會(huì)帶著 routing key,根據(jù) exchange 的類型垫挨,binding key 和 routing key韩肝,消息才能正確被轉(zhuǎn)發(fā)。

  • Connection:生產(chǎn)者和消費(fèi)者需要連接到 rabbitmq 才能發(fā)送和讀取消息九榔,這個(gè)連接就是 connection哀峻,本質(zhì)上就是 TCP 連接。

  • Channel:為了減少網(wǎng)絡(luò)負(fù)載哲泊,和減少 TCP 鏈接數(shù)剩蟀。多個(gè)不同的 生產(chǎn)者可以在同一個(gè) TCP 發(fā)送消息,只不過在這個(gè) connection 下面獨(dú)自的 channel 里切威。你可以把 channel 想象成一根網(wǎng)線里獨(dú)立的細(xì)線育特。每個(gè) channel 都有自己的 id,用來標(biāo)識(shí)自己先朦。雖然有 connection缰冤,不過所有的通信都是在對(duì)應(yīng)的 channel 里進(jìn)行的。

  • vhost:Virtual host喳魏,是起到隔離作用的棉浸。每一個(gè) vhost 都有自己的 exchanges 和 queues,它們互不影響刺彩。不同的應(yīng)用可以跑在相同的 rabbitmq 上迷郑,使用 vhost 把它們隔離開就行。默認(rèn)情況下创倔,rabbitmq 安裝后三热,默認(rèn)的 vhost 是 /。

1.簡(jiǎn)單的生產(chǎn)者與消費(fèi)者

send.py

\#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()  

receive.py

\#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

channel.start_consuming()  
```
####2.工作隊(duì)列

![8.png](http://upload-images.jianshu.io/upload_images/2061490-eb0848fab54e7a0d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

new_task.py 
```python
\#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()  
```
worker.py
channel.basic_qos(prefetch_count=1) 保證每次取一個(gè)
channel.queue_declare(queue='task_queue', durable=True) 隊(duì)列持久化
```python
\#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()  
```

####3.發(fā)布/訂閱
fanout 它把消息發(fā)送給它所知道的所有隊(duì)列
emit_log.py

```python
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print " [x] Sent %r" % (message,)
connection.close()  

```
receive_logs.py 
當(dāng)與消費(fèi)者(consumer)斷開連接的時(shí)候三幻,這個(gè)隊(duì)列應(yīng)當(dāng)被立即刪除。exclusive 標(biāo)識(shí)符即可達(dá)到此目的呐能。
```
result = channel.queue_declare(exclusive=True)  
```
```
\#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()  
```
===============================================================
3.路由(Routing)
在前面的教程中念搬,我們實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的日志系統(tǒng)抑堡。可以把日志消息廣播給多個(gè)接收者朗徊。

本篇教程中我們打算新增一個(gè)功能 —— 使得它能夠只訂閱消息的一個(gè)字集首妖。例如,我們只需要把嚴(yán)重的錯(cuò)誤日志信息寫入日志文件(存儲(chǔ)到磁盤)爷恳,但同時(shí)仍然把所有的日志信息輸出到控制臺(tái)中
#####綁定(Bindings)
隊(duì)列綁定到路由
```
channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)  
```
綁定的時(shí)候可以帶上一個(gè)額外的 routing_key 參數(shù)有缆。
```
channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black') 
```
直連交換機(jī)(Direct exchange)

![Paste_Image.png](http://upload-images.jianshu.io/upload_images/2061490-aa685553d5d00401.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
emit_log_direct.py 的代碼
```
\#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()  
```
receive_logs_direct.py 的代碼:
```
\#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \
                         (sys.argv[0],)
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()  
```
==============================================================

####4.主題交換機(jī)

![Paste_Image.png](http://upload-images.jianshu.io/upload_images/2061490-cfe5558b62a179da.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

當(dāng)一個(gè)隊(duì)列的綁定鍵為 "#"(井號(hào)) 的時(shí)候,這個(gè)隊(duì)列將會(huì)無視消息的路由鍵温亲,接收所有的消息棚壁。

當(dāng) * (星號(hào)) 和 # (井號(hào)) 這兩個(gè)特殊字符都未在綁定鍵中出現(xiàn)的時(shí)候,此時(shí)主題交換機(jī)就擁有的直連交換機(jī)的行為栈虚。
emit_log_topic.py 的代碼:
```
\#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()  
```
receive_logs_topic.py 的代碼:
```
\#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()  
```
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末袖外,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子魂务,更是在濱河造成了極大的恐慌曼验,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,591評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件粘姜,死亡現(xiàn)場(chǎng)離奇詭異鬓照,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)孤紧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門豺裆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人坛芽,你說我怎么就攤上這事留储。” “怎么了咙轩?”我有些...
    開封第一講書人閱讀 162,823評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵获讳,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我活喊,道長(zhǎng)丐膝,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,204評(píng)論 1 292
  • 正文 為了忘掉前任钾菊,我火速辦了婚禮帅矗,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘煞烫。我一直安慰自己浑此,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,228評(píng)論 6 388
  • 文/花漫 我一把揭開白布滞详。 她就那樣靜靜地躺著凛俱,像睡著了一般紊馏。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上蒲犬,一...
    開封第一講書人閱讀 51,190評(píng)論 1 299
  • 那天朱监,我揣著相機(jī)與錄音,去河邊找鬼原叮。 笑死赫编,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的奋隶。 我是一名探鬼主播擂送,決...
    沈念sama閱讀 40,078評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼达布!你這毒婦竟也來了团甲?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,923評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤黍聂,失蹤者是張志新(化名)和其女友劉穎躺苦,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體产还,經(jīng)...
    沈念sama閱讀 45,334評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡匹厘,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,550評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了脐区。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片愈诚。...
    茶點(diǎn)故事閱讀 39,727評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖牛隅,靈堂內(nèi)的尸體忽然破棺而出炕柔,到底是詐尸還是另有隱情,我是刑警寧澤媒佣,帶...
    沈念sama閱讀 35,428評(píng)論 5 343
  • 正文 年R本政府宣布匕累,位于F島的核電站,受9級(jí)特大地震影響默伍,放射性物質(zhì)發(fā)生泄漏欢嘿。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,022評(píng)論 3 326
  • 文/蒙蒙 一也糊、第九天 我趴在偏房一處隱蔽的房頂上張望炼蹦。 院中可真熱鬧,春花似錦狸剃、人聲如沸掐隐。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽瑟枫。三九已至斗搞,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間慷妙,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工允悦, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留膝擂,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,734評(píng)論 2 368
  • 正文 我出身青樓隙弛,卻偏偏與公主長(zhǎng)得像架馋,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子全闷,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,619評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器叉寂。支持消息的持久化、事務(wù)总珠、擁塞控...
    jiangmo閱讀 10,357評(píng)論 2 34
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,096評(píng)論 3 51
  • 什么叫消息隊(duì)列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)屏鳍。消息可以非常簡(jiǎn)單,比如只包含文本字符串局服,也可以更復(fù)雜...
    lijun_m閱讀 1,343評(píng)論 0 1
  • rabbitMQ是一款基于AMQP協(xié)議的消息中間件钓瞭,它能夠在應(yīng)用之間提供可靠的消息傳輸。在易用性淫奔,擴(kuò)展性山涡,高可用性...
    點(diǎn)融黑幫閱讀 2,997評(píng)論 3 41
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)唆迁,斷路器鸭丛,智...
    卡卡羅2017閱讀 134,652評(píng)論 18 139