摘要
redis常用命令已經足夠簡單,但有些業務需求需要批量增刪查取,于是就簡單地封裝了下python-redis,同時對一些功能方法統一接口,方便調用。
代碼
import time
import redis
# 工具函數很簡單:如果是字節,轉成 str
def bytes_to_str(s, encoding='utf-8'):
    """Return ``s`` decoded to ``str`` when it is a bytes-like value.

    Args:
        s: Any value. ``bytes`` and ``bytearray`` objects are decoded;
            everything else (``str``, ``None``, numbers, ...) is returned
            unchanged.
        encoding: Codec used for decoding, UTF-8 by default.

    Returns:
        The decoded ``str`` for bytes-like input, otherwise ``s`` itself.

    Raises:
        UnicodeDecodeError: If the bytes are not valid in ``encoding``.
    """
    if isinstance(s, (bytes, bytearray)):
        return s.decode(encoding)
    return s
class RedisClient(object):
    """Thin convenience wrapper around a redis-py client.

    Adds batch fetch / find / delete / insert helpers and a uniform way to
    query key types and lengths.  The underlying redis-py client is exposed
    as the ``redis`` attribute.

    Instances are cached per ``(class, url)`` so that constructing the
    wrapper repeatedly with the same URL reuses one client instead of
    opening a new connection pool each time.
    """

    _DEFAULT_URL = 'redis://@localhost:6379/0'

    # (class, url) -> shared instance.
    _instances = {}

    # Redis type name -> name of the client method returning its length.
    _LENGTH_COMMANDS = {
        'hash': 'hlen',
        'zset': 'zcard',
        'list': 'llen',
        'set': 'scard',
    }

    def __new__(cls, *args, **kwargs):
        """Return the cached instance for this class and URL, creating it once."""
        # A single-underscore attribute is used on purpose: inside a class
        # body ``cls.__instance`` is name-mangled while the string passed to
        # ``hasattr(cls, '__instance')`` is not, so that check can never
        # succeed and a fresh instance would be built on every call.
        url = kwargs.get('url', args[0] if args else cls._DEFAULT_URL)
        cache_key = (cls, url)
        if cache_key not in cls._instances:
            cls._instances[cache_key] = object.__new__(cls)
        return cls._instances[cache_key]

    def __init__(self, url=_DEFAULT_URL):
        """Connect to the Redis server described by ``url``.

        ``__init__`` runs on every ``RedisClient(...)`` call, including the
        ones that return a cached instance, so the client is only created
        the first time.
        """
        if 'redis' not in self.__dict__:
            self.redis = redis.from_url(url=url)

    def get_all_keys(self, match=None, cursor=0):
        """Collect every key name, optionally filtered by a glob pattern.

        Walks the keyspace with SCAN (cursor based) instead of KEYS so a
        large keyspace does not block the server.

        Args:
            match: Optional pattern forwarded to SCAN.
            cursor: SCAN cursor to start from; ``0`` starts a full scan.

        Returns:
            A lazy ``map`` iterator over the key names as ``str``.
        """
        found = []
        while True:
            cursor, page = self.redis.scan(match=match, cursor=cursor)
            found.extend(page)
            # SCAN reports cursor 0 once the whole keyspace has been visited.
            if cursor == 0:
                return map(bytes_to_str, found)

    def get_len(self, args):
        """Return the number of elements stored under one or several keys.

        Args:
            args: A single key (``str`` or ``bytes``) or an iterable of keys.

        Returns:
            For a single key, its length.  For an iterable, a dict mapping
            each key to its length.  Keys whose type is not hash, zset, list
            or set (including missing keys) map to ``None``.
        """
        if isinstance(args, (str, bytes)):
            return self.get_len([args])[args]
        len_dict = {}
        for key in args:
            command = self._LENGTH_COMMANDS.get(self.get_kind(key))
            # Decided per key, so an unsupported type never inherits the
            # length computed for the previous key.
            len_dict[key] = getattr(self.redis, command)(key) if command else None
        return len_dict

    def get_kind(self, args):
        """Return the Redis type of one or several keys.

        Args:
            args: A single key (``str`` or ``bytes``) or an iterable of keys.

        Returns:
            The type name as ``str`` for a single key, or a dict mapping
            each key to its type name for an iterable.
        """
        if isinstance(args, (str, bytes)):
            return bytes_to_str(self.redis.type(args))
        return {item: bytes_to_str(self.redis.type(item)) for item in args}

    def batch_fetch(self, name, count=1, kind=None):
        """Destructively pop elements from a set or list in batches.

        Elements are popped rather than read and deleted afterwards, so an
        element handed out here has already been removed from Redis.

        Args:
            name: Key of the set or list.
            count: Number of pop commands sent per batch.
            kind: ``'set'`` or ``'list'``; looked up from Redis when omitted.

        Yields:
            For a set, a ``set`` of popped members.  For a list, a ``list``
            of popped items in queue order with duplicates kept (a set would
            silently discard items that were already removed).  Iteration
            stops at the first batch that pops nothing; other key types
            yield nothing.
        """
        if not kind:
            kind = self.get_kind(name)
        if kind == 'set':
            pop_command, container = 'spop', set
        elif kind == 'list':
            pop_command, container = 'lpop', list
        else:
            return
        pipe = self.redis.pipeline()
        while True:
            queue_pop = getattr(pipe, pop_command)
            for _ in range(count):
                queue_pop(name=name)
            # Pops against an exhausted key come back as None.
            batch = container(
                item for item in pipe.execute() if item is not None
            )
            if not batch:
                break
            yield batch

    def batch_find(self, name, count, kind=None, cursor=0):
        """Read a set or list page by page without removing anything.

        Args:
            name: Key of the set or list.
            count: Page size: the COUNT hint for SSCAN on sets, the exact
                number of items per LRANGE page on lists.
            kind: ``'set'`` or ``'list'``; looked up from Redis when omitted.
            cursor: SSCAN cursor for sets, or the start index for lists.

        Yields:
            One page of elements per iteration.  SSCAN pages may be empty.

        Raises:
            ValueError: If ``count`` is smaller than 1 for a list.
        """
        if not kind:
            kind = self.get_kind(name)
        if kind == 'set':
            while True:
                cursor, page = self.redis.sscan(
                    name=name, count=count, cursor=cursor
                )
                yield page
                if not cursor:
                    break
        elif kind == 'list':
            if count < 1:
                raise ValueError('count must be >= 1 when paging a list')
            start = cursor
            while True:
                page = self.redis.lrange(
                    name=name, start=start, end=start + count - 1
                )
                if not page:
                    break
                yield page
                # A short page means the end of the list was reached.
                if len(page) < count:
                    break
                # Move the window forward; without this the first page
                # would be returned forever.
                start += count

    def batch_delete(self, name, value, kind=None, count=None):
        """Remove a batch of elements from a set or list.

        Args:
            name: Key of the set or list.
            value: Iterable of members to remove from a set.  For a list it
                is only used to derive ``count`` when ``count`` is omitted.
            kind: ``'set'`` or ``'list'``; looked up from Redis when omitted.
            count: For a list, how many items to drop from the head.
                Defaults to ``len(value)``.
        """
        if not kind:
            kind = self.get_kind(name)
        if kind == 'set':
            # SREM needs at least one member.
            if value:
                self.redis.srem(name, *value)
        elif kind == 'list':
            if count is None:
                count = len(value or ())
            # Keeping [count, -1] drops the first ``count`` items; a count
            # of 0 would keep everything, so the call is skipped.
            if count:
                self.redis.ltrim(name=name, start=count, end=-1)

    def batch_insert(self, name, value, kind=None):
        """Add a batch of elements to a set or append them to a list.

        Args:
            name: Key of the set or list.
            value: Iterable of elements to add.
            kind: ``'set'`` or ``'list'``.  When omitted the type is looked
                up from Redis, so pass it explicitly for a key that does not
                exist yet, otherwise nothing is inserted.

        Returns:
            The value returned by SADD or RPUSH, or ``0`` when nothing was
            sent (empty ``value`` or unsupported key type).
        """
        if not kind:
            kind = self.get_kind(name)
        # SADD / RPUSH need at least one element.
        if not value:
            return 0
        if kind == 'set':
            return self.redis.sadd(name, *value)
        if kind == 'list':
            return self.redis.rpush(name, *value)
        return 0
if __name__ == '__main__':
    # Demo: drain a queue in batches of up to 10000 items.
    # The wrapper is bound to ``client`` rather than ``redis`` so the
    # imported ``redis`` module stays reachable; rebinding that name would
    # break any later ``RedisClient(...)`` construction, which calls
    # ``redis.from_url``.
    client = RedisClient()
    # Look up the type of a sample key (the result is not used).
    client.get_kind('hhh')
    name = '998_info_video_queue'
    # batch_fetch pops the items, so this loop empties the queue.
    for batch in client.batch_fetch(name=name, count=10000):
        print(batch)
        # To put a batch back: client.batch_insert(name=name, value=batch)
        time.sleep(1)