了解RESP協(xié)議臭觉,并實(shí)現(xiàn)了pack編碼昆雀,下一步就是發(fā)送命令給redis服務(wù)器,等待結(jié)果返回就好啦胧谈。
當(dāng)然忆肾,想要發(fā)送命令,還需要先創(chuàng)建連接菱肖。如果連接都不存在客冈,就無所謂發(fā)送命令了。我們已經(jīng)了解發(fā)送網(wǎng)絡(luò)數(shù)據(jù)需要先創(chuàng)建socket連接稳强。因此下面寫了一個(gè)簡單的客戶端代碼场仲,基于前面編碼RESP的代碼:
command = Connection().pack_command(*args)
print(command)
import socket
address = ('127.0.0.1', 6379)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(address)
sock.sendall(command[0])
print(sock.recv(1024))
可以看到服務(wù)返回了并打印了b'+PONG\r\n'
結(jié)果。一切工作正常退疫。等等渠缕,如果每次我們發(fā)送數(shù)據(jù)都需要這么原始的創(chuàng)建socket,然后調(diào)用連接褒繁,發(fā)送數(shù)據(jù)亦鳞,那么這個(gè)腳本也太丑了。因此棒坏,我們的主題就是學(xué)習(xí)redispy是如何抽象封裝連接管理的燕差。
連接
redispy定義了一個(gè)Connection類用于連接的管理,而定義了一個(gè)PythonParser類用來適配Hiredis坝冕。同時(shí)還實(shí)現(xiàn)了一個(gè)SocketBuffer方法用于收發(fā)處理socket數(shù)據(jù)徒探。當(dāng)然,本篇只討論創(chuàng)建連接發(fā)送數(shù)據(jù)喂窟。
我們的客戶端代碼為
conn = Connection()
conn.send_command("PING")
print(conn.read_response())
創(chuàng)建一個(gè)Connection實(shí)例测暗,然后發(fā)送命令到redis服務(wù)器央串,最后再讀取服務(wù)器的響應(yīng)。當(dāng)然碗啄,conn.read_response的源碼分析暫時(shí)可以忽略质和。
send_command 方法內(nèi)會(huì)先打包編碼命令,然后再調(diào)用send_packed_command方法發(fā)送命令挫掏。send_packed_command方法的源碼如下:
def send_packed_command(self, command):
"""將編碼后的redis命令發(fā)送到redis服務(wù)器"""
if not self._sock:
self.connect()
try:
if isinstance(command, str):
command = [command]
for item in command:
self._sock.sendall(item)
except socket.timeout:
self.disconnect()
raise TimeoutError('Timeout writing to socket')
except socket.error:
e = sys.exc_info()[1]
self.disconnect()
if len(e.args) == 1:
errno, errmsg = 'UNKNOWN', e.args[0]
else:
errno = e.args[0]
errmsg = e.args[1]
raise ConnectionError("Error {} while writing to socket. {}.".format(errno, errmsg))
except:
self.disconnect()
raise
Connection實(shí)例化的時(shí)候會(huì)初始一些熟悉侦另,例如 _sock和_parser對象,兩者分別是用于通信的socket和解析resp協(xié)議的適配器尉共。
send_packed_command首先會(huì)檢查_sock
,即socket是否創(chuàng)建褒傅。如果尚未創(chuàng)建,則會(huì)調(diào)用connect方法創(chuàng)建連接袄友。connect返回創(chuàng)建的socket連接殿托。
一旦有了socket,就可以直接調(diào)用sendall方法發(fā)送命令了剧蚣。再socket交互的過程中支竹,還得注意可能產(chǎn)生的異常和錯(cuò)誤。
創(chuàng)建連接
Conneciton使用connect方法創(chuàng)建連接鸠按,該方法如下:
def connect(self):
if self._sock:
return
try:
sock = self._connect()
except socket.error:
e = sys.exc_info()[1]
raise ConnectionError(self._error_message(e))
self._sock = sock
try:
self.on_connect()
except RedisError:
self.disconnect()
raise
同樣也會(huì)先檢查一下_sock
對象礼搁,然后調(diào)用_connect
方法創(chuàng)建socket。再創(chuàng)建完畢socket之后目尖,還調(diào)用了一個(gè)on_connect
方法做一些連接后的工作馒吴。
_connect
的源碼略多,但是很清晰:
def _connect(self):
""" 創(chuàng)建 socket 連接,并返回socket對象
"""
err = None
for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM):
family, socktype, proto, canonname, socket_address = res
try:
# 創(chuàng)建 tcp socket 對象
sock = socket.socket(family, socktype, proto)
# TCP_NODELAY
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if self.socket_keepalive:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
for k, v in iteritems(self.socket_keepalive_options):
sock.setsockopt(socket.SOL_TCP, k, v)
# 設(shè)置創(chuàng)建連接的超時(shí)時(shí)間
sock.settimeout(self.socket_connect_timeout)
sock.connect(socket_address)
# 設(shè)置連接之后的超時(shí)時(shí)間
sock.settimeout(self.socket_timeout)
return sock
except socket.error as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
raise socket.error("socket.getaddrinfo returned an empty list")
首先調(diào)用socket的getaddrinfo方法瑟曲,確定通信的族協(xié)議和socket類型饮戳。即IP4和tcp通信方式。然后設(shè)置socket選項(xiàng)洞拨,開啟TCP_NODELAY模式扯罐,即禁用Nagle
算法。TCP為了保證網(wǎng)絡(luò)的性能烦衣,通常會(huì)把緩沖區(qū)的數(shù)據(jù)積累到一定程度再一起發(fā)送歹河。而對于redis這種一問一答的通信方式,需要你盡可能的發(fā)送消息花吟,而不用等待启泣,因此會(huì)禁用掉該算法。
Nagle算法的基本定義是任意時(shí)刻示辈,最多只能有一個(gè)未被確認(rèn)的小段。 所謂“小段”遣蚀,指的是小于MSS尺寸的數(shù)據(jù)塊矾麻,
所謂“未被確認(rèn)”纱耻,是指一個(gè)數(shù)據(jù)塊發(fā)送出去后,沒有收到對方發(fā)送的ACK確認(rèn)該數(shù)據(jù)已收到险耀。
Nagle算法只允許一個(gè)未被ACK的包存在于網(wǎng)絡(luò)弄喘,它并不管包的大小,因此它事實(shí)上就是一個(gè)擴(kuò)展的停-等協(xié)議甩牺,只不過它是基于包停-等的蘑志,而不是基于字節(jié)停-等的。
TCP_NODELAY 選項(xiàng)
默認(rèn)情況下贬派,發(fā)送數(shù)據(jù)采用Nagle 算法急但。這樣雖然提高了網(wǎng)絡(luò)吞吐量,但是實(shí)時(shí)性卻降低了搞乏,在一些交互性很強(qiáng)的應(yīng)用程序來說是不允許的波桩,
使用TCP_NODELAY選項(xiàng)可以禁止Nagle 算法。
然后就是對長連接(keepalive)的選項(xiàng)設(shè)置请敦,最后設(shè)置連接的超時(shí)時(shí)間镐躲,然后調(diào)用socket的connect方法創(chuàng)建連接。創(chuàng)建連接后侍筛,自然要把連接對象的socket返回萤皂。
連接后初始化
redispy在創(chuàng)建了連接之后,會(huì)調(diào)用on_connect 方法做一些初始化工作:
def on_connect(self):
self._parser.on_connect(self)
if self.password:
self.send_command('AUTH', self.password)
if nativestr(self.read_response()) != 'OK':
raise AuthenticationError('Invalid Password')
if self.db:
self.send_command('SELECT', self.db)
if nativestr(self.read_response()) != 'OK':
raise ConnectionError('Invalid Database')
on_connect會(huì)先通過PythonParser的on_connect方法匣椰,初始化ScoketBuffer裆熙,這樣做的目的就是為了接下來接受redis的回復(fù)。暫時(shí)可以忽略窝爪,主要看其他的邏輯弛车。
如果需要認(rèn)證,就再調(diào)用 send_command 方法發(fā)送認(rèn)證的命令蒲每,如果創(chuàng)建連接指定了數(shù)據(jù)庫纷跛,就發(fā)送選擇數(shù)據(jù)庫的指令,默認(rèn)是選擇數(shù)據(jù)庫0邀杏。
這里的代碼很有意思贫奠,我們的入口就是 send_command 方法。send_command 內(nèi)的流程中又會(huì)先檢查連接望蜡,連接沒有就創(chuàng)建唤崭,連接有了才返回。此時(shí)再次調(diào)用send_command方法脖律,顯然已經(jīng)有了連接谢肾,因此代碼會(huì)走到self._sock.sendall(item)
發(fā)送命令。
從這里也可以看出小泉,redispy創(chuàng)建連接是惰性的芦疏,只有發(fā)送查詢的命令的時(shí)候冕杠,才會(huì)檢查連接,從而決定是否創(chuàng)建socket連接酸茴。
總結(jié)
redispy中創(chuàng)建連接并沒有很多高深的內(nèi)容分预。通過封裝一個(gè)連接對象管理連接。代碼不復(fù)雜并不代碼沒有用薪捍。實(shí)際上笼痹,正式因?yàn)樵O(shè)計(jì)了這么簡單的連接方式,才為后面的連接池實(shí)現(xiàn)帶來了便利酪穿。
收發(fā)數(shù)據(jù)都是基于socket凳干,socket創(chuàng)建是表示可以進(jìn)入數(shù)據(jù)的收發(fā)狀態(tài),但這只是最底層的連接昆稿。redis的邏輯連接還需要我們認(rèn)證或者選擇數(shù)據(jù)庫纺座。因?yàn)楹笳咭呀?jīng)涉及了socket數(shù)據(jù)通信,因此也是再連接創(chuàng)立之后的操作溉潭。redispy為了考慮這種方法净响,很好的復(fù)用了send_command 方法。通過on_connect的方式創(chuàng)建redis的邏輯連接喳瓣。
至于read_response的內(nèi)容馋贤,將會(huì)是RESP協(xié)議的解析,和redis回復(fù)內(nèi)容的話題畏陕。我們將會(huì)在接下來的文中分析配乓。