Celery源碼筆記(五)Consumer啟動(dòng)

Consumer啟動(dòng)
Consumer類的啟動(dòng)由Worker類中Blueprint實(shí)例調(diào)用start函數(shù)開始垫蛆,首先我們來看該函數(shù)的定義

class Consumer(object):
    """Consumer blueprint."""

    def start(self):
        blueprint = self.blueprint
        while blueprint.state not in STOP_CONDITIONS:
            maybe_shutdown()
            if self.restart_count:
                try:
                    self._restart_state.step()
                except RestartFreqExceeded as exc:
                    crit('Frequent restarts detected: %r', exc, exc_info=1)
                    sleep(1)
            self.restart_count += 1
            try:
                blueprint.start(self) # 調(diào)用blueprint的start函數(shù)啟動(dòng)各個(gè)組件
            except self.connection_errors as exc:
                # If we're not retrying connections, no need to catch
                # connection errors
                if not self.app.conf.broker_connection_retry:
                    raise
                if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
                    raise  # Too many open files
                maybe_shutdown()
                if blueprint.state not in STOP_CONDITIONS:
                    if self.connection:
                        self.on_connection_error_after_connected(exc)
                    else:
                        self.on_connection_error_before_connected(exc)
                    self.on_close()
                    blueprint.restart(self)

與Worker類似蜓洪,Consumer的啟動(dòng)過程中乳丰,也是通過blueprint調(diào)用各個(gè)啟動(dòng)步驟的start函數(shù)進(jìn)行啟動(dòng)的。

其他組件啟動(dòng)完畢后缀旁,啟動(dòng)event loop組件并開始事件循環(huán),至此Worker啟動(dòng)完成。

組件啟動(dòng)流程

Connection

class Connection(bootsteps.StartStopStep):
    """Service managing the consumer broker connection."""

    def start(self, c):
        c.connection = c.connect() # 調(diào)用Consumer的connect函數(shù)
        info('Connected to %s', c.connection.as_uri())
class Consumer(object):
    """Consumer blueprint."""
    def connect(self):
        """Establish the broker connection used for consuming tasks.

        Retries establishing the connection if the
        :setting:`broker_connection_retry` setting is enabled
        """
        conn = self.connection_for_read(heartbeat=self.amqheartbeat) # 與隊(duì)列建立連接
        if self.hub:
            conn.transport.register_with_event_loop(conn.connection, self.hub) # 將連接加入事件循環(huán)
        return conn

Connection啟動(dòng)時(shí)會(huì)調(diào)用Consumer的connect函數(shù)以連接隊(duì)列,最終會(huì)創(chuàng)建celery.app.amqp.Connection實(shí)例灭忠,而這里實(shí)際上是使用kombu庫的Connection與隊(duì)列連接。連接建立之后座硕,會(huì)將Connection注冊進(jìn)kombu庫的Transport的事件循環(huán)中

Events


class Events(bootsteps.StartStopStep):
    """Service used for sending monitoring events."""

    def start(self, c):
        # flush events sent while connection was down.
        prev = self._close(c)
        dis = c.event_dispatcher = c.app.events.Dispatcher(
            c.connection_for_write(),
            hostname=c.hostname,
            enabled=self.send_events,
            groups=self.groups,
            # we currently only buffer events when the event loop is enabled
            # XXX This excludes eventlet/gevent, which should actually buffer.
            buffer_group=['task'] if c.hub else None,
            on_send_buffered=c.on_send_event_buffered if c.hub else None,
        ) # 創(chuàng)建事件分發(fā)器
        if prev:
            dis.extend_buffer(prev)
            dis.flush()

Events主要的功能是創(chuàng)建并初始化了事件分發(fā)器弛作,用于分發(fā)事件消息,這里創(chuàng)建的是celery.events.dispatcher.EventDispatcher實(shí)例

Mingle

class Mingle(bootsteps.StartStopStep):
    """Bootstep syncing state with neighbor workers.

    At startup, or upon consumer restart, this will:

    - Sync logical clocks.
    - Sync revoked tasks.

    """

    def start(self, c):
        self.sync(c)

    def sync(self, c):
        info('mingle: searching for neighbors')
        replies = self.send_hello(c)
        if replies:
            info('mingle: sync with %s nodes',
                 len([reply for reply, value in items(replies) if value]))
            [self.on_node_reply(c, nodename, reply)
             for nodename, reply in items(replies) if reply]
            info('mingle: sync complete')
        else:
            info('mingle: all alone')

Mingle的作用是同步各個(gè)Worker的狀態(tài),celery的各個(gè)Worker使用broker進(jìn)行通信华匾,詳情可以瀏覽celery.app.control.Control類的定義映琳,以后會(huì)進(jìn)行分析。

Gossip

class Gossip(bootsteps.ConsumerStep):
    """Bootstep consuming events from other workers.

    This keeps the logical clock value up to date.
    """

    def start(self, c):
        super(Gossip, self).start(c)
        self.dispatcher = c.event_dispatcher

Gossip用于處理其他Worker的事件,用于與其他Worker進(jìn)行通信萨西。

Heart

