前言
每個 L3 Agent 運行在一個 network namespace 中哈肖,以 qrouter-<router-UUID>命名躬拢。網(wǎng)絡(luò)節(jié)點如果不支持 Linux namespace 的躲履,只能運行一個 Virtual Router。通過配置項use_namespaces = True開啟namespace估灿。本文只做單純的分析代碼崇呵,研究了neutron的l3-agent代碼(m版本),代碼路徑為/neutron/agent/l3/agent.py馅袁。主要的類是:
- class L3PluginApi(object): l3-agent的rpc接口域慷,用于回復(fù)和查詢l3-plugin的。
- class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback,ha.AgentMixin,dvr.AgentMixin,manager.Manager): l3-agent實現(xiàn)類。
主要來說說L3NATAgent犹褒。該類在運行的時候抵窒,會啟動兩個周期任務(wù):
- periodic_sync_routers_task:周期性獲取router列表,并將需要操作的router加入到隊列中叠骑;
- _process_routers_loop:從上個任務(wù)中獲取隊列中的router李皇,并進行處理(增刪和更新)。
接下來依次分析這兩個任務(wù)宙枷。
periodic_sync_routers_task
注意該類繼承了FWaaSL3AgentRpcCallback和Manager
兩個類掉房。Manager類里有個方法periodic_tasks
,運行的是父類的方法run_periodic_tasks
:
class Manager(periodic_task.PeriodicTasks):
# Set RPC API version to 1.0 by default.
target = oslo_messaging.Target(version='1.0')
def periodic_tasks(self, context, raise_on_error=False):
self.run_periodic_tasks(context, raise_on_error=raise_on_error)
在neutron/service.py里的類Service慰丛,啟動start
的時候卓囚,會加載Manager的periodic_tasks方法,并設(shè)置周期時間:
class Service(n_rpc.Service):
def start(self):
self.manager.init_host()
super(Service, self).start()
if self.report_interval:
pulse = loopingcall.FixedIntervalLoopingCall(self.report_state)
pulse.start(interval=self.report_interval,initial_delay=self.report_interval)
self.timers.append(pulse)
if self.periodic_interval:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
periodic = loopingcall.FixedIntervalLoopingCall(self.periodic_tasks)
periodic.start(interval=self.periodic_interval,initial_delay=initial_delay)
self.timers.append(periodic)
self.manager.after_start()
def periodic_tasks(self, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
ctxt = context.get_admin_context()
self.manager.periodic_tasks(ctxt, raise_on_error=raise_on_error)
然后說回L3NATAgent類诅病,它定義了一個方法periodic_sync_routers_task
哪亿,被@periodic_task.periodic_task(spacing=1, run_immediately=True)進行了裝飾,可以理解為間隔時間為1秒運行一次任務(wù)贤笆。periodic_task是oslo_service/periodic_task.py里的裝飾器方法蝇棉,用于將被裝飾的方法聲明為一個周期性任務(wù);而該文件里還有一個方法也就是上文提到的run_periodic_tasks
芥永,用于運行所有的周期任務(wù):
# NOTE(kevinbenton): this is set to 1 second because the actual interval
# is controlled by a FixedIntervalLoopingCall in neutron/service.py that
# is responsible for task execution.
@periodic_task.periodic_task(spacing=1, run_immediately=True)
def periodic_sync_routers_task(self, context):
...
try:
with self.namespaces_manager as ns_manager:
self.fetch_and_sync_all_routers(context, ns_manager)
periodic_sync_routers_task執(zhí)行了一個任務(wù)fetch_and_sync_all_routers
篡殷,即是通過l3-plugin RPC獲取到router列表,然后將需要進行操作的router加入到 _queue 中恤左。理解了這個贴唇,就能明白接下來的一個線程循環(huán)任務(wù) _process_routers_loop。
_process_routers_loop
在l3-agent開始啟動過程after_start
中飞袋,還會啟動一個線程來循環(huán)執(zhí)行一個任務(wù)_process_routers_loop
:
def after_start(self):
# Note: the FWaaS' vArmourL3NATAgent is a subclass of L3NATAgent. It
# calls this method here. So Removing this after_start() would break
# vArmourL3NATAgent. We need to find out whether vArmourL3NATAgent
# can have L3NATAgentWithStateReport as its base class instead of
# L3NATAgent.
eventlet.spawn_n(self._process_routers_loop)
LOG.info(_LI("L3 agent started"))
def _process_routers_loop(self):
LOG.debug("Starting _process_routers_loop")
pool = eventlet.GreenPool(size=8)
while True:
pool.spawn_n(self._process_router_update)
再來看看方法_process_router_update
戳气,方法一開始就會通過_queue.each_update_to_next_router從上一個任務(wù)的隊列里獲取需要進行操作的router信息。然后會針對不通的action巧鸭,進行一些處理瓶您,真正進行router操作的是_process_router_if_compatible
:
def _process_router_update(self):
for rp, update in self._queue.each_update_to_next_router():
LOG.debug("Starting router update for %s, action %s, priority %s",
update.id, update.action, update.priority)
if update.action == queue.PD_UPDATE:
self.pd.process_prefix_update()
LOG.debug("Finished a router update for %s", update.id)
continue
router = update.router
......
try:
self._process_router_if_compatible(router)
......
LOG.debug("Finished a router update for %s", update.id)
rp.fetched_and_processed(update.timestamp)
_process_router_if_compatible方法的主要實現(xiàn),在最后:
def _process_router_if_compatible(self, router):
......
if router['id'] not in self.router_info:
self._process_added_router(router)
else:
self._process_updated_router(router)
以_process_updated_router
為例:
def _process_updated_router(self, router):
ri = self.router_info[router['id']]
ri.router = router
registry.notify(resources.ROUTER, events.BEFORE_UPDATE,self, router=ri)
ri.process(self)
registry.notify(resources.ROUTER, events.AFTER_UPDATE, self, router=ri)
針對router的所有操作都在neutron/agent/l3/router_info.py中的方法process
里纲仍。具體細節(jié)在這里就不做分析了呀袱,包括對external-port,internal-port和floating ip的一系列處理郑叠。
個人分析夜赵,歡迎指正,若轉(zhuǎn)載請注明出處
歡迎訪問我的主頁