Python 定時任務(wù)最佳實踐

背景

最近有個需求,需要實現(xiàn)一個定時或定期任務(wù)的功能惜傲,需要實現(xiàn)每月洽故、每日、每時盗誊、一次性等需求时甚,必須是輕量級不依賴其它額外組件隘弊,并能支持動態(tài)添加任務(wù)。由于當(dāng)前任務(wù)信息保存在集群 ETCD 數(shù)據(jù)庫中撞秋,因此對任務(wù)持久化要求不高长捧,每次重啟都直接讀取 ETCD 任務(wù)信息嚣鄙,為了后面擴(kuò)展吻贿,還需要添加任務(wù)持久化功能。

定時任務(wù)庫對比

根據(jù)上面需求哑子,從社區(qū)中找到了幾個 Python 好用的任務(wù)調(diào)度庫舅列。有以下幾個庫:

  • schedule:Python job scheduling for humans. 輕量級,無需配置的作業(yè)調(diào)度庫
  • python-crontab: 針對系統(tǒng) Cron 操作 crontab 文件的作業(yè)調(diào)度庫
  • Apscheduler:一個高級的 Python 任務(wù)調(diào)度庫
  • Celery: 是一個簡單卧蜓,靈活帐要,可靠的分布式系統(tǒng),用于處理大量消息弥奸,同時為操作提供維護(hù)此類系統(tǒng)所需的工具, 也可用于任務(wù)調(diào)度

優(yōu)缺點(diǎn)對比:

  • schedule 優(yōu)點(diǎn)是簡單榨惠、輕量級、無需配置盛霎、語法簡單赠橙,缺點(diǎn)是阻塞式調(diào)用、無法動態(tài)添加或刪除任務(wù)
  • Python-crontab 優(yōu)點(diǎn)是針對于系統(tǒng) crontab 操作愤炸,支持定時期揪、定期任務(wù),能夠動態(tài)添加任務(wù)规个,不能實現(xiàn)一次性任務(wù)需求
  • Apscheduler 優(yōu)點(diǎn)支持定時凤薛、定期、一次性任務(wù)诞仓,支持任務(wù)持久化及動態(tài)添加缤苫、支持配置各種持久化存儲源(如 redis、MongoDB)墅拭,支持接入到各種異步框架(如 gevent榨馁、asyncio、tornado)
  • Celery 支持配置定期任務(wù)帜矾、支持 crontab 模式配置翼虫,不支持一次性定時任務(wù)

schedule 庫

人類的Python 任務(wù)調(diào)度庫,和 requests 庫一樣 for humans. 這個庫也是最輕量級的一個任務(wù)調(diào)度庫屡萤,schedule 允許用戶使用簡單珍剑、人性化的語法以預(yù)定的時間間隔定期運(yùn)行Python函數(shù)(或其它可調(diào)用函數(shù))。

直接使用 pip install schedule進(jìn)行安裝使用死陆,下面來看看官網(wǎng)給的示例:

import schedule
import time

# 定義你要周期運(yùn)行的函數(shù)
def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)               # 每隔 10 分鐘運(yùn)行一次 job 函數(shù)
schedule.every().hour.do(job)                    # 每隔 1 小時運(yùn)行一次 job 函數(shù)
schedule.every().day.at("10:30").do(job)         # 每天在 10:30 時間點(diǎn)運(yùn)行 job 函數(shù)
schedule.every().monday.do(job)                  # 每周一 運(yùn)行一次 job 函數(shù)
schedule.every().wednesday.at("13:15").do(job)   # 每周三 13:15 時間點(diǎn)運(yùn)行 job 函數(shù)
schedule.every().minute.at(":17").do(job)        # 每分鐘的 17 秒時間點(diǎn)運(yùn)行 job 函數(shù)

while True:
    schedule.run_pending()   # 運(yùn)行所有可以運(yùn)行的任務(wù)
    time.sleep(1)

