redis06消息隊列

任務(wù)隊列

在web開發(fā)的場景中撰寫常規(guī)的代碼,一般是在一個視圖層(view)將所有業(yè)務(wù)邏輯完成之后直接返回數(shù)據(jù)(可以是一個渲染后的html也可以是一組json數(shù)據(jù))給用戶的瀏覽器惦辛;業(yè)務(wù)比較簡單且單一的邏輯處理響應(yīng)時間通常都比較短,但涉及到發(fā)郵件女揭、數(shù)據(jù)爬取稳析、密集計算、復(fù)雜業(yè)務(wù)這種情況耗時較長的操作驯嘱,如果不采取任何措施的話假瞬,那么在用戶這邊最直觀的感受就是提交了請求后頁面就僵在那里了(如果時間大于10秒鐘陕靠,可能就會認(rèn)為出問題了)。

redis的任務(wù)隊列(列表)就是這種耗時等待比較長場景的解決辦法之一笨触,視圖層是一個負(fù)責(zé)塞消息的對象(Producer角色)懦傍,然后在單獨(dú)寫一個獨(dú)立的進(jìn)程來讀取這個列表(Consumer)去完成后續(xù)的工作;那么視圖層就可以直接返回一個頁面給用戶芦劣,告訴用戶這個事情的狀態(tài)是正在處理中粗俱,當(dāng)處理完成之后將狀態(tài)改成已完成,并提供一個鏈接讓用戶可以去查看任務(wù)明細(xì)結(jié)果虚吟。

將一個耗時操作拆開要從好幾個方面進(jìn)行考慮寸认,用戶的等待體驗(yàn)、將相同功能相同作用的代碼抽象出來解耦串慰、簡化邏輯層的代碼復(fù)雜度偏塞。

?

非堵塞讀取(rpop)

queue_rpop/producer.py

import redis

# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)

# 清空所有鍵
for number, key in enumerate(r.keys()):
    r.delete(key)

# 準(zhǔn)備數(shù)據(jù)
r.rpush('queue:send_mail', *['3330356463@qq.com', 'zhengtong0898@aliyun.com'])

queue_rpop/consumer1.py

import redis
from datetime import datetime as dt

# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)


while True:
    email = r.rpop('queue:send_mail')
    print('[{} {} INFO]: {}'.format(
        dt.now(),
        'queue:send_mail',
        email
    ))

queue_rpop/consumer2.py

import redis
import time
from datetime import datetime as dt

# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)


def send_mail(email):
    print('[{} {} INFO]: send mail to {}'.format(
        dt.now(),
        'queue:send_mail',
        email
    ))

if __name__ == '__main__':

    while True:
        mail = r.rpop('queue:send_mail')
        if not mail:

            print('[{} {} INFO]: {}'.format(
                dt.now(),
                'queue:send_mail',
                mail
            ))

            time.sleep(1)
            continue
        send_mail(mail)

運(yùn)行

# 運(yùn)行consumer1.py
python consumer1.py
# 顯示結(jié)果,這種方式對CPU消耗非常大邦鲫,整體的IO消耗也很高灸叼。
[2017-04-26 18:59:31.822200 queue:send_mail INFO]: None
[2017-04-26 18:59:31.822700 queue:send_mail INFO]: None
[2017-04-26 18:59:31.822700 queue:send_mail INFO]: None
[2017-04-26 18:59:31.823200 queue:send_mail INFO]: None
[2017-04-26 18:59:31.823200 queue:send_mail INFO]: None
[2017-04-26 18:59:31.823700 queue:send_mail INFO]: None
[2017-04-26 18:59:31.824200 queue:send_mail INFO]: None
...
...
...


# 運(yùn)行consumer2.py (為了降低沒有必要的系統(tǒng)消耗)
python consumer2.py

# 再開一個窗口運(yùn)行 producer.py
python producer.py

# 顯示結(jié)果
[2017-04-26 19:15:29.695200 queue:send_mail INFO]: None
[2017-04-26 19:15:30.695700 queue:send_mail INFO]: None
[2017-04-26 19:15:31.695700 queue:send_mail INFO]: None
[2017-04-26 19:15:32.696700 queue:send_mail INFO]: None
[2017-04-26 19:15:33.697200 queue:send_mail INFO]: send mail to 3330356463@qq.com
[2017-04-26 19:15:33.697700 queue:send_mail INFO]: send mail to zhengtong0898@aliyun.com
[2017-04-26 19:15:33.698200 queue:send_mail INFO]: None
[2017-04-26 19:15:34.698700 queue:send_mail INFO]: None
[2017-04-26 19:15:35.699200 queue:send_mail INFO]: None

?

堵塞讀取(brpop)

redis內(nèi)部并沒有采用非堵塞的socket模型,因此初步判斷它是一個堵塞請求庆捺,它的等待機(jī)制是由redis內(nèi)部來負(fù)責(zé)響應(yīng)的古今。堵塞讓資源消耗更少,邏輯撰寫更容易滔以。

