Redis實現(xiàn)簡單消息隊列

任務(wù)異步化

打開瀏覽器阶界,輸入地址,按下回車坛梁,打開了頁面而姐。于是一個HTTP請求(request)就由客戶端發(fā)送到服務(wù)器,服務(wù)器處理請求划咐,返回響應(yīng)(response)內(nèi)容拴念。

我們每天都在瀏覽網(wǎng)頁,發(fā)送大大小小的請求給服務(wù)器褐缠。有時候政鼠,服務(wù)器接到了請求,會發(fā)現(xiàn)他也需要給另外的服務(wù)器發(fā)送請求送丰,或者服務(wù)器也需要做另外一些事情缔俄,于是最初們發(fā)送的請求就被阻塞了弛秋,也就是要等待服務(wù)器完成其他的事情器躏。

更多的時候,服務(wù)器做的額外事情蟹略,并不需要客戶端等待登失,這時候就可以把這些額外的事情異步去做。從事異步任務(wù)的工具有很多挖炬。主要原理還是處理通知消息揽浙,針對通知消息通常采取是隊列結(jié)構(gòu)。生產(chǎn)和消費消息進(jìn)行通信和業(yè)務(wù)實現(xiàn)。

生產(chǎn)消費與隊列

上述異步任務(wù)的實現(xiàn)馅巷,可以抽象為生產(chǎn)者消費模型膛虫。如同一個餐館,廚師在做飯钓猬,吃貨在吃飯稍刀。如果廚師做了很多,暫時賣不完敞曹,廚師就會休息账月;如果客戶很多,廚師馬不停蹄的忙碌澳迫,客戶則需要慢慢等待局齿。實現(xiàn)生產(chǎn)者和消費者的方式用很多,下面使用Python標(biāo)準(zhǔn)庫Queue寫個小例子:


import random
import time
from Queue import Queue
from threading import Thread

queue = Queue(10)

class Producer(Thread):
    def run(self):
        while True:
            elem = random.randrange(9)
            queue.put(elem)
            print "廚師 {} 做了 {} 飯 --- 還剩 {} 飯沒賣完".format(self.name, elem, queue.qsize())
            time.sleep(random.random())

class Consumer(Thread):
    def run(self):
        while True:
            elem = queue.get()
            print "吃貨{} 吃了 {} 飯 --- 還有 {} 飯可以吃".format(self.name, elem, queue.qsize())
            time.sleep(random.random())

def main():
    for i in range(3):
        p = Producer()
        p.start()
    for i in range(2):
        c = Consumer()
        c.start()

if __name__ == '__main__':
    main()

大概輸出如下:

廚師 Thread-1 做了 1 飯 --- 還剩 1 飯沒賣完
廚師 Thread-2 做了 8 飯 --- 還剩 2 飯沒賣完
廚師 Thread-3 做了 3 飯 --- 還剩 3 飯沒賣完
吃貨Thread-4 吃了 1 飯 --- 還有 2 飯可以吃
吃貨Thread-5 吃了 8 飯 --- 還有 1 飯可以吃
吃貨Thread-4 吃了 3 飯 --- 還有 0 飯可以吃
廚師 Thread-1 做了 0 飯 --- 還剩 1 飯沒賣完
廚師 Thread-2 做了 0 飯 --- 還剩 2 飯沒賣完
廚師 Thread-1 做了 1 飯 --- 還剩 3 飯沒賣完
廚師 Thread-1 做了 1 飯 --- 還剩 4 飯沒賣完
吃貨Thread-4 吃了 0 飯 --- 還有 3 飯可以吃
廚師 Thread-3 做了 3 飯 --- 還剩 4 飯沒賣完
吃貨Thread-5 吃了 0 飯 --- 還有 3 飯可以吃
吃貨Thread-5 吃了 1 飯 --- 還有 2 飯可以吃
廚師 Thread-2 做了 8 飯 --- 還剩 3 飯沒賣完
廚師 Thread-2 做了 8 飯 --- 還剩 4 飯沒賣完

Redis 隊列

