基于Redis實現(xiàn)無限級優(yōu)先級隊列(Python代碼)

最近工作中有這么一種需求,需要定時將三種任務(wù)(假設(shè)任務(wù)為:A能颁、B杂瘸、C)分配到10臺Windows Server中執(zhí)行,而且這三種任務(wù)中還分有優(yōu)先級的(為了簡單就以每種任務(wù)分三種優(yōu)先級為例吧)伙菊。很容易想到這不就是做一個異步調(diào)度嘛败玉,找一個有優(yōu)先級的消息隊列就應(yīng)該可以搞定了【邓叮可以后來發(fā)現(xiàn)目前Python這邊的消息隊列竟然主流不支持Windows运翼,如:RQ、高版本的Celery兴枯,還有優(yōu)先級支持也不是很好血淌,于是乎打算自己造一個。
看了一些相關(guān)的博客念恍,發(fā)現(xiàn)可以用Redis的list結(jié)構(gòu)做隊列六剥,對于優(yōu)先級的支持呢目前我是打算采用這種方式:

每一種任務(wù)每一種優(yōu)先級都單獨放一個隊列存儲(那么三種任務(wù)并且每種任務(wù)三個優(yōu)先級別的話就需要9個Redis隊列)晚顷。

上代碼前先簡單說明一下實現(xiàn)流程峰伙,其實主要就兩個模塊:入隊、出隊该默,說清楚這兩塊就OK了瞳氓。

  1. 入隊時,定時任務(wù)將A栓袖、B匣摘、C任務(wù)以及它們的優(yōu)先級別傳過來,接著我們對其進(jìn)行判斷裹刮,看各些任務(wù)進(jìn)那些隊列中(也就是各些任務(wù)在Redis隊列中的鍵是什么)音榜。我目前采用這么一種鍵的組合方式:任務(wù)類型-優(yōu)先級(taskType-level),比如:A類型任務(wù)中優(yōu)先級為1的任務(wù)最后進(jìn)入的Redis隊列的鍵為:A-1捧弃,那么優(yōu)先級為100的B類型任務(wù)在Redis隊列中的鍵也就為:B-100赠叼。簡單弄了一張圖,湊合著看吧违霞。
image.png
  1. 到出隊了嘴办,出隊這邊其實挺簡單,第一種是:如果該Redis的DB下只有我們的任務(wù)买鸽,那么我們把所有的鍵取出來即可涧郊,取出來后可以對鍵按優(yōu)先級排列(像SQL:order by level),或按任務(wù)類型和優(yōu)先級排列(像SQL:order by taskType, level)眼五,排列后得到一個鍵的列表妆艘,再根據(jù)這個鍵的列表去pop任務(wù)即可彤灶。第二種是:我們可以配置某臺客戶端可執(zhí)行的任務(wù)的類型,比如其中一臺電腦我只想讓它跑A類型任務(wù)批旺。那我只給它配置A枢希,這樣讓它去模式匹配Redis中的鍵(A-[0-9]*),這樣取出來的就是A類型的所有優(yōu)先級的任務(wù)了朱沃,如果想讓它跑A苞轿、B任務(wù)就可以循環(huán)匹配嘛。
    我也不知道有沒有講清楚這個流程逗物,看代碼吧(代碼寫得丑搬卒,萌新請各位大大多指教)
    https://github.com/wikizero/MyScripts/blob/master/forWork/MyRedisQueue.py
# coding:utf-8
import redis
import re
import json
import time
from itertools import chain
from datetime import datetime, date


class ExpandJsonEncoder(json.JSONEncoder):
    '''
        采用json方式序列化傳入的任務(wù)參數(shù),而原生的json.dumps()方法不支持datetime翎卓、date契邀,這里做了擴(kuò)展
    '''

    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        elif isinstance(obj, date):
            return obj.strftime('%Y-%m-%d')
        else:
            return json.JSONEncoder.default(self, obj)


class MyRedisQueue:

    def __init__(self):
        self.redis_connect = redis.Redis()

    def get_len(self, key):
        keys = self.get_keys(key)
        # 每個鍵的任務(wù)數(shù)量
        key_len = [(k, self.redis_connect.llen(k)) for k in keys]
        # 所有鍵的任務(wù)數(shù)量
        task_len = sum(dict(key_len).values())
        return task_len, key_len

    def get_keys(self, key):
        # Redis的鍵支持模式匹配
        keys = self.redis_connect.keys(key + '-[0-9]*')
        # 按優(yōu)先級將鍵降序排序
        keys = sorted(keys, key=lambda x: int(x.split('-')[-1]), reverse=True)
        return keys

    def push_task(self, key, tasks, level=1):
        '''
        雙端隊列,左邊推進(jìn)任務(wù)
        :param level: 優(yōu)先級(int類型)失暴,數(shù)值越大優(yōu)先級越高坯门,默認(rèn)1
        :return: 任務(wù)隊列任務(wù)數(shù)量
        '''
        # 重新定義優(yōu)先隊列的key
        new_key = key + '-' + str(level)
        # 序列化任務(wù)參數(shù)
        tasks = [json.dumps(t, cls=ExpandJsonEncoder) for t in tasks]

        print 'RedisQueue info > the number of push tasks:', len(tasks)

        if not tasks:
            return self.get_len(key)

        self.redis_connect.lpush(new_key, *tasks)
        return self.get_len(key)

    def pop_task(self, keys=None, priority=False):
        '''
        雙端隊列 右邊彈出任務(wù)
        :param keys: 鍵列表,默認(rèn)為None(將獲取所有任務(wù)的keys)
        :return:
        '''
        while True:
            # 避免在while循環(huán)中修改參數(shù)逗扒,將keys參數(shù)賦值到臨時變量
            temp_keys = keys

            # 不指定keys古戴,將獲取所有任務(wù)
            if not keys:
                temp_keys = self.redis_connect.keys()
                temp_keys = list(set([re.sub('-\d+$', '', k) for k in temp_keys if re.findall('\w+-\d+$', k)]))

            # 根據(jù)key作為關(guān)鍵字獲取所有的鍵
            all_keys = list(chain(*[self.get_keys(k) for k in temp_keys]))

            # 屏蔽任務(wù)差異性,只按優(yōu)先級高到低彈出任務(wù)
            if priority:
                all_keys = sorted(all_keys, key=lambda x: int(x.split('-')[-1]), reverse=True)

            if all_keys:
                task_key, task = self.redis_connect.brpop(all_keys)
                return task_key, json.loads(task)
            time.sleep(2)


