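Tornado is a high-performance, non-blocking web framework. By using non-blocking IO and epoll, it can handle thousands of connections per second. The Tornado source code is concise, cleverly implemented, clearly structured, and ships with plenty of demos, which makes it well suited to further reading and study.
The Tornado framework includes the low-level IO implementation, the TCP transport implementation, the HTTP layer, and web-related features such as static file serving, auth, and routing, plus some test code and platform-specific code. Starting from a simple example, this article reads through the related Tornado source code.
Notes:
- This article uses Python 2.7.12 and Tornado 4.5.3.
- When showing the functions of a class, only the relevant ones are shown.
1. A simple example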
from tornado import ioloop, web, httpserver


class MainHandler(web.RequestHandler):
    def get(self):
        self.write("Hello, world")


app = web.Application([
    (r"/index", MainHandler),
])

if __name__ == "__main__":
    server = httpserver.HTTPServer(app)
    server.bind(8888)
    server.start()
    ioloop.IOLoop.current().start()
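The code above first initializes an application (Application). Application implements routing: it matches each request to the corresponding handler based on attributes such as path and host. An HTTPServer is then created to serve this application. Finally the ioloop is started to perform the network IO. Because the ioloop is a singleton, there is no need to pass the HTTPServer when starting it.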
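To make the routing idea concrete, here is a minimal sketch that uses only the standard library. `MiniApplication`, `find_handler` and `main_handler` are hypothetical names invented for this illustration; this is not how `web.Application` is actually implemented, only the general idea of matching a path against a list of patterns.

```python
import re


class MiniApplication(object):
    """Hypothetical stand-in for web.Application: maps URL paths to handlers."""

    def __init__(self, routes):
        # Anchor each pattern at the end so "/index" does not match "/indexes".
        self.routes = [(re.compile(pattern + "$"), handler)
                       for pattern, handler in routes]

    def find_handler(self, path):
        # Return the first handler whose pattern matches the path, or None.
        for regex, handler in self.routes:
            if regex.match(path):
                return handler
        return None


def main_handler():
    # Hypothetical handler, standing in for MainHandler.get.
    return "Hello, world"


mini_app = MiniApplication([(r"/index", main_handler)])
```

Looking up `/index` returns `main_handler`, while a path with no matching pattern returns `None`.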
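A few points are worth noting:
- The explicit bind/start steps can be skipped. app.listen creates an HTTPServer automatically, and server.listen is the single-process shortcut for bind followed by start. For example: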
app.listen(8888)
ioloop.IOLoop.current().start()
or
server = httpserver.HTTPServer(app)
server.listen(8888)
ioloop.IOLoop.current().start()
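- HTTPServer can be started in multi-process mode; the argument to start sets the number of child processes. None or a value <= 0 forks one child process per CPU core. Omitting the argument or passing 1 runs in a single process without forking (the perfectionist in me can't help asking: why can't it start exactly one child process?). A value > 1 forks that many child processes. For example:
server.start(4)  # starts 1 parent process and 4 child processes
- A sudden thought: can it listen on several ports at the same time? After trying it, I found that two servers can be started, and both work, only in single-process mode. I did not track down the exact reason. A plausible explanation, judging from the TCPServer.start source quoted later in this article, is that any num_processes other than 1 calls process.fork_processes, so the second server's start would try to fork again. Note also that the bind docstring says bind may be called several times before start, so a single server can listen on several ports.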
server1 = httpserver.HTTPServer(app)
server1.bind(8888)
server1.start()
server2 = httpserver.HTTPServer(app)
server2.bind(9888)
server2.start()
ioloop.IOLoop.current().start()
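The `num_processes` rules described above can be summarised in a small sketch. `planned_children` is a hypothetical helper written for this article; it only mirrors the behaviour stated in the `start` docstring quoted later and does not fork anything.

```python
import multiprocessing


def planned_children(num_processes=1):
    """Return how many child processes start(num_processes) would fork.

    Hypothetical helper: it mirrors the documented rules only.
    """
    if num_processes == 1:
        # Default: stay in the current process, fork nothing.
        return 0
    if num_processes is None or num_processes <= 0:
        # One child per detected CPU core.
        return multiprocessing.cpu_count()
    # Any value > 1 forks exactly that many children.
    return num_processes
```

This also shows why "exactly one child" is not expressible: the value 1 is reserved for "do not fork at all".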
2. server = httpserver.HTTPServer(app)
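This line creates an HTTPServer. The HTTPServer class inherits from three classes: TCPServer, Configurable, and HTTPServerConnectionDelegate. Configurable can be thought of as an abstract class. It uses __new__ to implement a factory method, so that objects of different types can be created depending on the configuration. Because HTTPServer inherits from several parent classes, the question of diamond inheritance also comes up.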
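- HTTPServer is a new-style class, so its method resolution order (MRO) under diamond inheritance is computed with C3 linearization. This is often loosely described as breadth-first search, but the two can give different orders.
- __new__ constructs the object and __init__ initializes it, so __new__ is called before __init__. initialize is called inside __new__, so initialize also runs before __init__.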
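The MRO point can be checked directly. The classes below are empty, hypothetical stand-ins that only copy the shape of the inheritance graph (they are not the Tornado classes). The second hierarchy is a made-up case where C3 and a naive breadth-first walk disagree.

```python
# Empty stand-ins that mirror the shape of HTTPServer's bases.
class TCPServer(object):
    pass


class Configurable(object):
    pass


class Delegate(object):
    pass


class HTTPServer(TCPServer, Configurable, Delegate):
    pass


http_mro = [klass.__name__ for klass in HTTPServer.__mro__]


# A hierarchy where C3 differs from breadth-first order.
class X(object):
    pass


class Y(object):
    pass


class A(X):
    pass


class B(Y):
    pass


class C(A, B):
    pass


# C3 finishes A's own ancestors (X) before moving on to B;
# a breadth-first walk would visit A, B first and X, Y afterwards.
c3_mro = [klass.__name__ for klass in C.__mro__]
```

For the stand-in `HTTPServer`, the base right after it in the MRO is `TCPServer`, which is why a `super(HTTPServer, self)` call resolves to `TCPServer` first.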
It is worth reading Configurable's __new__ function.
class Configurable(object):
    """Base class for configurable interfaces.

    A configurable interface is an (abstract) class whose constructor
    acts as a factory function for one of its implementation subclasses.
    The implementation subclass as well as optional keyword arguments to
    its initializer can be set globally at runtime with `configure`.

    By using the constructor as the factory method, the interface
    looks like a normal class, `isinstance` works as usual, etc. This
    pattern is most useful when the choice of implementation is likely
    to be a global decision (e.g. when `~select.epoll` is available,
    always use it instead of `~select.select`), or when a
    previously-monolithic class has been split into specialized
    subclasses.

    Configurable subclasses must define the class methods
    `configurable_base` and `configurable_default`, and use the instance
    method `initialize` instead of ``__init__``.
    """
    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        init_kwargs = {}
        if cls is base:
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        else:
            impl = cls
        init_kwargs.update(kwargs)
        instance = super(Configurable, cls).__new__(impl)
        # initialize vs __init__ chosen for compatibility with AsyncHTTPClient
        # singleton magic. If we get rid of that we can switch to __init__
        # here too.
        instance.initialize(*args, **init_kwargs)
        return instance
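The factory behaviour of `__new__` is easier to see in a stripped-down form. The sketch below is a simplified imitation, not Tornado's code: `MiniConfigurable`, `Poller`, `FastPoller` and `SlowPoller` are hypothetical names, and the global `configure` mechanism is left out. It also records the call order to show that `initialize` runs before `__init__`.

```python
class MiniConfigurable(object):
    """Simplified imitation of the factory logic in Configurable.__new__."""

    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        # Instantiating the base class picks the default implementation;
        # instantiating a concrete subclass keeps that subclass.
        impl = cls.configurable_default() if cls is base else cls
        instance = super(MiniConfigurable, cls).__new__(impl)
        instance.initialize(*args, **kwargs)
        return instance


class Poller(MiniConfigurable):
    @classmethod
    def configurable_base(cls):
        return Poller

    @classmethod
    def configurable_default(cls):
        return FastPoller

    def initialize(self, name="default"):
        self.name = name
        self.calls = ["initialize"]

    def __init__(self, *args, **kwargs):
        # Python calls __init__ after __new__ returns, with the same arguments.
        self.calls.append("__init__")


class FastPoller(Poller):
    pass


class SlowPoller(Poller):
    pass


poller = Poller(name="demo")   # the base class acts as a factory
explicit = SlowPoller()        # a concrete subclass is used as-is
```

Here `Poller(name="demo")` yields a `FastPoller` that still passes `isinstance(poller, Poller)`, which is the property the Configurable docstring highlights.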
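Below is the HTTPServer code. HTTPServer is a non-blocking, single-threaded HTTP server. The __init__ function is required: although it contains no real code, it prevents the parent class's __init__ from being called. Without it, Python would call the inherited TCPServer.__init__ with the constructor arguments once __new__ returns.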
class HTTPServer(TCPServer, Configurable,
                 httputil.HTTPServerConnectionDelegate):
    def __init__(self, *args, **kwargs):
        # Ignore args to __init__; real initialization belongs in
        # initialize since we're Configurable. (there's something
        # weird in initialization order between this class,
        # Configurable, and TCPServer so we can't leave __init__ out
        # completely)
        pass

    def initialize(self, request_callback, no_keep_alive=False, io_loop=None,
                   xheaders=False, ssl_options=None, protocol=None,
                   decompress_request=False,
                   chunk_size=None, max_header_size=None,
                   idle_connection_timeout=None, body_timeout=None,
                   max_body_size=None, max_buffer_size=None,
                   trusted_downstream=None):
        self.request_callback = request_callback
        self.no_keep_alive = no_keep_alive
        self.xheaders = xheaders
        self.protocol = protocol
        self.conn_params = HTTP1ConnectionParameters(
            decompress=decompress_request,
            chunk_size=chunk_size,
            max_header_size=max_header_size,
            header_timeout=idle_connection_timeout or 3600,
            max_body_size=max_body_size,
            body_timeout=body_timeout,
            no_keep_alive=no_keep_alive)
        TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,
                           max_buffer_size=max_buffer_size,
                           read_chunk_size=chunk_size)
        self._connections = set()
        self.trusted_downstream = trusted_downstream
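Why the empty `__init__` matters can be reproduced with a few lines. Everything below is a hypothetical imitation (`Base` plays the role of TCPServer, `Factory` the role of Configurable); it only demonstrates the general Python rule that `__init__` is called with the constructor arguments after `__new__` returns.

```python
class Base(object):
    """Hypothetical stand-in for TCPServer."""

    def __init__(self, io_loop=None):
        self.io_loop = io_loop
        self.init_calls = getattr(self, "init_calls", 0) + 1


class Factory(object):
    """Hypothetical stand-in for Configurable."""

    def __new__(cls, *args, **kwargs):
        instance = super(Factory, cls).__new__(cls)
        instance.initialize(*args, **kwargs)
        return instance


class WithoutOverride(Base, Factory):
    def initialize(self, request_callback):
        self.request_callback = request_callback
        Base.__init__(self)


class WithOverride(Base, Factory):
    def __init__(self, *args, **kwargs):
        # Swallow the constructor arguments, as HTTPServer.__init__ does.
        pass

    def initialize(self, request_callback):
        self.request_callback = request_callback
        Base.__init__(self)


def app(request):
    # Hypothetical request callback.
    return "ok"


broken = WithoutOverride(app)  # Base.__init__ runs twice; app ends up as io_loop
fixed = WithOverride(app)      # Base.__init__ runs once, from initialize only
```

In the `broken` case the second, implicit call passes the request callback where an IO loop is expected, which is the kind of surprise the empty `__init__` avoids.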
Given the MRO, the TCPServer initialization here could also be written with super, but that is less obvious to the reader:
super(HTTPServer, self).__init__(io_loop=io_loop, ssl_options=ssl_options,
                                 max_buffer_size=max_buffer_size,
                                 read_chunk_size=chunk_size)
3. server.bind(8888); server.start()
class TCPServer(object):
    def add_sockets(self, sockets):
        """Makes this server start accepting connections on the given sockets.

        The ``sockets`` parameter is a list of socket objects such as
        those returned by `~tornado.netutil.bind_sockets`.
        `add_sockets` is typically used in combination with that
        method and `tornado.process.fork_processes` to provide greater
        control over the initialization of a multi-process server.
        """
        if self.io_loop is None:
            self.io_loop = IOLoop.current()
        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            add_accept_handler(sock, self._handle_connection,
                               io_loop=self.io_loop)

    def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128,
             reuse_port=False):
        """Binds this server to the given port on the given address.

        To start the server, call `start`. If you want to run this server
        in a single process, you can call `listen` as a shortcut to the
        sequence of `bind` and `start` calls.

        Address may be either an IP address or hostname. If it's a hostname,
        the server will listen on all IP addresses associated with the
        name. Address may be an empty string or None to listen on all
        available interfaces. Family may be set to either `socket.AF_INET`
        or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
        both will be used if available.

        The ``backlog`` argument has the same meaning as for
        `socket.listen <socket.socket.listen>`. The ``reuse_port`` argument
        has the same meaning as for `.bind_sockets`.

        This method may be called multiple times prior to `start` to listen
        on multiple ports or interfaces.

        .. versionchanged:: 4.4
           Added the ``reuse_port`` argument.
        """
        sockets = bind_sockets(port, address=address, family=family,
                               backlog=backlog, reuse_port=reuse_port)
        if self._started:
            self.add_sockets(sockets)
        else:
            self._pending_sockets.extend(sockets)

    def start(self, num_processes=1):
        """Starts this server in the `.IOLoop`.

        By default, we run the server in this process and do not fork any
        additional child process.

        If num_processes is ``None`` or <= 0, we detect the number of cores
        available on this machine and fork that number of child
        processes. If num_processes is given and > 1, we fork that
        specific number of sub-processes.

        Since we use processes and not threads, there is no shared memory
        between any server code.

        Note that multiple processes are not compatible with the autoreload
        module (or the ``autoreload=True`` option to `tornado.web.Application`
        which defaults to True when ``debug=True``).
        When using multiple processes, no IOLoops can be created or
        referenced until after the call to ``TCPServer.start(n)``.
        """
        assert not self._started
        self._started = True
        if num_processes != 1:
            process.fork_processes(num_processes)
        sockets = self._pending_sockets
        self._pending_sockets = []
        self.add_sockets(sockets)
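When the address to listen on resolves to more than one actual address, bind_sockets may return several socket objects. This typically happens when address is left as None or an empty string (all interfaces, which can mean both IPv4 and IPv6), or when it is a hostname associated with several IP addresses.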
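A rough way to see how many sockets a given address could produce is to ask the resolver directly. This sketch is an assumption about the general approach (resolving with `socket.getaddrinfo` in passive mode); `candidate_addresses` is a hypothetical helper, it does not bind anything, and the result for `None` depends on the machine.

```python
import socket


def candidate_addresses(address, port):
    """List the distinct (family, host) pairs an address resolves to.

    Hypothetical helper for illustration; it resolves only and never binds.
    """
    results = socket.getaddrinfo(address, port, socket.AF_UNSPEC,
                                 socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
    pairs = set((family, sockaddr[0])
                for family, _type, _proto, _name, sockaddr in results)
    return sorted(pairs)


# None means "all interfaces": often one IPv4 and one IPv6 entry.
wildcard = candidate_addresses(None, 8888)
# A literal IPv4 address resolves to exactly one IPv4 entry.
ipv4_only = candidate_addresses("0.0.0.0", 8888)
```

On a dual-stack machine `wildcard` usually holds two entries, while a literal IPv4 address such as `0.0.0.0` resolves to a single one.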
4. ioloop.IOLoop.current().start()
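ioloop is the low-level module that handles IO events, and its core class is IOLoop. IOLoop follows the singleton pattern: instance() returns one global instance, and current() falls back to it when the current thread has no IOLoop of its own.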
class IOLoop(Configurable):
    @staticmethod
    def instance():
        """Returns a global `IOLoop` instance.

        Most applications have a single, global `IOLoop` running on the
        main thread. Use this method to get this instance from
        another thread. In most other cases, it is better to use `current()`
        to get the current thread's `IOLoop`.
        """
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                if not hasattr(IOLoop, "_instance"):
                    # New instance after double check
                    IOLoop._instance = IOLoop()
        return IOLoop._instance

    @staticmethod
    def current(instance=True):
        """Returns the current thread's `IOLoop`.

        If an `IOLoop` is currently running or has been marked as
        current by `make_current`, returns that instance. If there is
        no current `IOLoop`, returns `IOLoop.instance()` (i.e. the
        main thread's `IOLoop`, creating one if necessary) if ``instance``
        is true.

        In general you should use `IOLoop.current` as the default when
        constructing an asynchronous object, and use `IOLoop.instance`
        when you mean to communicate to the main thread from a different
        one.

        .. versionchanged:: 4.1
           Added ``instance`` argument to control the fallback to
           `IOLoop.instance()`.
        """
        current = getattr(IOLoop._current, "instance", None)
        if current is None and instance:
            return IOLoop.instance()
        return current
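The two lookups above combine a double-checked singleton with a thread-local slot. Here is a self-contained imitation using only `threading`. `MiniIOLoop` and `current_in_new_thread` are hypothetical names, and `make_current` is written from its description in the docstring rather than copied from Tornado.

```python
import threading


class MiniIOLoop(object):
    """Imitation of the instance()/current() lookups, without any IO."""

    _instance_lock = threading.Lock()
    _current = threading.local()

    @staticmethod
    def instance():
        # Double-checked locking: test, take the lock, then test again.
        if not hasattr(MiniIOLoop, "_instance"):
            with MiniIOLoop._instance_lock:
                if not hasattr(MiniIOLoop, "_instance"):
                    MiniIOLoop._instance = MiniIOLoop()
        return MiniIOLoop._instance

    @staticmethod
    def current():
        # Prefer this thread's loop; fall back to the global instance.
        current = getattr(MiniIOLoop._current, "instance", None)
        if current is None:
            return MiniIOLoop.instance()
        return current

    def make_current(self):
        # Mark this loop as the current one for the calling thread only.
        MiniIOLoop._current.instance = self


def current_in_new_thread():
    """Create a loop in a worker thread, mark it current, report what it sees."""
    seen = {}

    def worker():
        loop = MiniIOLoop()
        loop.make_current()
        seen["created"] = loop
        seen["current"] = MiniIOLoop.current()

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return seen
```

The worker thread sees its own loop from `current()`, while the main thread keeps getting the global instance.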
IOLoop also inherits from Configurable. This is what lets IOLoop choose between EPollIOLoop, KQueueIOLoop, and SelectIOLoop depending on the operating system.
class IOLoop(Configurable):
    @classmethod
    def configurable_default(cls):
        if hasattr(select, "epoll"):
            from tornado.platform.epoll import EPollIOLoop
            return EPollIOLoop
        if hasattr(select, "kqueue"):
            # Python 2.6+ on BSD or Mac
            from tornado.platform.kqueue import KQueueIOLoop
            return KQueueIOLoop
        from tornado.platform.select import SelectIOLoop
        return SelectIOLoop
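The same feature-detection order can be tried on any machine without Tornado installed. `pick_poller_name` is a hypothetical helper that returns a label instead of a class; the optional `module` parameter is an addition made here so the fallbacks can be exercised with fake modules.

```python
import select


def pick_poller_name(module=select):
    """Return which polling mechanism would be preferred.

    Hypothetical helper that mirrors the order epoll, kqueue, select.
    """
    if hasattr(module, "epoll"):
        return "epoll"      # Linux
    if hasattr(module, "kqueue"):
        return "kqueue"     # BSD and macOS
    return "select"         # portable fallback


preferred = pick_poller_name()
```

On Linux this is expected to report epoll, and on macOS or BSD kqueue.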
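In Python, select.select is a function whereas select.epoll is a class. Tornado therefore wraps select in the _Select class, which exposes the same interface as select.epoll, and SelectIOLoop uses it as its implementation. Both can register, modify, and unregister watched file descriptors, and poll them for events.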
class _Select(object):
    """A simple, select()-based IOLoop implementation for non-Linux systems"""
    def __init__(self):
        self.read_fds = set()
        self.write_fds = set()
        self.error_fds = set()
        self.fd_sets = (self.read_fds, self.write_fds, self.error_fds)

    def close(self):
        pass

    def register(self, fd, events):
        if fd in self.read_fds or fd in self.write_fds or fd in self.error_fds:
            raise IOError("fd %s already registered" % fd)
        if events & IOLoop.READ:
            self.read_fds.add(fd)
        if events & IOLoop.WRITE:
            self.write_fds.add(fd)
        if events & IOLoop.ERROR:
            self.error_fds.add(fd)
            # Closed connections are reported as errors by epoll and kqueue,
            # but as zero-byte reads by select, so when errors are requested
            # we need to listen for both read and error.
            # self.read_fds.add(fd)

    def modify(self, fd, events):
        self.unregister(fd)
        self.register(fd, events)

    def unregister(self, fd):
        self.read_fds.discard(fd)
        self.write_fds.discard(fd)
        self.error_fds.discard(fd)

    def poll(self, timeout):
        readable, writeable, errors = select.select(
            self.read_fds, self.write_fds, self.error_fds, timeout)
        events = {}
        for fd in readable:
            events[fd] = events.get(fd, 0) | IOLoop.READ
        for fd in writeable:
            events[fd] = events.get(fd, 0) | IOLoop.WRITE
        for fd in errors:
            events[fd] = events.get(fd, 0) | IOLoop.ERROR
        return events.items()


class SelectIOLoop(PollIOLoop):
    def initialize(self, **kwargs):
        super(SelectIOLoop, self).initialize(impl=_Select(), **kwargs)
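To see the register/poll interface in action without Tornado, the sketch below rebuilds a small select-based poller and drives it with a socket pair. `SelectPoller` and `poll_socketpair` are hypothetical names, and the `READ`, `WRITE` and `ERROR` values are assumptions chosen for this sketch, not constants imported from Tornado.

```python
import select
import socket

# Event bit masks: assumed values for this sketch only.
READ, WRITE, ERROR = 0x001, 0x004, 0x018


class SelectPoller(object):
    """Small select()-based poller with an epoll-like interface."""

    def __init__(self):
        self.read_fds = set()
        self.write_fds = set()
        self.error_fds = set()

    def register(self, fd, events):
        if events & READ:
            self.read_fds.add(fd)
        if events & WRITE:
            self.write_fds.add(fd)
        if events & ERROR:
            self.error_fds.add(fd)

    def unregister(self, fd):
        self.read_fds.discard(fd)
        self.write_fds.discard(fd)
        self.error_fds.discard(fd)

    def poll(self, timeout):
        readable, writeable, errors = select.select(
            list(self.read_fds), list(self.write_fds),
            list(self.error_fds), timeout)
        events = {}
        for fd in readable:
            events[fd] = events.get(fd, 0) | READ
        for fd in writeable:
            events[fd] = events.get(fd, 0) | WRITE
        for fd in errors:
            events[fd] = events.get(fd, 0) | ERROR
        return list(events.items())


def poll_socketpair():
    """Poll one end of a socket pair before and after data arrives."""
    a, b = socket.socketpair()
    try:
        fd = a.fileno()
        poller = SelectPoller()
        poller.register(fd, READ)
        before = poller.poll(0)    # nothing has been sent yet
        b.sendall(b"ping")
        after = poller.poll(1.0)   # now `a` is readable
        return fd, before, after
    finally:
        a.close()
        b.close()
```

The first poll reports no events; after the peer writes, the registered descriptor is reported with the read bit set.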
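epoll scales much better than select when many connections are being watched, so Tornado prefers epoll whenever it is available.
The add_sockets function shown earlier calls add_accept_handler, which registers each listening socket through IOLoop.add_handler. That call ends up in the register method of the underlying implementation, so the socket's IO events are monitored.
Once the IOLoop is started, it enters the most basic endless loop of a web service: wait for IO events and, when a request arrives, call the corresponding handler to process it.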
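The shape of that loop can be sketched with the standard library alone. `TinyLoop` is a hypothetical, heavily simplified imitation: it only watches for readability, uses `select.select` directly, and gives up when idle so the demo always terminates, which a real server loop would not do.

```python
import select
import socket


class TinyLoop(object):
    """Hypothetical miniature event loop: wait for readable sockets, dispatch."""

    def __init__(self):
        self._handlers = {}
        self._running = False

    def add_handler(self, sock, callback):
        # Remember which callback serves which file descriptor.
        self._handlers[sock.fileno()] = (sock, callback)

    def stop(self):
        self._running = False

    def start(self, timeout=1.0):
        self._running = True
        while self._running:
            readable, _, _ = select.select(list(self._handlers), [], [], timeout)
            if not readable:
                break  # sketch only: exit when idle instead of looping forever
            for fd in readable:
                sock, callback = self._handlers[fd]
                callback(sock)


def run_demo():
    """Send one message through a socket pair and handle it in the loop."""
    received = []
    loop = TinyLoop()
    a, b = socket.socketpair()

    def on_readable(sock):
        received.append(sock.recv(1024))
        loop.stop()  # one request is enough for the demo

    try:
        loop.add_handler(a, on_readable)
        b.sendall(b"GET /index")
        loop.start()
    finally:
        a.close()
        b.close()
    return received
```

The handler runs once, when the message arrives, and then stops the loop.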