通過上面示例招拙,可以很容易學(xué)會使用 schedule 庫唧瘾,可以設(shè)置秒、分鐘别凤、小時饰序、天、周來運(yùn)行任務(wù)规哪,然后通過一個死循環(huán)求豫,一直不斷地運(yùn)行所有的計劃任務(wù)。

schedule 常見問題

1诉稍、如何并行執(zhí)行任務(wù)蝠嘉?

schedule 是阻塞式的,默認(rèn)情況下杯巨, schedule 按順序執(zhí)行所有的作業(yè)蚤告,不能達(dá)到并行執(zhí)行任務(wù)。如下所示:

import arrow
import schedule

def job1():
    print("job1 start time: %s" % arrow.get().format())
    time.sleep(2)
    print("job1 end time: %s" % arrow.get().format())

def job2():
    print("job2 start time: %s" % arrow.get().format())
    time.sleep(5)
    print("job2 end time: %s" % arrow.get().format())

def job3():
    print("job3 start time: %s" % arrow.get().format())
    time.sleep(10)
    print("job3 end time: %s" % arrow.get().format())

if __name__ == '__main__':
    schedule.every(10).seconds.do(job1)
    schedule.every(30).seconds.do(job2)
    schedule.every(5).to(10).seconds.do(job3)

    while True:
        schedule.run_pending()

返回部分結(jié)果如下所示服爷,幾個任務(wù)并不是并行開始的杜恰,是安裝時間順序先后開始的:

job3 start time: 2019-06-01 09:27:54+00:00
job3 end time: 2019-06-01 09:28:04+00:00
job1 start time: 2019-06-01 09:28:04+00:00
job1 end time: 2019-06-01 09:28:06+00:00
job3 start time: 2019-06-01 09:28:13+00:00
job3 end time: 2019-06-01 09:28:23+00:00
job2 start time: 2019-06-01 09:28:23+00:00
job2 end time: 2019-06-01 09:28:28+00:00
job1 start time: 2019-06-01 09:28:28+00:00
job1 end time: 2019-06-01 09:28:30+00:00
job3 start time: 2019-06-01 09:28:30+00:00
job3 end time: 2019-06-01 09:28:40+00:00
job1 start time: 2019-06-01 09:28:40+00:00
job1 end time: 2019-06-01 09:28:42+00:00

如果需要實現(xiàn)并行,那么使用多線程方式運(yùn)行任務(wù)仍源,官方給出的并行方案如下:

import threading
import time
import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)

while 1:
    schedule.run_pending()
    time.sleep(1)
    
    
# 我在項目里也是通過對每個任務(wù)運(yùn)行后臺線程方式, 可以通過 run_daemon_thread 起一個守護(hù)線程方式來達(dá)到動態(tài)
添加任務(wù)的功能心褐,每個任務(wù)最終通過新開線程方式執(zhí)行
import threading

    
def ensure_schedule():
    schedule.every(5).seconds.do(do_some)
    
def ensure_schedule_2():
    schedule.every(10).seconds.do(print_some)

def run_daemon_thread(target, *args, **kwargs):
    job_thread = threading.Thread(target=target, args=args, kwargs=kwargs)
    job_thread.setDaemon(True)
    job_thread.start()

def __start_schedule_deamon():
    def schedule_run():
        while True:
            schedule.run_pending()
            time.sleep(1)

    t = threading.Thread(target=schedule_run)
    t.setDaemon(True)
    t.start()
    
def init_schedule_job():
        run_daemon_thread(ensure_schedule)
        run_daemon_thread(ensure_schedule_2)
        
init_schedule_job()
__start_schedule_deamon()

2、如何在不阻塞主線程的情況下連續(xù)運(yùn)行調(diào)度程序镜会?

