Celery作為一個分布式任務(wù)框架图仓,提供了events對外進行狀態(tài)和信息傳遞叫确,而程序運行過程的中的數(shù)據(jù)是較為關(guān)鍵的慷荔。
事件關(guān)注點
對于事件菊碟,有以下關(guān)注點
- 事件是如何產(chǎn)生的
- 事件是如何傳遞的
- 事件是如何捕獲的
對于項目結(jié)構(gòu)裕照,有以下關(guān)注點
- 對象之間的關(guān)系
- 實現(xiàn)的是否耦合攒发,是否可插拔
evnet sender
Celery內(nèi)置的事件的類型根據(jù)使用者的不同大致可分為Worker/Task的事件。從worker的事件看起晋南。
先看個自己寫的demo:
def event_sender():
# use to send events
with app.events.default_dispatcher() as d:
d.send('task-result', msg='i am cute result {}'.format(datetime.datetime.now()), name='gin')
d.flush()
dispatcher作為事件的分發(fā)器惠猿,并且由于dispatcher內(nèi)部實現(xiàn)了上下文管理器,所以這里直接就用with來進行初始化了(上下文管理器非常方便~實名推薦一波)负间。這里發(fā)送的事件task-result是我們自定義的一個類型偶妖,用于和內(nèi)置類型區(qū)分開,方便觀察結(jié)果政溃。在這里先實例化了一個dispatcher趾访,然后調(diào)用了dispatcher.send方法。這里我們的broker使用redis董虱,所以我們看一下redis的dispatcher的send方法扼鞋。
def send(self, type, blind=False, utcoffset=utcoffset, retry=False,
retry_policy=None, Event=Event, **fields):
if self.enabled:
groups, group = self.groups, group_from(type)
if groups and group not in groups:
return
if group in self.buffer_group:
clock = self.clock.forward()
event = Event(type, hostname=self.hostname,
utcoffset=utcoffset(),
pid=self.pid, clock=clock, **fields)
buf = self._group_buffer[group]
buf.append(event)
if len(buf) >= self.buffer_limit:
self.flush()
elif self.on_send_buffered:
self.on_send_buffered()
else:
return self.publish(type, fields, self.producer, blind=blind,
Event=Event, retry=retry,
retry_policy=retry_policy)
前面根據(jù)type分組然后判斷flush or store in the buffer,然后用參數(shù)實例化events愤诱,這里做的比較人性化的一點是可傳入自定義的fields來初始化Events云头,因為作為事件來說,通常有自定義的數(shù)據(jù)的需求淫半,而這樣處理就比較優(yōu)雅了溃槐。然后發(fā)送者發(fā)送整個事件,也就意味著所有發(fā)送者希望發(fā)送的消息能夠被完整的傳遞科吭。with mutex昏滴,用了一個線程的互斥鎖,然后進行了publish对人, 然后是這里最為關(guān)鍵是publish影涉,看看publish的底層實現(xiàn)
from kombu import Producer
def _publish(self, event, producer, routing_key, retry=False,
retry_policy=None, utcoffset=utcoffset):
exchange = self.exchange
try:
res = producer.publish(
event,
routing_key=routing_key,
exchange=exchange.name,
retry=retry,
retry_policy=retry_policy,
declare=[exchange],
serializer=self.serializer,
headers=self.headers,
delivery_mode=self.delivery_mode,
)
except Exception as exc: # pylint: disable=broad-except
if not self.buffer_while_offline:
raise
self._outbound_buffer.append((event, routing_key, exc))
def enable(self):
self.producer = Producer(self.channel or self.connection,
exchange=self.exchange,
serializer=self.serializer,
auto_declare=False)
self.enabled = True
for callback in self.on_enabled:
callback()
主要邏輯是調(diào)用了self.producer.publish,而self.producer是在dispatcher的enable里創(chuàng)建的规伐,可以看到enable用了kombu的producer蟹倾,kombu是celery’內(nèi)部對于amqp的封裝,用于實現(xiàn)消息傳遞猖闪,其支持了各種的broker鲜棠,比如說redis,rabbitmq培慌。鑒于咱們使用的是redis的broker豁陆,所以也很明顯這里使用redis為broker的pub/sub模式。所以就很清楚啦吵护,celery events的實現(xiàn)是基于redis的pub/sub模式盒音,這也解釋了為什么在筆者測試的時候表鳍,沒有消息的存儲,以及events在開啟監(jiān)聽以后才能夠收到祥诽。
event receiver
最后貼一個自己寫的事件監(jiān)聽譬圣。
def monitor_events():
def on_event(event):
# this is the callback when events come in
print('[recv] {} '.format(event))
with app.connection() as conn:
recv = app.events.Receiver(conn, handlers={'task-result':
on_event})
recv.capture(limit=None, timeout=None, wakeup=True)
events使用場景
基于celery內(nèi)置的事件,可以對于task的執(zhí)行狀態(tài)信息雄坪,和執(zhí)行結(jié)果信息進行實時處理厘熟,例如可以將所有的事件執(zhí)行結(jié)果進行暫存然后結(jié)合時間序列數(shù)據(jù)庫類似influxdb和展示平臺類似grafana進行展示,這樣就有了比較完善的一套結(jié)果數(shù)據(jù)的流程维哈。
也可以自定義事件绳姨,對一些關(guān)注的信息進行實時處理。例如在task執(zhí)行過程中的信息收集阔挠,可以通過events來完成飘庄。
總結(jié)
所以對于上面我們提出的問題,有以下總結(jié)
- 事件是如何產(chǎn)生的:事件由事件的發(fā)送者的初始化购撼,并且發(fā)送竭宰,在celery里事件的發(fā)送往往伴隨著狀態(tài)的改變,例如對于 task的事件中包括 發(fā)送份招,被接收切揭,被執(zhí)行,執(zhí)行成功锁摔,執(zhí)行失敗廓旬,重試。
- 事件是如何傳遞的:如果采用redis作為broker谐腰,那么事件是基于redis的pub/sub模式而傳遞的孕豹。
- 事件是如何捕獲的:同上,基于訂閱模式十气,事件得以被接收励背。
對于對象的關(guān)系呢
在這一部分我們接觸到的對象有
app,dispatcher砸西,events叶眉,publisher,receiver
其中app作為拉起整個celery項目的核心對象芹枷,events對象被掛載到app上衅疙,而dispatcher和receiver則被掛載到了events上。dispatcher和receiver借由kombu的底層實現(xiàn)無關(guān)性鸳慈,直接傳入不同的connection uri實例化kombu的message queue饱溢。整個結(jié)構(gòu)還是比較清晰的,并且面向接口編程也少了很多的耦合走芋〖ɡ桑總的來說潘鲫,還是非常值得借鑒的一種寫法。
參考