轉(zhuǎn)自 Madman
Synopsis: Richard Stevens所著的《UNIX? Network Programming Volume 1, Third Edition: The Sockets Networking》一書中昌讲,6.2章節(jié)"IO Models"列出了五種IO模型烈拒,本文將詳細(xì)介紹這幾種IO模型错蝴,并說明阻塞(blocking)和非阻塞(non-blocking)的區(qū)別、同步(synchronous)和異步(asynchronous)的區(qū)別
1. 預(yù)備知識(shí)
1.1 CPU Ring
對(duì)于操作系統(tǒng)而言遮精,穩(wěn)定且可靠地運(yùn)行是最重要的∶枘溃現(xiàn)行技術(shù)方案是將內(nèi)核與用戶進(jìn)程焦匈、用戶進(jìn)程與用戶進(jìn)程之間進(jìn)行分離德绿,內(nèi)核可以管理用戶進(jìn)程,但是用戶進(jìn)程之間不能互相干擾倦始,更不能"侵入"內(nèi)核斗遏,即使用戶的程序崩潰了,內(nèi)核也不會(huì)受影響鞋邑。
為了提升計(jì)算機(jī)安全诵次,避免惡意操作,CPU的硬件機(jī)制中一般將使用權(quán)劃分為四個(gè)特權(quán)級(jí)別:
Ring 0
級(jí)別最高枚碗,可以執(zhí)行一切指令逾一,包括像清空內(nèi)存、磁盤I/O操作等特權(quán)指令(privilege instruction)
和其它非特權(quán)指令
肮雨,內(nèi)核代碼運(yùn)行在這個(gè)模式下遵堵;Ring 3
級(jí)別最低,只能執(zhí)行非特權(quán)指令怨规,用戶進(jìn)程運(yùn)行在這個(gè)模式下陌宿。所以CPU模式(CPU models)
可以劃分為:
-
kernel mode
,也叫內(nèi)核態(tài)
-
user mode
波丰,也叫用戶態(tài)
計(jì)算機(jī)開機(jī)啟動(dòng)后壳坪,首先會(huì)加載內(nèi)核,由于占了先機(jī)
掰烟,操作系統(tǒng)內(nèi)核將自己設(shè)置為最高級(jí)別爽蝴,而之后創(chuàng)建的用戶進(jìn)程都設(shè)置為最低級(jí)別。這樣內(nèi)核就能控制CPU纫骑、內(nèi)存蝎亚、磁盤等一切資源,而用戶進(jìn)程不能直接使用這些資源先馆。例如发框,如果用戶進(jìn)程可以直接使用磁盤,就沒必要在內(nèi)核中實(shí)現(xiàn)一套文件系統(tǒng)的權(quán)限管理了
1.2 Kernel space vs. User space
不管是內(nèi)核代碼還是用戶程序代碼都需要加載到內(nèi)存中煤墙,如果不對(duì)內(nèi)存進(jìn)行管理缤底,就會(huì)出現(xiàn)用戶代碼之間、用戶代碼與內(nèi)核之間出現(xiàn)被覆蓋的情況番捂,所以內(nèi)核將內(nèi)存劃分成兩部分:
-
內(nèi)核空間(kernel space)
: 內(nèi)核代碼的運(yùn)行空間 -
用戶空間(user space)
: 用戶應(yīng)用程序代碼的運(yùn)行空間
用戶進(jìn)程只能訪問用戶空間
,而內(nèi)核可以訪問所有內(nèi)存江解。因?yàn)閮?nèi)核已將用戶進(jìn)程設(shè)置為最低級(jí)別设预,它只能運(yùn)行在CPU的Ring 3
上,所以如果用戶進(jìn)程要進(jìn)行磁盤I/O或網(wǎng)絡(luò)I/O時(shí)犁河,只能通過系統(tǒng)調(diào)用(system call)
將請(qǐng)求發(fā)給內(nèi)核鳖枕,由內(nèi)核代為執(zhí)行相應(yīng)的指令(CPU模式由用戶態(tài)
轉(zhuǎn)成內(nèi)核態(tài)
)魄梯,數(shù)據(jù)會(huì)先緩存到內(nèi)核空間
中,然后內(nèi)核將數(shù)據(jù)從內(nèi)核空間
拷貝到用戶空間
中宾符,之后用戶進(jìn)程才能繼續(xù)處理數(shù)據(jù)(CPU模式由內(nèi)核態(tài)
轉(zhuǎn)成用戶態(tài)
)
1.3 blocking vs. non-blocking
-
阻塞(blocking)
: 用戶進(jìn)程在等待某個(gè)操作完成期間酿秸,自身無法繼續(xù)干別的事情,則稱進(jìn)程在該操作上是阻塞的魏烫。Blocking I/O means that the calling system does not return control to the caller until the operation is finished -
非阻塞(non-blocking)
: 用戶進(jìn)程在等待某個(gè)操作完成期間辣苏,自身可以繼續(xù)干別的事情,則稱進(jìn)程在該操作上是非阻塞的哄褒。A non-blocking synchronous call returns control to the caller immediately. The caller is not made to wait, and the invoked system immediately returns one of two responses: If the call was executed and the results are ready, then the caller is told of that. Alternatively, the invoked system can tell the caller that the system has no resources (no data in the socket) to perform the requested action. In that case, it is the responsibility of the caller may repeat the call until it succeeds. For example, aread()
operation on a socket in non-blocking mode may return the number of read bytes or a special return code -1 with errno set toEWOULBLOCK/EAGAIN
, meaning "not ready; try again later."
1.4 Synchronous I/O vs. Asynchronous I/O
- A
synchronous I/O
operation causes the requesting process to be blocked until that I/O operation completes. - An
asynchronous I/O
operation does not cause the requesting process to be blocked.
如果用戶進(jìn)程因?yàn)镮/O操作而阻塞的話稀蟋,那就是同步I/O,否則是異步I/O呐赡。后續(xù)要介紹的blocking I/O
退客、nonblocking I/O
、I/O multiplexing
链嘀、signal driven I/O
這四種I/O模型都是同步I/O萌狂,因?yàn)榈诙A段中的真正I/O操作(比如,recvfrom
)會(huì)阻塞用戶進(jìn)程怀泊,只有asynchronous I/O
模型才是異步I/O
2. Unix中五種I/O模型
blocking I/O
nonblocking I/O
-
I/O multiplexing
(select, poll, epoll, kqueque) -
signal driven I/O
(SIGIO) -
asynchronous I/O
(the POSIX aio_read functions)
例如茫藏,用戶進(jìn)程要讀取網(wǎng)絡(luò)數(shù)據(jù)或磁盤數(shù)據(jù)時(shí)(Input),數(shù)據(jù)會(huì)經(jīng)過兩個(gè)階段:
-
內(nèi)核空間
等待數(shù)據(jù)準(zhǔn)備完成包个。Waiting for the data to be ready - 將數(shù)據(jù)從
內(nèi)核空間
拷貝到用戶空間
中刷允。Copying the data from the kernel to the process
網(wǎng)絡(luò)套接字的輸入操作第一步是等待網(wǎng)絡(luò)數(shù)據(jù)包(對(duì)CPU來說耗時(shí)特別久),當(dāng)數(shù)據(jù)包到達(dá)時(shí)碧囊,它先被復(fù)制到內(nèi)核的緩沖區(qū)中树灶,第二步是將這些數(shù)據(jù)從內(nèi)核的緩沖區(qū)復(fù)制到我們的應(yīng)用程序緩沖區(qū)中(速度快)。上述五種I/O模型的區(qū)別就在于I/O經(jīng)歷的兩個(gè)階段的不同上
2.1 blocking I/O
默認(rèn)情況下糯而,所有套接字都是阻塞的天通。下圖中我們使用UDP的數(shù)據(jù)報(bào)套接字來說明網(wǎng)絡(luò)I/O的兩個(gè)階段:
首先是我們的用戶進(jìn)程運(yùn)行(左邊),當(dāng)需要獲取網(wǎng)絡(luò)數(shù)據(jù)報(bào)(datagram)
時(shí)熄驼,用戶進(jìn)程只能通過recvfrom
系統(tǒng)調(diào)用將請(qǐng)求發(fā)給內(nèi)核像寒,然后在內(nèi)核中運(yùn)行(右邊)
用戶進(jìn)程在兩個(gè)階段都是阻塞的,這期間不能做任何其它事情瓜贾,直到數(shù)據(jù)被拷貝到用戶空間(或發(fā)生錯(cuò)誤诺祸,如系統(tǒng)調(diào)用被信號(hào)中斷)后,我們的應(yīng)用程序才能夠繼續(xù)處理數(shù)據(jù)報(bào)祭芦。即用戶進(jìn)程從調(diào)用recvfrom
到有數(shù)據(jù)返回的整個(gè)時(shí)間內(nèi)筷笨,進(jìn)程都是被阻塞的,所以它是同步I/O
舉例來說,如果要下載1000張圖片胃夏,用阻塞I/O(blocking I/O)
模型的話轴或,必須依序下載,在等待第1張圖片數(shù)據(jù)時(shí)仰禀,整個(gè)用戶進(jìn)程被阻塞照雁,只有第1張圖片數(shù)據(jù)到達(dá)內(nèi)核,并復(fù)制到用戶空間后答恶,才能保存到本地磁盤饺蚊,然后依次類推,下載其它圖片
(1) 單進(jìn)程TCP Server
如果Web服務(wù)器采用這種模式的話亥宿,那么一次只能為一個(gè)客戶服務(wù)(注意:當(dāng)服務(wù)器為這個(gè)客戶服務(wù)的時(shí)候卸勺,只要服務(wù)器的listen隊(duì)列還有空閑,那么當(dāng)其它新的客戶端發(fā)起連接后烫扼,服務(wù)器就會(huì)為新客戶端建立連接曙求,并且新客戶端也可以發(fā)送數(shù)據(jù),但服務(wù)器還不會(huì)處理映企。只有當(dāng)?shù)?個(gè)客戶關(guān)閉連接后悟狱,服務(wù)器才會(huì)一次性將第2個(gè)客戶發(fā)送的所有數(shù)據(jù)接收完,并繼續(xù)只為第2個(gè)客戶服務(wù)堰氓,依次類推):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server挤渐,單進(jìn)程,阻塞 blocking I/O
import socket
# 創(chuàng)建監(jiān)聽socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# socket默認(rèn)不支持地址復(fù)用双絮,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 綁定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)
# socket默認(rèn)是主動(dòng)連接浴麻,調(diào)用listen()函數(shù)將socket變?yōu)楸粍?dòng)連接,這樣就可以接收客戶端連接了
server_sock.listen(5)
try:
while True:
print('Main Process, waiting for client connection...')
# client_sock是專為這個(gè)客戶端服務(wù)的socket囤攀,client_addr是包含客戶端IP和端口的元組
client_sock, client_addr = server_sock.accept()
print('Client {} is connected'.format(client_addr))
try:
while True:
# 接收客戶端發(fā)來的數(shù)據(jù)软免,阻塞,直到有數(shù)據(jù)到來
# 事實(shí)上焚挠,除非當(dāng)前客戶端關(guān)閉后膏萧,才會(huì)跳轉(zhuǎn)到外層的while循環(huán),即一次只能服務(wù)一個(gè)客戶
# 如果客戶端關(guān)閉了連接蝌衔,data是空字符串
data = client_sock.recv(4096)
if data:
print('Received {}({} bytes) from {}'.format(data, len(data), client_addr))
# 返回響應(yīng)數(shù)據(jù)榛泛,將客戶端發(fā)送來的數(shù)據(jù)原樣返回
client_sock.send(data)
print('Sent {} to {}'.format(data, client_addr))
else:
print('Client {} is closed'.format(client_addr))
break
finally:
# 關(guān)閉為這個(gè)客戶端服務(wù)的socket
client_sock.close()
finally:
# 關(guān)閉監(jiān)聽socket,不再響應(yīng)其它客戶端連接
server_sock.close()
TCP客戶端測(cè)試代碼如下噩斟,可以觀察如果指定50個(gè)客戶端時(shí)曹锨,服務(wù)端輸出結(jié)果和客戶端輸出結(jié)果:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
from datetime import datetime
import socket
server_ip = input('Please enter the TCP server ip: ')
server_port = int(input('Enter the TCP server port: '))
client_num = int(input('Enter the TCP clients count: '))
# 保存所有已成功連接的客戶端TCP socket
client_socks = []
for i in range(client_num):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((server_ip, server_port))
client_socks.append(sock)
print('Client {}[ID: {}] has connected to {}'.format(sock, i, (server_ip, server_port)))
while True:
for s in client_socks:
data = str(datetime.now()).encode('utf-8')
s.send(data)
print('Client {} has sent {} to {}'.format(s, data, (server_ip, server_port)))
# 睡眠3秒后,繼續(xù)讓每個(gè)客戶端連接向TCP Server發(fā)送數(shù)據(jù)
time.sleep(3)
Windows平臺(tái)也可以下載 Hercules SETUP utility剃允,先打開一個(gè)Hercules使用TCP Client去連接服務(wù)器艘希,如果再打開一個(gè)Hercules硼身,可以發(fā)現(xiàn),也能夠連接且可以發(fā)送數(shù)據(jù)覆享,但服務(wù)器不會(huì)處理數(shù)據(jù)也就不會(huì)返回(此時(shí),在Linux服務(wù)器上
watch -n 1 'netstat|grep tcp
查看TCP連接的狀態(tài)有很多SYN_RECV
)
(2) 多進(jìn)程TCP Server
由于上面單進(jìn)程版本中营袜,client_sock.recv(4096)
會(huì)一直阻塞撒顿,所以實(shí)際上并不能跳轉(zhuǎn)到外層while循環(huán)中去為其它新的客戶端創(chuàng)建socket,只能一次為一個(gè)客戶服務(wù)荚板。這根本滿足不了實(shí)際應(yīng)用需要凤壁,為了實(shí)現(xiàn)并發(fā)處理多個(gè)客戶端請(qǐng)求,可以使用多進(jìn)程跪另,應(yīng)用程序的主進(jìn)程只負(fù)責(zé)為每一個(gè)新的客戶端連接創(chuàng)建socket拧抖,然后為每個(gè)客戶創(chuàng)建一個(gè)子進(jìn)程,用來分別處理每個(gè)客戶的數(shù)據(jù):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server免绿,多進(jìn)程唧席,阻塞 blocking I/O
import os
import socket
from multiprocessing import Process
def client_handler(client_sock, client_addr):
'''接收各個(gè)客戶端發(fā)來的數(shù)據(jù),并原樣返回'''
try:
while True:
# 接收客戶端發(fā)來的數(shù)據(jù)嘲驾,阻塞淌哟,直到有數(shù)據(jù)到來
# 如果客戶端關(guān)閉了連接,data是空字符串
data = client_sock.recv(4096)
if data:
print('Child Process [PID: {}], received {}({} bytes) from {}'.format(os.getpid(), data, len(data), client_addr))
# 返回響應(yīng)數(shù)據(jù)辽故,將客戶端發(fā)送來的數(shù)據(jù)原樣返回
client_sock.send(data)
print('Child Process [PID: {}], sent {} to {}'.format(os.getpid(), data, client_addr))
else:
print('Child Process [PID: {}], client {} is closed'.format(os.getpid(), client_addr))
break
except:
# 如果客戶端強(qiáng)制關(guān)閉連接徒仓,會(huì)報(bào)異常: ConnectionResetError: [Errno 104] Connection reset by peer
pass
finally:
# 關(guān)閉為這個(gè)客戶端服務(wù)的socket
client_sock.close()
# 創(chuàng)建監(jiān)聽socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# socket默認(rèn)不支持地址復(fù)用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 綁定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)
# socket默認(rèn)是主動(dòng)連接誊垢,調(diào)用listen()函數(shù)將socket變?yōu)楸粍?dòng)連接掉弛,這樣就可以接收客戶端連接了
server_sock.listen(5)
try:
while True:
print('Main Process [PID: {}], waiting for client connection...'.format(os.getpid()))
# 主進(jìn)程只用來負(fù)責(zé)監(jiān)聽新的客戶連接
# client_sock是專為這個(gè)客戶端服務(wù)的socket,client_addr是包含客戶端IP和端口的元組
client_sock, client_addr = server_sock.accept()
print('Main Process [PID: {}], client {} is connected'.format(os.getpid(), client_addr))
# 為每個(gè)新的客戶連接創(chuàng)建一個(gè)子進(jìn)程喂走,用來處理客戶數(shù)據(jù)
client = Process(target=client_handler, args=(client_sock, client_addr))
client.start()
# 子進(jìn)程已經(jīng)復(fù)制了一份client_sock殃饿,所以主進(jìn)程中可以關(guān)閉此client_sock
client_sock.close()
finally:
# 關(guān)閉監(jiān)聽socket,不再響應(yīng)其它客戶端連接
server_sock.close()
(3) 多線程TCP Server
上面多進(jìn)程版本的問題在于缴啡,為每個(gè)客戶端連接都分別創(chuàng)建一個(gè)進(jìn)程壁晒,如果同時(shí)有10000個(gè)客戶連接,操作系統(tǒng)不可能創(chuàng)建10000個(gè)進(jìn)程业栅,那樣系統(tǒng)開銷會(huì)非常大秒咐,內(nèi)存會(huì)被耗盡,導(dǎo)致系統(tǒng)崩潰碘裕。就算沒有崩潰携取,使用了虛擬內(nèi)存,那么性能將急劇下降帮孔。同時(shí)雷滋,這么多個(gè)進(jìn)程不撑,CPU進(jìn)行進(jìn)程間切換(上下文切換
)的代價(jià)也無比巨大,最終的結(jié)果就是大部分時(shí)間都花在進(jìn)程切換上了晤斩,而為客戶提供服務(wù)的時(shí)間幾乎沒有
雖然可以使用進(jìn)程池concurrent.futures.ProcessPoolExecutor
創(chuàng)建固定數(shù)量的進(jìn)程焕檬,一旦有客戶端關(guān)閉了連接后,對(duì)應(yīng)的進(jìn)程就可以重新為下一個(gè)新的客戶連接服務(wù)澳泵,但是多進(jìn)程間的上下文切換的代價(jià)還是太大
多線程版本比多進(jìn)程版本的系統(tǒng)開銷小幾個(gè)數(shù)量級(jí)实愚,操作系統(tǒng)可以同時(shí)開啟更多的線程,而線程間的調(diào)度切換比多進(jìn)程也小很多:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server兔辅,多線程腊敲,阻塞 blocking I/O
import socket
import threading
def client_handler(client_sock, client_addr):
'''接收各個(gè)客戶端發(fā)來的數(shù)據(jù),并原樣返回'''
try:
while True:
# 接收客戶端發(fā)來的數(shù)據(jù)维苔,阻塞碰辅,直到有數(shù)據(jù)到來
# 如果客戶端關(guān)閉了連接,data是空字符串
data = client_sock.recv(4096)
if data:
print('Child Thread [{}], received {}({} bytes) from {}'.format(threading.current_thread().name, data, len(data), client_addr))
# 返回響應(yīng)數(shù)據(jù)介时,將客戶端發(fā)送來的數(shù)據(jù)原樣返回
client_sock.send(data)
print('Child Thread [{}], sent {} to {}'.format(threading.current_thread().name, data, client_addr))
else:
print('Child Thread [{}], client {} is closed'.format(threading.current_thread().name, client_addr))
break
except:
# 如果客戶端強(qiáng)制關(guān)閉連接没宾,會(huì)報(bào)異常: ConnectionResetError: [Errno 104] Connection reset by peer
pass
finally:
# 關(guān)閉為這個(gè)客戶端服務(wù)的socket
client_sock.close()
# 創(chuàng)建監(jiān)聽socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# socket默認(rèn)不支持地址復(fù)用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 綁定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)
# socket默認(rèn)是主動(dòng)連接潮尝,調(diào)用listen()函數(shù)將socket變?yōu)楸粍?dòng)連接榕吼,這樣就可以接收客戶端連接了
server_sock.listen(5)
try:
while True:
print('Main Thread [{}], waiting for client connection...'.format(threading.current_thread().name))
# 主進(jìn)程只用來負(fù)責(zé)監(jiān)聽新的客戶連接
# client_sock是專為這個(gè)客戶端服務(wù)的socket,client_addr是包含客戶端IP和端口的元組
client_sock, client_addr = server_sock.accept()
print('Main Thread [{}], client {} is connected'.format(threading.current_thread().name, client_addr))
# 為每個(gè)新的客戶連接創(chuàng)建一個(gè)線程勉失,用來處理客戶數(shù)據(jù)
client = threading.Thread(target=client_handler, args=(client_sock, client_addr))
client.start()
# 因?yàn)橹骶€程與子線程共享client_sock羹蚣,所以在主線程中不能關(guān)閉client_sock
# client_sock.close()
finally:
# 關(guān)閉監(jiān)聽socket,不再響應(yīng)其它客戶端連接
server_sock.close()
也可以使用線程池concurrent.futures.ThreadPoolExecutor
實(shí)現(xiàn)
2.2 nonblocking I/O
當(dāng)我們將socket設(shè)置為nonblocking時(shí)乱凿,相當(dāng)于告訴內(nèi)核:when an I/O operation that I request cannot be completed without putting the process to sleep, do not put the process to sleep, but return an error instead.
下圖中顽素,前三次系統(tǒng)調(diào)用時(shí),數(shù)據(jù)報(bào)還沒有到達(dá)徒蟆,所以內(nèi)核立即返回一個(gè)叫EWOULDBLOCK
的錯(cuò)誤胁出。第四次系統(tǒng)調(diào)用時(shí),一個(gè)數(shù)據(jù)報(bào)已經(jīng)到達(dá)段审,所以內(nèi)核不會(huì)立即返回全蝶,當(dāng)數(shù)據(jù)報(bào)從內(nèi)核空間復(fù)制到用戶空間后,recvfrom
系統(tǒng)調(diào)用返回成功信息給用戶進(jìn)程寺枉,然后用戶進(jìn)程就能處理數(shù)據(jù)了:
像這種在 nonblocking socket 上重復(fù)調(diào)用recvfrom
被稱為輪詢(polling)
抑淫,用戶進(jìn)程不斷輪詢內(nèi)核以查看某些操作是否已準(zhǔn)備就緒(忙等待,busy-waiting)姥闪,這通常很浪費(fèi)CPU時(shí)間(一般網(wǎng)絡(luò)I/O的第一階段數(shù)據(jù)到達(dá)內(nèi)核前會(huì)非常慢始苇,如果用戶進(jìn)程頻繁且不必要的向內(nèi)核發(fā)起系統(tǒng)調(diào)用,CPU就會(huì)不斷地在用戶態(tài)
和內(nèi)核態(tài)
之間切換筐喳,即用戶進(jìn)程不斷地睡眠催式、被喚醒函喉,這將極大浪費(fèi)CPU資源)。如果有1000個(gè)socket荣月,就算平均每個(gè)socket你第4次系統(tǒng)調(diào)用時(shí)告知數(shù)據(jù)到達(dá)管呵,你也要進(jìn)行4000次系統(tǒng)調(diào)用,太恐怖了喉童,所以一般不會(huì)使用這種I/O模型
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# TCP Echo Server撇寞,單進(jìn)程,非阻塞 nonblocking I/O
import socket
# 用來保存所有已成功連接的客戶端堂氯,每個(gè)列表元素是client_sock和client_addr組成的元組
clients = []
# 創(chuàng)建監(jiān)聽socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# socket默認(rèn)不支持地址復(fù)用,OSError: [Errno 98] Address already in use
server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 綁定IP地址和固定端口
server_address = ('', 9090)
print('TCP Server starting up on port {}'.format(server_address[1]))
server_sock.bind(server_address)
# socket默認(rèn)是主動(dòng)連接牌废,調(diào)用listen()函數(shù)將socket變?yōu)楸粍?dòng)連接咽白,這樣就可以接收客戶端連接了
server_sock.listen(5)
# 將監(jiān)聽用的server_sock設(shè)置為非阻塞
server_sock.setblocking(False)
print('Main Process, waiting for client connection...')
try:
while True:
try:
# client_sock是專為這個(gè)客戶端服務(wù)的socket,client_addr是包含客戶端IP和端口的元組
client_sock, client_addr = server_sock.accept()
except:
# server_sock設(shè)置為非堵塞后鸟缕,如果accept時(shí)晶框,恰巧沒有客戶端connect,那么accept會(huì)產(chǎn)生一個(gè)異常
pass
else:
print('Client {} is connected'.format(client_addr))
# 將新的客戶端連接socket也設(shè)置為非阻塞
client_sock.setblocking(False)
# 添加到client_socks列表中
clients.append((client_sock, client_addr))
# 循環(huán)處理每個(gè)客戶端連接
for client_sock, client_addr in clients:
try:
data = client_sock.recv(4096)
if data:
print('Received {}({} bytes) from {}'.format(data, len(data), client_addr))
# 返回響應(yīng)數(shù)據(jù)懂从,將客戶端發(fā)送來的數(shù)據(jù)原樣返回
client_sock.send(data)
print('Sent {} to {}'.format(data, client_addr))
else:
print('Client {} is closed'.format(client_addr))
# 關(guān)閉為這個(gè)客戶端服務(wù)的socket
client_sock.close()
# 從列表中刪除
clients.remove((client_sock, client_addr))
except:
# client_sock設(shè)置為非堵塞后授段,如果recv時(shí),恰巧客戶端沒有發(fā)送數(shù)據(jù)過來番甩,將會(huì)產(chǎn)生一個(gè)異常
pass
finally:
# 關(guān)閉監(jiān)聽socket侵贵,不再響應(yīng)其它客戶端連接
server_sock.close()
上面的代碼中,while循環(huán)先在監(jiān)聽socket上accept()缘薛,如果有數(shù)據(jù)表示此時(shí)有新的客戶端連接窍育,不管怎樣都要進(jìn)行一次系統(tǒng)調(diào)用。然后宴胧,for循環(huán)依次遍歷各個(gè)客戶連接socket漱抓,查看是否有哪個(gè)客戶發(fā)來數(shù)據(jù),有多少個(gè)客戶就發(fā)起多少次系統(tǒng)調(diào)用恕齐。所以乞娄,這種I/O模型非常浪費(fèi)CPU時(shí)間
第一階段不會(huì)阻塞,但數(shù)據(jù)到達(dá)內(nèi)核空間后显歧,第二階段從內(nèi)核空間復(fù)制到用戶空間時(shí)會(huì)阻塞仪或,所以它是同步I/O
2.3 I/O multiplexing
blocking I/O
使用多線程技術(shù),當(dāng)并發(fā)客戶端達(dá)到千萬級(jí)時(shí)追迟,操作系統(tǒng)不可能同時(shí)創(chuàng)建這么多線程溶其。nonblocking I/O
中用戶進(jìn)程不斷polling(輪詢)
內(nèi)核,浪費(fèi)CPU資源敦间。
每個(gè)網(wǎng)絡(luò)socket在類Unix操作系統(tǒng)中都是一個(gè)文件描述符(FD, file descriptor)
瓶逃,如果操作系統(tǒng)能提供一種機(jī)制束铭,只需要單個(gè)線程就可以同時(shí)監(jiān)視多個(gè)文件描述符,當(dāng)一個(gè)或多個(gè)描述符就緒(一般是讀就緒或?qū)懢途w)時(shí)厢绝,就通知應(yīng)用程序進(jìn)行相應(yīng)的讀寫操作的話契沫,那么效率將大幅提升
于是,現(xiàn)代操作系統(tǒng)底層陸續(xù)實(shí)現(xiàn)了各自的I/O多路復(fù)用(I/O multiplexing)
機(jī)制昔汉,multiplexing是指在單個(gè)線程中調(diào)用一次select/poll/epoll等函數(shù)懈万,可以同時(shí)監(jiān)視多個(gè)文件描述符上的可讀取事件或可寫入事件,一旦這些事件發(fā)生靶病,內(nèi)核通知用戶進(jìn)程会通,然后用戶進(jìn)程調(diào)用recvfrom等函數(shù)進(jìn)行讀寫(真正的I/O操作),這樣就能實(shí)現(xiàn)在單個(gè)線程中對(duì)多個(gè)文件描述符進(jìn)行并發(fā)讀寫(I/O multiplexing也叫event driven I/O - 事件驅(qū)動(dòng)I/O)娄周,Multiplexing Wiki: https://en.wikipedia.org/wiki/Multiplexing涕侈,可以參考電氣工程中的 "時(shí)分復(fù)用" 圖進(jìn)行理解:
下圖中,第一階段阻塞在select系統(tǒng)調(diào)用上煤辨,第二階段阻塞在recvfrom系統(tǒng)調(diào)用上裳涛,所以它是同步I/O:
(1) select
I/O多路復(fù)用
概念被提出以后,1983年左右众辨,在BSD里面最早實(shí)現(xiàn)了select
端三,目前幾乎所有的操作系統(tǒng)都提供了select
函數(shù),具體實(shí)現(xiàn)可以參考:http://man7.org/linux/man-pages/man2/select.2.html 和 《UNIX? Network Programming Volume 1, Third Edition: The Sockets Networking》的6.8章節(jié)示例
select
的缺點(diǎn):
- 能夠監(jiān)視的文件描述符最大數(shù)量有限制鹃彻,在Linux中默認(rèn)是1024個(gè)郊闯,定義在
FD_SETSIZE
中 - 當(dāng)一個(gè)或多個(gè)文件描述就緒后,
select
函數(shù)返回已就緒的文件描述符的數(shù)目(超時(shí)返回0浮声,出錯(cuò)返回-1)虚婿,用戶并不知道哪些描述符可讀、哪些描述符可寫泳挥,還要通過FD_ISSET()
宏去輪詢所有的文件描述符(假設(shè)描述符3可讀然痊,內(nèi)核會(huì)修改fd_set *readset
中描述符3對(duì)應(yīng)的位bit為1,那么FD_ISSET(3, &readset)
會(huì)返回1屉符,表示描述符3可讀)剧浸,才能知道哪個(gè)描述符可讀寫。如果是最后一個(gè)文件描述符就緒矗钟,也會(huì)從頭開始線性遍歷至結(jié)尾唆香,時(shí)間復(fù)雜度為O(n),這樣會(huì)很浪費(fèi)CPU資源 - 因?yàn)閮?nèi)核會(huì)修改文件描述符集合的位bit吨艇,所以用戶進(jìn)程每次調(diào)用
select
函數(shù)躬它,都要重新復(fù)制初始化過的文件描述符集合給內(nèi)核,當(dāng)描述符數(shù)據(jù)量大時(shí)东涡,效率低
(2) poll
http://man7.org/linux/man-pages/man2/poll.2.html
因?yàn)樗褂?strong>鏈表數(shù)據(jù)結(jié)構(gòu)pollfd
表示待監(jiān)視的文件描述符冯吓,所以沒有1024個(gè)最大文件描述符的限制倘待。但是,poll
返回后组贺,還是要通過輪詢所有文件描述符才能獲取哪些描述符可讀凸舵、哪些描述符可寫,時(shí)間復(fù)雜度為O(n)失尖。隨著文件描述符的增加啊奄,性能會(huì)線性下降
(3) epoll
http://man7.org/linux/man-pages/man7/epoll.7.html
select
和poll
都有一個(gè)缺點(diǎn),需要將要監(jiān)視的文件描述符集合在用戶空間
和內(nèi)核空間
之間來回復(fù)制掀潮,當(dāng)有描述符就緒后菇夸,又需要遍歷整個(gè)描述符集合才能得知哪些可讀寫,當(dāng)描述符很多時(shí)仪吧,性能就越差峻仇。epoll
于Linux 2.6內(nèi)核開始引入,是為了處理大批量文件描述符而作了改進(jìn)的poll(只支持Linux)
epoll
的相關(guān)函數(shù):
-
int epoll_create(int size)
: 創(chuàng)建一個(gè)epoll
實(shí)例邑商,返回代表這個(gè)實(shí)例的文件描述符。自從Linux 2.6.8之后凡蚜,size參數(shù)被忽略 -
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
:epoll
的事件注冊(cè)函數(shù)人断,它可以動(dòng)態(tài)修改要監(jiān)視的文件描述符及對(duì)應(yīng)的事件。第一個(gè)參數(shù)是epoll_create()
的返回值朝蜘;第二個(gè)參數(shù)表示注冊(cè)處理動(dòng)作恶迈,用三個(gè)宏來表示:EPOLL_CTL_ADD
(注冊(cè)新的fd到epfd中)、EPOLL_CTL_MOD
(修改已經(jīng)注冊(cè)的fd的監(jiān)聽事件)谱醇、EPOLL_CTL_DEL
(從epfd中刪除一個(gè)fd)暇仲;第三個(gè)參數(shù)是需要監(jiān)聽的fd;第四個(gè)參數(shù)是告訴內(nèi)核需要監(jiān)聽什么事副渴,比如EPOLLIN
(可讀)奈附、EPOLLOUT
(可寫) -
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
: 用戶進(jìn)程調(diào)用epoll_wait()
函數(shù)后會(huì)被阻塞,直到要監(jiān)聽的I/O事件發(fā)生煮剧,返回就緒的文件描述符數(shù)目(超時(shí)返回0)斥滤。這個(gè)步驟相當(dāng)于調(diào)用select
或poll
,但是不需要傳遞描述符集合給內(nèi)核勉盅,因?yàn)閮?nèi)核已經(jīng)在步驟2中通過epoll_ctl()
拿到了要監(jiān)視的描述符列表佑颇。epoll
采用基于事件的就緒通知方式,一旦某個(gè)文件描述符就緒時(shí)草娜,內(nèi)核會(huì)采用類似callback的回調(diào)機(jī)制挑胸,將就緒的文件描述符放到就緒鏈表里面,當(dāng)用戶進(jìn)程調(diào)用epoll_wait()
時(shí)便得到通知宰闰。另外茬贵,epoll
使用了Linux的mmap(內(nèi)存映射技術(shù))
簿透,內(nèi)核不需要復(fù)制就緒鏈表給用戶空間,用戶進(jìn)程可以直接讀取就緒鏈表中的文件描述符
epoll
的優(yōu)點(diǎn):
- 沒有最大文件描述符的限制闷沥,1GB內(nèi)存大約支持10萬左右的連接數(shù)萎战,可運(yùn)行
cat /proc/sys/fs/file-max
進(jìn)行查看 - I/O效率不隨fd數(shù)目增加而線性下降,比如有10萬個(gè)連接舆逃,由于長(zhǎng)連接和網(wǎng)絡(luò)傳輸延時(shí)等原因蚂维,同一時(shí)刻可能只有少部分是"活躍"的(有數(shù)據(jù)可讀寫),如果采用
select
或poll
路狮,每次都從頭至尾線性掃描全部描述符集合虫啥。而epoll只關(guān)心"活躍"的連接,而跟連接總數(shù)無關(guān)(因?yàn)閮?nèi)核只會(huì)為就緒的描述符調(diào)用callback)奄妨,時(shí)間復(fù)雜度為O(1) - 使用
mmap
技術(shù)涂籽,加速內(nèi)核與用戶空間的消息傳遞,調(diào)用epoll_create
后砸抛,內(nèi)核就已經(jīng)在內(nèi)核態(tài)開始準(zhǔn)備幫你存儲(chǔ)要監(jiān)控的文件描述符了评雌,每次調(diào)用epoll_ctl
只是在往內(nèi)核的數(shù)據(jù)結(jié)構(gòu)里塞入新的描述符
(4) select vs. poll vs. epoll
select | poll | epoll | |
---|---|---|---|
最大連接數(shù) | 1024(x86)或 2048(x64) | 無上限 | 無上限 |
存儲(chǔ)fds | 數(shù)組 | 鏈表 | 紅黑樹 |
傳遞fds | 每次調(diào)用select ,都需要將fds在用戶空間和內(nèi)核空間復(fù)制2次 |
每次調(diào)用poll 直焙,都需要將fds在用戶空間和內(nèi)核空間復(fù)制2次 |
調(diào)用epoll_ctl 時(shí)注冊(cè)fd到內(nèi)核中紅黑樹上景东,之后每次epoll_wait 無需復(fù)制fds |
獲取就緒的描述符 | 遍歷 | 遍歷 | 回調(diào)函數(shù) |
I/O效率 | 每次調(diào)用select 都進(jìn)行線性遍歷,時(shí)間復(fù)雜度為O(n) |
每次調(diào)用poll 都進(jìn)行線性遍歷奔誓,時(shí)間復(fù)雜度為O(n) |
基于事件通知方式斤吐,每當(dāng)fd就緒,內(nèi)核注冊(cè)的回調(diào)函數(shù)就會(huì)被調(diào)用厨喂,將就緒fd放到就緒鏈表里和措,時(shí)間復(fù)雜度為O(1) |
- select / poll / epoll: practical difference for system architects
- LINUX – IO MULTIPLEXING – SELECT VS POLL VS EPOLL
(5) Python 3.4+ selectors模塊
Python select
模塊提供了select
/poll
/epoll
/kqueue
等函數(shù),它是底層模塊蜕煌,用戶可以更精細(xì)地自主選擇使用哪個(gè)I/O multiplexing
接口派阱。而Python 3.4版本加入了selectors
高級(jí)模塊,它基于select
幌绍,但提供了統(tǒng)一的接口颁褂,會(huì)自動(dòng)選擇當(dāng)前操作系統(tǒng)中最優(yōu)化的I/O多路復(fù)用接口,比如在Linux系統(tǒng)中傀广,它會(huì)優(yōu)先使用epoll
颁独,在BSD中使用kqueque
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import selectors
import socket
# 自動(dòng)選擇當(dāng)前OS中最優(yōu)的I/O multiplexing接口,Linux中會(huì)使用selectors.EpollSelector
sel = selectors.DefaultSelector()
def accept(sock, mask):
'''監(jiān)聽套接字創(chuàng)建新的客戶端連接'''
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read) # 將新的客戶端socket注冊(cè)到epoll實(shí)例上伪冰,并監(jiān)聽讀事件
def read(conn, mask):
'''接收客戶端數(shù)據(jù)誓酒,并原樣返回'''
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(('', 9090))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
2.4 signal driven I/O
我們也可以使用信號(hào)
,告訴內(nèi)核在描述符就緒時(shí)用SIGIO
信號(hào)通知我們,我們稱之為信號(hào)驅(qū)動(dòng)的I/O
靠柑,如下圖所示:
應(yīng)用程序首先創(chuàng)建socket寨辩,并提供信號(hào)處理器(signal handler)
,然后發(fā)起sigaction
系統(tǒng)調(diào)用歼冰,該調(diào)用會(huì)立即返回靡狞,用戶進(jìn)程不會(huì)被阻塞,可以繼續(xù)往下執(zhí)行隔嫡。當(dāng)socket準(zhǔn)備就緒可以讀取數(shù)據(jù)時(shí)甸怕,內(nèi)核產(chǎn)生SIGIO
信號(hào)通知signal handler去調(diào)用recvfrom
,當(dāng)數(shù)據(jù)被復(fù)制到用戶空間后腮恩,告訴用戶進(jìn)程主循環(huán)梢杭,你要的數(shù)據(jù)已經(jīng)準(zhǔn)備好你可以直接處理了〗盏危或者武契,signal handler只是告訴用戶進(jìn)程主循環(huán)數(shù)據(jù)到達(dá)內(nèi)核空間了,你可以自己調(diào)用recvfrom
隨時(shí)去復(fù)制
總之荡含,此模型的優(yōu)勢(shì)在于第一階段不會(huì)被阻塞(等待數(shù)據(jù)到達(dá)內(nèi)核空間)咒唆,The main loop can continue executing and just wait to be notified by the signal handler that either the data is ready to process or the datagram is ready to be read.
第一階段不會(huì)阻塞,第二階段調(diào)用recvfrom會(huì)阻塞释液,所以是同步I/O
2.5 asynchronous I/O
- http://man7.org/linux/man-pages/man7/aio.7.html
- http://lse.sourceforge.net/io/aio.html
- https://oxnz.github.io/2016/10/13/linux-aio/
asynchronous I/O
是指用戶進(jìn)程發(fā)起I/O讀钧排、寫操作后,不會(huì)被阻塞均澳,當(dāng)I/O操作真正完成后(數(shù)據(jù)已被復(fù)制到用戶空間,可以直接處理)符衔,內(nèi)核使用信號(hào)或回調(diào)函數(shù)進(jìn)行異步通知用戶進(jìn)程找前。第一階段和第二階段都是是非阻塞的,是真正的異步I/O
Linux AIO API
- POSIX AIO API (
glibc
):aio_read
/aio_write
/aio_fsync
/lio_listio
/aio_cancel
/aio_suspend
/aio_return
/aio_error
- Native Linux AIO API (
libaio
):io_setup
/io_destroy
/io_submit
/io_getevents
/io_cancel
Linux AIO不夠成熟判族,Windows中的IOCP
是成熟的異步I/O
2.6 五種I/O模型的比較
下圖是五種不同I/O模型的比較躺盛。它表明前四個(gè)模型之間的主要區(qū)別是第一個(gè)階段,因?yàn)榍八膫€(gè)模型中的第二個(gè)階段是相同的:當(dāng)數(shù)據(jù)從內(nèi)核復(fù)制到調(diào)用者的緩沖區(qū)時(shí)形帮,在調(diào)用recvfrom
時(shí)阻塞進(jìn)程槽惫。根據(jù)1.4的定義,前四個(gè)I/O模型都是同步的辩撑,因?yàn)檎嬲腎/O操作recvfrom
會(huì)阻塞進(jìn)程界斜。但是,asynchronous I/O
處理的兩個(gè)階段合冀,與前四個(gè)模型不同各薇,兩個(gè)階段都不會(huì)阻塞進(jìn)程,所以只有它是異步的