Openstack服務(wù)的啟動(dòng)機(jī)制

從今天開始將陸陸續(xù)續(xù)發(fā)表一些openstack相關(guān)的文章措嵌。

openstack服務(wù)的啟動(dòng)

基本所有的openstack服務(wù)都依賴 evenlet 完成各種并發(fā)任務(wù),它的進(jìn)程可分為兩類:
1蔬捷、?WSGIService: 接收和處理 http 請(qǐng)求,依賴eventlet.wsgi?的?wsgi server?處理 http 請(qǐng)求榔袋,比如nova-api
2周拐、?Service: 接收和處理 rpc 請(qǐng)求,如?nova-operation
無論是?WSGIService?還是?Service?類型的進(jìn)程凰兑,每當(dāng)接收到一個(gè)請(qǐng)求(http 或 rpc)妥粟,都會(huì)在線程池中分配一個(gè)協(xié)程處理該請(qǐng)求

一、WSGIService的啟動(dòng)

下面以nova服務(wù)為例吏够。
nova-api 由?nova/cmd/api.py?啟動(dòng)勾给,它初始化一個(gè) WSGIService(由?service.py?定義) 對(duì)象滩报。

def main():
objects.register_all()
CONF(sys.argv[1:], project='nova',
version=version.version_string())
logging.setup(CONF, "nova")

rpc.init(CONF)
launcher = service.get_launcher()
server = service.WSGIService('osapi_nova')
launcher.launch_service(server, workers=server.workers)
launcher.wait()

api中從service層獲取一個(gè)啟動(dòng)器對(duì)象,最后將server對(duì)象傳入啟動(dòng)器對(duì)象的launch_service方法中锦秒,launch_service(server, workers=server.workers)方法定義如下:

class Launcher(object):
def __init__(self):
super(Launcher, self).__init__()
self.launch_service = serve
self.wait = wait

該方法被引用到serve方法露泊,serve方法定義如下:

def serve(server, workers=None):
global _launcher
if _launcher:
raise RuntimeError(_('serve() can only be called once'))

_launcher = service.launch(CONF, server, workers=workers)

最終調(diào)用了oslo_service/service.py下的launch方法,launch方法定義如下:

def launch(conf, service, workers=1, restart_method='reload'):


if workers is not None and workers <= 0:
raise ValueError(_("Number of workers should be positive!"))

if workers is None or workers == 1:
launcher = ServiceLauncher(conf, restart_method=restart_method)
else:
launcher = ProcessLauncher(conf, restart_method=restart_method)
launcher.launch_service(service, workers=workers)

可以看到這里使用到了兩種啟動(dòng)器旅择,在進(jìn)一步講解啟動(dòng)的過程中先介紹下openstack中的啟動(dòng)器

二惭笑、Openstack中的Launcher

Openstack中有一個(gè)叫Launcher的概念,即專門用來啟動(dòng)服務(wù)的生真,這個(gè)類被放在了oslo_service這個(gè)包里面沉噩,Launcher分為兩種:
一種是ServiceLauncher
另一種為ProcessLauncher柱蟀。
ServiceLauncher用來啟動(dòng)單進(jìn)程的服務(wù)川蒙;而ProcessLauncher用來啟動(dòng)有多個(gè)worker子進(jìn)程的服務(wù),如各類api服務(wù)(nova-api长已、cinder-api)等

oslo_service/service.py

1畜眨、ServiceLauncher

ServiceLauncher繼承自Launcher,啟動(dòng)服務(wù)的一個(gè)重要成員就是launcher_service术瓮,ServiceLauncher的該成員就是繼承于Launcher

def launch_service(self, service, workers=1):

if workers is not None and workers != 1:
raise ValueError(_("Launcher asked to start multiple workers"))
_check_service_base(service)
service.backdoor_port = self.backdoor_port
self.services.add(service)

aucher_service就是將服務(wù)添加到self.services成員里面康聂,services成員的類型是class Services,看看它的add方法

class Services(object):

def __init__(self):
self.services = []
self.tg = threadgroup.ThreadGroup()
self.done = event.Event()