Python內(nèi)置了一個好用的隊列結(jié)構(gòu)橄登。我們也可以是用redis實現(xiàn)類似的操作抓歼。并做一個簡單的異步任務(wù)。

Redis提供了兩種方式來作消息隊列示绊。一個是使用生產(chǎn)者消費模式模式锭部,另外一個方法就是發(fā)布訂閱者模式。前者會讓一個或者多個客戶端監(jiān)聽消息隊列面褐,一旦消息到達(dá)拌禾,消費者馬上消費,誰先搶到算誰的展哭,如果隊列里沒有消息湃窍,則消費者繼續(xù)監(jiān)聽。后者也是一個或多個客戶端訂閱消息頻道匪傍,只要發(fā)布者發(fā)布消息您市,所有訂閱者都能收到消息,訂閱者都是平等的役衡。

生產(chǎn)消費模式(不建議使用)

主要使用了redis提供的blpop獲取隊列數(shù)據(jù)茵休,如果隊列沒有數(shù)據(jù)則阻塞等待,也就是監(jiān)聽手蝎。


import redis

class Task(object):
    def __init__(self):
        self.rcon = redis.StrictRedis(host='localhost', db=5)
        self.queue = 'task:prodcons:queue'

    def listen_task(self):
        while True:
            task = self.rcon.blpop(self.queue, 0)[1]
            print "Task get", task

if __name__ == '__main__':
    print 'listen task queue'
    Task().listen_task()

使用redis的brpop方式做隊列榕莺,經(jīng)過一段時間,會發(fā)現(xiàn)程序莫名其妙的卡主棵介。也就是進(jìn)程一切ok钉鸯,redis的lpush也正常,唯獨brpop不再消費邮辽。該問題十分不好復(fù)現(xiàn)唠雕,但是總是過了一段時間就會重現(xiàn)贸营。本人采用了高并發(fā),高延遲岩睁,弱網(wǎng)絡(luò)環(huán)境等方式試圖復(fù)現(xiàn)都沒有成功钞脂,目前仍然在尋找解決方案。目測依賴redis做brokers的隊列的celery也遇到同樣的問題 捕儒,并且其他語言也有類似問題芳肌,但是作者的解決方不適用。猜測問題的原因是redis在處理brpop的時候連接長時間不適用會自動假死肋层。后來采用比較low的方案亿笤,每當(dāng)凌晨3點左右重啟一下隊列服務(wù)。目前設(shè)置了更短的idle連接時間(config set timeout 10)栋猖,再觀察一下是否能復(fù)現(xiàn)垒拢。
不建議在生成環(huán)境使用該方案喻括。如果使用類似方案也遇到了問題赦拘,并且有了解決方案忽冻,希望您能聯(lián)系我哈哈哈。
升級了 redis 3.2 版本之后雌团,運行了一個多月燃领,目前沒有再出現(xiàn)卡住的問題。

發(fā)布訂閱模式

使用redis的pubsub功能锦援,訂閱者訂閱頻道猛蔽,發(fā)布者發(fā)布消息到頻道了,頻道就是一個消息隊列灵寺。

import redis


class Task(object):

    def __init__(self):
        self.rcon = redis.StrictRedis(host='localhost', db=5)
        self.ps = self.rcon.pubsub()
        self.ps.subscribe('task:pubsub:channel')

    def listen_task(self):
        for i in self.ps.listen():
            if i['type'] == 'message':
                print "Task get", i['data']

if __name__ == '__main__':
    print 'listen task channel'
    Task().listen_task()

Flask 入口

我們分別實現(xiàn)了兩種異步任務(wù)的后端服務(wù)曼库,直接啟動他們,就能監(jiān)聽redis隊列或頻道的消息了略板。簡單的測試如下:



import redis
import random
import logging
from flask import Flask, redirect

app = Flask(__name__)

rcon = redis.StrictRedis(host='localhost', db=5)
prodcons_queue = 'task:prodcons:queue'
pubsub_channel = 'task:pubsub:channel'

