Entry point
The program's entry point is the gunicorn/app/wsgiapp module.
```python
def run():
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()


if __name__ == '__main__':
    run()
```
WSGIApplication inherits from Application, which in turn inherits from BaseApplication. Of these three classes, only BaseApplication defines a constructor:
```python
def __init__(self, usage=None, prog=None):
    self.usage = usage
    self.cfg = None
    self.callable = None
    self.prog = prog
    self.logger = None
    self.do_load_config()
```
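Here usage and prog are just two strings and can be ignored. The other attributes are covered below. After these assignments the constructor calls do_load_config, which does two things. First, it assigns a Config object to self.cfg. This object can parse command-line arguments and holds the resulting configuration. Second, it calls load_config, a method that is only implemented in Application. load_config binds settings from several sources into cfg. Along the way it calls WSGIApplication's init method once, which also binds the relevant parameters.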
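The call chain described above follows the template-method pattern. The base class constructor drives the loading and delegates one step to a hook that only a subclass implements. The sketch below is a toy model of that reading, not gunicorn's code. BaseApp, App and MyWSGIApp stand in for BaseApplication, Application and WSGIApplication. cfg is a plain dict instead of a Config object, and the setting values (including the app path myproject.wsgi:app) are made up.

```python
class BaseApp:
    """Toy stand-in for BaseApplication: the constructor drives loading."""

    def __init__(self, usage=None, prog=None):
        self.usage = usage
        self.prog = prog
        self.cfg = None
        self.do_load_config()

    def do_load_config(self):
        # Step 1: create the config holder (a dict here, not a Config object).
        self.cfg = {}
        # Step 2: delegate to a hook that only subclasses implement.
        self.load_config()

    def load_config(self):
        raise NotImplementedError


class App(BaseApp):
    """Toy stand-in for Application: fills cfg, then asks init() for more."""

    def load_config(self):
        self.cfg["bind"] = "127.0.0.1:8000"  # e.g. a value parsed from the CLI
        extra = self.init() or {}
        self.cfg.update(extra)

    def init(self):
        return None


class MyWSGIApp(App):
    """Toy stand-in for WSGIApplication: contributes its own settings."""

    def init(self):
        return {"app_uri": "myproject.wsgi:app"}  # hypothetical app path
```

Constructing MyWSGIApp is enough to populate cfg. This matches how WSGIApplication(...) is fully configured before .run() is ever called.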
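There is a rather clever trick here involving cfg that I didn't understand at first. Later on, cfg turns out to provide many attributes and methods, yet it is not obvious when they were attached. Let's look more closely. As mentioned, cfg is a Config object: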
```python
KNOWN_SETTINGS = []


def make_settings(ignore=None):
    settings = {}
    ignore = ignore or ()
    for s in KNOWN_SETTINGS:
        setting = s()
        if setting.name in ignore:
            continue
        settings[setting.name] = setting.copy()
    return settings


class Config(object):

    def __init__(self, usage=None, prog=None):
        self.settings = make_settings()
        ...
```
At first glance KNOWN_SETTINGS is an empty list, so self.settings should also be an empty dict. That is not actually the case:
```python
class SettingMeta(type):
    def __new__(cls, name, bases, attrs):
        super_new = super(SettingMeta, cls).__new__
        parents = [b for b in bases if isinstance(b, SettingMeta)]
        if not parents:
            return super_new(cls, name, bases, attrs)

        attrs["order"] = len(KNOWN_SETTINGS)
        attrs["validator"] = staticmethod(attrs["validator"])

        new_class = super_new(cls, name, bases, attrs)
        new_class.fmt_desc(attrs.get("desc", ""))
        KNOWN_SETTINGS.append(new_class)
        return new_class


class Setting(object):
    ...  # attributes and methods such as fmt_desc() elided


Setting = SettingMeta('Setting', (Setting,), {})


class Workers(Setting):
    name = 'xxx'
    ...
    validator = xxx
```
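The config.py module contains many classes like Workers, and they all have roughly the same structure. Each one inherits from Setting, and Setting is a class produced by the SettingMeta metaclass. A class is created by its metaclass's __new__ method, and this case is no exception. SettingMeta.__new__ builds the new class through type.__new__, which it reaches via super(). Before doing so, it runs attrs["validator"] = staticmethod(attrs["validator"]). This wraps the validator as a static method, so it can be called as self.validator(value) without self being passed in. Each subclass of Setting is then appended to KNOWN_SETTINGS together with its order. Setting itself is not appended, because it has no SettingMeta parent. The key point is timing. These class bodies run when config.py is imported, long before any Config object is created. So by the time Config.__init__ calls make_settings(), KNOWN_SETTINGS already holds every setting class, and each one ends up in cfg.settings. Config also defines __getattr__, which looks names up in self.settings. That is why values such as cfg.workers and hooks such as cfg.pre_fork can be read directly as attributes.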
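The timing argument can be checked with a self-contained, simplified re-creation of the pattern. SettingMeta follows the excerpt above. The Setting body, the two example settings (Workers and Timeout, with defaults 1 and 30 chosen for illustration) and validate_pos_int are simplified assumptions. gunicorn's real classes also carry CLI metadata and descriptions, which are omitted here. The __getattr__ is modelled on Config's attribute lookup.

```python
KNOWN_SETTINGS = []


class SettingMeta(type):
    """Registers every Setting subclass at class-creation time."""

    def __new__(cls, name, bases, attrs):
        super_new = super().__new__
        parents = [b for b in bases if isinstance(b, SettingMeta)]
        if not parents:
            # The root Setting class itself is not registered.
            return super_new(cls, name, bases, attrs)
        attrs["order"] = len(KNOWN_SETTINGS)
        # Keep the validator a plain function: no implicit `self` argument.
        attrs["validator"] = staticmethod(attrs["validator"])
        new_class = super_new(cls, name, bases, attrs)
        KNOWN_SETTINGS.append(new_class)
        return new_class


class Setting(metaclass=SettingMeta):
    """Root class (simplified): holds one validated value."""
    name = None
    default = None

    def __init__(self):
        self.value = self.validator(self.default)

    def set(self, value):
        self.value = self.validator(value)

    def get(self):
        return self.value


def validate_pos_int(val):
    val = int(val)
    if val < 0:
        raise ValueError("Value must be positive: %s" % val)
    return val


# These class statements run at import time and fill KNOWN_SETTINGS.
class Workers(Setting):
    name = "workers"
    default = 1
    validator = validate_pos_int


class Timeout(Setting):
    name = "timeout"
    default = 30
    validator = validate_pos_int


class Config:
    def __init__(self):
        # KNOWN_SETTINGS was populated when the classes above were defined.
        self.settings = {s.name: s() for s in KNOWN_SETTINGS}

    def __getattr__(self, name):
        # Only called when normal lookup fails: fall back to the settings.
        if name not in self.settings:
            raise AttributeError("No configuration setting for: %s" % name)
        return self.settings[name].get()
```

Because the class statements run at import time, KNOWN_SETTINGS is already populated when Config() is created. Reading cfg.workers is then served by __getattr__ from the settings dict.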
In short, a number of parameters are initialized before run is called. Above all, many attributes and methods are attached to the cfg object.
The run method
run ultimately calls the run method of an Arbiter object, which is created with the Application object as its argument. The class docstring states the class's main job clearly:
```python
class Arbiter(object):
    """
    Arbiter maintain the workers processes alive. It launches or
    kills them if needed. It also manages application reloading
    via SIGHUP/USR2.
    """
    ...

    def run(self):
        "Main master loop."
        self.start()
        ...
        try:
            self.manage_workers()
            while True:
                self.maybe_promote_master()
                ...
            ...
        except Exception:
            ...
            sys.exit(-1)
```
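In Arbiter's run method, start() is called first to create the listening sockets. manage_workers() then keeps the number of workers under control. Here is the code of manage_workers, together with the methods it uses to spawn workers: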
```python
def manage_workers(self):
    """\
    Maintain the number of workers by spawning or killing
    as required.
    """
    if len(self.WORKERS.keys()) < self.num_workers:
        self.spawn_workers()

    workers = self.WORKERS.items()
    workers = sorted(workers, key=lambda w: w[1].age)
    while len(workers) > self.num_workers:
        (pid, _) = workers.pop(0)
        self.kill_worker(pid, signal.SIGTERM)
    ...


def spawn_workers(self):
    """\
    Spawn new workers as needed.

    This is where a worker process leaves the main loop
    of the master process.
    """
    for _ in range(self.num_workers - len(self.WORKERS.keys())):
        self.spawn_worker()
        time.sleep(0.1 * random.random())


def spawn_worker(self):
    self.worker_age += 1
    worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                               self.app, self.timeout / 2.0,
                               self.cfg, self.log)
    self.cfg.pre_fork(self, worker)
    pid = os.fork()
    if pid != 0:
        worker.pid = pid
        self.WORKERS[pid] = worker
        return pid

    # Do not inherit the temporary files of other workers
    for sibling in self.WORKERS.values():
        sibling.tmp.close()

    # Process Child
    worker.pid = os.getpid()
    try:
        util._setproctitle("worker [%s]" % self.proc_name)
        self.log.info("Booting worker with pid: %s", worker.pid)
        self.cfg.post_fork(self, worker)
        worker.init_process()
        sys.exit(0)
    except SystemExit:
        raise
    ...
```
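If there are fewer workers than self.num_workers (which comes from the workers setting), spawn_workers is called to add workers. Each new worker is created with os.fork().
If there are more than self.num_workers, the workers are sorted by their age attribute. The oldest ones are then sent SIGTERM until the count matches.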
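The decision made by manage_workers can be isolated as a pure function. In this sketch, plan_workers is hypothetical and workers is a plain pid-to-age dict instead of Worker objects. No processes are forked and no signals are sent. It only returns what manage_workers would do.

```python
def plan_workers(workers, num_workers):
    """Hypothetical pure-function version of Arbiter.manage_workers.

    `workers` maps pid -> age, where age grows with every spawn.
    Returns (number of workers to spawn, list of pids to send SIGTERM).
    """
    to_spawn = max(0, num_workers - len(workers))
    # Oldest first: the smallest age was spawned earliest.
    by_age = sorted(workers.items(), key=lambda item: item[1])
    to_kill = []
    while len(by_age) > num_workers:
        pid, _ = by_age.pop(0)
        to_kill.append(pid)
    return to_spawn, to_kill
```

For example, with three workers and num_workers set to 1, the two oldest pids are selected for termination and the newest survives.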
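We'll focus on how workers are added, which is done with os.fork. The process that calls os.fork is the parent, and the newly created process is the child. os.fork returns a different value in each of them: 0 in the child, and the child's process ID in the parent. The parent records the child's PID in self.WORKERS and returns to the infinite loop in run. The child becomes a worker process and calls worker.init_process(). init_process only returns once the worker's main loop ends, so sys.exit(0) is not reached while the worker is serving requests. Its job is to make the child exit rather than fall back into the master's code.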
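Below is a minimal, POSIX-only sketch of the fork pattern in spawn_worker. spawn_child is a hypothetical helper. gunicorn ends the child with sys.exit(0), but this sketch uses os._exit instead, so the child does not run cleanup handlers inherited from the parent. That matters when the snippet runs inside a test runner.

```python
import os


def spawn_child(work):
    """Fork once. The child runs work() and never returns to the caller."""
    pid = os.fork()
    if pid != 0:
        # Parent: os.fork() returned the child's pid; record it and go on.
        return pid
    # Child: os.fork() returned 0.
    try:
        work()
        exit_code = 0
    except BaseException:
        exit_code = 1
    # Leave immediately so the child never falls back into the parent's code.
    os._exit(exit_code)
```

The parent would keep the returned pids (as self.WORKERS does) and later reap them with os.waitpid.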
Now back to the parent process. After os.fork it returns to the infinite loop in run:
```python
try:
    self.manage_workers()

    while True:
        self.maybe_promote_master()

        sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
        if sig is None:
            self.sleep()
            self.murder_workers()
            self.manage_workers()
            continue

        if sig not in self.SIG_NAMES:
            self.log.info("Ignoring unknown signal: %s", sig)
            continue

        signame = self.SIG_NAMES.get(sig)
        handler = getattr(self, "handle_%s" % signame, None)
        if not handler:
            self.log.error("Unhandled signal: %s", signame)
            continue
        self.log.info("Handling signal: %s", signame)
        handler()
        self.wakeup()
except StopIteration:
    self.halt()
except KeyboardInterrupt:
    self.halt()
except HaltServer as inst:
    self.halt(reason=inst.reason, exit_status=inst.exit_status)
except SystemExit:
    raise
except Exception:
    self.log.info("Unhandled exception in main loop",
                  exc_info=True)
    self.stop(False)
    if self.pidfile is not None:
        self.pidfile.unlink()
    sys.exit(-1)
```
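Each iteration starts with maybe_promote_master. This only matters after a USR2 upgrade: a new master started that way promotes itself to master once the old master has gone. Otherwise the call returns immediately. The loop then takes the next signal from SIG_QUEUE and acts on it. If no signal is pending, it waits in sleep, whose code is: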
```python
def sleep(self):
    """\
    Sleep until PIPE is readable or we timeout.
    A readable PIPE means a signal occurred.
    """
    try:
        ready = select.select([self.PIPE[0]], [], [], 1.0)
        if not ready[0]:
            return
        while os.read(self.PIPE[0], 1):
            pass
    except (select.error, OSError) as e:
        # TODO: select.error is a subclass of OSError since Python 3.3.
        error_number = getattr(e, 'errno', e.args[0])
        if error_number not in [errno.EAGAIN, errno.EINTR]:
            raise
    except KeyboardInterrupt:
        sys.exit()
```
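sleep uses select to wait on the read end of a pipe for at most 1 second. When the master receives a signal, its signal handler appends the signal to SIG_QUEUE and writes a byte into the pipe (wakeup). This makes the pipe readable and wakes select early, and sleep then drains the pipe. After sleep returns, the loop calls murder_workers, which kills workers that have timed out. It then calls manage_workers, which brings the number of workers back to the configured value. When a signal is queued, the loop looks up its name and calls the matching handle_<name> method, for example handle_hup or handle_term. This repeats indefinitely, and that is roughly all there is to the master process.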
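This wake-up mechanism is often called the self-pipe trick, and it can be reproduced in a few lines. The sketch below is POSIX-only. Its module-level SIG_QUEUE, PIPE, on_signal and sleep mirror the Arbiter's attributes and methods but are not them. Signal handlers must be installed from the main thread.

```python
import os
import select
import signal

SIG_QUEUE = []
PIPE = os.pipe()
for fd in PIPE:
    os.set_blocking(fd, False)


def on_signal(signum, frame):
    # Queue the signal, then write one byte so select() in sleep() wakes up.
    SIG_QUEUE.append(signum)
    try:
        os.write(PIPE[1], b".")
    except BlockingIOError:
        pass  # pipe buffer full: a wake-up is already pending


def sleep(timeout=1.0):
    """Block until the pipe is readable (a signal arrived) or timeout."""
    ready, _, _ = select.select([PIPE[0]], [], [], timeout)
    if not ready:
        return False
    # Drain every pending byte so the next select() blocks again.
    try:
        while os.read(PIPE[0], 1):
            pass
    except BlockingIOError:
        pass  # pipe is empty now
    return True
```

Usage: install the handler with signal.signal(signal.SIGUSR1, on_signal). A later os.kill(os.getpid(), signal.SIGUSR1) makes the next sleep() return True right away instead of waiting out the timeout.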
Worker processes
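The analysis above shows that the worker processes are the ones that actually handle requests. Their entry point is worker.init_process(). The worker object comes from roughly this chain: worker -> self.worker_class(*args) -> self.cfg.worker_class -> util.load_class(). util.load_class takes a string argument, the worker_class setting, which defaults to the synchronous worker, SyncWorker. It can also be set to gevent, the threaded worker (gthread), and other types that handle many connections concurrently. Let's first see how the default SyncWorker works.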
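The string-to-class step in that chain can be sketched with importlib. This load_class is a hypothetical simplification: gunicorn's real util.load_class accepts more forms (such as egg: URIs). The alias table is modelled on gunicorn's built-in worker names, but its dotted paths are assumptions to check against your installed version.

```python
import importlib

# Assumed alias table, modelled on gunicorn's built-in worker names;
# verify the exact dotted paths against your installed version.
WORKER_ALIASES = {
    "sync": "gunicorn.workers.sync.SyncWorker",
    "gevent": "gunicorn.workers.ggevent.GeventWorker",
    "gthread": "gunicorn.workers.gthread.ThreadWorker",
}


def load_class(uri, aliases=WORKER_ALIASES):
    """Hypothetical loader: accept a class, a short alias, or a dotted path."""
    if isinstance(uri, type):
        return uri  # already a class, use it as-is
    path = aliases.get(uri, uri)
    module_name, _, class_name = path.rpartition(".")
    if not module_name:
        raise ImportError("Not a dotted class path: %r" % uri)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)
```

With gunicorn installed, load_class("sync") would import the module named in the table and return its worker class. Any importable dotted path works the same way.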
All worker modules live in the gunicorn/workers package. SyncWorker inherits from base.Worker, and its init_process() method comes from that parent class:
```python
def init_process(self):
    """\
    If you override this method in a subclass, the last statement
    in the function should be to call this method with
    super(MyWorkerClass, self).init_process() so that the ``run()``
    loop is initiated.
    """

    # set environment variables
    if self.cfg.env:
        for k, v in self.cfg.env.items():
            os.environ[k] = v

    util.set_owner_process(self.cfg.uid, self.cfg.gid,
                           initgroups=self.cfg.initgroups)

    # Reseed the random number generator
    util.seed()

    # For waking ourselves up
    self.PIPE = os.pipe()
    for p in self.PIPE:
        util.set_non_blocking(p)
        util.close_on_exec(p)

    # Prevent fd inheritance
    for s in self.sockets:
        util.close_on_exec(s)
    util.close_on_exec(self.tmp.fileno())

    self.wait_fds = self.sockets + [self.PIPE[0]]

    self.log.close_on_exec()

    self.init_signals()

    # start the reloader
    if self.cfg.reload:
        def changed(fname):
            self.log.info("Worker reloading: %s modified", fname)
            self.alive = False
            self.cfg.worker_int(self)
            time.sleep(0.1)
            sys.exit(0)

        reloader_cls = reloader_engines[self.cfg.reload_engine]
        self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
                                     callback=changed)
        self.reloader.start()

    self.load_wsgi()
    self.cfg.post_worker_init(self)

    # Enter main run loop
    self.booted = True
    self.run()
```
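- init_signals() registers the worker's signal handlers.
- load_wsgi() sets self.wsgi = self.app.wsgi(). This is usually the application object created by the Python web framework, for example app = Flask(__name__) in Flask.
- run() enters the worker's main loop.

Let's look at SyncWorker's run method: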
```python
def run(self):
    timeout = self.timeout or 0.5

    for s in self.sockets:
        s.setblocking(0)

    if len(self.sockets) > 1:
        self.run_for_multiple(timeout)
    else:
        self.run_for_one(timeout)


def run_for_one(self, timeout):
    listener = self.sockets[0]
    while self.alive:
        self.notify()

        try:
            self.accept(listener)
            continue
        except EnvironmentError as e:
            if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                               errno.EWOULDBLOCK):
                raise

        if not self.is_parent_alive():
            return

        try:
            self.wait(timeout)
        except StopWaiting:
            return


def run_for_multiple(self, timeout):
    while self.alive:
        self.notify()

        try:
            ready = self.wait(timeout)
        except StopWaiting:
            return

        if ready is not None:
            for listener in ready:
                if listener == self.PIPE[0]:
                    continue

                try:
                    self.accept(listener)
                except EnvironmentError as e:
                    if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                                       errno.EWOULDBLOCK):
                        raise

        if not self.is_parent_alive():
            return
```
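The core of run_for_one is a non-blocking accept that falls back to a bounded wait, and it can be reduced to a small sketch. make_listener and accept_one are hypothetical helpers, not gunicorn functions. The sketch omits notify(), the parent-alive check and the PIPE that gunicorn's wait() also watches. It also closes the client itself, where gunicorn leaves that to handle.

```python
import errno
import select
import socket


def make_listener(host="127.0.0.1", port=0, backlog=8):
    """Create a non-blocking listening socket, as SyncWorker.run sets up."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen(backlog)
    sock.setblocking(False)
    return sock


def accept_one(listener, handle, timeout=1.0):
    """Hypothetical, simplified body of one run_for_one iteration.

    Returns True if a connection was accepted and handled, False if
    nothing was pending and we waited (at most `timeout` seconds) instead.
    """
    try:
        client, addr = listener.accept()
    except OSError as e:
        # Same errno filter as run_for_one: "nothing to accept right now".
        if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                           errno.EWOULDBLOCK):
            raise
        # Like self.wait(timeout): block until the listener is readable.
        select.select([listener], [], [], timeout)
        return False
    client.setblocking(True)  # blocking I/O for the single request in flight
    try:
        handle(client, addr)
    finally:
        client.close()
    return True
```

Calling accept_one in a loop gives the sync worker's behaviour. Connections are handled strictly one after another, and the worker only sleeps in select when nothing is queued.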
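I removed some comments. run puts the listening sockets into non-blocking mode. Both methods it enters are infinite loops that keep accepting connections: run_for_one covers the single-listener case and run_for_multiple covers several listeners. When no connection is pending, accept raises EAGAIN or EWOULDBLOCK, and the worker waits in self.wait(timeout) until a listener becomes readable. The accept method is short. It accepts a connection on the listening socket, which yields the client socket and the client's address, and it sets the client socket to blocking mode. Because handle runs to completion before the loop accepts again, a sync worker processes one request at a time. accept then calls handle to process the request. handle looks like this: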
```python
def handle(self, listener, client, addr):
    req = None
    try:
        if self.cfg.is_ssl:
            # ssl.wrap_socket was deprecated and removed in Python 3.12;
            # newer code wraps sockets through an ssl.SSLContext instead.
            client = ssl.wrap_socket(client, server_side=True,
                                     **self.cfg.ssl_options)

        parser = http.RequestParser(self.cfg, client)
        req = six.next(parser)
        self.handle_request(listener, req, client, addr)
    except http.errors.NoMoreData as e:
        self.log.debug("Ignored premature client disconnection. %s", e)
    except StopIteration as e:
        self.log.debug("Closing connection. %s", e)
    except ssl.SSLError as e:
        if e.args[0] == ssl.SSL_ERROR_EOF:
            self.log.debug("ssl connection closed")
            client.close()
        else:
            self.log.debug("Error processing SSL request.")
            self.handle_error(req, client, addr, e)
    except EnvironmentError as e:
        if e.errno not in (errno.EPIPE, errno.ECONNRESET):
            self.log.exception("Socket error processing request.")
        else:
            if e.errno == errno.ECONNRESET:
                self.log.debug("Ignoring connection reset")
            else:
                self.log.debug("Ignoring EPIPE")
    except Exception as e:
        self.handle_error(req, client, addr, e)
    finally:
        util.close(client)
```
```python
def handle_request(self, listener, req, client, addr):
    environ = {}
    resp = None
    try:
        self.cfg.pre_request(self, req)
        request_start = datetime.now()
        resp, environ = wsgi.create(req, client, addr,
                                    listener.getsockname(), self.cfg)
        # Force the connection closed until someone shows
        # a buffering proxy that supports Keep-Alive to
        # the backend.
        resp.force_close()
        self.nr += 1
        if self.nr >= self.max_requests:
            self.log.info("Autorestarting worker after current request.")
            self.alive = False
        respiter = self.wsgi(environ, resp.start_response)
        try:
            if isinstance(respiter, environ['wsgi.file_wrapper']):
                resp.write_file(respiter)
            else:
                for item in respiter:
                    resp.write(item)
            resp.close()
            request_time = datetime.now() - request_start
            self.log.access(resp, req, environ, request_time)
        finally:
            if hasattr(respiter, "close"):
                respiter.close()
    except EnvironmentError:
        # pass to next try-except level
        six.reraise(*sys.exc_info())
    except Exception:
        if resp and resp.headers_sent:
            # If the requests have already been sent, we should close the
            # connection to indicate the error.
            self.log.exception("Error handling request")
            try:
                client.shutdown(socket.SHUT_RDWR)
                client.close()
            except EnvironmentError:
                pass
            raise StopIteration()
        raise
    finally:
        try:
            self.cfg.post_request(self, req, environ, resp)
        except Exception:
            self.log.exception("Exception in post_request hook")
```
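Anyone familiar with the WSGI protocol will know how the server interacts with the framework. In short, the server calls a callable with two arguments. The first, environ, is a dict holding everything about the request, such as the headers and the body stream (wsgi.input). The second, start_response, is a callback that the application calls with the status line and the response headers. The response body itself is returned as an iterable, which the server writes out to the client. There are many more details here that I could not fully follow, but the overall flow is clear. handle parses the HTTP request with RequestParser. wsgi.create then builds environ and the response object, whose start_response method serves as the callback. Finally, handle_request calls self.wsgi(environ, resp.start_response), where self.wsgi is the framework's app.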
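To make the two-argument contract concrete, here is a minimal WSGI application together with a heavily simplified server-side caller. call_app is hypothetical and builds only a handful of environ keys. gunicorn's wsgi.create fills in many more, such as the request headers, REMOTE_ADDR and a real wsgi.input stream.

```python
import io


def app(environ, start_response):
    """A minimal WSGI application, playing the role of Flask's `app`."""
    body = ("Hello, %s!" % environ.get("PATH_INFO", "/")).encode("utf-8")
    start_response("200 OK", [("Content-Type", "text/plain; charset=utf-8"),
                              ("Content-Length", str(len(body)))])
    return [body]


def call_app(application, method="GET", path="/"):
    """Server side, heavily simplified: build environ, collect the response."""
    environ = {
        "REQUEST_METHOD": method,
        "PATH_INFO": path,
        "SERVER_NAME": "localhost",
        "SERVER_PORT": "8000",
        "wsgi.version": (1, 0),
        "wsgi.url_scheme": "http",
        "wsgi.input": io.BytesIO(b""),
        "wsgi.errors": io.StringIO(),
        "wsgi.multithread": False,
        "wsgi.multiprocess": True,
        "wsgi.run_once": False,
    }
    captured = {}

    def start_response(status, headers, exc_info=None):
        # The application reports status and headers through this callback.
        captured["status"] = status
        captured["headers"] = headers
        return lambda data: None  # legacy write() callable, unused here

    result = application(environ, start_response)
    try:
        body = b"".join(result)  # the body comes back as an iterable
    finally:
        # Mirror handle_request: close the iterable if it supports it.
        if hasattr(result, "close"):
            result.close()
    return captured["status"], captured["headers"], body
```

Status and headers travel through start_response, while the body is whatever the application returns. This is the same split that handle_request relies on when it iterates over respiter and writes each item.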
GeventWorker
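I have recently been running a service with gevent, so I'll also go through the GeventWorker logic. GeventWorker inherits from AsyncWorker, which inherits from base.Worker. As mentioned above, the default worker uses a blocking model and handles one request at a time. Its throughput is therefore limited, and it is usually not what production services with many concurrent connections use.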
AsyncWorker
AsyncWorker's constructor first calls the parent constructor and then adds an extra attribute, worker_connections. This value is also set in cfg. It only takes effect in the eventlet and gevent modes, where it limits the maximum number of simultaneous client connections.
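gevent's Pool(worker_connections) caps how many connections a worker handles at once. gevent may not be installed, so the sketch below uses asyncio.Semaphore as an analogy to impose the same kind of cap on concurrent handlers. It is not what GeventWorker runs. serve_all and the simulated requests are hypothetical.

```python
import asyncio


async def serve_all(requests, worker_connections):
    """Handle requests concurrently, never more than worker_connections at once.

    asyncio.Semaphore stands in for gevent's Pool(worker_connections) here.
    Returns the handler results and the peak number of concurrent handlers.
    """
    limit = asyncio.Semaphore(worker_connections)
    active = 0
    peak = 0

    async def handle(req):
        nonlocal active, peak
        async with limit:  # waits here while all slots are taken
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulated non-blocking I/O
            active -= 1
            return req * 2

    results = await asyncio.gather(*(handle(r) for r in requests))
    return results, peak
```

Swapping the semaphore for a gevent Pool changes the scheduler but not the idea. Work beyond the limit waits until a slot is freed, so concurrency never exceeds worker_connections.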
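SyncWorker's init_process was inherited from base.Worker, but GeventWorker overrides it. As anyone who has used gevent knows, gevent is built on monkey patching (monkey_patch). It replaces most of the blocking low-level parts of the standard library with non-blocking implementations. GeventWorker therefore applies the patches first and then calls the base Worker's init_process. That call finally enters GeventWorker's run method, which starts handling requests. The run method looks like this: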
```python
def run(self):
    ...
    for s in self.sockets:
        s.setblocking(1)
        pool = Pool(self.worker_connections)
        if self.server_class is not None:
            environ = base_environ(self.cfg)
            environ.update({
                "wsgi.multithread": True,
                "SERVER_SOFTWARE": VERSION,
            })
            server = self.server_class(
                s, application=self.wsgi, spawn=pool, log=self.log,
                handler_class=self.wsgi_handler, environ=environ,
                **ssl_args)
        else:
            hfun = partial(self.handle, s)
            server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args)

        server.start()
        servers.append(server)

    while self.alive:
        self.notify()
        gevent.sleep(1.0)

    try:
        # Stop accepting requests
        for server in servers:
            if hasattr(server, 'close'):  # gevent 1.0
                server.close()
            if hasattr(server, 'kill'):  # gevent < 1.0
                server.kill()

        # Handle current requests until graceful_timeout
        ts = time.time()
        while time.time() - ts <= self.cfg.graceful_timeout:
            accepting = 0
            for server in servers:
                if server.pool.free_count() != server.pool.size:
                    accepting += 1

            # if no server is accepting a connection, we can exit
            if not accepting:
                return

            self.notify()
            gevent.sleep(1.0)

        # Force kill all active the handlers
        self.log.warning("Worker graceful timeout (pid:%s)" % self.pid)
        for server in servers:
            server.stop(timeout=1)
    except:
        pass
```
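- For each listening socket it creates a server: either the configured server_class (a WSGI server) or a gevent StreamServer. A Pool caps the number of concurrent connections. The server implementation lives in gevent, and I did not fully follow it.
- hfun is AsyncWorker's handle with the listening socket bound as its first argument via functools.partial. The flow is much like the synchronous version. The difference is that when a handler blocks on I/O, gevent switches to another greenlet, which raises concurrency.
- After creating the servers, the worker enters an infinite loop. notify is a heartbeat rather than a signal. It updates the timestamp of the worker's temporary file (worker.tmp), and the Arbiter's murder_workers compares that timestamp with the configured timeout. Workers that have stopped reporting get killed.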
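The heartbeat can be modelled with a temporary file and its modification time. Heartbeat and is_timed_out are hypothetical names. In gunicorn, WorkerTmp plays this role, and the file is created before fork, so master and worker share it. Exactly which timestamp it updates, and how, differs between versions, so treat this as a model rather than the implementation.

```python
import os
import tempfile
import time


class Heartbeat:
    """Hypothetical stand-in for the per-worker temp file (worker.tmp)."""

    def __init__(self):
        fd, self.path = tempfile.mkstemp(prefix="wgtmp-")
        os.close(fd)

    def notify(self, now=None):
        # Worker side: bump the file's timestamp to say "still alive".
        now = time.time() if now is None else now
        os.utime(self.path, (now, now))

    def last_update(self):
        return os.stat(self.path).st_mtime

    def close(self):
        os.unlink(self.path)


def is_timed_out(hb, timeout, now=None):
    """Master side: the per-worker check murder_workers performs."""
    now = time.time() if now is None else now
    return now - hb.last_update() > timeout
```

A worker stuck in a long blocking call stops calling notify. Once more than timeout seconds pass, the master's check flags it, and the Arbiter kills that worker and spawns a replacement.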
Summary
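The gunicorn codebase is fairly large and contains a lot of low-level code. I skipped many parts I didn't understand, so this analysis may well contain mistakes. Corrections are welcome.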
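I have previously read flask, requests, tornado and other projects, and gunicorn is clearly harder and less transparent than they are. Many methods and parameters appear without an obvious origin. gunicorn is also closely tied to gevent, whose code is even harder to follow.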
Still, I probably learned something, even if I can't quite tell what yet ~