編碼發(fā)送數(shù)據(jù)到redis服務(wù)污淋,客戶端完成了第一個交互過程蝇更,即請求的過程。接下來客戶端還要接受并解析服務(wù)端的響應(yīng)回復(fù)取刃。這個過程我們需要將RESP協(xié)議編碼的字節(jié)串解析成python的字串吧彪。
由于響應(yīng)回復(fù)有多種待侵,并且有多行的存在。因此解析響應(yīng)的時候要注意對CRLF的處理姨裸,即tcp包的數(shù)據(jù)分界方式秧倾。在我們尚為進行真正的網(wǎng)絡(luò)通信的時候,我們創(chuàng)建一個變量用于表示redis服務(wù)器返回的進入的socket緩沖區(qū)傀缩。此時的代碼邏輯與讀取真實的socket數(shù)據(jù)很像那先,后面我們再介紹redis.py的socket交互。
read_response
redispy中赡艰,調(diào)用PythonParser
類的read_response
方法來讀取redis的數(shù)據(jù)售淡。該方法又會相繼調(diào)用_buffer對象的readline和read方法。后兩者分別調(diào)用SocketBuffer
類的_read_from_socket方法來讀取socket慷垮。為了模擬從socket中讀取數(shù)據(jù)揖闸,我們會修改_read_from_socket方法,使其讀socket的數(shù)據(jù)改成從我們假設(shè)的緩沖區(qū)變量讀取料身。
class Socket(object):
def __init__(self, data):
self.data = data
def recv(self, length):
data = self.data[:length]
self.data=self.data[length:]
return data
用我們定義的Socket類模擬網(wǎng)絡(luò)數(shù)據(jù)流汤纸,其中recv方法則從data中返回數(shù)據(jù)。為了簡化學習芹血,我們暫時把所有錯誤的處理都忽略贮泞。
狀況回復(fù)
從前面的RESP協(xié)議可以得知,狀態(tài)回復(fù)以+
開頭祟牲,后面跟著狀態(tài)消息隙畜,最后以CRLF結(jié)束。
測試的代碼如下:
data = b'+OK\r\n'
pp = PythonParser(socket_read_size=65536)
pp.on_connect(data)
print(pp.read_response())
打印的結(jié)果為b'OK'
说贝。我們先看下PythonParser
類的定義议惰。
class PythonParser(object):
encoding = None
def __init__(self, socket_read_size):
self.socket_read_size = socket_read_size
self._sock = None
self._buffer = None
PythonParser
類定義了讀取socket的數(shù)據(jù)大小,已經(jīng)socket對象和buffer對象乡恕。
再看on_connect
方法言询,主要是初始化了我們假定的Socket對象和SocketBuffer
對象。
def on_connect(self, data):
self._sock = Socket(data)
self._buffer = SocketBuffer(self._sock, self.socket_read_size)
SocketBuffer
SocketBuffer類的主要職能就是把從socket中讀取的數(shù)據(jù)傲宜,以bytes的方式存儲到內(nèi)存中运杭。然后從內(nèi)存中解析該數(shù)據(jù)。通過控制buffer的寫入和寫出的值函卒,可以精確的設(shè)置什么時候從socket中讀數(shù)據(jù)辆憔。
class SocketBuffer(object):
def __init__(self, socket, socket_read_size):
self._sock = socket
self.socket_read_size = socket_read_size
self._buffer = BytesIO()
self.bytes_written = 0
self.bytes_read = 0
@property
def length(self):
return self.bytes_written - self.bytes_read
def _read_from_socket(self, length=None):
pass
def purge(self):
pass
def read(self, length):
pass
def readline(self):
pass
該類實例化的時候會初始化socket對象和_buffer對象,后者是BytesIO的實例,用于讀取寫入內(nèi)存字節(jié)數(shù)據(jù)虱咧。
回到我們的測試代碼中熊榛,一旦調(diào)用了on_connect方法,下面就是調(diào)用read_response方法腕巡。在該方法中玄坦,首先會調(diào)用_buffer
對象的readline
方法:
def readline(self):
buf = self._buffer
buf.seek(self.bytes_read)
data = buf.readline()
# 處理包結(jié)束
while not data.endswith(SYM_CRLF):
self._read_from_socket()
buf.seek(self.bytes_read)
data = buf.readline()
self.bytes_read += len(data)
if self.bytes_read == self.bytes_written:
self.purge()
return data[:-2]
readline
方法的主要功能就是從socket中讀取一行數(shù)據(jù)。首先將bytes的指針seek到起始的位置绘沉。然后判斷是否以CRLF結(jié)尾煎楣,即表示是否讀取了redis的一個編碼單位。如果尚未讀取车伞,就會調(diào)用_read_from_socket
方法從socket緩沖區(qū)讀取數(shù)據(jù)到內(nèi)存緩沖區(qū)中择懂。最后再從內(nèi)存中讀取一行數(shù)據(jù)到data變量中。
例如我們的例子中帖世,redis返回的數(shù)據(jù)是b'+OK\r\n'
休蟹,此時會將所有數(shù)據(jù)都讀取到BytesIO中沸枯,然后從BytesIO讀取到data日矫,最后返回+OK
。
下面再看read_response
方法:
def read_response(self):
response = self._buffer.readline()
byte, response = byte_to_chr(response[0]), response[1:]
if byte not in ('-', '+', ':', '$', '*'):
raise RedisError
# server returned an error
if byte == '-':
response = nativestr(response)
# 處理錯誤
return response
# single value
elif byte == '+':
pass
# int value
elif byte == ':':
response = int(response)
# bulk response
elif byte == '$':
length = int(response)
if length == -1:
return None
response = self._buffer.read(length)
# multi-bulk response
elif byte == '*':
length = int(response)
if length == -1:
return None
response = [self.read_response() for i in range(length)]
if isinstance(response, bytes) and self.encoding:
response = response.decode(self.encoding)
return response
該方法會讀取stocketbuffer對象的返回绑榴,即上面的+OK
哪轿。通過判斷第一個字節(jié)的類型來判斷回復(fù)的類型。此時比較簡單翔怎,直接返回OK
窃诉。錯誤回復(fù)也類似,直接把錯誤類型和錯誤信息返回即可赤套。
分段讀取
上面的例子中飘痛,socket的recv一次調(diào)用的字節(jié)是65536∪菸眨可以把socket緩沖區(qū)的數(shù)據(jù)全部讀取宣脉。如果設(shè)定的大小是每次只讀取一個字節(jié)呢?
修改測試代碼再運行剔氏,我們看見輸入依然正常塑猖。因為在readline代碼中,while not data.endswith(SYM_CRLF)
的判斷可以幫我們斷定什么時候讀取完谈跛。無論一次讀多少個字節(jié)羊苟,data的數(shù)據(jù)從BytesIO讀取都是一行,因此最后總會讀到CRLF中的\n感憾。此時data的數(shù)據(jù)就是以\r\n
結(jié)尾蜡励,結(jié)束從socket中讀數(shù)據(jù)。由此可以,tcp的讀取數(shù)據(jù)是沒有界限的凉倚,就像流水一樣彭则,除非我們在協(xié)議中規(guī)定以什么字符標記作為分界。上面描述的過程大致錄制了一個小視頻占遥,點擊下載俯抖。
數(shù)字回復(fù)
數(shù)字回復(fù)和狀態(tài)回復(fù)類似,只不過回復(fù)的token類型以:
開頭瓦胎,其他過程和狀態(tài)回復(fù)類似芬萍。不同在于客戶端的解析要轉(zhuǎn)換成數(shù)字類型。
批量回復(fù)
狀態(tài)回復(fù)很簡單搔啊,redis操作中柬祠,批量回復(fù)也很常見。并且會比較復(fù)雜负芋÷祝基于上面的代碼運行原理。我們首先也是讀取一行旧蛾,然后接觸回復(fù)類型莽龟。因為批量回復(fù)的token會告訴我們返回的字串的長度∠翘欤可以根據(jù)該信息確定我們read_byte位置毯盈,然后將剩余的socket全部讀取。
例如返回的數(shù)據(jù)如果是 $6\r\nfoobar\r\n
病袄, 經(jīng)過第一次readline的數(shù)據(jù)搂赋,我們得到的response為$6\r\n
。當確定了返回類型是批量回復(fù)益缠,將會繼續(xù)調(diào)用read
方法脑奠,將剩下的數(shù)據(jù)(foobar\r\n)讀取。read的代碼如下:
def read(self, length):
length = length + 2
if length > self.length:
self._read_from_socket(length - self.length)
self._buffer.seek(self.bytes_read)
data = self._buffer.read(length)
self.bytes_read += len(data)
if self.bytes_read == self.bytes_written:
self.purge()
return data[:-2]
read
方法比readline
簡單幅慌。它只需要判斷BytesIO中的數(shù)據(jù)是否是所有redis的數(shù)據(jù)宋欺。對于$6\r\nfoobar\r\n
而言,如果一次讀5個字節(jié)欠痴,那么readline調(diào)用之后迄靠,BytesIO中還有一個f
字符,即長度為1喇辽。因為返回了字符串是6+2個字節(jié)(最后的CRLF)掌挚,因此8>1,說明還要從socket中讀取7個字節(jié)菩咨。即再次調(diào)用_read_from_socket
方法吠式,與readline類似陡厘,讀取到CRLF結(jié)束并返回。當讀取完畢之后特占,需要調(diào)用self.purge情況buffer對象糙置。為了更好的展示這個過程,也錄制了一個小視頻是目。
由于每次讀取5個socket字節(jié)谤饭,因此在從socket中讀取了兩次。如果多讀了呢懊纳。多讀了也沒有關(guān)系揉抵,即使BytesIO多讀了socket的數(shù)據(jù)。在buffer對象讀取的時候還有一個length參數(shù)嗤疯,這個參數(shù)會保證以CRLF結(jié)尾冤今。這也是redis設(shè)計協(xié)議的時候,為什么字符串返回要在$
后加上字節(jié)的長度茂缚。
多批量回復(fù)
多批量回復(fù)以*
開頭戏罢,這個編碼格式和請求的命令一樣。多個字節(jié)串分別編碼脚囊,然后再和*
參數(shù)數(shù)結(jié)合龟糕。例如下面一個回復(fù)樣式:
*3\r\n$3\r\n777\r\n$6\r\n\xe4\xbd\xa0\xe5\xa5\xbd\r\n$5\r\nhello\r\n
再看read_response中解析多批量回復(fù)的代碼:
elif byte == '*':
length = int(response)
if length == -1:
return None
response = [self.read_response() for i in range(length)]
一旦是多批量回復(fù),因為*
后跟著返回的參數(shù)個數(shù)凑术,而這些參數(shù)個數(shù)的編碼和批量回復(fù)的一模一樣翩蘸。既然如此,那么遞歸調(diào)用read_response淮逊,再解析出來的批量回復(fù)組合起來即可。
特殊類型回復(fù)
RESP的回復(fù)我們都介紹了扶踊,所謂的特殊泄鹏。是數(shù)據(jù)情況特別的時候,比如返回空字符串的時候秧耗,token會是0备籽,返回Nil值的時候,token可能是-1分井。具體這些情況车猬,可以參考官方文檔的案例。
總結(jié)
經(jīng)過上面的分析尺锚,我們了解了redispy是如何解析redis服務(wù)器返回的RESP編碼的數(shù)據(jù)珠闰。解碼的關(guān)鍵在于對socket數(shù)據(jù)的讀取。盡管我們是模擬了socket對象瘫辩。上面的代碼和實際socket交互是完全一樣的伏嗜。因為真實的socket.recv調(diào)用也只是應(yīng)用層的程序代碼從socket的緩沖區(qū)讀取數(shù)據(jù)坛悉。緩存區(qū)直接的IO則是內(nèi)核在tcp層處理的內(nèi)容。
我們把真實的socket.recv讀取數(shù)據(jù)從內(nèi)核轉(zhuǎn)移到一個Socket類承绸,這樣的模擬也是合理的裸影,并且易于調(diào)試。不然還得先模擬發(fā)送命令給redis军熏,然后打斷點等待回復(fù)轩猩。
盡管我們的模擬抽象很好,可是真實的編碼還是需要處理socket的數(shù)據(jù)流荡澎,尤其是對于通信錯誤的處理界轩。完整的代碼可以閱讀redis.py項目。
簽名我們介紹了編碼衔瓮,創(chuàng)建連接和現(xiàn)在接受數(shù)據(jù)并解碼浊猾。接下來將會實現(xiàn)redis.py中的另外一個特性,連接池的實現(xiàn)热鞍。
文中相關(guān)代碼