def add(self, service):
"""Add a service to a list and create a thread to run it.

:param service: service to run
"""
self.services.append(service)
self.tg.add_thread(self.run_service, service, self.done)

Services這個(gè)類的初始化很簡(jiǎn)單胞四,即創(chuàng)建一個(gè)ThreadGroup恬汁,ThreadGroup其實(shí)是eventlet的GreenPool,Openstack利用eventlet實(shí)現(xiàn)并發(fā)辜伟,add方法氓侧,將self.run_service這個(gè)方法放入pool中,而service就是它的參數(shù)导狡。run_service方法很簡(jiǎn)單约巷,就是調(diào)用service的start方法,這樣就完成了服務(wù)的啟動(dòng)

2旱捧、ProcessLauncher

ProcessLauncher直接繼承于Object载庭,同樣也有l(wèi)aunch_service方法

def launch_service(self, service, workers=1):

_check_service_base(service)
wrap = ServiceWrapper(service, workers)

LOG.info('Starting %d workers', wrap.workers)
while self.running and len(wrap.children) < wrap.workers:
self._start_child(wrap)

lauch_service除了接受service以外,還需要接受一個(gè)workers參數(shù)廊佩,即子進(jìn)程的個(gè)數(shù),然后調(diào)用_start_child啟動(dòng)多個(gè)子進(jìn)程

def _start_child(self, wrap):
if len(wrap.forktimes) > wrap.workers:
# Limit ourselves to one process a second (over the period of
# number of workers * 1 second). This will allow workers to
# start up quickly but ensure we don't fork off children that
# die instantly too quickly.
if time.time() - wrap.forktimes[0] < wrap.workers:
LOG.info('Forking too fast, sleeping')
time.sleep(1)

wrap.forktimes.pop(0)

wrap.forktimes.append(time.time())

pid = os.fork()
if pid == 0:
self.launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(
self.launcher)
if not _is_sighup_and_daemon(signo):
self.launcher.wait()
break
self.launcher.restart()

os._exit(status)

LOG.debug('Started child %d', pid)

wrap.children.add(pid)
self.children[pid] = wrap

看見熟悉的fork沒有靖榕,只是簡(jiǎn)單的調(diào)用了一個(gè)os.fork()标锄,然后子進(jìn)程開始運(yùn)行,子進(jìn)程調(diào)用_child_process

def _child_process(self, service):
self._child_process_handle_signal()

# Reopen the eventlet hub to make sure we don't share an epoll
# fd with parent and/or siblings, which would be bad
eventlet.hubs.use_hub()

# Close write to ensure only parent has it open
os.close(self.writepipe)
# Create greenthread to watch for parent to close pipe
eventlet.spawn_n(self._pipe_watcher)

# Reseed random number generator
random.seed()

launcher = Launcher(self.conf, restart_method=self.restart_method)
launcher.launch_service(service)
return launcher

_child_process其實(shí)很簡(jiǎn)單茁计,創(chuàng)建一個(gè)Launcher料皇,調(diào)用Laucher.launch_service方法谓松,前面介紹過,其實(shí)ServiceLauncher繼承自Launcher践剂,也是調(diào)用的launcher_service方法鬼譬,將服務(wù)啟動(dòng),因此接下來的步驟可以參考前面逊脯,最終都將調(diào)用service.start方法啟動(dòng)服務(wù)

三优质、WSGIService的啟動(dòng)—續(xù)

回到前面的啟動(dòng)部分,從launcher節(jié)的說明军洼,我們知道服務(wù)的啟動(dòng)最終調(diào)用了service的start方法巩螃,而這里的service就是我們最開始在api.py中創(chuàng)建的service,然后一層層傳進(jìn)后面的啟動(dòng)器中的匕争,我們繼續(xù)回到WSGIService類中的start(self)方法

def start(self):

if self.manager:
self.manager.init_host()
self.server.start()
self.port = self.server.port

這里調(diào)用了oslo_service/wsgi.py中的start(self)方法

def start(self):

self.dup_socket = self.socket.dup()

if self._use_ssl:
self.dup_socket = sslutils.wrap(self.conf, self.dup_socket)

