前段時間,在工作中我們使用azure storage隊(duì)列作為任務(wù)隊(duì)列引擎暇检,但過段時間后我們發(fā)現(xiàn)它并沒有我們希望的那么快,隨之做葵,我們集中地使用redis并開始考慮將redis作為任務(wù)隊(duì)列占哟。雖然有許多的文檔介紹如何使用redis的發(fā)布/訂閱服務(wù),但使用redis作為任務(wù)隊(duì)列的屈指可數(shù)酿矢,所以我決定來描述如何去做榨乎。
什么是任務(wù)隊(duì)列?
任務(wù)隊(duì)列允許某些服務(wù)的客戶端異步發(fā)送任務(wù)給它瘫筐。通常服務(wù)有許多clients蜜暑,可能許多處理的workers〔吒危總之整個工作流程是這樣的:
- client將任務(wù)放到隊(duì)列中
- workers定期循環(huán)檢查隊(duì)列中的新任務(wù)肛捍,如何存在隐绵,則執(zhí)行任務(wù)
但也有一個隊(duì)列一些額外的要求: - 服務(wù)質(zhì)量:client不應(yīng)該阻塞其他client的請求。
- 批處理:clients & workers應(yīng)該能夠獲得多個任務(wù)以獲得更好的性能拙毫。
- 可靠性:如果worker在處理task失敗時依许,該任務(wù)可以被其他worker再次處理。
- 死信:如果某些任務(wù)被多次嘗試后失敗缀蹄,它可以放在死信存儲峭跳。
- 一個任務(wù)僅可以被成功處理一次
每個客戶端將使用一個redis list,list key將使用一個任務(wù)隊(duì)列名作為前綴缺前,第二部分將是client id蛀醉。當(dāng)client準(zhǔn)備把消息放入隊(duì)列前,會將隊(duì)列名和它自己的ID的進(jìn)行關(guān)聯(lián)成列表鍵衅码。我們將使用Redis的list為每個客戶端拯刁。列表鍵將使用一個任務(wù)隊(duì)列名作為前綴和第二部分將是客戶端ID。當(dāng)客戶希望將消息放入隊(duì)列會由隊(duì)列名和它自己的ID的級聯(lián)計(jì)算列表鍵逝段。會有很多的list存入單獨(dú)的redis db中垛玻,但在該情況下,我們須共用一個redis db和其他一些代碼惹恃,同時允許在它們的名字前添加額外的前綴夭谤,如:"queues:"。因此巫糙,我們定義一個類RedisQueue來隱藏這些細(xì)節(jié)。
import json
import datetime
import pytz
from random import randint
import logging
import time
main_prefix = "bqueues:"
class ClientRedisQueue():
def __init__(self, qname, clientid, redis):
self.client_id = clientid
self.queue_name = main_prefix + qname + ":" + clientid
logging.debug("created queue %s" % self.queue_name)
self.redis = redis
def send(self, msgs):
jmsgs = [json.dumps({ "client_id" : self.client_id, "msg" :msg, "attempts" : 0}) for msg in msgs]
self.redis.lpush(self.queue_name, *jmsgs)
def exists(self):
return self.redis.exists(self.queue_name)
def length(self):
return self.redis.llen(self.queue_name)
def delete(self):
self.redis.delete(self.queue_name)
r = redis.StrictRedis("localhost")
cq = ClientRedisQueue("worker1", "client", r)
cq2 = ClientRedisQueue("worker1", "client2", r)
cq.send([1,2])
cq2.send([3,4,0])
所以發(fā)送端容易實(shí)現(xiàn)颊乘,那接受端呢参淹?首先,我們需要找到所有隊(duì)列列表list乏悄。有三種方式:
- 使用KEYS“prefix:*"命令, 該命令能夠列出來所有列表浙值。但這個命令可能會導(dǎo)致生產(chǎn)出現(xiàn)嚴(yán)重的問題,當(dāng)針對大型數(shù)據(jù)庫中執(zhí)行它可能毀性能檩小。所以永遠(yuǎn)不要使用此方式开呐。
- 使用SCAN命令, 該命令的作用相當(dāng)于上一條命令,但沒有性能問題规求。
- 使用redis set存儲所有l(wèi)ist名字筐付,即發(fā)送消息時將list名字添加到redis set中,當(dāng)消息被處理時阻肿,刪除名字瓦戚。不幸的是,該步需要額外的代碼來實(shí)現(xiàn)丛塌,所以我們將使用第二個選項(xiàng)较解。
當(dāng)我們發(fā)現(xiàn)的所有的隊(duì)列畜疾,我們??需要他們隨機(jī)排序以保證所有的list以相同的概率處理。之后印衔,我們需通過redis pipeline的方式啡捶,一次處理一批大量的消息,隨后奸焙,如果沒有找到的消息瞎暑,我們需要刪除它們。此外忿偷,我們需要防止消息的雙重處理列表金顿,并防止消息因失敗等異常情況造成的消息丟失。要做到這一點(diǎn)鲤桥,我們將使用RPOPLPUSH命令揍拆,它以原子從列表中刪除的消息,并把它變成一個額外的“processing”列表茶凳,并返回至調(diào)用者嫂拴。因此,我們將使用其他列表中為每個隊(duì)列與關(guān)鍵“processing:queue_name”贮喧。消息處理后筒狠,我們必須從prccessing列表中刪除。但在幾次不成功的嘗試過程中消息的情況下箱沦,我們需要最終將其移動到死信中辩恼。并將之設(shè)置為:"dead:queue_name"。不時谓形,我們需要檢查的處理列表灶伊,如果算上嘗試的消息低于允許最大計(jì)數(shù)然后把它返回到客戶端列表或在其他情況下,把它設(shè)置成一紙空文寒跳。
AX_ATTEMPTS_COUNT = 4
class WorkerRedisQueue():
def __init__(self, name, redis):
self.queue_name = main_prefix + name
self.processing_lname = main_prefix + "processing:" + name
self.dead_sname = main_prefix + "dead:" + name
self.refresh_period = datetime.timedelta(seconds=2)
self.queue_pattern = self.queue_name + "*"
self.redis = redis
self.last_refresh_time = datetime.datetime.now(pytz.utc) - self.refresh_period - self.refresh_period
self.list_names = []
def proccessed(self, msg):
self.redis.lrem(self.processing_lname, 0, json.dumps(msg))
# start this from time to time
def recover(self):
recovered = 0
proc_msgs = self.redis.lrange(self.processing_lname, -5,-1)
for (msg, orig) in [(json.loads(msg),msg) for msg in proc_msgs if msg]:
if msg["attempts"] > MAX_ATTEMPTS_COUNT:
print "found dead letter"
self.redis.sadd(self.dead_sname, orig)
else:
print "recovering"
recovered = recovered + 1
msg["attempts"] = msg["attempts"] + 1
self.redis.lpush("%s:%s" % (self.queue_name, msg["client_id"]), json.dumps(msg))
self.redis.lrem(self.processing_lname, 0, orig)
return recovered
def get_list_names(self):
lists = []
print "searching pattern", self.queue_pattern
for l in self.redis.scan_iter(self.queue_pattern):
print "found list", l
lists.append(l)
return lists
def refresh(self, force = False):
now = datetime.datetime.now(pytz.utc)
time_to_refresh = now - self.last_refresh_time > self.refresh_period
if force or time_to_refresh:
self.list_names = self.get_list_names()
self.last_refresh_time = now
else:
print "skip refresh"
def receive(self, msg_count):
self.refresh()
count = len(self.list_names)
if count == 0:
print "queues not found"
return []
p = self.redis.pipeline()
for i in range(msg_count):
l = self.list_names[randint(0, count - 1)]
p.rpoplpush(l,self.processing_lname)
msgs = p.execute()
return [json.loads(msg) for msg in msgs if msg]
def length(self):
self.refresh(True)
res = 0
for l in self.list_names:
res = res + self.redis.llen(l)
return res
wq = WorkerRedisQueue("worker1", r)
while(True):
time.sleep(1)
msgs = wq.receive(2)
if len(msgs) == 0:
if randint(0, 10) == 0 and wq.length() == 0 and wq.recover() == 0:
print "sleeping"
time.sleep(1)
for msg in msgs:
print "received msg", msg
try:
a = 10/msg["msg"]
wq.proccessed(msg)
except Exception,e:
print "exception", str(e)
原文:http://hodzanassredin.github.io/2016/03/29/redis_task_queue.html
翻譯:yyt030