最近項目中遇到了定時任務(wù),django的框架,用到了celery,mysql作為數(shù)據(jù)庫存儲job數(shù)據(jù),rabbit MQ作為消息隊列,還有oslo.service等openstack組件,現(xiàn)總結(jié)下
- 首先在task_manager_api項目中,創(chuàng)建任務(wù)時,create_update_trigger是在數(shù)據(jù)庫celery_period_task表中插入一條數(shù)據(jù)
- 然后在task_manager項目中scheduler/manager.py
通過添加裝飾器@periodic_task.periodic_task產(chǎn)生一個周期任務(wù),spacing表示周期運行任務(wù)時間間隔
schedule_task函數(shù)實現(xiàn)的是從數(shù)據(jù)庫表中獲取數(shù)據(jù),然后調(diào)celery.send_task函數(shù),發(fā)送任務(wù)
from oslo_service import periodic_task
@periodic_task.periodic_task(spacing=CONF.scheduler_task_interval)
def schedule_task():
jobs = objects.job.JobList.get_all_with_status(context,
[contants.PENDING,
contants.RUNNING])
for job in jobs:
self.schedule_job(context, job.uuid, job=job)
# self.schedule_job中是self.app.send_task
#send_task可以發(fā)送未被注冊的異步任務(wù),沒有被celery.task裝飾的任務(wù)
- 大概研究了一下裝飾器periodic_task,在oslo_service/periodic_task.py中,
def periodic_task 主要作用是增加一個屬性f._periodic_task = True,在_PeriodicTasksMeta元類中,會判斷函數(shù)是否有這個屬性,如果有的話,會加入到_periodic_tasks列表中
class _PeriodicTasksMeta(type):
def _add_periodic_task(cls, task):
....
cls._periodic_tasks.append((name, task))
return True
def __init__(cls, names, bases, dict_):
"""Metaclass that allows us to collect decorated periodic tasks."""
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
...
for value in cls.__dict__.values():
if getattr(value, '_periodic_task', False):
cls._add_periodic_task(value)
@six.add_metaclass(_PeriodicTasksMeta)
class PeriodicTasks(object):
def run_periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
idle_for = DEFAULT_INTERVAL
for task_name, task in self._periodic_tasks:
....
try:
task(self, context)
- run_periodic_tasks函數(shù)是task_manager/service.py
Service類中的start函數(shù)中被觸發(fā)
periodic = loopingcall.FixedIntervalLoopingCall(
self.periodic_tasks)
loopingcall是查看task是否執(zhí)行完成