queue_brpop/consumer.py

import redis
from datetime import datetime as dt

# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)


def send_mail(email):
    print('[{} {} INFO]: send mail to {}'.format(
        dt.now(),
        'queue:send_mail',
        email
    ))

if __name__ == '__main__':

    while True:
        queue_name, mail = r.brpop('queue:send_mail')
        send_mail(mail)

運(yùn)行

# 運(yùn)行queue_brpop/consumer.py
python queue_brpop/consumer.py

# 運(yùn)行queue_rpop/producer.py

# 顯示結(jié)果(沒有多余的輸出捉腥,挺好!)
[2017-04-26 19:31:23.947200 queue:send_mail INFO]: send mail to 3330356463@qq.com
[2017-04-26 19:31:23.947700 queue:send_mail INFO]: send mail to zhengtong0898@aliyun.com
優(yōu)先級(lpush、blpop)

最普遍和常見的新增列表列表元素是在當(dāng)前列表后面追加元素你画,取值的話則是從左到右讀取抵碟,這種行為在消息隊列中被稱為先進(jìn)先出(FIFO);那也會有一些場景要求后進(jìn)先出(LIFO)坏匪,這就需要利用lpush(從左邊開始插入元素)和blpop(從左邊開始讀取)拟逮。

quque_priority/producer.py

import redis

# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)

# 清空所有鍵
for number, key in enumerate(r.keys()):
    r.delete(key)

# 準(zhǔn)備數(shù)據(jù)
r.lpush('queue:send_mail', '3330356463@qq.com')
r.lpush('queue:send_mail', 'zhengtong0898@aliyun.com')
r.lpush('queue:send_mail', 'zhengtong0898@a.com')
r.lpush('queue:send_mail', 'zhengtong0898@b.com')
r.lpush('queue:send_mail', 'zhengtong0898@c.com')
r.lpush('queue:send_mail', 'zhengtong0898@d.com')
r.lpush('queue:send_mail', 'zhengtong0898@e.com')

queue_priority/consumer.py

import redis
from datetime import datetime as dt

# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)


def send_mail(email):
    print('[{} {} INFO]: send mail to {}'.format(
        dt.now(),
        'queue:send_mail',
        email
    ))

if __name__ == '__main__':

    while True:
        queue_name, mail = r.blpop('queue:send_mail')
        send_mail(mail)

運(yùn)行

# 運(yùn)行producer.py,先將數(shù)據(jù)塞到redis列表中(每次都是從左邊插入)
python queue_priority/producer.py

# 運(yùn)行consumer.py适滓,消費(fèi)數(shù)據(jù)(每次都是從左邊讀取)
python queue_priority/consumer.py

# 顯示結(jié)果唱歧,仔細(xì)看就可以看得出來,確實(shí)是后進(jìn)先出(LIFO)
[2017-04-26 20:29:18.735700 queue:send_mail INFO]: send mail to zhengtong0898@e.com
[2017-04-26 20:29:18.735700 queue:send_mail INFO]: send mail to zhengtong0898@d.com
[2017-04-26 20:29:18.736200 queue:send_mail INFO]: send mail to zhengtong0898@c.com
[2017-04-26 20:29:18.736700 queue:send_mail INFO]: send mail to zhengtong0898@b.com
[2017-04-26 20:29:18.736700 queue:send_mail INFO]: send mail to zhengtong0898@a.com
[2017-04-26 20:29:18.737200 queue:send_mail INFO]: send mail to zhengtong0898@aliyun.com
[2017-04-26 20:29:18.737700 queue:send_mail INFO]: send mail to 3330356463@qq.com

?
?

發(fā)布/訂閱

消息隊列是將消息放在列表中,由一個或多個consumer去消費(fèi)列表中不同的元素颅崩;而發(fā)布/訂閱是將消息放在一個頻道中,有一個或多個consumer去消費(fèi)頻道中相同的元素蕊苗;若該頻道中沒有任何consumer沿后,這次producer生產(chǎn)了一條消息放入到頻道中,這條消息將會自動消失朽砰,因此發(fā)布/訂閱模式要求consumer必須時刻運(yùn)行尖滚,否則producer發(fā)送的任何消息都是無效的。

publish_subscribe/producer.py

import redis
from datetime import datetime as dt

# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)

r.publish('channel1.1', 'hi')
r.publish('channel1.1', 'my')
r.publish('channel1.1', 'name')
r.publish('channel1.1', 'is')
r.publish('channel1.1', 'publisher')


publish_subscribe/consumer.py

import redis
import time
from datetime import datetime as dt

# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
p = r.pubsub()
p.subscribe('channel1.1')    # 支持同時訂閱多個頻道: p.subscribe('channel1.1', 'channel1.2')

while True:
    msg = p.get_message(ignore_subscribe_messages=True)
    if not msg:
        time.sleep(1)
        print('sleep 1')
        continue
    print('debug: ', msg)