官方推薦了這個方式檬寂,在單獨(dú)的線程中運(yùn)行調(diào)度程序,如下戳表,在單獨(dú)的線程中運(yùn)行 run_pending 調(diào)度程序桶至。通過 threading 庫的 Event 來實現(xiàn)

 # https://github.com/mrhwick/schedule/blob/master/schedule/__init__.py
 def run_continuously(self, interval=1):
        """Continuously run, while executing pending jobs at each elapsed
        time interval.
        @return cease_continuous_run: threading.Event which can be set to
        cease continuous run.
        Please note that it is *intended behavior that run_continuously()
        does not run missed jobs*. For example, if you've registered a job
        that should run every minute and you set a continuous run interval
        of one hour then your job won't be run 60 times at each interval but
        only once.
        """
        cease_continuous_run = threading.Event()

        class ScheduleThread(threading.Thread):
            @classmethod
            def run(cls):
                while not cease_continuous_run.is_set():
                    self.run_pending()
                    time.sleep(interval)

        continuous_thread = ScheduleThread()
        continuous_thread.start()
        return cease_continuous_run

3、是否支持時區(qū)

# 官方不計劃支持時區(qū)匾旭,可使用: 
討論:https://github.com/dbader/schedule/pull/16
時區(qū)解決:https://github.com/imiric/schedule/tree/feat/timezone 

4镣屹、如果我的任務(wù)拋出異常怎么辦?

schedule 不捕獲作業(yè)執(zhí)行期間發(fā)生的異常,因此在任務(wù)執(zhí)行期間的任何異常都會冒泡并中斷調(diào)度的 run_xyz(如 run_pending ) 函數(shù), 也就是 run_pending 中斷退出价涝,導(dǎo)致其它任務(wù)無法執(zhí)行

import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

另外一種解決方案:
https://gist.github.com/mplewis/8483f1c24f2d6259aef6

5女蜈、如何設(shè)置只跑一次的任務(wù)?

def job_that_executes_once():
    # Do some work ...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

6、如何一次取消多個任務(wù)色瘩?

# 通過 tag 函數(shù)給它們添加唯一標(biāo)識符進(jìn)行分組伪窖,取消時通過標(biāo)識符進(jìn)行取消相應(yīng)組的任務(wù)
def greet(name):
    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

schedule.clear('daily-tasks')

7、如何傳遞參數(shù)給任務(wù)函數(shù)

def greet(name):
    print('Hello', name)

schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

schedule 源碼閱讀

使用 0.6.0 版本最新代碼進(jìn)行分析居兆,加起來才 6百多行覆山,實現(xiàn)很簡潔,先來看看當(dāng)前的文件結(jié)構(gòu)

"""
? tree
# 省略部分文件泥栖,代碼全部在 __init__.py 中
.
├── schedule
│   └── __init__.py
├── setup.py
├── test_schedule.py
└── tox.ini

先從一段簡單的代碼分析
"""
import schedule
import time
import arrow

def job():
    print(f"time: {arrow.now().format('YYYY-MM-DD HH:mm:ss')}, I'm working...")

if __name__ == '__main__':
    print('start schedule...')
    schedule.every(10).seconds.do(job)

    while True:
        schedule.run_pending()
        time.sleep(1)
        
"""
start schedule...
time: 2019-09-27 21:28:14, I'm working...
time: 2019-09-27 21:28:24, I'm working...
time: 2019-09-27 21:28:34, I'm working...
"""

先來看看 run_pending 做了什么事情:

def run_pending():
    """Calls :meth:`run_pending <Scheduler.run_pending>` on the
    :data:`default scheduler instance <default_scheduler>`.
    """
    default_scheduler.run_pending()  # what? default_scheduler 又是什么簇宽?
    
    
#: Default :class:`Scheduler <Scheduler>` object
default_scheduler = Scheduler()   

原來是 Scheduler 類的一個實例勋篓,接下來去 Scheduler 找 run_pending 看看這個方法做了什么操作.

Scheduler 類