class Heart(bootsteps.StartStopStep):
    """Bootstep sending event heartbeats.

    This service sends a ``worker-heartbeat`` message every n seconds.
    用于發(fā)送心跳信息
    Note:
        Not to be confused with AMQP protocol level heartbeats.
    """

    def start(self, c):
        c.heart = heartbeat.Heart(
            c.timer, c.event_dispatcher, self.heartbeat_interval,
        )
        c.heart.start()

Heart的主要功能是用于發(fā)送信條信息黍瞧,在start函數(shù)中創(chuàng)建了celery.worker.heartbeat.Heart類的實(shí)例,并調(diào)用了該實(shí)例的start函數(shù)

Tasks

class Tasks(bootsteps.StartStopStep):
    """Bootstep starting the task message consumer."""

    def start(self, c):
        """Start task consumer."""
        c.update_strategies()

        # - RabbitMQ 3.3 completely redefines how basic_qos works..
        # This will detect if the new qos smenatics is in effect,
        # and if so make sure the 'apply_global' flag is set on qos updates.
        qos_global = not c.connection.qos_semantics_matches_spec

        # set initial prefetch count
        c.connection.default_channel.basic_qos(
            0, c.initial_prefetch_count, qos_global,
        )

        c.task_consumer = c.app.amqp.TaskConsumer(
            c.connection, on_decode_error=c.on_decode_error,
        ) # 創(chuàng)建Consumer

        def set_prefetch_count(prefetch_count):
            return c.task_consumer.qos(
                prefetch_count=prefetch_count,
                apply_global=qos_global,
            )
        c.qos = QoS(set_prefetch_count, c.initial_prefetch_count) # 創(chuàng)建QoS

Tasks類用于創(chuàng)建消息的Consumer以及QoS原杂,這里用到的Consumer以及QoS均為kombu庫所提供的類。

Control

class Pidbox(object):
    """Worker mailbox."""

    def start(self, c):
        self.node.channel = c.connection.channel() # 獲取信道
        self.consumer = self.node.listen(callback=self.on_message) # 監(jiān)聽信道
        self.consumer.on_decode_error = c.on_decode_error

Control類啟動(dòng)的是celery.worker.pidbox.Pidbox類的實(shí)例 這里可以看到Pidbox所做的工作即為創(chuàng)建信道之后對信道進(jìn)行監(jiān)聽您机,若收到消息后則回調(diào)相應(yīng)的函數(shù)進(jìn)行處理

Event loop

class Evloop(bootsteps.StartStopStep):
    """Event loop service.

    Note:
        This is always started last.
    """

    label = 'event loop'
    last = True

    def start(self, c):
        self.patch_all(c)
        c.loop(*c.loop_args())

    def patch_all(self, c):
        c.qos._mutex = DummyLock()

這里可以看到Evloop的代碼極為簡單穿肄,主要部分是啟動(dòng)事件循環(huán),并且該組件需要在最后啟動(dòng)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末际看,一起剝皮案震驚了整個(gè)濱河市咸产,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌仲闽,老刑警劉巖脑溢,帶你破解...
    沈念sama閱讀 212,080評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異赖欣,居然都是意外死亡屑彻,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,422評論 3 385
  • 文/潘曉璐 我一進(jìn)店門顶吮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來社牲,“玉大人,你說我怎么就攤上這事悴了〔簦” “怎么了?”我有些...
    開封第一講書人閱讀 157,630評論 0 348
  • 文/不壞的土叔 我叫張陵湃交,是天一觀的道長熟空。 經(jīng)常有香客問我,道長搞莺,這世上最難降的妖魔是什么息罗? 我笑而不...
    開封第一講書人閱讀 56,554評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮才沧,結(jié)果婚禮上阱当,老公的妹妹穿的比我還像新娘。我一直安慰自己糜工,他們只是感情好弊添,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,662評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著捌木,像睡著了一般油坝。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,856評論 1 290
  • 那天澈圈,我揣著相機(jī)與錄音彬檀,去河邊找鬼。 笑死瞬女,一個(gè)胖子當(dāng)著我的面吹牛窍帝,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播诽偷,決...
    沈念sama閱讀 39,014評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼坤学,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了报慕?” 一聲冷哼從身側(cè)響起深浮,我...
    開封第一講書人閱讀 37,752評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎眠冈,沒想到半個(gè)月后飞苇,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,212評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蜗顽,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,541評論 2 327
  • 正文 我和宋清朗相戀三年布卡,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片雇盖。...
    茶點(diǎn)故事閱讀 38,687評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡羽利,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出刊懈,到底是詐尸還是另有隱情这弧,我是刑警寧澤,帶...
    沈念sama閱讀 34,347評論 4 331
  • 正文 年R本政府宣布虚汛,位于F島的核電站匾浪,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏卷哩。R本人自食惡果不足惜蛋辈,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,973評論 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望将谊。 院中可真熱鬧冷溶,春花似錦、人聲如沸尊浓。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,777評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽栋齿。三九已至苗胀,卻和暖如春襟诸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背基协。 一陣腳步聲響...
    開封第一講書人閱讀 32,006評論 1 266
  • 我被黑心中介騙來泰國打工歌亲, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人澜驮。 一個(gè)月前我還...
    沈念sama閱讀 46,406評論 2 360
  • 正文 我出身青樓陷揪,卻偏偏與公主長得像,于是被迫代替她去往敵國和親杂穷。 傳聞我的和親對象是個(gè)殘疾皇子悍缠,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,576評論 2 349