本文是在學(xué)習(xí)了慕課網(wǎng)中 “
Python
異步任務(wù)隊(duì)列Celery
使用”課程后記錄下來的筆記位喂。由于筆者使用的python版本是python 3.7乡话,所以直接需要在老師的代碼基礎(chǔ)上進(jìn)行一定的修改禀横,才能適配當(dāng)前的環(huán)境匾鸥。
環(huán)境:
python: python3.7
django: 2.1.5
celery: 4.2.0
django-celery: 3.2.2
flower: 0.9.2
kombu: 4.3.0
tornado: 5.1.1
什么是 Celery ?
Celery
是一個簡單印叁、靈活且可靠的仑性,處理大量消息的分布式系統(tǒng)
。專注于實(shí)時處理的
異步任務(wù)隊(duì)列
。同時也支持
任務(wù)調(diào)度
。
使用場景
-
異步任務(wù)
將耗時的操作任務(wù)提交給
Celery
去異步執(zhí)行贫橙,比如發(fā)送短信/郵件琼腔、消息推送、音視頻處理等等 -
定時任務(wù)
類似于
crontab
盗蟆,比如每日數(shù)據(jù)統(tǒng)計(jì)
安裝配置
python 虛擬環(huán)境管理工具
virtualenv & virtualenvwrapper
pyenv
pipenv
venv
-
安裝
Celery
pip install celery[redis]
-
Celery
的消息中間件- RabbitMQ
- Redis
-
創(chuàng)建
Celery App
app = Celery('xxx', backend='xxxxx', broker='xxxxx')
使用 Celery
-
配置
Celery
broker = 'redis://localhost:6379/1' backend = 'redis://localhost:6379/2' app = Celery('my_task', broker=broker, backend=backend)
-
注冊任務(wù)
@app.task # 將 `add` 方法變成異步 def add(x, y): print('enter call function ...') time.sleep(5) return x + y
-
向
Celery
提交任務(wù)- windows 64bit用戶:在任務(wù)腳本下添加以下代碼
import os os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
- 提交任務(wù)
result = add.delay(2, 15) s # 或者使用 apply_async(由參數(shù)組成的元組) # result = add.apply_async((2, 15))
- windows 64bit用戶:在任務(wù)腳本下添加以下代碼
合理分配文件夾管理 celery
任務(wù)
-
在當(dāng)前項(xiàng)目下新建一個
celery_app
的包虫埂,用來存放celery
的task
和config
贷盲, 并在__init__.py
中配置celery
實(shí)例from celery import Celery import os os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1') app = Celery('demo') # 通過 `Celery` 實(shí)例加載配置模塊 app.config_from_object('celery_app.celery_config')
-
在
celery_config.py
中添加celery
的配置# celery_config.py from datetime import timedelta from celery.schedules import crontab BROKER_URL = 'redis://localhost:6379/1' CELERY_RESULT_BACKEND = 'redis://localhost:6379/2' CELERY_TIMEZONE = 'Asia/Shanghai' # 默認(rèn)為 UTC # 導(dǎo)入指定的任務(wù)模塊 CELERY_IMPORTS = { 'celery_app.task1', 'celery_app.task2', } # 指定 `celery` 要執(zhí)行的任務(wù) CELERYBEAT_SCHEDULE = { 'task1': { 'task': 'celery_app.task1.add', 'schedule': timedelta(seconds=500), 'args': (2, 8) }, 'task2': { 'task': 'celery_app.task2.multiply', 'schedule': crontab(hour=16, minute=32), 'args': (4,6) } }
-
編寫
celery
的task
import time from celery_app import app @app.task def add(x, y): time.sleep(3) return x + y
-
在外部腳本中向
celery
提交任務(wù)# app.py from celery_app import task1, task2 task1.add.delay(2 ,5) task2.multiply.apply_async(args=(2, 5)) # 可以添加額外參數(shù)坏匪,比如:指定使用的隊(duì)列
啟動 Celery
-
運(yùn)行
Celery worker
celery worker -A [celery_project_name] -l INFO
參數(shù)說明:
- -A : 指定celery實(shí)例的位置
- -l : 指定日志的級別
注意:在windows 64bit 環(huán)境中運(yùn)行上述命令會報(bào)錯
(venv) c:\Users\jzw\Desktop\dj_celery>celery worker -A tasks -l INFO -------------- celery@DESKTOP-5DO1L05 v4.2.1 (windowlicker) ---- **** ----- --- * *** * -- Windows-10-10.0.17134-SP0 2019-02-10 15:17:01 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: my_task:0x1a932e708d0 - ** ---------- .> transport: redis://localhost:6379/1 - ** ---------- .> results: redis://localhost:6379/2 - *** --- * --- .> concurrency: 8 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . app.add [2019-02-10 15:17:01,905: INFO/SpawnPoolWorker-1] child process 11932 calling self.run() [2019-02-10 15:17:01,913: INFO/SpawnPoolWorker-2] child process 10864 calling self.run() [2019-02-10 15:17:01,924: INFO/SpawnPoolWorker-3] child process 5544 calling self.run() [2019-02-10 15:17:01,937: INFO/SpawnPoolWorker-4] child process 5236 calling self.run() [2019-02-10 15:17:01,953: INFO/SpawnPoolWorker-6] child process 8684 calling self.run() [2019-02-10 15:17:01,963: INFO/SpawnPoolWorker-5] child process 2660 calling self.run() [2019-02-10 15:17:01,964: INFO/SpawnPoolWorker-7] child process 15968 calling self.run() [2019-02-10 15:17:01,979: INFO/SpawnPoolWorker-8] child process 15808 calling self.run() [2019-02-10 15:17:02,606: INFO/MainProcess] Connected to redis://localhost:6379/1 [2019-02-10 15:17:03,614: INFO/MainProcess] mingle: searching for neighbors [2019-02-10 15:17:07,641: INFO/MainProcess] mingle: all alone [2019-02-10 15:17:12,664: INFO/MainProcess] celery@DESKTOP-5DO1L05 ready. [2019-02-10 15:18:13,158: INFO/MainProcess] Received task: app.add[b7c0cbcc-c9a3-40e5-a7f8-e629bdc6d7e8] [2019-02-10 15:18:14,174: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)') Traceback (most recent call last): File "c:\users\jzw\desktop\dj_celery\venv\lib\site-packages\billiard\pool.py", line 358, in workloop result = (True, prepare_result(fun(*args, **kwargs))) File "c:\users\jzw\desktop\dj_celery\venv\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task tasks, accept, hostname = _loc ValueError: not enough values to unpack (expected 3, got 0)
解決方案參考:
解決方案:添加celery運(yùn)行時的環(huán)境變量拟逮,在腳本中添加如下代碼
# task.py import os os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
在Python3.7中運(yùn)行celery worker時出現(xiàn)報(bào)錯以及解決辦法
from . import async, base
^
SyntaxError: invalid syntax
錯誤提出及討論:
解決方案:
https://github.com/celery/celery/pull/4852/commits/d737dec3c943632f21f73a2235409c29e3fe63e3
-
運(yùn)行
celery beat
celery beat -A [celery_project_name] -l INFO
參數(shù)說明:
- -A : 指定celery實(shí)例的位置
- -l : 指定日志的級別
-
同時運(yùn)行
celery worker
和celery beat
(不支持Windows)celery -B -A celery_app worker -l[--loglevel] INFO
-
查看
celery
幫助celery worker --help
在django中使用 celery
- 當(dāng)前環(huán)境
- Windows 64bit
- python 3.7
- Python 包環(huán)境
Package Version ------------- ------- amqp 2.4.1 anyjson 0.3.3 billiard 3.5.0.5 celery 4.2.0 Django 2.1.5 django-celery 3.2.2 kombu 4.3.0
運(yùn)行 python manage.py celery worker -l INFO
時報(bào)錯:
Traceback (most recent call last):
File "manage.py", line 15, in <module>
execute_from_command_line(sys.argv)
File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 381, in execute_from_command_line
utility.execute()
File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 375, in execute
self.fetch_command(subcommand).run_from_argv(self.argv)
File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 224, in fetch_command
klass = load_command_class(app_name, subcommand)
File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\django\core\management\__init__.py", line 36, in load_command_class
module = import_module('%s.management.commands.%s' % (app_name, name))
File "D:\Programs\Python\Python37\lib\importlib\__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\djcelery\management\co
mmands\celery.py", line 11, in <module>
class Command(CeleryCommand):
File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\djcelery\management\co
mmands\celery.py", line 15, in Command
base.get_options() +
TypeError: can only concatenate tuple (not "NoneType") to tuple
參考:
https://stackoverflow.com/questions/49085230/django-celery-typeerror-can-only-concatenate-tuple-not-nonetype-to-tuple
http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
解決方案:
- 將
djcelery\management\commands\celery.py
中的options
部分注釋掉# celery.py from __future__ import absolute_import, unicode_literals from celery.bin import celery from djcelery.app import app from djcelery.management.base import CeleryCommand base = celery.CeleryCommand(app=app) class Command(CeleryCommand): """The celery command.""" help = 'celery commands, see celery help' # options = (CeleryCommand.options + # base.get_options() + # base.preload_options) def run_from_argv(self, argv): argv = self.handle_default_options(argv) base.execute_from_commandline( ['{0[0]} {0[1]}'.format(argv)] + argv[2:], )
- 修改后運(yùn)行
python manage.py celery worker -l INFO
時報(bào)錯,報(bào)錯信息如下:[2019-02-11 01:11:40,836: CRITICAL/MainProcess] Unrecoverable error: SyntaxError('invalid syntax', ('C:\\Users\\jzw\\Desktop\\celery_learn\\dj_celery\\venv\\lib\\site-packages\\cel ery\\backends\\redis.py', 22, 19, 'from . import async, base\n')) Traceback (most recent call last): File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\kombu\utils\objects.py", line 42, in __get__ return obj.__dict__[self.__name__] KeyError: 'backend' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\worker\worker.py", line 205, in start self.blueprint.start(self) File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\bootsteps.py", line 115, in start self.on_start() File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\apps\worker.py", line 139, in on_start self.emit_banner() File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\apps\worker.py", line 154, in emit_banner ' \n', self.startup_info(artlines=not use_image))), File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\apps\worker.py", line 217, in startup_info results=self.app.backend.as_uri(), File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\kombu\utils\objects.py", line 44, in __get__ value = obj.__dict__[self.__name__] = self.__get(obj) File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\base.py", line 1196, in backend return self._get_backend() File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\base.py", line 914, in _get_backend self.loader) File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\backends.py", line 70, in by_url return by_name(backend, loader), url File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\backends.py", line 50, in by_name cls = symbol_by_name(backend, aliases) File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\kombu\utils\imports.py", line 56, in symbol_by_name module = imp(module_name, package=package, **kwargs) File "D:\Programs\Python\Python37\lib\importlib\__init__.py", line 127, in import_module return _bootstrap._gcd_import(name[level:], package, level) File "<frozen importlib._bootstrap>", line 1006, in _gcd_import File "<frozen importlib._bootstrap>", line 983, in _find_and_load File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked File "<frozen importlib._bootstrap>", line 677, in _load_unlocked File "<frozen importlib._bootstrap_external>", line 724, in exec_module File "<frozen importlib._bootstrap_external>", line 860, in get_code File "<frozen importlib._bootstrap_external>", line 791, in source_to_code File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\backends\redis.py", line 22 from . import async, base ^ SyntaxError: invalid syntax
說明:這是因?yàn)樵?python 3.7
中將 async
作為了關(guān)鍵字适滓,所以當(dāng) py 文件中出現(xiàn)類似 from . import async, base
這類不符合python語法的語句時唱歧,Python會報(bào)錯。
解決:
在
celery
官方的提議下粒竖,建議將async.py
文件的文件名改成asynchronous
颅崩。所以我們只需要將celery\backends\async.py
改成celery\backends\asynchronous.py
,并且把celery\backends\redis.py
中的所有async
改成asynchronous
就可以了蕊苗。
- 重新運(yùn)行
python manage.py celery worker
命令沿后,celery
的worker
運(yùn)行成功!
- 安裝
pip install django-celery
- 通過
manage.py
腳本來啟動worker
python manage.py celery worker -l INFO -Q [queue_name]
Celery
的監(jiān)控工具: flower
-
安裝
pip install flower
-
啟動
- 正常啟動
celery flower --address=0.0.0.0 --port=5555 --broker=xxx --basic_auth=finlu:finlu
參數(shù)說明:
-
adderss
:flower
服務(wù)的地址 -
port
:flower
服務(wù)的端口 -
broker
: -
basic_auth
:flower
的基本認(rèn)證
- 通過
manage.py
腳本啟動(可以讀取到settings.py
中的配置信息)
python manage.py celery flower
在
windows
上worker
執(zhí)行任務(wù)時崩潰- 報(bào)錯信息
Traceback (most recent call last): File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\billiard\pool.py", line 358, in workloop result = (True, prepare_result(fun(*args, **kwargs))) File "C:\Users\jzw\Desktop\celery_learn\dj_celery\venv\lib\site-packages\celery\app\trace.py", line 537, in _fast_trace_task tasks, accept, hostname = _loc ValueError: not enough values to unpack (expected 3, got 0)
參考:
解決方案:
- 在執(zhí)行
worker
是設(shè)置FORKED_BY_MULTIPROCESSING
環(huán)境變量的值為'1'
(可以通過在settings.py
中添加以下代碼實(shí)現(xiàn)# settings.py import os os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
- 正常啟動
使用 supervisor
來部署管理 celery
-
安裝
supervisor
- 由于
pip
上的supeervisor
只支持python2.x
朽砰, 所以選擇在github上進(jìn)行源碼安裝尖滚。 -
supervisor
項(xiàng)目地址:https://github.com/Supervisor/supervisor
- 由于
-
配置
-
修改
supervisor
基本配置# 將 `supervisor` 的配置存儲在 `conf` 目錄中 mkdir conf echo_supervisord_conf > conf/supervisord.conf # 將 `supervisor` 的默認(rèn)配置重定向到 `supervisord.conf`
在
supervisord.conf
中增加下列配置[unix_http_server] file=/tmp/supervisor.sock ; the path to the socket file [inet_http_server] ; inet (TCP) server disabled by default port=127.0.0.1:9001 ; ip_address:port specifier, *:port for all iface [supervisorctl] serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket [include] files = *.ini
-
在
conf
文件夾下增加進(jìn)程的配置-
supervisor_celery_worker.ini
配置 celery worker[program:celery-worker] command=python manage.py celery worker -l INFO # 運(yùn)行的命令 directory=/home/finlu/celery_learn/dj_celery # 命令運(yùn)行的目錄 enviroment=PATH="/home/finlu/celery_learn/dj_celery/venv/bin" # Python虛擬環(huán)境 stdout_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.worker.log # 輸出日志 stderr_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.worker.log # 錯誤日志 autostart=True # 自動啟動 autorestart=True # 自動重啟 startsecs=10 # 啟動延時 stopwatisecs=60 # 停止延遲 priority=998 # 進(jìn)程優(yōu)先級
-
supervisor_celery_flower.ini
配置 celery flower[program:celery-flower] command=python manage.py celery flower directory=/home/finlu/celery_learn/dj_celery enviroment=PATH="/home/finlu/celery_learn/dj_celery/venv/bin" stdout_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.flower.log stderr_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.flower.log autostart=True autorestart=True startsecs=10 stopwatisecs=60 priority=1000
-
supervisor_celery_beat.ini
# 配置 celery beat[program:celery-beat] command=python manage.py celery beat -l INFO directory=/home/finlu/celery_learn/dj_celery enviroment=PATH="/home/finlu/celery_learn/dj_celery/venv/bin" stdout_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.beat.log stderr_logfile=/home/finlu/celery_learn/dj_celery/logs/celery.beat.log autostart=True autorestart=True startsecs=10 stopwatisecs=60 priority=997
-
-
-
運(yùn)行
supervisor
- 指定配置文件并運(yùn)行
supervisor
supervisord -c conf/supervisord.conf
- 使用
supervisorctl
命令進(jìn)行管理-
update
: 更新配置信息(相當(dāng)于restart)
-
- 指定配置文件并運(yùn)行