1. 概覽
Java中主要有三種IO模型烁设,分別是同步阻塞IO(BIO)、同步非阻塞IO(NIO)、異步非阻塞IO(AIO)。
Java IO的演進(jìn)宏多,其實(shí)是依賴于操作系統(tǒng)的IO操作接口實(shí)現(xiàn)。比如在Linux 2.6以后澡罚,Java中NIO和AIO都是通過(guò)epoll來(lái)實(shí)現(xiàn)的伸但;而在Windows上,AIO是通過(guò)IOCP來(lái)實(shí)現(xiàn)的留搔。
Linux(UNIX)操作系統(tǒng)中共有五種IO模型砌烁,分別是:阻塞IO模型、非阻塞IO模型催式、IO復(fù)用模型函喉、信號(hào)驅(qū)動(dòng)IO模型、異步IO模型荣月。
回顧
同步與異步
- 同步:發(fā)起一個(gè)調(diào)用后管呵,被調(diào)用者未處理完請(qǐng)求前,不向調(diào)用者返回響應(yīng)哺窄。
- 異步:發(fā)起一個(gè)調(diào)用后捐下,被調(diào)用者收到請(qǐng)求后立刻向調(diào)用者回應(yīng)已接收到請(qǐng)求账锹,但是被調(diào)用者并沒(méi)有返回結(jié)果,此時(shí)調(diào)用者可以處理其他操作坷襟,被調(diào)用者通常依靠事件奸柬,回調(diào)等機(jī)制來(lái)通知調(diào)用者其返回結(jié)果。
阻塞和非阻塞
- 阻塞:發(fā)起一個(gè)請(qǐng)求后婴程,調(diào)用者一直等待請(qǐng)求結(jié)果返回廓奕,也就是當(dāng)前線程會(huì)被掛起,無(wú)法從事其他任務(wù)档叔,只有當(dāng)條件就緒才能繼續(xù)桌粉。
- 非阻塞:發(fā)起一個(gè)請(qǐng)求后,調(diào)用者不用一直等著結(jié)果返回衙四,可以先去干其他事情铃肯。
這里介紹Java BIO/NIO/AIO 是結(jié)合Socket網(wǎng)絡(luò)I/O而談,位于 java.net.*
包下传蹈。BIO 使用 java.io.*
包下阻塞IO押逼;NIO使用 java.nio.*
包下非阻塞IO;AIO也就是NIO2.0版惦界,是在NIO基礎(chǔ)上提供異步支持挑格。
NIO 是 JDK1.4 提供的API。nio中n有兩層含義:
- new: 表示新的io接口
- Non-blocking: 非阻塞
AIO 是 JDK1.7 在
java.nio.*
包下提供的API
2. BIO
Blocking I/O表锻,同步阻塞I/O模式,數(shù)據(jù)的讀取寫入必須阻塞在一個(gè)線程內(nèi)等待其完成乞娄。
我們常說(shuō)的I/O一般都是指BIO瞬逊。
2.1. BIO 基本模型
2.1.1. 特點(diǎn)描述
Socket 的連接(accept()
),數(shù)據(jù)的讀寫(read()
/write()
)仪或,都是阻塞的确镊。請(qǐng)求一旦建立連接,就無(wú)法再接收其他連接范删。
2.1.2. 代碼示例
public class SocketIO {
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(9090); // #1
System.out.println("1. new ServerSocket(9090)");
Socket client = server.accept(); // 阻塞1 // #2
System.out.println("2. client connect\t" + client.getPort());
InputStream in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
System.out.println(reader.readLine()); // 阻塞2 // #3
while (true) {
}
}
}
上述代碼中蕾域,有兩處阻塞,分別是 server.accept()
和 reader.readLine()
到旦。
使用
strace -ff -o outlog java SocketIO
命令查看JVM調(diào)用了內(nèi)核(kernel)的方法過(guò)程
#1
位置 new ServerSocket(9090)
旨巷,調(diào)用了內(nèi)核:
socket(···) = 5 # 創(chuàng)建一個(gè)未連接的Socket,5是個(gè)fd(文件描述符)指向這個(gè)socket
bind(5, 9090, ···) # 綁定9090端口到socket上
listen(5, ···) # 把這個(gè)socket監(jiān)聽(tīng)起來(lái)
#2
位置 server.accept()
添忘,調(diào)用了內(nèi)核:
poll([{fd=5, ···}], 1, -1) = 1 ([{fd=5, revents=POLLIN}]) # 等待請(qǐng)求過(guò)來(lái)連接采呐。如果沒(méi)有請(qǐng)求將阻塞,-1表示無(wú)限阻塞搁骑;有請(qǐng)求時(shí)斧吐,5fd退出監(jiān)聽(tīng)交給accept建立連接
accept(5, {客戶端信息}, [28]) = 6 # 從5fd中創(chuàng)建一個(gè)新的socket又固,連接client和server,釋放5fd煤率,讓其繼續(xù)處于監(jiān)聽(tīng)狀態(tài)仰冠,等待下一個(gè)連接
# 如果使用jdk1.4之前的版本運(yùn)行,只調(diào)用了內(nèi)核的 accept 方法蝶糯,接收連接阻塞在此處
#3
位置 server.accept()
洋只,調(diào)用了內(nèi)核:
recvfrom(6, "hahahaha\n", 8192, 0, NULL, NULL) = 9 # 從6fd這個(gè)socket上讀取接收到的數(shù)據(jù),如果socket上沒(méi)有消息裳涛,將在此阻塞
# 如果使用jdk1.4之前的版本運(yùn)行木张,調(diào)用了內(nèi)核的 recv 方法,讀數(shù)據(jù)阻塞在此處
2.1.3. 問(wèn)題瓶頸
主要問(wèn)題突出點(diǎn):
- 接收客戶端連接阻塞
- 讀取/寫入數(shù)據(jù)阻塞
- 只能接收一個(gè)連接
2.2. BIO 傳統(tǒng)模型
2.2.1. 特點(diǎn)描述
服務(wù)端使用一個(gè)Acceptor線程端三,用于監(jiān)聽(tīng)接收請(qǐng)求舷礼,收到請(qǐng)求后,創(chuàng)建Socket和一個(gè)新線程來(lái)服務(wù)客戶端郊闯。
這種模型特點(diǎn)是請(qǐng)求數(shù)與線程數(shù)1:1
2.2.2. 代碼示例
public class SocketIO2 {
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(9090);
System.out.println("1. new ServerSocket(9090)");
while (true) {
Socket client = server.accept(); // 阻塞1
System.out.println("2. client connect\t" + client.getPort());
Thread thread = new Thread(() -> {
try {
InputStream in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line); // 阻塞2
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("client closed\t" + client.getPort());
});
thread.start();
}
}
}
上述代碼妻献,可以實(shí)現(xiàn)多個(gè)客戶都安連接,但是還存在兩處阻塞团赁。一個(gè)連接育拨,占用一個(gè)線程。
2.1.3. 問(wèn)題瓶頸
主要問(wèn)題突出點(diǎn):
- 接收客戶端連接阻塞
- 讀取/寫入數(shù)據(jù)阻塞
- 服務(wù)器不能連接過(guò)多請(qǐng)求(C10K問(wèn)題)
2.3. BIO 偽異步模型
2.3.1. 特點(diǎn)描述
服務(wù)端的連接響應(yīng)欢摄,使用線程池管理熬丧。可實(shí)現(xiàn)請(qǐng)求數(shù)與線程m:n(m可以大于n)
服務(wù)端性能的優(yōu)化怀挠,取決于線程池的優(yōu)化
2.3.2. 代碼示例
public class SocketIO3 {
private static int THREAD_POOL_SIZE = 1;
private static ThreadPoolExecutor THREAD_POOL = getThreadPoolExecutor();
private static ThreadPoolExecutor getThreadPoolExecutor() { // 服務(wù)性能的優(yōu)化析蝴,取決于線程池的優(yōu)化
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "server-thread-" + mThreadNum.getAndIncrement());
}
};
return new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(THREAD_POOL_SIZE), threadFactory);
}
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(9090);
System.out.println("1. new ServerSocket(9090)");
while (true) {
Socket client = server.accept(); // 阻塞1
System.out.println("2. client connect\t" + client.getPort());
try {
THREAD_POOL.execute(() -> responseHandler(client)); // 響應(yīng)處理
} catch (RejectedExecutionException e) {
e.printStackTrace();
rejectedHandler(client); // 線程池已滿,拒絕處理
}
}
}
private static void responseHandler(Socket client) {
try {
InputStream in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line); // 阻塞2
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("client closed\t" + client.getPort());
}
private static void rejectedHandler(Socket client) throws Exception {
OutputStream out = client.getOutputStream();
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
writer.write("請(qǐng)求已達(dá)上線");
writer.close();
client.close();
}
}
2.3.3. 問(wèn)題瓶頸
主要問(wèn)題突出點(diǎn):
- 接收客戶端連接阻塞
- 讀取/寫入數(shù)據(jù)阻塞
- 服務(wù)器不能連接過(guò)多請(qǐng)求(比 2.2 會(huì)好一點(diǎn))
2.4 總結(jié)
BIO從底層看绿淋,如果不能從底層根本上結(jié)果阻塞問(wèn)題闷畸,其性能問(wèn)題就不能得到有效解決。這種模型單一簡(jiǎn)單吞滞,適用于連接不多的場(chǎng)景佑菩。
3. NIO
- Non-blocking: NIO 流是非阻塞 IO 而 IO 流是阻塞 IO,非阻塞體現(xiàn)在 socket網(wǎng)絡(luò)裁赠,內(nèi)核機(jī)制上殿漠。
- new: NIO 有3個(gè)核心組件/特性:Channel、Buffer佩捞、Selector凸舵,體現(xiàn)在JDK上。
3.1. NIO 基本模型
3.1.1. 特點(diǎn)描述
Java中非阻塞IO失尖,JDK中可通過(guò) sockeChannel.configureBlocking(false)
把 accept()
和 read()/write()
設(shè)置成非阻塞啊奄,調(diào)用系統(tǒng)內(nèi)核時(shí)渐苏,如果沒(méi)有連接/數(shù)據(jù)讀寫,就返回-1菇夸。
SocketChannel 中提供數(shù)據(jù)的流入流出兩個(gè)通道琼富。數(shù)據(jù)讀寫時(shí),先直接由操作系統(tǒng)將數(shù)據(jù)放入Buffer緩沖區(qū)中庄新。JVM中Server對(duì)數(shù)據(jù)的讀寫直接訪問(wèn)Byffer緩沖區(qū)鞠眉。
Channel 與 Buffer 搭配:
- 多個(gè)channel配一個(gè)buffer
- 一個(gè)channel配一個(gè)buffer(使用最多配置)
- 一個(gè)channel配兩個(gè)buffer(一個(gè)讀一個(gè)寫)
Buffer 緩沖區(qū)創(chuàng)建位置:
- JVM內(nèi)(堆內(nèi))
- JVM外(堆外,由OS管理分配)
服務(wù)器與外部收發(fā)數(shù)據(jù)择诈,首先由OS管理械蹋,由系統(tǒng)將網(wǎng)卡數(shù)據(jù)放到對(duì)外,如果JVM想要使用羞芍,需要將數(shù)據(jù)拷貝到JVM內(nèi)部哗戈,存在JVM內(nèi)外來(lái)回拷貝,影響性能還浪費(fèi)空間荷科。
Linux內(nèi)核支持共享空間(mmap)唯咬,可以將網(wǎng)卡數(shù)據(jù)直接放到共享空間內(nèi)存,這樣JVM和OS就可以共同使用畏浆。Java中的 零拷貝 就是基于此胆胰。ByteBuffer 的創(chuàng)建位置,和 channel 的搭配方式根據(jù)實(shí)際應(yīng)用場(chǎng)景靈活選擇
3.1.2. 代碼示例
public class SocketNIO {
private static final String CHARSET = "UTF-8";
public static void main(String[] args) throws Exception {
List<SocketChannel> clientList = new LinkedList<>();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(9090)); // 完成端口綁定刻获,socket監(jiān)聽(tīng)
ssc.configureBlocking(false); // false指定該ServerSocketChannel為非阻塞 // #1
System.out.println("1. ServerSocketChannel 9090 non-blocking");
while (true) {
Thread.sleep(1000);
SocketChannel client = ssc.accept(); // 接受連接非阻塞
if (client == null) {
System.out.print(".");
} else {
client.configureBlocking(false); // #2
System.out.println();
System.out.println("2. client connect\t" + client.socket().getPort());
clientList.add(client);
}
ByteBuffer buffer = ByteBuffer.allocateDirect(1024); // 配置緩沖區(qū)蜀涨,可以堆內(nèi),可以堆外(allocate())
for (SocketChannel sc : clientList) {
int len = sc.read(buffer); // 讀取數(shù)據(jù)蝎毡,非阻塞
if (len > 0) {
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
buffer.clear();
String data = new String(bytes, CHARSET);
System.out.println();
System.out.println(String.format("received data[%-5d]: %s", sc.socket().getPort(), data));
}
}
}
}
}
#1
位置厚柳,讓服務(wù)端接后客戶端連接時(shí) ssc.accept()
非阻塞
#2
位置,與客戶端連接的Socket讀/寫數(shù)據(jù)時(shí) sc.read(buffer)
非阻塞
運(yùn)行代碼顶掉,JVM調(diào)用了內(nèi)核一下方法
socket(···) = 6
bind(6, 9090, ···)
listen(6, ···)
# 以上三步同BIO一樣
# ssc.configureBlocking(false);
fcntl(6, F_SETFL, O_RDWR|O_NONBLOCK) = 0 # 設(shè)置6fd為非阻塞
# accept 不再阻塞草娜。沒(méi)有連接時(shí)挑胸,返回-1痒筒;有連接時(shí),返回建立好的連接
accept(6, xxx, xxx) = -1 EAGAIN (Resource temporarily unavailable)
accept(6, {客戶端信息}, [28]) = 7
# read 讀取數(shù)據(jù)不在阻塞茬贵。channel沒(méi)有數(shù)據(jù)返回-1
read(7, 0x7f92b428e590, 1024) = -1 EAGAIN (Resource temporarily unavailable)
read(7, "hello\n", 1024) = 6
3.1.3. 問(wèn)題瓶頸
雖然NIO接收客戶端連接和讀/寫數(shù)據(jù)都不在阻塞簿透,但是上面的額連接、讀寫都在一個(gè)線程解藻,操作是串行化的老充。高并發(fā)下,后面的讀寫操作降低了可接入連接的性能螟左》茸牵可將客戶端連接由一個(gè)線程處理觅够,數(shù)據(jù)讀寫由兩一個(gè)線程處理。
資源浪費(fèi):
-
while(true)
中巷嚣,每次都要詢問(wèn)/調(diào)用accept()
喘先,看是否由客戶端請(qǐng)求接入。 - 如果接入了 1w/100w+ 個(gè)連接廷粒,每次都需要把所有連接
read()
一下看是否有數(shù)據(jù)過(guò)來(lái)(復(fù)雜度O(n))窘拯。
3.2. NIO 多路復(fù)用模型
3.2.1. 特點(diǎn)描述
Selector 多路復(fù)用器,JDK中使用Selector對(duì)象來(lái)注冊(cè)/管理Channel(包含Server和Client)坝茎,當(dāng)有事件觸發(fā)時(shí)涤姊,使用 selector.select()
和 selector.selectedKeys()
來(lái)獲取有事件觸發(fā)的Channel,我們根據(jù)事件類型來(lái)做相應(yīng)處理嗤放。
注意Selector獲取的是一個(gè)Channel狀態(tài)思喊,數(shù)據(jù)的讀/寫還是需要用戶自己觸發(fā),即讀寫過(guò)程依然是同步斤吐。
Selector 的實(shí)現(xiàn)依賴內(nèi)核支持搔涝,如:select/poll/epoll等
select/poll: 每次詢問(wèn)是否有事件到達(dá),需要傳入所有socket fd和措,只是所有fd的循環(huán)遍歷交給了內(nèi)存來(lái)完成(JDK不再干這個(gè)事)庄呈。
epoll: 先創(chuàng)建一個(gè)Selector fd,然后所有socket都添加到這個(gè)fd中派阱,只添加一次即可诬留。由這個(gè)fd管理所有socket的事件。
3.2.2. 代碼示例
ServerSocketChannel
的有效事件為 OP_ACCEPT
贫母。
SocketChannel
的有效事件為 OP_CONNECT
文兑、OP_READ
、OP_WRITE
/**
* NIO腺劣,單線程多路復(fù)用
*/
public class SocketNIOMultiplexing {
private ServerSocketChannel server;
private Selector selector; // 多路復(fù)用器
public void initServer() throws IOException {
server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(9090));
server.configureBlocking(false); // accept非阻塞
selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT); // 將ServerSocketChannel注冊(cè)到Selector中绿贞,注冊(cè)過(guò)程創(chuàng)建SelectionKey,表示Channel向Selector中注冊(cè)的token
System.out.println("1. Server started [ServerSocketChannel Selector 9090]");
}
public void start() throws IOException {
initServer();
while (true) {
if (selector.select(0) < 1) { // 0表示每次詢問(wèn)不阻塞橘原,立即返回籍铁;非0表示阻塞時(shí)間
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 取出有效的key
System.out.println("2. selector 中有事件狀態(tài)進(jìn)來(lái) - size=" + selectionKeys.size());
for (Iterator<SelectionKey> it = selectionKeys.iterator(); it.hasNext(); it.remove()) { // 注意,這里key處理后要從keys中remove趾断,不然下次會(huì)會(huì)獲取沒(méi)有事件key拒名,poll進(jìn)入死循環(huán)
SelectionKey selectionKey = it.next();
if (selectionKey.isAcceptable()) { // selectionKey中持用的Socket狀態(tài)是可連接
acceptHandler(selectionKey); // 從socket(Server)中建立連接
} else if (selectionKey.isReadable()) { // selectionKey中持用的Socket狀態(tài)是可讀
readHandler(selectionKey); // 從socket(Client)中讀取數(shù)據(jù)
}
}
}
}
private void acceptHandler(SelectionKey selectionKey) {
ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
System.out.println("3.1 接受連接");
try {
SocketChannel client = ssc.accept();
client.configureBlocking(false); // 非阻塞
ByteBuffer buffer = ByteBuffer.allocate(1024); // 為該client配置一個(gè)buffer,堆內(nèi)
client.register(selector, SelectionKey.OP_READ, buffer); // 將SocketChannel注冊(cè)到Selector中
System.out.println("一個(gè)新客戶端連接: client=" + client.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
}
private void readHandler(SelectionKey selectionKey) {
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
System.out.println("3.2 讀取數(shù)據(jù)");
buffer.clear(); // 讀之前默認(rèn)清空一次buffer
try {
while (true) {
int len = sc.read(buffer); // 讀到數(shù)據(jù)返回?cái)?shù)據(jù)字節(jié)長(zhǎng)度芋酌;流沒(méi)有結(jié)束增显,沒(méi)有數(shù)據(jù)返回0,流結(jié)束返回-1
if (len > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
sc.write(buffer); // 把數(shù)據(jù)直接返回去
}
buffer.clear();
} else if (len == 0) {
break;
} else { // -1 注意bug脐帝,當(dāng)tcp連接處于 close_wait 時(shí)同云,selectionKey.isReadable()返回true糖权,這里出現(xiàn)死循環(huán),cpu飆高
sc.close(); // 去掉bug炸站,這里client close温兼,主要是使用 key.cancel(); 從多路復(fù)用器的key集合中移除
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
SocketNIOMultiplexing server = new SocketNIOMultiplexing();
server.start();
}
}
運(yùn)行代碼,JVM調(diào)用了內(nèi)核一下方法
socket(···) = 6
bind(6, 9090, ···)
listen(6, ···)
# 以上三步同BIO一樣
fcntl(6, F_SETFL, O_RDWR|O_NONBLOCK) = 0 # 設(shè)置6fd非阻塞
pipe([7, 8]) # 創(chuàng)建一個(gè)channel武契,7fd表示管道的讀取端募判,8fd表示管道的寫端
fcntl(7, F_SETFL, O_RDONLY|O_NONBLOCK) = 0 # 設(shè)置7fd只讀端口非阻塞
fcntl(8, F_SETFL, O_WRONLY|O_NONBLOCK) = 0 # 設(shè)置8fd只寫端口非阻塞
# 創(chuàng)建 Selector 實(shí)例
epoll_create(256) = 9 # 創(chuàng)建一個(gè)新的epoll實(shí)例9fd
# channel.register(selector)
epoll_ctl(9, EPOLL_CTL_ADD, 7, ···) = 0 # 將7fd注冊(cè)到9fd上,并將事件連接到7fd上
epoll_wait(9, ···) # 阻塞咒唆,9fd上等待事件到來(lái)觸發(fā)
# client連接
rt_sigaction(SIGRT_30, ···, 8) = 0 # 信號(hào)安裝登記
accept(6, {客戶端信息}, [28]) = 10
3.2.3. 問(wèn)題瓶頸
上面的模型不需要每次詢問(wèn) accept()
和所有client的 read()
届垫,每次只需要詢問(wèn)一次多路復(fù)用器Selector即可,顯然復(fù)雜度降為O(1)全释,解決了資源浪費(fèi)問(wèn)題装处。
但是Selector詢問(wèn)、Channel客戶端連接浸船、數(shù)據(jù)讀寫依然串行化妄迁。可使用多線將這個(gè)三者分開李命,Netty的模型就是基于此設(shè)計(jì)登淘。
3.3. NIO 多路復(fù)用模型-多線程
3.3.1. 特點(diǎn)描述
把 3.2 中的問(wèn)題,進(jìn)行多線程分開封字。boss線程接收連接請(qǐng)求后黔州,快速記錄,交給worker線程阔籽,worker線程負(fù)責(zé)耗時(shí)的accept和數(shù)據(jù)的讀寫操作流妻。(這里就有些netty的味道了)
3.3.2. 代碼示例
/**
* NIO,多路復(fù)用笆制,多線程
*/
public class SocketNioMultiplexing2 {
private static final String CHARSET = "UTF-8";
private ServerSocketChannel server;
private NioThread boss;
private NioThread[] workers;
static class NioThread extends Thread {
private static volatile int workerNum = 0; // worker線程數(shù)量
private static BlockingQueue<SocketChannel>[] workerQueues; // SocketChannel隊(duì)列绅这,長(zhǎng)度=workerNum
private static AtomicInteger workerBalance = new AtomicInteger();
private Selector selector; // 多路復(fù)用器
private Integer tid; // 線程ID,null表示boss線程
public void config(Selector selector, Integer tid) {
this.selector = selector;
this.tid = tid;
if (tid == null) {
System.out.println("boss thread ready");
this.setName("boss");
} else {
System.out.println("worker-" + tid + " thread ready");
this.setName("worker-" + tid);
}
}
@Override
public void run() {
System.out.println(">> " + Thread.currentThread().getName() + " start");
try {
if (tid == null) {
bossLoop();
} else {
workerLoop();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void bossLoop() throws IOException {
while (true) {
if (selector.select(10) < 1) { // 詢問(wèn)是否有事件到達(dá)在辆,最多阻塞10ms
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("2. boss-selector 中有請(qǐng)求事件狀態(tài)進(jìn)來(lái) - size=" + selectionKeys.size());
for (Iterator<SelectionKey> it = selectionKeys.iterator(); it.hasNext(); it.remove()) {
SelectionKey selectionKey = it.next();
if (!selectionKey.isAcceptable()) {
continue;
}
ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
try {
SocketChannel client = ssc.accept();
client.configureBlocking(false);
int tid = workerBalance.getAndIncrement() % workerNum; // 負(fù)載均衡
workerQueues[tid].add(client);
System.out.println("3. 連接請(qǐng)求加入隊(duì)列-" + tid);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private void workerLoop() throws IOException {
while (true) {
acceptHandler(workerQueues[tid]); // 建立連接
if (selector.select(10) < 1) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("2. worker-selector 中有讀取事件狀態(tài)進(jìn)來(lái) - size=" + selectionKeys.size());
for (Iterator<SelectionKey> it = selectionKeys.iterator(); it.hasNext(); it.remove()) {
SelectionKey selectionKey = it.next();
if (selectionKey.isReadable()) {
readHandler(selectionKey); // 讀取數(shù)據(jù)
}
}
}
}
private void acceptHandler(BlockingQueue<SocketChannel> queue) throws IOException {
SocketChannel client = queue.poll();
if (client == null) {
return;
}
// 建立Channel連接证薇,配置緩沖器
ByteBuffer buffer = ByteBuffer.allocate(1024); // 字節(jié)對(duì)齊?
client.register(this.selector, SelectionKey.OP_READ, buffer);
System.out.println("4. 一個(gè)新客戶端連接: client=" + client.getRemoteAddress());
}
private void readHandler(SelectionKey selectionKey) {
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
System.out.println("3 讀取數(shù)據(jù)");
buffer.clear(); // 讀之前默認(rèn)清空一次buffer
try {
while (true) {
int len = sc.read(buffer);
if (len > 0) {
buffer.flip();
while (buffer.hasRemaining()) { // Echo應(yīng)答
ByteBuffer prefix = ByteBuffer.allocate(20);
prefix.put("Echo:".getBytes(CHARSET));
prefix.flip();
sc.write(new ByteBuffer[]{prefix, buffer});
}
buffer.clear();
} else if (len == 0) {
break;
} else {
sc.close();
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public SocketNioMultiplexing2 init(NioThread boss, NioThread[] workers) throws IOException {
this.boss = boss;
this.workers = workers;
server = ServerSocketChannel.open();
server.configureBlocking(false);
Selector boosSelector = Selector.open();
boss.config(boosSelector, null);
server.register(boosSelector, SelectionKey.OP_ACCEPT);
int workerNum = workers.length;
NioThread.workerNum = workerNum;
NioThread.workerQueues = new LinkedBlockingQueue[workerNum];
for (int i = 0; i < workerNum; i++) {
workers[i] = new NioThread();
workers[i].config(Selector.open(), i); // worker 線程开缎,指定線程ID
NioThread.workerQueues[i] = new LinkedBlockingQueue<>();
}
return this;
}
public SocketNioMultiplexing2 port(int port) throws IOException {
server.bind(new InetSocketAddress(port));
return this;
}
public void start() throws IOException {
boss.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (NioThread worker : workers) {
worker.start();
}
System.out.println(String.format("1. Server started [boss: %s, workers: %d]", server.getLocalAddress().toString(), workers.length));
}
public static void main(String[] args) throws IOException {
NioThread boss = new NioThread(); // boss 線程
NioThread[] workers = new NioThread[2]; // worker 線程
SocketNioMultiplexing2 server = new SocketNioMultiplexing2();
server.init(boss, workers)
.port(9090)
.start();
}
}
4. AIO
Asynchronous I/O棕叫,異步非阻塞I/O模型林螃。這里的 異步 是基于事件和回調(diào)機(jī)制實(shí)現(xiàn)的奕删,也就是應(yīng)用操作之后會(huì)直接返回,不會(huì)堵塞在那里疗认,當(dāng)后臺(tái)處理完成完残,操作系統(tǒng)會(huì)通知相應(yīng)的線程進(jìn)行后續(xù)的操作伏钠。
在大多數(shù)業(yè)務(wù)場(chǎng)景中,我們往往在讀/寫數(shù)據(jù)時(shí)需要阻塞谨设,獲取讀取到的數(shù)據(jù)/寫入數(shù)據(jù)狀態(tài)等熟掂。
目前來(lái)看,Linux上 AIO 的應(yīng)用還不真正的異步扎拣,可以說(shuō)時(shí)偽異步赴肚,Netty 之前也嘗試使用過(guò) AIO,不過(guò)又放棄回歸到NIO上二蓝。
參考