最近工作中有這么一種需求,需要定時將三種任務(wù)(假設(shè)任務(wù)為:A能颁、B杂瘸、C)分配到10臺Windows Server中執(zhí)行,而且這三種任務(wù)中還分有優(yōu)先級的(為了簡單就以每種任務(wù)分三種優(yōu)先級為例吧)伙菊。很容易想到這不就是做一個異步調(diào)度嘛败玉,找一個有優(yōu)先級的消息隊列就應(yīng)該可以搞定了【邓叮可以后來發(fā)現(xiàn)目前Python這邊的消息隊列竟然主流不支持Windows运翼,如:RQ、高版本的Celery兴枯,還有優(yōu)先級支持也不是很好血淌,于是乎打算自己造一個。
看了一些相關(guān)的博客念恍,發(fā)現(xiàn)可以用Redis的list結(jié)構(gòu)做隊列六剥,對于優(yōu)先級的支持呢目前我是打算采用這種方式:
每一種任務(wù)每一種優(yōu)先級都單獨放一個隊列存儲(那么三種任務(wù)并且每種任務(wù)三個優(yōu)先級別的話就需要9個Redis隊列)晚顷。
上代碼前先簡單說明一下實現(xiàn)流程峰伙,其實主要就兩個模塊:入隊、出隊该默,說清楚這兩塊就OK了瞳氓。
- 入隊時,定時任務(wù)將A栓袖、B匣摘、C任務(wù)以及它們的優(yōu)先級別傳過來,接著我們對其進(jìn)行判斷裹刮,看各些任務(wù)進(jìn)那些隊列中(也就是各些任務(wù)在Redis隊列中的鍵是什么)音榜。我目前采用這么一種鍵的組合方式:任務(wù)類型-優(yōu)先級(taskType-level),比如:A類型任務(wù)中優(yōu)先級為1的任務(wù)最后進(jìn)入的Redis隊列的鍵為:A-1捧弃,那么優(yōu)先級為100的B類型任務(wù)在Redis隊列中的鍵也就為:B-100赠叼。簡單弄了一張圖,湊合著看吧违霞。
image.png
- 到出隊了嘴办,出隊這邊其實挺簡單,第一種是:如果該Redis的DB下只有我們的任務(wù)买鸽,那么我們把所有的鍵取出來即可涧郊,取出來后可以對鍵按優(yōu)先級排列(像SQL:order by level),或按任務(wù)類型和優(yōu)先級排列(像SQL:order by taskType, level)眼五,排列后得到一個鍵的列表妆艘,再根據(jù)這個鍵的列表去pop任務(wù)即可彤灶。第二種是:我們可以配置某臺客戶端可執(zhí)行的任務(wù)的類型,比如其中一臺電腦我只想讓它跑A類型任務(wù)批旺。那我只給它配置A枢希,這樣讓它去模式匹配Redis中的鍵(A-[0-9]*),這樣取出來的就是A類型的所有優(yōu)先級的任務(wù)了朱沃,如果想讓它跑A苞轿、B任務(wù)就可以循環(huán)匹配嘛。
我也不知道有沒有講清楚這個流程逗物,看代碼吧(代碼寫得丑搬卒,萌新請各位大大多指教)
https://github.com/wikizero/MyScripts/blob/master/forWork/MyRedisQueue.py
# coding:utf-8
import redis
import re
import json
import time
from itertools import chain
from datetime import datetime, date
class ExpandJsonEncoder(json.JSONEncoder):
'''
采用json方式序列化傳入的任務(wù)參數(shù),而原生的json.dumps()方法不支持datetime翎卓、date契邀,這里做了擴(kuò)展
'''
def default(self, obj):
if isinstance(obj, datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(obj, date):
return obj.strftime('%Y-%m-%d')
else:
return json.JSONEncoder.default(self, obj)
class MyRedisQueue:
def __init__(self):
self.redis_connect = redis.Redis()
def get_len(self, key):
keys = self.get_keys(key)
# 每個鍵的任務(wù)數(shù)量
key_len = [(k, self.redis_connect.llen(k)) for k in keys]
# 所有鍵的任務(wù)數(shù)量
task_len = sum(dict(key_len).values())
return task_len, key_len
def get_keys(self, key):
# Redis的鍵支持模式匹配
keys = self.redis_connect.keys(key + '-[0-9]*')
# 按優(yōu)先級將鍵降序排序
keys = sorted(keys, key=lambda x: int(x.split('-')[-1]), reverse=True)
return keys
def push_task(self, key, tasks, level=1):
'''
雙端隊列,左邊推進(jìn)任務(wù)
:param level: 優(yōu)先級(int類型)失暴,數(shù)值越大優(yōu)先級越高坯门,默認(rèn)1
:return: 任務(wù)隊列任務(wù)數(shù)量
'''
# 重新定義優(yōu)先隊列的key
new_key = key + '-' + str(level)
# 序列化任務(wù)參數(shù)
tasks = [json.dumps(t, cls=ExpandJsonEncoder) for t in tasks]
print 'RedisQueue info > the number of push tasks:', len(tasks)
if not tasks:
return self.get_len(key)
self.redis_connect.lpush(new_key, *tasks)
return self.get_len(key)
def pop_task(self, keys=None, priority=False):
'''
雙端隊列 右邊彈出任務(wù)
:param keys: 鍵列表,默認(rèn)為None(將獲取所有任務(wù)的keys)
:return:
'''
while True:
# 避免在while循環(huán)中修改參數(shù)逗扒,將keys參數(shù)賦值到臨時變量
temp_keys = keys
# 不指定keys古戴,將獲取所有任務(wù)
if not keys:
temp_keys = self.redis_connect.keys()
temp_keys = list(set([re.sub('-\d+$', '', k) for k in temp_keys if re.findall('\w+-\d+$', k)]))
# 根據(jù)key作為關(guān)鍵字獲取所有的鍵
all_keys = list(chain(*[self.get_keys(k) for k in temp_keys]))
# 屏蔽任務(wù)差異性,只按優(yōu)先級高到低彈出任務(wù)
if priority:
all_keys = sorted(all_keys, key=lambda x: int(x.split('-')[-1]), reverse=True)
if all_keys:
task_key, task = self.redis_connect.brpop(all_keys)
return task_key, json.loads(task)
time.sleep(2)
if __name__ == '__main__':
mrq = MyRedisQueue()
# 把任務(wù)推入redis 隊列
# lst = [i for i in xrange(0, 40)]
# print mrq.push_task('C', lst, level=4)
# 從redis queue取出任務(wù)
# while True:
# task_type, task = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
# print task_type, task
# time.sleep(1)
# 查看任務(wù)數(shù)量以及優(yōu)先級情況
# count, key_len = mrq.get_len('task')
# print key_len