由于工作需要,開一個celery源碼筆記的坑帚呼。
啟動
從github上下載源碼打開后掏缎,可看到源碼的結構如下:
打開setup.py文件,在文件的最后可以看到
因此煤杀,可以分析出celery的入口是
celery/__main__.py
文件的main函數(shù)眷蜈,函數(shù)定義如下
def main():
"""Entrypoint to the ``celery`` umbrella command."""
if 'multi' not in sys.argv:
maybe_patch_concurrency()
from celery.bin.celery import main as _main
_main()
這里可以看到,main函數(shù)會調(diào)用celery.bin.celery
模塊的main()函數(shù)沈自,轉到定義酌儒,在代碼中可以看到這里的主要邏輯為
cmd = CeleryCommand() # 創(chuàng)建CeleryComman對象
cmd.execute_from_commandline(argv) # 從命令行啟動
CeleryCommand
對象在celery/bin/celery.py
文件,這里可以看到CeleryCommand
繼承自Command
類(該類的聲明celery/bin/base.py
文件酥泛,在很多類都是由該類派生出來今豆,以后會提到)嫌拣,由于CeleryCommand
并沒有實現(xiàn)自己的__init__
函數(shù)柔袁,因此會調(diào)用Command
類的__init__
函數(shù)進行初始化
def __init__(self, app=None, get_app=None, no_color=False,
stdout=None, stderr=None, quiet=False, on_error=None,
on_usage_error=None):
self.app = app
self.get_app = get_app or self._get_default_app
self.stdout = stdout or sys.stdout
self.stderr = stderr or sys.stderr
self._colored = None
self._no_color = no_color
self.quiet = quiet
if not self.description:
self.description = self._strip_restructeredtext(self.__doc__)
if on_error:
self.on_error = on_error
if on_usage_error:
self.on_usage_error = on_usage_error
這里我們可以看到__init__
執(zhí)行進行了一些簡單的初始化工作。接下來分析execute_from_commandline
函數(shù)
def execute_from_commandline(self, argv=None):
argv = sys.argv if argv is None else argv
if 'multi' in argv[1:3]: # Issue 1008
self.respects_app_option = False
try:
sys.exit(determine_exit_status(
super(CeleryCommand, self).execute_from_commandline(argv)))
except KeyboardInterrupt:
sys.exit(EX_FAILURE)
在CeleryCommand
的execute_from_commandline
函數(shù)中异逐,我們可以看到這里調(diào)用了Command
類的execute_from_commandline
函數(shù)
def execute_from_commandline(self, argv=None):
"""Execute application from command-line.
Arguments:
argv (List[str]): The list of command-line arguments.
Defaults to ``sys.argv``.
"""
if argv is None:
argv = list(sys.argv)
# Should we load any special concurrency environment?
self.maybe_patch_concurrency(argv)
self.on_concurrency_setup()
# Dump version and exit if '--version' arg set.
self.early_version(argv)
argv = self.setup_app_from_commandline(argv) # 解析命令行參數(shù)并創(chuàng)建Celery實例
self.prog_name = os.path.basename(argv[0])
return self.handle_argv(self.prog_name, argv[1:]) # 調(diào)用當前對象的handle_argv函數(shù)
在該函數(shù)中會調(diào)用setup_app_from_commandline
解析命令行參數(shù)并創(chuàng)建應用(用戶的app也是在這一步被加載)捶索,之后調(diào)用handle_argv
函數(shù)繼續(xù)處理,這里需要注意灰瞻,代碼中調(diào)用的handle_argv
函數(shù)是CeleryCommand
中定義的腥例,接下來我們分析handle_argv
函數(shù)。
def handle_argv(self, prog_name, argv, **kwargs):
self.prog_name = self.prepare_prog_name(prog_name)
argv = self._relocate_args_from_start(argv)
_, argv = self.prepare_args(None, argv)
try:
command = argv[0]
except IndexError:
command, argv = 'help', ['help']
return self.execute(command, argv)
這里可以看到酝润,在解析了參數(shù)之后燎竖,調(diào)用了execute
函數(shù),其中第一個參數(shù)為命令行參數(shù)中解析出來的要销,按照官網(wǎng)的示例构回,這里的字符串為"worker",(后面的分析都暫時認為command的值是"worker")疏咐。之后進入到execute
函數(shù)中
def execute(self, command, argv=None):
try:
cls = self.commands[command]
except KeyError:
cls, argv = self.commands['help'], ['help']
cls = self.commands.get(command) or self.commands['help'] # 根據(jù)傳入的command字符串獲取對應的類
try:
return cls(
app=self.app, on_error=self.on_error,
no_color=self.no_color, quiet=self.quiet,
on_usage_error=partial(self.on_usage_error, command=command),
).run_from_argv(self.prog_name, argv[1:], command=argv[0]) # 初始化并啟動實例
except self.UsageError as exc:
self.on_usage_error(exc)
return exc.status
except self.Error as exc:
self.on_error(exc)
return exc.status
這里我們看到execute
函數(shù)主要做了兩件事纤掸,一是根據(jù)傳入的command查找類;二是創(chuàng)建上一步的類的實例并啟動浑塞。
轉到worker
類的定義借跪,在文件celery/bin/worker.py
中,可以看到酌壕,該類也是繼承自Command
類掏愁,worker
類實例的初始化也是調(diào)用Command
類的__init__
歇由,初始化完成后會調(diào)用run_from_argv
啟動,該函數(shù)只是回調(diào)了一下當前對象的handle_argv
函數(shù)果港。由于worker
類沒有重寫handle_argv
印蓖,因此會調(diào)用Command
類中的該函數(shù)。
def handle_argv(self, prog_name, argv, command=None):
"""Parse arguments from argv and dispatch to :meth:`run`.
Warning:
Exits with an error message if :attr:`supports_args` is disabled
and ``argv`` contains positional arguments.
Arguments:
prog_name (str): The program name (``argv[0]``).
argv (List[str]): Rest of command-line arguments.
"""
options, args = self.prepare_args(
*self.parse_options(prog_name, argv, command))
return self(*args, **options)
在該函數(shù)中京腥,會調(diào)用當前對象的__call__
函數(shù)赦肃,同樣地,這里也是調(diào)用Command
類中定義的該函數(shù)公浪。該函數(shù)中他宛,會調(diào)用當前對象的run
函數(shù),這里調(diào)用的便是worker
類中定義的run
函數(shù)欠气。在該函數(shù)中厅各,會首先進行一些配置,之后便是創(chuàng)建真正的Worker
類的對象之后調(diào)用start
函數(shù)啟動预柒。
本階段的調(diào)用時序圖可以整理如下: