Java Nio 系列
Java Nio 之Buffer
Java Nio 之直接內(nèi)存
Java Nio 之高級搬磚工(FileChannel) 一
Java Nio 之高級搬磚工(FileChannel)二
前言
前段時間同事分享了一篇文章給我:為什么Kafka速度這么快墩虹? 雳锋,這篇文章相信大家也都看了揩瞪。這篇文章說Kafka 有個作弊的技能 :直接從文件某個位置處讀取某個長度的字節(jié)直接發(fā)送給消費(fèi)者,不需要讀到應(yīng)用程序里然后緩存在ByteBuffer 然后再往 客戶端寫;當(dāng)時就對這項(xiàng)技術(shù)很著迷,上網(wǎng)搜了很多資料 很納悶它是怎么實(shí)現(xiàn)的震捣;上周在介紹FileChannel 的時候本來想只寫一篇文章的,后來看到了它的map 闹炉、transeferTo以及TranseferFrom 方法就覺得一篇文章寫不完蒿赢,因?yàn)槿唛L的文章誰都不想看,所以另寫一篇來研究一下 FileChannel 的高性能之處渣触,以及介紹下Kafka是怎么使用的羡棵。
談?wù)劻憧截?/h2>
牢騷一下
Kafka 的高性能的重要點(diǎn)之一就在零拷貝上。零拷貝不是真的零拷貝嗅钻,只不過是減少了拷貝的次數(shù)皂冰,為的不是減少DMA的拷貝次數(shù)店展,而是CPU 的拷貝次數(shù),為啥呢秃流?因?yàn)榭截愂莻€很簡單的操作赂蕴,占著CPU 的時間片簡直就是高射炮打蚊子。
傳統(tǒng) Linux 服務(wù)器 傳輸數(shù)據(jù) 的流程
- 1.應(yīng)用程序調(diào)用系統(tǒng)方法read(),切換上下文:用戶——內(nèi)核舶胀,操作系統(tǒng)會先檢查頁面緩存里是否有要read 的內(nèi)容概说,如果有則進(jìn)行第二步,如果沒有則需要讓DMA 從指定磁盤位置上拷貝數(shù)據(jù)到內(nèi)核緩沖區(qū)中峻贮,第一次拷貝由DMA 執(zhí)行
- 2.CPU 將數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶緩沖區(qū)席怪,read 調(diào)用返回应闯,切換上下文:內(nèi)核——用戶纤控,第二次拷貝由CPU 執(zhí)行
- 3.應(yīng)用程序調(diào)用系統(tǒng)write() 函數(shù) ,切換上下文:用戶——內(nèi)核碉纺,CPU 將用戶緩沖區(qū)的數(shù)據(jù)拷貝到socket 緩沖區(qū)船万,第三次拷貝由CPU執(zhí)行
- 4.write 調(diào)用返回,切換上下文:內(nèi)核——用戶骨田,然后DMA 異步將socket 緩沖區(qū)的數(shù)據(jù)拷貝到協(xié)議引擎中
總結(jié)一下耿导,需要4次拷貝,其中有兩次是需要CPU的執(zhí)行态贤,切換了4次上下文
零拷貝的兩種實(shí)現(xiàn)方式
mmap + write 方式
-
何為mmap 呢—— 將一個文件或者其它對象映射進(jìn)內(nèi)存舱呻。映射到的這塊內(nèi)存區(qū)域在用戶程序使用的內(nèi)存空間 和 棧之間不在內(nèi)核內(nèi)存空間, 因此內(nèi)核程序和用戶程序都可以訪問悠汽,如下草圖:
mmap
箱吕、munmap
和msync()
函數(shù)
void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
addr
映射區(qū)的開始位置length
映射區(qū)的長度-
prot
期望的內(nèi)存保護(hù)標(biāo)志,可由如下幾種方式組合:- PROT_EXEC 頁內(nèi)容可被執(zhí)行
- PROT_READ 頁內(nèi)容可被讀取
- PROT_WRITE 頁內(nèi)容可被寫入
- PROT_NONE 頁內(nèi)容不可以被訪問
-
flags
影響內(nèi)存區(qū)域的各種特性,可由以下幾種方式組合:- MAP_FIXED 使用指定的起始位置柿冲,若是addr和length 重疊于現(xiàn)存的映射空間則重疊部分會丟失茬高,不會對地址做出修正,不建議使用該選項(xiàng)假抄。
- MAP_SHARED 對該映射區(qū)域的更改會同步到文件里怎栽,而且允許其它映射該文件的進(jìn)程共享該映射區(qū)域
- MAP_PRIVATE 對映射區(qū)域的寫入操作會產(chǎn)生一個映射文件的復(fù)制,即私有的“寫入時復(fù)制”(copy on write)對此區(qū)域作的任何修改都不會寫回原來的文件內(nèi)容宿饱。
- MAP_NORESERVE 不要為這個映射保留交換空間熏瞄。當(dāng)交換空間被保留,對映射區(qū)修改的可能會得到保證谬以。當(dāng)交換空間不被保留强饮,同時內(nèi)存不足,對映射區(qū)的修改會引起段違例信號
- MAP_ANONYMOUS 匿名映射蛉签,不與任何文件關(guān)聯(lián)
fd
被映射對象胡陪,若為匿名映射則為-1offset
被映射對象的起始偏移
int munmap(void *addr, size_t length);
調(diào)用該函數(shù)可以解除 映射對象與addr 處開始的length 長度的內(nèi)存空間的映射關(guān)系
addr
mmap 函數(shù)返回的映射區(qū)域首地址
length
映射區(qū)域的長度
int msync ( void * addr , size_t len, int flags) ;
一般情況下 對映射空間的共享內(nèi)容更改不會直接寫到文件里沥寥,當(dāng)然執(zhí)行完 munmap 函數(shù)也可以,除了執(zhí)行它柠座,還可以執(zhí)行 msync 函數(shù)來將修改的共享內(nèi)容同步到文件
說說mmap + write 的 流程
- 用戶程序調(diào)用mmap函數(shù)邑雅,將 文件內(nèi)容映射到內(nèi)存映射區(qū)域。先由內(nèi)存空間切換到內(nèi)核空間妈经,然后由內(nèi)核空間切換到用戶空間淮野,完成兩次上下文切換,DMA 將文件內(nèi)容拷貝到內(nèi)存映射區(qū)域
- 調(diào)用write 函數(shù)吹泡,cpu將 內(nèi)存映射區(qū)域的內(nèi)容拷貝到 socket緩沖區(qū)骤星,程序調(diào)用返回然后DMA 異步從socket 緩沖區(qū)拷貝到協(xié)議引擎的緩沖區(qū)
發(fā)生 1次cpu 拷貝,2次DMA 拷貝
來看看Java 下的mmap
抽象類 MappedByteBuffer
定義
直接字節(jié)緩沖區(qū)爆哑,其內(nèi)容是文件的內(nèi)存映射區(qū)域洞难。可由FileChannel#map 方法創(chuàng)建揭朝。該類通過增加對內(nèi)存映射區(qū)域的特定操作擴(kuò)展了ByteFuffer 類队贱。映射字節(jié)緩沖區(qū)與它所映射的文件直到它自己被垃圾回收之前都是存在的。
tips
映射字節(jié)緩沖區(qū)的內(nèi)容任何時候都可以被修改潭袱,例如柱嫌,映射文件對應(yīng)的區(qū)域被當(dāng)前程序或者其他程序所更改。至于是否發(fā)生或者什么時候發(fā)生屯换,都由操作系統(tǒng)來決定编丘。
映射字節(jié)緩沖區(qū)的部分或者全部在任何時候都會變得不可訪問,例如彤悔,映射的文件被截斷了嘉抓。嘗試訪問不可訪問的映射字節(jié)的緩沖區(qū)的那一部分,將會有不友好的異常拋出蜗巧。需要強(qiáng)烈的提醒掌眠,避免讓當(dāng)前程序或者其他程序?qū)@個映射文件進(jìn)行操作,除了讀或者寫它的內(nèi)容幕屹。
方法
load()
該方法會盡最大可能將映射文件里的內(nèi)容加載到物理內(nèi)存中蓝丙,可能會在加載的時候?qū)е乱恍╉撁驽e誤和IO操作。
isLoad()
返回 映射文件內(nèi)容是否駐留在物理內(nèi)存中望拖。
force()
對映射內(nèi)存區(qū)域的寫入渺尘,并不會直接同步到文件中, 在解除映射關(guān)系的時候修改的內(nèi)容才會同步到文件中说敏。 調(diào)用該方法會將對映射區(qū)域的修改同步到磁盤鸥跟,這就與上面的方法msync
方法對應(yīng)。
FileChannel # map 方法
方法簽名
public abstract MappedByteBuffer map(MapMode mode,long position, long size) throws IOException;
參數(shù)小解
mode
為MapMode 中的READ_ONLY,READ_WRITE,PRIVATE中的其中一個,分別表示 只讀医咨,可讀可寫和寫時復(fù)制與上述 mmap
方法中flags
參數(shù)對應(yīng)
position
從文件的哪里開始映射,對應(yīng)上述 mmap
方法 中的offset
參數(shù)
size
從文件position處開始映射多少個字節(jié)
Java 語言實(shí)現(xiàn) mmap+write
簡述:將文件a.txt 中的0到14個字節(jié)發(fā)給服務(wù)端
package zym.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
/**
* file channel map study
*
* @author 24160
*/
public class FileChannelMapStudy {
public static final String FILE_CHANNEL_MAP_STUDY_TXT = "a.txt";
public static final int INT_BYTES_LENGTH = 4;
public static void main(String[] args) {
prepareEnviroment();
try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.READ)) {
long size = fileChannel.size();
//將a.txt 文件映射到內(nèi)存緩沖區(qū)枫匾,從0位置處映射,映射10個字節(jié)長度拟淮,該映射內(nèi)存緩沖區(qū)只可讀
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, 14);
//創(chuàng)建一個SocketChannel實(shí)例
SocketChannel client = SocketChannel.open();
//連接服務(wù)端
client.connect(new InetSocketAddress("127.0.0.1", 8080));
//寫文件內(nèi)容到服務(wù)端
client.write(mappedByteBuffer);
//讀取文件內(nèi)容 網(wǎng)絡(luò)協(xié)議為 head + body 如6zengyi
ByteBuffer head = ByteBuffer.allocate(INT_BYTES_LENGTH);
while (client.read(head) != 0) {}
//切換讀寫模式
head.flip();
//讀取body
ByteBuffer body = ByteBuffer.allocate(head.getInt());
while (client.read(body) != 0) {}
//切換讀寫模式
body.flip();
System.out.println(String.format("發(fā)送字節(jié)成功干茉,服務(wù)端返回:%s", new String(body.array())));
} catch (IOException e) {
e.printStackTrace();
}
}
private static void prepareEnviroment() {
try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE_CHANNEL_MAP_STUDY_TXT), StandardOpenOption.CREATE,StandardOpenOption.READ, StandardOpenOption.WRITE)) {
//將a.txt 映射文件到內(nèi)存映射區(qū)域,模式為可讀可寫
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 14);
//放進(jìn)去一個int 為10
mappedByteBuffer.putInt(10);
mappedByteBuffer.put("zengyiming".getBytes());
//強(qiáng)制刷盤
mappedByteBuffer.force();
} catch (IOException e) {
e.printStackTrace();
}
}
}
服務(wù)端代碼詳見:NioServer.java
下面我們來看看kafka 是如何使用mmap,kafka AbstractIndex.scala
代碼片段
@volatile
protected var mmap: MappedByteBuffer = {
val newlyCreated = file.createNewFile()
val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
try {
/* 如果是新創(chuàng)建則給file 預(yù)留分配空間 maxIndexSize 不超過50MB 單位為字節(jié) */
if(newlyCreated) {
if(maxIndexSize < entrySize)
throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
}
/* memory-map the file */
/* 開始內(nèi)存映射文件*/
_length = raf.length()
val idx = {
if (writable)
/*如果可寫,則映射模式為可讀可寫*/
raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
else
/*若可讀,則映射模式為可讀*/
raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
}
/* set the position in the index for the next entry */
/*為下一個條目 設(shè)置 buffer 中的position值*/
if(newlyCreated)
idx.position(0)
else
// if this is a pre-existing index, assume it is valid and set position to last entry
//如果這是一個預(yù)先存在的索引很泊,則假設(shè)它有效并將位置設(shè)置為最后一個條目
idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
idx
} finally {
CoreUtils.swallow(raf.close(), AbstractIndex)
}
}
kafka 的索引文件是映射到內(nèi)存映射區(qū)域的角虫,對消息偏移量的讀寫都是基于MappedByteBuffer 之上,當(dāng)然牛逼的kafka 作者們 發(fā)明了一個簡單且緩存命中友好的二叉查找算法委造,這個算法有機(jī)會和大家聊下戳鹅。