if __name__ == '__main__':
    mrq = MyRedisQueue()

    # 把任務(wù)推入redis 隊列
    # lst = [i for i in xrange(0, 40)]
    # print mrq.push_task('C', lst, level=4)

    # 從redis queue取出任務(wù)
    # while True:
    #     task_type, task = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
    #     print task_type, task
    #     time.sleep(1)

    # 查看任務(wù)數(shù)量以及優(yōu)先級情況
    # count, key_len = mrq.get_len('task')
    # print key_len


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末矩肩,一起剝皮案震驚了整個濱河市现恼,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌黍檩,老刑警劉巖叉袍,帶你破解...
    沈念sama閱讀 222,252評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異刽酱,居然都是意外死亡喳逛,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,886評論 3 399
  • 文/潘曉璐 我一進(jìn)店門棵里,熙熙樓的掌柜王于貴愁眉苦臉地迎上來润文,“玉大人,你說我怎么就攤上這事衍慎∽Γ” “怎么了?”我有些...
    開封第一講書人閱讀 168,814評論 0 361
  • 文/不壞的土叔 我叫張陵稳捆,是天一觀的道長赠法。 經(jīng)常有香客問我,道長,這世上最難降的妖魔是什么砖织? 我笑而不...
    開封第一講書人閱讀 59,869評論 1 299
  • 正文 為了忘掉前任款侵,我火速辦了婚禮,結(jié)果婚禮上侧纯,老公的妹妹穿的比我還像新娘新锈。我一直安慰自己,他們只是感情好眶熬,可當(dāng)我...
    茶點故事閱讀 68,888評論 6 398
  • 文/花漫 我一把揭開白布妹笆。 她就那樣靜靜地躺著,像睡著了一般娜氏。 火紅的嫁衣襯著肌膚如雪拳缠。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,475評論 1 312
  • 那天贸弥,我揣著相機(jī)與錄音窟坐,去河邊找鬼。 笑死绵疲,一個胖子當(dāng)著我的面吹牛哲鸳,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播盔憨,決...
    沈念sama閱讀 41,010評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼徙菠,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了般渡?” 一聲冷哼從身側(cè)響起懒豹,我...
    開封第一講書人閱讀 39,924評論 0 277
  • 序言:老撾萬榮一對情侶失蹤芙盘,失蹤者是張志新(化名)和其女友劉穎驯用,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體儒老,經(jīng)...
    沈念sama閱讀 46,469評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡蝴乔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,552評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了驮樊。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片薇正。...
    茶點故事閱讀 40,680評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖囚衔,靈堂內(nèi)的尸體忽然破棺而出挖腰,到底是詐尸還是另有隱情,我是刑警寧澤练湿,帶...
    沈念sama閱讀 36,362評論 5 351
  • 正文 年R本政府宣布猴仑,位于F島的核電站,受9級特大地震影響肥哎,放射性物質(zhì)發(fā)生泄漏辽俗。R本人自食惡果不足惜疾渣,卻給世界環(huán)境...
    茶點故事閱讀 42,037評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望崖飘。 院中可真熱鬧榴捡,春花似錦、人聲如沸朱浴。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,519評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽翰蠢。三九已至街夭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間躏筏,已是汗流浹背板丽。 一陣腳步聲響...
    開封第一講書人閱讀 33,621評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留趁尼,地道東北人埃碱。 一個月前我還...
    沈念sama閱讀 49,099評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像酥泞,于是被迫代替她去往敵國和親砚殿。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,691評論 2 361

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理芝囤,服務(wù)發(fā)現(xiàn)似炎,斷路器,智...
    卡卡羅2017閱讀 134,713評論 18 139
  • 愛民謠的小孩 又怎么離開 說不清楚的人世 我懷抱著滿心歡喜 我是選擇銷聲匿跡 可最終還是無人問及 二十歲的年紀(jì) 是...
    惺心閱讀 230評論 0 0
  • 1 2016年的最后一天下午,公司基本上沒有人悯许,同事們都早早開始休假仆嗦,準(zhǔn)備過元旦了。而我先壕,呆呆的在椅子上坐了好一會...
    一晨思閱讀 287評論 1 2
  • 類似于上面布局的布局顯示 用tableView 或者 collectionview 來寫上面部分,很多人可能會用t...
    wsj2012閱讀 493評論 1 1
  • 一到周三 就內(nèi)心惶恐 因為小豬巴士里 這一天的廣播有一檔節(jié)目講靈異故事的 今晚煮面時 細(xì)碎輕微不懈的敲門聲響起 我...
    骨碌圓閱讀 155評論 0 0