使用 Java NIO 實現(xiàn) Simple Redis 服務(wù)端 客戶端

最近在學(xué)習(xí)NIO,想找個demo來練練手是鬼,然后發(fā)現(xiàn)用java nio來簡單實現(xiàn)redis應(yīng)該挺有趣的紫新。

Java NIO 概覽

首先芒率,java nio有3個重要的類:

  • ByteBuffer: 用于讀寫數(shù)據(jù),實際上是byte數(shù)組的一個封裝充择。它的使用方式還是比較有趣的聪铺,需要注意的一點是在寫模式轉(zhuǎn)換為讀模式時需要flip()铃剔。
  • Channel: 用于維護服務(wù)端與客戶端的通信通道。
  • Selector: 多路復(fù)用器凤类,管理被注冊的通道集合信息和就緒狀態(tài)谜疤。還有SelectionKey用于維護Selector與Channel之間的關(guān)系现诀,即Channel對哪些事件感興趣仔沿,Selector會幫忙通知它封锉。

然后是使用方法——java nio網(wǎng)絡(luò)編程的主要使用流程如下:

  1. 創(chuàng)建Selector
  2. 創(chuàng)建SocketChannel
  3. 將SockerChannel注冊到Selector上,設(shè)置SelectionKey
  4. 綁定端口或者開啟連接
  5. 之后就不斷從Selector中抽取準備好了的SelectionKey碾局,然后對對應(yīng)的Channel作處理

還有一點補充净当,SelectionKey可以包含一個任意類型的attachment像啼,可以用來輔助Channel的處理品擎。

Redis序列化協(xié)議

關(guān)于Redis序列化協(xié)議萄传,可以參看這里
我只實現(xiàn)了數(shù)組秀菱、單行字符串、定長字符串赶么。

這里我把單行字符串辫呻、定長字符串的解碼稱為單步解碼放闺,
即整個解碼過程由若干單步解碼組成缕坎。

拆包粘包

Redis的通信是基于TCP協(xié)議的谜叹,關(guān)于TCP網(wǎng)絡(luò)編程荷腊,比較麻煩的一點就是處理拆包和粘包的問題。

JDK NIO并沒有為我們直接提供處理拆包粘包的一些機制很钓,所以需要我們自己處理码倦。

這里處理粘包的思路很直接锭碳,即利用應(yīng)用層的協(xié)議擒抛,因為Redis序列化協(xié)議已經(jīng)很完善歧沪,遵循它的協(xié)議來實現(xiàn)處理Channel的代碼诊胞,就能分割開不同請求的數(shù)據(jù)段。

而處理拆包會比較麻煩迈着,它首先會分為兩種情況:

  1. 目前讀取的數(shù)據(jù)段不夠裕菠,不足以進行單步解碼操作奴潘。
  2. 目前讀取的數(shù)據(jù)段剛好能進行單步解碼操作,但總的請求并沒有完全讀取画髓,相應(yīng)的解碼并沒有完全完成雀扶。

對于第一種情況愚墓,基于協(xié)議規(guī)定的分割符或者已知的限定長度浪册,不斷等待讀取事件準備好村象,處理讀取事件,直到滿足單步解碼操作的要求躁劣。(這里可以保存之前部分解碼的數(shù)據(jù)和狀態(tài)账忘,也可以不要鳖擒,因為數(shù)據(jù)量較小烫止,性能影響不大)馆蠕。
而對于第二種情況,依然可以使用類似第一種情況的方式,不斷等待數(shù)據(jù)完全準備好行拢,若還沒有準備好诞吱,則放棄之前解碼完成的數(shù)據(jù)房维,等下次數(shù)據(jù)到來時再重新解碼咙俩。不過由于數(shù)據(jù)已經(jīng)解碼了一部分阿趁,重新解碼比較耗時脖阵,所以需要保存下之前解碼的數(shù)據(jù)和狀態(tài)命黔,等下次數(shù)據(jù)到來時再繼續(xù)解碼。

這里我的方案是:在Decoder對象中保存目前的解碼狀態(tài)蘑辑,保存的狀態(tài)較為細節(jié),第一種情況中的部分解碼的數(shù)據(jù)和狀態(tài)也保存了坠宴。然后讓SelectionKey帶上這個Decoder(通過attachment)以躯,當每次SelectionKey準備好時,取出這個Decocer進行繼續(xù)處理啄踊。

代碼部分

首先是核心部分——Redis序列化協(xié)議解碼器:

  1. 狀態(tài)部分:
public class RespDecoder {

    private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    private SocketChannel channel;

    // 是否只拿到了CR忧设,LF還沒拿到
    private boolean isOnlyGetCR;

    // 字符串緩沖區(qū)
    private StringBuilder sb = new StringBuilder();

    // 目前的還沒讀取的定長字符串長度
    private int stringLength;

    // 目前讀取到的字符串數(shù)組
    private List<String> wordList = new ArrayList<String>();

    // 目前讀取到的mark
    private byte mark;

