Celery
標簽(空格分隔): celery
Celery是一個分布式任務隊列工具,是一個異步的任務隊列基于分布式消息傳遞肄鸽。參考官網(wǎng)勤篮。
1. 基礎概念
- Broker绿满,簡單說就是"消息隊列"辛蚊,Celery的基本工作是管理分配任務到不同的服務器粤蝎,至于服務器之間的通信則是交給了如“RabbitMQ” 第三方服務。
- Task袋马,任務初澎,在Celery中,每個Python function就是一個task虑凛,只要在function前面修飾”@task()“碑宴,Celery就知道這是一個task,需要異步調用task的時候桑谍,task.delay()就可以了延柠,當然有更復雜的調用函數(shù),task.apply_async()锣披,這里面可以指定啥時候調用什么的贞间,這里的調用就是將task加入到queue中,而queue是保存在指定的Broker中的盈罐。
- Worker榜跌,Celery將你要異步處理的task加入到一個queue中,然后空閑的Worker就會將queue中的task給取走交給服務器盅粪。
2. 任務調用
任務(task)調用有三個API:
#給任務發(fā)送消息
apply_async(args[,kwargs[, ...]])
#給任務發(fā)送消息的簡單版钓葫,但是不支持execution options(apply_async有三個部分的參數(shù),第一部分就是task里面的python function的參數(shù)票顾,比如add(x,y)的x,y础浮,第二個參數(shù)叫作keyword arguments,就是設定一些環(huán)境變量奠骄,第三個參數(shù)就是execution options豆同,也就是這個task本身的執(zhí)行選項,時間啊之類)
delay(*args, **kwargs)
#類似直接調用的意思含鳞,即不是讓worker來執(zhí)行任務影锈,而是當前的進程來執(zhí)行。
calling(__call__)
關于execution options有這些主要的參數(shù):
Link(callbacks/errbacks)
就是一個任務接著一個蝉绷,回調任務作為一個partial argument在父任務完成的時候被調用鸭廷。
add.apply_async((2, 2), link=add.s(16))
即(2 + 2) + 16 = 20
ETA and countdown
eta必須是一個datatime對象。你可以設定一個eta熔吗,可以讓你的任務在這個eta開始辆床。
countdown是一個整型,它是eta的簡版桅狠,以秒為單位讼载。
這兩個都保證任務會在這個時間后執(zhí)行轿秧,但是這個時間無法非常確定,因為各方面的原因(網(wǎng)絡延時咨堤,任務隊列太繁忙)菇篡。
Expiration
expires參數(shù)定義了一個可選的到期時間,可以以秒為單位吱型,也可以以datetime逸贾。
序列化
數(shù)據(jù)在客戶端于worker之間的傳遞需要序列化。
Celery內建的支持 pickle,json,yaml和msgpack方式序列化津滞,當然也可以自定義序列化方式(要在Kombu中注冊)铝侵。
如果要指定某種內建的方式:
>>> add.apply_async((10, 10), serializer='json')
壓縮
Celery支持gzip和bzip2,當然你也可以自定義壓縮的方式(要在kombu中注冊)触徐。
指定某種方式的話:
>>> add.apply_async((2, 2), compression='zlib')
3. Canvas:Designing Workflows
有時候咪鲜,你需要把一個調用函數(shù)的某些信息傳遞到一個進程或者做為參數(shù)傳遞到另外一個函數(shù)的時候,Celery用一種叫subtasks完成這種任務撞鹉。如:
>>> add.subtask((2,2), countdown=10)
tasks.add(2,2)
簡寫方式:
>>> add.s(2,2)
tasks.add(2,2)
subtasks的實例也支持調用API疟丙,也就是delay和apply_async方法。
但對于subtasks也有一些不同鸟雏,看例子:
>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4
add任務接受兩個參數(shù)享郊,這個subtasks指定了兩個參數(shù),它就完成了一個complete signature孝鹊。
但是這樣:
# incomplete partial: add(?, 2)
>>> s2 = add.s(2)
這種incomplete signatures稱之為partials炊琉。
這個時候,s2需要另外一個參數(shù)才能完成運算又活,而再次調用的時候苔咪,就可以完成了,像這樣:
# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10
The Primitives(原語柳骄?)
The primitives are subtasks themselves, so that they can be combined in any number of ways to compose complex workflows.
Groups
一個group調用了一系列parallel的任務团赏,看例子:
>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Partial group:
>>> g = group(add.s(i) for i in xrange(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
add.s(i)這個就是partial,只有一個參數(shù)耐薯,所以在g(10)中傳遞10這個參數(shù)之后舔清,每個add.s(i,10),就make a complete signature了曲初。
Chains
任務是可以被鏈接的体谒,也就是一個任務完成后,就把結果返回給另外一個任務复斥,像這樣:
>>> from celery import chain
>>> from proj.tasks import add, mul
# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64
還有partial chain:
# (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
還有這樣的寫法也是可以的:
>>> (add.s(4, 4) | mul.s(8))().get()
64
Chords
一個chord就是group帶有一個回調:
>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
90
一個group通過chain另一個任務营密,就會自動變成一個chord:
>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
90
由于這些primitives都是subtask類型械媒,所以可以任意組合成你想要的樣子目锭,比如:
>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)
Signatures
signature()用一種能傳遞給一個函數(shù)的方式评汰,包含了一個任務調用的 arguments(參數(shù),即任務本身的參數(shù)痢虹,像add(x,y)中的參數(shù)), keyword arguments(關鍵字參數(shù)被去,就是debug=false,true這類參數(shù)), and execution options(執(zhí)行選項,比如運行時間countdown奖唯,到期時間expirt)惨缆。
signatures通常也被叫作"subtasks"》峤荩可以這樣用:
>>> from celery import signature
>>> signature('tasks.add', args=(2, 2), countdown=10)
tasks.add(2, 2)
或者你直接使用task的subtask方法:
>>> add.subtask((2, 2), countdown=10)
tasks.add(2, 2)
簡寫版就是這樣:
>>> add.s(2, 2)
tasks.add(2, 2)
Immutabel signatures
意思就是partial的任務可以在回調的時候坯墨,把參數(shù)值再傳進來,但有時候并不想得到某個函數(shù)的值病往,這個時候就可以把這個函數(shù)的immutable設成true:
>>> add.subtask((2, 2), immutable=True)
也可以簡寫:
>>> add.si(2, 2)
看例子:
>>> res = (add.si(2, 2) | add.si(4, 4) | add.s(8, 8))()
>>> res.get()
16
>>> res.parent.get()
8
>>> res.parent.parent.get()
4