java nio
- Java的IO體系:
- 舊IO
- 新IO:nio蛮寂,用ByteBuffer和FileChannel讀寫
- nio通道管理:Selector
- Okio:io的封裝盾戴,好像不關(guān)nio的事
- Netty:目的是快速的實現(xiàn)任何協(xié)議的server和client端
- 所以說你可以用netty通過channel等實現(xiàn)一個httpclient油吭,和URLConnection平級
- 這個課題太大了勾扭,應(yīng)該分層次學(xué):
- 第一層是官方的文檔矫钓,寫幾個helloworld
- 第二層就是官方的example浮禾,研究server和client
- 第三層是權(quán)威指南,研究TCP份汗,UDP等的常見問題盈电,谷歌的protobuf,自己實現(xiàn)http服務(wù)器等
- github:https://github.com/netty/netty
- 其他:
- Gzip
- 大文件讀寫杯活,如2G
- 文件鎖
- 主題:
- 通道和緩沖器:提高讀寫速度匆帚,Channel,ByteBuffer旁钧,速度怎么提高的
- ByteBuffer的操作是很底層的吸重,底層就快互拾,底層怎么就快
- ByteBuffer傾向于大塊的操作字節(jié)流,大塊就快
- 異步IO:提高線程的利用率嚎幸,增加系統(tǒng)吞吐量颜矿,selector,key等嫉晶,但以犧牲實時性為代價(折衷是永恒不變的主題)
- channel管理:向Selector注冊Channel骑疆,然后調(diào)用它的select()方法。這個方法會一直阻塞到某個注冊的通道有事件就緒
- Selector允許單線程處理多個 Channel替废。如果你的應(yīng)用打開了多個連接(通道)箍铭,但每個連接的流量都很低,使用Selector就會很方便椎镣。例如诈火,在一個聊天服務(wù)器中
- 怎么就犧牲實時性了,一組IO状答,輪詢看有沒有可讀信息冷守,所以一個IO來消息了,不會立刻就輪詢到它
- 所以負(fù)責(zé)輪詢IO的線程惊科,讀到消息就得立刻分發(fā)出去教沾,盡量不能有耗時操作
- 特別注意:
- Channel和Selector配合時,必須channel.configureBlocking(false)切換到非阻塞模式
- 而FileChannel沒有非阻塞模式译断,只有Socket相關(guān)的Channel才有
- channel管理:向Selector注冊Channel骑疆,然后調(diào)用它的select()方法。這個方法會一直阻塞到某個注冊的通道有事件就緒
- 通道和緩沖器:提高讀寫速度匆帚,Channel,ByteBuffer旁钧,速度怎么提高的
1 通道和緩沖器
1.1 簡介
java.nio.*包的引入是為了提高速度,并且舊的IO包已經(jīng)用nio重新實現(xiàn)過或悲,所以即使你不用nio孙咪,也已經(jīng)收益了
下面的格式可能比較亂,需要配合GetChannel例子來理解
- 如何理解:
- 你要讀數(shù)據(jù)的地方巡语,可能說的是IO流翎蹈,可以看做一個煤礦,煤就是數(shù)據(jù)
- 通道:Channel男公,包含煤層的礦藏荤堪,就是一個煤礦里有煤的地方
- 緩沖器:ByteBuffer,可以看做運煤的車枢赔,注意這里裝車和卸車也是有意義的動作
- 我們能做的就是派運煤的車去通道澄阳,也就是將緩沖器ByteBuffer和Channel連起來,往里送煤(寫)踏拜,往外運煤(讀)
- -------緩一緩-------
- ByteBuffer是很底層的類碎赢,直接存儲未加工的字節(jié)
- 初始化:
- 要寫數(shù)據(jù)時,已經(jīng)有數(shù)據(jù)了速梗,所以可以得到byte[]
- ByteBuffer.wrap(byte[]) //相當(dāng)于wrap(array, 0, array.length);
- ByteBuffer.wrap(byte[], offset, length) //offset + length不能超出byte數(shù)組的長度
- 要讀數(shù)據(jù)時肮塞,最多只能拿到ByteBuffer可能需要大小
- ByteBuffer buff = ByteBuffer.allocate((int) fc.size());
- 接口:byte的讀寫襟齿,不支持對象,連String也不支持
- 將數(shù)據(jù)放到ByteBuffer里:裝車
- 上面的wrap方法
- 一系列put方法枕赵,只支持基本類型
- 將數(shù)據(jù)從ByteBuffer中轉(zhuǎn)出來:卸車
- 一系列g(shù)et方法猜欺,只支持基本類型,注意flip
- String str = new String(buff.array(), "utf-8")拷窜,buff.array()开皿,跟ByteBuffer指針無關(guān)
- ByteBuffer內(nèi)部指針:
- ByteBuffer里有個指針
- fc.read(buff)會從往ByteBuffer里寫(裝車),從哪兒寫装黑,總有個起始位置副瀑,就是ByteBuffer指針的位置
- 寫完,指針直到最后恋谭,也就是第一個空白可寫區(qū)域
- 讀取里面的信息(卸車)糠睡,就需要回到起始位置
- flip一下
- positon操作可以跳到任意位置
- 初始化:
- FileChannel:FileInputStream, FileOutputStream, RandomAccessFile這三個舊類被修改了,以支持channel
- Reader和Writer不直接支持Channel疚颊,但Channel里提供了便利方法來支持他們
- 獲得FileChannel:
- FileChannel fc = new FileOutputStream("data.txt").getChannel(); //寫
- FileChannel fc = new FileInputStream("cl1024.json").getChannel(); //讀
- FileChannel fc = new RandomAccessFile("data.txt", "rw").getChannel(); //可讀可寫
- 移動文件指針:append寫時狈孔,斷點續(xù)傳時能用
- fc.position(fc.size()); // Move to the end
- 寫,將一個ByteBuffer寫到Channel里:
- fc.write(ByteBuffer.wrap("Some text ".getBytes()));
- 讀材义,將一個channel里的內(nèi)容均抽,讀到ByteBuffer里,讀多少其掂,由ByteBuffer的長度決定
- fc.read(buff);
- buff.flip(); 讀出來的ByteBuffer一般需要再次解析出來油挥,通過getInt,getFloat等操作,讀寫切換時款熬,需要flip一下
- flip怎么理解:fc.read(buff)深寥,ByteBuffer里有個指針
- fc.read(buff)會從往ByteBuffer里寫,從哪兒寫贤牛,總有個起始位置惋鹅,就是ByteBuffer指針的位置
- 寫完,指針直到最后殉簸,也就是第一個空白可寫區(qū)域
- 所以現(xiàn)在就好理解了闰集,讀完文件,也就是往ByteBuffer寫完般卑,指針指向ByteBuffer最后武鲁,你再讀取里面的信息,就需要回到起始位置
- 總結(jié):
- FileInputStream蝠检,F(xiàn)ileOutputStream洞坑,這相當(dāng)于煤礦
- 以前你直接操作stream的read,write蝇率,參數(shù)是byte[]
- read迟杂,write直接操作煤礦
- 直接通過byte[]讀寫刽沾,相當(dāng)于用鐵锨鏟煤
- 在new io里,你不能直接操作煤礦了排拷,而是獲取一個通道:FileChannel
- 通過channel的read侧漓,write來操作數(shù)據(jù),position监氢,seek等布蔗,就是移動指針(文件指針)
- read,write的參數(shù)是ByteBuffer
- 通過ByteBuffer來包裝數(shù)據(jù)浪腐,相當(dāng)于用車?yán)?/li>
- 由于把byte[]用ByteBuffer包裝起來纵揍,又面臨一個裝車和卸車的問題
- 裝車:寫文件(wrap, put等方法),讀文件(channel.read(buff))
- 卸車:讀文件(get各種基本類型)议街,寫文件(channel.write(buff))
- 全車操作:array
- 注意flip的問題泽谨,讀寫切換時,需要flip一下特漩,而且這還不確定就是指針操作
- 注意rewind的問題吧雹,讀著讀著,想回頭從頭再讀涂身,就得rewind雄卷,這個肯定是指針操作
- buff.hasRemaining(),指針是否到頭了
- 可以看出蛤售,Channel和ByteBuffer提供的接口都比較低級丁鹉,直接和操作系統(tǒng)契合,說是這就是快的原因
- FileInputStream蝠检,F(xiàn)ileOutputStream洞坑,這相當(dāng)于煤礦
- 關(guān)于Channel:
- FileChannel
- DatagramChannel:通過UDP讀寫網(wǎng)絡(luò)悴能,無連接的
- SocketChannel:通過TCP讀寫網(wǎng)絡(luò)
- ServerSocketChannel:監(jiān)聽新來的TCP連接揣钦,每個新進(jìn)來的連接都會創(chuàng)建一個SocketChannel
例子,代碼比較短搜骡,直接貼過來
package com.cowthan.nio;
import java.nio.*;
import java.nio.channels.*;
import java.io.*;
public class GetChannel {
private static final int BSIZE = 1024;
public static void main(String[] args) throws Exception {
// 寫文件
FileChannel fc = new FileOutputStream("data.txt").getChannel();
fc.write(ByteBuffer.wrap("Some text ".getBytes())); //
fc.close();
// 寫文件:append
fc = new RandomAccessFile("data.txt", "rw").getChannel();
fc.position(fc.size()); // Move to the end
fc.write(ByteBuffer.wrap("Some more".getBytes()));
fc.close();
// 讀文件
fc = new FileInputStream("data.txt").getChannel();
ByteBuffer buff = ByteBuffer.allocate((int) fc.size());
fc.read(buff);
buff.flip();
System.out.println("讀取:");
String str = new String(buff.array(), "utf-8");
System.out.println(str);
System.out.println("讀取2:");
while (buff.hasRemaining()){
System.out.print((char) buff.get());
}
}
} /*
* Output: Some text Some more
*/// :~
1.2 更多:flip, clear佑女,compact和mark记靡,reset操作
- flip,clear团驱,compact和mark摸吠,reset
- 這里說的讀寫都是相對于ByteBuffer
- 由寫轉(zhuǎn)讀:flip
- 由寫轉(zhuǎn)讀:clear清空緩沖區(qū),compact清空緩沖區(qū)的已讀數(shù)據(jù)(結(jié)果就是再裝車嚎花,就是從未讀數(shù)據(jù)后面開始)
- 隨機讀寫:mark和reset寸痢,如果要一會寫一會讀,mark會記錄當(dāng)前position紊选,position就是讀寫的起點啼止,reset會回滾
- ByteBuffer.allocate(len)的大小問題道逗,大塊的移動數(shù)據(jù)是快的關(guān)鍵,所以長度很重要献烦,但沒啥標(biāo)準(zhǔn)滓窍,根據(jù)情況定吧,1024(1K)小了
- ByteBuffer.wrap(byte[])巩那,不會再復(fù)制數(shù)組吏夯,而是直接以參數(shù)為底層數(shù)組,快
- 復(fù)制文件時即横,一個ByteBuffer對象會不斷從src的channel來read噪生,并寫入dest的channel,注意:
- src.read(buff); buff.flip(); dest.write(buff); buff.clear()
- ByteBuffer必須clear了东囚,才能重新從Channel讀
- ByteBuffer.flip(), clear()比較拙劣跺嗽,但這正是為了最大速度付出的代價
///復(fù)制文件的部分代碼(更優(yōu)化的復(fù)制文件是用transfer接口,直接通道相連)
ByteBuffer buff = ByteBuffer.allocate(1024); //1K
while(src.read(buff) != -1){
buff.flip(); //準(zhǔn)備卸車
dest.write(buff); //卸車了
buff.clear(); //其實這才是真正的卸車舔庶,并送回通道那頭(可以再次read(buff)了)
}
緩沖器細(xì)節(jié):四大索引
看圖:
- 四大索引:
- mark:標(biāo)記抛蚁,mark方法記錄當(dāng)前位置,reset方法回滾到上次mark的位置
- position:位置惕橙,當(dāng)前位置瞧甩,讀和寫都是在這個位置操作,并且會影響這個位置弥鹦,position方法可以seek
- limit:界限肚逸,
- 作為讀的界限時:指到buffer當(dāng)前被填入了多少數(shù)據(jù),get方法以此為界限彬坏,
- flip一下朦促,limit才有值,指向postion栓始,才能有個讀的界限
- 作為寫的界限時:
- allocate或者clear時务冕,直接可寫,limit指向capacity幻赚,表示最多寫到這
- wrap時禀忆,直接可讀,所以position是0落恼,limit是指到之后箩退,capacity也是指到最后,直接進(jìn)入可讀狀態(tài)
- 作為讀的界限時:指到buffer當(dāng)前被填入了多少數(shù)據(jù),get方法以此為界限彬坏,
- capacity:容量佳谦,指到buffer的最后戴涝,這不是字節(jié)數(shù),而是能寫入的個數(shù),對于ByteBuffer啥刻,就是byte個數(shù)奸鸯,對于IntBuffer,就是int個數(shù)
- allocate方法的參數(shù)就是capacity
- 所以,可以推斷一下,ByteBuffer.capacity = 5時窗声,如果轉(zhuǎn)成IntBuffer,capacity是1钝满,不會指向最后,而是留出了最后一個字節(jié)申窘,被忽略了弯蚜,沒法通過Int讀寫
- allocate方法的參數(shù)就是capacity
對應(yīng)的方法:
public final Buffer flip() {
limit = position;
position = 0;
mark = UNSET_MARK;
return this;
}
public final Buffer rewind() {
position = 0;
mark = UNSET_MARK;
return this;
}
public final boolean hasRemaining() {
return position < limit;
}
public final Buffer clear() {
position = 0;
mark = UNSET_MARK;
limit = capacity;
return this;
}
public final Buffer mark() {
mark = position;
return this;
}
public final Buffer reset() {
if (mark == UNSET_MARK) {
throw new InvalidMarkException("Mark not set");
}
position = mark;
return this;
}
例子:交換相鄰的兩個字符
/**
* 給一個字符串,交換相鄰的兩個字符
*/
private static void symmetricScramble(CharBuffer buffer) {
while (buffer.hasRemaining()) {
buffer.mark();
char c1 = buffer.get();
char c2 = buffer.get();
buffer.reset();
buffer.put(c2).put(c1);
}
}
/*
思考:如果沒有mark和reset功能剃法,你怎么做碎捺?用postion方法記錄和恢復(fù)剛才位置
*/
private static void symmetricScramble2(CharBuffer buffer) {
while (buffer.hasRemaining()) {
int position = buffer.position();
char c1 = buffer.get();
char c2 = buffer.get();
buffer.position(position);
buffer.put(c2).put(c1);
}
}
- 總結(jié):
- flip:一般用于由寫轉(zhuǎn)讀,flip之后可以:
- 讀:是從頭讀贷洲,能讀到剛才寫的長度
- 寫:是從頭寫收厨,會覆蓋剛才寫入的內(nèi)容
- clear:一般用于讀轉(zhuǎn)寫,clear之后可以:
- 讀:但是讀不到什么了
- 寫:是從頭寫
- mark和reset:一般用于讀寫交替
- mark:相當(dāng)于int postion = buffer.postion()优构,記下當(dāng)前位置
- reset:相當(dāng)于buffer.postion(position)诵叁,回到剛才記錄的位置
- flip:一般用于由寫轉(zhuǎn)讀,flip之后可以:
1.3 連接通道
上面說過,nio通過大塊數(shù)據(jù)的移動來加快讀寫速度钦椭,前面這個大小都由ByteBuffer來控制拧额,
其實還有方法可以直接將讀寫兩個Channel相連
這也是實現(xiàn)文件復(fù)制的更好的方法
public class TransferTo {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("arguments: sourcefile destfile");
System.exit(1);
}
FileChannel in = new FileInputStream(args[0]).getChannel(), out = new FileOutputStream(
args[1]).getChannel();
in.transferTo(0, in.size(), out);
// 或者:
// out.transferFrom(in, 0, in.size());
}
} // /:~
1.4 字符流:CharBuffer和Charset,其實就是byte[]和編碼問題
ByteBuffer是最原始的彪腔,其實就是字節(jié)流侥锦,適用于二進(jìn)制數(shù)據(jù)的讀寫,圖片文件等
但我們更常用的德挣,其實是字符串
-
字符串涉及到的類:
- CharBuffer:注意恭垦,Channel是直接和ByteBuffer交流,所以CharBuffer只能算是上層封裝
- Charset:編碼相關(guān)格嗅,字節(jié)流到字符串番挺,肯定會有編碼相關(guān)的問題
- CharBuffer.toString():得到字符串
-
怎么得到CharBuffer
- 方法1:ByteBuffer.asCharBuffer(),局限在于使用系統(tǒng)默認(rèn)編碼
- 方法2:Charset.forName("utf-8").decode(buff)吗浩,相當(dāng)于new String(buff.array(), "utf-8")的高級版
- 相對的建芙,Charset.forName("utf-8").encode(cbuff)没隘,返回個ByteBuffer懂扼,就相當(dāng)于String.getBytes("utf-8)
-
CharBuffer讀寫
- put(String):寫
- toString():讀,就拿到了字符串
====先休息一下,說說怎么得到編碼相關(guān)的一些信息吧====
//打印系統(tǒng)支持的所有編碼阀湿,及其別名
import java.nio.charset.*;
import java.util.*;
public class AvailableCharSets {
public static void main(String[] args) {
SortedMap<String, Charset> charSets = Charset.availableCharsets();
Iterator<String> it = charSets.keySet().iterator();
while (it.hasNext()) {
String csName = it.next();
System.out.print(csName);
Iterator aliases = charSets.get(csName).aliases().iterator();
if (aliases.hasNext())
System.out.print(": ");
while (aliases.hasNext()) {
System.out.print(aliases.next());
if (aliases.hasNext())
System.out.print(", ");
}
System.out.println();
}
}
}
/*
部分輸出:
KOI8-U: koi8_u
Shift_JIS: shift_jis, x-sjis, sjis, shift-jis, ms_kanji, csShiftJIS
TIS-620: tis620, tis620.2533
US-ASCII: ANSI_X3.4-1968, cp367, csASCII, iso-ir-6, ASCII, iso_646.irv:1983, ANSI_X3.4-1986, ascii7, default, ISO_646.irv:1991, ISO646-US, IBM367, 646, us
UTF-16: UTF_16, unicode, utf16, UnicodeBig
UTF-16BE: X-UTF-16BE, UTF_16BE, ISO-10646-UCS-2, UnicodeBigUnmarked
UTF-16LE: UnicodeLittleUnmarked, UTF_16LE, X-UTF-16LE
UTF-32: UTF_32, UTF32
UTF-32BE: X-UTF-32BE, UTF_32BE
UTF-32LE: X-UTF-32LE, UTF_32LE
UTF-8: unicode-1-1-utf-8, UTF8
windows-1250: cp1250, cp5346
windows-1251: cp5347, ansi-1251, cp1251
windows-1252: cp5348, cp1252
windows-1253: cp1253, cp5349
*/
=====ByteBuffer.asCharBuffer()的局限:沒指定編碼赶熟,容易亂碼=====
- 這個一般情況下不能用,為何:
- asCharBuffer()會把ByteBuffer轉(zhuǎn)為CharBuffer陷嘴,但用的是系統(tǒng)默認(rèn)編碼
1.5 視圖緩沖器:ShortBuffer映砖,IntBuffer, LongBuffer,F(xiàn)loatBuffer灾挨,DoubleBuffer邑退,CharBuffer
- Buffer類型:
- ByteBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
- CharBuffer 字符串的緩沖區(qū)
- MappedByteBuffer 大文件的緩沖區(qū)
ByteBuffer系列的類繼承關(guān)系挺有意思,可以研究研究
ByteArrayBuffer是其最通用子類劳澄,一般操作的都是ByteArrayBuffer
ByteBuffer.asLongBuffer(), asIntBuffer(), asDoubleBuffer()等一系列
- 不多說:
- ByteBuffer底層是一個byte[]地技,get()方法返回一個byte,1字節(jié)秒拔,8bit莫矗,10字節(jié)可以get幾次?10次
- ByteBuffer.asIntBuffer()得到IntBuffer砂缩,底層是一個int[]作谚,get()方法返回一個int,還是10字節(jié)庵芭,可以get幾次妹懒?
- 同理,還有ShortBuffer, LongBuffer, FloatBuffer, DoubleBuffer喳挑,這些就是ByteBuffer的一個視圖彬伦,所以叫視圖緩沖器
- asIntBuffer時,如果ByteBuffer本身有5個byte伊诵,則其中前4個會變成IntBuffer的第0個元素单绑,第5個被忽略了,但并未被丟棄
- 往新的IntBuffer放數(shù)據(jù)(put(int))曹宴,默認(rèn)時會從頭開始寫搂橙,寫入的數(shù)據(jù)會反映到原來的ByteBuffer上
- 總結(jié):
具體也說不明白了,其實就是你有什么類型的數(shù)據(jù)笛坦,就用什么類型的Buffer
- 但直接往通道讀寫的区转,肯定是ByteBuffer,所以首先得有個ByteBuffer版扩,其他視圖Buffer废离,就得從ByteBuffer來
- 怎么從ByteBuffer來呢,ByteBuffer.asIntBuffer()等方法
例子:ViewBuffers.java
1.6 字節(jié)序
- 簡介:
- 高位優(yōu)先礁芦,Big Endian蜻韭,最重要的字節(jié)放地址最低的存儲單元悼尾,ByteBuffer默認(rèn)以高位優(yōu)先,網(wǎng)絡(luò)傳輸大部分也以高位優(yōu)先
- 低位優(yōu)先肖方,Little Endian
- ByteBuffer.order()方法切換字節(jié)序
- ByteOrderr.BIG_ENDIAN
- ByteOrderr.LITTLE_ENDIAN
- 對于00000000 01100001闺魏,按short來讀,如果是big endian俯画,就是97析桥, 以little endian,就是24832
1.7 Scatter/Gather
一個Channel艰垂,多個Buffer泡仗,相當(dāng)于多個運煤車在一個通道工作
讀到多個Buffer里:
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.read(bufferArray);
多個Buffer往channel寫:
//注意,Buffer的長度是100猜憎,但只有50個數(shù)據(jù)沮焕,就只會寫入50,換句話說拉宗,只有position和limit之間的內(nèi)容會被寫入(put完先flip一下峦树,才能往channel寫?旦事?魁巩?)
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);
1.8 內(nèi)存映射文件:大文件的讀寫
大文件,如2G的文件姐浮,沒法一下加載到內(nèi)存中讀寫
MappedByteBuffer提供了一個映射功能谷遂,可以將文件部分載入到內(nèi)存中,但你使用時卖鲤,
感覺文件都在內(nèi)存中了
MappedByteBuffer繼承了ByteBuffer肾扰,所以可以像上面那樣使用
MappedByteBuffer性能很高,遠(yuǎn)高于FileInputStream蛋逾,F(xiàn)ileOutputStream集晚,RandomAccessFile的原始方式的讀寫,百倍速度
public static void main(String[] args) throws Exception {
//創(chuàng)建個文件区匣,大小是128M
MappedByteBuffer out = new RandomAccessFile("test.dat", "rw")
.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, length);
//寫入
for (int i = 0; i < length; i++)
out.put((byte) 'x');
System.out.println("寫入完畢");
//讀取
for (int i = length / 2; i < length / 2 + 6; i++)
System.out.println((char) out.get(i));
}
1.9 文件加鎖
- 簡介
- 有時我們需要對文件加鎖偷拔,以同步訪問某個文件
- FileLock是使用了操作系統(tǒng)提供的文件加鎖功能,所以可以影響到其他系統(tǒng)進(jìn)程亏钩,其他普通進(jìn)程莲绰,即使不是java寫的
- FileLock.lock()會阻塞,tryLock不會阻塞
- lock系列方法可以帶參數(shù):
- 加鎖文件的某一部分姑丑,多個進(jìn)程可以分別加鎖文件的一部分蛤签,數(shù)據(jù)庫就是這樣
- 參數(shù)3可以決定是否共享鎖,這里又出現(xiàn)個共享鎖和獨占鎖栅哀,共享鎖需要操作系統(tǒng)支持
用法:
public static void main(String[] args) throws Exception {
FileOutputStream fos = new FileOutputStream("file.txt");
FileLock fl = fos.getChannel().tryLock();//---------
if (fl != null) {
System.out.println("Locked File");
TimeUnit.MILLISECONDS.sleep(100);
fl.release();//---------------------------------
System.out.println("Released Lock");
}
fos.close();
}
更多例子
package com.cowthan.nio;
//: io/LockingMappedFiles.java
// Locking portions of a mapped file.
// {RunByHand}
import java.nio.*;
import java.nio.channels.*;
import java.io.*;
public class LockingMappedFiles {
static final int LENGTH = 0x8FFFFFF; // 128 MB
static FileChannel fc;
public static void main(String[] args) throws Exception {
fc = new RandomAccessFile("test.dat", "rw").getChannel();
MappedByteBuffer out = fc
.map(FileChannel.MapMode.READ_WRITE, 0, LENGTH);
for (int i = 0; i < LENGTH; i++)
out.put((byte) 'x');
new LockAndModify(out, 0, 0 + LENGTH / 3);
new LockAndModify(out, LENGTH / 2, LENGTH / 2 + LENGTH / 4);
}
private static class LockAndModify extends Thread {
private ByteBuffer buff;
private int start, end;
LockAndModify(ByteBuffer mbb, int start, int end) {
this.start = start;
this.end = end;
mbb.limit(end);
mbb.position(start);
buff = mbb.slice();
start();
}
public void run() {
try {
// Exclusive lock with no overlap:
FileLock fl = fc.lock(start, end, false);
System.out.println("Locked: " + start + " to " + end);
// Perform modification:
while (buff.position() < buff.limit() - 1)
buff.put((byte) (buff.get() + 1));
fl.release();
System.out.println("Released: " + start + " to " + end);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
} // /:~
2 異步IO
-
關(guān)于Channel:
- FileChannel:永遠(yuǎn)都是阻塞模式震肮,當(dāng)然讀本地文件也不會阻塞多久踏枣,沒法和Selector配合
- DatagramChannel:通過UDP讀寫網(wǎng)絡(luò),無連接的
- SocketChannel:通過TCP讀寫網(wǎng)絡(luò)
- ServerSocketChannel:監(jiān)聽新來的TCP連接钙蒙,每個新進(jìn)來的連接都會創(chuàng)建一個SocketChannel
-
簡介:
- Selector提供了一個線程管理多個Channel的功能,與之相比间驮,舊的Socket處理方式是每個Socket連接都在一個線程上阻塞
- Channel和Selector配合時躬厌,必須channel.configureBlocking(false)切換到非阻塞模式
- 而FileChannel沒有非阻塞模式,只有Socket相關(guān)的Channel才有
- 概括:
- SocketServerChannel和SocketChannel的基本用法竞帽,參考socket.nio.NioXXServer和Client
- 可能會阻塞扛施,可以通過channel.configureBlocking(false)設(shè)置非阻塞的地方:
- SocketChannel.connect(new InetSocketAddress(hostname, port)), 配合sc.finishConnect()判斷是否連接成功
- SocketChannel sc = ssc.accept()屹篓,在非阻塞模式下疙渣,無新連接進(jìn)來時返回值會是null
2.1 舊IO處理Socket的方式
要讀取Socket上的Stream,就得在read時阻塞堆巧,所以每一個Socket都得一個線程管理妄荔,對于服務(wù)器來說,能開的線程數(shù)是有限的
2.2 不使用Selector谍肤,自己想法管理SocketChannel
@Override
public void run() {
while(!isClosed && !Thread.interrupted()){
for(String key: map.keySet()){
SocketChannel sc = map.get(key);
ByteBuffer buf = ByteBuffer.allocate(1024);
try {
int bytesRead = sc.read(buf);
buf.flip();
if(bytesRead <= 0){
}else{
System.out.println("收到消息(來自" + key + "):" + Charset.forName("utf-8").decode(buf));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
-
弊端分析:
- 不斷循環(huán)讀取所有Channel啦租,有數(shù)據(jù)則讀出來
- 問題1:在while里,你sleep還是不sleep荒揣,sleep就損失太多實時性篷角,不sleep就導(dǎo)致CPU大量空轉(zhuǎn)
- 問題2:對于ServerSocketChannel,如果accept非阻塞系任,則需要while(true)不斷判斷是否有新連接恳蹲,也浪費CPU
- 問題3:對于ServerSocket.connect(),如果非阻塞俩滥,則需要while(true)不斷判斷是否連接服務(wù)器成功嘉蕾,也浪費CPU
-
所以現(xiàn)在我們知道需要什么了
- 需要SocketChannel的read方法不阻塞
- 或者需要一個東西,可以在所有SocketChannel上等待霜旧,任何一個有了消息荆针,就可以喚醒,這里面就有個監(jiān)聽的概念
- 并且可讀颁糟,可寫航背,accept(), connect()都應(yīng)該對應(yīng)不同的事件
- 這就引出了Selector,Selector就是java從語言層面和系統(tǒng)層面對這個問題的解決方案
2.3 Selector
使用Selector的完整示例:
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
int readyChannels = selector.select(); //就在這阻塞棱貌,但已經(jīng)實現(xiàn)了一個線程管理多個Channel(SocketChannel-讀寫玖媚,connect事件,DatagramChannel-讀寫事件婚脱,SocketServerChannel-accept事件)
if(readyChannels == 0) continue;
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
}
Selector selector = Selector.open();
SelectionKey selectionKey = sc.register(selector, SelectionKey.OP_READ);
//看Selector對哪些事件感興趣
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ) == SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE;
//通道中已經(jīng)就緒的集合今魔,每一次selection都得先訪問這個勺像,知道是因為哪些事件被喚醒的
int readySet = selectionKey.readyOps();
//或者:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
//拿到Channel和Selector
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
//對應(yīng)關(guān)系是:1個Selector,多個Channel错森,多個SelectionKey吟宦,一個Channel對應(yīng)一個SelectionKey,而且一個SelectionKey可以添加一個extra數(shù)據(jù)涩维,以滿足特定需求
//select方法:這才是會阻塞的地方殃姓,注意,在這里阻塞瓦阐,是性能最佳的表現(xiàn)
int readyCount = selector.select() //select()阻塞到至少有一個通道在你注冊的事件上就緒了
int readyCount = selector.select(long timeout) //最長會阻塞timeout毫秒(參數(shù))
int readyCount = selector.selectNow() //不會阻塞蜗侈,無則0
//返回值:有幾個通道就緒
/*
select()方法返回的int值表示有多少通道已經(jīng)就緒。亦即睡蟋,自上次調(diào)用select()方法后有多少通
道變成就緒狀態(tài)踏幻。如果調(diào)用select()方法,因為有一個通道變成就緒狀態(tài)戳杀,返回了1该面,若再次調(diào)用select()方法,
如果另一個通道就緒了信卡,它會再次返回1吆倦。如果對第一個就緒的channel沒有做任何操作,現(xiàn)在就有兩個就緒的通
道坐求,但在每次select()方法調(diào)用之間蚕泽,只有一個通道就緒了
*/
//有通道就緒了桥嗤,就得得到這個Channel须妻,通道存在SelectionKey里,而selector可以獲得一個SelectionKey集合
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
Channel channel = key.channel();
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
-
register方法參數(shù):Channel事件
- 參數(shù)表示Selector對Channel的什么事件感興趣
- Connect:SelectionKey.OP_CONNECT
- Accept:SelectionKey.OP_ACCEPT
- Read:SelectionKey.OP_READ
- Write:SelectionKey.OP_WRITE
- 可以組合:int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
-
SelectionKey都有啥信息:
- interest集合:對哪些事件感興趣
- ready集合:感興趣的事件集中泛领,哪些事件準(zhǔn)備就緒了
- Channel:監(jiān)聽的哪個Channel
- Selector:誰在監(jiān)聽
- 可選的extra
- 喚醒阻塞的Selector:在select方法的阻塞
- 情況1:有感興趣的事件來了
- 情況2:手動調(diào)用Selector.wakeup()荒吏,只要讓其它線程在第一個線程調(diào)用select()方法的那個對象上調(diào)用Selector.wakeup()方法即可
- 如果有其它線程調(diào)用了wakeup()方法,但當(dāng)前沒有線程阻塞在select()方法上渊鞋,下個調(diào)用select()方法的線程會立即“醒來(wake up)”绰更。
- 關(guān)閉Selector
- close()方法,關(guān)閉該Selector锡宋,且使注冊到該Selector上的所有SelectionKey實例無效
- 通道本身并不會關(guān)閉
3 DatagramChannel:UDP通信
略過
4 Pipe
- 簡介:
- Pipe用于線程通信儡湾,兩個Thread由一個Pipe連接
- pipe的兩端,一端是SinkChannel执俩,負(fù)責(zé)寫入徐钠,一端是SourceChannel,負(fù)責(zé)讀取
- 所以pipe是單向通信
- 兩個Pipe就可以實現(xiàn)雙向通信
看圖:
Pipe pipe = Pipe.open();
//寫入
Pipe.SinkChannel sinkChannel = pipe.sink();
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
sinkChannel.write(buf);
}
//讀取
Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = sourceChannel.read(buf);
5 Okio
github: https://github.com/square/okio
只是對舊IO的封裝役首,沒用到Channel尝丐,也沒用到ByteBuffer
5.1 簡介:
- 基本接口
- Source:接口显拜,like InputStream
- 輸入流,輸入到內(nèi)存爹袁,從Source讀
- long read(Buffer sink, long byteCount) throws IOException
- 返回-1表示EOF远荠,寫到sink里
- Timeout timeout()
- 返回這個source的超時信息
- void close() throws IOException
- Sink:接口,like OutputStream
- 輸出流失息,從內(nèi)存輸出譬淳,往Sink寫
- void write(Buffer source, long byteCount) throws IOException
- 從source讀到sink
- Timeout timeout();
- void close() throws IOException;
- BufferedSource:接口,extends Source
- 輸出緩沖流
- 提供了一系列讀方法
- 實現(xiàn)類:RealBufferedSource根时,需要傳入一個Source,所以這是一個包裝類
- BufferedSink:接口辰晕,extends sink
- 輸入緩沖流
- 提供了一系列寫方法
- 實現(xiàn)類:RealBufferedSink蛤迎,需要傳入一個Sink,所以這是一個包裝類
- Sink和Source只有3個接口含友,實現(xiàn)方便替裆,而BufferedSource和BufferedSink提供了一堆便利方法
- Timeout:讀寫時有Timeout,主要給Socket用
- byte stream和char stream的讀寫沒有什么區(qū)別窘问,當(dāng)做byte[], utf8 String辆童,big-endian,little-endian都行,不再用InputStreamReader了
- Easy to test. The Buffer class implements both BufferedSource and BufferedSink so your test code is simple and clear.
- 互操作:Source和InputStream可以互換惠赫,Sink和OutputStream可以互換把鉴,無縫兼容
- Source:接口显拜,like InputStream
-
實用類:
- DeflaterSink,InflaterSource
- ForwardingSink儿咱,F(xiàn)orwardingSource
- GzipSink庭砍,GzipSource
- HashingSink,HashingSource
-
ByteString和Buffer
- ByteString:處理字符串
- 一個不可變的byte序列混埠,immutable sequence of bytes
- String是基本的怠缸,ByteString是String的long lost brother
- 提供便利方法處理byte
- 能decode和encode,處理hex, base64, and UTF-8
- Buffer:處理byte流
- 一個可變的byte序列钳宪,mutable sequence of bytes揭北,像個ArrayList
- 讀寫時行為像Queue,write to end吏颖,read from front
- 不需要考慮大小搔体,屏蔽了ByteBuffer的capacity,limit半醉,position等
- 緩存:把一個utf-8 String decode成ByteString嫉柴,會緩存,下次再decode奉呛,就快了
- Buffer是一個Segment的LinkedList计螺,所以拷貝不是真的拷貝夯尽,只是移動,所以更快
- 多線程工作時就有優(yōu)勢了登馒,連接network的線程可以迅速的把數(shù)據(jù)發(fā)給work線程(without any copying or ceremony)
- ByteString:處理字符串
-
工具
- AsyncTimeout
- Base64
- Options
- Timeout
- Util
- Okio
-
Segment相關(guān)
- Segment
- SegmentPool
- SegmentedByteString
5.2 使用
構(gòu)造BufferedSink和BufferedSource
//創(chuàng)建Source
Source source = Okio.source(final InputStream in, final Timeout timeout);
source(InputStream in); //new Timeout()
source(File file);
source(Path path, OpenOption... options); //java7
source(Socket socket);
//創(chuàng)建Sink
Sink sink = Okio.sink(OutputStream out);
sink(final OutputStream out, final Timeout timeout);
sink(File file)
appendingSink(File file)
sink(Path path, OpenOption... options)
sink(Socket socket)
//創(chuàng)建BufferedSource:
BufferedSource pngSource = Okio.buffer(Source source); //返回RealBufferedSource對象
BufferedSink pngSink = Okio.buffer(Sink sink); //返回RealBufferedSink對象
//從BufferedSource讀取
看例子吧
//往BufferedSink寫入
看例子吧
//ByteString
看例子吧
//Buffer
看例子吧
5.3 例子:來自官網(wǎng)
package com.cowthan.nio.okio;
import java.io.IOException;
import java.io.InputStream;
import okio.Buffer;
import okio.BufferedSource;
import okio.ByteString;
import okio.Okio;
public class Test1_png {
public static void main(String[] args) throws IOException {
InputStream in = Test1_png.class.getResourceAsStream("/com/demo/1.png");
decodePng(in);
}
private static final ByteString PNG_HEADER = ByteString
.decodeHex("89504e470d0a1a0a");
public static void decodePng(InputStream in) throws IOException {
BufferedSource pngSource = Okio.buffer(Okio.source(in));
ByteString header = pngSource.readByteString(PNG_HEADER.size());
if (!header.equals(PNG_HEADER)) {
throw new IOException("Not a PNG.");
}
while (true) {
Buffer chunk = new Buffer();
// Each chunk is a length, type, data, and CRC offset.
int length = pngSource.readInt();
String type = pngSource.readUtf8(4);
pngSource.readFully(chunk, length);
int crc = pngSource.readInt();
decodeChunk(type, chunk);
if (type.equals("IEND"))
break;
}
pngSource.close();
}
private static void decodeChunk(String type, Buffer chunk) {
if (type.equals("IHDR")) {
int width = chunk.readInt();
int height = chunk.readInt();
System.out.printf("%08x: %s %d x %d%n", chunk.size(), type, width,
height);
} else {
System.out.printf("%08x: %s%n", chunk.size(), type);
}
}
}