背景
在開發(fā)Mock中心的過程中及汉,每個(gè)server
與client
通訊的時(shí)候志秃,需要使用unix socket
這種高效的本機(jī)通訊協(xié)議來交換數(shù)據(jù)圈膏,但是unix socket
通訊協(xié)議是基于文件的澳厢,也就是當(dāng)并發(fā)量大的時(shí)候思瘟,單個(gè)文件作為通訊信道會(huì)出現(xiàn)擁堵的情況荸百。
思路
解決的思路很簡(jiǎn)單,不使用單文件作為通訊信道潮太。
TCP
協(xié)議中管搪,應(yīng)對(duì)并發(fā)是有多種方式的。最常規(guī)的方式就是以多線程的方式铡买,監(jiān)聽多個(gè)通訊信道更鲁,還有Linux
上比較經(jīng)典的epoll
的方式。
從本質(zhì)上來說奇钞,多線程的方式澡为,其實(shí)就是開啟了多個(gè)通信信道,在Linux
系統(tǒng)底層看來景埃,就是多個(gè)socket
文件媒至。而epoll
的方式顶别,其實(shí)就是極致的壓榨單信道的性能。
在設(shè)計(jì)這個(gè)通訊方式的時(shí)候拒啰,其實(shí)就是為了簡(jiǎn)便的實(shí)現(xiàn)高性能驯绎。
在server
監(jiān)聽的時(shí)候,僅監(jiān)聽一個(gè)文件谋旦,但是回包的時(shí)候剩失,client
在請(qǐng)求之前先監(jiān)聽一個(gè)文件,然后把文件地址帶到請(qǐng)求串中册着,server
在收到這個(gè)請(qǐng)求之后拴孤,回包就不通過原路返回,而是回到這個(gè)client
監(jiān)聽的地址甲捏。
代碼
- client
class UnixSocketUDPServer(object):
"""由于unix socket的特殊模式演熟,如果有返回值的,必須在發(fā)包前監(jiān)聽一個(gè)socket文件"""
def __init__(self, srv_addr, soc_model=socket.SOCK_DGRAM):
try:
os.unlink(srv_addr)
except OSError:
if os.path.exists(srv_addr):
raise EnvironmentError("path is exist")
self.srv_addr = srv_addr
self.rsp_data = ""
self.sock = socket.socket(socket.AF_UNIX, soc_model, 0)
self.sock.bind(self.srv_addr)
def __del__(self):
os.unlink(self.srv_addr)
def unpack_package(data):
"""
unix socket 的解包方法司顿,對(duì)應(yīng)下面的pack_package芒粹。
由于struck的unpack方法解出來的數(shù)據(jù)都是tuple類型,所以取數(shù)據(jù)的時(shí)候需要注意忽略tuple的第二個(gè)參數(shù)
:param data:
:return: tuple免猾,tuple
"""
total_len, addr_len, body_len = struct.unpack("iii", data[:4 * 3])
addr = struct.unpack("{addr_len}s".format(addr_len=addr_len), data[12:addr_len + 12])
body = struct.unpack("{body_len}s".format(body_len=body_len), data[addr_len + 12:])
return addr, body
def pack_package(addr, body):
"""
unix socket 的打包數(shù)據(jù)的方法是辕,打包的內(nèi)容是:地址+數(shù)據(jù)。
打包的格式是: 包總長(zhǎng)度+地址長(zhǎng)度+數(shù)據(jù)長(zhǎng)度+地址+數(shù)據(jù)
:param addr:
:param body:
:return: 打包好的二進(jìn)制數(shù)據(jù)
"""
addr_len = len(addr)
body_len = len(body)
total_len = addr_len + body_len + 12
_package = struct.pack("iii{addr_len}s{body_len}s".format(addr_len=addr_len,
body_len=body_len),
total_len, addr_len, body_len, addr, body)
return _package
addr = "/tmp/{_addr}.sock".format(_addr=random_utils.get_uuid())
# 這里在請(qǐng)求時(shí)需要先監(jiān)聽一個(gè)unix socket文件猎提。
us = network_utils.UnixSocketUDPServer(addr)
us.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 這里使用unicode把數(shù)據(jù)包起來是為了防止解析后某些數(shù)據(jù)非unicode導(dǎo)致數(shù)據(jù)轉(zhuǎn)換失敗
_req_data = unicode(self.method) + u"|" + unicode(str(getattr(self.t, "m_data")), errors="ignore")
logger.info("===========call unix socket to agent===========")
req_data = string_utils.pack_package(addr, str(_req_data))
logger.info(repr(req_data))
us.sock.sendto(req_data, 0, US_ADDR)
sec = 0
usec = 10000
timeval = struct.pack('ll', sec, usec)
us.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO, timeval)
raw_data, _ = us.sock.recvfrom(20480)
- server
s = net_util.unix_socket_server("/tmp/mock_mgr_agent_addr.sock")
while True:
data = s.recv(20480)
_addr, _body = unpack_package(data)
addr = _addr[0]
body = _body[0]
......
ret_info = pack_package(addr, str(_ret_info))
unix_socket_send(addr, ret_info)
說明
- 打包和解包
這里打包和解包用了strunt
方法获三,把字符串打成二進(jìn)制,這樣可以加快傳輸?shù)男氏撬铡M瑫r(shí)也把地址的長(zhǎng)度位和數(shù)據(jù)的長(zhǎng)度位打包進(jìn)去疙教,方便server
截取。
- client
在client的代碼中伞租,先生成了一個(gè)unix socket
的對(duì)象贞谓,監(jiān)聽了一個(gè)用uuid
生成的隨機(jī)文件地址,然后把這個(gè)地址信息和需要傳遞的數(shù)據(jù)一起發(fā)送給服務(wù)端葵诈。
- server
服務(wù)端就是傳統(tǒng)的socket服務(wù)裸弦,只是在回包的時(shí)候,發(fā)往的地址是收到的數(shù)據(jù)中的地址作喘。
- socket端口和地址復(fù)用
一般的理疙,socket綁定了一個(gè)地址,那么就不會(huì)變泞坦,但是我們這里的client端窖贤,需要監(jiān)聽一個(gè)地址的同時(shí),發(fā)送消息到另一個(gè)地址,這里就需要使用這個(gè)socket.SOL_SOCKET
赃梧,通過setsockopt
滤蝠,對(duì)socket
進(jìn)行設(shè)置,允許使用端口和地址復(fù)用授嘀。socket.SO_REUSEADDR
這個(gè)參數(shù)提供如下功能:
SO_REUSEADDR允許啟動(dòng)一個(gè)監(jiān)聽服務(wù)器并捆綁其眾所周知端口物咳,即使以前建立的將此端口用做他們的本地端口的連接仍存在。這通常是重啟監(jiān)聽服務(wù)器時(shí)出現(xiàn)粤攒,若不設(shè)置此選項(xiàng)所森,則bind時(shí)將出錯(cuò)。
SO_REUSEADDR允許在同一端口上啟動(dòng)同一服務(wù)器的多個(gè)實(shí)例夯接,只要每個(gè)實(shí)例捆綁一個(gè)不同的本地IP地址即可。對(duì)于TCP纷妆,我們根本不可能啟動(dòng)捆綁相同IP地址和相同端口號(hào)的多個(gè)服務(wù)器盔几。
SO_REUSEADDR允許單個(gè)進(jìn)程捆綁同一端口到多個(gè)套接口上,只要每個(gè)捆綁指定不同的本地IP地址即可掩幢。這一般不用于TCP服務(wù)器逊拍。
SO_REUSEADDR允許完全重復(fù)的捆綁:當(dāng)一個(gè)IP地址和端口綁定到某個(gè)套接口上時(shí),還允許此IP地址和端口捆綁到另一個(gè)套接口上际邻。一般來說芯丧,這個(gè)特性僅在支持多播的系統(tǒng)上才有,而且只對(duì)UDP套接口而言(TCP不支持多播)世曾。
簡(jiǎn)單的理解就是缨恒,這個(gè)參數(shù)后面的值不等于0,就可以在監(jiān)聽的同時(shí)轮听,發(fā)送數(shù)據(jù)到其他地址骗露。