Preface
Recently a Flask web project of mine needed to read the database on a schedule and update the data. After some thought, I figured I could roll my own solution:
Import threading:
from threading import Thread
thr = Thread(target=timed_task)
thr.start()
This builds a thread out of a function; drop these few lines in before manager.run() and the requirement is roughly met.
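For reference, here is a minimal sketch of that approach, assuming a hypothetical timed_task that polls every 60 seconds (the interval and the print placeholder are illustrative, not from the original project):
import time
from threading import Thread

def timed_task():
    # hypothetical polling loop: read and update the database periodically
    while True:
        print("updating records...")  # placeholder for the real DB update logic
        time.sleep(60)                # illustrative 60-second interval

thr = Thread(target=timed_task, daemon=True)  # daemon=True: thread exits with the main process
thr.start()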
But there is a problem: Python has the GIL (Global Interpreter Lock), so within one Python process only one thread executes Python bytecode at a time, and CPU-bound work cannot use multiple cores (multiprocessing with Process might get around this, but I am not familiar with it and have not tried). Performance therefore takes a real hit.
There is a second problem: I deploy the web app on the server with uwsgi, so wedging code like this in there felt far too contrived. I figured there should be a tool that continuously monitors, dispatches, and executes tasks, and that tool is Celery.
Introducing Celery
Celery is a distributed task queue, and its whole way of working naturally revolves around that queue concept.
Celery has two components: worker and beat
worker: executes the tasks in the queue
beat: dispatches tasks on a schedule
The two can be started separately, or together (a combined command is sketched below)
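For a quick local test, the worker can embed a beat scheduler via the -B flag (celery_demo.worker is the app defined later in this post; the Celery docs recommend separate processes in production):
celery worker -A celery_demo.worker -B -l info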
Tasks can come from two sources
Called from code: import the task function that should go onto the execution queue, and call it
Dispatched by beat: configure the task function and its period in the config, and beat pushes tasks onto the queue automatically
Installing Celery
Celery needs a message broker configured before use.
Redis is generally a convenient choice for this; you need the Redis service installed and running locally. See other posts or the official documentation for the details.
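If Redis is already installed, starting and checking it typically looks like this (exact commands vary by platform and init system):
redis-server      # start the server in the foreground
redis-cli ping    # a running server answers PONG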
Python also needs the redis client library:
pip install redis
Celery itself installs easily with pip:
pip install celery
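Alternatively, Celery ships a bundled extra that pulls in the Redis dependency in one step:
pip install "celery[redis]"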
A Minimal Celery Demo
Let's look at this code first; it is about the simplest possible Celery program.
celery_demo.py
import celery
import time

worker = celery.Celery("celery_name", backend="redis://localhost:6379/", broker="redis://localhost:6379/")

@worker.task
def hello():
    return "hello,{}".format(time.time())
Now every time the hello task is processed, it returns "hello," plus a timestamp.
So far we have only defined the task function and the worker (the Celery app object).
We still need another .py file to call this module (you can also just import it from an interactive shell):
celery_schedule.py
from celery_demo import hello
hello.delay()
Each run of celery_schedule.py puts one hello task onto the queue, where it waits for a worker to execute it.
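hello.delay() returns an AsyncResult, and because a result backend is configured above, the return value can also be read back, e.g.:
from celery_demo import hello

result = hello.delay()
print(result.id)               # the task id that shows up in the worker log
print(result.get(timeout=10))  # blocks until the worker finishes, e.g. 'hello,1535799753.27...'
(Calling .get() is fine in a demo script, though the Celery docs warn against calling it from inside another task.)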
Having queued it once, we now need to start a worker to execute it.
Run the following command to start the worker:
celery worker -A celery_demo.worker -l info
# the -l info flag sets the log level to INFO, so all messages are printed to the console
The results now show up in the terminal:
[2018-09-01 19:02:32,218: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-09-01 19:02:32,224: INFO/MainProcess] mingle: searching for neighbors
[2018-09-01 19:02:33,238: INFO/MainProcess] mingle: all alone
[2018-09-01 19:02:33,274: INFO/MainProcess] celery@exqlnet-PC ready.
[2018-09-01 19:02:33,275: INFO/MainProcess] Received task: celery_demo.hello[9f32d5e8-282f-44b1-a6b7-39d21682b5f7]
[2018-09-01 19:02:33,276: INFO/MainProcess] Received task: celery_demo.hello[bb4342f4-4950-4b9d-b0d1-dd20614b8b29]
[2018-09-01 19:02:33,276: INFO/MainProcess] Received task: celery_demo.hello[8edd36ba-eadc-428a-9398-06f7910e777f]
[2018-09-01 19:02:33,285: INFO/ForkPoolWorker-1] Task celery_demo.hello[9f32d5e8-282f-44b1-a6b7-39d21682b5f7] succeeded in 0.00840658400557004s: 'hello,1535799753.2767727'
[2018-09-01 19:02:33,285: INFO/ForkPoolWorker-3] Task celery_demo.hello[8edd36ba-eadc-428a-9398-06f7910e777f] succeeded in 0.007285808002052363s: 'hello,1535799753.278022'
[2018-09-01 19:02:33,290: INFO/ForkPoolWorker-2] Task celery_demo.hello[bb4342f4-4950-4b9d-b0d1-dd20614b8b29] succeeded in 0.013728965997870546s: 'hello,1535799753.2767704'
Scheduled Tasks with Celery
Now that simple task dispatch and execution works, how do we dispatch tasks on a schedule?
We add a few things to celery_demo.py:
import celery
import time
from datetime import timedelta

worker = celery.Celery("celery_name", backend="redis://localhost:6379/", broker="redis://localhost:6379/")

class Config:
    CELERYBEAT_SCHEDULE = {
        'update_info': {
            'task': 'celery_demo.hello',
            'schedule': timedelta(seconds=3),
        }
    }

worker.config_from_object(Config)

@worker.task
def hello():
    return "hello,{}".format(time.time())
The Config class holds the configuration; CELERYBEAT_SCHEDULE defines the scheduled tasks, as the code shows: each entry names a task and its schedule. (In Celery 4+ the lowercase beat_schedule is the preferred setting name, though the old uppercase one still works.)
worker.config_from_object() takes a configuration class or object and loads settings from it.
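Besides timedelta intervals, beat also accepts crontab expressions from celery.schedules; a hypothetical variant of the entry above that fires every five minutes would look like:
from celery.schedules import crontab

class Config:
    CELERYBEAT_SCHEDULE = {
        'update_info': {
            'task': 'celery_demo.hello',
            'schedule': crontab(minute='*/5'),  # at minute 0, 5, 10, ...
        }
    }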
As mentioned at the start, Celery's beat is the component that schedules tasks, so we start beat by running the following in the command line:
celery beat -A celery_demo.worker -l info
With beat started, you should now see something like this in the terminal:
LocalTime -> 2018-09-01 19:12:53
Configuration ->
. broker -> redis://localhost:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> celery.beat.PersistentScheduler
. db -> celerybeat-schedule
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 minutes (300s)
[2018-09-01 19:12:53,821: INFO/MainProcess] beat: Starting...
[2018-09-01 19:12:53,839: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:12:56,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:12:59,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:13:02,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
[2018-09-01 19:13:05,827: INFO/MainProcess] Scheduler: Sending due task update_info (celery_demo.hello)
This shows tasks are being dispatched; given how long beat sat there, five tasks have already gone onto the queue in this excerpt.
Run the worker start command again:
celery worker -A celery_demo.worker -l info
And the following appears on screen:
[tasks]
. celery_demo.hello
[2018-09-01 19:14:40,146: INFO/MainProcess] Connected to redis://localhost:6379//
[2018-09-01 19:14:40,152: INFO/MainProcess] mingle: searching for neighbors
[2018-09-01 19:14:41,163: INFO/MainProcess] mingle: all alone
[2018-09-01 19:14:41,172: INFO/MainProcess] celery@exqlnet-PC ready.
[2018-09-01 19:14:41,336: INFO/MainProcess] Received task: celery_demo.hello[8edf753c-edd8-4bf0-b708-22dc53fcf07a]
[2018-09-01 19:14:41,338: INFO/MainProcess] Received task: celery_demo.hello[d747f1a7-12fa-4557-8a98-4db0d4c6f9b3]
[2018-09-01 19:14:41,340: INFO/MainProcess] Received task: celery_demo.hello[0925b2ba-4c24-428c-958b-5d2072292e8e]
[2018-09-01 19:14:41,347: INFO/MainProcess] Received task: celery_demo.hello[c63183f5-c191-42c3-99c4-128555100b69]
[2018-09-01 19:14:41,350: INFO/MainProcess] Received task: celery_demo.hello[8c74ab95-83b7-4d96-a724-044dd4276c30]
[2018-09-01 19:14:41,352: INFO/MainProcess] Received task: celery_demo.hello[86410550-3817-47c5-8f1b-8c3bf467ae72]
[2018-09-01 19:14:41,352: INFO/MainProcess] Received task: celery_demo.hello[e00e896f-79cf-40c4-82a7-59b89beebd78]
[2018-09-01 19:14:41,353: INFO/MainProcess] Received task: celery_demo.hello[7ee3d655-5d20-45fd-85b0-6de38dcf2958]
[2018-09-01 19:14:41,354: INFO/MainProcess] Received task: celery_demo.hello[e0bcb917-693b-4bea-97fc-0987f337c6bc]
[2018-09-01 19:14:41,355: INFO/MainProcess] Received task: celery_demo.hello[2f3068bb-d7dc-4f82-8d70-76a03e2fd682]
[2018-09-01 19:14:41,359: INFO/ForkPoolWorker-2] Task celery_demo.hello[c63183f5-c191-42c3-99c4-128555100b69] succeeded in 0.011140925002109725s: 'hello,1535800481.348562'
[2018-09-01 19:14:41,360: INFO/ForkPoolWorker-4] Task celery_demo.hello[8edf753c-edd8-4bf0-b708-22dc53fcf07a] succeeded in 0.021171232001506723s: 'hello,1535800481.3396409'
[2018-09-01 19:14:41,361: INFO/ForkPoolWorker-2] Task celery_demo.hello[8c74ab95-83b7-4d96-a724-044dd4276c30] succeeded in 0.0003823369988822378s: 'hello,1535800481.3609214'
[2018-09-01 19:14:41,362: INFO/ForkPoolWorker-3] Task celery_demo.hello[d747f1a7-12fa-4557-8a98-4db0d4c6f9b3] succeeded in 0.02279239599738503s: 'hello,1535800481.33969'
[2018-09-01 19:14:41,362: INFO/ForkPoolWorker-4] Task celery_demo.hello[86410550-3817-47c5-8f1b-8c3bf467ae72] succeeded in 0.0008120330021483824s: 'hello,1535800481.3620644'
[2018-09-01 19:14:41,363: INFO/ForkPoolWorker-2] Task celery_demo.hello[e00e896f-79cf-40c4-82a7-59b89beebd78] succeeded in 0.000599899998633191s: 'hello,1535800481.362508'
[2018-09-01 19:14:41,364: INFO/ForkPoolWorker-4] Task celery_demo.hello[7ee3d655-5d20-45fd-85b0-6de38dcf2958] succeeded in 0.0004470089988899417s: 'hello,1535800481.3638816'
[2018-09-01 19:14:41,364: INFO/ForkPoolWorker-3] Task celery_demo.hello[e0bcb917-693b-4bea-97fc-0987f337c6bc] succeeded in 0.000407947001804132s: 'hello,1535800481.363944'
[2018-09-01 19:14:41,364: INFO/ForkPoolWorker-2] Task celery_demo.hello[2f3068bb-d7dc-4f82-8d70-76a03e2fd682] succeeded in 0.00035140299587510526s: 'hello,1535800481.3644059'
[2018-09-01 19:14:41,365: INFO/ForkPoolWorker-1] Task celery_demo.hello[0925b2ba-4c24-428c-958b-5d2072292e8e] succeeded in 0.016904400996281765s: 'hello,1535800481.3485875'