翻譯-使用redis做任務(wù)隊(duì)列

前段時間,在工作中我們使用azure storage隊(duì)列作為任務(wù)隊(duì)列引擎暇检,但過段時間后我們發(fā)現(xiàn)它并沒有我們希望的那么快,隨之做葵,我們集中地使用redis并開始考慮將redis作為任務(wù)隊(duì)列占哟。雖然有許多的文檔介紹如何使用redis的發(fā)布/訂閱服務(wù),但使用redis作為任務(wù)隊(duì)列的屈指可數(shù)酿矢,所以我決定來描述如何去做榨乎。

什么是任務(wù)隊(duì)列?

任務(wù)隊(duì)列允許某些服務(wù)的客戶端異步發(fā)送任務(wù)給它瘫筐。通常服務(wù)有許多clients蜜暑,可能許多處理的workers〔吒危總之整個工作流程是這樣的:

  1. client將任務(wù)放到隊(duì)列中
  2. workers定期循環(huán)檢查隊(duì)列中的新任務(wù)肛捍,如何存在隐绵,則執(zhí)行任務(wù)
    但也有一個隊(duì)列一些額外的要求:
  3. 服務(wù)質(zhì)量:client不應(yīng)該阻塞其他client的請求。
  4. 批處理:clients & workers應(yīng)該能夠獲得多個任務(wù)以獲得更好的性能拙毫。
  5. 可靠性:如果worker在處理task失敗時依许,該任務(wù)可以被其他worker再次處理。
  6. 死信:如果某些任務(wù)被多次嘗試后失敗缀蹄,它可以放在死信存儲峭跳。
  7. 一個任務(wù)僅可以被成功處理一次

每個客戶端將使用一個redis list,list key將使用一個任務(wù)隊(duì)列名作為前綴缺前,第二部分將是client id蛀醉。當(dāng)client準(zhǔn)備把消息放入隊(duì)列前,會將隊(duì)列名和它自己的ID的進(jìn)行關(guān)聯(lián)成列表鍵衅码。我們將使用Redis的list為每個客戶端拯刁。列表鍵將使用一個任務(wù)隊(duì)列名作為前綴和第二部分將是客戶端ID。當(dāng)客戶希望將消息放入隊(duì)列會由隊(duì)列名和它自己的ID的級聯(lián)計(jì)算列表鍵逝段。會有很多的list存入單獨(dú)的redis db中垛玻,但在該情況下,我們須共用一個redis db和其他一些代碼惹恃,同時允許在它們的名字前添加額外的前綴夭谤,如:"queues:"。因此巫糙,我們定義一個類RedisQueue來隱藏這些細(xì)節(jié)。

import json
import datetime
import pytz
from random import randint
import logging
import time

main_prefix = "bqueues:"

class ClientRedisQueue():
    def __init__(self, qname, clientid, redis):
        self.client_id = clientid
        self.queue_name = main_prefix + qname + ":" + clientid
        logging.debug("created queue %s" % self.queue_name)
        self.redis = redis

    def send(self, msgs):
        jmsgs = [json.dumps({ "client_id" : self.client_id, "msg" :msg, "attempts" : 0}) for msg in msgs]
        self.redis.lpush(self.queue_name, *jmsgs)

    def exists(self):
        return self.redis.exists(self.queue_name)

    def length(self):
        return self.redis.llen(self.queue_name) 

    def delete(self):
        self.redis.delete(self.queue_name)

r = redis.StrictRedis("localhost")
cq = ClientRedisQueue("worker1", "client", r)

cq2 = ClientRedisQueue("worker1", "client2", r)
cq.send([1,2])
cq2.send([3,4,0]) 

所以發(fā)送端容易實(shí)現(xiàn)颊乘,那接受端呢参淹?首先,我們需要找到所有隊(duì)列列表list乏悄。有三種方式:

  1. 使用KEYS“prefix:*"命令, 該命令能夠列出來所有列表浙值。但這個命令可能會導(dǎo)致生產(chǎn)出現(xiàn)嚴(yán)重的問題,當(dāng)針對大型數(shù)據(jù)庫中執(zhí)行它可能毀性能檩小。所以永遠(yuǎn)不要使用此方式开呐。
  2. 使用SCAN命令, 該命令的作用相當(dāng)于上一條命令,但沒有性能問題规求。
  3. 使用redis set存儲所有l(wèi)ist名字筐付,即發(fā)送消息時將list名字添加到redis set中,當(dāng)消息被處理時阻肿,刪除名字瓦戚。不幸的是,該步需要額外的代碼來實(shí)現(xiàn)丛塌,所以我們將使用第二個選項(xiàng)较解。

