celery源碼解析一: Events的實現(xiàn)

Celery作為一個分布式任務(wù)框架图仓,提供了events對外進行狀態(tài)和信息傳遞叫确,而程序運行過程的中的數(shù)據(jù)是較為關(guān)鍵的慷荔。

事件關(guān)注點

對于事件菊碟,有以下關(guān)注點

  1. 事件是如何產(chǎn)生的
  2. 事件是如何傳遞的
  3. 事件是如何捕獲的

對于項目結(jié)構(gòu)裕照,有以下關(guān)注點

  1. 對象之間的關(guān)系
  2. 實現(xiàn)的是否耦合攒发,是否可插拔

evnet sender

Celery內(nèi)置的事件的類型根據(jù)使用者的不同大致可分為Worker/Task的事件。從worker的事件看起晋南。

先看個自己寫的demo:

def event_sender():
    # use to send events
    with app.events.default_dispatcher() as d:
        d.send('task-result', msg='i am cute result {}'.format(datetime.datetime.now()), name='gin')
        d.flush()

dispatcher作為事件的分發(fā)器惠猿,并且由于dispatcher內(nèi)部實現(xiàn)了上下文管理器,所以這里直接就用with來進行初始化了(上下文管理器非常方便~實名推薦一波)负间。這里發(fā)送的事件task-result是我們自定義的一個類型偶妖,用于和內(nèi)置類型區(qū)分開,方便觀察結(jié)果政溃。在這里先實例化了一個dispatcher趾访,然后調(diào)用了dispatcher.send方法。這里我們的broker使用redis董虱,所以我們看一下redis的dispatcher的send方法扼鞋。

    def send(self, type, blind=False, utcoffset=utcoffset, retry=False,
             retry_policy=None, Event=Event, **fields):
   
        if self.enabled:
            groups, group = self.groups, group_from(type)
            if groups and group not in groups:
                return
            if group in self.buffer_group:
                clock = self.clock.forward()
                event = Event(type, hostname=self.hostname,
                              utcoffset=utcoffset(),
                              pid=self.pid, clock=clock, **fields)
                buf = self._group_buffer[group]
                buf.append(event)
                if len(buf) >= self.buffer_limit:
                    self.flush()
                elif self.on_send_buffered:
                    self.on_send_buffered()
            else:
                return self.publish(type, fields, self.producer, blind=blind,
                                    Event=Event, retry=retry,
                                    retry_policy=retry_policy)

前面根據(jù)type分組然后判斷flush or store in the buffer,然后用參數(shù)實例化events愤诱,這里做的比較人性化的一點是可傳入自定義的fields來初始化Events云头,因為作為事件來說,通常有自定義的數(shù)據(jù)的需求淫半,而這樣處理就比較優(yōu)雅了溃槐。然后發(fā)送者發(fā)送整個事件,也就意味著所有發(fā)送者希望發(fā)送的消息能夠被完整的傳遞科吭。with mutex昏滴,用了一個線程的互斥鎖,然后進行了publish对人, 然后是這里最為關(guān)鍵是publish影涉,看看publish的底層實現(xiàn)

from kombu import Producer
    def _publish(self, event, producer, routing_key, retry=False,
                 retry_policy=None, utcoffset=utcoffset):
        exchange = self.exchange
        try:
            res = producer.publish(
                event,
                routing_key=routing_key,
                exchange=exchange.name,
                retry=retry,
                retry_policy=retry_policy,
                declare=[exchange],
                serializer=self.serializer,
                headers=self.headers,
                delivery_mode=self.delivery_mode,
            )
        except Exception as exc:  # pylint: disable=broad-except
            if not self.buffer_while_offline:
                raise
            self._outbound_buffer.append((event, routing_key, exc))

   def enable(self):
        self.producer = Producer(self.channel or self.connection,
                                 exchange=self.exchange,
                                 serializer=self.serializer,
                                 auto_declare=False)
        self.enabled = True
        for callback in self.on_enabled:
            callback()

主要邏輯是調(diào)用了self.producer.publish,而self.producer是在dispatcher的enable里創(chuàng)建的规伐,可以看到enable用了kombu的producer蟹倾,kombu是celery’內(nèi)部對于amqp的封裝,用于實現(xiàn)消息傳遞猖闪,其支持了各種的broker鲜棠,比如說redis,rabbitmq培慌。鑒于咱們使用的是redis的broker豁陆,所以也很明顯這里使用redis為broker的pub/sub模式。所以就很清楚啦吵护,celery events的實現(xiàn)是基于redis的pub/sub模式盒音,這也解釋了為什么在筆者測試的時候表鳍,沒有消息的存儲,以及events在開啟監(jiān)聽以后才能夠收到祥诽。

event receiver

最后貼一個自己寫的事件監(jiān)聽譬圣。

def monitor_events():
    def on_event(event):
        # this is the callback when events come in
        print('[recv] {} '.format(event))

    with app.connection() as conn:
        recv = app.events.Receiver(conn, handlers={'task-result':
        on_event})
        recv.capture(limit=None, timeout=None, wakeup=True)

events使用場景