class Scheduler(object):
    """
    Objects instantiated by the :class:`Scheduler <Scheduler>` are
    factories to create jobs, keep record of scheduled jobs and
    handle their execution.
    """
    def __init__(self):
        self.jobs = []

1、初始化函數(shù) __init__ 做了什么初始化操作魏割,結(jié)果只是簡單地設(shè)置了一個保存 Job 的列表譬嚣,這也是 schedule 簡潔設(shè)計的一個重要點(diǎn),所有運(yùn)行的任務(wù)都使用一個列表來保存在內(nèi)存中钞它,不提供任何接入不同存儲的配置項拜银。

class Scheduler(object):
    """
    Objects instantiated by the :class:`Scheduler <Scheduler>` are
    factories to create jobs, keep record of scheduled jobs and
    handle their execution.
    """
        # 省略部分代碼
    def run_pending(self):
        """
        運(yùn)行所有計劃運(yùn)行的作業(yè)
        Run all jobs that are scheduled to run.

        Please note that it is *intended behavior that run_pending()
        does not run missed jobs*. For example, if you've registered a job
        that should run every minute and you only call run_pending()
        in one hour increments then your job won't be run 60 times in
        between but only once.
        """
        runnable_jobs = (job for job in self.jobs if job.should_run)
        for job in sorted(runnable_jobs):
            self._run_job(job)

2、接著看看 run_pending 方法做了什么须揣,可以看到 run_pending 從 jobs 列表中獲取當(dāng)前可以運(yùn)行的 job 保存在 runnable_jobs盐股, 并對 runnable_jobs 元祖進(jìn)行排序并對每一個 job 執(zhí)行 _run_job 方法, 這個地方可能有人會有疑問钱豁,Job 到底是個什么對象耻卡,為什么可以用 sorted 方法排序,難道 Job 對象實現(xiàn)了一些 Python 黑魔法函數(shù)牲尺,例如 __lt__ 等?

    def _run_job(self, job):
        ret = job.run()
        if isinstance(ret, CancelJob) or ret is CancelJob:
            self.cancel_job(job)
            
    def cancel_job(self, job):
        """
        Delete a scheduled job.

        :param job: The job to be unscheduled
        """
        try:
            self.jobs.remove(job)
        except ValueError:
            pass

3卵酪、進(jìn)入 _run_job 方法,可以看到這個方法只是負(fù)責(zé)調(diào)用 job 對象的 run 方法運(yùn)行這個 job 而已谤碳,并且根據(jù) run 返回的對象判斷是否取消該任務(wù). 取消一個 job 就是從 jobs 列表中將這個 job 對象去除即可

Job 類

從上面 Schedule 追蹤下面溃卡,最后還是調(diào)用 job 的 run 方法進(jìn)行操作,接下來就繼續(xù)對 Job 追蹤

1蜒简、Job 的初始化方法做了什么

class Job(object):
    """
    A periodic job as used by :class:`Scheduler`.

    :param interval: A quantity of a certain time unit
    :param scheduler: The :class:`Scheduler <Scheduler>` instance that
                      this job will register itself with once it has
                      been fully configured in :meth:`Job.do()`.

    Every job runs at a given fixed time interval that is defined by:

    * a :meth:`time unit <Job.second>`
    * a quantity of `time units` defined by `interval`

    A job is usually created and returned by :meth:`Scheduler.every`
    method, which also defines its `interval`.
    """
    def __init__(self, interval, scheduler=None):
        self.interval = interval  # pause interval * unit between runs(配置運(yùn)行任務(wù)的時間單位數(shù)量瘸羡,如 seconds(10).do(job),10 就是這個時間單位數(shù)量)
        self.latest = None  # upper limit to the interval( interval 的上限)
        self.job_func = None  # the job job_func to run   (運(yùn)行任務(wù)的函數(shù),通常是我們定義需要定時執(zhí)行的代碼邏輯)
        self.unit = None  # time units, e.g. 'minutes', 'hours', ... (時間單位)
        self.at_time = None  # optional time at which this job runs   (設(shè)置在某個時間點(diǎn)運(yùn)行)
        self.last_run = None  # datetime of the last run   (上次執(zhí)行的時間)
        self.next_run = None  # datetime of the next run    (下次執(zhí)行的時間)
        self.period = None  # timedelta between runs, only valid for
        self.start_day = None  # Specific day of the week to start on (指定一周中的第幾天運(yùn)行)
        self.tags = set()  # unique set of tags for the job  (Job 的唯一tag 標(biāo)識)
        self.scheduler = scheduler  # scheduler to register with  (Scheduler 類搓茬,可繼承并使用自己實現(xiàn)的 Scheduler 類)
        
   def __lt__(self, other):
        """
        看這個黑魔法函數(shù)犹赖,就是上面 Scheduler 類中 job 可以使用 sorted 排序的原因
        PeriodicJobs are sortable based on the scheduled time they
        run next.
        """
        return self.next_run < other.next_run  

