Redispy 源碼學習(五) --- RESP協(xié)議實現(xiàn)--解碼

編碼發(fā)送數(shù)據(jù)到redis服務(wù)污淋,客戶端完成了第一個交互過程蝇更,即請求的過程。接下來客戶端還要接受并解析服務(wù)端的響應(yīng)回復(fù)取刃。這個過程我們需要將RESP協(xié)議編碼的字節(jié)串解析成python的字串吧彪。

由于響應(yīng)回復(fù)有多種待侵,并且有多行的存在。因此解析響應(yīng)的時候要注意對CRLF的處理姨裸,即tcp包的數(shù)據(jù)分界方式秧倾。在我們尚為進行真正的網(wǎng)絡(luò)通信的時候,我們創(chuàng)建一個變量用于表示redis服務(wù)器返回的進入的socket緩沖區(qū)傀缩。此時的代碼邏輯與讀取真實的socket數(shù)據(jù)很像那先,后面我們再介紹redis.py的socket交互。

read_response

redispy中赡艰,調(diào)用PythonParser類的read_response方法來讀取redis的數(shù)據(jù)售淡。該方法又會相繼調(diào)用_buffer對象的readline和read方法。后兩者分別調(diào)用SocketBuffer類的_read_from_socket方法來讀取socket慷垮。為了模擬從socket中讀取數(shù)據(jù)揖闸,我們會修改_read_from_socket方法,使其讀socket的數(shù)據(jù)改成從我們假設(shè)的緩沖區(qū)變量讀取料身。

class Socket(object):

    def __init__(self, data):
        self.data = data

    def recv(self, length):
        data = self.data[:length]
        self.data=self.data[length:]
        return data

用我們定義的Socket類模擬網(wǎng)絡(luò)數(shù)據(jù)流汤纸,其中recv方法則從data中返回數(shù)據(jù)。為了簡化學習芹血,我們暫時把所有錯誤的處理都忽略贮泞。

狀況回復(fù)

從前面的RESP協(xié)議可以得知,狀態(tài)回復(fù)以+開頭祟牲,后面跟著狀態(tài)消息隙畜,最后以CRLF結(jié)束。

測試的代碼如下:

    data = b'+OK\r\n'
    pp = PythonParser(socket_read_size=65536)
    pp.on_connect(data)
    print(pp.read_response())

打印的結(jié)果為b'OK'说贝。我們先看下PythonParser類的定義议惰。

class PythonParser(object):
    encoding = None

    def __init__(self, socket_read_size):
        self.socket_read_size = socket_read_size
        self._sock = None
        self._buffer = None

PythonParser類定義了讀取socket的數(shù)據(jù)大小,已經(jīng)socket對象和buffer對象乡恕。

再看on_connect方法言询,主要是初始化了我們假定的Socket對象和SocketBuffer對象。

    def on_connect(self, data):
        self._sock = Socket(data)
        self._buffer = SocketBuffer(self._sock, self.socket_read_size)

SocketBuffer

SocketBuffer類的主要職能就是把從socket中讀取的數(shù)據(jù)傲宜,以bytes的方式存儲到內(nèi)存中运杭。然后從內(nèi)存中解析該數(shù)據(jù)。通過控制buffer的寫入和寫出的值函卒,可以精確的設(shè)置什么時候從socket中讀數(shù)據(jù)辆憔。

class SocketBuffer(object):
    def __init__(self, socket, socket_read_size):
        self._sock = socket
        self.socket_read_size = socket_read_size
        self._buffer = BytesIO()
        self.bytes_written = 0
        self.bytes_read = 0

    @property
    def length(self):
        return self.bytes_written - self.bytes_read

    def _read_from_socket(self, length=None):
        pass

    def purge(self):
        pass
        
    def read(self, length):
        pass
        
    def readline(self):
        pass

該類實例化的時候會初始化socket對象和_buffer對象,后者是BytesIO的實例,用于讀取寫入內(nèi)存字節(jié)數(shù)據(jù)虱咧。

回到我們的測試代碼中熊榛,一旦調(diào)用了on_connect方法,下面就是調(diào)用read_response方法腕巡。在該方法中玄坦,首先會調(diào)用_buffer對象的readline方法:

    def readline(self):
        buf = self._buffer
        buf.seek(self.bytes_read)
        data = buf.readline()
        # 處理包結(jié)束
        while not data.endswith(SYM_CRLF):
            self._read_from_socket()
            buf.seek(self.bytes_read)
            data = buf.readline()

        self.bytes_read += len(data)
        if self.bytes_read == self.bytes_written:
            self.purge()

        return data[:-2]

readline方法的主要功能就是從socket中讀取一行數(shù)據(jù)。首先將bytes的指針seek到起始的位置绘沉。然后判斷是否以CRLF結(jié)尾煎楣,即表示是否讀取了redis的一個編碼單位。如果尚未讀取车伞,就會調(diào)用_read_from_socket方法從socket緩沖區(qū)讀取數(shù)據(jù)到內(nèi)存緩沖區(qū)中择懂。最后再從內(nèi)存中讀取一行數(shù)據(jù)到data變量中。

