Consumer啟動(dòng)
Consumer類的啟動(dòng)由Worker類中Blueprint實(shí)例調(diào)用start函數(shù)開始垫蛆,首先我們來看該函數(shù)的定義
class Consumer(object):
"""Consumer blueprint."""
def start(self):
blueprint = self.blueprint
while blueprint.state not in STOP_CONDITIONS:
maybe_shutdown()
if self.restart_count:
try:
self._restart_state.step()
except RestartFreqExceeded as exc:
crit('Frequent restarts detected: %r', exc, exc_info=1)
sleep(1)
self.restart_count += 1
try:
blueprint.start(self) # 調(diào)用blueprint的start函數(shù)啟動(dòng)各個(gè)組件
except self.connection_errors as exc:
# If we're not retrying connections, no need to catch
# connection errors
if not self.app.conf.broker_connection_retry:
raise
if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
raise # Too many open files
maybe_shutdown()
if blueprint.state not in STOP_CONDITIONS:
if self.connection:
self.on_connection_error_after_connected(exc)
else:
self.on_connection_error_before_connected(exc)
self.on_close()
blueprint.restart(self)
與Worker類似蜓洪,Consumer的啟動(dòng)過程中乳丰,也是通過blueprint調(diào)用各個(gè)啟動(dòng)步驟的start函數(shù)進(jìn)行啟動(dòng)的。
其他組件啟動(dòng)完畢后缀旁,啟動(dòng)event loop組件并開始事件循環(huán),至此Worker啟動(dòng)完成。
組件啟動(dòng)流程
Connection
class Connection(bootsteps.StartStopStep):
"""Service managing the consumer broker connection."""
def start(self, c):
c.connection = c.connect() # 調(diào)用Consumer的connect函數(shù)
info('Connected to %s', c.connection.as_uri())
class Consumer(object):
"""Consumer blueprint."""
def connect(self):
"""Establish the broker connection used for consuming tasks.
Retries establishing the connection if the
:setting:`broker_connection_retry` setting is enabled
"""
conn = self.connection_for_read(heartbeat=self.amqheartbeat) # 與隊(duì)列建立連接
if self.hub:
conn.transport.register_with_event_loop(conn.connection, self.hub) # 將連接加入事件循環(huán)
return conn
Connection啟動(dòng)時(shí)會(huì)調(diào)用Consumer的connect函數(shù)以連接隊(duì)列,最終會(huì)創(chuàng)建celery.app.amqp.Connection實(shí)例灭忠,而這里實(shí)際上是使用kombu庫的Connection與隊(duì)列連接。連接建立之后座硕,會(huì)將Connection注冊進(jìn)kombu庫的Transport的事件循環(huán)中
Events
class Events(bootsteps.StartStopStep):
"""Service used for sending monitoring events."""
def start(self, c):
# flush events sent while connection was down.
prev = self._close(c)
dis = c.event_dispatcher = c.app.events.Dispatcher(
c.connection_for_write(),
hostname=c.hostname,
enabled=self.send_events,
groups=self.groups,
# we currently only buffer events when the event loop is enabled
# XXX This excludes eventlet/gevent, which should actually buffer.
buffer_group=['task'] if c.hub else None,
on_send_buffered=c.on_send_event_buffered if c.hub else None,
) # 創(chuàng)建事件分發(fā)器
if prev:
dis.extend_buffer(prev)
dis.flush()
Events主要的功能是創(chuàng)建并初始化了事件分發(fā)器弛作,用于分發(fā)事件消息,這里創(chuàng)建的是celery.events.dispatcher.EventDispatcher實(shí)例
Mingle
class Mingle(bootsteps.StartStopStep):
"""Bootstep syncing state with neighbor workers.
At startup, or upon consumer restart, this will:
- Sync logical clocks.
- Sync revoked tasks.
"""
def start(self, c):
self.sync(c)
def sync(self, c):
info('mingle: searching for neighbors')
replies = self.send_hello(c)
if replies:
info('mingle: sync with %s nodes',
len([reply for reply, value in items(replies) if value]))
[self.on_node_reply(c, nodename, reply)
for nodename, reply in items(replies) if reply]
info('mingle: sync complete')
else:
info('mingle: all alone')
Mingle的作用是同步各個(gè)Worker的狀態(tài),celery的各個(gè)Worker使用broker進(jìn)行通信华匾,詳情可以瀏覽celery.app.control.Control類的定義映琳,以后會(huì)進(jìn)行分析。
Gossip
class Gossip(bootsteps.ConsumerStep):
"""Bootstep consuming events from other workers.
This keeps the logical clock value up to date.
"""
def start(self, c):
super(Gossip, self).start(c)
self.dispatcher = c.event_dispatcher
Gossip用于處理其他Worker的事件,用于與其他Worker進(jìn)行通信萨西。
Heart
class Heart(bootsteps.StartStopStep):
"""Bootstep sending event heartbeats.
This service sends a ``worker-heartbeat`` message every n seconds.
用于發(fā)送心跳信息
Note:
Not to be confused with AMQP protocol level heartbeats.
"""
def start(self, c):
c.heart = heartbeat.Heart(
c.timer, c.event_dispatcher, self.heartbeat_interval,
)
c.heart.start()
Heart的主要功能是用于發(fā)送信條信息黍瞧,在start函數(shù)中創(chuàng)建了celery.worker.heartbeat.Heart類的實(shí)例,并調(diào)用了該實(shí)例的start函數(shù)
Tasks
class Tasks(bootsteps.StartStopStep):
"""Bootstep starting the task message consumer."""
def start(self, c):
"""Start task consumer."""
c.update_strategies()
# - RabbitMQ 3.3 completely redefines how basic_qos works..
# This will detect if the new qos smenatics is in effect,
# and if so make sure the 'apply_global' flag is set on qos updates.
qos_global = not c.connection.qos_semantics_matches_spec
# set initial prefetch count
c.connection.default_channel.basic_qos(
0, c.initial_prefetch_count, qos_global,
)
c.task_consumer = c.app.amqp.TaskConsumer(
c.connection, on_decode_error=c.on_decode_error,
) # 創(chuàng)建Consumer
def set_prefetch_count(prefetch_count):
return c.task_consumer.qos(
prefetch_count=prefetch_count,
apply_global=qos_global,
)
c.qos = QoS(set_prefetch_count, c.initial_prefetch_count) # 創(chuàng)建QoS
Tasks類用于創(chuàng)建消息的Consumer以及QoS原杂,這里用到的Consumer以及QoS均為kombu庫所提供的類。
Control
class Pidbox(object):
"""Worker mailbox."""
def start(self, c):
self.node.channel = c.connection.channel() # 獲取信道
self.consumer = self.node.listen(callback=self.on_message) # 監(jiān)聽信道
self.consumer.on_decode_error = c.on_decode_error
Control類啟動(dòng)的是celery.worker.pidbox.Pidbox類的實(shí)例 這里可以看到Pidbox所做的工作即為創(chuàng)建信道之后對信道進(jìn)行監(jiān)聽您机,若收到消息后則回調(diào)相應(yīng)的函數(shù)進(jìn)行處理
Event loop
class Evloop(bootsteps.StartStopStep):
"""Event loop service.
Note:
This is always started last.
"""
label = 'event loop'
last = True
def start(self, c):
self.patch_all(c)
c.loop(*c.loop_args())
def patch_all(self, c):
c.qos._mutex = DummyLock()
這里可以看到Evloop的代碼極為簡單穿肄,主要部分是啟動(dòng)事件循環(huán),并且該組件需要在最后啟動(dòng)