??最近想研究一下關(guān)于長鏈接的相關(guān)內(nèi)容幻馁,在B站上看到了Zinx框架的視頻,是Golang語言的框架售睹,本著更好的理解框架的內(nèi)容桩警,按照整個(gè)Zinx課程的進(jìn)度,制作一個(gè)Python版本的Zinx框架昌妹。有關(guān)于Zinx框架的具體內(nèi)容捶枢,可以看框架作者的介紹。
??python版本的Zinx飞崖,基于Gevent 22.10.2烂叔,使用協(xié)程功能。
??golang版本的Zinx項(xiàng)目固歪,項(xiàng)目中兩個(gè)文件夾蒜鸡,ziface和znet。
- ziface主要是存放一些Zinx框架的全部模塊的抽象層接口類牢裳。
- znet模塊是zinx框架中網(wǎng)絡(luò)相關(guān)功能的實(shí)現(xiàn)逢防,所有網(wǎng)絡(luò)相關(guān)模塊都會定義在znet模塊中。
└── zinx
?├── ziface
?│??└──
?└── znet
????├──
??python中的關(guān)鍵字沒有interface蒲讯,但是可以使用抽象基類(abstract base class)和第三方庫來實(shí)現(xiàn)類似于接口的功能忘朝。在實(shí)際開發(fā)中,我們可以根據(jù)具體需求選擇合適的實(shí)現(xiàn)方式伶椿。
??暫時(shí)使用抽象基類的形式模擬接口的實(shí)現(xiàn)辜伟。
??上一節(jié)封裝了全局的配置,這節(jié)做一下消息的封裝脊另。在封裝request的時(shí)候關(guān)于數(shù)據(jù)并沒有做過多的處理窃爷。這節(jié)繼續(xù)豐富一下,將數(shù)據(jù)的分類痊夭、長度构捡、數(shù)據(jù)進(jìn)行打包處理。
??在ziface中創(chuàng)建imessage.py文件。
# -*- coding: utf-8 -*-
from abc import ABC, abstractmethod
class IMessage(ABC):
"""
將請求的一個(gè)消息封裝到message中枚赡,定義抽象層接口
"""
@abstractmethod
def GetDataLen(self) -> int:
"""
獲取消息數(shù)據(jù)段長度
:return:
"""
pass
@abstractmethod
def GetMsgId(self) -> int:
"""
獲取消息ID
:return:
"""
pass
@abstractmethod
def GetData(self) -> bytes:
"""
獲取消息內(nèi)容
:return:
"""
pass
@abstractmethod
def SetDataLen(self, m_len: int):
"""
設(shè)置消息數(shù)據(jù)段長度
:param m_len:
"""
pass
@abstractmethod
def SetMsgId(self, m_id: int):
"""
設(shè)置消息ID
:param m_id:
"""
pass
@abstractmethod
def SetData(self, data: bytes):
"""
設(shè)置消息內(nèi)容
:param data:
"""
pass
??在znet中做一個(gè)實(shí)現(xiàn)類氓癌。
# -*- coding: utf-8 -*-
from ziface.imessage import IMessage
class Message(IMessage):
"""
將請求的一個(gè)消息封裝到message中,定義抽象層接口
"""
def __init__(self, m_id: int, m_len: int, m_data: bytes):
self.id: int = m_id
self.length: int = m_len
self.data: bytes = m_data
def GetDataLen(self) -> int:
"""
獲取消息數(shù)據(jù)段長度
:return:
"""
return self.length
def GetMsgId(self) -> int:
"""
獲取消息ID
:return:
"""
return self.id
def GetData(self) -> bytes:
"""
獲取消息內(nèi)容
:return:
"""
return self.data
def SetDataLen(self, m_len: int):
"""
設(shè)置消息數(shù)據(jù)段長度
:param m_len:
"""
self.length = m_len
def SetMsgId(self, m_id: int):
"""
設(shè)置消息ID
:param m_id:
"""
self.id = m_id
def SetData(self, m_data: bytes):
"""
設(shè)置消息內(nèi)容
:param m_data:
"""
self.data = m_data
def NewMessage(m_id: int, m_len: int, m_data: bytes) -> IMessage:
ns = Message(m_id, m_len, m_data)
return ns
??接下來做封包贫橙、拆包贪婉。統(tǒng)一數(shù)據(jù)格式:數(shù)據(jù)長度(4位)|數(shù)據(jù)ID(4位)|數(shù)據(jù)內(nèi)容
??在ziface中創(chuàng)建idatapack。
# -*- coding: utf-8 -*-
from ziface.imessage import IMessage
from abc import ABC, abstractmethod
class IDataPack(ABC):
"""
封包數(shù)據(jù)和拆包數(shù)據(jù)
直接面向TCP連接中的數(shù)據(jù)流,為傳輸數(shù)據(jù)添加頭部信息卢肃,用于處理TCP粘包問題疲迂。
"""
@abstractmethod
def GetHeadLen(self) -> int:
"""
獲取包頭長度方法
:return:
"""
pass
@abstractmethod
def Pack(self, msg: IMessage) -> bytes:
"""
封包方法
:param msg:
:return:
"""
pass
@abstractmethod
def Unpack(self, data: bytes) -> IMessage:
"""
拆包方法
:param data:
:return:
"""
pass
??在znet中,做實(shí)現(xiàn)類莫湘。
# -*- coding: utf-8 -*-
from typing import Optional
from ziface.imessage import IMessage
from znet.message import NewMessage
from ziface.idatapack import IDataPack
from utils.globalobj import GlobalObject
class DataPack(IDataPack):
"""
采用經(jīng)典的TLV(Type-Len-Value)封包格式
每一個(gè)數(shù)據(jù)包的格式如下:
DataLen | ID | Data
數(shù)據(jù)長度 | 數(shù)據(jù)ID | 數(shù)據(jù)內(nèi)容
每一個(gè)數(shù)據(jù)包的格式統(tǒng)一尤蒿,從TCP流中讀取數(shù)據(jù),就按照這個(gè)格式進(jìn)行解析幅垮。
"""
def __init__(self):
pass
def GetHeadLen(self) -> int:
"""
獲取包頭長度方法
:return:
"""
# Id uint32(4字節(jié)) + DataLen uint32(4字節(jié))
return 8
def Pack(self, msg: IMessage) -> bytes:
"""
封包方法
:param msg:
:return:
"""
# 創(chuàng)建一個(gè)存放bytes字節(jié)的緩沖
buffer = bytes()
# 寫dataLen
buffer += msg.GetDataLen().to_bytes(4, "little")
# 寫msgID
buffer += msg.GetMsgId().to_bytes(4, "little")
# 寫data數(shù)據(jù)
buffer += msg.GetData()
return buffer
def Unpack(self, data: bytes) -> Optional[IMessage]:
"""
拆包方法
:param data:
:return:
"""
# 讀dataLen
data_len = int.from_bytes(data[:4], byteorder='little', signed=False)
# 度msgID
msg_id = int.from_bytes(data[4:], byteorder='little', signed=False)
# 判斷dataLen的長度是否超出我們允許的最大包長度
if 0 < GlobalObject.MaxPacketSize < data_len:
return None
msg = NewMessage(msg_id, data_len, bytes())
# 這里只需要把head的數(shù)據(jù)拆包出來就可以了腰池,然后再通過head的長度,再從conn讀取一次數(shù)據(jù)
return msg
def NewDataPack() -> IDataPack:
dp = DataPack()
return dp
??封裝好的消息處理忙芒,在IRequest中集成一下示弓。
# -*- coding: utf-8 -*-
from ziface.iconnection import IConnection
from abc import ABC, abstractmethod
class IRequest(ABC):
"""
IRequest 接口:實(shí)際上是把客戶端請求的鏈接信息 和 請求的數(shù)據(jù) 包裝到了 Request里
"""
@abstractmethod
def GetConnection(self) -> IConnection:
"""
獲取請求連接信息
:return:
"""
pass
@abstractmethod
def GetData(self) -> bytes:
"""
獲取請求消息的數(shù)據(jù)
:return:
"""
pass
@abstractmethod
def GetMsgID(self) -> int:
"""
獲取請求消息的ID
:return:
"""
pass
??封裝好的消息處理,在Request中集成一下匕争。
# -*- coding: utf-8 -*-
from ziface.irequest import IRequest
from ziface.imessage import IMessage
from ziface.iconnection import IConnection
class Request(IRequest):
def __init__(self, conn: IConnection, msg: IMessage):
"""
:param conn: 已經(jīng)和客戶端建立好的 鏈接
:param msg: 客戶端請求的數(shù)據(jù)
"""
self.Conn: IConnection = conn
self.Msg: IMessage = msg
def GetConnection(self) -> IConnection:
"""
獲取請求連接信息
:return:
"""
return self.Conn
def GetData(self) -> bytes:
"""
獲取請求消息的數(shù)據(jù)
:return:
"""
return self.Msg.GetData()
def GetMsgID(self) -> int:
"""
獲取請求消息的ID
:return:
"""
return self.Msg.GetMsgId()
def NewRequest(conn: IConnection, msg: IMessage) -> IRequest:
req = Request(conn, msg)
return req
??Request修改了避乏,調(diào)用Request的位置需要相應(yīng)的修改。修改Connection的StartReader方法甘桑。
def StartReader(self):
"""
處理讀業(yè)務(wù)
:return:
"""
print("開啟讀業(yè)務(wù)")
while True:
try:
dp = NewDataPack()
# 讀取客戶端的Msg head
head_data = self.Conn.recv(dp.GetHeadLen())
if len(head_data) == 0:
# head_data 長度為0表示客戶端已經(jīng)退出
break
# 拆包拍皮,得到msgID 和 dataLen 放在msg中
msg = dp.Unpack(head_data)
# 根據(jù) dataLen 讀取 data,放在msg.Data中
data_content = self.Conn.recv(msg.GetDataLen())
msg.SetData(data_content)
# 得到當(dāng)前conn數(shù)據(jù)的Request請求數(shù)據(jù)
req = NewRequest(self, msg)
g2 = gevent.spawn(self.RunHandler(req))
GlobalGevents.append(g2)
except Exception as e:
print("讀取數(shù)據(jù)異常 ", e)
break
print("讀業(yè)務(wù)關(guān)閉跑杭,ID", self.ConnID)
self.Stop()
??拆包封裝好了铆帽,接下來再IConnection提供一個(gè)發(fā)送封包方法SendMsg。
# -*- coding: utf-8 -*-
import socket
from abc import ABC, abstractmethod
class IConnection(ABC):
@abstractmethod
def Start(self):
"""
啟動(dòng)鏈接 讓當(dāng)前的鏈接準(zhǔn)備開始工作
:return:
"""
pass
@abstractmethod
def Stop(self):
"""
停止鏈接 結(jié)束當(dāng)前鏈接的工作
:return:
"""
pass
@abstractmethod
def GetConnID(self) -> int:
"""
獲取當(dāng)前鏈接模塊的鏈接ID
:return:
"""
pass
@abstractmethod
def GetTCPConnection(self) -> socket.socket:
"""
獲取當(dāng)前鏈接綁定的socket conn
:return:
"""
pass
@abstractmethod
def GetRemoteAddr(self) -> tuple:
"""
獲取遠(yuǎn)程客戶端的TCP狀態(tài) IP Port
:return:
"""
pass
@abstractmethod
def SendMsg(self, msgID: int, data: bytes):
"""
發(fā)送數(shù)據(jù) 將數(shù)據(jù)發(fā)送給遠(yuǎn)程的客戶端
:param msgID:
:param data:
:return:
"""
pass
??在Connection中實(shí)現(xiàn)一下德谅。
def SendMsg(self, msgID: int, data: bytes):
"""
發(fā)送數(shù)據(jù) 將數(shù)據(jù)發(fā)送給遠(yuǎn)程的客戶端
:param msgID:
:param data:
:return:
"""
if self.is_closed:
raise Exception("發(fā)送數(shù)據(jù)時(shí)客戶端鏈接被關(guān)閉")
try:
dp = NewDataPack()
msg = dp.Pack(NewMessage(msgID, len(data), data))
self.Conn.send(msg)
except Exception as e:
print(e)
??服務(wù)接口做完了爹橱。在demo\datapack中做一個(gè)客戶端client.py測試一下
import socket
import time
import sys
sys.path.append("../../")
from znet.datapack import NewDataPack
from znet.message import NewMessage
if __name__ == '__main__':
HOST = '127.0.0.1' # 服務(wù)器IP地址
PORT = 8986 # 服務(wù)器端口號
# 創(chuàng)建socket對象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 連接服務(wù)器
s.connect((HOST, PORT))
print(f"Connected to {HOST}:{PORT}")
# 發(fā)送數(shù)據(jù)
data = "Hello, Server"
msg = NewMessage(1, len(data), data.encode("utf-8"))
dp = NewDataPack()
send_data = dp.Pack(msg)
s.sendall(send_data)
# 接收響應(yīng)數(shù)據(jù)
response = s.recv(1024)
print("Received from server: ", response)
# 關(guān)閉連接
s.close()
??再做一個(gè)服務(wù)端。
# -*- coding: utf-8 -*-
import sys
sys.path.append("../../")
from znet.server import NewServer
from znet.router import BaseRouter
from ziface.irequest import IRequest
class PingRouter(BaseRouter):
"""
用戶自定義路由
"""
def __init__(self):
super().__init__()
def Handle(self, request: IRequest):
"""
處理conn業(yè)務(wù)的方法
:param request:
:return:
"""
try:
print("調(diào)用Handle")
request.GetConnection().GetTCPConnection().send("ping\n".encode("GB2312"))
request.GetConnection().GetTCPConnection().send(request.GetData())
except Exception as e:
print("ping異常", e)
if __name__ == '__main__':
server = NewServer()
server.AddRouter(PingRouter())
server.Serve()
??此時(shí)發(fā)送和接收都正常窄做。消息封裝模塊完成愧驱。