經(jīng)典的網(wǎng)絡(luò)編程
一般網(wǎng)絡(luò)編程都具有以下幾個(gè)步驟:
- 讀取請(qǐng)求 Read request
- 解碼請(qǐng)求 Decode request
- 處理服務(wù) Process services
- 加密回應(yīng) Encode reply
- 發(fā)送回應(yīng) Send reply
但是每一步的處理的內(nèi)容和成本都不一樣酬核。xml捅位、json嗓节、file等等
每種類型的處理程序都需要在各自都線程中來(lái)進(jìn)行练湿,用代碼表示就是如下
class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) { /* ... */ }
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] cmd) { /* ... */ }
}
}
如果當(dāng)前運(yùn)行線程沒(méi)有被中斷就一直循環(huán)創(chuàng)建一個(gè)線程或者線程池用來(lái)處理ServerSocket里面的Socket請(qǐng)求。
注意:Thread.interrupted()和Thread.isInterrupted()
這樣會(huì)造成我們需要為每一個(gè)socket請(qǐng)求創(chuàng)建一個(gè)線程來(lái)處理對(duì)應(yīng)的數(shù)據(jù)。一旦用戶過(guò)多或者處理程序時(shí)間較長(zhǎng)就會(huì)造成各種各樣的問(wèn)題。無(wú)法并發(fā)狈谊,前面的活沒(méi)干完后面的需要等著,負(fù)載等等
優(yōu)化方向
增加負(fù)載
增加硬件 (CPU, memory, disk, bandwidth)
同時(shí)滿足可用性和性能目標(biāo)
減短延遲
滿足高峰需求
提高服務(wù)質(zhì)量
通常來(lái)說(shuō)Divide-and-conquer(分而治之)是實(shí)現(xiàn)任何可擴(kuò)展性目標(biāo)的最佳方法
Divide-and-conquer(分而治之)
將整體任務(wù)切割成小任務(wù)沟沙。每個(gè)小任務(wù)只執(zhí)行單一任務(wù)河劝,并且不會(huì)阻塞其他小任務(wù)的運(yùn)行
用IO事件來(lái)觸發(fā)每個(gè)小任務(wù)的啟動(dòng)
java.nio 中支持的基本機(jī)制
非阻塞讀取和寫入
用感測(cè)到的IO事件來(lái)調(diào)度相關(guān)的任務(wù)
事件驅(qū)動(dòng)設(shè)計(jì)中可能出現(xiàn)的無(wú)盡變化
Event-driven Designs 事件驅(qū)動(dòng)設(shè)計(jì)
比較有效的方法
占用更少的資源,每個(gè)客戶端不一定需要單獨(dú)創(chuàng)建一個(gè)線程
減少開銷矛紫。減少Context的切換可以相應(yīng)的減少鎖定
調(diào)度可能會(huì)更慢赎瞎,所以必須手動(dòng)將動(dòng)作綁定到事件
更難的編程
將動(dòng)作分解為簡(jiǎn)單非阻塞的
類似于GUI事件驅(qū)動(dòng)的動(dòng)作
無(wú)法消除所有阻塞。比如:GC颊咬,頁(yè)面錯(cuò)誤等
必須跟蹤服務(wù)的邏輯狀態(tài)
AWT事件機(jī)制
IO事件驅(qū)動(dòng)使用相似的想法务甥,但設(shè)計(jì)不同
java.awt是一個(gè)軟件包,包含用于創(chuàng)建用戶界面和繪制圖形圖像的所有分類
Reactor Pattern(反應(yīng)堆模式)
- Reactor通過(guò)調(diào)度來(lái)響應(yīng)IO事件喳篇。如:AWT thread
- Handler執(zhí)行非阻塞動(dòng)作敞临。如:AWT ActionListeners
- Manager將Handler綁定到事件上。如:AWT addActionListener
預(yù)先使用Manager將Handler綁定到指定的事件上麸澜,如onClick
用戶點(diǎn)擊按鈕的時(shí)候挺尿,Reactor獲取到事件,并調(diào)度事先綁定好的處理程序
經(jīng)典的Reactor設(shè)計(jì)
單線程版本
java.nio 支持
Channels
支持非阻塞的讀取文件和socket連接
Buffers
Channels通過(guò)Buffers可以直接讀取或者寫入對(duì)象
Selectors
通知一組Channels觸發(fā)了哪些IO事件
SelectionKeys
維護(hù)IO事件的狀態(tài)和綁定
Reactor 實(shí)現(xiàn)
Setup
class Reactor implements Runnable {
//Selector選擇器
final Selector selector;
//Socket服務(wù)通道
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
//創(chuàng)建一個(gè)Selector
selector = Selector.open();
//創(chuàng)建一個(gè)Socket Channel
serverSocket = ServerSocketChannel.open();
//將Socket Channel綁定到指定端口
serverSocket.socket().bind(
new InetSocketAddress(port));
//設(shè)置Socket Channel為非阻塞
serverSocket.configureBlocking(false);
//將Selector和Socket Channel注冊(cè)到SelectionKey
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//將SelectionKey附加到接受者
sk.attach(new Acceptor());
}
/*
也可以使用SPI提供接口:
SelectorProvider p = SelectorProvider.provider();
selector = p.openSelector();
serverSocket = p.openServerSocketChannel();
*/
}
Dispatch Loop
// class Reactor continued
public void run() { //通常在新線程中執(zhí)行
try {
//如果當(dāng)前線程沒(méi)有中斷就循環(huán)執(zhí)行
while (!Thread.interrupted()) {
//查詢選擇器中獲取已經(jīng)準(zhǔn)備好的并且注冊(cè)過(guò)的操作
selector.select();
//獲取所有已經(jīng)準(zhǔn)備好的并且注冊(cè)過(guò)的操作
Set selected = selector.selectedKeys();
//循環(huán)遍歷
for (Object o : selected) {
//調(diào)度任務(wù)并處理事件操作
dispatch((SelectionKey) o);
}
//移除選擇器
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
//處理事件操作
void dispatch(SelectionKey k) {
//獲取SelectionKey中綁定的處理程序,如果不為空就執(zhí)行
Runnable r = (Runnable) (k.attachment());
if (r != null)
r.run();
}
Acceptor
// class Reactor continued
// 創(chuàng)建接收器
class Acceptor implements Runnable {
public void run() {
try {
//獲取連接成功到客戶端連接
SocketChannel c = serverSocket.accept();
if (c != null) {
//如果不為空就處理客戶端連接以及selector
new Handler(selector, c);
}
} catch (IOException ex) { /* ... */ }
}
}
Handler setup
//處理程序
final class Handler implements Runnable {
//指定最大輸入bytes
private static final int MAXIN = 1024;
//指定最大輸出bytes
private static final int MAXOUT = 1024;
//客戶端連接
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
//配置非阻塞模式
c.configureBlocking(false);
//將客戶端連接和讀注冊(cè)到SelectionKey
sk = socket.register(sel, SelectionKey.OP_READ);
//將SelectionKey附加到當(dāng)前線程的run
sk.attach(this);
//將SelectionKey的操作設(shè)置為讀取
sk.interestOps(SelectionKey.OP_READ);
//喚醒Selector
sel.wakeup();
}
}
Request handling
// class Handler continued
//輸入處理完成
boolean inputIsComplete() { /* ... */
return true;
}
//輸出處理完成
boolean outputIsComplete() { /* ... */
return true;
}
//處理過(guò)程中
void process() { /* ... */ }
public void run() {
try {
//根據(jù)不同的狀態(tài)進(jìn)行不同的處理程序
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
//讀取數(shù)據(jù)
void read() throws IOException {
//從客戶端獲取數(shù)據(jù)
socket.read(input);
//如果讀取完成
if (inputIsComplete()) {
//處理數(shù)據(jù)
process();
//標(biāo)記為發(fā)送狀態(tài)
state = SENDING;
// 將SelectionKey的操作設(shè)置為寫入
sk.interestOps(SelectionKey.OP_WRITE);
}
}
//發(fā)送數(shù)據(jù)
void send() throws IOException {
//將數(shù)據(jù)寫入客戶端連接
socket.write(output);
//發(fā)送完成后將SelectionKey中的綁定取消
if (outputIsComplete()) sk.cancel();
}
}
Per-State Handlers
GoF State-Object pattern 狀態(tài)模式编矾,針對(duì)狀態(tài)重新綁定對(duì)應(yīng)的處理程序
//處理程序
class Handler {
// 初始化為讀取狀態(tài)
public void run() {
//客戶端讀取數(shù)據(jù)
socket.read(input);
//讀取完成
if (inputIsComplete()) {
//處理數(shù)據(jù)
process();
//附加新的處理程序Sender
sk.attach(new Sender());
//標(biāo)記狀態(tài)為寫入
sk.interest(SelectionKey.OP_WRITE);
//喚醒SelectionKey中綁定的Selector
sk.selector().wakeup();
}
}
//處理程序Sender
class Sender implements Runnable {
public void run(){ // ...
//寫入數(shù)據(jù)
socket.write(output);
//寫入完成之后將SelectionKey中的綁定取消
if (outputIsComplete()) sk.cancel();
}
}
}
Multithreaded Designs 多線程設(shè)計(jì)
戰(zhàn)略性的為擴(kuò)展性增加線程
主要適用于多處理器
工作線程
Reactor可以快速的觸發(fā)處理程序
因?yàn)樘幚沓绦蜻^(guò)多或者處理時(shí)間過(guò)程會(huì)減慢Reactor的速度
將非IO處理放到其他的線程
多個(gè)Reactor處理線程
Reactor線程任務(wù)過(guò)多會(huì)導(dǎo)致IO飽和
分配一些任務(wù)給其他Reactor線程
負(fù)載均衡以匹配CPU和IO速率
Worker Threads 工作線程設(shè)計(jì)
將非IO處理放到其他的線程來(lái)加快Reactor線程
比計(jì)算綁定處理重新處理為事件驅(qū)動(dòng)的形式更簡(jiǎn)單
應(yīng)該仍然是純非阻塞計(jì)算
足夠的處理勝過(guò)開銷
很難與IO重疊處理
最好能先將所有輸入讀入緩沖區(qū)
使用線程池可以進(jìn)行調(diào)優(yōu)和控制
通常需要的線程數(shù)比客戶端少得多
Handler with Thread Pool 多線程處理
class Handler implements Runnable {
// 創(chuàng)建一個(gè)線程池
static PooledExecutor pool = new PooledExecutor(...);
//設(shè)置處理狀態(tài)
static final int PROCESSING = 3;
//讀數(shù)據(jù)操作熟史,設(shè)計(jì)到多線程讀取需要加線程鎖
synchronized void read() {
//讀取數(shù)據(jù)
socket.read(input);
//讀取完成
if (inputIsComplete()) {
//標(biāo)記為處理狀態(tài)
state = PROCESSING;
//將處理過(guò)程放到線程池中執(zhí)行
pool.execute(new Processer());
}
}
//處理數(shù)據(jù)線程
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
//處理數(shù)據(jù)并關(guān)閉
synchronized void processAndHandOff() {
//處理數(shù)據(jù)
process();
//標(biāo)記處理完成并標(biāo)記發(fā)送狀態(tài)
state = SENDING; // 或者綁定其他操作
//將SelectionKey的操作設(shè)置為寫入
sk.interest(SelectionKey.OP_WRITE);
}
}
協(xié)調(diào)任務(wù)Coordinating Tasks
Handoffs 傳遞
循環(huán)任務(wù)的啟用、觸發(fā)或調(diào)用下一個(gè)任務(wù)
通常是最快的但同時(shí)也是脆弱的
給每個(gè)處理程序觸發(fā)回調(diào)
設(shè)置狀態(tài)窄俏、附加處理程序等等
狀態(tài)模式
Queues 隊(duì)列
比如跨階段傳遞buffers
Futures
當(dāng)每個(gè)任務(wù)產(chǎn)生結(jié)果時(shí)觸發(fā)
協(xié)調(diào)層位于連接或等待/通知之上
Using PooledExecutor 使用線程池執(zhí)行
一個(gè)可優(yōu)化的工作線程池
主方法執(zhí)行(Runnable r)
控制
任務(wù)隊(duì)列的類型(任何通道)
最大線程數(shù)
最小線程數(shù)
"Warm" 與按需加載線程
保持活動(dòng)間隔蹂匹,直到空閑線程死亡
如有必要,稍后將其替換為新的
飽和策略
阻塞凹蜈、下降限寞、生產(chǎn)運(yùn)行等
Multiple Reactor Threads 多個(gè)Reactor線程
使用Reactor線程池
用于匹配CPU和IO速率
靜態(tài)或動(dòng)態(tài)構(gòu)造
每個(gè)Reactor都有自己的選擇器,線程踪区,調(diào)度循環(huán)
主接收器分配到專用的Reactor
Using other java.nio features 使用其他的java.nio特性
一個(gè)Reactor對(duì)應(yīng)多個(gè)Selectors
將不同的處理程序綁定到不同的IO事件
調(diào)度需要仔細(xì)處理線程安全
文件傳輸
自動(dòng)化的文件到網(wǎng)絡(luò)或網(wǎng)絡(luò)到文件的復(fù)制
內(nèi)存映射文件
通過(guò)緩沖區(qū)訪問(wèn)文件
直接訪問(wèn)緩沖區(qū)
有可能實(shí)現(xiàn)零拷貝傳輸嗎
但是有設(shè)置和完成的開銷
最適合長(zhǎng)時(shí)間連接的應(yīng)用
Connection-Based Extensions 基礎(chǔ)連接的擴(kuò)展
不能使用單個(gè)服務(wù)請(qǐng)求
客戶端連接
客戶端發(fā)送一系列消息/請(qǐng)求
客戶端斷開連接
舉例
數(shù)據(jù)庫(kù)和事務(wù)監(jiān)控器
多人游戲昆烁,聊天等
可以擴(kuò)展基本的網(wǎng)絡(luò)服務(wù)模式
處理許多相對(duì)長(zhǎng)連接的客戶
跟蹤客戶端和會(huì)話狀態(tài)(包括丟棄)
分布式部署服務(wù)
原文:Doug Lea Scalable IO in Java