一. 為什么需要
解決多請(qǐng)求問題,但是這些請(qǐng)求不需要一直占有整個(gè)線程資源(比如IO操作時(shí)不必一直等待),所以不適合使用一個(gè)請(qǐng)求分配一個(gè)線程的多線程方案;類似于消息隊(duì)列模型,但是是事件驅(qū)動(dòng)痴腌,沒有Queue來做緩沖;優(yōu)點(diǎn):解耦燃领、高效士聪、提高復(fù)用,缺點(diǎn):需要操作系統(tǒng)底層支持猛蔽、內(nèi)部回調(diào)復(fù)雜剥悟。
二. 預(yù)備知識(shí)
IO操作主要分成兩部分:
- 數(shù)據(jù)準(zhǔn)備,將數(shù)據(jù)從磁盤加載到內(nèi)核緩存
- 將數(shù)據(jù)從內(nèi)核緩存加載到用戶緩存
2.1 IO的4種模型
- 阻塞曼库、非阻塞(等待數(shù)據(jù)全部讀取成功再返回区岗,還是讀取為空馬上返回然后下次再讀)
- 同步、異步(用戶緩存主動(dòng)去讀取內(nèi)核緩存毁枯,還是內(nèi)核緩存讀取磁盤成功后通知用戶緩存)
- NIO是同步非阻塞模型慈缔,也是IO多路復(fù)用基礎(chǔ)
- Reactor模式基于同步I/O,Proactor模式基于異步I/O
2.2 IO多路復(fù)用
區(qū)別于傳統(tǒng)的多進(jìn)程并發(fā)模型 (每有新的IO流就分配一個(gè)新的進(jìn)程管理)种玛,IO多路復(fù)用僅使用單個(gè)線程藐鹤,通過記錄跟蹤每個(gè)I/O流的狀態(tài)來同時(shí)管理多個(gè)I/O流(哪個(gè)IO流ready線程就處理哪個(gè))
select, poll, epoll 都是I/O多路復(fù)用的具體的實(shí)現(xiàn):
select:僅返回有無事件不返回具體事件Id,只能監(jiān)控1024個(gè)連接赂韵,線程不安全
poll:連接數(shù)無限制
epoll:返回具體事件Id娱节,線程安全
三. 反應(yīng)器模式
處理一個(gè)或多個(gè)客戶端并發(fā)請(qǐng)求服務(wù)的事件設(shè)計(jì)模式。當(dāng)請(qǐng)求抵達(dá)后右锨,服務(wù)處理程序使用I/O多路復(fù)用策略,然后同步地派發(fā)這些請(qǐng)求至相關(guān)的請(qǐng)求處理程序碌秸。
3.1 模塊組成
包括5個(gè)模塊:
- Handle:事件(網(wǎng)絡(luò)編程中就是一個(gè)Socket绍移,數(shù)據(jù)庫(kù)操作中就是一個(gè)DBConnection,Java NIO中的Channel)
- EventHandler:事件處理器讥电,用于處理不同狀態(tài)的事件
- Concrete Event Handler:事件處理器的具體實(shí)現(xiàn)蹂窖,實(shí)現(xiàn)了事件處理器所提供的各種回調(diào)方法,從而實(shí)現(xiàn)特定于業(yè)務(wù)的邏輯
- Synchronous Event Demultiplexer:用于等待事件的發(fā)生恩敌,調(diào)用方在調(diào)用它的時(shí)候會(huì)被阻塞瞬测,一直阻塞到同步事件分離器上有事件產(chǎn)生為止(NIO中對(duì)應(yīng)Selector,當(dāng)Selector.select()返回時(shí)說明有事件發(fā)生,然后調(diào)用Selector的selectedKeys()方法獲取Set<SelectionKey>月趟,一個(gè)SelectionKey表示一個(gè)有事件發(fā)生的Channel以及該Channel上的事件類型)
- Initiation Dispatcher:用于管理EventHandler灯蝴、分發(fā)event。通過Synchronous Event Demultiplexer來等待事件的發(fā)生孝宗,一旦事件發(fā)生穷躁,Initiation Dispatcher首先會(huì)分離出每一個(gè)事件,然后調(diào)用事件處理器因妇,最后調(diào)用相關(guān)的回調(diào)方法來處理這些事件
3.2 運(yùn)行流程
- 初始化dispatcher问潭,注冊(cè)具體事件處理器到分發(fā)器(即指定什么事件觸發(fā)什么事件處理器)
- 注冊(cè)完畢后,分發(fā)器調(diào)用handle_events方法啟動(dòng)事件循環(huán)婚被,并啟動(dòng)Synchronous Event Demultiplexer等待事件發(fā)生(阻塞等待)
- 當(dāng)有事件發(fā)生狡忙,即某個(gè)Handle變?yōu)閞eady狀態(tài)(如TCP socket變?yōu)榈却x狀態(tài)),Synchronous Event Demultiplexer就會(huì)通知Initiation Dispatcher
- Initiation Dispatcher根據(jù)發(fā)生的事件址芯,將被事件源激活的Handle作為『key』來尋找并分發(fā)恰當(dāng)?shù)氖录幚砥骰卣{(diào)方法
3.3 具體模型分類
- 單線程模型(I/O灾茁、非I/O業(yè)務(wù)操作都在一個(gè)線程上處理,可能會(huì)大大延遲I/O請(qǐng)求的響應(yīng))
- 工作站線程池模型(非I/O操作從Reactor線程中移出轉(zhuǎn)交給工作者線程池執(zhí)行)
- 多線程模型(mainReactor線程主要負(fù)責(zé)接收客戶端的連接請(qǐng)求是复,然后將接收到的SocketChannel傳遞給subReactor删顶,由subReactor來完成和客戶端的通信),但是注意subReactor線程只負(fù)責(zé)完成I/O的read()或者write()操作淑廊,在讀取到數(shù)據(jù)后業(yè)務(wù)邏輯的處理仍然放入到工作者線程池中完成逗余,可避免因?yàn)閞ead()數(shù)據(jù)量太大而導(dǎo)致后面的客戶端連接請(qǐng)求得不到即時(shí)處理的情況
四. 源碼分析
高性能NIO框架netty、騰訊開源RPC框架Tars的NIO模型都是很典型的Reactor設(shè)計(jì)模式季惩,下面以Tars源碼來分析Reactor模式的java NIO實(shí)現(xiàn)(僅展示關(guān)鍵實(shí)現(xiàn))录粱。
package com.qq.tars.net.core.nio;
4.1 Reactor
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SelectableChannel;
public final class Reactor extends Thread {
protected volatile Selector selector = null;
private Acceptor acceptor = null;
//啟動(dòng)
public Reactor(SelectorManager selectorManager) throws IOException {
this.acceptor = new TCPAcceptor(selectorManager);
this.selector = Selector.open();
}
//注冊(cè)
public void registerChannel(SelectableChannel channel, int ops, Object attachment) throws IOException {
SelectionKey key = channel.register(this.selector, ops, attachment);
}
//循環(huán)事件
public void run() {
for (;;) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
dispatchEvent(key);
}
}
}
//處理事件
private void dispatchEvent(final SelectionKey key) throws IOException {
if (key.isConnectable()) {
acceptor.handleConnectEvent(key);
} else if (key.isAcceptable()) {
acceptor.handleAcceptEvent(key);
} else if (key.isReadable()) {
acceptor.handleReadEvent(key);
} else if (key.isValid() && key.isWritable()) {
acceptor.handleWriteEvent(key);
}
}
}
4.2 TCPAcceptor
處理不同事件,以處理connect画拾、read事件為例:
public void handleConnectEvent(SelectionKey key) throws IOException {
//1. Get the client channel
SocketChannel client = (SocketChannel) key.channel();
//2. Set the session status
TCPSession session = (TCPSession) key.attachment();
if (session == null) throw new RuntimeException("The session is null when connecting to ...");
//3. Connect to server
try {
client.finishConnect();
key.interestOps(SelectionKey.OP_READ);
session.setStatus(SessionStatus.CLIENT_CONNECTED);
} finally {
session.finishConnect();
}
}
public void handleReadEvent(SelectionKey key) throws IOException {
TCPSession session = (TCPSession) key.attachment();
if (session == null) throw new RuntimeException("The session is null when reading data...");
session.read();
}
4.3 TCPSession
以read事件的readResponse方法為例:
//放入工作線程池
response = selectorManager.getProtocolFactory().getDecoder().decodeResponse(tempBuffer, this);
selectorManager.getThreadPool().execute(new WorkThread(response, selectorManager));
4.4 工作線程池
SelectorManager提供線程池啥繁,WorkThread具體進(jìn)行業(yè)務(wù)處理