1.Buffer
常用ByteBuffer
- 其中的三個(gè)屬性position,limit,capacity
- capacity 指的是容量,如capacity為1024的IntBuffer可以存放1024個(gè)int類型的值
- position默認(rèn)在位置0,代表下一個(gè) 要讀或?qū)懙奈恢?每次讀寫一個(gè)數(shù)據(jù),后移一位
- limit 寫模式下,代表最大寫入多少個(gè)數(shù)據(jù),此時(shí)和capacity相同,讀模式下,代表最多讀多少個(gè)數(shù)據(jù)(數(shù)據(jù)個(gè)數(shù))
- 一些常用方法
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
public static ByteBuffer wrap(byte[] array) {
...
}
- 一些api
寫模式切換到讀模式的flip
public final Buffer flip() {
limit = position; // 將 limit 設(shè)置為實(shí)際寫入的數(shù)據(jù)數(shù)量
position = 0; // 重置 position 為 0
mark = -1; // mark 之后再說
return this;
}
讀完之后可能重新用buffer來寫,使用clear
重新設(shè)置
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
當(dāng)讀到第5個(gè)數(shù)據(jù)時(shí)先mark一下,讀到10時(shí)reset一下就可以實(shí)現(xiàn)重新從第5個(gè)數(shù)據(jù)讀mark reset
.就是將position的位置記錄一下,等會(huì)再修改回來.
public final Buffer mark() {
mark = position;
return this;
}
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
2.channel
- 和buffer的讀寫
將數(shù)據(jù)從buffer寫到channel里面channel.write(buffer)
- channel的實(shí)現(xiàn)類有
FileChannel SocketChannel ServerSocketChannel
- 使用channel實(shí)現(xiàn)文件拷貝
static void channel() throws IOException {
File file = new File("F:\\in.txt");
File file1 = new File("F:\\out.txt");
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
FileInputStream fileInputStream = new FileInputStream(file);
FileOutputStream outputStream = new FileOutputStream(file1);
FileChannel inChannel = fileInputStream.getChannel();
FileChannel outChannel = outputStream.getChannel();
int len = -1;
while ((len = inChannel.read(byteBuffer)) != -1) {
byteBuffer.flip();
outChannel.write(byteBuffer);
byteBuffer.clear();
}
fileInputStream.close();
outputStream.close();
}
SocketChannel和ServerSocketChannel
// 打開一個(gè)通道
SocketChannel socketChannel = SocketChannel.open();
// 發(fā)起連接
socketChannel.connect(new InetSocketAddress("https://www.javadoop.com", 80));
// 實(shí)例化
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 監(jiān)聽 8080 端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while (true) {
// 一旦有一個(gè) TCP 連接進(jìn)來,就對(duì)應(yīng)創(chuàng)建一個(gè) SocketChannel 進(jìn)行處理
SocketChannel socketChannel = serverSocketChannel.accept();
}
3.Selector
可以監(jiān)聽多個(gè)Channel
可以監(jiān)聽channel的讀事件,寫事件,建立連接的事件
selectionKey = channel.register(selector,read)
將通道注冊(cè)到selector上,并監(jiān)聽可讀的請(qǐng)求.
之后可以通過Set<SelectionKey> selectedKeys = selector.selectedKeys();
獲取SelectionKey的集合,通過遍歷處理事件(如果是連接請(qǐng)求,創(chuàng)建一個(gè)新的SocketChannel并注冊(cè)到selector上,如果是可讀的事件,調(diào)用channel.read(buffer)
接收數(shù)據(jù))
舉例
public class SelectorServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.socket().bind(new InetSocketAddress(8080));
// 將其注冊(cè)到 Selector 中,監(jiān)聽 OP_ACCEPT 事件
server.configureBlocking(false);
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int readyChannels = selector.select();
if (readyChannels == 0) {
continue;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
// 遍歷
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
// 有已經(jīng)接受的新的到服務(wù)端的連接
SocketChannel socketChannel = server.accept();
// 有新的連接并不代表這個(gè)通道就有數(shù)據(jù)坞笙,
// 這里將這個(gè)新的 SocketChannel 注冊(cè)到 Selector鞋邑,監(jiān)聽 OP_READ 事件,等待數(shù)據(jù)
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 有數(shù)據(jù)可讀
// 上面一個(gè) if 分支中注冊(cè)了監(jiān)聽 OP_READ 事件的 SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int num = socketChannel.read(readBuffer);
if (num > 0) {
// 處理進(jìn)來的數(shù)據(jù)...
System.out.println("收到數(shù)據(jù):" + new String(readBuffer.array()).trim());
ByteBuffer buffer = ByteBuffer.wrap("返回給客戶端的數(shù)據(jù)...".getBytes());
socketChannel.write(buffer);
} else if (num == -1) {
// -1 代表連接已經(jīng)關(guān)閉
socketChannel.close();
}
}
}
}
}
}
4.NIO的Selector和linux的select poll epoll
Selector封裝了后面三種方式,對(duì)于linux來講,可能用的其中一個(gè)
5.select
一個(gè)進(jìn)程管理多個(gè)socket的連接
- 進(jìn)程A調(diào)用select進(jìn)入阻塞,即進(jìn)入每個(gè)socket的等待隊(duì)列.由于CPU并行執(zhí)行其他進(jìn)程,因此進(jìn)入阻塞的進(jìn)程不會(huì)占用CPU
- select調(diào)用時(shí)傳入需要管理的socket集合,將對(duì)應(yīng)的文件描述符由用戶進(jìn)程傳入內(nèi)核,并進(jìn)行檢查有沒有對(duì)應(yīng)的事件到達(dá),如果有則立即返回.
- 當(dāng)網(wǎng)絡(luò)數(shù)據(jù)到達(dá)時(shí),引發(fā)中斷,cpu調(diào)用中斷處理程序,將數(shù)據(jù)通過DMA 硬件等方式由網(wǎng)卡傳遞到socket的讀緩沖區(qū)
- 進(jìn)程A由于等待的事件到達(dá)被喚醒,從socket的等待隊(duì)列移動(dòng)到進(jìn)程運(yùn)行的隊(duì)列
- 即select函數(shù)返回值大于0,表示有多少個(gè)等待的事件到達(dá),這時(shí)需要遍歷所有socket,看看是哪個(gè)socket的事件發(fā)生了
- 處理事件
epoll
epoll_create創(chuàng)建對(duì)象
epoll_ctl將管理的socket加入到epoll對(duì)象當(dāng)中
epoll_wait等待數(shù)據(jù)到達(dá) - 首先調(diào)用epoll_create創(chuàng)建eventpoll對(duì)象
- 調(diào)用epoll_ctl會(huì)將eventpoll對(duì)象加入到需要管理的socket的等待隊(duì)列當(dāng)中
- 當(dāng)數(shù)據(jù)到達(dá)網(wǎng)卡,觸發(fā)中斷信號(hào),中斷程序?qū)?shù)據(jù)由網(wǎng)卡通過DMA 硬件等方式拷貝到內(nèi)存,還會(huì)操作socket的等待隊(duì)列中的eventpoll對(duì)象,其維護(hù)了一個(gè)就緒列表,存放有事件發(fā)送的socket
- 調(diào)用epol_wait時(shí),如果eventpoll對(duì)象里面的就緒列表中已經(jīng)有引用的socket,就會(huì)返回,如果沒有會(huì)阻塞進(jìn)程.
- eventpoll對(duì)象維護(hù)監(jiān)視的socket集合是通過紅黑樹來維護(hù),而就緒列表是通過雙向鏈表來維護(hù).