2、job run 方法的邏輯

    def run(self):
        """
        Run the job and immediately reschedule it.

        :return: The return value returned by the `job_func`
        """
        logger.info('Running job %s', self)
        ret = self.job_func()
        self.last_run = datetime.datetime.now()
        self._schedule_next_run()
        return ret

3卷仑、every 函數(shù)操作

可能看到這里的朋友覺得有點(diǎn)混亂了峻村,job 是什么時候?qū)嵗模质鞘裁磿r候加入 Scheduler 的 jobs 列表的锡凝,其實我沒有按照 示例代碼的順序來講話粘昨,可以回到前面代碼示例中有下面這句代碼在 run_pending 之前:

schedule.every(10).seconds.do(job)

也就是這個地方實例化 Job 類的,可以跳到這段代碼分析分析:

class Scheduler(object):
        # 省略部分代碼
    def every(self, interval=1):
        """
        Schedule a new periodic job.

        :param interval: A quantity of a certain time unit
        :return: An unconfigured :class:`Job <Job>`
        """
        job = Job(interval, self)
        return job
      
def every(interval=1):
    """Calls :meth:`every <Scheduler.every>` on the
    :data:`default scheduler instance <default_scheduler>`.
    """
    return default_scheduler.every(interval)

4窜锯、do 函數(shù)操作

可以看到在調(diào)用 every 函數(shù)時张肾,最終調(diào)用的是 Scheduler 類的 every 方法,該方法主要是根據(jù)設(shè)置的間隔時間(interval) 實例化 Job 類并返回該實例锚扎,這段代碼同樣沒有 job 加入 Scheduler 的 jobs 列表的邏輯吞瞪,那就是在 seconds.do 方法進(jìn)行的操作,接著翻代碼看看:

    def do(self, job_func, *args, **kwargs):
        """
        Specifies the job_func that should be called every time the
        job runs.

        Any additional arguments are passed on to job_func when
        the job runs.

        :param job_func: The function to be scheduled
        :return: The invoked job instance
        """
        self.job_func = functools.partial(job_func, *args, **kwargs)
        try:
            functools.update_wrapper(self.job_func, job_func)
        except AttributeError:
            # job_funcs already wrapped by functools.partial won't have
            # __name__, __module__ or __doc__ and the update_wrapper()
            # call will fail.
            pass
        self._schedule_next_run()
        self.scheduler.jobs.append(self)
        return self
  • 首先使用標(biāo)準(zhǔn)庫的 partial (偏函數(shù)) 先提前為 job_func(我們提供的業(yè)務(wù)邏輯代碼)設(shè)置參數(shù)工秩,用一些默認(rèn)參數(shù)包裝一個可調(diào)用對象,返回結(jié)果是可調(diào)用對象尸饺,并且可以像原始對象一樣對待进统,凍結(jié)部分函數(shù)位置函數(shù)或關(guān)鍵字參數(shù),簡化函數(shù),更少更靈活的函數(shù)參數(shù)調(diào)用浪听。
  • update_wrapper 做了什么操作螟碎,這個函數(shù)的作用就是從 **被修飾的函數(shù)(job_func) ** 中取出一些屬性值來,賦值給 修飾器函數(shù)(self.job_func) 迹栓。默認(rèn) partial 對象沒有 namedoc, 這種情況下掉分,對于裝飾器函數(shù)非常難以debug.
  • 接著就是 self._schedule_next_run 方法,這個是 schedule 庫的核心代碼克伊,有點(diǎn)復(fù)雜酥郭,下面慢慢解釋
  • 接下來一行就是想要的答案了,這個時候?qū)?dāng)前 Job 類加入 Scheduler 類的 jobs 列表中愿吹,append(self) 這個 self 就是當(dāng)前的 Job 類

