一、Celery異步分布式
Celery ?是一個(gè)python開發(fā)的異步分布式任務(wù)調(diào)度模塊莫湘,是一個(gè)消息傳輸?shù)闹虚g件,可以理解為一個(gè)郵箱,每當(dāng)應(yīng)用程序調(diào)用celery的異步任務(wù)時(shí)毅糟,會(huì)向broker傳遞消息,然后celery的worker從中取消息
Celery ?用于存儲(chǔ)消息以及celery執(zhí)行的一些消息和結(jié)果
對(duì)于brokers澜公,官方推薦是rabbitmq和redis
對(duì)于backend姆另,也就是指數(shù)據(jù)庫,為了簡單一般使用redis
使用redis連接url格式:
redis://:password@hostname:port/db_number
1)定義連接腳本tasks.py
#!/usr/bin/env pythonfromceleryimportCelerybroker="redis://192.168.2.230:6379/1"backend="redis://192.168.2.230:6379/2"app=Celery("tasks",broker=broker,backend=backend)@app.taskdefadd(x,y):returnx+y
2)安裝啟動(dòng)celery
pip install celery
pip install redis
啟動(dòng)方式:celery -A huang?tasks?-l info ?#-l 等同于 --loglevel
3)執(zhí)行測(cè)試 huang.py?
#!/usr/bin/env pythonfromtasksimportaddre=add.delay(10,20)print(re.result)#任務(wù)返回值print(re.ready)#如果任務(wù)被執(zhí)行返回True,其他情況返回Falseprint(re.get(timeout=1))#帶參數(shù)的等待坟乾,最后返回結(jié)果print(re.status)#任務(wù)當(dāng)前狀態(tài)
運(yùn)行結(jié)果:
30
<bound method AsyncResult.ready of <AsyncResult: d2e0a2d8-cdd9-4fe3-a8bb-81fe3c53ba9a>>
30
SUCCESS
4)根據(jù)成功返回的key或celery界面輸出的信息迹辐,查看redis存儲(chǔ)
說明:停止celery服務(wù),執(zhí)行完huang.py之后甚侣,再啟動(dòng)celery服務(wù)也是有保存數(shù)據(jù)的
二明吩、celery多進(jìn)程
1)配置文件 celeryconfig.py
#!/usr/bin/env python#-*- coding:utf-8 -*-fromkombuimportExchange,QueueBROKER_URL="redis://192.168.2.230:6379/3"CELERY_RESULT_BACKEND="redis://192.168.2.230:6379/4"CELERY_QUEUES=(Queue("default",Exchange("default"),routing_key="default"),Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B"))CELERY_ROUTES={'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}}
2)tasks.py
#!/usr/bin/env python#-*- coding:utf-8 -*-fromceleryimportCeleryapp=Celery()app.config_from_object("celeryconfig")@app.taskdeftaskA(x,y):returnx+y? ? @app.taskdeftaskB(x,y,z):returnx+y+z
3)啟動(dòng)celery
celery -A tasks worker --loglevel info
4)執(zhí)行腳本huang2.py
#!/usr/bin/env python#-*- coding:utf-8 -*-fromtasksimporttaskA,taskBre=taskA.delay(10,20)print(re.result)#任務(wù)返回值print(re.ready)#如果任務(wù)被執(zhí)行返回True,其他情況返回Falseprint(re.get(timeout=1))#帶參數(shù)的等待,最后返回結(jié)果print(re.status)#任務(wù)當(dāng)前狀態(tài)re2=taskB.delay(10,20,30)print(re2.result)print(re2.ready)print(re2.get(timeout=1))print(re2.status)
5)運(yùn)行結(jié)果
None
<bound method AsyncResult.ready of <AsyncResult: e34a8490-05a7-473e-a082-f4956cabfc99>>
30
SUCCESS
None
<bound method AsyncResult.ready of <AsyncResult: 3c5cd839-dbe2-4e63-ba4e-86e8c79d943f>>
60
SUCCESS