wsgi_kwargs = {
'func': eventlet.wsgi.server,
'sock': self.dup_socket,
'site': self.app,
'protocol': self._protocol,
'custom_pool': self._pool,
'log': self._logger,
'log_format': self.conf.wsgi_log_format,
'debug': False,
'keepalive': self.conf.wsgi_keep_alive,
'socket_timeout': self.client_socket_timeout
}

if self._max_url_len:
wsgi_kwargs['url_length_limit'] = self._max_url_len

self._server = eventlet.spawn(**wsgi_kwargs)

注意 wsgi_kwargs 中的參數(shù) func避乏,它的值為 eventlet.wsgi.server,在?eventlet/wsgi.py?的定義如下:

def server(sock, site,

try:
serv.log.info("(%s) wsgi starting up on %s" % (
serv.pid, socket_repr(sock)))
while is_accepting:
try:
client_socket = sock.accept()
client_socket[0].settimeout(serv.socket_timeout)
serv.log.debug("(%s) accepted %r" % (
serv.pid, client_socket[1]))
try:
pool.spawn_n(serv.process_request, client_socket)
except AttributeError:
warnings.warn("wsgi's pool should be an instance of "
"eventlet.greenpool.GreenPool, is %s. Please convert your"
" call site to use GreenPool instead" % type(pool),
DeprecationWarning, stacklevel=2)
pool.execute_async(serv.process_request, client_socket)
except ACCEPT_EXCEPTIONS as e:
if support.get_errno(e) not in ACCEPT_ERRNO:
raise
except (KeyboardInterrupt, SystemExit):
serv.log.info("wsgi exiting")
break
finally:
pool.waitall() …

看甘桑,是不是看到熟悉的一幕了拍皮!sock.accept() 監(jiān)聽請(qǐng)求,每當(dāng)接收到一個(gè)新請(qǐng)求跑杭,調(diào)用 pool.spawn_n() 啟動(dòng)一個(gè)協(xié)程處理該請(qǐng)求

四铆帽、Service的啟動(dòng)

Service 類型的進(jìn)程同樣由 nova/cmd/* 目錄下某些文件創(chuàng)建:

  • nova-schedule: nova/cmd/schedule.py

  • ……
    作為消息中間件的消費(fèi)者,它們監(jiān)聽各自的 queue艘蹋,每當(dāng)有 rpc 請(qǐng)求來臨時(shí)锄贼,它們創(chuàng)建一個(gè)新的協(xié)程處理 rpc 請(qǐng)求。以nova-schedule為例女阀,啟動(dòng)時(shí)初始化一個(gè) Server(由?service.py?定義) 對(duì)象宅荤。整個(gè)Launcher過程跟WSGIServer一樣,只是service的start()有些區(qū)別而已

def start(self):

target = messaging.Target(topic=self.topic, server=self.host)
endpoints = [self.manager]
endpoints.extend(self.manager.additional_endpoints)
serializer = objects_base.KarborObjectSerializer()
self.rpcserver = rpc.get_server(target, endpoints, serializer)
self.rpcserver.start()

經(jīng)過層層調(diào)用浸策,最終生成了這樣一個(gè)RPCServer對(duì)象

class RPCServer(msg_server.MessageHandlingServer):
def __init__(self, transport, target, dispatcher, executor='blocking'):
super(RPCServer, self).__init__(transport, dispatcher, executor)
self._target = target

該類繼承自MessageHandlingServer冯键;
注:nova 的各個(gè)組件都依賴 oslo.messaging 訪問消息服務(wù)器,通過?oslo/messaging/server.py?初始化一個(gè) MessageHandlingServer 的對(duì)象庸汗,監(jiān)聽消息隊(duì)列惫确。最終調(diào)用了該service的start方法

def start(self, override_pool_size=None):

if self._started:
LOG.warning(_LW('Restarting a MessageHandlingServer is inherently '
'racy. It is deprecated, and will become a noop '
'in a future release of oslo.messaging. If you '
'need to restart MessageHandlingServer you should '
'instantiate a new object.'))
self._started = True

executor_opts = {}

if self.executor_type in ("threading", "eventlet"):
executor_opts["max_workers"] = (
override_pool_size or self.conf.executor_thread_pool_size
)
self._work_executor = self._executor_cls(**executor_opts)

try:
self.listener = self._create_listener()
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)

# HACK(sileht): We temporary pass the executor to the rabbit
# listener to fix a race with the deprecated blocking executor.
# We do this hack because this is need only for 'synchronous'
# executor like blocking. And this one is deprecated. Making
# driver working in an sync and an async way is complicated
# and blocking have 0% tests coverage.
if hasattr(self.listener, '_poll_style_listener'):
l = self.listener._poll_style_listener
if hasattr(l, "_message_operations_handler"):
l._message_operations_handler._executor = (
self.executor_type)

self.listener.start(self._on_incoming)

上述的對(duì)象又初始化一個(gè) EventletExecutor(由?oslo/messaging/_executors/impl_eventlet.py) 類型的 excuete 對(duì)象,它調(diào)用?self.listener.poll()?監(jiān)聽 rpc 請(qǐng)求蚯舱,每當(dāng)接收到一個(gè)請(qǐng)求改化,創(chuàng)建一個(gè)協(xié)程處理該請(qǐng)求。

class EventletExecutor(base.ExecutorBase):
......

def start(self):
if self._thread is not None:
return

@excutils.forever_retry_uncaught_exceptions
def _executor_thread():
try:
while True:
incoming = self.listener.poll()
spawn_with(ctxt=self.dispatcher(incoming),
pool=self._greenpool)
except greenlet.GreenletExit:
return

self._thread = eventlet.spawn(_executor_thread)

博客:https://tunsuy.github.io/

github:https://github.com/tunsuy

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末枉昏,一起剝皮案震驚了整個(gè)濱河市陈肛,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌兄裂,老刑警劉巖句旱,帶你破解...
    沈念sama閱讀 212,383評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件阳藻,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡谈撒,警方通過查閱死者的電腦和手機(jī)腥泥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來啃匿,“玉大人蛔外,你說我怎么就攤上這事×⒁耍” “怎么了冒萄?”我有些...
    開封第一講書人閱讀 157,852評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)橙数。 經(jīng)常有香客問我尊流,道長(zhǎng),這世上最難降的妖魔是什么灯帮? 我笑而不...
    開封第一講書人閱讀 56,621評(píng)論 1 284
  • 正文 為了忘掉前任崖技,我火速辦了婚禮,結(jié)果婚禮上钟哥,老公的妹妹穿的比我還像新娘迎献。我一直安慰自己,他們只是感情好腻贰,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評(píng)論 6 386
  • 文/花漫 我一把揭開白布吁恍。 她就那樣靜靜地躺著,像睡著了一般播演。 火紅的嫁衣襯著肌膚如雪冀瓦。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,929評(píng)論 1 290
  • 那天写烤,我揣著相機(jī)與錄音翼闽,去河邊找鬼。 笑死洲炊,一個(gè)胖子當(dāng)著我的面吹牛感局,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播暂衡,決...
    沈念sama閱讀 39,076評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼询微,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了狂巢?” 一聲冷哼從身側(cè)響起拓提,我...
    開封第一講書人閱讀 37,803評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎隧膘,沒想到半個(gè)月后代态,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,265評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡疹吃,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評(píng)論 2 327
  • 正文 我和宋清朗相戀三年蹦疑,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片萨驶。...
    茶點(diǎn)故事閱讀 38,716評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡歉摧,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出腔呜,到底是詐尸還是另有隱情叁温,我是刑警寧澤,帶...
    沈念sama閱讀 34,395評(píng)論 4 333
  • 正文 年R本政府宣布核畴,位于F島的核電站膝但,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏谤草。R本人自食惡果不足惜跟束,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評(píng)論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望丑孩。 院中可真熱鬧冀宴,春花似錦、人聲如沸温学。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)仗岖。三九已至逃延,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間箩帚,已是汗流浹背真友。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評(píng)論 1 266
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留紧帕,地道東北人询筏。 一個(gè)月前我還...
    沈念sama閱讀 46,488評(píng)論 2 361
  • 正文 我出身青樓琅轧,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子夺鲜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容