一、背景知識(shí)
Socket
概念
-
Socket是應(yīng)用層與TCP/IP協(xié)議族通信的中間軟件抽象層例获,是應(yīng)用程序通過網(wǎng)絡(luò)協(xié)議進(jìn)行通信的接口。
socket交互基本流程
- 服務(wù)器端先初始化Socket壳快,然后與端口綁定(bind)洼专,對(duì)端口進(jìn)行監(jiān)聽(listen),調(diào)用accept,等待客戶端連接立莉。在這時(shí)如果有個(gè)客戶端初始化一個(gè)Socket绢彤,然后連接服務(wù)器(connect),如果連接成功蜓耻,這時(shí)客戶端與服務(wù)器端的連接就建立了茫舶。客戶端發(fā)送數(shù)據(jù)請(qǐng)求刹淌,服務(wù)器端接收請(qǐng)求并處理請(qǐng)求饶氏,然后把回應(yīng)數(shù)據(jù)發(fā)送給客戶端,客戶端讀取數(shù)據(jù)有勾,最后關(guān)閉連接嚷往,一次交互結(jié)束。
Linux網(wǎng)絡(luò)IO
- Linux中內(nèi)存分為用戶空間和內(nèi)核空間兩個(gè)部分柠衅。如果用戶想要操作內(nèi)核空間的數(shù)據(jù)皮仁,則需要把數(shù)據(jù)從內(nèi)核空間拷貝到用戶空間。
- 如果服務(wù)器收到了從客戶端過來的請(qǐng)求菲宴,并且想要進(jìn)行處理贷祈,那么需要經(jīng)過如下步驟:
- 服務(wù)器的網(wǎng)絡(luò)驅(qū)動(dòng)接收到消息之后,向內(nèi)核申請(qǐng)空間喝峦,并在收到完整的數(shù)據(jù)包(這個(gè)過程會(huì)產(chǎn)生延時(shí)势誊,因?yàn)橛锌赡苁峭ㄟ^分組傳送過來的)后,將其復(fù)制到內(nèi)核空間谣蠢;
- 數(shù)據(jù)從內(nèi)核空間拷貝到用戶空間粟耻;
-
用戶程序進(jìn)行處理。
二眉踱、Linux IO模型
阻塞IO模型
- python實(shí)現(xiàn)
server
import socket
HOST = socket.gethostname()
PORT = 12345
BUFSIZ = 1024
# 模擬handle socket
def handle_socket(request):
return '{} received'.format(request)
s = socket.socket()
s.bind((HOST, PORT))
# 開始 TCP 監(jiān)聽挤忙。backlog 參數(shù)指定在拒絕連接之前,操作系統(tǒng)可以掛起的最大連接數(shù)量谈喳。
# 該值至少為 1册烈,大部分應(yīng)用程序設(shè)為 5 就可以了。
s.listen(5)
while True:
print('waiting for connection...')
conn, addr = s.accept()
print('connecting from: {}'.format(addr))
req = conn.recv(BUFSIZ)
resp = handle_socket(req.decode('utf-8'))
conn.send(resp.encode())
conn.close()
client
import socket
HOST = socket.gethostname()
PORT = 12345
BUFSIZ = 1024
s = socket.socket()
s.connect((HOST, PORT))
s.send(b'test message')
while True:
data = s.recv(BUFSIZ)
if not data:
break
print(data.decode('utf-8'))
s.close()
- 缺點(diǎn):
recv只能監(jiān)視單個(gè)socket婿禽,因此server同一時(shí)間只能服務(wù)一個(gè)client赏僧。 - 一種改進(jìn)方案是為每個(gè)客戶端開啟一個(gè)單獨(dú)線程來處理。但由于線程也需要占用資源扭倾,不可能無限的開啟線程淀零。
非阻塞IO模型
非阻塞的recv系統(tǒng)調(diào)用之后,進(jìn)程沒有被阻塞膛壹,操作系統(tǒng)立馬把結(jié)果返回給進(jìn)程驾中,如果數(shù)據(jù)還沒準(zhǔn)備好唉堪,則拋出異常,進(jìn)程可以去做其他的事哀卫,然后在發(fā)起recv系統(tǒng)調(diào)用巨坊,重復(fù)上述過程(這個(gè)過程通常被稱為輪詢),一直到數(shù)據(jù)準(zhǔn)備好此改,再拷貝數(shù)據(jù)到進(jìn)程進(jìn)行數(shù)據(jù)處理趾撵。需要注意,拷貝數(shù)據(jù)的整個(gè)過程共啃,進(jìn)程仍然是屬于阻塞狀態(tài)占调。
- python實(shí)現(xiàn)
server
import socket
HOST = socket.gethostname()
PORT = 12345
BUFSIZ = 1024
# 模擬handle socket
def handle_socket(request):
return '{} received'.format(request)
server = socket.socket()
server.bind((HOST, PORT))
server.listen(5)
# accept默認(rèn)是阻塞的,設(shè)置后accept成為非阻塞
server.setblocking(False)
conn = None
while True:
try:
# print('waiting for connection...')
conn, addr = server.accept()
print('connecting from: {}'.format(addr))
# accept被設(shè)置為非阻塞后移剪,要求必須有connect來連接, 否則拋出BlockingIOError
except BlockingIOError:
continue
try:
client_req = conn.recv(BUFSIZ)
resp = handle_socket(client_req.decode('utf-8'))
conn.send(resp.encode())
conn.close()
except (BlockingIOError, ConnectionResetError):
pass
- 缺點(diǎn):需要不斷循環(huán)向操作系統(tǒng)拿數(shù)據(jù)究珊,因此CPU占用率很高
多路復(fù)用IO模型
- 多路復(fù)用模型使用一個(gè)線程來檢查多個(gè)文件描述符(Socket)的就緒狀態(tài),如果有一個(gè)文件描述符就緒纵苛,則返回剿涮,否則阻塞直到超時(shí)。得到就緒狀態(tài)后進(jìn)行真正的操作可以在同一個(gè)線程里執(zhí)行攻人,也可以啟動(dòng)多線程執(zhí)行取试。
- IO多路復(fù)用有select, poll和epoll三種機(jī)制
- 但select,poll怀吻,epoll本質(zhì)上都是同步I/O瞬浓,因?yàn)樗麄兌夹枰谧x寫事件就緒后自己負(fù)責(zé)進(jìn)行讀寫,也就是說這個(gè)讀寫過程是阻塞的蓬坡,而異步I/O則無需自己負(fù)責(zé)進(jìn)行讀寫猿棉,異步I/O的實(shí)現(xiàn)會(huì)負(fù)責(zé)把數(shù)據(jù)從內(nèi)核拷貝到用戶空間。
select
select()的機(jī)制中提供一種fd_set的數(shù)據(jù)結(jié)構(gòu)屑咳,它是一個(gè)long類型的數(shù)組萨赁,數(shù)組的每一個(gè)元素與一打開的文件句柄(Socket)建立聯(lián)系。程序發(fā)起一個(gè)select調(diào)用乔宿,select使整個(gè)進(jìn)程阻塞位迂。select會(huì)不斷輪詢fd_set中的所有socket,當(dāng)任何一個(gè)socket收到數(shù)據(jù)详瑞,就會(huì)喚醒進(jìn)程。
當(dāng)進(jìn)程 被喚醒后臣缀,它知道至少有一個(gè) Socket 接收了數(shù)據(jù)坝橡。程序只需遍歷一遍 Socket 列表,就可以得到就緒的 Socket
進(jìn)程再進(jìn)行read操作精置,直接從緩沖中把數(shù)據(jù)拷貝到進(jìn)程计寇。
python實(shí)現(xiàn)
import socket
import select
HOST = socket.gethostname()
PORT = 12345
BUFSIZ = 1024
# 模擬handle socket
def handle_socket(request):
return '{} received'.format(request)
server = socket.socket()
server.bind((HOST, PORT))
server.listen(5)
server.setblocking(False)
inputs = [server, ]
while True:
print('waiting for connection...')
# 監(jiān)聽第一個(gè)列表的文件描述符,如果其中有文件描述符發(fā)生改變,則捕獲并放到rlist中
# rlist-- wait until ready for reading
# wlist -- wait until ready for writing
# xlist -- wait for an ``exceptional condition''
rlist, wlist, elist = select.select(inputs, [], [])
for r in rlist:
# 當(dāng)客戶端第一次連接服務(wù)端時(shí)
if r == server:
conn, addr = r.accept()
inputs.append(conn)
print('connecting from: {}'.format(addr))
# 當(dāng)客戶端連接上服務(wù)端之后番宁,再次發(fā)送數(shù)據(jù)時(shí)
else:
client_req = r.recv(BUFSIZ)
resp = handle_socket(client_req.decode('utf-8'))
r.send(resp.encode())
inputs.remove(r)
r.close()
- 缺點(diǎn):
- 每次調(diào)用 Select 都需要將進(jìn)程加入到所有監(jiān)視 Socket 的等待隊(duì)列元莫,每次喚醒都需要從每個(gè)隊(duì)列中移除。這里涉及了兩次遍歷蝶押,而且每次都要將整個(gè) FDS 列表傳遞給內(nèi)核踱蠢,有一定的開銷。
- 為了減少數(shù)據(jù)拷貝帶來的性能損壞棋电,內(nèi)核對(duì)被監(jiān)控的fd_set集合大小做了限制茎截,并且這個(gè)是通過宏控制的,大小不可改變(限制為1024)
poll
poll的機(jī)制與select類似赶盔,只是poll沒有最大文件描述符數(shù)量的限制企锌。因此poll仍然有select的缺點(diǎn)1。
epoll
- epoll先用 epoll_ctl 維護(hù)等待隊(duì)列于未,再調(diào)用 epoll_wait 阻塞進(jìn)程撕攒。
- epoll維護(hù)了一個(gè)就緒列表Rdlist,當(dāng)程序執(zhí)行到 epoll_wait 時(shí)烘浦,如果 Rdlist 已經(jīng)引用了 Socket抖坪,那么 epoll_wait 直接返回,喚醒進(jìn)程谎倔。如果 Rdlist 為空柳击,阻塞進(jìn)程。進(jìn)程能夠通過Rdlist獲取就緒的socket片习,從而避免了遍歷捌肴。
- epoll使用mmap將用戶空間的一塊地址和內(nèi)核空間的一塊地址同時(shí)映射到相同的一塊物理內(nèi)存地址,使得這塊物理內(nèi)存對(duì)內(nèi)核和對(duì)用戶均可見藕咏,減少用戶態(tài)和內(nèi)核態(tài)之間的數(shù)據(jù)交換状知。加速內(nèi)核與用戶空間的消息傳遞。
- epoll使用紅黑樹數(shù)據(jù)結(jié)構(gòu)來維護(hù)需要監(jiān)視的 Socket孽查。當(dāng)添加或者刪除一個(gè)套接字時(shí)(epoll_ctl)饥悴,都在紅黑樹上去處理。
- epoll模型的工作模式:
1)LT模式:當(dāng)epoll檢測(cè)到描述符事件發(fā)生并將此事件通知應(yīng)用程序盲再,應(yīng)用程序可以不立即處理該事件西设。下次調(diào)用epoll時(shí),會(huì)再次響應(yīng)應(yīng)用程序并通知此事件答朋。
2)ET模式:當(dāng)epoll檢測(cè)到描述符事件發(fā)生并將此事件通知應(yīng)用程序贷揽,應(yīng)用程序必須立即處理該事件。如果不處理梦碗,下次調(diào)用epoll時(shí)禽绪,不會(huì)再次響應(yīng)應(yīng)用程序并通知此事件蓖救。
- python實(shí)現(xiàn):
import socket
import select
HOST = socket.gethostname()
PORT = 12345
BUFSIZ = 1024
def handle_socket(request):
return '{} received'.format(request)
server = socket.socket()
server.bind((HOST, PORT))
server.listen(5)
server.setblocking(False)
#創(chuàng)建epoll對(duì)象
epoll = select.epoll()
#將創(chuàng)建的套接字添加到epoll的事件監(jiān)聽中
#事件類型:
#select.EPOLLIN 可讀事件
#select.EPOLLOUT 可寫事件
#select.EPOLLERR 錯(cuò)誤事件
#select.EPOLLHUP 客戶端斷開事件
epoll.register(server.fileno(), select.EPOLLIN)
conns = {}
while True:
print('waiting for connection...')
#輪詢注冊(cè)的事件集合
epoll_list = epoll.poll()
for fd, events in epoll_list:
#新連接
if fd == server.fileno():
conn, addr = server.accept()
#注冊(cè)新連接fd到待讀事件集合
epoll.register(conn.fileno(), select.EPOLLIN)
conns[conn.fileno()] = conn
#可讀事件
elif events == select.EPOLLIN:
client_req = conns[fd].recv(BUFSIZ)
resp = handle_socket(client_req.decode('utf-8'))
conns[fd].send(resp.encode())
epoll.unregister(fd)
conns[fd].close()
del conns[fd]