當(dāng)我們發(fā)現(xiàn)的所有的隊(duì)列畜疾,我們??需要他們隨機(jī)排序以保證所有的list以相同的概率處理。之后印衔,我們需通過redis pipeline的方式啡捶,一次處理一批大量的消息,隨后奸焙,如果沒有找到的消息瞎暑,我們需要刪除它們。此外忿偷,我們需要防止消息的雙重處理列表金顿,并防止消息因失敗等異常情況造成的消息丟失。要做到這一點(diǎn)鲤桥,我們將使用RPOPLPUSH命令揍拆,它以原子從列表中刪除的消息,并把它變成一個額外的“processing”列表茶凳,并返回至調(diào)用者嫂拴。因此,我們將使用其他列表中為每個隊(duì)列與關(guān)鍵“processing:queue_name”贮喧。消息處理后筒狠,我們必須從prccessing列表中刪除。但在幾次不成功的嘗試過程中消息的情況下箱沦,我們需要最終將其移動到死信中辩恼。并將之設(shè)置為:"dead:queue_name"。不時谓形,我們需要檢查的處理列表灶伊,如果算上嘗試的消息低于允許最大計(jì)數(shù)然后把它返回到客戶端列表或在其他情況下,把它設(shè)置成一紙空文寒跳。

AX_ATTEMPTS_COUNT = 4
class WorkerRedisQueue():
    def __init__(self, name, redis):
        self.queue_name = main_prefix + name
        self.processing_lname = main_prefix + "processing:" + name
        self.dead_sname = main_prefix + "dead:" + name
        self.refresh_period = datetime.timedelta(seconds=2)
        self.queue_pattern = self.queue_name + "*"
        self.redis = redis
        self.last_refresh_time = datetime.datetime.now(pytz.utc) - self.refresh_period - self.refresh_period
        self.list_names = []

    def proccessed(self, msg):
        self.redis.lrem(self.processing_lname, 0, json.dumps(msg))

    # start this from time to time
    def recover(self):
        recovered = 0
        proc_msgs = self.redis.lrange(self.processing_lname, -5,-1)
        for (msg, orig) in [(json.loads(msg),msg) for msg in proc_msgs if msg]:
            if msg["attempts"] > MAX_ATTEMPTS_COUNT:
                print "found dead letter"
                self.redis.sadd(self.dead_sname, orig)
            else:
                print "recovering"
                recovered = recovered + 1
                msg["attempts"] = msg["attempts"] + 1
                self.redis.lpush("%s:%s" % (self.queue_name, msg["client_id"]), json.dumps(msg))
            self.redis.lrem(self.processing_lname, 0, orig)

        return recovered

    def get_list_names(self):
        lists = []
        print "searching pattern", self.queue_pattern
        for l in self.redis.scan_iter(self.queue_pattern):
            print "found list", l
            lists.append(l)
        return lists

    def refresh(self, force = False):
        now = datetime.datetime.now(pytz.utc)
        time_to_refresh = now - self.last_refresh_time > self.refresh_period
        if force or time_to_refresh:
            self.list_names = self.get_list_names()
            self.last_refresh_time = now
        else:
            print "skip refresh"

    def receive(self, msg_count):
        self.refresh()
        count = len(self.list_names)
        if count == 0:
            print "queues not found"
            return []
        p = self.redis.pipeline()
        for i in range(msg_count):
            l = self.list_names[randint(0, count - 1)]
            p.rpoplpush(l,self.processing_lname)
        msgs = p.execute()
        return [json.loads(msg) for msg in msgs if msg]

    def length(self):
        self.refresh(True)
        res = 0
        for l in self.list_names:
            res = res + self.redis.llen(l)
        return res

wq = WorkerRedisQueue("worker1", r)
while(True):
    time.sleep(1)
    msgs = wq.receive(2)
    if len(msgs) == 0:
        if randint(0, 10) == 0 and wq.length() == 0 and wq.recover() == 0:
            print "sleeping"
            time.sleep(1)
            
    for msg in msgs:
        print "received msg", msg
        try:
            a = 10/msg["msg"]
            wq.proccessed(msg)
        except Exception,e: 
            print "exception", str(e)   