5不从、核心 _schedule_next_run

    def _schedule_next_run(self):
        """
        Compute the instant when this job should run next.
        """
        # 第一步,判斷當(dāng)前運(yùn)行的時間單位是否在指定范圍內(nèi)犁跪,不在則報錯
        if self.unit not in ('seconds', 'minutes', 'hours', 'days', 'weeks'):
            raise ScheduleValueError('Invalid unit')
            
                # 如果 interval 的上限時間不為 None, 判斷 interval 上限時間是否小于 interval
        # 小于則報錯椿息,latest 用于 every(A).to(B).seconds 每 N 秒執(zhí)行一次 job 任務(wù),
        # 其中 A <= N <= B. 所以 A(interval) <= B (latest)
        if self.latest is not None:
            if not (self.latest >= self.interval):
                raise ScheduleError('`latest` is greater than `interval`')
            # 執(zhí)行時間隨機(jī)從 interval 到 latest 之前取值
            interval = random.randint(self.interval, self.latest)
        else:
            interval = self.interval

        # 下面兩行用于獲取下一次執(zhí)行的時間
        self.period = datetime.timedelta(**{self.unit: interval})
        self.next_run = datetime.datetime.now() + self.period
        
        # start_day 這個只會在設(shè)置一周的第幾天執(zhí)行才會有坷衍,所以 unit 時間單位不是 weeks 就報錯
        if self.start_day is not None:
            if self.unit != 'weeks':
                raise ScheduleValueError('`unit` should be \'weeks\'')
            weekdays = (
                'monday',
                'tuesday',
                'wednesday',
                'thursday',
                'friday',
                'saturday',
                'sunday'
            )
            # 如果天數(shù)的標(biāo)識不是上面的寝优,報錯
            if self.start_day not in weekdays:
                raise ScheduleValueError('Invalid start day')
            
            weekday = weekdays.index(self.start_day)
            # datetime 的 weekday() 函數(shù),計算目標(biāo)時間是否已經(jīng)在本周發(fā)送
            days_ahead = weekday - self.next_run.weekday() 
            if days_ahead <= 0:  # Target day already happened this week
                days_ahead += 7
            self.next_run += datetime.timedelta(days_ahead) - self.period
        if self.at_time is not None:
            if (self.unit not in ('days', 'hours', 'minutes')
                    and self.start_day is None):
                raise ScheduleValueError(('Invalid unit without'
                                          ' specifying start day'))
            kwargs = {
                'second': self.at_time.second,
                'microsecond': 0
            }
            if self.unit == 'days' or self.start_day is not None:
                kwargs['hour'] = self.at_time.hour
            if self.unit in ['days', 'hours'] or self.start_day is not None:
                kwargs['minute'] = self.at_time.minute
            self.next_run = self.next_run.replace(**kwargs)
            # If we are running for the first time, make sure we run
            # at the specified time *today* (or *this hour*) as well
            if not self.last_run:
                now = datetime.datetime.now()
                if (self.unit == 'days' and self.at_time > now.time() and
                        self.interval == 1):
                    self.next_run = self.next_run - datetime.timedelta(days=1)
                elif self.unit == 'hours' \
                        and self.at_time.minute > now.minute \
                        or (self.at_time.minute == now.minute
                            and self.at_time.second > now.second):
                    self.next_run = self.next_run - datetime.timedelta(hours=1)
                elif self.unit == 'minutes' \
                        and self.at_time.second > now.second:
                    self.next_run = self.next_run - \
                                    datetime.timedelta(minutes=1)
        if self.start_day is not None and self.at_time is not None:
            # Let's see if we will still make that time we specified today
            if (self.next_run - datetime.datetime.now()).days >= 7:
                self.next_run -= self.period

