第三篇總結(jié)下TCP交互數(shù)據(jù)流與多進(jìn)程編程以及python中多客戶端編程的幾種實(shí)現(xiàn)方案误堡,測試環(huán)境為macos10.12和ubuntu16.04通熄。
1 交互數(shù)據(jù)流
先看一段簡單的代碼韩肝,這里先把服務(wù)端更加簡化一下,只接收一次數(shù)據(jù)就關(guān)閉客戶端的連接泛豪,客戶端代碼不變敢艰,如下所示舵匾。
#onceserver.py
import socket
def start_server(ip, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((ip, port))
sock.listen(1)
while True:
conn, cliaddr = sock.accept()
print 'server connect from: ', cliaddr
data = conn.recv(1024)
print 'server received:', data
conn.send(data.upper())
conn.close()
except Exception, ex:
print 'exception occured:', ex
finally:
sock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
#client.py
from socket import *
import sys
def start_client(ip, port):
try:
sock = socket(AF_INET, SOCK_STREAM, 0)
sock.connect((ip, port))
print 'connected'
while True:
data = sys.stdin.readline().strip()
if not data: break
sock.send(data)
result = sock.recv(1024)
if not result:
print 'other side has closed'
else:
print 'response from server:%s' % result
sock.close()
except Exception, ex:
print ex
if __name__ == "__main__":
start_client('127.0.0.1', 7777)
先開一個(gè)終端python onceserver.py
俊抵,再開另一個(gè)終端運(yùn)行python client.py
,然后在客戶端依次輸入haha
, hehe
, wawa
坐梯,可以發(fā)現(xiàn)結(jié)果如下:
ssj@ssj-mbp ~/Prog/network $ python client.py
connected
haha
response from server:HAHA
hehe
other side has closed
wawa
[Errno 32] Broken pipe
而對(duì)應(yīng)到wireshark里面徽诲,可以看到數(shù)據(jù)包如下,出現(xiàn)這個(gè)結(jié)果也很容易解釋了:序號(hào)5的數(shù)據(jù)包是客戶端發(fā)送了4個(gè)字節(jié)的數(shù)據(jù)haha
給服務(wù)端吵血;序號(hào)6的數(shù)據(jù)包是服務(wù)端回應(yīng)一個(gè)ACK包谎替,可以看到序號(hào)6的ACK的值比序號(hào)5上一個(gè)Seq的增加了4,這是因?yàn)閭鬏斄?個(gè)字節(jié)的數(shù)據(jù)蹋辅,所以請(qǐng)求的下一個(gè)seq的值加了4钱贯。接著的序號(hào)7的數(shù)據(jù)包是服務(wù)端發(fā)給客戶端的4個(gè)字節(jié)的數(shù)據(jù)HAHA
,ACK的值不變侦另,PSH標(biāo)志置位秩命。序號(hào)8是客戶端對(duì)這四個(gè)字節(jié)的ACK包。序號(hào)9則是服務(wù)端關(guān)閉連接的FIN包褒傅,然后序號(hào)10是客戶端對(duì)FIN的ACK包弃锐。
前一段都是正常的,下面看看后面的輸入產(chǎn)生這個(gè)結(jié)果的原因殿托,這個(gè)時(shí)候霹菊,服務(wù)端已經(jīng)關(guān)閉了該連接,我們在客戶端再次輸入hehe
支竹,這時(shí)對(duì)應(yīng)序號(hào)11旋廷,而由于服務(wù)端已經(jīng)關(guān)閉了連接鸠按,所以回應(yīng)了一個(gè)RST包,對(duì)應(yīng)序號(hào)12饶碘〈纾客戶端send完數(shù)據(jù)后就不管了,收到RST包后熊镣,發(fā)現(xiàn)數(shù)據(jù)為0卑雁,所以打印出other side has closed
,但是這個(gè)時(shí)候并不能立刻通知應(yīng)用程序绪囱,而是保存在內(nèi)核的TCP協(xié)議層测蹲,這樣直到最后再一次準(zhǔn)備發(fā)送wawa
的時(shí)候,由于TCP協(xié)議層已經(jīng)處于RST狀態(tài)了鬼吵,因此不會(huì)將數(shù)據(jù)發(fā)出扣甲,而是發(fā)一個(gè)SIGPIPE信號(hào)給應(yīng)用層,SIGPIPE信號(hào)的缺省處理動(dòng)作是終止程序齿椅,所以看到上面的現(xiàn)象琉挖。為了避免客戶端異常退出,上面的代碼應(yīng)該在判斷對(duì)方關(guān)閉了連接后break出循環(huán)涣脚,而不是繼續(xù)send示辈。而服務(wù)端要多次接收數(shù)據(jù),則改成之前文章中那樣遣蚀。
2 處理多客戶端請(qǐng)求-多進(jìn)程方案
上一節(jié)修正后的服務(wù)端和客戶端代碼如下:
#server.py
import socket
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
listensock.bind((ip, port))
listensock.listen(5)
while True:
conn, cliaddr = listensock.accept()
print 'server connect from: ', cliaddr
while True:
data = conn.recv(1024)
if not data:
print 'client closed:', cliaddr
break
print 'server received:', data
conn.send(data.upper())
conn.close()
except Exception, ex:
print 'exception occured:', ex
finally:
listensock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
#client.py
from socket import *
import sys
def start_client(ip, port):
try:
sock = socket(AF_INET, SOCK_STREAM, 0)
sock.connect((ip, port))
print 'connected'
while True:
data = sys.stdin.readline().strip()
if not data: break
sock.send(data)
result = sock.recv(1024)
if not result:
print 'other side has closed'
break
else:
print 'response from server:%s' % result
sock.close()
except Exception, ex:
print ex
if __name__ == "__main__":
start_client('127.0.0.1', 7777)
這個(gè)時(shí)候開啟第一個(gè)終端矾麻,運(yùn)行python server.py
,這時(shí)候再開啟第二個(gè)終端運(yùn)行python client.py
芭梯,輸入數(shù)據(jù)险耀,也得到了正常的回應(yīng),可是當(dāng)我們開啟另外一個(gè)終端運(yùn)行第二個(gè)客戶端的時(shí)候玖喘,會(huì)發(fā)現(xiàn)發(fā)送數(shù)據(jù)后并只得到了一個(gè)ACK回應(yīng)甩牺,服務(wù)端并沒有發(fā)送數(shù)據(jù)過來。原因也很簡單累奈,服務(wù)端還卡在第二個(gè)循環(huán)里面贬派,第一個(gè)客戶端連接不退出,服務(wù)端不會(huì)再次運(yùn)行accept函數(shù)處理新的連接费尽。
處理多客戶端有幾種方式赠群,比如多進(jìn)程,一個(gè)進(jìn)程對(duì)應(yīng)一個(gè)連接旱幼,還有多線程查描,以及進(jìn)程和線程混合模式等。當(dāng)然還有更好的select,epoll等方案可以一個(gè)進(jìn)程處理多個(gè)客戶端冬三,這節(jié)就用多進(jìn)程的來實(shí)現(xiàn)下多客戶端處理匀油。修改代碼如下:
import socket
import os
import sys
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
listensock.bind((ip, port))
listensock.listen(5)
while True:
conn, cliaddr = listensock.accept()
try:
pid = os.fork()
except OSError, e:
break
if pid == 0:
print 'server connect from: ', cliaddr
listensock.close()
while True:
data = conn.recv(1024)
if not data:
print 'client closed:', cliaddr
break
print 'server received:', data
conn.send(data.upper())
conn.close()
os._exit(0)
else:
conn.close()
except Exception, ex:
print 'exception occured:', ex
finally:
listensock.close()
這樣每次來一個(gè)連接,就創(chuàng)建一個(gè)新的子進(jìn)程來處理勾笆,處理完子進(jìn)程退出敌蚜,就可以達(dá)到處理多個(gè)客戶端的情況了。注意的是窝爪,這里子進(jìn)程退出了而父進(jìn)程也不進(jìn)行回收處理的話弛车,子進(jìn)程會(huì)變成僵尸進(jìn)程,如下圖所示蒲每,一個(gè)客戶端退出后纷跛,可以看到多了一個(gè)Python的僵尸進(jìn)程,狀態(tài)是Z+
邀杏,在linux下面會(huì)顯示狀態(tài)為<defunct>
贫奠。
? data ps aux|grep Python
ssj 7908 0.0 0.0 0 0 s001 Z+ 4:14下午 0:00.00 (Python)
為什么會(huì)有僵尸進(jìn)程的存在呢?我們知道一個(gè)進(jìn)程在終止時(shí)會(huì)關(guān)閉所有文件描述符望蜡,釋放在用戶空間分配的內(nèi)存唤崭,但是它的進(jìn)程控制塊(PCB)還保留著,內(nèi)核在其中保存了一些信息:如果是正常終止則保存著退出狀態(tài)脖律,如果是異常終止則保存著導(dǎo)致該進(jìn)程終止的信號(hào)是哪個(gè)谢肾。如果一個(gè)進(jìn)程已經(jīng)終止,但是它的父進(jìn)程尚未調(diào)用wait或waitpid對(duì)它進(jìn)行清理状您,這時(shí)的進(jìn)程狀態(tài)稱為僵尸進(jìn)程勒叠。也可以參考下stackoverflow上面的這個(gè)問題 why-zombie-processes-exist。
為了解決僵尸進(jìn)程問題膏孟,父進(jìn)程需要處理SIGCHLD信號(hào)并調(diào)用wait清理僵尸進(jìn)程,當(dāng)然為了簡單起見拌汇,我這里是在父進(jìn)程里面直接忽略SIGCHLD信號(hào)柒桑,相當(dāng)于直接告訴系統(tǒng),我不關(guān)心子進(jìn)程的狀態(tài)噪舀,不要產(chǎn)生僵尸進(jìn)程魁淳,這樣也可以達(dá)到解決僵尸進(jìn)程的目的,修改后的代碼如下:
......
import signal #導(dǎo)入signal模塊
def start_server(ip, port):
signal.signal(signal.SIGCHLD, signal.SIG_IGN) #忽略SIGCHLD信號(hào)
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
......
另外說一句与倡,與僵尸進(jìn)程對(duì)應(yīng)的還有個(gè)孤兒進(jìn)程界逛,就是父進(jìn)程已經(jīng)退出,而子進(jìn)程還沒有退出時(shí)所處的狀態(tài)纺座,孤兒進(jìn)程的父進(jìn)程退出后會(huì)被init進(jìn)程接管息拜,也就是說它的父進(jìn)程會(huì)被設(shè)置為1,子進(jìn)程運(yùn)行結(jié)束會(huì)被init進(jìn)程回收,不會(huì)產(chǎn)生僵尸進(jìn)程少欺。另外一點(diǎn)喳瓣,如果要終止一個(gè)僵尸進(jìn)程是不能通過kill命令來實(shí)現(xiàn)的,因?yàn)榻┦M(jìn)程已經(jīng)終止了赞别,沒法再kill畏陕,正確的方法是kill掉僵尸進(jìn)程的父進(jìn)程,讓init進(jìn)程接管僵尸進(jìn)程并回收仿滔。
3 處理多客戶端請(qǐng)求-select方案
在之前提到的TCP編程中惠毁,其中的socket是阻塞socket,因?yàn)閜ython程序會(huì)停止運(yùn)行崎页,直到一個(gè)event發(fā)生仁讨。其中accept()調(diào)用會(huì)阻塞,直到接收到一個(gè)客戶端連接实昨。而recv()調(diào)用也會(huì)阻塞洞豁,直到這次接收客戶端數(shù)據(jù)完成(或者沒有更多的數(shù)據(jù)要接收)。send()調(diào)用也會(huì)阻塞荒给,直到將這次需要返回給客戶端的數(shù)據(jù)都放到Linux的發(fā)送緩沖隊(duì)列中丈挟。使用多進(jìn)程或者多線程來處理多客戶端請(qǐng)求,容易引起性能問題志电,異步socket是一種不錯(cuò)的解決方案曙咽。異步socket在python的API里面有select,poll挑辆,epoll三種例朱,其中epoll性能最好,select性能較差鱼蝉,因?yàn)樗看味家喸兂绦蜴i需要的所有socket去查找感興趣的event洒嗤。注意一下,select在這里雖然稱之為異步socket魁亦,并不是說它的讀取和寫入不阻塞渔隶,只是因?yàn)閟elect函數(shù)給你找到了已經(jīng)有的讀事件和寫事件的socket,你在accept洁奈,recv间唉,send調(diào)用的時(shí)候可以直接讀取到數(shù)據(jù)而不需要再等待,因?yàn)閿?shù)據(jù)已經(jīng)到達(dá)利术。
select幾乎在所有平臺(tái)都能支持呈野,良好的跨平臺(tái)支持是它為數(shù)不多的優(yōu)點(diǎn)了。select的一個(gè)缺點(diǎn)在于單個(gè)進(jìn)程能夠監(jiān)視的文件描述符的數(shù)量存在最大限制印叁,如果要增大則需要修改參數(shù)重新編譯內(nèi)核被冒。另外军掂,select()所維護(hù)的socket文件描述符的數(shù)據(jù)結(jié)構(gòu),隨著文件描述符數(shù)量的增大姆打,調(diào)用select()掃描所有的socket的開銷也會(huì)增加良姆。poll()與select()類似,這里就不再討論幔戏。select()將就緒的讀寫事件的socket告訴進(jìn)程后玛追,如果進(jìn)程沒有對(duì)其進(jìn)行IO操作,那么下次調(diào)用select()的時(shí)候?qū)⒃俅畏祷剡@些socket闲延,所以它們一般不會(huì)丟失消息(比如在下面代碼中第一次不處理wset中的socket痊剖,第二次select的時(shí)候還是會(huì)返回對(duì)應(yīng)的socket的集合)。這種方式稱為水平觸發(fā)(Level Triggered)垒玲,后面會(huì)看到epoll里面支持水平觸發(fā)和垂直觸發(fā)陆馁。
select服務(wù)端的實(shí)現(xiàn)如下所示:
#selectserver.py
import socket
import os
import select
import Queue
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
listensock.bind((ip, port))
listensock.listen(511)
inputs = [listensock]
outputs = []
msg_queue = {}
while inputs:
print 'waiting for next event'
rset, wset, expset = select.select(inputs, outputs, inputs)
if not rset and not wset and not expset:
print 'timeout'
break
print 'rset %s, wset:%s' % (rset, wset)
#處理讀事件
for s in rset:
if s is listensock: #如果是監(jiān)聽socket,則accept接受連接合愈。
conn, cliaddr = s.accept()
print 'connect from ', cliaddr
inputs.append(conn)
msg_queue[conn] = Queue.Queue() #為每個(gè)連接分配一個(gè)隊(duì)列接收數(shù)據(jù)
else:
data = s.recv(1024)
if data:
print 'server received %s from %s' % (data, s.getpeername())
msg_queue[s].put(data)
if s not in outputs:
outputs.append(s)
else:
print 'client %s closed' % s.getpeername()
if s in outputs:
outputs.remove(s) //客戶端關(guān)閉叮贩,將對(duì)應(yīng)socket從outputs中移除。
inputs.remove(s)
del msg_queue[s]
s.close()
#處理寫事件
for s in wset:
try:
#用get_nowait()防止阻塞佛析,如果隊(duì)列為空會(huì)拋出Empty異常益老,python隊(duì)列用get會(huì)阻塞。
next_msg = msg_queue[s].get_nowait()
print 'server sending %s to %s' % (next_msg.upper(), s.getpeername())
s.send(next_msg.upper())
except Queue.Empty:
print s.getpeername(), 'queue empty'
outputs.remove(s)
#處理異常
for s in expset:
print 'exception on %s' % s.getpeername()
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
del msg_queue[s]
except Exception, ex:
print 'exception occured:', ex
finally:
listensock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
運(yùn)行python selectserver.py
寸莫,然后在另一個(gè)終端開啟python client.py
捺萌,輸入數(shù)據(jù)hehe
,可以看到服務(wù)端的輸出如下膘茎,也就是說桃纯,select會(huì)阻塞等待,等到有事件來的時(shí)候披坏,select函數(shù)會(huì)遍歷所有的socket态坦,找到有讀取事件和寫入事件的socket,然后讀取事件的socket設(shè)置在rset中刮萌,寫入事件的socket的設(shè)置在wset中驮配,異常的socket在exception中,然后分別處理即可着茸。注意讀取事件有個(gè)特例是監(jiān)聽關(guān)鍵字,要單獨(dú)處理琐旁。
ssj@ssj-mbp ~/Prog/network/data $ python selectserver.py
waiting for next event
rset [<socket._socketobject object at 0x1022e37c0>], wset:[]
connect from ('127.0.0.1', 61612)
waiting for next event
rset [<socket._socketobject object at 0x1022e39f0>], wset:[]
server received haha from ('127.0.0.1', 61612)
waiting for next event
rset [], wset:[<socket._socketobject object at 0x1022e39f0>]
server sending HAHA to ('127.0.0.1', 61612)
waiting for next event
rset [], wset:[<socket._socketobject object at 0x1022e39f0>]
('127.0.0.1', 61612) queue empty
waiting for next event
4 處理多客戶端請(qǐng)求-epoll方案
上一節(jié)的select方案是不需要多進(jìn)程了涮阔,只要有I/O事件產(chǎn)生,我們的程序就會(huì)阻塞在select處灰殴。但是依然有個(gè)問題敬特,我們從select那里僅僅知道I/O事件發(fā)生掰邢,但卻并不知道是那幾個(gè)socket的I/O事件(可能有一個(gè),多個(gè)伟阔,甚至全部)辣之,于是只能無差別輪詢所有流,找出能讀出數(shù)據(jù)皱炉,或者寫入數(shù)據(jù)的流怀估,對(duì)他們進(jìn)行操作。輪詢的時(shí)間復(fù)雜度為O(n)合搅,而且socket越多多搀,時(shí)間越長。epoll就是對(duì)select的改進(jìn)灾部,它不再需要輪詢所有的socket了康铭,而是把哪個(gè)socket發(fā)生了什么I/O事件直接通知給我們,如下代碼中的epoll.poll()
方法就是返回有I/O事件的socket的文件描述符和事件類型赌髓,大大降低了時(shí)間復(fù)雜度从藤,提高了性能。關(guān)于epoll的原理可以參見參考資料5锁蠕,python中的API已經(jīng)簡化了不少操作夷野。
epoll有水平觸發(fā)(LT, level triggered)和邊緣觸發(fā)(ET, edge triggered)兩種方式。其中LT是默認(rèn)的工作方式匿沛,LT模式同時(shí)支持block和no-block socket扫责,內(nèi)核告訴你一個(gè)文件描述符是否就緒了,然后你可以對(duì)這個(gè)就緒的fd進(jìn)行IO操作逃呼。如果你不作任何操作鳖孤,內(nèi)核還是會(huì)繼續(xù)通知,這種模式編程出錯(cuò)誤可能性要小一點(diǎn)抡笼。而ET是一種加速模式苏揣,當(dāng)一個(gè)新的事件到來時(shí),ET模式下可以從poll調(diào)用中獲取到這個(gè)事件推姻,可是如果這次沒有把這個(gè)事件對(duì)應(yīng)的套接字緩沖區(qū)處理完,在這個(gè)套接字中沒有新的事件再次到來時(shí)藏古,在ET模式下是無法再次從poll調(diào)用中獲取這個(gè)事件的增炭,使用ET方式的epoll代碼可以參見參考資料4。macos沒有epoll方法拧晕,這里用的測試環(huán)境為Ubuntu16.04.
python中使用epoll代碼如下:
import socket
import os
import select
import Queue
def start_server(ip, port):
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listensock.bind((ip, port))
listensock.listen(511)
listensock.setblocking(0)
epoll = select.epoll()
epoll.register(listensock.fileno(), select.EPOLLIN)
try:
connections = {}
msg_queue = {}
while True:
events = epoll.poll(1)
for fileno, event in events:
if fileno == listensock.fileno():
conn, cliaddr = listensock.accept()
conn.setblocking(0)
epoll.register(conn.fileno(), select.EPOLLIN)
connections[conn.fileno()] = conn
msg_queue[conn.fileno()] = Queue.Queue()
elif event & select.EPOLLIN:
data = connections[fileno].recv(1024)
if data:
print 'server recv ', data
msg_queue[fileno].put(data)
epoll.modify(fileno, select.EPOLLOUT)
else:
print 'no data recv, server close ', fileno
epoll.modify(fileno, select.EPOLLHUP)
connections[fileno].shutdown(socket.SHUT_RDWR)
elif event & select.EPOLLOUT:
try:
data = msg_queue[fileno].get_nowait()
print 'server send ', data
connections[fileno].send(data.upper())
except Queue.Empty:
epoll.modify(fileno, select.EPOLLIN)
elif event & select.EPOLLHUP:
print 'close ', fileno
epoll.unregister(fileno)
connections[fileno].close()
del connections[fileno]
except Exception, ex:
print 'exception occured:', ex
finally:
epoll.unregister(listensock.fileno())
epoll.close()
listensock.close()
if __name__ == "__main__":
start_server('127.0.0.1', 7777)
5 參考資料
- Linux C一站式編程 - 網(wǎng)絡(luò)編程相關(guān)章節(jié)
- Python網(wǎng)絡(luò)編程中的select 和 poll I/O復(fù)用的簡單使用
- epoll或者kqueue的原理是什么
- Python中如何使用Linux的epoll
- 高并發(fā)編程之epoll詳解