Python socket編程
引言
sockets
的歷史悠久,它們最早在 1971 年的 APPANET 中使用,后來成為1983年發(fā)布的Berkeley Software Distribution(BSD)操作系統(tǒng)中的API,稱為Berkeley sockets
。
Web服務器和瀏覽器并不是使用sockets
的唯一程序中鼠,各種規(guī)模和類型的客戶端 - 服務器(client - server)應用程序也得到了廣泛使用。
今天沿癞,盡管socket API
使用的底層協(xié)議已經(jīng)發(fā)展多年援雇,而且已經(jīng)有新協(xié)議出現(xiàn),但是底層 API 仍然保持不變椎扬。
最常見的套接字應用程序類型是客戶端 - 服務器(client - server)應用程序惫搏,其中一方充當服務器并等待來自客戶端的連接。
Socket API介紹
Python
中的socket模塊提供了一個到Berkeley sockets API
的接口蚕涤,其中的主要接口函數(shù)如下:
- socket()
- bind()
- listen()
- accept()
- connect()
- connect_ex()
- send()
- recv()
- close()
這些方便使用的接口函數(shù)和系統(tǒng)底層的功能調用相一致筐赔。
TCP Sockets
我們準備構建一個基于 TCP 協(xié)議的socket
對象,為什么使用 TCP 呢揖铜,因為:
- 可靠性:如果在傳輸過程中因為網(wǎng)絡原因導致數(shù)據(jù)包丟失茴丰,會有相關機制檢測到并且進行重新傳輸
- 按序到達:一方發(fā)送到另一方的數(shù)據(jù)包是按發(fā)送順序被接收的。
對比之下天吓,UDP 協(xié)議是不提供這些保證的贿肩,但是它的響應效率更高,資源消耗更少龄寞。
TCP 協(xié)議并不需要我們自己去實現(xiàn)汰规,在底層都已經(jīng)實現(xiàn)好了,我們只需要使用Python
的socket
模塊物邑,進行協(xié)議指定就可以了溜哮。socket.SOCK_STREAM
表示使用 TCP 協(xié)議,socket.SOCK_DGRAM
表示使用 UDP 協(xié)議
我們來看看基于 TCP 協(xié)議socket
的 API 調用和數(shù)據(jù)傳送流程圖拂封,右邊的一列是服務器端(server)茬射,左邊的一列是客戶端(client)。
要實現(xiàn)左邊的處于監(jiān)聽狀態(tài)的server
冒签,我們需要按照順序調用這樣幾個函數(shù):
- socket(): 創(chuàng)建一個
socket
對象 - bind(): 關聯(lián)對應 ip 地址和端口號
- listen(): 允許對象接收其他
socket
的連接 - accept(): 接收其他
socket
的連接在抛,返回一個元組(conn, addr),conn 是一個新的socket
對象萧恕,代表這個連接刚梭,addr 是連接端的地址信息。
client
調用connect()
時票唆,會通過 TCP 的三次握手朴读,建立連接。當client
連接到server
時走趋,server
會調用accept()
完成這次連接衅金。
雙方通過send()
和recv()
來接收和發(fā)送數(shù)據(jù),最后通過close()
來關閉這次連接,釋放資源氮唯。一般server
端是不關閉的鉴吹,會繼續(xù)等待其他的連接。
Echo Client and Server
剛才我們弄清楚了server
和client
使用socket
進行通信的過程惩琉,我們現(xiàn)在要自己進行一個簡單的也是經(jīng)典的實現(xiàn):server
復述從client
接收的信息豆励。
Echo Server
import socket
HOST = '127.0.0.1' # Standard loopback interface address (localhost)
PORT = 65431 # Port to listen on (non-privileged ports are > 1023)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen()
conn, addr = s.accept()
with conn:
print('Connected by', addr)
while True:
data = conn.recv(1024)
if not data:
break
conn.sendall(data)
socket.socket()
創(chuàng)建了一個socket
對象,它實現(xiàn)了上下文管理器協(xié)議瞒渠,我們直接用 with 語句進行創(chuàng)建即可良蒸,而且最后不需要調用close()
函數(shù)。
socket()
中的兩個參數(shù)指明了連接需要的 ip地址類型和傳輸協(xié)議類型伍玖,socket.AF_INET 表示使用 IPv4的地址進行連接嫩痰,socket.SOCK_STREAM 表示使用 TCP 協(xié)議進行數(shù)據(jù)的傳輸。
bind()
用來將socket
對象和特定的網(wǎng)絡對象和端口號進行關聯(lián)私沮,函數(shù)中的兩個參數(shù)是由創(chuàng)建socket
對象時指定的 ip地址類型 決定的始赎,這里使用的是socket.AF_INET
(IPv4),因此仔燕,bind()
函數(shù)接收一個元組對象作為參數(shù)(HOST, PORT)
host
可以是一個主機名造垛,IP地址,或者空字符串晰搀。如果使用的是 IP地址五辽,host
必須是 IPv4格式的地址字符串。127.0.0.1
是本地環(huán)路的標準寫法外恕,因此只有在主機上的進程才能夠連接到server
杆逗,如果設置為空字符串,它可以接受所有合法 IPv4地址的連接鳞疲。port
應該是從1 - 65535
的一個整數(shù)(0被保留了)罪郊,它相當于是一個窗口和其他的客戶端建立連接,如果想使用1 - 1024
的端口尚洽,一些系統(tǒng)可能會要求要有管理員權限悔橄。
listen()
使得server
可以接受連接,它可以接受一個參數(shù):backlog
腺毫,用來指明系統(tǒng)可以接受的連接數(shù)量癣疟,雖然同一時刻只能與一端建立連接,但是其他的連接請求可以被放入等待隊列中潮酒,當前面的連接斷開睛挚,后面的請求會依次被處理,超過這個數(shù)量的連接請求再次發(fā)起后急黎,會被server
直接拒絕扎狱。
從Python 3.5
開始侧到,這個參數(shù)是可選的,如果我們不明確指明委乌,它就采用系統(tǒng)默認值床牧。如果server
端在同一時刻會收到大量的連接請求荣回,通常要把這個值調大一些遭贸,在Linux
中,可以在/proc/sys/net/core/somaxconn
看到值的情況心软,詳細請參閱:
accept()
監(jiān)聽連接的建立壕吹,是一個阻塞式調用,當有client
連接之后删铃,它會返回一個代表這個連接的新的socket
對象和代表client
地址信息的元組耳贬。對于 IPv4 的地址連接,地址信息是 (host, port)猎唁,對于 IPv6 咒劲,(host, port, flowinfo, scopeid)
有一件事情需要特別注意,accept()
之后诫隅,我們獲得了一個新的socket
對象腐魂,它和server
以及client
都不同,我們用它來進行和client
的通信逐纬。
conn, addr = s.accept()
with conn:
print('Connected by', addr)
while True:
data = conn.recv(1024)
if not data:
break
conn.sendall(data)
conn
是我們新獲得的socket
對象蛔屹,conn.recv()
也是一個阻塞式調用,它會等待底層的 I/O 響應豁生,直到獲得數(shù)據(jù)才繼續(xù)向下執(zhí)行兔毒。外面的while
循環(huán)保證server
端一直監(jiān)聽,通過conn.sendall
將數(shù)據(jù)再發(fā)送回去甸箱。
Echo Client
import socket
HOST = '127.0.0.1' # The server's hostname or IP address
PORT = 65431 # The port used by the server
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
s.sendall("Hello, world".encode("utf8"))
data = s.recv(1024)
print('Received', data.decode("utf8"))
和server
相比育叁,client
更加簡單,先是創(chuàng)建了一個socket
對象芍殖,然后將它和server
連接豪嗽,通過s.sendall()
將信息發(fā)送給server
,通過s.recv()
獲得來自server
的數(shù)據(jù)围小,然后將其打印輸出昵骤。
在發(fā)送數(shù)據(jù)時,只支持發(fā)送字節(jié)型數(shù)據(jù)肯适,所以我們要將需要發(fā)送的數(shù)據(jù)進行編碼变秦,在收到server
端的回應后,將得到的數(shù)據(jù)進行解碼框舔,就能還原出我們能夠識別的字符串了蹦玫。
啟動程序
我們要先啟動server
端赎婚,做好監(jiān)聽準備,然后再啟動client
端樱溉,進行連接挣输。
這個信息是在client
連接后打印出來的。
可以使用netstat
這個命令查看socket
的狀態(tài)福贞,更詳細使用可以查閱幫助文檔撩嚼。
查看系統(tǒng)中處于監(jiān)聽狀態(tài)的socket
,過濾出了使用 TCP協(xié)議 和 IPv4 地址的對象:
如果先啟動了client
挖帘,會有下面這個經(jīng)典的錯誤:
造成的原因可能是端口號寫錯了完丽,或者server
根本就沒運行,也可能是在server
端存在防火墻阻值了連接建立拇舀,下面是一些常見的錯誤異常:
Exception | errno Constant | Description |
---|---|---|
BlockingIOError | EWOULDBLOCK | Resource temporarily unavailable. For example, in non-blocking mode, when calling send() and the peer is busy and not reading, the send queue (network buffer) is full. Or there are issues with the network. Hopefully this is a temporary condition. |
OSError | ADDRINUSE | Address already in use. Make sure there’s not another process running that’s using the same port number and your server is setting the socket option SO_REUSEADDR: socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1). |
ConnectionResetError | ECONNRESET | Connection reset by peer. The remote process crashed or did not close its socket properly (unclean shutdown). Or there’s a firewall or other device in the network path that’s missing rules or misbehaving. |
TimeoutError | ETIMEDOUT | Operation timed out. No response from peer. |
ConnectionRefusedError | ECONNREFUSED | Connection refused. No application listening on specified port. |
連接的建立
現(xiàn)在我們仔細看一下server
和client
是怎樣建通信的:
當使用環(huán)路網(wǎng)絡((IPv4 address 127.0.0.1 or IPv6 address ::1))的時候逻族,數(shù)據(jù)沒有離開過主機跑到外部的網(wǎng)絡。如圖所示骄崩,環(huán)路網(wǎng)絡是在主機內部建立的聘鳞,數(shù)據(jù)就經(jīng)過它來發(fā)送,從主機上運行的一個程序發(fā)送到另一個程序要拂,從主機發(fā)到主機抠璃。這就是為什么我們喜歡說環(huán)路網(wǎng)絡和 IP地址 127.0.0.1(IPv4) 或 ::1(IPv6) 都表示主機
如果server
使用的時其他的合法IP地址,它就會通過以太網(wǎng)接口與外部網(wǎng)絡建立聯(lián)系:
如何處理多端連接
echo server
最大的缺點就是它同一時間只能服務一個client
宇弛,直到連接的斷開鸡典,echo client
同樣也有不足,當client
進行如下操作時枪芒,有可能s.recv()
只返回了一個字節(jié)的數(shù)據(jù)彻况,數(shù)據(jù)并不完整。
data = s.recv(1024)
這里所設定的參數(shù) 1024 表示單次接收的最大數(shù)據(jù)量舅踪,并不是說會返回 1024 字節(jié)的數(shù)據(jù)纽甘。在server
中使用的send()
與之類似,調用后它有一個返回值抽碌,標示已經(jīng)發(fā)送出去的數(shù)據(jù)量悍赢,可能是小于我們實際要發(fā)送的數(shù)據(jù)量,比如說有 6666 字節(jié)的數(shù)據(jù)要發(fā)送货徙,用上面的發(fā)送方式要發(fā)送很多此才行左权,也就是說一次調用send()
數(shù)據(jù)并沒有被完整發(fā)送,我們需要自己做這個檢查來確保數(shù)據(jù)完整發(fā)送了痴颊。
因此赏迟,這里使用了sendall()
,它會不斷地幫我們發(fā)送數(shù)據(jù)直到數(shù)據(jù)全部發(fā)送或者出現(xiàn)錯誤蠢棱。
所以锌杀,目前有兩個問題:
- 怎樣同時處理多個連接甩栈?
- 怎樣調用
send()
和recv()
直到數(shù)據(jù)全部發(fā)送或接收。
要實現(xiàn)并發(fā)糕再,傳統(tǒng)方法是使用多線程量没,最近比較流行的方法是使用在Python3.4
中引入的異步IO模塊asyncio
。
這里準備用更加傳統(tǒng)突想,但是更容易理解的方式來實現(xiàn)殴蹄,基于系統(tǒng)底層的一個調用:select()
,Python
中也提供了對應的模塊:selectors 蒿柳,它在原生的實現(xiàn)上進行了封裝饶套,通過使用DefaultSelector
,能更加簡單的完成任務垒探。
select()
通過了一種機制,它來監(jiān)聽操作發(fā)生情況怠李,一旦某個操作準備就緒(一般是讀就緒或者是寫就緒)圾叼,然后將需要進行這些操作的應用程序select出來,進行相應的讀和寫操作捺癞。到這里夷蚊,你可能會發(fā)現(xiàn)這并沒有實現(xiàn)并發(fā),但是它的響應速度非乘杞椋快惕鼓,通過異步操作,足夠模擬并發(fā)的效果了唐础。
Muti-Connection Client and Server
Multi-Connection Server
import selectors
sel = selectors.DefaultSelector()
# ...
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()
print('listening on', (host, port))
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)
和echo server
最大的不同就在于箱歧,通過lsock.setblocking(False)
,將這個socket
對象設置成了非阻塞形式一膨,與sel.select()
一起使用呀邢,就可以在一個或多個socket
對象上等待事件,然后在數(shù)據(jù)準備就緒時進行數(shù)據(jù)的讀寫操作豹绪。
sel.register()
給server
注冊了我們需要的事件价淌,對server
來說袜腥,我們需要 I/O 可讀眷蚓,從而進行client
發(fā)送數(shù)據(jù)的讀入窗宇,因此蚊荣,通過selector.EVENT_READ
來指明堰汉。
data
用來存儲和socket
有關的任何數(shù)據(jù)帮孔,當sel.select()
返回結果時祭务,它也被返回蝙搔,我們用它作為一個標志钓辆,來追蹤擁有讀入和寫入操作的socket
對象剪验。
接下來是事件循環(huán):
import selectors
sel = selectors.DefaultSelector()
# ...
while True:
events = sel.select(timeout=None)
for key, mask in events:
if key.data is None:
accept_wrapper(key.fileobj)
else:
service_connection(key, mask)
sel.select(timeout=None)
是一個阻塞式調用肴焊,直到有socket
對象準備好了 I/O 操作,或者等待時間超過設定的timeout
功戚。它將返回(key, events)
這類元組構成的一個列表娶眷,每一個對應一個就緒的socket
對象。
key
是一個SeletorKey
類型的實例啸臀,它有一個fileobj
的屬性届宠,這個屬性就是sokect
對象。
mask
是就緒操作的狀態(tài)掩碼乘粒。
如果key.data is None
豌注,我們就知道,這是一個server
對象灯萍,于是要調用accept()
方法轧铁,用來等待client
的連接。不過我們要調用我們自己的accept_wrapper()
函數(shù)旦棉,里面還會包含其他的邏輯齿风。
如果key.data is not None
,我們就知道绑洛,這是一個client
對象救斑,它帶著數(shù)據(jù)來建立連接啦!然后我們要為它提供服務真屯,于是就調用service_connection(key, mask)
脸候,完成所有的服務邏輯。
def accept_wrapper(sock):
conn, addr = sock.accept() # Should be ready to read
print('accepted connection from', addr)
conn.setblocking(False)
data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')
events = selectors.EVENT_READ | selectors.EVENT_WRITE
sel.register(conn, events, data=data)
這個函數(shù)用來處理與client
的連接绑蔫,使用conn.setblocking(False)
將該對象設置為非阻塞狀態(tài)运沦,這正是我們在這個版本的程序中所需要的,否則晾匠,整個server
會停止茶袒,直到它返回,這意味著其他socket
對象進入等待狀態(tài)凉馆。
然后薪寓,使用types.SimplleNamespace()
構建了一個data
對象,存儲我們想保存的數(shù)據(jù)和socket
對象澜共。
因為數(shù)據(jù)的讀寫都是通過conn
向叉,所以使用selectors.EVENT_READ | selectors.EVENT_WRITE
,然后用sel.register(conn, events, data=data)
進行注冊嗦董。
def service_connection(key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024) # Should be ready to read
if recv_data:
data.outb += recv_data
else:
print('closing connection to', data.addr)
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if data.outb:
print('echoing', repr(data.outb), 'to', data.addr)
sent = sock.send(data.outb) # Should be ready to write
data.outb = data.outb[sent:]
這就時服務邏輯的核心母谎,key
中包含了socket
對象和data
對象,mask
是已經(jīng)就緒操作的掩碼京革。根據(jù)sock
可以讀奇唤,將數(shù)據(jù)保存在data.outb
中幸斥,這也將成為寫出的數(shù)據(jù)。
if recv_data:
data.outb += recv_data
else:
print('closing connection to', data.addr)
sel.unregister(sock)
sock.close()
如果沒有接收到數(shù)據(jù)咬扇,說明client
數(shù)據(jù)發(fā)完了甲葬,sock
的狀態(tài)不再被追蹤,然后關閉這次連接懈贺。
Multi-Connection Client
messages = [b'Message 1 from client.', b'Message 2 from client.']
def start_connections(host, port, num_conns):
server_addr = (host, port)
for i in range(0, num_conns):
connid = i + 1
print('starting connection', connid, 'to', server_addr)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
sock.connect_ex(server_addr)
events = selectors.EVENT_READ | selectors.EVENT_WRITE
data = types.SimpleNamespace(connid=connid,
msg_total=sum(len(m) for m in messages),
recv_total=0,
messages=list(messages),
outb=b'')
sel.register(sock, events, data=data)
使用connect_ex()
而不是connect()
经窖,因為connect()
會立即引發(fā)BlockingIOError異常。connect_ex()
只返回錯誤碼 errno.EINPROGRESS梭灿,而不是在連接正在進行時引發(fā)異常画侣。連接完成后,socket
對象就可以進行讀寫堡妒,并通過select()
返回配乱。
連接建立完成后,我們使用了types.SimpleNamespace
構建出和socket
對象一同保存的數(shù)據(jù)涕蚤,里面的messages
對我們要發(fā)送的數(shù)據(jù)做了一個拷貝宪卿,因為在后續(xù)的發(fā)送過程中,它會被修改万栅。client
需要發(fā)送什么,已經(jīng)發(fā)送了什么以及已經(jīng)接收了什么都要進行追蹤西疤,總共要發(fā)送的數(shù)據(jù)字節(jié)數(shù)也保存在了data
對象中烦粒。
def service_connection(key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024) # Should be ready to read
if recv_data:
print('received', repr(recv_data), 'from connection', data.connid)
data.recv_total += len(recv_data)
if not recv_data or data.recv_total == data.msg_total:
print('closing connection', data.connid)
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if not data.outb and data.messages:
data.outb = data.messages.pop(0)
if data.outb:
print('sending', repr(data.outb), 'to connection', data.connid)
sent = sock.send(data.outb) # Should be ready to write
data.outb = data.outb[sent:]
client
要追蹤來自server
的數(shù)據(jù)字節(jié)數(shù),如果收到的數(shù)據(jù)字節(jié)數(shù)和發(fā)送的相等代赁,或者有一次沒有收到數(shù)據(jù)扰她,說明數(shù)據(jù)接收完成,本次服務目的已經(jīng)達成芭碍,就可以關閉這次連接了徒役。
data.outb
用來維護發(fā)送的數(shù)據(jù),前面提到過窖壕,一次發(fā)送不一定能將數(shù)據(jù)全部送出忧勿,使用data.outb = data.outb[sent:]
來更新數(shù)據(jù)的發(fā)送。發(fā)送完畢后瞻讽,再messages
中取出數(shù)據(jù)準備再次發(fā)送鸳吸。
可以在這里看到最后的完整代碼:
最后的運行效果如下:
還是要先啟動server
,進入監(jiān)聽狀態(tài)速勇,然后client
啟動晌砾,與server
建立兩條連接,要發(fā)送的信息有兩條烦磁,這里分開發(fā)送养匈,先將fist message
分別發(fā)送到server
哼勇,然后再發(fā)送second message
。server
端收到信息后進行暫時保存呕乎,當兩條信息都收到了才開始進行echo积担,client
端收到完整信息后表示服務結束,斷開連接楣嘁。