APScheduler 庫

APScheduler(Advanced Python Scheduler)是基于Quartz的一個Python定時任務(wù)框架枫耳,實現(xiàn)了Quartz的所有功能, 是一個輕量級但功能強(qiáng)大的進(jìn)程內(nèi)任務(wù)調(diào)度程序乏矾。它有以下三個特點(diǎn):

  • 類似于 Liunx Cron 的調(diào)度程序(可選的開始/結(jié)束時間)
  • 基于時間間隔的執(zhí)行調(diào)度(周期性調(diào)度,可選的開始/結(jié)束時間)
  • 一次性執(zhí)行任務(wù)(在設(shè)定的日期/時間運(yùn)行一次任務(wù))

可以按照個人喜好來混合和匹配調(diào)度系統(tǒng)和存儲作業(yè)的后端存儲迁杨,支持以下幾種后臺作業(yè)存儲:

  • Memory
  • SQLAlchemy (任何 SQLAlchemy 支持的關(guān)系型數(shù)據(jù)庫)
  • MongoDB
  • Redis
  • ZooKeeper
  • RethinkDB

APScheduler 集成了以下幾個 Python 框架:

  • asyncio
  • gevent
  • Tornado
  • Twisted
  • Qt

總結(jié)以上钻心,APScheduler 支持基于日期、固定時間仑最、crontab 形式三種形式的任務(wù)調(diào)度扔役,可以靈活接入各種類型的后臺作業(yè)存儲來持久化作業(yè),同時提供了多種調(diào)度器(后面提及)警医,集成多種 Python 框架亿胸,可以根據(jù)實際情況靈活組合后臺存儲以及調(diào)度器來使用。

APScheduler 的架構(gòu)及工作原理

1预皇、APScheduler 基本概念

APScheduler 由四個組件構(gòu)成(注:該部分翻譯至官方文檔):

  • triggers 觸發(fā)器

    觸發(fā)器包含調(diào)度邏輯侈玄。每個作業(yè)(job)都有自己的觸發(fā)器,用于確定下一個作業(yè)何時運(yùn)行吟温。除了最初的配置序仙,觸發(fā)器是完全無狀態(tài)的

  • job stores 作業(yè)存儲

    job stores 是存放作業(yè)的地方,默認(rèn)保存在內(nèi)存中鲁豪。作業(yè)數(shù)據(jù)序列化后保存至持久性數(shù)據(jù)庫潘悼,從持久性數(shù)據(jù)庫加載回來時會反序列化律秃。作業(yè)存儲(job stores)不將作業(yè)數(shù)據(jù)保存在內(nèi)存中(默認(rèn)存儲除外),相反治唤,內(nèi)存只是充當(dāng)后端存儲在保存棒动、加載、更新宾添、查找作業(yè)時的中間人角色船惨。作業(yè)存儲不能在調(diào)度器(schedulers) 之間共享

  • executors 執(zhí)行器

    執(zhí)行器處理作業(yè)的運(yùn)行。它們通常通過將作業(yè)中的指定可調(diào)用部分提交給線程或進(jìn)程池來實現(xiàn)這一點(diǎn)缕陕。 當(dāng)作業(yè)完成后粱锐,執(zhí)行器通知調(diào)度器,然后調(diào)度器發(fā)出一個適當(dāng)?shù)氖录?/p>

  • schedulers 調(diào)度器

    調(diào)度器是將其余部分綁定在一起的工具扛邑。通常只有一個調(diào)度器(scheduler)在應(yīng)用程序中運(yùn)行怜浅。應(yīng)用程序開發(fā)者通常不直接處理作業(yè)存儲(job stores)、執(zhí)行器(executors)或者觸發(fā)器(triggers)鹿榜。相反海雪,調(diào)度器提供了適當(dāng)?shù)慕涌趤硖幚硭鼈兘蹙簟E渲米鳂I(yè)存儲(job stores)和執(zhí)行器(executors)是通過調(diào)度器(scheduler)來完成的,就像添加舱殿、修改和刪除 job(作業(yè))一樣