原文:http://hodzanassredin.github.io/2016/03/29/redis_task_queue.html
翻譯:yyt030

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末聘萨,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子童太,更是在濱河造成了極大的恐慌米辐,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,816評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件书释,死亡現(xiàn)場離奇詭異翘贮,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)征冷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評論 3 385
  • 文/潘曉璐 我一進(jìn)店門择诈,熙熙樓的掌柜王于貴愁眉苦臉地迎上來样漆,“玉大人讶迁,你說我怎么就攤上這事「孤拢” “怎么了?”我有些...
    開封第一講書人閱讀 158,300評論 0 348
  • 文/不壞的土叔 我叫張陵齿穗,是天一觀的道長傲隶。 經(jīng)常有香客問我,道長窃页,這世上最難降的妖魔是什么跺株? 我笑而不...
    開封第一講書人閱讀 56,780評論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮脖卖,結(jié)果婚禮上乒省,老公的妹妹穿的比我還像新娘。我一直安慰自己畦木,他們只是感情好袖扛,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評論 6 385
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著十籍,像睡著了一般蛆封。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上勾栗,一...
    開封第一講書人閱讀 50,084評論 1 291
  • 那天惨篱,我揣著相機(jī)與錄音,去河邊找鬼围俘。 笑死砸讳,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的界牡。 我是一名探鬼主播绣夺,決...
    沈念sama閱讀 39,151評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼欢揖!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起奋蔚,我...
    開封第一講書人閱讀 37,912評論 0 268
  • 序言:老撾萬榮一對情侶失蹤她混,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后泊碑,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體坤按,經(jīng)...
    沈念sama閱讀 44,355評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評論 2 327
  • 正文 我和宋清朗相戀三年馒过,在試婚紗的時候發(fā)現(xiàn)自己被綠了臭脓。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,809評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡腹忽,死狀恐怖来累,靈堂內(nèi)的尸體忽然破棺而出砚作,到底是詐尸還是另有隱情,我是刑警寧澤嘹锁,帶...
    沈念sama閱讀 34,504評論 4 334
  • 正文 年R本政府宣布葫录,位于F島的核電站,受9級特大地震影響领猾,放射性物質(zhì)發(fā)生泄漏米同。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評論 3 317
  • 文/蒙蒙 一摔竿、第九天 我趴在偏房一處隱蔽的房頂上張望面粮。 院中可真熱鬧,春花似錦继低、人聲如沸熬苍。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽冷溃。三九已至,卻和暖如春梦裂,著一層夾襖步出監(jiān)牢的瞬間似枕,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評論 1 267
  • 我被黑心中介騙來泰國打工年柠, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留凿歼,地道東北人。 一個月前我還...
    沈念sama閱讀 46,628評論 2 362
  • 正文 我出身青樓冗恨,卻偏偏與公主長得像答憔,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子掀抹,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評論 2 351

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理虐拓,服務(wù)發(fā)現(xiàn),斷路器傲武,智...
    卡卡羅2017閱讀 134,638評論 18 139
  • 摘自http://xiaoh.me/2016/06/30/redis-advanced/ 排序 redis支持對l...
    鴕鳥要抬頭閱讀 66,420評論 1 3
  • 分布式緩存技術(shù)PK:選擇Redis還是Memcached揪利? 經(jīng)平臺同意授權(quán)轉(zhuǎn)載 作者:田京昆(騰訊后臺研發(fā)工程師)...
    meng_philip123閱讀 68,920評論 7 60
  • 安全性 設(shè)置客戶端連接后進(jìn)行任何其他指令前需要使用的密碼态兴。 警告:因?yàn)閞edis 速度相當(dāng)快,所以在一臺比較好的服...
    OzanShareing閱讀 1,682評論 1 7
  • 11月10日疟位,金溪縣錦繡小學(xué)“翁志紅工作室”成員們在王勝文主席的帶領(lǐng)下瞻润,如約來到琉璃鄉(xiāng)黃源小學(xué)送教。黃源小學(xué)三面環(huán)...
    aling8920閱讀 251評論 0 0