運(yùn)行

# 打開兩個窗口運(yùn)行兩次 consumer.py瞧柔, 都訂閱'channel1.1'頻道.
python publish_subscribe/consumer.py # 窗口一
python publish_subscribe/consumer.py # 窗口二

# 運(yùn)行producer.py漆弄, 將消息塞到'channel1.1'頻道.
python publish_subscribe/producer.py


# 顯示結(jié)果(兩邊都同時存在)
[2017-04-27 13:01:17.712700 publish/subscribe INFO]: None
[2017-04-27 13:01:18.712700 publish/subscribe INFO]: None
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: None
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'hi', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'my', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'name', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'is', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:19.712700 publish/subscribe INFO]: {'data': 'publisher', 'type': 'message', 'pattern': None, 'channel': 'channel1.1'}
[2017-04-27 13:01:20.713200 publish/subscribe INFO]: None
[2017-04-27 13:01:21.714200 publish/subscribe INFO]: None

主題訂閱

除了訂閱多個頻道之外,redis也支持consumer通過glob通配符(psubscribe)來訂閱不同主題的頻道造锅。

publish_psubscribe/producer.py

import redis
from datetime import datetime as dt

# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)

r.publish('channel1.1', 'hi')
r.publish('channel1.2', 'my')
r.publish('channel1.3', 'name')
r.publish('channel1.4', 'is')
r.publish('channel1.5', 'publisher')

publish_psubscribe/consumer.py

import redis
import time
from datetime import datetime as dt

# 連接redis
r = redis.StrictRedis('192.168.1.124', decode_responses=True)
p = r.pubsub()
p.psubscribe('channel1.*')

while True:
    msg = p.get_message(ignore_subscribe_messages=True)
    if not msg:
        time.sleep(1)
        msg = None

    print('[{} {} INFO]: {}'.format(
        dt.now(),
        'publish/psubscribe',
        msg
    ))


運(yùn)行

# 運(yùn)行consumer.py
python publish_psubscribe/consumer.py

# 運(yùn)行producer.py
publish_psubscribe/producer.py

# 顯示結(jié)果
[2017-04-27 13:31:00.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:01.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'hi', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.1'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'my', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.2'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'name', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.3'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'is', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.4'}
[2017-04-27 13:31:02.683700 publish/psubscribe INFO]: {'data': 'publisher', 'type': 'pmessage', 'pattern': 'channel1.*', 'channel': 'channel1.5'}
[2017-04-27 13:31:03.683700 publish/psubscribe INFO]: None
[2017-04-27 13:31:04.683700 publish/psubscribe INFO]: None

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末撼唾,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子哥蔚,更是在濱河造成了極大的恐慌倒谷,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件糙箍,死亡現(xiàn)場離奇詭異渤愁,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)深夯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進(jìn)店門抖格,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人咕晋,你說我怎么就攤上這事雹拄。” “怎么了捡需?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵办桨,是天一觀的道長。 經(jīng)常有香客問我站辉,道長呢撞,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任饰剥,我火速辦了婚禮殊霞,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘汰蓉。我一直安慰自己绷蹲,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著祝钢,像睡著了一般比规。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上拦英,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天蜒什,我揣著相機(jī)與錄音,去河邊找鬼疤估。 笑死灾常,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的铃拇。 我是一名探鬼主播钞瀑,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼慷荔!你這毒婦竟也來了雕什?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤拧廊,失蹤者是張志新(化名)和其女友劉穎监徘,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吧碾,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡凰盔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了倦春。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片户敬。...
    茶點(diǎn)故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖睁本,靈堂內(nèi)的尸體忽然破棺而出尿庐,到底是詐尸還是另有隱情,我是刑警寧澤呢堰,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布抄瑟,位于F島的核電站,受9級特大地震影響枉疼,放射性物質(zhì)發(fā)生泄漏皮假。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一骂维、第九天 我趴在偏房一處隱蔽的房頂上張望惹资。 院中可真熱鬧,春花似錦航闺、人聲如沸褪测。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽侮措。三九已至懈叹,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間萝毛,已是汗流浹背项阴。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留笆包,地道東北人。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓略荡,卻偏偏與公主長得像庵佣,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子汛兜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理巴粪,服務(wù)發(fā)現(xiàn),斷路器粥谬,智...
    卡卡羅2017閱讀 134,659評論 18 139
  • 1. 歷史 RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,096評論 3 51
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,469評論 0 34
  • 背景介紹 Kafka簡介 Kafka是一種分布式的肛根,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計目標(biāo)如下: 以時間復(fù)雜度為O...
    高廣超閱讀 12,835評論 8 167
  • Kafka系列一- Kafka背景及架構(gòu)介紹 Kafka簡介 Kafka是一種分布式的漏策,基于發(fā)布/訂閱的消息系統(tǒng)柠逞。...
    raincoffee閱讀 2,209評論 0 22