最近在學(xué)習(xí)NIO,想找個demo來練練手是鬼,然后發(fā)現(xiàn)用java nio來簡單實現(xiàn)redis應(yīng)該挺有趣的紫新。
Java NIO 概覽
首先芒率,java nio有3個重要的類:
- ByteBuffer: 用于讀寫數(shù)據(jù),實際上是byte數(shù)組的一個封裝充择。它的使用方式還是比較有趣的聪铺,需要注意的一點是在寫模式轉(zhuǎn)換為讀模式時需要flip()铃剔。
- Channel: 用于維護服務(wù)端與客戶端的通信通道。
- Selector: 多路復(fù)用器凤类,管理被注冊的通道集合信息和就緒狀態(tài)谜疤。還有SelectionKey用于維護Selector與Channel之間的關(guān)系现诀,即Channel對哪些事件感興趣仔沿,Selector會幫忙通知它封锉。
然后是使用方法——java nio網(wǎng)絡(luò)編程的主要使用流程如下:
- 創(chuàng)建Selector
- 創(chuàng)建SocketChannel
- 將SockerChannel注冊到Selector上,設(shè)置SelectionKey
- 綁定端口或者開啟連接
- 之后就不斷從Selector中抽取準備好了的SelectionKey碾局,然后對對應(yīng)的Channel作處理
還有一點補充净当,SelectionKey可以包含一個任意類型的attachment像啼,可以用來輔助Channel的處理品擎。
Redis序列化協(xié)議
關(guān)于Redis序列化協(xié)議萄传,可以參看這里
我只實現(xiàn)了數(shù)組秀菱、單行字符串、定長字符串赶么。
這里我把單行字符串辫呻、定長字符串的解碼稱為單步解碼放闺,
即整個解碼過程由若干單步解碼組成缕坎。
拆包粘包
Redis的通信是基于TCP協(xié)議的谜叹,關(guān)于TCP網(wǎng)絡(luò)編程荷腊,比較麻煩的一點就是處理拆包和粘包的問題。
JDK NIO并沒有為我們直接提供處理拆包粘包的一些機制很钓,所以需要我們自己處理码倦。
這里處理粘包的思路很直接锭碳,即利用應(yīng)用層的協(xié)議擒抛,因為Redis序列化協(xié)議已經(jīng)很完善歧沪,遵循它的協(xié)議來實現(xiàn)處理Channel的代碼诊胞,就能分割開不同請求的數(shù)據(jù)段。
而處理拆包會比較麻煩迈着,它首先會分為兩種情況:
- 目前讀取的數(shù)據(jù)段不夠裕菠,不足以進行單步解碼操作奴潘。
- 目前讀取的數(shù)據(jù)段剛好能進行單步解碼操作,但總的請求并沒有完全讀取画髓,相應(yīng)的解碼并沒有完全完成雀扶。
對于第一種情況愚墓,基于協(xié)議規(guī)定的分割符或者已知的限定長度浪册,不斷等待讀取事件準備好村象,處理讀取事件,直到滿足單步解碼操作的要求躁劣。(這里可以保存之前部分解碼的數(shù)據(jù)和狀態(tài)账忘,也可以不要鳖擒,因為數(shù)據(jù)量較小烫止,性能影響不大)馆蠕。
而對于第二種情況,依然可以使用類似第一種情況的方式,不斷等待數(shù)據(jù)完全準備好行拢,若還沒有準備好诞吱,則放棄之前解碼完成的數(shù)據(jù)房维,等下次數(shù)據(jù)到來時再重新解碼咙俩。不過由于數(shù)據(jù)已經(jīng)解碼了一部分阿趁,重新解碼比較耗時脖阵,所以需要保存下之前解碼的數(shù)據(jù)和狀態(tài)命黔,等下次數(shù)據(jù)到來時再繼續(xù)解碼。
這里我的方案是:在Decoder對象中保存目前的解碼狀態(tài)蘑辑,保存的狀態(tài)較為細節(jié),第一種情況中的部分解碼的數(shù)據(jù)和狀態(tài)也保存了坠宴。然后讓SelectionKey帶上這個Decoder(通過attachment)以躯,當每次SelectionKey準備好時,取出這個Decocer進行繼續(xù)處理啄踊。
代碼部分
首先是核心部分——Redis序列化協(xié)議解碼器:
- 狀態(tài)部分:
public class RespDecoder {
private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
private SocketChannel channel;
// 是否只拿到了CR忧设,LF還沒拿到
private boolean isOnlyGetCR;
// 字符串緩沖區(qū)
private StringBuilder sb = new StringBuilder();
// 目前的還沒讀取的定長字符串長度
private int stringLength;
// 目前讀取到的字符串數(shù)組
private List<String> wordList = new ArrayList<String>();
// 目前讀取到的mark
private byte mark;
// 目前還需讀取到數(shù)組中的字符串數(shù)量
private int size;
- 主要解碼邏輯
解碼主體邏輯decode0()使用遞歸實現(xiàn),比較方便颠通。
decode()方法處理異常址晕,設(shè)置解碼完成與否的標識顿锰,返回給調(diào)用者谨垃。
public boolean decode() throws Exception {
boolean isComplete = true;
try {
decode0();
} catch (ReadEmptyException e) {
// 捕獲到異常启搂,則返回false,提示調(diào)用者目前還未解碼完成刘陶。
isComplete = false;
}
return isComplete;
}
private void decode0() throws Exception {
// mark默認為0胳赌,若不為0,則為中間狀態(tài)
mark = (mark != 0 ? mark : readOneByte());
if (mark == '*') {
// size默認為0匙隔,若不為0疑苫,則為中間狀態(tài)
size = (size != 0 ? size : readInteger());
while ((size--) > 0) {
decode0(); // 遞歸解析
}
} else if (mark == '$') {
// stringLength默認為0,若不為0纷责,則之前讀取了一部分的字符串
stringLength = (stringLength != 0 ? stringLength : readInteger() + 2); // +2 : 加上CRLF
readFixString();
// 沒有拋異常就保存字符串
saveString();
} else if (mark == '+') {
readString();
// 沒有拋異常就保存字符串
saveString();
}
}
- 讀取數(shù)據(jù)到buffer
若目前Channel沒有數(shù)據(jù)可讀捍掺,則拋出ReadEmptyException異常,上層調(diào)用代碼捕獲到異常后可以進行相應(yīng)處理
/**
* 若byteBuffer為空再膳,讀取數(shù)據(jù)到byteBuffer
* @throws Exception
*/
private void readToBuffer() throws Exception{
if (!byteBuffer.hasRemaining()) {
byteBuffer.clear();
int count = channel.read(byteBuffer);
if (count < 0) {
throw new RuntimeException("connection error");
} else if (count == 0) {
throw new ReadEmptyException("read empty");
}
byteBuffer.flip();
}
}
- 讀取定長字符串
讀取字符串等單步解碼結(jié)構(gòu)差不多挺勿,這里以讀取定長字符串為例。這里我使用了遞歸的方法喂柒,來遞歸讀取剩余的數(shù)據(jù)不瓶。若readToBuffer()拋出異常,則不進行處理灾杰,直接拋出該異常給上層湃番。
/**
* 讀一個定長字符串
* @return
*/
private void readFixString() throws Exception{
readToBuffer();
byte[] bytes;
if (byteBuffer.remaining() < stringLength) {
int currentSize = byteBuffer.remaining();
bytes = new byte[currentSize];
byteBuffer.get(bytes);
String str = new String(bytes, "UTF-8");
sb.append(str);
stringLength -= currentSize;
readFixString(); // 遞歸讀取剩余的數(shù)據(jù)
} else {
bytes = new byte[stringLength];
byteBuffer.get(bytes);
String str = new String(bytes, "UTF-8");
sb.append(str.replaceAll("\r|\n", "")); // 去除CRLF
stringLength = 0;
}
}
然后是服務(wù)器代碼,常規(guī)的NIO操作吭露。這里用ConcurrentHashMap來實現(xiàn)Redis緩存業(yè)務(wù)吠撮,實際上用HashMap就夠了,因為這個服務(wù)器代碼只有一個線程讲竿。
public class RedisServer {
private Map<String, String> redisMap = new ConcurrentHashMap<String, String>();
public void start() throws Exception{
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
Selector selector = Selector.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 綁定6379
serverSocket.bind(new InetSocketAddress(6379));
System.out.println("Listen to " + 6379);
while (true) {
int n = selector.select();
if (n == 0) {
continue;
}
Iterator it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
System.out.println(socketChannel.getLocalAddress() + " accepted");
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
if (key.attachment() == null) {
key.attach(new RespDecoder(socketChannel));
}
RespDecoder decoder = (RespDecoder) key.attachment();
try {
boolean isComplete = decoder.decode();
if (isComplete) {
List<String> wordList = decoder.getWordList();
System.out.println(Arrays.toString(wordList.toArray()));
String message = operate(wordList);
// 解碼結(jié)束泥兰,清空decoder的狀態(tài)
decoder.clear();
// 發(fā)送到客戶端
send(message, socketChannel);
}
} catch (Exception e) {
key.cancel();
socketChannel.socket().close();
socketChannel.close();
}
}
// 清除處理過的鍵
it.remove();
}
}
}
private String operate(List<String> wordList) throws Exception{
String result = null;
if (wordList.get(0).equals("set")) {
redisMap.put(wordList.get(1), wordList.get(2));
result = "OK";
} else if (wordList.get(0).equals("get")) {
result = redisMap.get(wordList.get(1));
}
return result;
}
private void send(String message, SocketChannel channel) throws Exception{
ByteBuffer writeBuffer = RespEncoder.encode(message);
while (writeBuffer.hasRemaining()) {
channel.write(writeBuffer);
}
}
}
對于編碼器,直接按Redis序列化協(xié)議去編碼就好了题禀,應(yīng)該沒什么問題鞋诗。
客戶端代碼與服務(wù)器類似,就不列出來了迈嘹。
有興趣可以點擊后文給出的源碼地址查看削彬。
演示部分
1. 用Redis自帶的客戶端去連接我們的Simple Redis服務(wù)器:
開啟我們的Simple Redis服務(wù)器
使用Redis自帶的命令行客戶端
2. 用我們的Simple Redis客戶端去連接Redis服務(wù)器:
開啟Redis服務(wù)器
使用我們的Simple Redis命令行客戶端
用Redis自帶的命令行客戶端驗證
3. 用我們的Simple Redis客戶端去連接我們的Simple Redis服務(wù)器:
開啟我們的Simple Redis服務(wù)器
使用我們的Simple Redis命令行客戶端
最后
源碼地址
github
源碼中還有一個Netty的版本。
再最后
本人剛接觸NIO不久秀仲,若大家發(fā)現(xiàn)有問題請不吝賜教融痛,謝謝。