    // 目前還需讀取到數(shù)組中的字符串數(shù)量
    private int size;
  1. 主要解碼邏輯
    解碼主體邏輯decode0()使用遞歸實現(xiàn),比較方便颠通。
    decode()方法處理異常址晕,設(shè)置解碼完成與否的標識顿锰,返回給調(diào)用者谨垃。
    public boolean decode() throws Exception {
        boolean isComplete = true;
        try {
            decode0();
        } catch (ReadEmptyException e) {
            // 捕獲到異常启搂,則返回false,提示調(diào)用者目前還未解碼完成刘陶。
            isComplete = false;
        }
        return isComplete;
    }

    private void decode0() throws Exception {
        // mark默認為0胳赌,若不為0,則為中間狀態(tài)
        mark = (mark != 0 ? mark : readOneByte());
        if (mark == '*') {
            // size默認為0匙隔,若不為0疑苫,則為中間狀態(tài)
            size = (size != 0 ? size : readInteger());

            while ((size--) > 0) {
                decode0();   // 遞歸解析
            }
        } else if (mark == '$') {
            // stringLength默認為0,若不為0纷责,則之前讀取了一部分的字符串
            stringLength = (stringLength != 0 ? stringLength : readInteger() + 2);     // +2 : 加上CRLF
            readFixString();
            // 沒有拋異常就保存字符串
            saveString();
        } else if (mark == '+') {
            readString();
            // 沒有拋異常就保存字符串
            saveString();
        }
    }
  1. 讀取數(shù)據(jù)到buffer
    若目前Channel沒有數(shù)據(jù)可讀捍掺,則拋出ReadEmptyException異常,上層調(diào)用代碼捕獲到異常后可以進行相應(yīng)處理
     /**
     * 若byteBuffer為空再膳,讀取數(shù)據(jù)到byteBuffer
     * @throws Exception
     */
    private void readToBuffer() throws Exception{
        if (!byteBuffer.hasRemaining()) {
            byteBuffer.clear();
            int count = channel.read(byteBuffer);
            if (count < 0) {
                throw new RuntimeException("connection error");
            } else if (count == 0) {
                throw new ReadEmptyException("read empty");
            }
            byteBuffer.flip();
        }
    }
  1. 讀取定長字符串
    讀取字符串等單步解碼結(jié)構(gòu)差不多挺勿,這里以讀取定長字符串為例。這里我使用了遞歸的方法喂柒,來遞歸讀取剩余的數(shù)據(jù)不瓶。若readToBuffer()拋出異常,則不進行處理灾杰,直接拋出該異常給上層湃番。
     /**
     * 讀一個定長字符串
     * @return
     */
    private void readFixString() throws Exception{

        readToBuffer();

        byte[] bytes;
        if (byteBuffer.remaining() < stringLength) {
            int currentSize = byteBuffer.remaining();
            bytes = new byte[currentSize];
            byteBuffer.get(bytes);
            String str = new String(bytes, "UTF-8");
            sb.append(str);
            stringLength -= currentSize;
            readFixString();     // 遞歸讀取剩余的數(shù)據(jù)
        } else {
            bytes = new byte[stringLength];
            byteBuffer.get(bytes);
            String str = new String(bytes, "UTF-8");
            sb.append(str.replaceAll("\r|\n", ""));     // 去除CRLF
            stringLength = 0;
        }

    }

然后是服務(wù)器代碼,常規(guī)的NIO操作吭露。這里用ConcurrentHashMap來實現(xiàn)Redis緩存業(yè)務(wù)吠撮,實際上用HashMap就夠了,因為這個服務(wù)器代碼只有一個線程讲竿。

public class RedisServer {

    private Map<String, String> redisMap = new ConcurrentHashMap<String, String>();

    public void start() throws Exception{

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        Selector selector = Selector.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 綁定6379
        serverSocket.bind(new InetSocketAddress(6379));
        System.out.println("Listen to " + 6379);

        while (true) {
            int n = selector.select();
            if (n == 0) {
                continue;
            }
            Iterator it = selector.selectedKeys().iterator();

            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = server.accept();

                    System.out.println(socketChannel.getLocalAddress() + " accepted");

                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);


                }

                if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    if (key.attachment() == null) {
                        key.attach(new RespDecoder(socketChannel));
                    }
                    RespDecoder decoder = (RespDecoder) key.attachment();

                    try {
                        boolean isComplete = decoder.decode();
                        if (isComplete) {
                            List<String> wordList = decoder.getWordList();
                            System.out.println(Arrays.toString(wordList.toArray()));
                            String message = operate(wordList);
                            // 解碼結(jié)束泥兰,清空decoder的狀態(tài)
                            decoder.clear();

                            // 發(fā)送到客戶端
                            send(message, socketChannel);
                        }
                    } catch (Exception e) {
                        key.cancel();
                        socketChannel.socket().close();
                        socketChannel.close();
                    }



                }
                // 清除處理過的鍵
                it.remove();
            }
        }
    }

    private String operate(List<String> wordList) throws Exception{
        String result = null;
        if (wordList.get(0).equals("set")) {
            redisMap.put(wordList.get(1), wordList.get(2));
            result = "OK";
        } else if (wordList.get(0).equals("get")) {
            result = redisMap.get(wordList.get(1));
        }
        return result;
    }

    private void send(String message, SocketChannel channel) throws Exception{
        ByteBuffer writeBuffer = RespEncoder.encode(message);

        while (writeBuffer.hasRemaining()) {
            channel.write(writeBuffer);
        }
    }
}

