目前移動互聯(lián)網(wǎng)應用非常流行,如微信炭晒,抖音等都需要實時與服務器通信崎页,獲取實時的信息,這種獲取信息的方式就是通過長連接socket實現(xiàn)的腰埂,那就需要了解服務端是如何處理高并發(fā)IO的飒焦。
IO(BIO)
在jdk1.4之前,java中的IO類庫實在是超級原始,很多我們現(xiàn)在熟知的概念都還沒有出現(xiàn)牺荠,比如說管道翁巍、緩沖區(qū)等等。正是由于這些等等原因休雌,C語言和C++一直都是IO方面的首選灶壶。這是原始的IO方式,也叫作BIO杈曲,它的原理很簡單驰凛,我們使用一張圖來表示一下:
也就是說BIO時代,每次有一個客戶端連接進來的時候担扑,都會有一個新的線程去處理恰响,缺點顯而易見,如果連接比較多的時候涌献,我們就要建立大量的線程去一一處理胚宦。
NIO
jdk1.4開始被正式發(fā)布了,做出的一個巨大的改變就是新增了NIO包燕垃。它提供了很多異步的IO操作方法枢劝,比如說緩沖區(qū)ByteBuffer、Pipe卜壕、Channel還有多路復用器Selector等等您旁。新的NIO類庫的出現(xiàn),極大地促進了java對異步非阻塞式編程的發(fā)展轴捎。NIO的原理也是很簡單被冒。在這里同樣使用一張圖來演示一遍:
現(xiàn)在我們可以看到,所有的客戶端連接都可以只用一個線程就可以實現(xiàn)了轮蜕。
AIO
在2011年7月28日昨悼,官方將用了將近十年的NIO類庫做了升級,也被稱為NIO2.0跃洛。后來也叫作AIO率触。AIO的原理是在之前的基礎上進行的改進,意思是異步非阻塞式IO汇竭,也就是說你的客戶端在進行讀寫操作的時候葱蝗,只需要給服務器發(fā)送一個請求,不用一直等待回答就可以去做其他的事了细燎。
由于AIO過于復雜两曼,并沒有廣泛使用,這里就不展開說了玻驻。
NIO解決的問題
Nio要解決的問題網(wǎng)上的解釋一大堆悼凑,諸如銀行取號偿枕、餐廳點餐等等。這些列子就不再具體地重復了户辫,實際上就是為了使用現(xiàn)有的資源提供更高的生產(chǎn)效率渐夸。
如何提高呢?舉個簡單例子渔欢,一個汽車生產(chǎn)廠商有若干條生產(chǎn)線(一條生產(chǎn)線負責汽車制造的所有環(huán)節(jié))墓塌,每個生產(chǎn)線都有相同的工人數(shù)目,每個工人都負責一個生產(chǎn)環(huán)節(jié)奥额,也就是說生產(chǎn)發(fā)動機和生產(chǎn)輪胎的工人數(shù)目是一樣的苫幢,但是很明顯生產(chǎn)發(fā)動機需要的時間肯定比輪胎要長很多,那么在每一條生產(chǎn)線上生產(chǎn)發(fā)動機的那個工人往往滿負荷工作垫挨,而生產(chǎn)輪胎的工人卻很閑韩肝,這樣生產(chǎn)效率很低。因此廠家打破了這種一條生產(chǎn)線生產(chǎn)汽車所有環(huán)節(jié)的模式棒拂,改為一個汽車零部件一條生產(chǎn)線,那么在發(fā)動機生產(chǎn)線雇傭的工人數(shù)目一定多于輪胎生產(chǎn)線玫氢,這樣每條生產(chǎn)線的工人都不會閑著帚屉,通過資源的合理分配最大化利用了工人的價值,提高了生產(chǎn)效率漾峡,賺取了剩余價值攻旦。
而如何通過資源合理分配來提高生產(chǎn)效率就是nio在計算機io領域要解決的問題。
同步/異步生逸、阻塞/非阻塞
同步和異步針對應用程序來牢屋,關注的是程序中間的協(xié)作關系;
阻塞與非阻塞更關注的是單個進程的執(zhí)行狀態(tài)槽袄。
同步:執(zhí)行一個操作之后烙无,等待結果,然后才繼續(xù)執(zhí)行后續(xù)的操作遍尺。
異步:執(zhí)行一個操作后截酷,可以去執(zhí)行其他的操作,然后等待通知再回來執(zhí)行剛才沒執(zhí)行完的操作乾戏。
阻塞:進程給CPU傳達一個任務之后迂苛,一直等待CPU處理完成,然后才執(zhí)行后面的操作鼓择。
非阻塞:進程給CPU傳達任我后三幻,繼續(xù)處理后續(xù)的操作,隔斷時間再來詢問之前的操作是否完成呐能。這樣的過程其實也叫輪詢念搬。
什么是Reactor
后期隨著jdk的一步步發(fā)展,nio非堵塞技術開始變多越來越加廣泛,可分為三種reactor模式锁蠕。
Reactor可以理解為反應器模式夷野。當一個主體發(fā)生改變時,所有依屬體都得到通知荣倾。不過悯搔,觀察者模式與單個事件源關聯(lián),而反應器模式則與多個事件源關聯(lián) 舌仍。
NIO 有一個主要的類Selector妒貌,這個類似一個觀察者,只要把需要探知的SocketChannel告訴Selector铸豁,接著做別的事情灌曙,當有事件發(fā)生時,他會通知我們节芥,傳回一組SelectionKey在刺,讀取這些Key,就會獲得剛剛注冊過的SocketChannel头镊,然后蚣驼,我們從這個Channel中讀取數(shù)據(jù),接著可以處理這些數(shù)據(jù)相艇。
單線程的Reactor模式
Reactor里面的單線程模式的結構圖:
當有多個請求發(fā)送到server的時候颖杏,會經(jīng)過反應器對其進行處理,相應的代碼如下所示:
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @author idea
* @data 2019/4/11
*/
@Slf4j
public class NioServer {
public static void main(String[] args) {
try {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9090));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server is open!");
while (true) {
if (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int len = 0;
//當管道的數(shù)據(jù)都讀取完畢了
while ((len = (socketChannel.read(byteBuffer))) > 0) {
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(), 0, len));
byteBuffer.clear();
}
} else if (selectionKey.isAcceptable()) {
//第一次鏈接到server坛芽,需要構建一個通道
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel();
//開通通道
SocketChannel socketChannel = acceptServerSocketChannel.accept();
//設置為非堵塞
socketChannel.configureBlocking(false);
//注冊可讀的監(jiān)聽事件
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("[server]接收到新的鏈接");
}
iterator.remove();
}
}
}
} catch (IOException e) {
log.error("[server]異常出現(xiàn)留储,信息為{}", e);
}
}
}
單線程模式的Reactor有一個很明顯的缺點,那就是在處理請求的時候咙轩,對于不同狀態(tài)的通道處理获讳,以及請求的監(jiān)聽全部都放在了單個線程上進行,(多個Channel可以注冊到同一個Selector對象上活喊,實現(xiàn)了一個線程同時監(jiān)控多個請求狀態(tài)(Channel))因此效率很低下赔嚎。因此就會有了第二種Reactor模式。
多線程的Reactor模式
在原先的單線程模式中胧弛,一個線程同時處理多個請求尤误,但是所有的讀寫請求以及對于數(shù)據(jù)的處理都在同一線程中,無法充分利用多cpu的優(yōu)勢结缚,因此誕生了這種多線程的Reactor模式损晤。
多線程的Reactor模式基本結構圖如下所示:
代碼如下所示:
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @author idea
* @data 2019/4/11
*/
@Slf4j
public class Server {
public static void main(String[] args) {
try {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(9090));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("[server]開始啟動服務器");
while (true) {
if (selector.selectNow() < 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
Processor processor = (Processor) selectionKey.attachment();
processor.process(selectionKey);
} else if (selectionKey.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
//綁定處理器線程
key.attach(new Processor());
System.out.println("[server]接收到新的鏈接");
}
iterator.remove();
}
}
} catch (IOException e) {
log.error("[server]異常出現(xiàn),信息為{}", e);
}
}
}
從代碼可以看到红竭,每次當系相應的channel注冊完相應的OP_READ事件后尤勋,可以對相應的SelectionKey attach一個對象(本例中attach了一個Processor對象喘落,該對象處理讀請求),并且在獲取到可讀事件后最冰,可以取出該對象瘦棋。
再看到相應的Processor對象代碼:
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 處理器
*
* @author idea
* @data 2019/4/11
*/
public class Processor {
private static final ExecutorService service = new ThreadPoolExecutor(16, 16,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
public void process(SelectionKey selectionKey) {
service.submit(() -> {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
selectionKey.cancel();
System.out.println("讀取結束!");
return null;
} else if (count == 0) {
return null;
}
System.out.println("讀取內(nèi)容:" + new String(buffer.array()));
return null;
});
}
}
需要開啟一個線程池來進行數(shù)據(jù)處理的任務暖哨。這里面就將數(shù)據(jù)處理的壓力分擔給了線程池來執(zhí)行赌朋,充分利用了多線程的優(yōu)勢,將新線程的連接和數(shù)據(jù)的io操作分別放在了不同的線程中進行運行篇裁。
在上述的多線程Reactor模式中沛慢,有專門的nio-acceptor線程來用于監(jiān)聽服務器,接收客戶端的tcp連接达布。然后又有專門的線程池來處理消息的讀取团甲,發(fā)送,編碼解碼等工作黍聂。一個nio同時處理N條鏈路躺苦,每個鏈路只對應一個NIO線程。(防止了并發(fā)操作的發(fā)生)产还∑ダ澹看似這樣的安排很美好,也確實能解決大多數(shù)應用場景的問題雕沉。
但是在極端情況下仍然會有弊端集乔,單獨的NIO線程負責監(jiān)聽和處理所有的客戶端連接可能會存在性能問題去件。例如并發(fā)百萬客戶端連接坡椒,或者服務端需要對客戶端握手進行安全認證躏尉,但是認證本身非常損耗性能涯保。在這類場景下,單獨一個Acceptor線程可能會存在性能不足問題空郊,為了解決性能問題宫莱,產(chǎn)生了第三種Reactor線程模型-主從Reactor多線程模型丈攒。
主從Reactor多線程模式
主從Reactor線程模型的特點是:服務端用于接收客戶端連接的不再是個1個單獨的NIO線程,而是一個獨立的NIO線程池授霸。Acceptor接收到客戶端TCP連接請求處理完成后(可能包含接入認證等)巡验,將新創(chuàng)建的SocketChannel注冊到IO線程池(subreactor線程池)的某個IO線程上,由它負責SocketChannel的讀寫和編解碼工作碘耳。Acceptor線程池僅僅只用于客戶端的登陸显设、握手和安全認證,一旦鏈路建立成功辛辨,就將鏈路注冊到后端SubReactor線程池的IO線程上捕捂,由IO線程負責后續(xù)的IO操作瑟枫。
相應代碼:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @author idea
* @data 2019/4/11
*/
public class Server {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(1234));
//初始化通道,標志為accept類型
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
int coreNum = Runtime.getRuntime().availableProcessors();
Processor[] processors = new Processor[coreNum];
for (int i = 0; i < processors.length; i++) {
processors[i] = new Processor();
}
int index = 0;
//一直處于堵塞的狀態(tài)
while (selector.select() > 0) {
//獲取到selectionkey的集合
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("Accept request from {}" + socketChannel.getRemoteAddress());
Processor processor = processors[(int) ((index++) / coreNum)];
processor.addChannel(socketChannel);
}
iterator.remove();
}
}
}
}
處理器部分的代碼:
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class Processor {
private static final ExecutorService service =
Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
private Selector selector;
public Processor() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
start();
}
public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
socketChannel.register(this.selector, SelectionKey.OP_READ);
}
public void start() {
service.submit(() -> {
while (true) {
if (selector.selectNow() <= 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) key.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
key.cancel();
System.out.println("讀取結束" + socketChannel);
continue;
} else if (count == 0) {
System.out.println("客戶端信息大兄冈堋:" + socketChannel);
continue;
} else {
System.out.println("客戶端信息:" + new String(buffer.array()));
}
}
}
}
});
}
}
通常在互聯(lián)網(wǎng)公司中慷妙,對于一些高并發(fā)的應用場景里面都會使用到了Reactor模式,其代替了常用的多線程處理方式允悦,節(jié)省系統(tǒng)的資源膝擂,提高系統(tǒng)的吞吐量。類似于一些netty框架的核心原理其實就是通過nio的Reactor模式來進行設計和開發(fā)澡屡。