java.nio 包里屁倔,是java用于處理IO的新的API翅娶,它使用channel弄屡、select等模型,重新對(duì)IO操作進(jìn)行了新的實(shí)現(xiàn)搬设。
DirectByteBuffer就是nio包下面的一個(gè)類穴店。這個(gè)類用于保存byte數(shù)組,其特別之處在于:他將數(shù)據(jù)保存在堆外內(nèi)存拿穴。不像傳統(tǒng)的對(duì)象泣洞,對(duì)象都在堆中。這樣的好處就是對(duì)于 IO操作默色,減少了內(nèi)存copy次數(shù)球凰,從而增加效率。這里以文件IO進(jìn)行講解
在這里我們先把結(jié)論說一下:
a. 傳統(tǒng)的IO操作(就是使用java.io包的api)訪問磁盤文件腿宰,數(shù)據(jù)需要copy的次數(shù):
1. 磁盤文件的數(shù)據(jù) copy 內(nèi)核page cache
2. 內(nèi)核的數(shù)據(jù) copy 應(yīng)用程序空間(即:jvm 堆外內(nèi)存)
3. jvm堆外內(nèi)存 copy jvm堆內(nèi) 內(nèi)存
為什么2弟蚀、和3 不合并,將內(nèi)核數(shù)據(jù) copy jvm堆內(nèi)內(nèi)存酗失。 因?yàn)閖vm進(jìn)行系統(tǒng)調(diào)用進(jìn)行讀文件時(shí)候,此時(shí)發(fā)生gc昧绣,那么堆內(nèi)存的對(duì)應(yīng)地址就會(huì)移動(dòng)规肴,所以直接copy到堆內(nèi)是有問題的。
b. 使用DirectByteBuffer訪問磁盤文件,數(shù)據(jù)需要copy的次數(shù):
1. 磁盤文件的數(shù)據(jù) copy 內(nèi)核page cache
2. 內(nèi)核的數(shù)據(jù) copy 應(yīng)用程序空間(即:DirectByteBuffer)
所以DirectByteBuffer減少了內(nèi)存copy次數(shù)拖刃。
1.傳統(tǒng)文件IO解析
文件讀取示例:
FileInputStream input = new FileInputStream("/data");
byte[] b = new byte[SIZE];
input.read(b);
byte數(shù)組示堆內(nèi)存對(duì)象删壮,此處將數(shù)據(jù)copy 到j(luò)vm堆內(nèi)存。我們看一下read函數(shù)內(nèi)部實(shí)現(xiàn)
public int read(byte b[]) throws IOException {
return readBytes(b, 0, b.length);
}
private native int readBytes(byte b[], int off, int len) throws IOException;
我們看到 read函數(shù)最終調(diào)用 native函數(shù) readBytes兑牡。
jint readBytes(JNIEnv *env, jobject this, jbyteArray bytes, jint off, jint len, jfieldID fid) {
jint nread;
char stackBuf[ BUF_SIZE];
char *buf = NULL;
FD fd;
if (IS_NULL(bytes)) {
JNU_ThrowNullPointerException(env, NULL);
return -1;
}
if (outOfBounds(env, off, len, bytes)) {
JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException", NULL);
return -1;
}
if (len == 0) {
return 0;
} else if (len > BUF_SIZE) {
buf = malloc(len);
if (buf == NULL) {
JNU_ThrowOutOfMemoryError(env, NULL);
return 0;
}
} else {
buf = stackBuf;
}
fd = GET_FD(this, fid);
if (fd == -1) {
JNU_ThrowIOException(env, "Stream Closed");
nread = -1;
} else {
nread = IO_Read(fd, buf, len);
if (nread > 0) {
( * env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte *)buf);
} else if (nread == -1) {
JNU_ThrowIOExceptionWithLastError(env, "Read error");
} else { /* EOF */
nread = -1;
}
}
if (buf != stackBuf) {
free(buf);
}
return nread;
}
我們看到最終通過IO_Read將緩沖數(shù)據(jù)讀到buf中去央碟,這個(gè)IO_Read其實(shí)是一個(gè)宏定義:
define IO_Read handleRead
handleRead函數(shù)實(shí)現(xiàn)如下,這里你可以看到這里進(jìn)行了read系統(tǒng)調(diào)用:
ssize_t handleRead(FD fd, void *buf, jint len) {
ssize_t result;
RESTARTABLE(read(fd, buf, len), result);
return result;
}
buf返回之后均函,由SetByteArrayRegion這個(gè)JNI函數(shù)拷貝到了bytes亿虽,它的具體實(shí)現(xiàn)如下(下面定義了一個(gè)通用的宏函數(shù)來表示各種數(shù)據(jù)類型數(shù)組區(qū)域的設(shè)置,可以將Result宏替換成Byte即可理解):
JNI_ENTRY(void, jni_Set##Result##ArrayRegion(JNIEnv *env, ElementType##Array array, jsize start, jsize len, const ElementType *buf))
JNIWrapper("Set" XSTR(Result) "ArrayRegion");
DTRACE_PROBE5(hotspot_jni, Set##Result##ArrayRegion__entry, env, array, start, len, buf);
DT_VOID_RETURN_MARK(Set##Result##ArrayRegion);
typeArrayOop dst = typeArrayOop(JNIHandles::resolve_non_null(array));
if (start < 0 || len < 0 || ((unsigned int)start + (unsigned int)len > (unsigned int)dst->length())) {
THROW(vmSymbols::java_lang_ArrayIndexOutOfBoundsException());
} else {
if (len > 0) {
int sc = TypeArrayKlass::cast(dst->klass())->log2_element_size();
memcpy((u_char*) dst->Tag##_at_addr(start),
(u_char*) buf,
len << sc);
}
}
JNI_END
(以上內(nèi)容部門來源:https://www.zhihu.com/question/65415926)
由此可見苞也,native方法洛勉,readBytes而采用了C Heap - JVM Heap進(jìn)行內(nèi)存拷貝的方式進(jìn)行數(shù)據(jù)傳遞。
而readBytes 通過調(diào)用 handleRead 進(jìn)行讀寫如迟。handleRead就是讀取內(nèi)核緩存區(qū)數(shù)據(jù)收毫。內(nèi)核數(shù)據(jù)來源文件。
2. DirectByteBuffer
DirectByteBuffer 是構(gòu)建在堆外的內(nèi)存的對(duì)象殷勘。
DirectByteBuffer是包級(jí)別可訪問的此再,通過 ByteBuffer.allocateDirect(int capacity) 進(jìn)行構(gòu)造。
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
我們看一下DirectByteBuffer 構(gòu)造函數(shù)實(shí)現(xiàn)
DirectByteBuffer(int cap) {// package-private
super(-1,0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps :0));
Bits.reserveMemory(size, cap);
long base =0;
try {
base =unsafe.allocateMemory(size);
}catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte)0);
if (pa && (base % ps !=0)) {
// Round up to page boundary
address = base + ps - (base & (ps -1));
}else {
address = base;
}
cleaner = Cleaner.create(this,new Deallocator(base, size, cap));
att =null;
}
這里我們主要關(guān)注這幾個(gè)地方:
1.unsafe.allocateMemory(size);
利用 unsafe 類在堆外內(nèi)存(C_HEAP)中分配了一塊空間玲销,這是一個(gè) native 函數(shù)输拇,轉(zhuǎn)到進(jìn)行堆外內(nèi)存分配的 C/C++ 代碼
inline char* AllocateHeap( size_t size, MEMFLAGS flags, address pc = 0, AllocFailType alloc_failmode = AllocFailStrategy::EXIT_OOM){
// ... 省略
char*p=(char*)os::malloc(size, flags, pc);
// 分配在 C_HEAP 上并返回指向內(nèi)存區(qū)域的指針
// ... 省略
return p;
}
2.cleaner = Cleaner.create(this,new Deallocator(base, size, cap));
cleaner對(duì)象是對(duì)DirectByteBuffer占用對(duì)堆外內(nèi)存進(jìn)行清理。DirectByteBuffer.cleaner().clean() 進(jìn)行手動(dòng)清理痒玩。我們看一下clean() 函數(shù)
public void clean() {
//....省略
this.thunk.run();
//....省略
}
其中 thunk就是我們 Cleaner.create(this,new Deallocator(base, size, cap)); 中的Deallocator淳附。看一下Deallocator蠢古。
private static class Deallocator implements Runnable {
//奴曙。。草讶。省略
public void run() {
if (address ==0) {
// Paranoia
return;
}
unsafe.freeMemory(address);
address =0;
Bits.unreserveMemory(size,capacity);
}
}
可以看到其是一個(gè)線程進(jìn)行 堆外內(nèi)存的釋放動(dòng)作洽糟。
cleaner是PhantomReference的子類。
PhantomReference它其實(shí)主要是用來跟蹤對(duì)象何時(shí)被回收的堕战,它不能影響gc決策坤溃,但是gc過程中如果發(fā)現(xiàn)某個(gè)對(duì)象除了只有PhantomReference引用它之外,并沒有其他的地方引用它了嘱丢,那將會(huì)把這個(gè)引用放到j(luò)ava.lang.ref.Reference.pending隊(duì)列里薪介,在gc完畢的時(shí)候通知ReferenceHandler這個(gè)守護(hù)線程去執(zhí)行一些后置處理。這個(gè)處理方法中越驻,就會(huì)判斷是否是cleaner對(duì)象汁政,如果是道偷,就執(zhí)行clean()函數(shù)。
因此DirectByteBuffer并不需要我們手動(dòng)清理內(nèi)存记劈。當(dāng)jvm進(jìn)行g(shù)c(oldgc)的時(shí)候勺鸦,就會(huì)清理沒有引用的 dirctByteBuffer。
當(dāng)我們一直申請(qǐng)DirectByteBuffer目木。其實(shí)占用的是堆外內(nèi)存换途,堆內(nèi)內(nèi)存只是占用一個(gè)引用。如果一直觸發(fā)不了gc刽射,那么堆外內(nèi)存就不會(huì)回收军拟,導(dǎo)致jvm進(jìn)程占用內(nèi)存很大柄冲。我們可以通過-XX:MaxDirectMemorySize限制DirecByteBuffer占用堆外內(nèi)存的大小
3.Bits.reserveMemory(size, cap);
static void reserveMemory(long size,int cap) {
synchronized (Bits.class) {
if (!memoryLimitSet && VM.isBooted()) {
maxMemory = VM.maxDirectMemory();
memoryLimitSet =true;
}
// -XX:MaxDirectMemorySize limits the total capacity rather than the
// actual memory usage, which will differ when buffers are page
// aligned.
if (cap <=maxMemory -totalCapacity) {
reservedMemory += size;
totalCapacity += cap;
count++;
return;
}
}
System.gc();
try {
Thread.sleep(100);
}catch (InterruptedException x) {
// Restore interrupt status
Thread.currentThread().interrupt();
}
synchronized (Bits.class) {
if (totalCapacity + cap >maxMemory)
throw new OutOfMemoryError("Direct buffer memory");
reservedMemory += size;
totalCapacity += cap;
count++;
}
}
該函數(shù)用于統(tǒng)計(jì)DirectByteBuffer占用的大小。VM.maxDirectMemory()是jvm允許申請(qǐng)的最大DirectBuffer的大邢趾帷(XX:MaxDirectMemorySize 通過這個(gè)參數(shù)設(shè)置)
如果發(fā)現(xiàn)當(dāng)前申請(qǐng)的空間,大于限制的空間戒祠,就會(huì)觸發(fā)一次gc骇两,上面說過gc會(huì)回收哪些之前不使用的directBuffer。然后再次申請(qǐng)姜盈。
VM.maxDirectMemory() 大小是如何設(shè)置的內(nèi)低千,在VM類有這樣一段代碼
public static void saveAndRemoveProperties(Properties var0) {
//....
String var1 = (String)var0.remove("sun.nio.MaxDirectMemorySize");
if (var1 !=null) {
if (var1.equals("-1")) {
directMemory = Runtime.getRuntime().maxMemory();
}else {
long var2 = Long.parseLong(var1);
if (var2 > -1L) {
directMemory = var2;
}
}
//...
}
"sun.nio.MaxDirectMemorySize" 這個(gè)屬性就是通過 -XX:MaxDirectMemorySize 這個(gè)參數(shù)設(shè)置的。如果我們不指定這個(gè)jvm參數(shù)馏颂,筆者在jdk8中測試了一下示血,默認(rèn)是-1,這樣就導(dǎo)致directBufffer內(nèi)存限制為進(jìn)程最大內(nèi)存救拉。當(dāng)然這也是一個(gè)潛在風(fēng)險(xiǎn)难审。
風(fēng)險(xiǎn)案例:
筆者曾在線上運(yùn)行一個(gè)應(yīng)用。該應(yīng)用就是從消息隊(duì)列中消費(fèi)數(shù)據(jù)亿絮,然后將數(shù)據(jù)處理后存到Hbase中告喊。但是應(yīng)用運(yùn)行每次運(yùn)行2周左右,機(jī)器就會(huì)出現(xiàn)swap占用過大派昧。經(jīng)過分析黔姜,是jvm進(jìn)程占用內(nèi)存太大,但是分析jvm相關(guān)參數(shù)(堆蒂萎、線程大懈殉场),并沒有設(shè)置的很大五慈。最后發(fā)現(xiàn)原來是directBuffer占用達(dá)到了10G纳寂。后面通過-XX:MaxDirectMemorySize=2048m 限制directbuffer使用量实苞,解決了問題。每次directBuffer占用達(dá)到2G烈疚,就會(huì)觸發(fā)一次fullgc,將之前的無用directbuffer回收掉聪轿。hbase一個(gè)坑爷肝,有時(shí)間筆者會(huì)整理這個(gè)案例。
3.DirectByteBuffer文件IO
文件讀取示例:
FileChannel filechannel=new RandomAccessFile("/data/appdatas/cat/mmm","rw").getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(SIZE);
filechannel.read(byteBuffer)
我們看一下read函數(shù)
public int read(ByteBuffer var1)throws IOException {
//陆错。灯抛。。音瓷。
var3 = IOUtil.read(this.fd, var1, -1L,this.nd);
//对嚼。。绳慎。纵竖。
}
主要邏輯調(diào)用IOUtil.read。我們看一下這個(gè)函數(shù)
static int read(FileDescriptor var0, ByteBuffer var1,long var2, NativeDispatcher var4)throws IOException {
if (var1.isReadOnly()) {
throw new IllegalArgumentException("Read-only buffer");
}else if (var1instanceof DirectBuffer) {
return readIntoNativeBuffer(var0, var1, var2, var4);
}else {
ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
int var7;
try {
int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
var5.flip();
if (var6 >0) {
var1.put(var5);
}
var7 = var6;
}finally {
Util.offerFirstTemporaryDirectBuffer(var5);
}
return var7;
}
}
主要方法就是通過 readIntoNativeBuffer 這個(gè)函數(shù)將數(shù)據(jù)讀入 directBuffer中杏愤,其中readIntoNativeBuffer也是調(diào)用一個(gè)native方法靡砌。
通過上面的代碼珊楼,我們會(huì)看到厕宗,如果fielchannel.read(ByteBuffer) 也可以傳入一個(gè)HeapByteBuffer,這個(gè)類是堆中曲聂。如果是這個(gè)類蛇受,那么內(nèi)部讀取的時(shí)候兢仰,會(huì)把數(shù)據(jù)先讀到DirectByteBuffer中,然后在copy到HeapByteBuffer中把将。Util.getTemporaryDirectBuffer(var1.remaining());就是獲取一個(gè)DirectBuffer對(duì)像察蹲。因?yàn)镈irectBuffer創(chuàng)建的時(shí)候催训,開銷比較大漫拭,所以使用的時(shí)候一般會(huì)用一個(gè)池子來管理采驻。有興趣可以看一下Util這個(gè)類里面的實(shí)現(xiàn)匈勋。