異步文件通道-通過handler處理
AsynchronousFileChannel
使用 Future 對象的替代機(jī)制,是向異步操作注冊一個 callback 赂弓。接口 CompletionHandler 有兩個方法:
- void completed(V result, A attachment) // 在任務(wù)完成結(jié)果中具有類型 V 時執(zhí)行。
- void failed(Throwable e, A attachment) // 在任務(wù)由于 Throwable e 而失敗時執(zhí)行推姻。
兩個方法的附件參數(shù)都是一個傳遞到異步操作的對象再膳。如果相同的對象用于多個操作削祈,其可用于追蹤哪個操作已完成桥狡。
Open 命令
我們來看一個使用 AsynchronousFileChannel 類的例子卦绣。可通過將 java.nio.file.Path 對象傳遞到靜態(tài) open() 方法中咙崎,來創(chuàng)建一個新的通道:
AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(Paths.get("myfile"));
FileChannel 的新 open 命令
用于異步通道的 open 命令格式已被移植到 FileChannel 類仗处。在 NIO 中眯勾,通過在 FileInputStream、FileOutputStream婆誓、或者 RandomAccessFile 上調(diào)用 getChannel() 來獲取 FileChannel吃环。借助 NIO.2,可利用 open() 方法來直接創(chuàng)建 FileChannel洋幻。
默認(rèn)情況下郁轻,該文件已打開以供讀取。open() 方法可利用附加選項來指定如何打開該文件。例如好唯,此調(diào)用打開文件以供讀取或?qū)懭虢吣绻匾獙?chuàng)建該文件,并在通道關(guān)閉或者 JVM 終止時嘗試刪除文件:
fileChannel = AsynchronousFileChannel.open(
Paths.get("afile"),
StandardOpenOption.READ,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.DELETE_ON_CLOSE
);
替代方法 open() 提供了對通道的更好的控制骑篙,允許設(shè)置文件屬性蜕提。
實現(xiàn)一個完成處理程序
接下來,可將這些寫入文件靶端,寫入完成后谎势,就可執(zhí)行一些操作。 首先要構(gòu)造一個封裝了 “ something ” 的 CompletionHandler :
// 創(chuàng)建完成處理程序
CompletionHandler<Integer, Object> handler =
new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
System.out.println(attachment + " completed with " + result + " bytes written");
}
@Override
public void failed(Throwable e, Object attachment) {
System.err.println(attachment + " failed with:");
e.printStackTrace();
}
};
現(xiàn)在可以進(jìn)行寫入:
fileChannel.write(ByteBuffer.wrap(bytes), 0, "Write operation 1", handler);
write() 方法參數(shù):
- 包含要寫入內(nèi)容的 ByteBuffer
- 文件中的絕對位置
- 要傳遞給完成處理程序方法的附件對象
- 完成處理程序
操作必須給出進(jìn)行讀或?qū)懙奈募械慕^對位置杨名。文件具有內(nèi)部位置標(biāo)記脏榆,來指出讀/寫發(fā)生的位置,這樣做沒有意義台谍,因為在上一個操作完成之前须喂,就可以啟動新操作,它們的發(fā)生順序無法得到保證典唇。由于相同的原因镊折,在 AsynchronousFileChannel API 中沒有用于設(shè)置或查詢位置的方法胯府,在 FileChannel 中同樣也沒有介衔。
除了讀寫方法之外,還支持異步鎖定方法骂因,因此炎咖,如果當(dāng)前有其他線程保持鎖定時,可對文件進(jìn)行執(zhí)行訪問鎖定寒波,而不必在當(dāng)前線程中鎖定(或者利用 tryLock 輪詢)乘盼。
寫文件:
public class FileWriterEnd {
public void write() {
try {
final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(
Paths.get("writeshow.log"),
StandardOpenOption.WRITE,
//StandardOpenOption.DELETE_ON_CLOSE,
StandardOpenOption.CREATE
);
CompletionHandler<Integer, Object> handler = new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
System.out.println(attachment + "S>>> completed with " + result + " bytes written");
}
@Override
public void failed(Throwable e, Object attachment) {
if (e instanceof AsynchronousCloseException) {
System.out.println("S>>> File was closed before " + attachment + " executed");
} else {
System.err.println("S>>> " + attachment + " failed with:");
e.printStackTrace();
}
}
};
int count = 0;
int position = 0;
while (count < 10) {
byte[] contents = "hello ".getBytes();
System.out.println("S>>> Initiating write operation " + count);
fileChannel.write(ByteBuffer.wrap(contents), position , "Write operation "+count + " ", handler);
position += contents.length;
count++;
Thread.sleep(1000);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new FileWriterEnd().write();
}
}
讀文件:
public class FileReaderEnd {
public void read() {
try {
final AsynchronousFileChannel fileChannel = AsynchronousFileChannel.open(
Paths.get("writeshow.log"),
StandardOpenOption.READ
);
final ByteBuffer buffer = ByteBuffer.allocate(7);
CompletionHandler<Integer, Object> handler= new CompletionHandler<Integer, Object>(){
@Override
public void completed(Integer result, Object attachment) {
System.out.println("C))) Read operation completed, file contents is: " + new String(buffer.array()));
clearUp();
}
@Override
public void failed(Throwable e, Object attachment) {
System.err.println("C))) Exception performing write");
e.printStackTrace();
clearUp();
}
private void clearUp() {
try {
fileChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
};
System.out.println("C))) Initiating read operation");
//Future<Integer> future = fileChannel.read(buffer, 0); //
//System.out.println("future : " + future.get());
fileChannel.read(buffer, 0, null, handler);
Thread.sleep(3*1000); // 由于handler處理是異步的,防止主線程過早結(jié)束俄烁,等待handler處理
} catch (IOException e) {
e.printStackTrace();
} /*catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}*/ catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new FileReaderEnd().read();
}
}