重要的主題交換機(jī)
ps:使用pika Python客戶端
之前的日志系統(tǒng) 中使用的直連交換機(jī)代替了扇型交換機(jī)坚嗜。扇型交換機(jī)相當(dāng)于全訂閱,而直連交換機(jī)可以訂閱隊(duì)列感興趣的數(shù)據(jù)铺罢。
但是在我們的日志系統(tǒng)中历葛,我們不只是希望訂閱基于嚴(yán)重程度的日志,同時(shí)還希望訂閱基于發(fā)送來源的日志茴晋。為了實(shí)現(xiàn)這個(gè)目的,下面來看一下更加復(fù)雜的交換機(jī)——主題交換機(jī)回窘。
主交換機(jī)
發(fā)送到主題交換機(jī)(topic exchange)的消息不可以攜帶任意的樣子的路由建诺擅,他的路由鍵必須是一個(gè)由"."分隔開的單詞列表。這些單詞隨便什么都可以啡直,但是最好還是帶一些和消息有關(guān)的詞匯烁涌。
一個(gè)帶有特定路由鍵的消息會(huì)被主題交換機(jī)投遞到綁定與之匹配的隊(duì)列。但是他的綁定鍵和路由鍵有兩個(gè)特殊的應(yīng)用
- *(星號(hào)) 用來表示一個(gè)單詞
- #######(#號(hào))用來表示任意數(shù)量的單詞(0個(gè)或者多個(gè))單詞酒觅。
上圖中的兩個(gè)隊(duì)列Q1和Q2分別綁定了不同的路由鍵撮执。
- Q1對多有的橘黃色的動(dòng)物感興趣
- Q2對所有的兔子感興趣
例如:一個(gè)攜帶quick.orange.rabbit的消息會(huì)被分別投遞到兩個(gè)隊(duì)列。攜帶azy.orange.elephant 的消息同樣也會(huì)給兩個(gè)隊(duì)列都投遞過去阐滩。
注意
主題交換機(jī)很強(qiáng)大二打,他可以完成其交換機(jī)相同的功能,當(dāng)一個(gè)隊(duì)列綁定鍵是井號(hào)的時(shí)候掂榔,這個(gè)隊(duì)列就會(huì)無時(shí)消息的路由鍵,相當(dāng)于扇型交換機(jī)症杏。
組合代碼
emit_log_topic.py
# coding:utf-8
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()
receive_logs_topic.py
# coding:utf-8
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
執(zhí)行下面的命令則會(huì)接收所有的日志文件:
python receive_logs_topic.py "#"
只想下面的命令装获,接收來自"kern"設(shè)備的日志:
python receive_logs_topic.py "hern.*"
執(zhí)行下面命令綁定多個(gè):
python receive_logs_topic.py "kern.*" "*.critical"
執(zhí)行下面的命令將會(huì)發(fā)送路由鍵為"kern.critical"的日志
python emit_log_topic.py "kern.critical" "A critical kernel error"
通過運(yùn)行結(jié)果可以看出,主題交換機(jī)是怎樣將消息發(fā)送到不同的隊(duì)列中去的厉颤。
待續(xù)穴豫。。。
參考文章:http://rabbitmq.mr-ping.com/tutorials_with_python/[5]Topics.html