2、APScheduler 架構(gòu)圖

apscheduler架構(gòu)圖
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末险掀,一起剝皮案震驚了整個濱河市沪袭,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌樟氢,老刑警劉巖冈绊,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異埠啃,居然都是意外死亡死宣,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進(jìn)店門碴开,熙熙樓的掌柜王于貴愁眉苦臉地迎上來毅该,“玉大人,你說我怎么就攤上這事潦牛】粽疲” “怎么了?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵巴碗,是天一觀的道長朴爬。 經(jīng)常有香客問我,道長橡淆,這世上最難降的妖魔是什么召噩? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任母赵,我火速辦了婚禮,結(jié)果婚禮上具滴,老公的妹妹穿的比我還像新娘市咽。我一直安慰自己,他們只是感情好抵蚊,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布施绎。 她就那樣靜靜地躺著,像睡著了一般贞绳。 火紅的嫁衣襯著肌膚如雪谷醉。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天冈闭,我揣著相機(jī)與錄音俱尼,去河邊找鬼。 笑死萎攒,一個胖子當(dāng)著我的面吹牛遇八,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播耍休,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼刃永,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了羊精?” 一聲冷哼從身側(cè)響起斯够,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎喧锦,沒想到半個月后读规,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡燃少,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年束亏,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片阵具。...
    茶點(diǎn)故事閱讀 38,039評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡琅绅,死狀恐怖眶明,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤蔚鸥,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布岔冀,位于F島的核電站早龟,受9級特大地震影響案疲,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一越庇、第九天 我趴在偏房一處隱蔽的房頂上張望罩锐。 院中可真熱鬧,春花似錦卤唉、人聲如沸涩惑。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽竭恬。三九已至,卻和暖如春熬的,著一層夾襖步出監(jiān)牢的瞬間痊硕,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工押框, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留岔绸,地道東北人。 一個月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓橡伞,卻偏偏與公主長得像盒揉,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子兑徘,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 上篇文章刚盈,我們了解到有三種辦法能實現(xiàn)定時任務(wù),但是都無法做到循環(huán)執(zhí)行定時任務(wù)道媚。因此扁掸,需要一個能夠擔(dān)當(dāng)此重任的庫。它...
    猴哥愛讀書閱讀 23,280評論 1 36
  • 1最域、 簡介 APScheduler的全稱是Advanced Python Scheduler。它是一個輕量級的 P...
    CoderZS閱讀 1,435評論 0 1
  • Python定時任務(wù) 在項目中锈麸,我們可能遇到有定時任務(wù)的需求镀脂。其一:定時執(zhí)行任務(wù)。例如每天早上 8 點(diǎn)定時推送早報...
    羋學(xué)僧閱讀 2,786評論 0 5
  • # Python 資源大全中文版 我想很多程序員應(yīng)該記得 GitHub 上有一個 Awesome - XXX 系列...
    小邁克閱讀 2,961評論 1 3
  • 今天不小心親眼目睹了兩個supervisor之間的爭吵… 一個即將離職忘伞,一個剛調(diào)過來接手薄翅。 剛做完的項目在項目管理...
    瓶蓋美閱讀 271評論 0 0