例如我們的例子中帖世,redis返回的數(shù)據(jù)是b'+OK\r\n'休蟹,此時會將所有數(shù)據(jù)都讀取到BytesIO中沸枯,然后從BytesIO讀取到data日矫,最后返回+OK

下面再看read_response方法:

    def read_response(self):
        response = self._buffer.readline()

        byte, response = byte_to_chr(response[0]), response[1:]

        if byte not in ('-', '+', ':', '$', '*'):
            raise RedisError

        # server returned an error
        if byte == '-':
            response = nativestr(response)
            # 處理錯誤
            return response
        # single value
        elif byte == '+':
            pass
        # int value
        elif byte == ':':
            response = int(response)
        # bulk response
        elif byte == '$':
            length = int(response)
            if length == -1:
                return None
            response = self._buffer.read(length)
        # multi-bulk response
        elif byte == '*':
            length = int(response)
            if length == -1:
                return None
            response = [self.read_response() for i in range(length)]
        if isinstance(response, bytes) and self.encoding:
            response = response.decode(self.encoding)
        return response

該方法會讀取stocketbuffer對象的返回绑榴,即上面的+OK哪轿。通過判斷第一個字節(jié)的類型來判斷回復(fù)的類型。此時比較簡單翔怎,直接返回OK窃诉。錯誤回復(fù)也類似,直接把錯誤類型和錯誤信息返回即可赤套。

分段讀取

上面的例子中飘痛,socket的recv一次調(diào)用的字節(jié)是65536∪菸眨可以把socket緩沖區(qū)的數(shù)據(jù)全部讀取宣脉。如果設(shè)定的大小是每次只讀取一個字節(jié)呢?

修改測試代碼再運行剔氏,我們看見輸入依然正常塑猖。因為在readline代碼中,while not data.endswith(SYM_CRLF)的判斷可以幫我們斷定什么時候讀取完谈跛。無論一次讀多少個字節(jié)羊苟,data的數(shù)據(jù)從BytesIO讀取都是一行,因此最后總會讀到CRLF中的\n感憾。此時data的數(shù)據(jù)就是以\r\n結(jié)尾蜡励,結(jié)束從socket中讀數(shù)據(jù)。由此可以,tcp的讀取數(shù)據(jù)是沒有界限的凉倚,就像流水一樣彭则,除非我們在協(xié)議中規(guī)定以什么字符標記作為分界。上面描述的過程大致錄制了一個小視頻占遥,點擊下載俯抖。

數(shù)字回復(fù)

數(shù)字回復(fù)和狀態(tài)回復(fù)類似,只不過回復(fù)的token類型以:開頭瓦胎,其他過程和狀態(tài)回復(fù)類似芬萍。不同在于客戶端的解析要轉(zhuǎn)換成數(shù)字類型。

批量回復(fù)

狀態(tài)回復(fù)很簡單搔啊,redis操作中柬祠,批量回復(fù)也很常見。并且會比較復(fù)雜负芋÷祝基于上面的代碼運行原理。我們首先也是讀取一行旧蛾,然后接觸回復(fù)類型莽龟。因為批量回復(fù)的token會告訴我們返回的字串的長度∠翘欤可以根據(jù)該信息確定我們read_byte位置毯盈,然后將剩余的socket全部讀取。

例如返回的數(shù)據(jù)如果是 $6\r\nfoobar\r\n病袄, 經(jīng)過第一次readline的數(shù)據(jù)搂赋,我們得到的response為$6\r\n。當確定了返回類型是批量回復(fù)益缠,將會繼續(xù)調(diào)用read方法脑奠,將剩下的數(shù)據(jù)(foobar\r\n)讀取。read的代碼如下:

    def read(self, length):
        length = length + 2
        if length > self.length:
            self._read_from_socket(length - self.length)

        self._buffer.seek(self.bytes_read)
        data = self._buffer.read(length)
        self.bytes_read += len(data)

        if self.bytes_read == self.bytes_written:
            self.purge()

        return data[:-2]

read方法比readline簡單幅慌。它只需要判斷BytesIO中的數(shù)據(jù)是否是所有redis的數(shù)據(jù)宋欺。對于$6\r\nfoobar\r\n而言,如果一次讀5個字節(jié)欠痴,那么readline調(diào)用之后迄靠,BytesIO中還有一個f字符,即長度為1喇辽。因為返回了字符串是6+2個字節(jié)(最后的CRLF)掌挚,因此8>1,說明還要從socket中讀取7個字節(jié)菩咨。即再次調(diào)用_read_from_socket方法吠式,與readline類似陡厘,讀取到CRLF結(jié)束并返回。當讀取完畢之后特占,需要調(diào)用self.purge情況buffer對象糙置。為了更好的展示這個過程,也錄制了一個小視頻是目。

