介紹了jdk實(shí)現(xiàn)nio的關(guān)鍵Selector以及SelectableChannel罐盔,了解了它的原理耗跛,就明白了netty為什么是事件驅(qū)動模型:(netty極簡教程(四):Selector事件驅(qū)動以及SocketChannel
的使用斤儿,接下來將它的使用更深入一步裹虫, nio reactor模型演進(jìn)以及聊天室的實(shí)現(xiàn)飞盆;
示例源碼: https://github.com/jsbintask22/netty-learning
nio server
對于io消耗而言娄琉,我們知道提升效率的關(guān)鍵在于服務(wù)端對于io的使用;而nio壓榨cpu的關(guān)鍵在于使用Selector
實(shí)現(xiàn)的reactor
事件模型以及多線程的加入時(shí)機(jī):
單線程reactor模型
省略Selector以及ServerSocketChannel的獲取注冊吓歇; 將所有的操作至于reactor主線程
while (true) { // 1
if (selector.select(1000) == 0) { // 2
continue;
}
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); // 3
while (selectedKeys.hasNext()) {
SelectionKey selectionKey = selectedKeys.next();
SelectableChannel channel = selectionKey.channel();
if (selectionKey.isAcceptable()) { // 4
ServerSocketChannel server = (ServerSocketChannel) channel;
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(CLIENT_BUFFER_SIZE));
String serverGlobalInfo = "系統(tǒng)消息:用戶[" + client.getRemoteAddress() + "]上線了";
System.err.println(serverGlobalInfo);
forwardClientMsg(serverGlobalInfo, client); // 5
} else if (selectionKey.isReadable()) {
SocketChannel client = (SocketChannel) channel;
SocketAddress remoteAddress = null;
try {
remoteAddress = client.getRemoteAddress();
String clientMsg = retrieveClientMsg(selectionKey);
if (clientMsg.equals("")) {
return;
}
System.err.println("收到用戶[" + remoteAddress + "]消息:" + clientMsg);
forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client); // 6
} catch (Exception e) {
String msg = "系統(tǒng)消息:" + remoteAddress + "下線了";
forwardClientMsg(msg, client);
System.err.println(msg);
selectionKey.cancel(); // 7
try {
client.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
selectedKeys.remove();
}
}
- 開啟一個(gè)while循環(huán)孽水,讓Selector不斷的詢問操作系統(tǒng)是否有對應(yīng)的事件已經(jīng)準(zhǔn)備好
- Selector檢查事件(等待時(shí)間為1s),如果沒有直接開啟下一次循環(huán)
- 獲取已經(jīng)準(zhǔn)備好的事件(
SelectionKey
)城看,然后依次循環(huán)遍歷處理 - 如果是
Accept
事件女气,說明是ServerSocketChannel注冊的,說明新的連接已經(jīng)建立好了测柠,從中獲取新的連接并將新連接再次注冊到Selector - 注冊后炼鞠,然后生成消息給其它Socket,表示有新用戶上線了
- 如果是
Read
事件轰胁,說明客戶端Socket有新的數(shù)據(jù)可讀取谒主,讀取然后廣播該消息到其它所有客戶端 - 如果發(fā)生異常,表示該客戶端斷開連接了(粗略的處理)赃阀,同樣廣播一條消息霎肯,并且將該Socket從Selector上注銷
讀取以及廣播消息方法如下:
SocketChannel client = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
int len = client.read(buffer);
if (len == 0) {
return "";
}
buffer.flip();
byte[] data = new byte[buffer.remaining()];
int index = 0;
while (len != index) {
data[index++] = buffer.get();
}
buffer.clear();
return new String(data, StandardCharsets.UTF_8);
Set<SelectionKey> allClient = selector.keys();
allClient.forEach(selectionKey -> {
SelectableChannel channel = selectionKey.channel();
if (!(channel instanceof ServerSocketChannel) && channel != client) { // 1
SocketChannel otherClient = (SocketChannel) channel;
try {
otherClient.write(ByteBuffer.wrap(clientMsg.getBytes(StandardCharsets.UTF_8)));
} catch (IOException e) {
e.printStackTrace();
}
}
});
從Selector上獲取所有注冊的Channel然后遍歷,如果不是ServerSocketChannel或者當(dāng)前消息的Channel榛斯,就將消息發(fā)送出去.
以上观游,所有代碼放在同一線程中,對于單核cpu而言肖抱,相比于bio的Socket
編程备典,我們主要有一個(gè)方面的改進(jìn)
- 雖然
accept
方法依然是阻塞的,可是我們已經(jīng)知道了肯定會有新的連接進(jìn)來意述,所以調(diào)用改方法不會再阻塞而是直接獲取一個(gè)新連接 - 對于
read
方法而言同樣如此提佣,雖然該方法依然是一個(gè)阻塞的方法,可是我們已經(jīng)知道了接下來調(diào)用必定會有有效數(shù)據(jù)荤崇,這樣cpu不用再進(jìn)行等待 - 通過Selector在一個(gè)線程中便管理了多個(gè)Channel
而對于多核cpu而言拌屏,Selector雖然能夠有效規(guī)避accept和read的無用等待時(shí)間,可是它依然存在一些問題术荤;
- 上面的操作關(guān)鍵在于Selector的
select
操作倚喂,該方法必須能夠快速循環(huán)調(diào)用,不宜和其它io讀取寫入放在一起 - channel的io(read和write)操作較為耗時(shí),不宜放到同一線程中處理
多線程reactor模型
基于上面的單線程問題考慮端圈,我們可以將io操作放入線程池中處理:
- 將accept事件的廣播放入線程池中處理
- 將read事件的所有io操作放入線程池中處理
if (selectionKey.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) channel;
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(CLIENT_BUFFER_SIZE));
String serverGlobalInfo = "系統(tǒng)消息:用戶[" + client.getRemoteAddress() + "]上線了";
System.err.println(serverGlobalInfo);
executorService.submit(() -> { // 1
forwardClientMsg(serverGlobalInfo, client);
});
} else if (selectionKey.isReadable()) {
executorService.submit(() -> { // 2
SocketChannel client = (SocketChannel) channel;
SocketAddress remoteAddress = null;
try {
remoteAddress = client.getRemoteAddress();
String clientMsg = retrieveClientMsg(selectionKey);
if (clientMsg.equals("")) {
return;
}
System.err.println("收到用戶[" + remoteAddress + "]消息:" + clientMsg);
forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client);
} catch (Exception e) {
String msg = "系統(tǒng)消息:" + remoteAddress + "下線了";
forwardClientMsg(msg, client);
System.err.println(msg);
selectionKey.cancel();
try {
client.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
});
}
selectedKeys.remove();
}
在 1與2處焦读,我們加入了線程池處理,不再在reactor主線程中做任何io操作舱权。 這便是reactor多線程模型
雖然模型2有效利用了多核cpu優(yōu)勢矗晃,可是依然能夠找到瓶頸
- 雖然廣播消息是在一個(gè)獨(dú)立線程中,可是我們需要將Selector上注冊的所有的channel全部遍歷宴倍,如果Selector注冊了太多的channel张症,依舊會有效率問題
- 因?yàn)镾elector注冊了過多的Channel,所以在進(jìn)行select選取時(shí)對于主線程而言依舊會有很多的循環(huán)操作鸵贬,存在瓶頸
基于以上問題俗他,我們可以考慮引入多個(gè)Selector
,這樣主Selector只負(fù)責(zé)讀取accept操作阔逼,而其他的io操作均有子Selector負(fù)責(zé)兆衅,這便是多Reactor多線程模型
多Reactor多線程模型
基于上面的思考,我們要在單Reactor多線程模型上主要需要以下操作
- 對于accept到的新連接不再放入主Selector颜价,將其加入多個(gè)
子Selector
- 子Selector操作應(yīng)該在異步線程中進(jìn)行.
- 所有子Selector只進(jìn)行read write操作
基于以上涯保,會增加一個(gè)子Selector列表,并且將原來的accept以及讀取廣播分開周伦;
private List<Selector> subSelector = new ArrayList<>(8);
定義一個(gè)包含8個(gè)子selector的列表并進(jìn)行初始化
如圖夕春,分別開啟了一個(gè)reactor主線程,以及8個(gè)子selector子線程专挪,其中及志,主線程現(xiàn)在只進(jìn)行accept然后添加至子selector
while (true) {
if (mainSelector.select(1000) == 0) {
continue;
}
Iterator<SelectionKey> selectedKeys = mainSelector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey selectionKey = selectedKeys.next();
SelectableChannel channel = selectionKey.channel();
if (selectionKey.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) channel;
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(subSelector.get(index++), SelectionKey.OP_READ, // 1
ByteBuffer.allocate(CLIENT_BUFFER_SIZE));
if (index == 8) { // 2
index = 0;
}
String serverGlobalInfo = "系統(tǒng)消息:用戶[" + client.getRemoteAddress() + "]上線了";
System.err.println(serverGlobalInfo);
forwardClientMsg(serverGlobalInfo, client);
}
}
selectedKeys.remove();
}
- 將新連接注冊至從Selector.
- 如果當(dāng)前的selector已經(jīng)全部添加了一遍則重新從第一個(gè)開始
所有的從Selector只進(jìn)行io操作,并且本身已經(jīng)在異步線程中運(yùn)行
while (true) {
if (subSelector.select(1000) == 0) {
continue;
}
Iterator<SelectionKey> selectedKeys = subSelector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey selectionKey = selectedKeys.next();
SelectableChannel channel = selectionKey.channel();
if (selectionKey.isReadable()) {
SocketChannel client = (SocketChannel) channel;
SocketAddress remoteAddress = null;
try {
remoteAddress = client.getRemoteAddress();
String clientMsg = retrieveClientMsg(selectionKey); // 1
if (clientMsg.equals("")) {
return;
}
System.err.println("收到用戶[" + remoteAddress + "]消息:" + clientMsg);
forwardClientMsg("[" + remoteAddress + "]:" + clientMsg, client); // 2
} catch (Exception e) {
String msg = "系統(tǒng)消息:" + remoteAddress + "下線了";
forwardClientMsg(msg, client);
System.err.println(msg);
selectionKey.cancel();
try {
client.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
selectedKeys.remove();
}
- 讀取消息
-
廣播消息
啟動server寨腔,并且打開三個(gè)客戶端:
image
image
image
如圖所示速侈,上線通知,消息轉(zhuǎn)發(fā)迫卢,下線通知成功倚搬, 主Selector與從Selector交互成功
netty線程模型思考
事實(shí)上,在netty的線程模型中乾蛤,與上方的多Reactor多線程模型類似
每界,一個(gè)改進(jìn)版的多路復(fù)用多Reactor模型; Reactor主從線程模型
- 一個(gè)主線程不斷輪詢進(jìn)行accept操作家卖,將channel注冊至子Selector
- 一個(gè)線程持有一個(gè)Selector
- 一個(gè)子Selector又可以管理多個(gè)channel
- 在斷開連接前眨层,一個(gè)channel總是在同一個(gè)線程中進(jìn)行io操作處理
基于以上思考,我們將在后面在netty源碼中進(jìn)行一一驗(yàn)證上荡。