場景:平時工作中需要用到一些延時操作,同時避免多個人同時操作一個數(shù)據(jù)
1厨相、創(chuàng)建一個redis連接池
import redis
import uuid
import time
import threading
pool = redis.ConnectionPool(host="127.0.0.1", port=6379, decode_responses=True, db=3)
r = redis.Redis(connection_pool=pool)
2领曼、我們這邊用到的是redis里面的Zset(有序數(shù)組)鸥鹉,把任務(wù)根據(jù)延遲的時間進(jìn)行排隊{job:delayTime},然后依次進(jìn)行執(zhí)行
#創(chuàng)建一個延時器
def delayTask(name, delayTime):
task_id = str(uuid.uuid4())
processTime = delayTime + time.time()
#往Zset里面的delay-queue鍵里面加入值
r.zadd("delay-queue", {name + task_id: processTime}
#創(chuàng)建一個任務(wù)函數(shù)
def handleTask(task_id, ):
""" do somethings"""
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),"{0}延遲執(zhí)行完畢".format(task_id, ))
#創(chuàng)建一個loop函數(shù)
def loop():
while True:
#將數(shù)組里面的元素根據(jù)delayTime排隊之后庶骄,在0到當(dāng)前時間范圍內(nèi)循環(huán)取值毁渗,其實每次都只能取到一個,一定是這個時間和當(dāng)前時間一樣了才會被取到
task_list = r.zrangebyscore("delay-queue", 0, time.time(), 0, 1)
#如果為空单刁,證明沒有任務(wù)需要執(zhí)行灸异,進(jìn)入下一個循環(huán)
if not task_list:
print("cost 1s")
time.sleep(1)
continue
else:
#如果有值,就取值
task_id = task_list[0]
# 能刪除成功羔飞,證明被該線程搶到了這個任務(wù)肺樟,其他線程無法刪除,也就無法拿到任務(wù)
is_del = r.zrem("delay-queue", task_id)
if is_del:
handleTask(task_id)
3逻淌、執(zhí)行函數(shù)
if __name__ == '__main__':
#方便演示么伯,我們開2個線程,其實只會有一個線程搶到該任務(wù)
t1 = threading.Thread(target=loop)
t2 = threading.Thread(target=loop)
t1.start()
t2.start()
delayTask("任務(wù)一", 3)
delayTask("任務(wù)二", 1)
delayTask("任務(wù)三", 5)
2024-04-23 11:36:40
2024-04-23 11:36:40
2024-04-23 11:36:41 任務(wù)二7d455dc0-ff89-47de-a170-cbcbaea87c94延遲執(zhí)行完畢
2024-04-23 11:36:41
2024-04-23 11:36:41
2024-04-23 11:36:42
2024-04-23 11:36:42
2024-04-23 11:36:43 任務(wù)一a98fc5fd-2bff-4d6f-8ca9-5c42ab2d42cc延遲執(zhí)行完畢
2024-04-23 11:36:43
2024-04-23 11:36:43
2024-04-23 11:36:44
2024-04-23 11:36:44
2024-04-23 11:36:45 任務(wù)三aa7dc283-f5bc-4f71-b2ab-1e200bb32588延遲執(zhí)行完畢
2024-04-23 11:36:45
2024-04-23 11:36:45
可以看出主程序是40s開始執(zhí)行卡儒,我們在41s田柔,43s,45s分別執(zhí)行了job