由于每次讀取5個socket字節(jié)谤饭,因此在從socket中讀取了兩次。如果多讀了呢懊纳。多讀了也沒有關(guān)系揉抵,即使BytesIO多讀了socket的數(shù)據(jù)。在buffer對象讀取的時候還有一個length參數(shù)嗤疯,這個參數(shù)會保證以CRLF結(jié)尾冤今。這也是redis設(shè)計協(xié)議的時候,為什么字符串返回要在$后加上字節(jié)的長度茂缚。

多批量回復(fù)

多批量回復(fù)以*開頭戏罢,這個編碼格式和請求的命令一樣。多個字節(jié)串分別編碼脚囊,然后再和*參數(shù)數(shù)結(jié)合龟糕。例如下面一個回復(fù)樣式:

*3\r\n$3\r\n777\r\n$6\r\n\xe4\xbd\xa0\xe5\xa5\xbd\r\n$5\r\nhello\r\n

再看read_response中解析多批量回復(fù)的代碼:

elif byte == '*':
            length = int(response)
            if length == -1:
                return None
            response = [self.read_response() for i in range(length)]

一旦是多批量回復(fù),因為*后跟著返回的參數(shù)個數(shù)凑术,而這些參數(shù)個數(shù)的編碼和批量回復(fù)的一模一樣翩蘸。既然如此,那么遞歸調(diào)用read_response淮逊,再解析出來的批量回復(fù)組合起來即可。

特殊類型回復(fù)

RESP的回復(fù)我們都介紹了扶踊,所謂的特殊泄鹏。是數(shù)據(jù)情況特別的時候,比如返回空字符串的時候秧耗,token會是0备籽,返回Nil值的時候,token可能是-1分井。具體這些情況车猬,可以參考官方文檔的案例。

總結(jié)

經(jīng)過上面的分析尺锚,我們了解了redispy是如何解析redis服務(wù)器返回的RESP編碼的數(shù)據(jù)珠闰。解碼的關(guān)鍵在于對socket數(shù)據(jù)的讀取。盡管我們是模擬了socket對象瘫辩。上面的代碼和實際socket交互是完全一樣的伏嗜。因為真實的socket.recv調(diào)用也只是應(yīng)用層的程序代碼從socket的緩沖區(qū)讀取數(shù)據(jù)坛悉。緩存區(qū)直接的IO則是內(nèi)核在tcp層處理的內(nèi)容。

我們把真實的socket.recv讀取數(shù)據(jù)從內(nèi)核轉(zhuǎn)移到一個Socket類承绸,這樣的模擬也是合理的裸影,并且易于調(diào)試。不然還得先模擬發(fā)送命令給redis军熏,然后打斷點等待回復(fù)轩猩。

盡管我們的模擬抽象很好,可是真實的編碼還是需要處理socket的數(shù)據(jù)流荡澎,尤其是對于通信錯誤的處理界轩。完整的代碼可以閱讀redis.py項目。

簽名我們介紹了編碼衔瓮,創(chuàng)建連接和現(xiàn)在接受數(shù)據(jù)并解碼浊猾。接下來將會實現(xiàn)redis.py中的另外一個特性,連接池的實現(xiàn)热鞍。

文中相關(guān)代碼

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末葫慎,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子薇宠,更是在濱河造成了極大的恐慌偷办,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件澄港,死亡現(xiàn)場離奇詭異椒涯,居然都是意外死亡,警方通過查閱死者的電腦和手機回梧,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進店門废岂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人狱意,你說我怎么就攤上這事湖苞。” “怎么了详囤?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵财骨,是天一觀的道長。 經(jīng)常有香客問我藏姐,道長隆箩,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任羔杨,我火速辦了婚禮捌臊,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘问畅。我一直安慰自己娃属,他們只是感情好六荒,可當我...
    茶點故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著矾端,像睡著了一般掏击。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上秩铆,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天砚亭,我揣著相機與錄音,去河邊找鬼殴玛。 笑死捅膘,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的滚粟。 我是一名探鬼主播寻仗,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼凡壤!你這毒婦竟也來了署尤?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤亚侠,失蹤者是張志新(化名)和其女友劉穎曹体,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體硝烂,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡箕别,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了滞谢。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片串稀。...
    茶點故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖爹凹,靈堂內(nèi)的尸體忽然破棺而出厨诸,到底是詐尸還是另有隱情,我是刑警寧澤禾酱,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站绘趋,受9級特大地震影響颤陶,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜陷遮,卻給世界環(huán)境...
    茶點故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一滓走、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧帽馋,春花似錦搅方、人聲如沸比吭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽衩藤。三九已至,卻和暖如春涛漂,著一層夾襖步出監(jiān)牢的瞬間赏表,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工匈仗, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留瓢剿,地道東北人。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓悠轩,卻偏偏與公主長得像间狂,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子火架,可洞房花燭夜當晚...
    茶點故事閱讀 45,077評論 2 355

推薦閱讀更多精彩內(nèi)容