Future 是Java原生API中JUC包下提供的接口娄昆。JUC是Java用來(lái)處理并發(fā)內(nèi)容的集合凛忿,簡(jiǎn)單來(lái)說(shuō)Future就是用來(lái)處理在同步執(zhí)行代碼中獲取異步執(zhí)行結(jié)果的上層接口,其下有多種實(shí)現(xiàn)類用于不同并發(fā)場(chǎng)景冰悠。
原生Future中的get()方法节槐,會(huì)阻塞當(dāng)前線程,知道Future中執(zhí)行的異步任務(wù)完成后锯仪。Netty在原生基礎(chǔ)上又增加了監(jiān)聽(tīng)器(Listener)接口泵督,用來(lái)更精確的控制異步任務(wù)執(zhí)行時(shí)間。
Future常用實(shí)現(xiàn)類
FutureTask
- 主要繼承關(guān)系:實(shí)現(xiàn)Runnable接口和Future接口
- 主要功能:可以被看作Runnable對(duì)象提交線程池執(zhí)行庶喜,可以看作Future對(duì)象獲取異步執(zhí)行結(jié)果
簡(jiǎn)單使用示例
package com.ht.actuatorlearn.curr.future;
import java.util.concurrent.*;
/**
* FutureTask 類示例
* 線程池異步執(zhí)行任務(wù)小腊,在同步代碼塊中阻塞獲取異步執(zhí)行結(jié)果
*
* @author: lht
* @date: 2023-03-02
*/
public class FutureTaskTest {
/**
* 聲明線程池
*/
private static ExecutorService EXECUTOR = null;
public static void main(String[] args) {
EXECUTOR = Executors.newSingleThreadExecutor();
// 創(chuàng)建異步執(zhí)行Future對(duì)象
FutureTask<String> futureTask = new FutureTask<>(() -> {
// 注意這里使用的 Callable 類異步執(zhí)行
// 休眠5s 代替耗時(shí)操作
TimeUnit.SECONDS.sleep(5);
return "hello";
});
try {
// 提交執(zhí)行任務(wù)
EXECUTOR.execute(futureTask);
// 同步代碼塊1
System.out.println("Waiting async Results....");
// 獲取異步執(zhí)行結(jié)果 線程阻塞
System.out.println("Result is: " + futureTask.get());
// 同步代碼塊2
System.out.println("Completed...");
EXECUTOR.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
ScheduledFuture
- 主要功能:用于獲取在程序中計(jì)劃定期執(zhí)行任務(wù)的結(jié)果
簡(jiǎn)單使用示例
package com.ht.actuatorlearn.curr.future;
import java.util.concurrent.*;
/**
* 延遲執(zhí)行任務(wù),并在接收到特定結(jié)果下執(zhí)行一系列操作(關(guān)閉任務(wù))
*
* @author: lht
* @date: 2023-03-03
*/
public class ScheduledFutureTest {
public static void main(String[] args) {
// 聲明工作線程池
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
// 創(chuàng)建隨機(jī)數(shù)任務(wù)
Callable<Integer> task = () -> {
return (int) (Math.random() * 10);
};
// 延時(shí)任務(wù)提交線程池
ScheduledFuture<Integer> schedule = executorService.schedule(task, 5, TimeUnit.SECONDS);
try {
Integer result = schedule.get();
System.out.println("Task Result: " + result);
// 關(guān)閉線程池
executorService.shutdown();
}catch (Exception e) {
e.printStackTrace();
}
}
}
ChannelFuture
Netty中久窟,ChannelFuture類似于Future秩冈,表示異步的I/O操作結(jié)果。當(dāng)創(chuàng)建新通道或使用Channel進(jìn)行發(fā)送或接收數(shù)據(jù)時(shí)斥扛,這些耗時(shí)操作在Netty的底層都是異步進(jìn)行處理的入问。如果實(shí)際業(yè)務(wù)邏輯中需要獲取上述異步的執(zhí)行結(jié)果,那就需要使用到ChannelFuture.
常用示例
- 添加監(jiān)聽(tīng)器(最常用處理):ChannelFuture基于Future的阻塞模型上又實(shí)現(xiàn)了監(jiān)聽(tīng)器回調(diào)模式稀颁,可以更精確的獲取異步執(zhí)行結(jié)果和操作信息芬失,并且不阻塞主線程
Object message = new Object();
// channel 異步發(fā)送數(shù)據(jù)
ChannelFuture future = ctx.writeAndFlush(message);
future.addListener((ChannelFutureListener) channelFuture -> {
if (future.isSuccess()) {
// 消息發(fā)送成功 回調(diào)
} else {
// 消息發(fā)送失敗 回調(diào)
}
});
2.同步等待操作完成:使用sync()方法可以時(shí)耗時(shí)異步動(dòng)作變?yōu)橥絼?dòng)作,同步等待耗時(shí)操作完成再執(zhí)行后續(xù)代碼
// 同步推送數(shù)據(jù)
ChannelFuture channelFuture = ctx.writeAndFlush(message).sync();
System.out.println(channelFuture.isSuccess());
3.異步等待: 使用ChannelFuture 的 await() 異步等待I/O動(dòng)作完成匾灶,等待中將阻塞當(dāng)前線程麸折,只到操作完成或中斷
// 阻塞當(dāng)前線程 異步等待
ChannelFuture channelFuture = ctx.writeAndFlush(message).await();
System.out.println(channelFuture.isSuccess());
實(shí)際上:從ChannelFuture的源碼中我們可以看到,它其實(shí)是Future的拓展粘昨,并且不具有返回值的異步調(diào)用垢啼,同時(shí)和一個(gè)Channel進(jìn)行綁定。ChannelPromise接口又在其上繼承了ChannelFuture類和Promise類张肾,使之既可以寫異步執(zhí)行結(jié)果芭析,又具備監(jiān)聽(tīng)通道的功能。