基于celery內(nèi)置的事件,可以對于task的執(zhí)行狀態(tài)信息雄坪,和執(zhí)行結(jié)果信息進行實時處理厘熟,例如可以將所有的事件執(zhí)行結(jié)果進行暫存然后結(jié)合時間序列數(shù)據(jù)庫類似influxdb和展示平臺類似grafana進行展示,這樣就有了比較完善的一套結(jié)果數(shù)據(jù)的流程维哈。

也可以自定義事件绳姨,對一些關(guān)注的信息進行實時處理。例如在task執(zhí)行過程中的信息收集阔挠,可以通過events來完成飘庄。

總結(jié)

所以對于上面我們提出的問題,有以下總結(jié)

  1. 事件是如何產(chǎn)生的:事件由事件的發(fā)送者的初始化购撼,并且發(fā)送竭宰,在celery里事件的發(fā)送往往伴隨著狀態(tài)的改變,例如對于 task的事件中包括 發(fā)送份招,被接收切揭,被執(zhí)行,執(zhí)行成功锁摔,執(zhí)行失敗廓旬,重試。
  2. 事件是如何傳遞的:如果采用redis作為broker谐腰,那么事件是基于redis的pub/sub模式而傳遞的孕豹。
  3. 事件是如何捕獲的:同上,基于訂閱模式十气,事件得以被接收励背。

對于對象的關(guān)系呢
在這一部分我們接觸到的對象有
app,dispatcher砸西,events叶眉,publisher,receiver

其中app作為拉起整個celery項目的核心對象芹枷,events對象被掛載到app上衅疙,而dispatcher和receiver則被掛載到了events上。dispatcher和receiver借由kombu的底層實現(xiàn)無關(guān)性鸳慈,直接傳入不同的connection uri實例化kombu的message queue饱溢。整個結(jié)構(gòu)還是比較清晰的,并且面向接口編程也少了很多的耦合走芋〖ɡ桑總的來說潘鲫,還是非常值得借鑒的一種寫法。

參考

  1. Monitoring and Management Guide — Celery 3.1.7 文檔
  2. Kombu Documentation — Kombu 4.6.0 documentation
  3. 立強的博客
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末肋杖,一起剝皮案震驚了整個濱河市溉仑,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌兽愤,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,194評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件挪圾,死亡現(xiàn)場離奇詭異浅萧,居然都是意外死亡,警方通過查閱死者的電腦和手機哲思,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評論 2 385
  • 文/潘曉璐 我一進店門洼畅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人棚赔,你說我怎么就攤上這事帝簇。” “怎么了靠益?”我有些...
    開封第一講書人閱讀 156,780評論 0 346
  • 文/不壞的土叔 我叫張陵丧肴,是天一觀的道長。 經(jīng)常有香客問我胧后,道長芋浮,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,388評論 1 283
  • 正文 為了忘掉前任壳快,我火速辦了婚禮纸巷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘眶痰。我一直安慰自己瘤旨,他們只是感情好,可當我...
    茶點故事閱讀 65,430評論 5 384
  • 文/花漫 我一把揭開白布竖伯。 她就那樣靜靜地躺著存哲,像睡著了一般。 火紅的嫁衣襯著肌膚如雪七婴。 梳的紋絲不亂的頭發(fā)上宏胯,一...
    開封第一講書人閱讀 49,764評論 1 290
  • 那天,我揣著相機與錄音本姥,去河邊找鬼肩袍。 笑死,一個胖子當著我的面吹牛婚惫,可吹牛的內(nèi)容都是我干的氛赐。 我是一名探鬼主播魂爪,決...
    沈念sama閱讀 38,907評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼艰管!你這毒婦竟也來了滓侍?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,679評論 0 266
  • 序言:老撾萬榮一對情侶失蹤牲芋,失蹤者是張志新(化名)和其女友劉穎撩笆,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體缸浦,經(jīng)...
    沈念sama閱讀 44,122評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡夕冲,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,459評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了裂逐。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片歹鱼。...
    茶點故事閱讀 38,605評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖卜高,靈堂內(nèi)的尸體忽然破棺而出弥姻,到底是詐尸還是另有隱情,我是刑警寧澤掺涛,帶...
    沈念sama閱讀 34,270評論 4 329
  • 正文 年R本政府宣布庭敦,位于F島的核電站,受9級特大地震影響薪缆,放射性物質(zhì)發(fā)生泄漏螺捐。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,867評論 3 312
  • 文/蒙蒙 一矮燎、第九天 我趴在偏房一處隱蔽的房頂上張望定血。 院中可真熱鬧,春花似錦诞外、人聲如沸澜沟。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,734評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽茫虽。三九已至,卻和暖如春既们,著一層夾襖步出監(jiān)牢的瞬間濒析,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,961評論 1 265
  • 我被黑心中介騙來泰國打工啥纸, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留号杏,地道東北人。 一個月前我還...
    沈念sama閱讀 46,297評論 2 360
  • 正文 我出身青樓斯棒,卻偏偏與公主長得像盾致,于是被迫代替她去往敵國和親主经。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,472評論 2 348

推薦閱讀更多精彩內(nèi)容