對于編碼器,直接按Redis序列化協(xié)議去編碼就好了题禀,應(yīng)該沒什么問題鞋诗。
客戶端代碼與服務(wù)器類似,就不列出來了迈嘹。
有興趣可以點擊后文給出的源碼地址查看削彬。

演示部分

1. 用Redis自帶的客戶端去連接我們的Simple Redis服務(wù)器:

開啟我們的Simple Redis服務(wù)器


image.png

使用Redis自帶的命令行客戶端


image.png

2. 用我們的Simple Redis客戶端去連接Redis服務(wù)器:

開啟Redis服務(wù)器


image.png

使用我們的Simple Redis命令行客戶端


image.png

用Redis自帶的命令行客戶端驗證


image.png

3. 用我們的Simple Redis客戶端去連接我們的Simple Redis服務(wù)器:

開啟我們的Simple Redis服務(wù)器


image.png

使用我們的Simple Redis命令行客戶端


image.png

最后

源碼地址

github
源碼中還有一個Netty的版本。

再最后

本人剛接觸NIO不久秀仲,若大家發(fā)現(xiàn)有問題請不吝賜教融痛,謝謝。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末神僵,一起剝皮案震驚了整個濱河市雁刷,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌保礼,老刑警劉巖沛励,帶你破解...
    沈念sama閱讀 210,978評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件责语,死亡現(xiàn)場離奇詭異,居然都是意外死亡目派,警方通過查閱死者的電腦和手機坤候,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,954評論 2 384
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來企蹭,“玉大人白筹,你說我怎么就攤上這事×范裕” “怎么了?”我有些...
    開封第一講書人閱讀 156,623評論 0 345
  • 文/不壞的土叔 我叫張陵吹害,是天一觀的道長螟凭。 經(jīng)常有香客問我,道長它呀,這世上最難降的妖魔是什么螺男? 我笑而不...
    開封第一講書人閱讀 56,324評論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮纵穿,結(jié)果婚禮上下隧,老公的妹妹穿的比我還像新娘。我一直安慰自己谓媒,他們只是感情好淆院,可當我...
    茶點故事閱讀 65,390評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著句惯,像睡著了一般土辩。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上抢野,一...
    開封第一講書人閱讀 49,741評論 1 289
  • 那天拷淘,我揣著相機與錄音,去河邊找鬼指孤。 笑死启涯,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的恃轩。 我是一名探鬼主播结洼,決...
    沈念sama閱讀 38,892評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼叉跛!你這毒婦竟也來了补君?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,655評論 0 266
  • 序言:老撾萬榮一對情侶失蹤昧互,失蹤者是張志新(化名)和其女友劉穎挽铁,沒想到半個月后伟桅,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,104評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡叽掘,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,451評論 2 325
  • 正文 我和宋清朗相戀三年楣铁,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片更扁。...
    茶點故事閱讀 38,569評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡盖腕,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出浓镜,到底是詐尸還是另有隱情溃列,我是刑警寧澤,帶...
    沈念sama閱讀 34,254評論 4 328
  • 正文 年R本政府宣布膛薛,位于F島的核電站听隐,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏哄啄。R本人自食惡果不足惜雅任,卻給世界環(huán)境...
    茶點故事閱讀 39,834評論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望咨跌。 院中可真熱鬧沪么,春花似錦、人聲如沸锌半。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,725評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽刊殉。三九已至哭当,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間冗澈,已是汗流浹背钦勘。 一陣腳步聲響...
    開封第一講書人閱讀 31,950評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留亚亲,地道東北人彻采。 一個月前我還...
    沈念sama閱讀 46,260評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像捌归,于是被迫代替她去往敵國和親肛响。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,446評論 2 348

推薦閱讀更多精彩內(nèi)容

  • Java NIO(New IO)是從Java 1.4版本開始引入的一個新的IO API惜索,可以替代標準的Java I...
    zhisheng_blog閱讀 1,115評論 0 7
  • Java NIO(New IO)是從Java 1.4版本開始引入的一個新的IO API特笋,可以替代標準的Java I...
    JackChen1024閱讀 7,546評論 1 143
  • Java NIO(New IO)是從Java 1.4版本開始引入的一個新的IO API,可以替代標準的Java I...
    編碼前線閱讀 2,263評論 0 5
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理巾兆,服務(wù)發(fā)現(xiàn)猎物,斷路器虎囚,智...
    卡卡羅2017閱讀 134,628評論 18 139
  • 驀然,鏡中人26了蔫磨。左右相反淘讥,熟悉陌生,默然堤如。 好似他人都在多姿多彩蒲列、蒸蒸日上,唯有我不溫不火搀罢、不移不化蝗岖,不...
    f1a83516d7a5閱讀 451評論 0 0