@app.route('/')
def index():

    html = """
<br>
<center><h3>Redis Message Queue</h3>
<br>
<a href="/prodcons">生產(chǎn)消費者模式</a>
<br>
<br>
<a href="/pubsub">發(fā)布訂閱者模式</a>
</center>
"""
    return html


@app.route('/prodcons')
def prodcons():
    elem = random.randrange(10)
    rcon.lpush(prodcons_queue, elem)
    logging.info("lpush {} -- {}".format(prodcons_queue, elem))
    return redirect('/')

@app.route('/pubsub')
def pubsub():
    ps = rcon.pubsub()
    ps.subscribe(pubsub_channel)
    elem = random.randrange(10)
    rcon.publish(pubsub_channel, elem)
    return redirect('/')

if __name__ == '__main__':
    app.run(debug=True)

啟動腳本毁枯,使用

siege -c10 -r 5 http://127.0.0.1:5000/prodcons
siege -c10 -r 5 http://127.0.0.1:5000/pubsub

可以分別在監(jiān)聽的腳本輸入中看到異步消息。在異步的任務(wù)中叮称,可以執(zhí)行一些耗時間的操作种玛,當(dāng)然目前這些做法并不知道異步的執(zhí)行結(jié)果,如果需要知道異步的執(zhí)行結(jié)果瓤檐,可以考慮設(shè)計協(xié)程任務(wù)或者使用一些工具如RQ或者celery等赂韵。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市距帅,隨后出現(xiàn)的幾起案子右锨,更是在濱河造成了極大的恐慌括堤,老刑警劉巖碌秸,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件绍移,死亡現(xiàn)場離奇詭異,居然都是意外死亡讥电,警方通過查閱死者的電腦和手機蹂窖,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來恩敌,“玉大人瞬测,你說我怎么就攤上這事【琅冢” “怎么了月趟?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長恢口。 經(jīng)常有香客問我孝宗,道長,這世上最難降的妖魔是什么耕肩? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任因妇,我火速辦了婚禮,結(jié)果婚禮上猿诸,老公的妹妹穿的比我還像新娘婚被。我一直安慰自己,他們只是感情好梳虽,可當(dāng)我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布址芯。 她就那樣靜靜地躺著,像睡著了一般窜觉。 火紅的嫁衣襯著肌膚如雪是复。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天竖螃,我揣著相機與錄音淑廊,去河邊找鬼。 笑死特咆,一個胖子當(dāng)著我的面吹牛季惩,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播腻格,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼画拾,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了菜职?” 一聲冷哼從身側(cè)響起青抛,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎酬核,沒想到半個月后蜜另,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體适室,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年举瑰,在試婚紗的時候發(fā)現(xiàn)自己被綠了捣辆。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡此迅,死狀恐怖汽畴,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情耸序,我是刑警寧澤忍些,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站坎怪,受9級特大地震影響坐昙,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜芋忿,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一炸客、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧戈钢,春花似錦痹仙、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至薪铜,卻和暖如春众弓,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背隔箍。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工谓娃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人蜒滩。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓滨达,卻偏偏與公主長得像,于是被迫代替她去往敵國和親俯艰。 傳聞我的和親對象是個殘疾皇子捡遍,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)竹握,斷路器画株,智...
    卡卡羅2017閱讀 134,599評論 18 139
  • 1 消息隊列概述 消息隊列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合,異步消息谓传,流量削鋒等問題蜈项。實現(xiàn)高性能,...
    Bobby0322閱讀 10,847評論 0 24
  • 消息隊列設(shè)計精要 消息隊列已經(jīng)逐漸成為企業(yè)IT系統(tǒng)內(nèi)部通信的核心手段良拼。它具有低耦合、可靠投遞充边、廣播庸推、流量控制、最終...
    meng_philip123閱讀 1,505評論 1 25
  • 昏昏欲睡浇冰,下載了簡書贬媒,朋友推薦,試著玩玩吧肘习。晚安际乘,看到的朋友們,做個美夢漂佩。
    幻辰魅汐閱讀 169評論 0 0
  • 123 test test'
    年小糯閱讀 170評論 0 0