本文轉(zhuǎn)載自:https://github.com/jasonGeng88/blog
本系列文章將整理到我在GitHub上的《Java面試指南》倉(cāng)庫(kù)误债,更多精彩內(nèi)容請(qǐng)到我的倉(cāng)庫(kù)里查看
喜歡的話麻煩點(diǎn)下Star哈
文章將同步到我的個(gè)人博客:
本文是微信公眾號(hào)【Java技術(shù)江湖】的《不可輕視的Java網(wǎng)絡(luò)編程》其中一篇擎淤,本文部分內(nèi)容來(lái)源于網(wǎng)絡(luò)隘庄,為了把本文主題講得清晰透徹,也整合了很多我認(rèn)為不錯(cuò)的技術(shù)博客內(nèi)容,引用其中了一些比較好的博客文章财忽,如有侵權(quán),請(qǐng)聯(lián)系作者泣侮。
該系列博文會(huì)告訴你如何從計(jì)算機(jī)網(wǎng)絡(luò)的基礎(chǔ)知識(shí)入手即彪,一步步地學(xué)習(xí)Java網(wǎng)絡(luò)基礎(chǔ),從socket到nio活尊、bio隶校、aio和netty等網(wǎng)絡(luò)編程知識(shí),并且進(jìn)行實(shí)戰(zhàn)蛹锰,網(wǎng)絡(luò)編程是每一個(gè)Java后端工程師必須要學(xué)習(xí)和理解的知識(shí)點(diǎn)深胳,進(jìn)一步來(lái)說(shuō),你還需要掌握Linux中的網(wǎng)絡(luò)編程原理铜犬,包括IO模型舞终、網(wǎng)絡(luò)編程框架netty的進(jìn)階原理,才能更完整地了解整個(gè)Java網(wǎng)絡(luò)編程的知識(shí)體系翎苫,形成自己的知識(shí)框架权埠。
為了更好地總結(jié)和檢驗(yàn)?zāi)愕膶W(xué)習(xí)成果,本系列文章也會(huì)提供部分知識(shí)點(diǎn)對(duì)應(yīng)的面試題以及參考答案煎谍。
如果對(duì)本系列文章有什么建議攘蔽,或者是有什么疑問(wèn)的話,也可以關(guān)注公眾號(hào)【Java技術(shù)江湖】聯(lián)系作者呐粘,歡迎你參與本系列博文的創(chuàng)作和修訂满俗。
當(dāng)前環(huán)境
- jdk == 1.8
代碼地址
git 地址:https://github.com/jasonGeng88/java-network-programming
知識(shí)點(diǎn)
- nio 下 I/O 阻塞與非阻塞實(shí)現(xiàn)
- SocketChannel 介紹
- I/O 多路復(fù)用的原理
- 事件選擇器與 SocketChannel 的關(guān)系
- 事件監(jiān)聽(tīng)類(lèi)型
- 字節(jié)緩沖 ByteBuffer 數(shù)據(jù)結(jié)構(gòu)
場(chǎng)景
接著上一篇中的站點(diǎn)訪問(wèn)問(wèn)題转捕,如果我們需要并發(fā)訪問(wèn)10個(gè)不同的網(wǎng)站,我們?cè)撊绾翁幚恚?/p>
在上一篇中唆垃,我們使用了java.net.socket
類(lèi)來(lái)實(shí)現(xiàn)了這樣的需求五芝,以一線程處理一連接的方式,并配以線程池的控制辕万,貌似得到了當(dāng)前的最優(yōu)解枢步。可是這里也存在一個(gè)問(wèn)題渐尿,連接處理是同步的醉途,也就是并發(fā)數(shù)量增大后,大量請(qǐng)求會(huì)在隊(duì)列中等待砖茸,或直接異常拋出隘擎。
為解決這問(wèn)題,我們發(fā)現(xiàn)元兇處在“一線程一請(qǐng)求”上凉夯,如果一個(gè)線程能同時(shí)處理多個(gè)請(qǐng)求货葬,那么在高并發(fā)下性能上會(huì)大大改善。這里就借住 JAVA 中的 nio 技術(shù)來(lái)實(shí)現(xiàn)這一模型劲够。
nio 的阻塞實(shí)現(xiàn)
關(guān)于什么是 nio震桶,從字面上理解為 New IO,就是為了彌補(bǔ)原本 I/O 上的不足征绎,而在 JDK 1.4 中引入的一種新的 I/O 實(shí)現(xiàn)方式尼夺。簡(jiǎn)單理解,就是它提供了 I/O 的阻塞與非阻塞的兩種實(shí)現(xiàn)方式(當(dāng)然炒瘸,默認(rèn)實(shí)現(xiàn)方式是阻塞的。)寝衫。
下面顷扩,我們先來(lái)看下 nio 以阻塞方式是如何處理的。
建立連接
有了上一篇 socket 的經(jīng)驗(yàn)慰毅,我們的第一步一定也是建立 socket 連接隘截。只不過(guò),這里不是采用 new socket()
的方式汹胃,而是引入了一個(gè)新的概念 SocketChannel
婶芭。它可以看作是 socket 的一個(gè)完善類(lèi),除了提供 Socket 的相關(guān)功能外着饥,還提供了許多其他特性犀农,如后面要講到的向選擇器注冊(cè)的功能。
建立連接代碼實(shí)現(xiàn):
<pre>// 初始化 socket宰掉,建立 socket 與 channel 的綁定關(guān)系
SocketChannel socketChannel = SocketChannel.open();
// 初始化遠(yuǎn)程連接地址
SocketAddress remote = new InetSocketAddress(this.host, port);
// I/O 處理設(shè)置阻塞呵哨,這也是默認(rèn)的方式赁濒,可不設(shè)置
socketChannel.configureBlocking(true);
// 建立連接
socketChannel.connect(remote);</pre>
獲取 socket 連接
因?yàn)槭峭瑯邮?I/O 阻塞的實(shí)現(xiàn),所以后面的關(guān)于 socket 輸入輸出流的處理孟害,和上一篇的基本相同拒炎。唯一差別是,這里需要通過(guò) channel 來(lái)獲取 socket 連接挨务。
- 獲取 socket 連接
<pre>Socket socket = socketChannel.socket();</pre>
- 處理輸入輸出流
<pre>PrintWriter pw = getWriter(socketChannel.socket());
BufferedReader br = getReader(socketChannel.socket());</pre>
完整示例
<pre>package com.jason.network.mode.nio;
import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
public class NioBlockingHttpClient {
private SocketChannel socketChannel;
private String host;
public static void main(String[] args) throws IOException {
for (String host: HttpConstant.HOSTS) {
NioBlockingHttpClient client = new NioBlockingHttpClient(host, HttpConstant.PORT);
client.request();
}
}
public NioBlockingHttpClient(String host, int port) throws IOException {
this.host = host;
socketChannel = SocketChannel.open();
socketChannel.socket().setSoTimeout(5000);
SocketAddress remote = new InetSocketAddress(this.host, port);
this.socketChannel.connect(remote);
}
public void request() throws IOException {
PrintWriter pw = getWriter(socketChannel.socket());
BufferedReader br = getReader(socketChannel.socket());
pw.write(HttpUtil.compositeRequest(host));
pw.flush();
String msg;
while ((msg = br.readLine()) != null){
System.out.println(msg);
}
}
private PrintWriter getWriter(Socket socket) throws IOException {
OutputStream out = socket.getOutputStream();
return new PrintWriter(out);
}
private BufferedReader getReader(Socket socket) throws IOException {
InputStream in = socket.getInputStream();
return new BufferedReader(new InputStreamReader(in));
}
}</pre>
nio 的非阻塞實(shí)現(xiàn)
原理分析
nio 的阻塞實(shí)現(xiàn)击你,基本與使用原生的 socket 類(lèi)似,沒(méi)有什么特別大的差別谎柄。
下面我們來(lái)看看它真正強(qiáng)大的地方丁侄。到目前為止,我們將的都是阻塞 I/O谷誓。何為阻塞 I/O绒障,看下圖:
我們主要觀察圖中的前三種 I/O 模型,關(guān)于異步 I/O捍歪,一般需要依靠操作系統(tǒng)的支持户辱,這里不討論。
從圖中可以發(fā)現(xiàn)糙臼,阻塞過(guò)程主要發(fā)生在兩個(gè)階段上:
- 第一階段:等待數(shù)據(jù)就緒庐镐;
- 第二階段:將已就緒的數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶(hù)空間;
這里產(chǎn)生了一個(gè)從內(nèi)核到用戶(hù)空間的拷貝变逃,主要是為了系統(tǒng)的性能優(yōu)化考慮必逆。假設(shè),從網(wǎng)卡讀到的數(shù)據(jù)直接返回給用戶(hù)空間揽乱,那勢(shì)必會(huì)造成頻繁的系統(tǒng)中斷名眉,因?yàn)閺木W(wǎng)卡讀到的數(shù)據(jù)不一定是完整的,可能斷斷續(xù)續(xù)的過(guò)來(lái)凰棉。通過(guò)內(nèi)核緩沖區(qū)作為緩沖损拢,等待緩沖區(qū)有足夠的數(shù)據(jù),或者讀取完結(jié)后撒犀,進(jìn)行一次的系統(tǒng)中斷福压,將數(shù)據(jù)返回給用戶(hù),這樣就能避免頻繁的中斷產(chǎn)生或舞。
了解了 I/O 阻塞的兩個(gè)階段荆姆,下面我們進(jìn)入正題∮车剩看看一個(gè)線程是如何實(shí)現(xiàn)同時(shí)處理多個(gè) I/O 調(diào)用的胆筒。從上圖中的非阻塞 I/O 可以看出,僅僅只有第二階段需要阻塞魏宽,第一階段的數(shù)據(jù)等待過(guò)程腐泻,我們是不需要關(guān)心的决乎。不過(guò)該模型是頻繁地去檢查是否就緒,造成了 CPU 無(wú)效的處理派桩,反而效果不好构诚。如果有一種類(lèi)似的好萊塢原則— “不要給我們打電話,我們會(huì)打給你” 铆惑。這樣一個(gè)線程可以同時(shí)發(fā)起多個(gè) I/O 調(diào)用范嘱,并且不需要同步等待數(shù)據(jù)就緒。在數(shù)據(jù)就緒完成的時(shí)候员魏,會(huì)以事件的機(jī)制丑蛤,來(lái)通知我們。這樣不就實(shí)現(xiàn)了單線程同時(shí)處理多個(gè) IO 調(diào)用的問(wèn)題了嗎撕阎?即所說(shuō)的“I/O 多路復(fù)用模型”受裹。
廢話講了一大堆,下面就來(lái)實(shí)際操刀一下虏束。
創(chuàng)建選擇器
由上面分析可以棉饶,我們得有一個(gè)選擇器,它能監(jiān)聽(tīng)所有的 I/O 操作镇匀,并且以事件的方式通知我們哪些 I/O 已經(jīng)就緒了照藻。
代碼如下:
<pre>import java.nio.channels.Selector;
...
private static Selector selector;
static {
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
</pre>
創(chuàng)建非阻塞 I/O
下面,我們來(lái)創(chuàng)建一個(gè)非阻塞的 SocketChannel
汗侵,代碼與阻塞實(shí)現(xiàn)類(lèi)型幸缕,唯一不同是socketChannel.configureBlocking(false)
。
注意:只有在socketChannel.configureBlocking(false)
之后的代碼晰韵,才是非阻塞的发乔,如果socketChannel.connect()
在設(shè)置非阻塞模式之前,那么連接操作依舊是阻塞調(diào)用的雪猪。
<pre>SocketChannel socketChannel = SocketChannel.open();
SocketAddress remote = new InetSocketAddress(host, port);
// 設(shè)置非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.connect(remote);</pre>
建立選擇器與 socket 的關(guān)聯(lián)
選擇器與 socket 都創(chuàng)建好了列疗,下一步就是將兩者進(jìn)行關(guān)聯(lián),好讓選擇器和監(jiān)聽(tīng)到 Socket 的變化浪蹂。這里采用了以 SocketChannel
主動(dòng)注冊(cè)到選擇器的方式進(jìn)行關(guān)聯(lián)綁定,這也就解釋了告材,為什么不直接new Socket()
坤次,而是以SocketChannel
的方式來(lái)創(chuàng)建 socket。
代碼如下:
<pre>socketChannel.register(selector,
SelectionKey.OP_CONNECT
| SelectionKey.OP_READ
| SelectionKey.OP_WRITE);</pre>
上面代碼斥赋,我們將 socketChannel 注冊(cè)到了選擇器中缰猴,并且對(duì)它的連接、可讀疤剑、可寫(xiě)事件進(jìn)行了監(jiān)聽(tīng)滑绒。
具體的事件監(jiān)聽(tīng)類(lèi)型如下:
操作類(lèi)型 | 值 | 描述 | 所屬對(duì)象 |
---|---|---|---|
OP_READ | 1 << 0 | 讀操作 | SocketChannel |
OP_WRITE | 1 << 2 | 寫(xiě)操作 | SocketChannel |
OP_CONNECT | 1 << 3 | 連接socket操作 | SocketChannel |
OP_ACCEPT | 1 << 4 | 接受socket操作 | ServerSocketChannel |
選擇器監(jiān)聽(tīng) socket 變化
現(xiàn)在闷堡,選擇器已經(jīng)與我們關(guān)心的 socket 進(jìn)行了關(guān)聯(lián)。下面就是感知事件的變化疑故,然后調(diào)用相應(yīng)的處理機(jī)制杠览。
這里與 Linux 下的 selector 有點(diǎn)不同,nio 下的 selecotr 不會(huì)去遍歷所有關(guān)聯(lián)的 socket纵势。我們?cè)谧?cè)時(shí)設(shè)置了我們關(guān)心的事件類(lèi)型踱阿,每次從選擇器中獲取的,只會(huì)是那些符合事件類(lèi)型钦铁,并且完成就緒操作的 socket软舌,減少了大量無(wú)效的遍歷操作。
public void select() throws IOException {
// 獲取就緒的 socket 個(gè)數(shù)
while (selector.select() > 0){
// 獲取符合的 socket 在選擇器中對(duì)應(yīng)的事件句柄 key
Set keys = selector.selectedKeys();
// 遍歷所有的key
Iterator it = keys.iterator();
while (it.hasNext()){
// 獲取對(duì)應(yīng)的 key牛曹,并從已選擇的集合中移除
SelectionKey key = (SelectionKey)it.next();
it.remove();
if (key.isConnectable()){
// 進(jìn)行連接操作
connect(key);
}
else if (key.isWritable()){
// 進(jìn)行寫(xiě)操作
write(key);
}
else if (key.isReadable()){
// 進(jìn)行讀操作
receive(key);
}
}
}
}
注意:這里的selector.select()
是同步阻塞的佛点,等待有事件發(fā)生后,才會(huì)被喚醒黎比。這就防止了 CPU 空轉(zhuǎn)的產(chǎn)生超营。當(dāng)然,我們也可以給它設(shè)置超時(shí)時(shí)間焰手,selector.select(long timeout)
來(lái)結(jié)束阻塞過(guò)程糟描。
處理連接就緒事件
下面,我們分別來(lái)看下书妻,一個(gè) socket 是如何來(lái)處理連接船响、寫(xiě)入數(shù)據(jù)和讀取數(shù)據(jù)的(這些操作都是阻塞的過(guò)程,只是我們將等待就緒的過(guò)程變成了非阻塞的了)躲履。
處理連接代碼:
<pre>// SelectionKey 代表 SocketChannel 在選擇器中注冊(cè)的事件句柄
private void connect(SelectionKey key) throws IOException {
// 獲取事件句柄對(duì)應(yīng)的 SocketChannel
SocketChannel channel = (SocketChannel) key.channel();
// 真正的完成 socket 連接
channel.finishConnect();
// 打印連接信息
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
int port = remote.getPort();
System.out.println(String.format("訪問(wèn)地址: %s:%s 連接成功!", host, port));
}</pre>
處理寫(xiě)入就緒事件
<pre>// 字符集處理類(lèi)
private Charset charset = Charset.forName("utf8");
private void write(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
// 獲取 HTTP 請(qǐng)求见间,同上一篇
String request = HttpUtil.compositeRequest(host);
// 向 SocketChannel 寫(xiě)入事件
channel.write(charset.encode(request));
// 修改 SocketChannel 所關(guān)心的事件
key.interestOps(SelectionKey.OP_READ);
}</pre>
這里有兩個(gè)地方需要注意:
- 第一個(gè)是使用
channel.write(charset.encode(request));
進(jìn)行數(shù)據(jù)寫(xiě)入。有人會(huì)說(shuō)工猜,為什么不能像上面同步阻塞那樣米诉,通過(guò)PrintWriter
包裝類(lèi)進(jìn)行操作。因?yàn)?code>PrintWriter的write()
方法是阻塞的篷帅,也就是說(shuō)要等數(shù)據(jù)真正從 socket 發(fā)送出去后才返回史侣。
這與我們這里所講的阻塞是不一致的,這里的操作雖然也是阻塞的魏身,但它發(fā)生的過(guò)程是在數(shù)據(jù)從用戶(hù)空間到內(nèi)核緩沖區(qū)拷貝過(guò)程惊橱。至于系統(tǒng)將緩沖區(qū)的數(shù)據(jù)通過(guò) socket 發(fā)送出去,這不在阻塞范圍內(nèi)箭昵。也解釋了為什么要用 Charset
對(duì)寫(xiě)入內(nèi)容進(jìn)行編碼了税朴,因?yàn)榫彌_區(qū)接收的格式是ByteBuffer
。
-
第二,選擇器用來(lái)監(jiān)聽(tīng)事件變化的兩個(gè)參數(shù)是
interestOps
與readyOps
正林。interestOps:表示
SocketChannel
所關(guān)心的事件類(lèi)型泡一,也就是告訴選擇器,當(dāng)有這幾種事件發(fā)生時(shí)觅廓,才來(lái)通知我鼻忠。這里通過(guò)key.interestOps(SelectionKey.OP_READ);
告訴選擇器,之后我只關(guān)心“讀就緒”事件哪亿,其他的不用通知我了粥烁。readyOps:表示
SocketChannel
當(dāng)前就緒的事件類(lèi)型。以key.isReadable()
為例蝇棉,判斷依據(jù)就是:return (readyOps() & OP_READ) != 0;
處理讀取就緒事件
<pre>private void receive(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
String receiveData = charset.decode(buffer).toString();
// 當(dāng)再?zèng)]有數(shù)據(jù)可讀時(shí)讨阻,取消在選擇器中的關(guān)聯(lián),并關(guān)閉 socket 連接
if ("".equals(receiveData)) {
key.cancel();
channel.close();
return;
}
System.out.println(receiveData);
}</pre>
這里的處理基本與寫(xiě)入一致篡殷,唯一要注意的是钝吮,這里我們需要自行處理去緩沖區(qū)讀取數(shù)據(jù)的操作。首先會(huì)分配一個(gè)固定大小的緩沖區(qū)板辽,然后從內(nèi)核緩沖區(qū)中奇瘦,拷貝數(shù)據(jù)至我們剛分配固定緩沖區(qū)上。這里存在兩種情況:
- 我們分配的緩沖區(qū)過(guò)大劲弦,那多余的部分以0補(bǔ)充(初始化時(shí)耳标,其實(shí)會(huì)自動(dòng)補(bǔ)0)。
- 我們分配的緩沖去過(guò)小邑跪,因?yàn)檫x擇器會(huì)不停的遍歷次坡。只要
SocketChannel
處理讀就緒狀態(tài),那下一次會(huì)繼續(xù)讀取画畅。當(dāng)然砸琅,分配過(guò)小,會(huì)增加遍歷次數(shù)轴踱。
最后症脂,將一下 ByteBuffer
的結(jié)構(gòu),它主要有 position, limit,capacity 以及 mark 屬性淫僻。以 buffer.flip();
為例诱篷,講下各屬性的作用(mark 主要是用來(lái)標(biāo)記之前 position 的位置,是在當(dāng)前 postion 無(wú)法滿(mǎn)足的情況下使用的雳灵,這里不作討論)兴蒸。
從圖中看出,
- 容量(capacity):表示緩沖區(qū)可以保存的數(shù)據(jù)容量细办;
- 極限(limit):表示緩沖區(qū)的當(dāng)前終點(diǎn),即寫(xiě)入、讀取都不可超過(guò)該重點(diǎn)笑撞;
- 位置(position):表示緩沖區(qū)下一個(gè)讀寫(xiě)單元的位置岛啸;
完整代碼
<pre>package com.jason.network.mode.nio;
import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
public class NioNonBlockingHttpClient {
private static Selector selector;
private Charset charset = Charset.forName("utf8");
static {
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
NioNonBlockingHttpClient client = new NioNonBlockingHttpClient();
for (String host: HttpConstant.HOSTS) {
client.request(host, HttpConstant.PORT);
}
client.select();
}
public void request(String host, int port) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.socket().setSoTimeout(5000);
SocketAddress remote = new InetSocketAddress(host, port);
socketChannel.configureBlocking(false);
socketChannel.connect(remote);
socketChannel.register(selector,
SelectionKey.OP_CONNECT
| SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
public void select() throws IOException {
while (selector.select(500) > 0){
Set keys = selector.selectedKeys();
Iterator it = keys.iterator();
while (it.hasNext()){
SelectionKey key = (SelectionKey)it.next();
it.remove();
if (key.isConnectable()){
connect(key);
}
else if (key.isWritable()){
write(key);
}
else if (key.isReadable()){
receive(key);
}
}
}
}
private void connect(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
channel.finishConnect();
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
int port = remote.getPort();
System.out.println(String.format("訪問(wèn)地址: %s:%s 連接成功!", host, port));
}
private void write(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
String host = remote.getHostName();
String request = HttpUtil.compositeRequest(host);
System.out.println(request);
channel.write(charset.encode(request));
key.interestOps(SelectionKey.OP_READ);
}
private void receive(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.read(buffer);
buffer.flip();
String receiveData = charset.decode(buffer).toString();
if ("".equals(receiveData)) {
key.cancel();
channel.close();
return;
}
System.out.println(receiveData);
}
}
</pre>
示例效果
總結(jié)
本文從 nio 的阻塞方式講起,介紹了阻塞 I/O 與非阻塞 I/O 的區(qū)別茴肥,以及在 nio 下是如何一步步構(gòu)建一個(gè) IO 多路復(fù)用的模型的客戶(hù)端坚踩。文中需要理解的內(nèi)容比較多,如果有理解錯(cuò)誤的地方瓤狐,歡迎指正~
后續(xù)
- Netty 下的異步請(qǐng)求實(shí)現(xiàn)