Tips:在學習Celery過程中连霉,使用的系統(tǒng)為Windows 10、Celery版本為3.1.18①啸罢、中間人使用RabbitMQ履磨。
C:\Users\foolf>celery --version
3.1.18 (Cipater)
什么是任務隊列
任務隊列是一種在線程或者機器之間分發(fā)任務的機制。
消息隊列的輸入是工作的一個單元逗宜,稱為任務雄右,獨立的職程(Worker)進程持續(xù)監(jiān)視隊列中是否有需要處理的新任務。
Celery使用消息通信纺讲,通信一般使用中間人(Broker)在客戶端和職程之間斡旋擂仍。這個過程從客戶端想隊列中添加消息開始,之后中間人將消息派送給職程刻诊。
Celery是Python編寫的防楷,但協(xié)議可以使用任何語言實現(xiàn)。
需要什么
Celery需要一個發(fā)送和接受消息的傳述者则涯。RabbbitMQ和Redis中間人的消息支持所有的特性复局,我們主要是使用RabbitMQ作中間人(關于中間人RabbitMQ的安裝可以網上搜索冲簿,有很多詳細的教程)。
Celery優(yōu)勢
在程序運行過程中亿昏,我們經常會遇到一些耗時耗資源的操作峦剔,為了避免阻塞主程序,我們會采用異步或者多線程來處理任務角钩。比如在主程序中調用一個函數吝沫,并從該函數中獲取函數返回值。如果這個函數不能很快執(zhí)行完成并返回递礼,那么主程序就會阻塞惨险,知直到函數返回。
Celery是一個強大的分布式任務隊列脊髓,它可以讓人物的執(zhí)行完全脫離主程序辫愉,甚至可以被分配到其他的主機上運行。
Celery架構:
從圖上可以看出Celery包含幾個模塊:
- 任務模塊
主要包異步任務和定時任務将硝,異步任務通常在業(yè)務邏輯中被觸發(fā)并發(fā)送到任務隊列中恭朗,而定時任務是由Celery Beat進程周期性的將任務發(fā)往任務隊列。 - 消息中間件Broker
Broker就是任務調度隊列依疼,接受任務生產者發(fā)送來的消息痰腮,將任務存入隊列,之所以需要中間人的原因是Celrey本身是不提供消息隊列的服務律罢,所以需要第三方組件實現(xiàn)膀值。 - 任務執(zhí)行單元Worker
Worker是執(zhí)行任務的單元,它實時監(jiān)控消息隊列弟翘,如果有任務就獲取任務并執(zhí)行它虫腋。 - 任務存儲Backend
Backend用于存儲任務只想的結果,存儲可以使用RabbitMQ或者Redis或者數據庫等稀余。
安裝Celery
Celery已經提交到Pypi上,所以我們可是使用Python
的工具pip
來安裝趋翻。
pip install celery==3.1.18
上面的安裝命令睛琳,如果沒有指定版本,系統(tǒng)會默認安裝最新版本的Celery踏烙,但是這里可能在后面的學習中遇到問題师骗。關于問題放到最后解釋,暫且我們先安裝3.1.18版本的Celery讨惩。
創(chuàng)建Celery實例
如果你已經安裝好了Celery辟癌,那么現(xiàn)在就可以創(chuàng)建Celery實例了:
tasks.py
# coding:utf-8
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
Celery的第一個參數是當前模塊名稱,這個參數是必須的荐捻,第二個參數是中間人關鍵字參數黍少,指定我們所使用的的消息中間人的URL
寡夹,這里我們使用的是RabbitMQ
。我們定義了一個單一的任務厂置,稱為add
,返回兩個數字的和菩掏。
啟動Celery職程服務器(Worker)
celery -A tasks worker --loglevel=info
參數-A指定了Celery實例的位置,這個實例是在tasks.py文件中昵济,Celery會自動在該文件中查找Celery對象實例智绸。
--loglevel指定日志的級別,默認是warning访忿。
如果啟動正常瞧栗,就會看到下面的輸出。
調用任務
現(xiàn)在我們已經開啟了一個Worker了海铆,這樣我們可以在應用程序中使用 delay()或者 apply_async()方法來調用任務迹恐。
在tasks.py文件所在的目錄打開終端。
>>> from tasks import add
>>> add.delay(2, 8)
<AsyncResult: 1b50f449-8fa2-478a-9eea-561a3c29fd43>
>>>
我們先從tasks.py文件中導入add任務對象游添,然后使用delay()方法將任務發(fā)送到消息中間件系草,我們之前開啟的那個Worker會一直監(jiān)控任務隊列,知道有任務到來唆涝,就會執(zhí)行找都。
我們到Worker中可以看到多了幾條日志信息:
[2017-03-09 19:45:35,351: INFO/MainProcess] Received task: tasks.add[1b50f449-8fa2-478a-9eea-561a3c29fd43]
[2017-03-09 19:45:40,920: INFO/MainProcess] Task tasks.add[1b50f449-8fa2-478a-9eea-561a3c29fd43] succeeded in 5.56299996376s: 10
說明我們的任務被調度并執(zhí)行成功了。
獲得結果
剛我們在命令行中調用任務廊酣,很明顯任務執(zhí)行完成能耻,但是我們并不知道任務執(zhí)行后得到的結果是什么。如果我們想獲得執(zhí)行后的結果可以這樣:
>>> result = add.delay(2, 8)
>>> result.ready() # 查看任務執(zhí)行的狀態(tài)亡驰,此刻任務沒有執(zhí)行完成晓猛,顯示False
False
>>> result.ready()
True # 表示任務已經執(zhí)行完成
>>> result.get() # 獲取任務的執(zhí)行結果
10
>>>
注解:
①:之前我是在windows下學習的Celery,安裝的Celery版本是4.0.2凡辱;在運行Worker過程中遇到如下ed錯誤:
I:\Celery\celery-examples>celery -A tasks worker --loglevel=info
-------------- celery@DESKTOP-N53SFFK v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Windows-10-10.0.14393 2017-02-28 00:32:22
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x4700908
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2017-02-28 00:32:22,619: CRITICAL/MainProcess] Unrecoverable error: TypeError('argument 1 must be an integer, not _subprocess_handle',)
Traceback (most recent call last):
File "c:\python27\lib\site-packages\celery\worker\worker.py", line 203, in start
self.blueprint.start(self)
File "c:\python27\lib\site-packages\celery\bootsteps.py", line 119, in start
step.start(parent)
File "c:\python27\lib\site-packages\celery\bootsteps.py", line 370, in start
return self.obj.start()
File "c:\python27\lib\site-packages\celery\concurrency\base.py", line 131, in start
self.on_start()
File "c:\python27\lib\site-packages\celery\concurrency\prefork.py", line 112, in on_start
**self.options)
File "c:\python27\lib\site-packages\billiard\pool.py", line 1008, in __init__
self._create_worker_process(i)
File "c:\python27\lib\site-packages\billiard\pool.py", line 1117, in _create_worker_process
w.start()
File "c:\python27\lib\site-packages\billiard\process.py", line 122, in start
self._popen = self._Popen(self)
File "c:\python27\lib\site-packages\billiard\context.py", line 383, in _Popen
return Popen(process_obj)
File "c:\python27\lib\site-packages\billiard\popen_spawn_win32.py", line 64, in __init__
_winapi.CloseHandle(ht)
TypeError: argument 1 must be an integer, not _subprocess_handle
I:\Celery\celery-examples>Traceback (most recent call last):
File "<string>", line 1, in <module>
File "c:\python27\lib\site-packages\billiard\spawn.py", line 159, in spawn_main
new_handle = steal_handle(parent_pid, pipe_handle)
File "c:\python27\lib\site-packages\billiard\reduction.py", line 121, in steal_handle
_winapi.PROCESS_DUP_HANDLE, False, source_pid)
WindowsError: [Error 87]
經過搜索發(fā)現(xiàn)是因為winsows是不支持celery4的戒职。參照的回答在這https://github.com/celery/celery/issues/3551
所以我機制的將版本降低到3,運行正常透乾。記錄下來僅僅是避免其他人在學習中不會再這個小問題上浪費時間洪燥。
END
由于學習的還是celery的基礎,所以后面的更復雜的內容等學了再更乳乌。
參考文章:
http://docs.jinkan.org/docs/celery/index.html
http://www.guodongkeji.com/newsshow-24-2135-1.html