任務(wù)隊列
在web開發(fā)的場景中撰寫常規(guī)的代碼,一般是在一個視圖層(view)將所有業(yè)務(wù)邏輯完成之后直接返回數(shù)據(jù)(可以是一個渲染后的html也可以是一組json數(shù)據(jù))給用戶的瀏覽器惦辛;業(yè)務(wù)比較簡單且單一的邏輯處理響應(yīng)時間通常都比較短,但涉及到發(fā)郵件女揭、數(shù)據(jù)爬取稳析、密集計算、復(fù)雜業(yè)務(wù)這種情況耗時較長的操作驯嘱,如果不采取任何措施的話假瞬,那么在用戶這邊最直觀的感受就是提交了請求后頁面就僵在那里了(如果時間大于10秒鐘陕靠,可能就會認(rèn)為出問題了)。
redis的任務(wù)隊列(列表)就是這種耗時等待比較長場景的解決辦法之一笨触,視圖層是一個負(fù)責(zé)塞消息的對象(Producer角色)懦傍,然后在單獨(dú)寫一個獨(dú)立的進(jìn)程來讀取這個列表(Consumer)去完成后續(xù)的工作;那么視圖層就可以直接返回一個頁面給用戶芦劣,告訴用戶這個事情的狀態(tài)是正在處理中粗俱,當(dāng)處理完成之后將狀態(tài)改成已完成,并提供一個鏈接讓用戶可以去查看任務(wù)明細(xì)結(jié)果虚吟。
將一個耗時操作拆開要從好幾個方面進(jìn)行考慮寸认,用戶的等待體驗(yàn)、將相同功能相同作用的代碼抽象出來解耦串慰、簡化邏輯層的代碼復(fù)雜度偏塞。
?
非堵塞讀取(rpop)
queue_rpop/producer.py
import redis
# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
# 清空所有鍵
for number, key in enumerate(r.keys()):
r.delete(key)
# 準(zhǔn)備數(shù)據(jù)
r.rpush('queue:send_mail', *['3330356463@qq.com', 'zhengtong0898@aliyun.com'])
queue_rpop/consumer1.py
import redis
from datetime import datetime as dt
# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
while True:
email = r.rpop('queue:send_mail')
print('[{} {} INFO]: {}'.format(
dt.now(),
'queue:send_mail',
email
))
queue_rpop/consumer2.py
import redis
import time
from datetime import datetime as dt
# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
def send_mail(email):
print('[{} {} INFO]: send mail to {}'.format(
dt.now(),
'queue:send_mail',
email
))
if __name__ == '__main__':
while True:
mail = r.rpop('queue:send_mail')
if not mail:
print('[{} {} INFO]: {}'.format(
dt.now(),
'queue:send_mail',
mail
))
time.sleep(1)
continue
send_mail(mail)
運(yùn)行
# 運(yùn)行consumer1.py
python consumer1.py
# 顯示結(jié)果,這種方式對CPU消耗非常大邦鲫,整體的IO消耗也很高灸叼。
[2017-04-26 18:59:31.822200 queue:send_mail INFO]: None
[2017-04-26 18:59:31.822700 queue:send_mail INFO]: None
[2017-04-26 18:59:31.822700 queue:send_mail INFO]: None
[2017-04-26 18:59:31.823200 queue:send_mail INFO]: None
[2017-04-26 18:59:31.823200 queue:send_mail INFO]: None
[2017-04-26 18:59:31.823700 queue:send_mail INFO]: None
[2017-04-26 18:59:31.824200 queue:send_mail INFO]: None
...
...
...
# 運(yùn)行consumer2.py (為了降低沒有必要的系統(tǒng)消耗)
python consumer2.py
# 再開一個窗口運(yùn)行 producer.py
python producer.py
# 顯示結(jié)果
[2017-04-26 19:15:29.695200 queue:send_mail INFO]: None
[2017-04-26 19:15:30.695700 queue:send_mail INFO]: None
[2017-04-26 19:15:31.695700 queue:send_mail INFO]: None
[2017-04-26 19:15:32.696700 queue:send_mail INFO]: None
[2017-04-26 19:15:33.697200 queue:send_mail INFO]: send mail to 3330356463@qq.com
[2017-04-26 19:15:33.697700 queue:send_mail INFO]: send mail to zhengtong0898@aliyun.com
[2017-04-26 19:15:33.698200 queue:send_mail INFO]: None
[2017-04-26 19:15:34.698700 queue:send_mail INFO]: None
[2017-04-26 19:15:35.699200 queue:send_mail INFO]: None
?
堵塞讀取(brpop)
redis內(nèi)部并沒有采用非堵塞的socket模型,因此初步判斷它是一個堵塞請求庆捺,它的等待機(jī)制是由redis內(nèi)部來負(fù)責(zé)響應(yīng)的古今。堵塞讓資源消耗更少,邏輯撰寫更容易滔以。
queue_brpop/consumer.py
import redis
from datetime import datetime as dt
# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
def send_mail(email):
print('[{} {} INFO]: send mail to {}'.format(
dt.now(),
'queue:send_mail',
email
))
if __name__ == '__main__':
while True:
queue_name, mail = r.brpop('queue:send_mail')
send_mail(mail)
運(yùn)行
# 運(yùn)行queue_brpop/consumer.py
python queue_brpop/consumer.py
# 運(yùn)行queue_rpop/producer.py
# 顯示結(jié)果(沒有多余的輸出捉腥,挺好!)
[2017-04-26 19:31:23.947200 queue:send_mail INFO]: send mail to 3330356463@qq.com
[2017-04-26 19:31:23.947700 queue:send_mail INFO]: send mail to zhengtong0898@aliyun.com
優(yōu)先級(lpush、blpop)
最普遍和常見的新增列表列表元素是在當(dāng)前列表后面追加元素你画,取值的話則是從左到右讀取抵碟,這種行為在消息隊列中被稱為先進(jìn)先出(FIFO);那也會有一些場景要求后進(jìn)先出(LIFO)坏匪,這就需要利用lpush(從左邊開始插入元素)和blpop(從左邊開始讀取)拟逮。
quque_priority/producer.py
import redis
# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
# 清空所有鍵
for number, key in enumerate(r.keys()):
r.delete(key)
# 準(zhǔn)備數(shù)據(jù)
r.lpush('queue:send_mail', '3330356463@qq.com')
r.lpush('queue:send_mail', 'zhengtong0898@aliyun.com')
r.lpush('queue:send_mail', 'zhengtong0898@a.com')
r.lpush('queue:send_mail', 'zhengtong0898@b.com')
r.lpush('queue:send_mail', 'zhengtong0898@c.com')
r.lpush('queue:send_mail', 'zhengtong0898@d.com')
r.lpush('queue:send_mail', 'zhengtong0898@e.com')
queue_priority/consumer.py
import redis
from datetime import datetime as dt
# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
def send_mail(email):
print('[{} {} INFO]: send mail to {}'.format(
dt.now(),
'queue:send_mail',
email
))
if __name__ == '__main__':
while True:
queue_name, mail = r.blpop('queue:send_mail')
send_mail(mail)
運(yùn)行
# 運(yùn)行producer.py,先將數(shù)據(jù)塞到redis列表中(每次都是從左邊插入)
python queue_priority/producer.py
# 運(yùn)行consumer.py适滓,消費(fèi)數(shù)據(jù)(每次都是從左邊讀取)
python queue_priority/consumer.py
# 顯示結(jié)果唱歧,仔細(xì)看就可以看得出來,確實(shí)是后進(jìn)先出(LIFO)
[2017-04-26 20:29:18.735700 queue:send_mail INFO]: send mail to zhengtong0898@e.com
[2017-04-26 20:29:18.735700 queue:send_mail INFO]: send mail to zhengtong0898@d.com
[2017-04-26 20:29:18.736200 queue:send_mail INFO]: send mail to zhengtong0898@c.com
[2017-04-26 20:29:18.736700 queue:send_mail INFO]: send mail to zhengtong0898@b.com
[2017-04-26 20:29:18.736700 queue:send_mail INFO]: send mail to zhengtong0898@a.com
[2017-04-26 20:29:18.737200 queue:send_mail INFO]: send mail to zhengtong0898@aliyun.com
[2017-04-26 20:29:18.737700 queue:send_mail INFO]: send mail to 3330356463@qq.com
?
?
發(fā)布/訂閱
消息隊列是將消息放在列表中,由一個或多個consumer去消費(fèi)列表中不同的元素颅崩;而發(fā)布/訂閱是將消息放在一個頻道中,有一個或多個consumer去消費(fèi)頻道中相同的元素蕊苗;若該頻道中沒有任何consumer沿后,這次producer生產(chǎn)了一條消息放入到頻道中,這條消息將會自動消失朽砰,因此發(fā)布/訂閱模式要求consumer必須時刻運(yùn)行尖滚,否則producer發(fā)送的任何消息都是無效的。
publish_subscribe/producer.py
import redis
from datetime import datetime as dt
# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
r.publish('channel1.1', 'hi')
r.publish('channel1.1', 'my')
r.publish('channel1.1', 'name')
r.publish('channel1.1', 'is')
r.publish('channel1.1', 'publisher')
publish_subscribe/consumer.py
import redis
import time
from datetime import datetime as dt
# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
p = r.pubsub()
p.subscribe('channel1.1') # 支持同時訂閱多個頻道: p.subscribe('channel1.1', 'channel1.2')
while True:
msg = p.get_message(ignore_subscribe_messages=True)
if not msg:
time.sleep(1)
print('sleep 1')
continue
print('debug: ', msg)
運(yùn)行
# 打開兩個窗口運(yùn)行兩次 consumer.py瞧柔, 都訂閱'channel1.1'頻道.
python publish_subscribe/consumer.py # 窗口一
python publish_subscribe/consumer.py # 窗口二
# 運(yùn)行producer.py漆弄, 將消息塞到'channel1.1'頻道.
python publish_subscribe/producer.py
# 顯示結(jié)果(兩邊都同時存在)
[2017-04-27 13:01:17.712700 publish/subscribe INFO]: None
[2017-04-27 13:01:18.712700 publish/subscribe INFO]: None
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: None
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'hi', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'my', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'name', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'is', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'publisher', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:20.713200 publish/subscribe INFO]: None
[2017-04-27 13:01:21.714200 publish/subscribe INFO]: None
主題訂閱
除了訂閱多個頻道之外,redis也支持consumer通過glob通配符(psubscribe)來訂閱不同主題的頻道造锅。
publish_psubscribe/producer.py
import redis
from datetime import datetime as dt
# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
r.publish('channel1.1', 'hi')
r.publish('channel1.2', 'my')
r.publish('channel1.3', 'name')
r.publish('channel1.4', 'is')
r.publish('channel1.5', 'publisher')
publish_psubscribe/consumer.py
import redis
import time
from datetime import datetime as dt
# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
p = r.pubsub()
p.psubscribe('channel1.*')
while True:
msg = p.get_message(ignore_subscribe_messages=True)
if not msg:
time.sleep(1)
msg = None
print('[{} {} INFO]: {}'.format(
dt.now(),
'publish/psubscribe',
msg
))
運(yùn)行
# 運(yùn)行consumer.py
python publish_psubscribe/consumer.py
# 運(yùn)行producer.py
publish_psubscribe/producer.py
# 顯示結(jié)果
[2017-04-27 13:31:00.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:01.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'hi', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.1'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'my', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.2'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'name', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.3'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'is', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.4'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'publisher', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.5'}
[2017-04-27 13:31:03.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:04.683700 publish/psubscribe INFO]: None
參考
- [x] 書籍: Redis 入門指南