1蠕趁、目前能夠在網(wǎng)上搜到的java相關(guān)的高性能文件io文章都比較基礎(chǔ)哄啄,想深入的話需要既了解java的文件操作api原理,又了解文件操作相關(guān)的系統(tǒng)調(diào)用笙蒙,這就造成了學(xué)習(xí)困難
2、實(shí)際上elasticsearch庆锦、kafka捅位、rocketmq里關(guān)于文件操作的java實(shí)現(xiàn)已經(jīng)很好,本文也參考了其中不少代碼實(shí)現(xiàn)
3搂抒、本文的每個(gè)結(jié)論均提供測(cè)試代碼驗(yàn)證艇搀,方便讀者在自己的機(jī)器上驗(yàn)證
4、本文貼出的jdk native源碼版本是openjdk的jdk8-b120
5求晶、本文需要讀者對(duì)java文件操作焰雕、虛擬內(nèi)存、物理內(nèi)存誉帅、pagecache有一定基礎(chǔ)了解
6淀散、本文的測(cè)試代碼需要在TEST_PATH目錄下提前準(zhǔn)備test1-test30文件(根據(jù)自己測(cè)試環(huán)境調(diào)整),可以通過fallocate -l 1G test1
快速創(chuàng)建30個(gè)1g大小的測(cè)試文件
7蚜锨、測(cè)試代碼依賴如下档插,引入netty只是為了使用其中的PlatformDependent工具類,引入JNA是為了執(zhí)行系統(tǒng)調(diào)用
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.37</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.37</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.101.Final</version>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>5.13.0</version>
</dependency>
基礎(chǔ)部分
FileChannel亚再、MappedByteBuffer的初始化和釋放
- FileChannel的map函數(shù)是對(duì)系統(tǒng)調(diào)用mmap的閹割式封裝郭膛,僅提供三種mode,READ_ONLY/READ_WRITE/PRIVATE對(duì)應(yīng)mmap的prot氛悬、flags的部分組合则剃,下面會(huì)貼出jdk源碼,mmap系統(tǒng)調(diào)用原型:
void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);
- 直接使用netty的工具類釋放MappedByteBuffer如捅,原理不展開了棍现,讀者可以額外查閱資料學(xué)習(xí),也可以看我之前的文章http://www.reibang.com/p/4f026fe063aa
- 不論是何種方式獲取的FileChannel镜遣,僅需要關(guān)閉FileChannel本身己肮。此處無需關(guān)閉RandomAccessFile
- 需要注意關(guān)閉FileChannel是不會(huì)釋放MappedByteBuffer的,也就是說僅關(guān)閉FileChannel后MappedByteBuffer仍然可以繼續(xù)使用
FileChannel channel = new RandomAccessFile(TEST_PATH + "test1", "rw").getChannel();
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
PlatformDependent.freeDirectBuffer(mappedByteBuffer);
fileChannel.close();
- jdk map native源碼部分核心如下,可以看到三種mode只對(duì)應(yīng)mmap中prot和flags的三種組合谎僻,這也是我為什么說map方法是對(duì)mmap系統(tǒng)調(diào)用的閹割封裝娄柳,mmap還有很多實(shí)用的flag我們?cè)趈ava中沒法直接使用
完整源碼:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileChannelImpl.c
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this, jint prot, jlong off, jlong len)
{
if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
protections = PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_PRIVATE;
}
mapAddress = mmap64(
0, /* Let OS decide location */
len, /* Number of bytes to map */
protections, /* File permissions */
flags, /* Changes are shared */
fd, /* File descriptor of mapped file */
off); /* Offset into file */
}
- FileChannel close最終調(diào)用FileChannelImpl的implCloseChannel方法,源碼如下
可以看到會(huì)對(duì)parent執(zhí)行close艘绍,所以對(duì)FileChannel執(zhí)行close就會(huì)對(duì)RandomAccessFile執(zhí)行close赤拒。其實(shí)對(duì)RandomAccessFile執(zhí)行close也會(huì)對(duì)FileChannel執(zhí)行close,所以這倆關(guān)閉一個(gè)就行诱鞠。一般我們只把RandomAccessFile當(dāng)做獲取FileChannel的工具人挎挖,不會(huì)保留它的引用,所以最終只會(huì)執(zhí)行FileChannel的close
protected void implCloseChannel() throws IOException {
if (this.fileLockTable != null) {
Iterator var1 = this.fileLockTable.removeAll().iterator();
while(var1.hasNext()) {
FileLock var2 = (FileLock)var1.next();
synchronized(var2) {
if (var2.isValid()) {
this.nd.release(this.fd, var2.position(), var2.size());
((FileLockImpl)var2).invalidate();
}
}
}
}
this.threads.signalAndWait();
if (this.parent != null) {
((Closeable)this.parent).close();
} else {
this.nd.close(this.fd);
}
}
FileChannel write和DirectBuffer的關(guān)系
- 源Buffer是DirectBuffer則直接寫入航夺,是HeapBuffer則在cache中拿一塊DirectBuffer肋乍,把數(shù)據(jù)拷貝進(jìn)去再寫入
- 這個(gè)cache由jdk維護(hù),可以看到里面的DirectBuffer沒有主動(dòng)釋放邏輯敷存,只能隨著gc釋放,因此有oom隱患
- 所以最佳實(shí)踐是用戶自己構(gòu)建DirectBuffer寫入堪伍,并自己控制該DirectBuffer的釋放锚烦,避免數(shù)據(jù)拷貝,也避免堆外內(nèi)存的OOM
- write方法最后使用sun.nio.ch.IOUtil的write方法帝雇,核心源碼如下
static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
if (var1 instanceof DirectBuffer) {
return writeFromNativeBuffer(var0, var1, var2, var4);
} else {
int var5 = var1.position();
int var6 = var1.limit();
int var7 = var5 <= var6 ? var6 - var5 : 0;
ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
int var10;
try {
var8.put(var1);
var8.flip();
var1.position(var5);
int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
if (var9 > 0) {
var1.position(var5 + var9);
}
var10 = var9;
} finally {
Util.offerFirstTemporaryDirectBuffer(var8);
}
return var10;
}
}
FileChannel read和DirectBuffer的關(guān)系
- 和write同理涮俄,直接使用DirectBuffer最好
- 最終調(diào)用sun.nio.ch.IOUtil的read方法,核心源碼如下
static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
if (var1.isReadOnly()) {
throw new IllegalArgumentException("Read-only buffer");
} else if (var1 instanceof DirectBuffer) {
return readIntoNativeBuffer(var0, var1, var2, var4);
} else {
ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
int var7;
try {
int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
var5.flip();
if (var6 > 0) {
var1.put(var5);
}
var7 = var6;
} finally {
Util.offerFirstTemporaryDirectBuffer(var5);
}
return var7;
}
}
FileChannel force參數(shù)true/false的區(qū)別
- 對(duì)文件的寫入其實(shí)都不會(huì)直接寫入磁盤(directIO除外)尸闸,只會(huì)寫到buffer中彻亲,由操作系統(tǒng)統(tǒng)一調(diào)度寫入磁盤,所以如果需要實(shí)時(shí)存儲(chǔ)就需要主動(dòng)調(diào)用force方法直接寫入磁盤
- jdk文檔表示對(duì)FileChannel執(zhí)行force不會(huì)保證對(duì)該FileChannel通過map方法獲得的MappedByteBuffer進(jìn)行刷盤吮廉,如果對(duì)MappedByteBuffer有修改需要刷盤苞尝,需要調(diào)用MappedByteBuffer自己的force方法
- force方法有參數(shù)true/false,jdk文檔說的不夠具體宦芦,只說true的時(shí)候會(huì)額外寫入metadata宙址。實(shí)際上true/false對(duì)應(yīng)的系統(tǒng)調(diào)用分別是fdatasync/fsync(兩個(gè)系統(tǒng)調(diào)用的詳細(xì)區(qū)別本文不展開),核心源碼如下
- fsync會(huì)比fdatasync多一次尋址调卑,將文件大小抡砂、修改時(shí)間等metadata寫入磁盤,性能會(huì)比fdatasync差一點(diǎn)恬涧,在文件大小固定的情況下可以僅調(diào)用fdatasync注益,文件大小不固定的情況下調(diào)用fdatasync沒來的及更新文件大小可能會(huì)造成丟數(shù)據(jù)。其實(shí)由下面的壓測(cè)結(jié)果可知這兩個(gè)系統(tǒng)調(diào)用性能差距也不是很大(這里可能是固態(tài)硬盤和機(jī)械硬盤的區(qū)別溯捆?機(jī)械硬盤可能差距更大)
完整源碼請(qǐng)看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileDispatcherImpl.c
Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this, jobject fdo, jboolean md)
{
if (md == JNI_FALSE) {
result = fdatasync(fd);
} else {
result = fsync(fd);
}
}
FileChannel的transferTo方法介紹
對(duì)該方法的解析可以看我之前的文章:http://www.reibang.com/p/11ed05ca62ff
- transferTo可以利用sendFile系統(tǒng)調(diào)用做到真正的零拷貝傳輸數(shù)據(jù)丑搔,但是僅限于文件到文件,文件到socket兩條路
- 在文件下載場景可以使用該方法,將文件直接transferTo到socket中低匙,我們自己的程序中是無法感知到文件內(nèi)容的旷痕。最后通過文件hash值判斷文件是否正確完整下載
- 在非文件下載場景基本無法使用transferTo,因?yàn)槲覀兊某绦蜻壿嬂锎蠖嘈枰獜拇疟P文件中讀取到數(shù)據(jù)顽冶,然后做自己的業(yè)務(wù)處理欺抗,再將數(shù)據(jù)寫入socket
- 如果硬用transferTo做文件讀取(文件到socket),考慮到下面的性能測(cè)試强重,我認(rèn)為整體也不會(huì)比用MappedByteBuffer性能好
MappedByteBuffer load方法
- 先執(zhí)行l(wèi)oad0 native方法绞呈,然后根據(jù)頁大小,對(duì)每個(gè)頁讀取了一下间景,最后通過一個(gè)累加的x避免編譯器認(rèn)為這段代碼是dead code優(yōu)化掉
- 可以看到load方法的目標(biāo)就是主動(dòng)觸發(fā)缺頁佃声,從而將文件內(nèi)容真正填充進(jìn)物理內(nèi)存中
- MappedByteBuffer一開始僅僅是虛擬內(nèi)存,不會(huì)分配真正的物理內(nèi)存倘要,用到的時(shí)候才會(huì)分配
public final MappedByteBuffer load() {
load0(mappingAddress(offset), length);
// Read a byte from each page to bring it into memory. A checksum
// is computed as we go along to prevent the compiler from otherwise
// considering the loop as dead code.
Unsafe unsafe = Unsafe.getUnsafe();
int ps = Bits.pageSize();
int count = Bits.pageCount(length);
long a = mappingAddress(offset);
byte x = 0;
for (int i=0; i<count; i++) {
x ^= unsafe.getByte(a);
a += ps;
}
if (unused != 0)
unused = x;
return this;
}
- load0核心源碼如下圾亏,主要就是執(zhí)行系統(tǒng)調(diào)用madvise并且傳入?yún)?shù)MADV_WILLNEED,本文對(duì)madvise不做展開封拧,讀者可以自行查閱
可以看出load0的作用是告訴操作系統(tǒng)這段內(nèi)存接下來willneed志鹃,操作系統(tǒng)可以對(duì)這段內(nèi)存做些優(yōu)化處理,比如預(yù)讀和提前加載之類的泽西,可以加快后續(xù)對(duì)每個(gè)頁都觸發(fā)填充的效率
完整源碼請(qǐng)看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c
Java_java_nio_MappedByteBuffer_load0(JNIEnv *env, jobject obj, jlong address, jlong len)
{
int result = madvise((caddr_t)a, (size_t)len, MADV_WILLNEED);
}
MappedByteBuffer isLoad方法
- 主要就是直接調(diào)用isLoaded0 native方法
public final boolean isLoaded() {
return isLoaded0(mappingAddress(offset), length, Bits.pageCount(length));
}
- isLoaded0核心源碼如下曹铃,通過系統(tǒng)調(diào)用mincore檢查每個(gè)頁是否都在物理內(nèi)存中,mincore不做展開捧杉,讀者可以自行查閱
完整源碼請(qǐng)看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c
Java_java_nio_MappedByteBuffer_isLoaded0(JNIEnv *env, jobject obj, jlong address, jlong len, jint numPages)
{
jboolean loaded = JNI_TRUE;
unsigned char *vec = (unsigned char *)malloc(numPages * sizeof(char));
mincore(address, (size_t)len, vec);
for (i=0; i<numPages; i++) {
if (vec[i] == 0) {
loaded = JNI_FALSE;
break;
}
}
return loaded;
}
調(diào)用了MappedByteBuffer的load方法后陕见,isLoad不一定一直為true
當(dāng)pagecache不夠用了的時(shí)候,操作系統(tǒng)會(huì)將cache按照規(guī)則釋放或者放入swap區(qū)味抖,測(cè)試代碼如下评甜,如果測(cè)試電腦的可用內(nèi)存小于30g,基本上最后會(huì)顯示false
public static void main(String[] args) throws IOException, InterruptedException {
RandomAccessFile r = new RandomAccessFile(TEST_PATH + "/test1", "rw");
FileChannel fileChannel = r.getChannel();
MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
byteBuffer.load();
new Thread(() -> {
try {
for (int i = 2; i < 31; i++) {
RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
FileChannel channel = f.getChannel();
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
buffer.load();
}
TimeUnit.DAYS.sleep(1);
} catch (Exception e) {
}
}).start();
TimeUnit.SECONDS.sleep(20);
System.out.println(byteBuffer.isLoaded());
}
FileChannel和MappedByteBuffer讀寫測(cè)試
- 大部分研究java文件io的應(yīng)該都看過這篇文章仔涩,或者各種轉(zhuǎn)載抄襲的復(fù)制品蜕着,但是并不知道他是如何測(cè)試的,因此對(duì)這篇文章的結(jié)論不可全信
- 使用JMH進(jìn)行基準(zhǔn)測(cè)試红柱,對(duì)JMH不做展開承匣,讀者可以自行查閱,推薦資料
https://developer.aliyun.com/article/899469
https://dunwu.github.io/java-tutorial/pages/747d3e/#warmup
- 本測(cè)試力求單純測(cè)試讀寫锤悄,盡量避免其他的開銷
- 本測(cè)試僅讀寫文件的第一個(gè)頁韧骗,避免pagecache的不確定性影響,如何獲得當(dāng)前頁大小在下面的進(jìn)階部分有說明
- 本測(cè)試使用的文件均固定大小1G零聚,提前創(chuàng)建袍暴,參照本文最開始的說明(在文章最上面)
- 測(cè)試讀取時(shí)使用的目標(biāo)ByteBuffer和數(shù)組均通過Threadlocal保存復(fù)用些侍,避免每次創(chuàng)建ByteBuffer和數(shù)組的開銷干擾
- 測(cè)試寫入時(shí)使用的源ByteBuffer和數(shù)組提前創(chuàng)建并共用,避免每次寫入都創(chuàng)建的干擾開銷
- 測(cè)試包括如下幾項(xiàng)
從FileChannel讀到HeapByteBuffer
從FileChannel讀到DirectByteBuffer
從HeapByteBuffer寫入FileChannel
從DirectByteBuffer寫入FileChannel
從DirectByteBuffer寫入FileChannel并force(false)
從DirectByteBuffer寫入FileChannel并force(true)
從MappedByteBuffer讀到數(shù)組
從數(shù)組寫入MappedByteBuffer
從HeapByteBuffer寫入MappedByteBuffer
從DirectByteBuffer寫入MappedByteBuffer
從DirectByteBuffer寫入MappedByteBuffer并force
-
Linux version 3.10.0-327.ali2017.alios7.x86_64
16c32g的機(jī)器測(cè)試結(jié)果如下政模,可以說在文件大小固定的前提下MappedByteBuffer全面完勝FileChannel
FileChannel讀到DirectBuffer比讀到HeapBuffer性能好岗宣,如上文所述,符合預(yù)期
FileChannel寫入DirectBuffer比HeapBuffer淋样,如上文所述耗式,符合預(yù)期
FileChannel force(false)比force(true)好一些
MappedByteBuffer讀到數(shù)組中有壓倒性讀取優(yōu)勢(shì),比FileChannel高兩個(gè)數(shù)量級(jí)
MappedByteBuffer寫入數(shù)組數(shù)據(jù)性能好一些趁猴,寫入DirectBuffer和HeapBuffer差不多
MappedByteBuffer比FileChannel的寫入性能刊咳、force性能都好
mac上測(cè)試結(jié)果差不多,但是MappedByteBuffer從數(shù)組儡司、HeapBuffer娱挨、DirectBuffer寫入差距不像linux上明顯
Benchmark Mode Cnt Score Error Units
MyBenchmark.fileChannelRead2DirectBuffer thrpt 968.349 ops/ms
MyBenchmark.fileChannelRead2HeapBuffer thrpt 669.176 ops/ms
MyBenchmark.fileChannelWriteFromDirectBuffer thrpt 286.462 ops/ms
MyBenchmark.fileChannelWriteFromDirectBufferForce thrpt 3.992 ops/ms
MyBenchmark.fileChannelWriteFromDirectBufferForceMeta thrpt 3.169 ops/ms
MyBenchmark.fileChannelWriteFromHeapBuffer thrpt 258.994 ops/ms
MyBenchmark.mappedRead2Array thrpt 67872.596 ops/ms
MyBenchmark.mappedWriteFromArray thrpt 1675.428 ops/ms
MyBenchmark.mappedWriteFromDirectBuffer thrpt 1585.909 ops/ms
MyBenchmark.mappedWriteFromDirectBufferForce thrpt 6.651 ops/ms
MyBenchmark.mappedWriteFromHeapBuffer thrpt 1559.150 ops/ms
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
@Warmup(iterations = 3, time = 10)
@Measurement(iterations = 1, time = 10)
@Threads(16)
@State(Scope.Benchmark)
public class MyBenchmark {
private static final int SIZE = pageSize;
private FileChannel fileChannel;
private MappedByteBuffer mappedByteBuffer;
private final ThreadLocal<ByteBuffer> fileChannelRead2HeapBufferThreadLocal = new ThreadLocal<>();
private final ThreadLocal<ByteBuffer> fileChannelRead2DirectBufferThreadLocal = new ThreadLocal<>();
private final ThreadLocal<byte[]> mappedRead2ArrayThreadLocal = new ThreadLocal<>();
private final byte[] srcByteArray = new byte[SIZE];
private final ByteBuffer srcHeapBuffer = ByteBuffer.allocate(SIZE);
private final ByteBuffer srcDirectBuffer = ByteBuffer.allocateDirect(SIZE);
@Setup
public void setup() throws IOException {
for (int i = 0; i < SIZE; i++) {
srcByteArray[i] = 9;
}
srcHeapBuffer.put(srcByteArray, 0, SIZE);
srcHeapBuffer.flip();
srcDirectBuffer.put(srcByteArray, 0, SIZE);
srcDirectBuffer.flip();
RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
fileChannel = r.getChannel();
mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
}
@TearDown
public void tearDown() throws IOException {
PlatformDependent.freeDirectBuffer(mappedByteBuffer);
fileChannel.close();
}
@Benchmark
public void fileChannelRead2HeapBuffer(Blackhole blackhole) throws IOException {
ByteBuffer byteBuffer = fileChannelRead2HeapBufferThreadLocal.get();
if (byteBuffer == null) {
byteBuffer = ByteBuffer.allocate(SIZE);
fileChannelRead2HeapBufferThreadLocal.set(byteBuffer);
}
blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
}
@Benchmark
public void fileChannelRead2DirectBuffer(Blackhole blackhole) throws IOException {
ByteBuffer byteBuffer = fileChannelRead2DirectBufferThreadLocal.get();
if (byteBuffer == null) {
byteBuffer = ByteBuffer.allocateDirect(SIZE);
fileChannelRead2DirectBufferThreadLocal.set(byteBuffer);
}
blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
}
@Benchmark
public void fileChannelWriteFromHeapBuffer(Blackhole blackhole) throws IOException {
blackhole.consume(fileChannel.write(srcHeapBuffer.slice(), 0L));
}
@Benchmark
public void fileChannelWriteFromDirectBuffer(Blackhole blackhole) throws IOException {
blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
}
@Benchmark
public void fileChannelWriteFromDirectBufferForce(Blackhole blackhole) throws IOException {
blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
fileChannel.force(false);
}
@Benchmark
public void fileChannelWriteFromDirectBufferForceMeta(Blackhole blackhole) throws IOException {
blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
fileChannel.force(true);
}
@Benchmark
public void mappedRead2Array(Blackhole blackhole) {
byte[] array = mappedRead2ArrayThreadLocal.get();
if (array == null) {
array = new byte[SIZE];
mappedRead2ArrayThreadLocal.set(array);
}
blackhole.consume(mappedByteBuffer.slice().get(array, 0, SIZE));
}
@Benchmark
public void mappedWriteFromArray(Blackhole blackhole) {
blackhole.consume(mappedByteBuffer.slice().put(srcByteArray, 0, SIZE));
}
@Benchmark
public void mappedWriteFromHeapBuffer(Blackhole blackhole) {
blackhole.consume(mappedByteBuffer.slice().put(srcHeapBuffer.slice()));
}
@Benchmark
public void mappedWriteFromDirectBuffer(Blackhole blackhole) {
blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
}
@Benchmark
public void mappedWriteFromDirectBufferForce(Blackhole blackhole) {
blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
mappedByteBuffer.force();
}
}
進(jìn)階部分
java工程中通過JNA調(diào)用libc標(biāo)準(zhǔn)庫函數(shù)
對(duì)JNA不做展開介紹,讀者可以自行查閱學(xué)習(xí):https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md
import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.NativeLong;
import com.sun.jna.Platform;
import com.sun.jna.Pointer;
public interface LibC extends Library {
LibC INSTANCE = Native.load(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
// 是不是很熟悉捕犬,大家學(xué)的第一個(gè)函數(shù)應(yīng)該就是這個(gè)吧:)
void printf(String format, Object... args);
// 本文的核心跷坝,后面詳細(xì)介紹
int mlock(Pointer var1, NativeLong var2);
}
通過mlock將鎖定物理內(nèi)存,避免內(nèi)存被swap碉碉,保持物理內(nèi)存常駐
mlock不做詳細(xì)展開探孝,讀者可以自行查閱,推薦文章如下
http://www.daileinote.com/computer/linux_sys/32
https://www.cnblogs.com/linhaostudy/p/15972330.html
- 其實(shí)mmap方法就可以直接設(shè)置flag為MAP_LOCKED將內(nèi)存鎖住誉裆,但是jdk沒有提供該能力,所以我們需要才需要直接調(diào)用mlock鎖住內(nèi)存
- 用戶可以鎖住的內(nèi)存大小受操作系統(tǒng)限制缸濒,可以
ulimit -l
查看足丢,mac和linux一般應(yīng)該都是無限的unlimited - sysctl_max_map_count 規(guī)定了進(jìn)程虛擬內(nèi)存空間所能包含VmArea的最大個(gè)數(shù),可以通過 /proc/sys/vm/max_map_count 內(nèi)核參數(shù)來調(diào)整 sysctl_max_map_count庇配,一次mmap對(duì)應(yīng)產(chǎn)生一個(gè)VmArea
- 對(duì)mmap的內(nèi)存執(zhí)行mlock后就已經(jīng)觸發(fā)了缺頁將所有物理內(nèi)存填充好了斩跌,也就不需要madvise,也不需要再調(diào)用MappedByteBuffer的load方法了捞慌,測(cè)試代碼如下耀鸦,最后byteBuffer.isLoaded()會(huì)顯示true
public static void main(String[] args) throws IOException, InterruptedException {
RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
FileChannel fileChannel = r.getChannel();
MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
long address = PlatformDependent.directBufferAddress(byteBuffer);
Pointer p = new Pointer(address);
System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
System.out.println(byteBuffer.isLoaded());
}
- mlock之后無需調(diào)用munlock,會(huì)隨著MappedByteBuffer的釋放自動(dòng)munlock(也符合文檔說明啸澡,munmap自動(dòng)munlock)袖订,測(cè)試代碼如下
該測(cè)試程序可以永遠(yuǎn)執(zhí)行下去,mlock的返回值也一直會(huì)是0(系統(tǒng)調(diào)用返回值0代表正常)
讀者可以試試只調(diào)用mlock嗅虏,但是不釋放MappedByteBuffer的情況洛姑,小心電腦死機(jī) : )
public static void main(String[] args) throws IOException, InterruptedException {
try {
while (true) {
for (int i = 1; i < 31; i++) {
RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
FileChannel channel = f.getChannel();
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
long add = PlatformDependent.directBufferAddress(buffer);
Pointer p = new Pointer(add);
System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
channel.close();
PlatformDependent.freeDirectBuffer(buffer);
}
}
} catch (Exception e) {
}
- windows環(huán)境沒有mlock,需要調(diào)用kernel32.dll中的VirtualLock系統(tǒng)調(diào)用皮服,按照J(rèn)NA文檔需要實(shí)現(xiàn)JNA的StdCallLibrary接口
https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md
- windows環(huán)境下要更復(fù)雜一點(diǎn)楞艾,VirtualLock受大小和個(gè)數(shù)影響参咙。在elasticsearch中有使用到,可以參考源碼學(xué)習(xí)硫眯,本文不做展開(感覺在windows上搞這個(gè)優(yōu)化沒啥意義)
獲取程序運(yùn)行機(jī)器的操作系統(tǒng)頁大小
- 參考netty中PlatformDependent0中執(zhí)行Bits類中unaligned方法的過程
private int pageSize = AccessController.doPrivileged((PrivilegedAction<Integer>) () -> {
try {
Class<?> bitsClass = Class.forName("java.nio.Bits", false, PlatformDependent.getSystemClassLoader());
Method pageSizeMethod = bitsClass.getDeclaredMethod("pageSize");
pageSizeMethod.setAccessible(true);
return (Integer) pageSizeMethod.invoke(null);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
總結(jié)
- 在java領(lǐng)域?qū)崿F(xiàn)高性能流式存儲(chǔ)現(xiàn)在看就比較清晰了蕴侧,固定文件大小,通過MappedByteBuffer配合系統(tǒng)調(diào)用mlock两入,將最新的文件mlock到物理內(nèi)存中净宵,保證新數(shù)據(jù)的實(shí)時(shí)讀取
- 根據(jù)調(diào)用請(qǐng)求分析將部分熱點(diǎn)老文件也mlock住,并做動(dòng)態(tài)調(diào)整谆刨,避免大量冷讀造成讀取性能下降
- RocketMQ中文件預(yù)熱部分有mlock相關(guān)使用塘娶,但是為了兼容使用預(yù)熱和不使用預(yù)熱兩種情況,在預(yù)熱部分有些冗余調(diào)用痊夭,可以理解刁岸。預(yù)熱過程既然調(diào)用了mlock就無需對(duì)每個(gè)頁寫一個(gè)字節(jié)填充物理頁了,也無需調(diào)用madvise