前言
之前聊過幾種常見的 I/O 模型,不過要說起當紅炸子雞還得是 I/O 多路復用悯衬,Java 1.4 引入的 nio package 提供了對這一模式的支持弹沽,著名開源框架 Netty 更是這一領域的扛鼎之作。
Netty is an asynchronous event-driven network application framework for rapid
development of maintainable high performance protocol servers & clients.
Netty 官網(wǎng)介紹其是一個異步事件驅(qū)動的網(wǎng)絡應用框架筋粗,用于快速開發(fā)可維護的高性能協(xié)議服務器和客戶端策橘。這里異步
暫且不說,事件驅(qū)動
可是大有文章娜亿,今天就聊聊這個吧丽已。
Event Driven
一個典型的網(wǎng)絡服務通常都有著如下基礎結(jié)構(gòu):
read request
decode request
process service
encode reply
send reply
細究起來,每一步的性質(zhì)和成本各不相同买决,這種不均衡會極大的影響吞吐量沛婴,分治策略通常是解決這一類型問題的最佳方案。我們可以把這其中的每一步都拆分成一個任務督赤,每個任務都以非阻塞的方式運行嘁灯。這里 I/O 事件充當了觸發(fā)器的角色,想想前端火的一塌糊涂的響應式編程躲舌,不正是這樣嗎丑婿?
Reactor Pattern
基于這個設想,就誕生了 Reactor Pattern没卸。Reactor Pattern 首先是事件驅(qū)動的羹奉,它有兩個主要角色:
Reactor,響應 I/O 事件并分發(fā)給對應的 Handler
Handlers约计,非阻塞地執(zhí)行相關操作
從簡單到復雜诀拭,Reactor Pattern 也有很多版本:單線程、多線程以及主從結(jié)構(gòu)病蛉。純理論總是看得人頭疼炫加,我們就以一個 echo server 為例瑰煎,一點點升級它來說明 Reactor Pattern 的各種版本好了。所謂 echo server俗孝,就是客戶端發(fā)什么服務端就回什么酒甸,這里我們約定以#
字符標識一條完整的消息。
單線程
如上圖所示赋铝,在單線程版本中插勤, Reactor 和 Handlers 共享一個線程,因此連接的建立革骨、數(shù)據(jù)的讀取和處理以及發(fā)送響應均在同一個線程中完成农尖。這個版本的好處是編碼簡單,弊端也是顯而易見的良哲,一旦某個環(huán)節(jié)是一個長時間執(zhí)行的任務盛卡,所有的操作都會被阻塞;并且筑凫,單線程也無法發(fā)揮多核 CPU 的優(yōu)勢滑沧。
@Slf4j
public class SingleThreadedReactor implements Runnable {
private final int port;
public SingleThreadedReactor(int port) {
this.port = port;
}
@Override
public void run() {
Selector sel = null;
ServerSocketChannel ssc = null;
try {
// 開啟Server端通道
ssc = ServerSocketChannel.open();
// 配置非阻塞
ssc.configureBlocking(false);
// 注冊一個Acceptor來處理連接
sel = Selector.open();
// Acceptor作為attachment進行掛載
ssc.register(sel, SelectionKey.OP_ACCEPT, new Acceptor(sel, ssc));
// 綁定端口
ssc.bind(new InetSocketAddress(port));
log.info("Server啟動成功,正在監(jiān)聽[{}]端口", port);
// 輪詢
iAmListening(sel);
} catch (IOException ex) {
log.error("Server啟動失敗", ex);
} finally {
// close ssc/sel omitted...
}
}
private void iAmListening(Selector sel) {
// 輪詢
while (!Thread.interrupted()) {
try {
int num = sel.select();
// 有事件則處理
if (num > 0) {
Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
// 逐個處理
while (iter.hasNext()) {
SelectionKey sk = iter.next();
// 派發(fā)事件
dispatch(sk);
// 處理過即移除巍实,避免重復處理
iter.remove();
}
}
} catch (IOException ex) {
log.error("監(jiān)聽失敗", ex);
}
}
}
private void dispatch(SelectionKey sk) {
// 事件處理程序都作為attachment掛載
// 在SelectionKey上滓技,并且包裝成Runnable
Object attachment = sk.attachment();
if (attachment instanceof Runnable) {
((Runnable) attachment).run();
}
}
}
SingleThreadedReactor#run()
由兩大塊構(gòu)成,一是配置服務端的ServerSocketChannel
棚潦,這一步的核心是掛載Acceptor
(一個專門用來處理客戶端接入的 Handler )令漂;二是輪詢 I/O 事件,一旦有事件需要處理就進行派發(fā)丸边。
@Slf4j
public class Acceptor implements Runnable {
private final Selector sel;
private final ServerSocketChannel ssc;
public Acceptor(Selector sel, ServerSocketChannel ssc) {
this.sel = sel;
this.ssc = ssc;
}
@Override
public void run() {
try {
// 獲取與客戶端通信的SocketChannel
SocketChannel sc = ssc.accept();
log.info("檢測到新的客戶端連接: {}", sc.getRemoteAddress());
// Handler同樣是一個Runnable叠必,用來
// 處理與客戶端之間的 I/O 操作
new Handler(sc, sel);
} catch (IOException ex) {
log.error("處理連接失敗", ex);
}
}
}
Acceptor
的職責是處理客戶端的接入,一旦有新的客戶端加入就建立起通道并將其注冊到 Selector 上(通過 Handler 的構(gòu)造函數(shù))妹窖。
@Slf4j
public class Handler implements Runnable {
enum State {
READING,
SENDING
}
private final SocketChannel sc;
private final SelectionKey sk;
// 初始狀態(tài)為可讀
private State state = State.READING;
private final ByteBuffer input = ByteBuffer.allocate(256);
private final ByteBuffer output = ByteBuffer.allocate(256);
public Handler(SocketChannel sc, Selector sel) {
this.sc = sc;
try {
// 配置成非阻塞模式
sc.configureBlocking(false);
// 將自己也注冊到Selector上挠唆,如此即可供Reactor派發(fā)
this.sk = sc.register(sel, SelectionKey.OP_READ, this);
// 注冊完畢,讓select()調(diào)用即刻返回嘱吗,看看此時有沒有事件要處理
sel.wakeup();
} catch (IOException ex) {
throw new RuntimeException("Unable to handler event", ex);
}
}
@Override
public void run() {
try {
// 根據(jù)狀態(tài)做處理
if (state == State.READING) {
read();
} else {
write();
}
} catch (IOException ex) {
throw new RuntimeException("Unable to handler event", ex);
}
}
private void read() throws IOException {
log.info("正在讀取客戶端上送的信息");
// 讀取數(shù)據(jù)
int oldPos = input.position();
sc.read(input);
int newPos = input.position();
// 以 # 作為結(jié)束符玄组,表示一條完整的消息
for (int i = oldPos; i < newPos; i++) {
byte b = input.get(i);
// 已讀到一條消息,進行處理
if (b == '#') {
process(i);
log.info("處理完畢谒麦,準備發(fā)送響應信息");
state = State.SENDING;
sk.interestOps(SelectionKey.OP_WRITE);
}
}
}
private void write() throws IOException {
output.flip();
sc.write(output);
output.clear();
state = State.READING;
sk.interestOps(SelectionKey.OP_READ);
}
private void process(int index) throws IOException {
byte[] bytes = new byte[index + 1];
input.flip();
input.get(bytes);
input.compact();
// # 刪掉
String resp = new String(bytes, StandardCharsets.UTF_8)
.replace("#", "");
log.info("接受到客戶端[{}]的信息: {}", sc.getRemoteAddress(), resp);
// 附加一點內(nèi)容
output.put("Server says: ".getBytes(StandardCharsets.UTF_8));
// 其它信息原樣返回
output.put(resp.getBytes(StandardCharsets.UTF_8));
}
}
Handler
負責處理 read -> decode -> process -> encode -> send 這一系列流程俄讹。在我們的例子里,只要沒有讀到#
字符绕德,就會一直讀下去患膛,直到客戶端發(fā)送#
字符告訴服務端一條完整的消息已經(jīng)發(fā)送完了,服務端就著手進行處理并發(fā)送響應消息耻蛇。
多線程
??多線程版本對 Handler 進行了優(yōu)化踪蹬,通過增加 worker threads 來處理非 I/O 操作胞此,如此一來即使有耗時操作 Handler 也不會阻塞住 Reactor 了;同時多個 worker threads 也能更好地發(fā)揮多核 CPU 的優(yōu)勢跃捣。
@Slf4j
public class Handler implements Runnable {
enum State {
READING,
SENDING
}
private final SocketChannel sc;
private final SelectionKey sk;
// 初始狀態(tài)為可讀
private State state = State.READING;
private final ByteBuffer input = ByteBuffer.allocate(256);
private final ByteBuffer output = ByteBuffer.allocate(256);
// 線程池漱牵,用來處理所有入站事件
private final ExecutorService executor = new ThreadPoolExecutor(2,
8,
5, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy());
public Handler(SocketChannel sc, Selector sel) {
this.sc = sc;
try {
// 配置成非阻塞模式
sc.configureBlocking(false);
// 將自己也注冊到Selector上,如此即可供Reactor派發(fā)
this.sk = sc.register(sel, SelectionKey.OP_READ, this);
// 注冊完畢疚漆,讓select()調(diào)用即刻返回酣胀,看看此時有沒有事件要處理
sel.wakeup();
} catch (IOException ex) {
throw new RuntimeException("Unable to handler event", ex);
}
}
@Override
public void run() {
try {
// 根據(jù)狀態(tài)做處理
if (state == State.READING) {
// 讀取數(shù)據(jù)
int oldPos = read();
// 提交到線程池,異步處理
executor.execute(() -> {
log.info("當前處理的線程: {}", Thread.currentThread().getName());
try {
process(oldPos);
} catch (IOException ex) {
throw new RuntimeException("Unable to read data", ex);
}
});
} else {
// decode -> process -> encode
// 以上流程已經(jīng)在 state == State.READING 時處理好了娶聘,此時直接發(fā)送就可以了闻镶,不用異步
write();
}
} catch (IOException ex) {
throw new RuntimeException("Unable to handler event", ex);
}
}
private synchronized int read() throws IOException {
log.info("正在讀取客戶端上送的信息");
// 讀取數(shù)據(jù)
int oldPos = input.position();
sc.read(input);
return oldPos;
}
private synchronized void write() throws IOException {
log.info("正在給客戶端返回響應信息");
output.flip();
sc.write(output);
output.clear();
state = State.READING;
sk.interestOps(SelectionKey.OP_READ);
sk.selector().wakeup();
}
private synchronized void process(int oldPos) throws IOException {
// 計算讀取的長度
int newPos = input.position();
// 以 # 作為結(jié)束符,表示一條完整的消息
for (int i = oldPos; i < newPos; i++) {
byte b = input.get(i);
// 已讀到一條消息丸升,進行處理
if (b == '#') {
byte[] bytes = new byte[i + 1];
input.flip();
input.get(bytes);
input.compact();
// # 刪掉
String resp = new String(bytes, StandardCharsets.UTF_8)
.replace("#", "");
log.info("接受到客戶端[{}]的信息: {}", sc.getRemoteAddress(), resp);
// 附加一點內(nèi)容
output.put("Server says: ".getBytes(StandardCharsets.UTF_8));
// 其它信息原樣返回
output.put(resp.getBytes(StandardCharsets.UTF_8));
log.info("處理完畢铆农,準備發(fā)送響應信息");
state = State.SENDING;
sk.interestOps(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
}
}
多線程版本的Reactor
和Acceptor
和單線程版本是一樣的,區(qū)別只在 Handler 中引入了線程池狡耻,這些 worker threads 包辦了流程中的 decode -> process -> encode顿涣。
主從結(jié)構(gòu)
??主從結(jié)構(gòu)引入了多個 Reactor,將客戶端接入和 socket 讀酝豪、寫進行了解耦:MainReactor 負責處理客戶端的接入,而 SubReactor(s) 則負責在發(fā)生讀精堕、寫事件時進行數(shù)據(jù)處理孵淘。
@Slf4j
public class MainReactor implements Runnable {
private final int port;
// 子Reactor數(shù)量
private final int numberOfSubReactors;
public MainReactor(int port, int numberOfSubReactors) {
this.port = port;
this.numberOfSubReactors = numberOfSubReactors;
}
@Override
public void run() {
Selector sel = null;
ServerSocketChannel ssc = null;
try {
// 開啟Server端通道
ssc = ServerSocketChannel.open();
// 配置非阻塞
ssc.configureBlocking(false);
// 注冊一個Acceptor來處理連接
sel = Selector.open();
// Acceptor作為attachment進行掛載
ssc.register(sel, SelectionKey.OP_ACCEPT, new Acceptor(ssc, numberOfSubReactors));
// 綁定端口
ssc.bind(new InetSocketAddress(port));
log.info("Server啟動成功,正在監(jiān)聽[{}]端口", port);
// 輪詢
iAmListening(sel);
} catch (IOException ex) {
log.error("Server啟動失敗", ex);
} finally {
// close ssc/sel omitted...
}
}
private void iAmListening(Selector sel) {
// 輪詢
while (!Thread.interrupted()) {
try {
int num = sel.select();
// 有事件則處理
if (num > 0) {
Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
// 逐個處理
while (iter.hasNext()) {
SelectionKey sk = iter.next();
// 派發(fā)事件
dispatch(sk);
// 處理過即移除歹篓,避免重復處理
iter.remove();
}
}
} catch (IOException ex) {
log.error("監(jiān)聽失敗", ex);
}
}
}
private void dispatch(SelectionKey sk) {
// 事件處理程序都作為attachment掛載
// 在SelectionKey上瘫证,并且包裝成Runnable
Object attachment = sk.attachment();
if (attachment instanceof Runnable) {
((Runnable) attachment).run();
}
}
}
MainReactor#run()
和多線程版本的大體相同,區(qū)別主要在Acceptor
庄撮。
@Slf4j
public class Acceptor implements Runnable {
private int next = 0;
private final Selector[] selectors;
private final ServerSocketChannel ssc;
public Acceptor(ServerSocketChannel ssc, int numberOfSubReactors) throws IOException {
this.ssc = ssc;
// 每個SubReactor持有一個Selector背捌,互不干擾,避免了潛在的線程同步需求
Selector[] selectors = new Selector[numberOfSubReactors];
SubReactor[] subReactors = new SubReactor[numberOfSubReactors];
for (int i = 0; i < numberOfSubReactors; i++) {
Selector sel = Selector.open();
selectors[i] = sel;
subReactors[i] = new SubReactor(sel);
// 啟動一個獨立的線程處理SubReactor
// SubReactor不處理連接請求洞斯,只處理OP_READ/OP_WRITE
Thread thread = new Thread(subReactors[i]);
thread.setName("SubReactor-thread-" + i);
thread.start();
}
this.selectors = selectors;
}
@Override
public void run() {
try {
// 獲取與客戶端通信的SocketChannel
SocketChannel sc = ssc.accept();
log.info("檢測到新的客戶端連接: {}毡庆,當前線程: {}", sc.getRemoteAddress(), Thread.currentThread().getName());
// 通過簡單的輪詢算法,給每一個SocketChannel分配一個Selector
// Selector關聯(lián)了SubReactor烙如,而SubReactor又關聯(lián)了一個獨立的
// 線程么抗,這樣每個SocketChannel上的發(fā)生的事件就有對應的線程來處理了
new Handler(sc, selectors[next]);
if (++next == selectors.length) next = 0;
} catch (IOException ex) {
log.error("處理連接失敗", ex);
}
}
}
主從結(jié)構(gòu)的Acceptor
就略微有點復雜了,它的主要工作有二:其一是初始化多個 SubReactor亚铁,每個 SubReactor 都綁定一個 Selector蝇刀,并且運行在獨立的線程中;其二在建立新的通道時徘溢,將其分配給某一個 SubReactor吞琐。這樣做的好處在于分攤了負載捆探,更好地均衡了 CPU 和 I/O 速率。
@Slf4j
public class SubReactor implements Runnable {
private final Selector sel;
public SubReactor(Selector sel) {
this.sel = sel;
}
@Override
public void run() {
// 輪詢
while (!Thread.interrupted()) {
try {
int num = sel.select(1000);
// 有事件則處理
if (num > 0) {
Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
// 逐個處理
while (iter.hasNext()) {
SelectionKey sk = iter.next();
// 派發(fā)事件
dispatch(sk);
// 處理過即移除站粟,避免重復處理
iter.remove();
}
}
} catch (IOException ex) {
log.error("監(jiān)聽失敗", ex);
}
}
}
private void dispatch(SelectionKey sk) {
if (!sk.isValid()) return;
String ops = "";
if (sk.isReadable()) {
ops = "有新數(shù)據(jù)可讀";
}
if (sk.isWritable()) {
if (ops.length() != 0) {
ops = ops + ","+ "有數(shù)據(jù)可寫";
} else {
ops = "有數(shù)據(jù)可寫";
}
}
log.info("檢測到新的事件: {}黍图,當前線程: {}", ops, Thread.currentThread().getName());
// 事件處理程序都作為attachment掛載
// 在SelectionKey上,并且包裝成Runnable
Object attachment = sk.attachment();
if (attachment instanceof Runnable) {
((Runnable) attachment).run();
}
}
}
SubReactor 和 MainReactor 類似卒蘸,它也維護了一個事件循環(huán)雌隅,用來派發(fā) socket 讀、寫事件缸沃。Handler 和多線程版本保持一致恰起,同樣使用了線程池來處理非 I/O 任務,完整的示例看這里趾牧。
Netty 對 Reactor Pattern 的支持
Netty 對 Reactor Pattern 的幾個變種都有支持检盼,簡單配置一下 ServerBootstrap 即可實現(xiàn)。
Version | Implementation |
---|---|
單線程 | NioEventLoopGroup group = new NioEventLoopGroup(1); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group); |
多線程 | NioEventLoopGroup group = new NioEventLoopGroup(3); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group); |
主從結(jié)構(gòu) | NioEventLoopGroup bossGroup = new NioEventLoopGroup(2); NioEventLoopGroup workerGroup = new NioEventLoopGroup(4); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); |