Celery 是什么?
- 異步任務(wù)隊列工具,主要解決 realtime 事件的異步操作接箫,但也支持定時任務(wù)泰鸡。
- 什么是異步?那要先理解什么是同步,比如我去麥當(dāng)勞吃飯,如果麥當(dāng)勞前一個顧客點完單,拿到餐抗悍,吃完走人之后才能接待下一個顧客,就是同步钳枕。反過來我點完單缴渊,它馬上就接下一個客人的單,我的流程雖然還沒有走完(time-consuming)鱼炒,但也不影響下一個顧客點單(blocked)衔沼,這就是異步。
Celery 安裝
-
pip install celery
昔瞧,由于 celery 自己并不帶隊列存儲指蚁,所以根據(jù)官方推薦,還需要安裝 RabbitMQ 或者 Redis 來存儲隊列自晰。方便起見凝化,本文用 Redis。
Celery 機制
- celery 是一個裝飾器類酬荞,本質(zhì)上是把一個函數(shù)變成一個可以異步調(diào)用的函數(shù)搓劫。
- 開啟 celery 進程。
- 在另一段程序中導(dǎo)入這個函數(shù)混巧,當(dāng)這個函數(shù)被以 delay 模式調(diào)用時:
xxx.delay()
枪向,celery 會把這個任務(wù)寫到任務(wù)隊列然后返回,原程序不會被阻塞可以往下跑咧党。注意這里需要用 delay 模式去跑這個函數(shù)秘蛔,同時注意不要把xxx.delay()
的結(jié)果賦值給一個變量,否則依然會被阻塞。 - celery 的另一個進程會去這個任務(wù)隊列里取任務(wù)缠犀,完成之后寫到 result 隊列里面数苫〈鲜妫或者多數(shù)情況下辨液,這個被異步調(diào)用的函數(shù)不需要返回結(jié)果,比如發(fā)送一個郵件箱残,提醒之類的滔迈,連 result 隊列都不用。
Celery first blood
開一個 python 腳本被辑,比如
tasks.py
燎悍。-
生成一個 Celery 對象實例,是一個裝飾器盼理。
from celery import Celery app = Celery('__name__', broker='redis://localhost:6379')
-
定義一個你想異步操作的函數(shù)谈山,并加上 celery 裝飾器
@app.task
。@app.task def add(x, y): return x + y
-
保存宏怔,退出奏路,然后在 terminal 啟動 celery 服務(wù)。
celery -A tasks worker --loglevel=info
做個定時任務(wù):每天發(fā)問候
接下來臊诊,我要搭配釘釘機器人了鸽粉,我希望小仙女每天早上7點給我發(fā)個問候,然后在7點半的時候確認(rèn)我有沒有開始干活了抓艳。
-
先寫一個簡單的釘釘提醒程序触机,命名
celery_worker.py
:#! /usr/bin/env python # coding: utf-8 import requests import json import time from config import HOST_IP, NOTIFY_URL, MOBILE_NUMBER def notify_dingding(msg): headers = {"Content-Type": "application/json; charset=utf-8"} post_data = { "msgtype": "text", "text": { "content": msg }, "at": { "atMobiles": [MOBILE_NUMBER] } } r = requests.post(NOTIFY_URL, headers=headers, data=json.dumps(post_data)) print(r.content)
注意這里從 config 里導(dǎo)入了一些參數(shù),所以要在這個程序的同一層寫一個
config.py
的配置文件玷或。# config.py NOTIFY_URL = ("https://oapi.dingtalk.com/robot/send?access_token=" "c6d5a2936381dfc29394f3c336bea5fad962d90ffd31809e92d95a1xxxxxxxx") MOBILE_NUMBER = "176xxxxx619" HOST_IP = "127.0.0.1"
-
導(dǎo)入 celery 包儡首,給函數(shù)加上裝飾器:
from celery import Celery BROKER_URI = 'redis://%s:6379/6' % HOST_IP BACKEND_URI = 'redis://%s:6379/5' % HOST_IP worker = Celery('celery_worker', broker=BROKER_URI, backend=BACKEND_URI) @worker.task def notify_dingding(msg): ...
簡單學(xué)習(xí)一下celery 的 crontab 定時任務(wù)。
-
給 worker 加上定時任務(wù)
from celery.schedules import crontab worker.conf.update( timezone='Asia/Shanghai', enable_utc=True, beat_schedule={ "morning_msg_1": { "task": "celery_worker.notify_dingding", "schedule": crontab(minute=0, hour=7), "args": ("早偏友,起床了喲蔬胯,先去做個早飯吧",) }, "morning_msg_2": { "task": "celery_worker.notify_dingding", "schedule": crontab(minute=30, hour=7), "args": ("我就問問你在干活咩?",) } } )
-
最后程序末尾加一個小測試,看看服務(wù)是不是起來了:
notify_dingding("小仙女上線啦")
-
開啟 redis-server
nohup redis-server &
-
開啟我們的 celery worker约谈,這里的
-B
是 celery 的 beat 服務(wù)笔宿,可以理解為一個周期任務(wù)。celery -A celery_worker worker -B
服務(wù)起來嘍: