做一個(gè)高性能的java流式存儲(chǔ)項(xiàng)目你需要知道的一些事兒

1蠕趁、目前能夠在網(wǎng)上搜到的java相關(guān)的高性能文件io文章都比較基礎(chǔ)哄啄,想深入的話需要既了解java的文件操作api原理,又了解文件操作相關(guān)的系統(tǒng)調(diào)用笙蒙,這就造成了學(xué)習(xí)困難
2、實(shí)際上elasticsearch庆锦、kafka捅位、rocketmq里關(guān)于文件操作的java實(shí)現(xiàn)已經(jīng)很好,本文也參考了其中不少代碼實(shí)現(xiàn)
3搂抒、本文的每個(gè)結(jié)論均提供測(cè)試代碼驗(yàn)證艇搀,方便讀者在自己的機(jī)器上驗(yàn)證
4、本文貼出的jdk native源碼版本是openjdk的jdk8-b120
5求晶、本文需要讀者對(duì)java文件操作焰雕、虛擬內(nèi)存、物理內(nèi)存誉帅、pagecache有一定基礎(chǔ)了解
6淀散、本文的測(cè)試代碼需要在TEST_PATH目錄下提前準(zhǔn)備test1-test30文件(根據(jù)自己測(cè)試環(huán)境調(diào)整),可以通過fallocate -l 1G test1快速創(chuàng)建30個(gè)1g大小的測(cè)試文件
7蚜锨、測(cè)試代碼依賴如下档插,引入netty只是為了使用其中的PlatformDependent工具類,引入JNA是為了執(zhí)行系統(tǒng)調(diào)用

        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-core</artifactId>
            <version>1.37</version>
        </dependency>
        <dependency>
            <groupId>org.openjdk.jmh</groupId>
            <artifactId>jmh-generator-annprocess</artifactId>
            <version>1.37</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.101.Final</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>5.13.0</version>
        </dependency>

基礎(chǔ)部分

FileChannel亚再、MappedByteBuffer的初始化和釋放

  • FileChannel的map函數(shù)是對(duì)系統(tǒng)調(diào)用mmap的閹割式封裝郭膛,僅提供三種mode,READ_ONLY/READ_WRITE/PRIVATE對(duì)應(yīng)mmap的prot氛悬、flags的部分組合则剃,下面會(huì)貼出jdk源碼,mmap系統(tǒng)調(diào)用原型:void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);
  • 直接使用netty的工具類釋放MappedByteBuffer如捅,原理不展開了棍现,讀者可以額外查閱資料學(xué)習(xí),也可以看我之前的文章http://www.reibang.com/p/4f026fe063aa
  • 不論是何種方式獲取的FileChannel镜遣,僅需要關(guān)閉FileChannel本身己肮。此處無需關(guān)閉RandomAccessFile
  • 需要注意關(guān)閉FileChannel是不會(huì)釋放MappedByteBuffer的,也就是說僅關(guān)閉FileChannel后MappedByteBuffer仍然可以繼續(xù)使用
    FileChannel channel = new RandomAccessFile(TEST_PATH + "test1", "rw").getChannel();
    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
    PlatformDependent.freeDirectBuffer(mappedByteBuffer);
    fileChannel.close();
  • jdk map native源碼部分核心如下,可以看到三種mode只對(duì)應(yīng)mmap中prot和flags的三種組合谎僻,這也是我為什么說map方法是對(duì)mmap系統(tǒng)調(diào)用的閹割封裝娄柳,mmap還有很多實(shí)用的flag我們?cè)趈ava中沒法直接使用

完整源碼:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileChannelImpl.c

    Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this, jint prot, jlong off, jlong len)
    {
        if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
            protections = PROT_READ;
            flags = MAP_SHARED;
        } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
            protections = PROT_WRITE | PROT_READ;
            flags = MAP_SHARED;
        } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
            protections =  PROT_WRITE | PROT_READ;
            flags = MAP_PRIVATE;
        }
        mapAddress = mmap64(
                0,                    /* Let OS decide location */
                len,                  /* Number of bytes to map */
                protections,          /* File permissions */
                flags,                /* Changes are shared */
                fd,                   /* File descriptor of mapped file */
                off);                 /* Offset into file */
    }
  • FileChannel close最終調(diào)用FileChannelImpl的implCloseChannel方法,源碼如下

可以看到會(huì)對(duì)parent執(zhí)行close艘绍,所以對(duì)FileChannel執(zhí)行close就會(huì)對(duì)RandomAccessFile執(zhí)行close赤拒。其實(shí)對(duì)RandomAccessFile執(zhí)行close也會(huì)對(duì)FileChannel執(zhí)行close,所以這倆關(guān)閉一個(gè)就行诱鞠。一般我們只把RandomAccessFile當(dāng)做獲取FileChannel的工具人挎挖,不會(huì)保留它的引用,所以最終只會(huì)執(zhí)行FileChannel的close

    protected void implCloseChannel() throws IOException {
        if (this.fileLockTable != null) {
            Iterator var1 = this.fileLockTable.removeAll().iterator();
            while(var1.hasNext()) {
                FileLock var2 = (FileLock)var1.next();
                synchronized(var2) {
                    if (var2.isValid()) {
                        this.nd.release(this.fd, var2.position(), var2.size());
                        ((FileLockImpl)var2).invalidate();
                    }
                }
            }
        }
        this.threads.signalAndWait();
        if (this.parent != null) {
            ((Closeable)this.parent).close();
        } else {
            this.nd.close(this.fd);
        }
    }

FileChannel write和DirectBuffer的關(guān)系

  • 源Buffer是DirectBuffer則直接寫入航夺,是HeapBuffer則在cache中拿一塊DirectBuffer肋乍,把數(shù)據(jù)拷貝進(jìn)去再寫入
  • 這個(gè)cache由jdk維護(hù),可以看到里面的DirectBuffer沒有主動(dòng)釋放邏輯敷存,只能隨著gc釋放,因此有oom隱患
  • 所以最佳實(shí)踐是用戶自己構(gòu)建DirectBuffer寫入堪伍,并自己控制該DirectBuffer的釋放锚烦,避免數(shù)據(jù)拷貝,也避免堆外內(nèi)存的OOM
  • write方法最后使用sun.nio.ch.IOUtil的write方法帝雇,核心源碼如下
    static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1 instanceof DirectBuffer) {
            return writeFromNativeBuffer(var0, var1, var2, var4);
        } else {
            int var5 = var1.position();
            int var6 = var1.limit();
            int var7 = var5 <= var6 ? var6 - var5 : 0;
            ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);
            int var10;
            try {
                var8.put(var1);
                var8.flip();
                var1.position(var5);
                int var9 = writeFromNativeBuffer(var0, var8, var2, var4);
                if (var9 > 0) {
                    var1.position(var5 + var9);
                }
                var10 = var9;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var8);
            }
            return var10;
        }
    }

FileChannel read和DirectBuffer的關(guān)系

  • 和write同理涮俄,直接使用DirectBuffer最好
  • 最終調(diào)用sun.nio.ch.IOUtil的read方法,核心源碼如下
    static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
        if (var1.isReadOnly()) {
            throw new IllegalArgumentException("Read-only buffer");
        } else if (var1 instanceof DirectBuffer) {
            return readIntoNativeBuffer(var0, var1, var2, var4);
        } else {
            ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());
            int var7;
            try {
                int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
                var5.flip();
                if (var6 > 0) {
                    var1.put(var5);
                }
                var7 = var6;
            } finally {
                Util.offerFirstTemporaryDirectBuffer(var5);
            }
            return var7;
        }
    }

FileChannel force參數(shù)true/false的區(qū)別

  • 對(duì)文件的寫入其實(shí)都不會(huì)直接寫入磁盤(directIO除外)尸闸,只會(huì)寫到buffer中彻亲,由操作系統(tǒng)統(tǒng)一調(diào)度寫入磁盤,所以如果需要實(shí)時(shí)存儲(chǔ)就需要主動(dòng)調(diào)用force方法直接寫入磁盤
  • jdk文檔表示對(duì)FileChannel執(zhí)行force不會(huì)保證對(duì)該FileChannel通過map方法獲得的MappedByteBuffer進(jìn)行刷盤吮廉,如果對(duì)MappedByteBuffer有修改需要刷盤苞尝,需要調(diào)用MappedByteBuffer自己的force方法
  • force方法有參數(shù)true/false,jdk文檔說的不夠具體宦芦,只說true的時(shí)候會(huì)額外寫入metadata宙址。實(shí)際上true/false對(duì)應(yīng)的系統(tǒng)調(diào)用分別是fdatasync/fsync(兩個(gè)系統(tǒng)調(diào)用的詳細(xì)區(qū)別本文不展開),核心源碼如下
  • fsync會(huì)比fdatasync多一次尋址调卑,將文件大小抡砂、修改時(shí)間等metadata寫入磁盤,性能會(huì)比fdatasync差一點(diǎn)恬涧,在文件大小固定的情況下可以僅調(diào)用fdatasync注益,文件大小不固定的情況下調(diào)用fdatasync沒來的及更新文件大小可能會(huì)造成丟數(shù)據(jù)。其實(shí)由下面的壓測(cè)結(jié)果可知這兩個(gè)系統(tǒng)調(diào)用性能差距也不是很大(這里可能是固態(tài)硬盤和機(jī)械硬盤的區(qū)別溯捆?機(jī)械硬盤可能差距更大)

完整源碼請(qǐng)看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/sun/nio/ch/FileDispatcherImpl.c

    Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this, jobject fdo, jboolean md)
    {
        if (md == JNI_FALSE) {
            result = fdatasync(fd);
        } else {
            result = fsync(fd);
        }
    }

FileChannel的transferTo方法介紹

對(duì)該方法的解析可以看我之前的文章:http://www.reibang.com/p/11ed05ca62ff

  • transferTo可以利用sendFile系統(tǒng)調(diào)用做到真正的零拷貝傳輸數(shù)據(jù)丑搔,但是僅限于文件到文件,文件到socket兩條路
  • 在文件下載場景可以使用該方法,將文件直接transferTo到socket中低匙,我們自己的程序中是無法感知到文件內(nèi)容的旷痕。最后通過文件hash值判斷文件是否正確完整下載
  • 在非文件下載場景基本無法使用transferTo,因?yàn)槲覀兊某绦蜻壿嬂锎蠖嘈枰獜拇疟P文件中讀取到數(shù)據(jù)顽冶,然后做自己的業(yè)務(wù)處理欺抗,再將數(shù)據(jù)寫入socket
  • 如果硬用transferTo做文件讀取(文件到socket),考慮到下面的性能測(cè)試强重,我認(rèn)為整體也不會(huì)比用MappedByteBuffer性能好

MappedByteBuffer load方法

  • 先執(zhí)行l(wèi)oad0 native方法绞呈,然后根據(jù)頁大小,對(duì)每個(gè)頁讀取了一下间景,最后通過一個(gè)累加的x避免編譯器認(rèn)為這段代碼是dead code優(yōu)化掉
  • 可以看到load方法的目標(biāo)就是主動(dòng)觸發(fā)缺頁佃声,從而將文件內(nèi)容真正填充進(jìn)物理內(nèi)存中
  • MappedByteBuffer一開始僅僅是虛擬內(nèi)存,不會(huì)分配真正的物理內(nèi)存倘要,用到的時(shí)候才會(huì)分配
    public final MappedByteBuffer load() {
        load0(mappingAddress(offset), length);
        // Read a byte from each page to bring it into memory. A checksum
        // is computed as we go along to prevent the compiler from otherwise
        // considering the loop as dead code.
        Unsafe unsafe = Unsafe.getUnsafe();
        int ps = Bits.pageSize();
        int count = Bits.pageCount(length);
        long a = mappingAddress(offset);
        byte x = 0;
        for (int i=0; i<count; i++) {
            x ^= unsafe.getByte(a);
            a += ps;
        }
        if (unused != 0)
            unused = x;
        return this;
    }
  • load0核心源碼如下圾亏,主要就是執(zhí)行系統(tǒng)調(diào)用madvise并且傳入?yún)?shù)MADV_WILLNEED,本文對(duì)madvise不做展開封拧,讀者可以自行查閱

可以看出load0的作用是告訴操作系統(tǒng)這段內(nèi)存接下來willneed志鹃,操作系統(tǒng)可以對(duì)這段內(nèi)存做些優(yōu)化處理,比如預(yù)讀和提前加載之類的泽西,可以加快后續(xù)對(duì)每個(gè)頁都觸發(fā)填充的效率
完整源碼請(qǐng)看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c

    Java_java_nio_MappedByteBuffer_load0(JNIEnv *env, jobject obj, jlong address, jlong len)
    {
        int result = madvise((caddr_t)a, (size_t)len, MADV_WILLNEED);
    }

MappedByteBuffer isLoad方法

  • 主要就是直接調(diào)用isLoaded0 native方法
    public final boolean isLoaded() {
        return isLoaded0(mappingAddress(offset), length, Bits.pageCount(length));
    }
  • isLoaded0核心源碼如下曹铃,通過系統(tǒng)調(diào)用mincore檢查每個(gè)頁是否都在物理內(nèi)存中,mincore不做展開捧杉,讀者可以自行查閱

完整源碼請(qǐng)看:https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/solaris/native/java/nio/MappedByteBuffer.c

    Java_java_nio_MappedByteBuffer_isLoaded0(JNIEnv *env, jobject obj, jlong address, jlong len, jint numPages)
    {
        jboolean loaded = JNI_TRUE;
        unsigned char *vec = (unsigned char *)malloc(numPages * sizeof(char));
        mincore(address, (size_t)len, vec);
        for (i=0; i<numPages; i++) {
            if (vec[i] == 0) {
                loaded = JNI_FALSE;
                break;
            }
        }
        return loaded;
    }

調(diào)用了MappedByteBuffer的load方法后陕见,isLoad不一定一直為true

當(dāng)pagecache不夠用了的時(shí)候,操作系統(tǒng)會(huì)將cache按照規(guī)則釋放或者放入swap區(qū)味抖,測(cè)試代碼如下评甜,如果測(cè)試電腦的可用內(nèi)存小于30g,基本上最后會(huì)顯示false

    public static void main(String[] args) throws IOException, InterruptedException {
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "/test1", "rw");
        FileChannel fileChannel = r.getChannel();
        MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
        byteBuffer.load();
        new Thread(() -> {
            try {
                for (int i = 2; i < 31; i++) {
                    RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
                    FileChannel channel = f.getChannel();
                    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
                    buffer.load();
                }
                TimeUnit.DAYS.sleep(1);
            } catch (Exception e) {
            }
        }).start();
        TimeUnit.SECONDS.sleep(20);
        System.out.println(byteBuffer.isLoaded());
    }

FileChannel和MappedByteBuffer讀寫測(cè)試

  • 大部分研究java文件io的應(yīng)該都看過這篇文章仔涩,或者各種轉(zhuǎn)載抄襲的復(fù)制品蜕着,但是并不知道他是如何測(cè)試的,因此對(duì)這篇文章的結(jié)論不可全信

http://thinkinjava.cn/2019/05/12/2019/05-12-java-nio/

  • 使用JMH進(jìn)行基準(zhǔn)測(cè)試红柱,對(duì)JMH不做展開承匣,讀者可以自行查閱,推薦資料

https://developer.aliyun.com/article/899469
https://dunwu.github.io/java-tutorial/pages/747d3e/#warmup

  • 本測(cè)試力求單純測(cè)試讀寫锤悄,盡量避免其他的開銷
  • 本測(cè)試僅讀寫文件的第一個(gè)頁韧骗,避免pagecache的不確定性影響,如何獲得當(dāng)前頁大小在下面的進(jìn)階部分有說明
  • 本測(cè)試使用的文件均固定大小1G零聚,提前創(chuàng)建袍暴,參照本文最開始的說明(在文章最上面)
  • 測(cè)試讀取時(shí)使用的目標(biāo)ByteBuffer和數(shù)組均通過Threadlocal保存復(fù)用些侍,避免每次創(chuàng)建ByteBuffer和數(shù)組的開銷干擾
  • 測(cè)試寫入時(shí)使用的源ByteBuffer和數(shù)組提前創(chuàng)建并共用,避免每次寫入都創(chuàng)建的干擾開銷
  • 測(cè)試包括如下幾項(xiàng)

從FileChannel讀到HeapByteBuffer
從FileChannel讀到DirectByteBuffer
從HeapByteBuffer寫入FileChannel
從DirectByteBuffer寫入FileChannel
從DirectByteBuffer寫入FileChannel并force(false)
從DirectByteBuffer寫入FileChannel并force(true)
從MappedByteBuffer讀到數(shù)組
從數(shù)組寫入MappedByteBuffer
從HeapByteBuffer寫入MappedByteBuffer
從DirectByteBuffer寫入MappedByteBuffer
從DirectByteBuffer寫入MappedByteBuffer并force

  • Linux version 3.10.0-327.ali2017.alios7.x86_64 16c32g的機(jī)器測(cè)試結(jié)果如下政模,可以說在文件大小固定的前提下MappedByteBuffer全面完勝FileChannel

FileChannel讀到DirectBuffer比讀到HeapBuffer性能好岗宣,如上文所述,符合預(yù)期
FileChannel寫入DirectBuffer比HeapBuffer淋样,如上文所述耗式,符合預(yù)期
FileChannel force(false)比force(true)好一些
MappedByteBuffer讀到數(shù)組中有壓倒性讀取優(yōu)勢(shì),比FileChannel高兩個(gè)數(shù)量級(jí)
MappedByteBuffer寫入數(shù)組數(shù)據(jù)性能好一些趁猴,寫入DirectBuffer和HeapBuffer差不多
MappedByteBuffer比FileChannel的寫入性能刊咳、force性能都好
mac上測(cè)試結(jié)果差不多,但是MappedByteBuffer從數(shù)組儡司、HeapBuffer娱挨、DirectBuffer寫入差距不像linux上明顯

Benchmark                                               Mode  Cnt      Score   Error   Units
MyBenchmark.fileChannelRead2DirectBuffer               thrpt         968.349          ops/ms
MyBenchmark.fileChannelRead2HeapBuffer                 thrpt         669.176          ops/ms
MyBenchmark.fileChannelWriteFromDirectBuffer           thrpt         286.462          ops/ms
MyBenchmark.fileChannelWriteFromDirectBufferForce      thrpt           3.992          ops/ms
MyBenchmark.fileChannelWriteFromDirectBufferForceMeta  thrpt           3.169          ops/ms
MyBenchmark.fileChannelWriteFromHeapBuffer             thrpt         258.994          ops/ms
MyBenchmark.mappedRead2Array                           thrpt       67872.596          ops/ms
MyBenchmark.mappedWriteFromArray                       thrpt        1675.428          ops/ms
MyBenchmark.mappedWriteFromDirectBuffer                thrpt        1585.909          ops/ms
MyBenchmark.mappedWriteFromDirectBufferForce           thrpt           6.651          ops/ms
MyBenchmark.mappedWriteFromHeapBuffer                  thrpt        1559.150          ops/ms
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(1)
@Warmup(iterations = 3, time = 10)
@Measurement(iterations = 1, time = 10)
@Threads(16)
@State(Scope.Benchmark)
public class MyBenchmark {
    private static final int SIZE = pageSize;
    private FileChannel fileChannel;
    private MappedByteBuffer mappedByteBuffer;
    private final ThreadLocal<ByteBuffer> fileChannelRead2HeapBufferThreadLocal = new ThreadLocal<>();
    private final ThreadLocal<ByteBuffer> fileChannelRead2DirectBufferThreadLocal = new ThreadLocal<>();
    private final ThreadLocal<byte[]> mappedRead2ArrayThreadLocal = new ThreadLocal<>();
    private final byte[] srcByteArray = new byte[SIZE];
    private final ByteBuffer srcHeapBuffer = ByteBuffer.allocate(SIZE);
    private final ByteBuffer srcDirectBuffer = ByteBuffer.allocateDirect(SIZE);

    @Setup
    public void setup() throws IOException {
        for (int i = 0; i < SIZE; i++) {
            srcByteArray[i] = 9;
        }
        srcHeapBuffer.put(srcByteArray, 0, SIZE);
        srcHeapBuffer.flip();
        srcDirectBuffer.put(srcByteArray, 0, SIZE);
        srcDirectBuffer.flip();
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
        fileChannel = r.getChannel();
        mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
    }

    @TearDown
    public void tearDown() throws IOException {
        PlatformDependent.freeDirectBuffer(mappedByteBuffer);
        fileChannel.close();
    }

    @Benchmark
    public void fileChannelRead2HeapBuffer(Blackhole blackhole) throws IOException {
        ByteBuffer byteBuffer = fileChannelRead2HeapBufferThreadLocal.get();
        if (byteBuffer == null) {
            byteBuffer = ByteBuffer.allocate(SIZE);
            fileChannelRead2HeapBufferThreadLocal.set(byteBuffer);
        }
        blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelRead2DirectBuffer(Blackhole blackhole) throws IOException {
        ByteBuffer byteBuffer = fileChannelRead2DirectBufferThreadLocal.get();
        if (byteBuffer == null) {
            byteBuffer = ByteBuffer.allocateDirect(SIZE);
            fileChannelRead2DirectBufferThreadLocal.set(byteBuffer);
        }
        blackhole.consume(fileChannel.read(byteBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromHeapBuffer(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcHeapBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromDirectBuffer(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
    }

    @Benchmark
    public void fileChannelWriteFromDirectBufferForce(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
        fileChannel.force(false);
    }

    @Benchmark
    public void fileChannelWriteFromDirectBufferForceMeta(Blackhole blackhole) throws IOException {
        blackhole.consume(fileChannel.write(srcDirectBuffer.slice(), 0L));
        fileChannel.force(true);
    }

    @Benchmark
    public void mappedRead2Array(Blackhole blackhole) {
        byte[] array = mappedRead2ArrayThreadLocal.get();
        if (array == null) {
            array = new byte[SIZE];
            mappedRead2ArrayThreadLocal.set(array);
        }
        blackhole.consume(mappedByteBuffer.slice().get(array, 0, SIZE));
    }

    @Benchmark
    public void mappedWriteFromArray(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcByteArray, 0, SIZE));
    }

    @Benchmark
    public void mappedWriteFromHeapBuffer(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcHeapBuffer.slice()));
    }

    @Benchmark
    public void mappedWriteFromDirectBuffer(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
    }

    @Benchmark
    public void mappedWriteFromDirectBufferForce(Blackhole blackhole) {
        blackhole.consume(mappedByteBuffer.slice().put(srcDirectBuffer.slice()));
        mappedByteBuffer.force();
    }
}

進(jìn)階部分

java工程中通過JNA調(diào)用libc標(biāo)準(zhǔn)庫函數(shù)

對(duì)JNA不做展開介紹,讀者可以自行查閱學(xué)習(xí):https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md

import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.NativeLong;
import com.sun.jna.Platform;
import com.sun.jna.Pointer;

public interface LibC extends Library {
    LibC INSTANCE = Native.load(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
    // 是不是很熟悉捕犬,大家學(xué)的第一個(gè)函數(shù)應(yīng)該就是這個(gè)吧:)
    void printf(String format, Object... args);
    // 本文的核心跷坝,后面詳細(xì)介紹
    int mlock(Pointer var1, NativeLong var2);
}

通過mlock將鎖定物理內(nèi)存,避免內(nèi)存被swap碉碉,保持物理內(nèi)存常駐

mlock不做詳細(xì)展開探孝,讀者可以自行查閱,推薦文章如下
http://www.daileinote.com/computer/linux_sys/32
https://www.cnblogs.com/linhaostudy/p/15972330.html

  • 其實(shí)mmap方法就可以直接設(shè)置flag為MAP_LOCKED將內(nèi)存鎖住誉裆,但是jdk沒有提供該能力,所以我們需要才需要直接調(diào)用mlock鎖住內(nèi)存
  • 用戶可以鎖住的內(nèi)存大小受操作系統(tǒng)限制缸濒,可以ulimit -l查看足丢,mac和linux一般應(yīng)該都是無限的unlimited
  • sysctl_max_map_count 規(guī)定了進(jìn)程虛擬內(nèi)存空間所能包含VmArea的最大個(gè)數(shù),可以通過 /proc/sys/vm/max_map_count 內(nèi)核參數(shù)來調(diào)整 sysctl_max_map_count庇配,一次mmap對(duì)應(yīng)產(chǎn)生一個(gè)VmArea
  • 對(duì)mmap的內(nèi)存執(zhí)行mlock后就已經(jīng)觸發(fā)了缺頁將所有物理內(nèi)存填充好了斩跌,也就不需要madvise,也不需要再調(diào)用MappedByteBuffer的load方法了捞慌,測(cè)試代碼如下耀鸦,最后byteBuffer.isLoaded()會(huì)顯示true
    public static void main(String[] args) throws IOException, InterruptedException {
        RandomAccessFile r = new RandomAccessFile(TEST_PATH + "test1", "rw");
        FileChannel fileChannel = r.getChannel();
        MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
        long address = PlatformDependent.directBufferAddress(byteBuffer);
        Pointer p = new Pointer(address);
        System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
        System.out.println(byteBuffer.isLoaded());
}
  • mlock之后無需調(diào)用munlock,會(huì)隨著MappedByteBuffer的釋放自動(dòng)munlock(也符合文檔說明啸澡,munmap自動(dòng)munlock)袖订,測(cè)試代碼如下

該測(cè)試程序可以永遠(yuǎn)執(zhí)行下去,mlock的返回值也一直會(huì)是0(系統(tǒng)調(diào)用返回值0代表正常)
讀者可以試試只調(diào)用mlock嗅虏,但是不釋放MappedByteBuffer的情況洛姑,小心電腦死機(jī) : )

    public static void main(String[] args) throws IOException, InterruptedException {
        try {
            while (true) {
                for (int i = 1; i < 31; i++) {
                    RandomAccessFile f = new RandomAccessFile(TEST_PATH + "test" + i, "rw");
                    FileChannel channel = f.getChannel();
                    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 1024);
                    long add = PlatformDependent.directBufferAddress(buffer);
                    Pointer p = new Pointer(add);
                    System.out.println(LibC.INSTANCE.mlock(p, new NativeLong(1024 * 1024 * 1024)));
                    channel.close();
                    PlatformDependent.freeDirectBuffer(buffer);
                }
            }
        } catch (Exception e) {
        }
  • windows環(huán)境沒有mlock,需要調(diào)用kernel32.dll中的VirtualLock系統(tǒng)調(diào)用皮服,按照J(rèn)NA文檔需要實(shí)現(xiàn)JNA的StdCallLibrary接口

https://github.com/java-native-access/jna/blob/master/www/GettingStarted.md

  • windows環(huán)境下要更復(fù)雜一點(diǎn)楞艾,VirtualLock受大小和個(gè)數(shù)影響参咙。在elasticsearch中有使用到,可以參考源碼學(xué)習(xí)硫眯,本文不做展開(感覺在windows上搞這個(gè)優(yōu)化沒啥意義)

https://github.com/elastic/elasticsearch/blob/951640b73f71909013f57645cd30e1f19d8c2323/server/src/main/java/org/elasticsearch/bootstrap/JNAKernel32Library.java#L30

獲取程序運(yùn)行機(jī)器的操作系統(tǒng)頁大小

  • 參考netty中PlatformDependent0中執(zhí)行Bits類中unaligned方法的過程
    private int pageSize = AccessController.doPrivileged((PrivilegedAction<Integer>) () -> {
        try {
            Class<?> bitsClass = Class.forName("java.nio.Bits", false, PlatformDependent.getSystemClassLoader());
            Method pageSizeMethod = bitsClass.getDeclaredMethod("pageSize");
            pageSizeMethod.setAccessible(true);
            return (Integer) pageSizeMethod.invoke(null);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });

總結(jié)

  • 在java領(lǐng)域?qū)崿F(xiàn)高性能流式存儲(chǔ)現(xiàn)在看就比較清晰了蕴侧,固定文件大小,通過MappedByteBuffer配合系統(tǒng)調(diào)用mlock两入,將最新的文件mlock到物理內(nèi)存中净宵,保證新數(shù)據(jù)的實(shí)時(shí)讀取
  • 根據(jù)調(diào)用請(qǐng)求分析將部分熱點(diǎn)老文件也mlock住,并做動(dòng)態(tài)調(diào)整谆刨,避免大量冷讀造成讀取性能下降
  • RocketMQ中文件預(yù)熱部分有mlock相關(guān)使用塘娶,但是為了兼容使用預(yù)熱和不使用預(yù)熱兩種情況,在預(yù)熱部分有些冗余調(diào)用痊夭,可以理解刁岸。預(yù)熱過程既然調(diào)用了mlock就無需對(duì)每個(gè)頁寫一個(gè)字節(jié)填充物理頁了,也無需調(diào)用madvise
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末她我,一起剝皮案震驚了整個(gè)濱河市虹曙,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌番舆,老刑警劉巖酝碳,帶你破解...
    沈念sama閱讀 222,104評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異恨狈,居然都是意外死亡疏哗,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門禾怠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來返奉,“玉大人,你說我怎么就攤上這事吗氏⊙科” “怎么了?”我有些...
    開封第一講書人閱讀 168,697評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵弦讽,是天一觀的道長污尉。 經(jīng)常有香客問我,道長往产,這世上最難降的妖魔是什么被碗? 我笑而不...
    開封第一講書人閱讀 59,836評(píng)論 1 298
  • 正文 為了忘掉前任,我火速辦了婚禮仿村,結(jié)果婚禮上蛮放,老公的妹妹穿的比我還像新娘。我一直安慰自己奠宜,他們只是感情好包颁,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評(píng)論 6 397
  • 文/花漫 我一把揭開白布瞻想。 她就那樣靜靜地躺著,像睡著了一般娩嚼。 火紅的嫁衣襯著肌膚如雪蘑险。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,441評(píng)論 1 310
  • 那天岳悟,我揣著相機(jī)與錄音佃迄,去河邊找鬼。 笑死贵少,一個(gè)胖子當(dāng)著我的面吹牛呵俏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播滔灶,決...
    沈念sama閱讀 40,992評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼普碎,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼!你這毒婦竟也來了录平?” 一聲冷哼從身側(cè)響起麻车,我...
    開封第一講書人閱讀 39,899評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎斗这,沒想到半個(gè)月后动猬,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,457評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡表箭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評(píng)論 3 341
  • 正文 我和宋清朗相戀三年赁咙,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片免钻。...
    茶點(diǎn)故事閱讀 40,664評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡彼水,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出伯襟,到底是詐尸還是另有隱情,我是刑警寧澤握童,帶...
    沈念sama閱讀 36,346評(píng)論 5 350
  • 正文 年R本政府宣布姆怪,位于F島的核電站,受9級(jí)特大地震影響澡绩,放射性物質(zhì)發(fā)生泄漏稽揭。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評(píng)論 3 334
  • 文/蒙蒙 一肥卡、第九天 我趴在偏房一處隱蔽的房頂上張望溪掀。 院中可真熱鬧,春花似錦步鉴、人聲如沸揪胃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽喊递。三九已至随闪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間骚勘,已是汗流浹背铐伴。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評(píng)論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留俏讹,地道東北人当宴。 一個(gè)月前我還...
    沈念sama閱讀 49,081評(píng)論 3 377
  • 正文 我出身青樓,卻偏偏與公主長得像泽疆,于是被迫代替她去往敵國和親户矢。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容