image.png
思路
多個並行任務執行時，待最後一個任務執行完成後，再開始執行結束任務（可以自己實現，但建議不要，容易出問題）。後面會出具 reactor 版的方案。
參數說明:
size 並行個數
ChildTask<T> childTask 子任務
EndTask<T> endTask 合併結束後執行的任務
int timeout 超時時間（毫秒）
Executor multiThreadExecutor 子任務執行線程
Executor complateThreadExecutor 任務結束後執行線程
AsyncThreadSwitchListener asyncThreadSwitchListener 線程切換參數
代碼部分
/**
 * Fans out {@code size} child tasks and runs {@code endTask} once all of them have
 * completed, one of them has failed, or the timeout has elapsed, whichever happens first.
 *
 * <p>Each child task is handed its own {@link CompletableFuture} and its index, and is
 * responsible for completing that future. If a child task throws before doing so, its
 * future is completed exceptionally with the thrown error so that the join cannot hang.
 *
 * <p>On timeout, {@code endTask} receives a {@code CompletionException} whose cause is a
 * {@code TimeoutException}; child futures that are still running are left incomplete, so
 * {@code endTask} should check {@code isDone()} before reading them.
 *
 * @param size                      number of child tasks to run in parallel
 * @param childTask                 work executed once per index in {@code [0, size)}
 * @param endTask                   callback run after the join, a failure, or the timeout
 * @param timeout                   timeout in milliseconds
 * @param multiThreadExecutor       executor that runs the child tasks
 * @param complateThreadExecutor    executor that runs {@code endTask}
 * @param asyncThreadSwitchListener thread-switch hooks: {@code hold()} is called here on the
 *                                  calling thread, {@code cover()}/{@code clear()} wrap every
 *                                  task on the thread that executes it
 */
public static <T> void disassemblyTasks(int size, ChildTask<T> childTask, EndTask<T> endTask, int timeout, Executor multiThreadExecutor, Executor complateThreadExecutor, AsyncThreadSwitchListener asyncThreadSwitchListener) {
    asyncThreadSwitchListener.hold();
    // Generic array creation is unavoidable here because allOf(...) and EndTask take arrays.
    @SuppressWarnings("unchecked")
    CompletableFuture<T>[] completableFutures = new CompletableFuture[size];
    // Launch the child tasks.
    for (int i = 0; i < size; i++) {
        final int index = i;
        final CompletableFuture<T> childFuture = new CompletableFuture<>();
        completableFutures[i] = childFuture;
        multiThreadExecutor.execute(() -> {
            try {
                asyncThreadSwitchListener.cover();
                try {
                    childTask.run(childFuture, index);
                } finally {
                    asyncThreadSwitchListener.clear();
                }
            } catch (Throwable failure) {
                // Without this the future would stay incomplete forever and the join below
                // would never fire. No-op if the task already completed the future itself.
                childFuture.completeExceptionally(failure);
                // Still surface the error to the executor, exactly as before.
                throw failure;
            }
        });
    }
    // Timeout control: race the join against a future that fails after `timeout` ms.
    // The race must happen BEFORE endTask is attached, otherwise a timeout can never reach it.
    CompletableFuture<Void> allChildren = CompletableFuture.allOf(completableFutures);
    CompletableFuture<Void> deadline = CompletableFutureUtil.timeoutAfter(timeout, TimeUnit.MILLISECONDS);
    allChildren
            .applyToEither(deadline, unused -> unused)
            .whenCompleteAsync((unused, throwable) -> {
                asyncThreadSwitchListener.cover();
                try {
                    endTask.run(completableFutures, throwable);
                } finally {
                    asyncThreadSwitchListener.clear();
                }
            }, complateThreadExecutor);
}
/**
 * <h1>Does not take effect in synchronous scenarios</h1>
 * Thread-switch callback: context is carried from one thread to another through these hooks.
 *
 * <p>As used by {@code disassemblyTasks}: {@link #hold()} is called once on the submitting
 * thread before any task is dispatched, {@link #cover()} is called on the executing thread
 * right before a task runs, and {@link #clear()} is called in a {@code finally} block right
 * after it. Presumably {@code hold} captures the context, {@code cover} installs it and
 * {@code clear} removes it; the exact semantics are up to the implementation.
 */
public interface AsyncThreadSwitchListener {

    /** Called on the originating thread before work is handed to another thread. */
    void hold();

    /** Called on the executing thread immediately before the task body runs. */
    void cover();

    /** Called on the executing thread after the task body has finished, even on failure. */
    void clear();

    /** Listener whose three hooks do nothing, for callers with no context to carry over. */
    AsyncThreadSwitchListener ASYNC_THREAD_SWITCH_LISTENR = new AsyncThreadSwitchListener() {
        @Override
        public void hold() {
            // intentionally empty
        }

        @Override
        public void cover() {
            // intentionally empty
        }

        @Override
        public void clear() {
            // intentionally empty
        }
    };
}
import java.util.concurrent.CompletableFuture;
/**
 * A single unit of work in a fan-out of parallel child tasks.
 *
 * @param <T> type of the value the child task produces
 * @author xinjiu
 */
@FunctionalInterface
public interface ChildTask<T> {
    /**
     * Executes the child task.
     *
     * @param completableFuture future handed to this task; the caller waits on it, so the
     *                          implementation is expected to complete it (normally or
     *                          exceptionally) when its work is done
     * @param number            zero-based index of this task among its siblings
     */
    void run(CompletableFuture<T> completableFuture, int number);
}
import java.util.concurrent.CompletableFuture;
/**
 * Callback executed after the parallel child tasks have been joined.
 *
 * @param <T> type of the value each child task produces
 */
@FunctionalInterface
public interface EndTask<T> {
    /**
     * Runs the end task.
     *
     * @param completableFutures the futures of all child tasks, indexed by task number
     * @param throwable          the failure reported by the join, or {@code null} when the
     *                           join completed normally
     */
    void run(CompletableFuture<T>[] completableFutures, Throwable throwable);
}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
/**
 * General-purpose asynchronous helper: timeout support for {@link CompletableFuture}.
 */
public class CompletableFutureUtil {

    /**
     * Races {@code future} against a timeout.
     *
     * <p>The returned stage completes with the outcome of {@code future} if it finishes
     * within the timeout, and otherwise completes exceptionally with a
     * {@link TimeoutException}. The timeout only exists on the RETURNED stage, so callers
     * must chain on it; {@code future} itself is never modified. The pending timer is
     * cancelled as soon as {@code future} completes, so finished work leaves nothing
     * behind in the scheduler queue.
     *
     * @param future  the computation to guard
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return a stage that mirrors {@code future} or fails with a timeout, whichever is first
     */
    public static <T> CompletableFuture<T> within(CompletableFuture<T> future, long timeout, TimeUnit unit) {
        final CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
        final ScheduledFuture<?> timer = scheduleTimeout(timeoutFuture, timeout, unit);
        // The timer is useless once the real work is done; cancelling removes it from the
        // queue immediately because the scheduler uses remove-on-cancel.
        future.whenComplete((value, error) -> timer.cancel(false));
        // Whichever completes first supplies the result - this is the key API.
        return future.applyToEitherAsync(timeoutFuture, Function.identity());
    }

    /**
     * Creates a future that completes exceptionally with a {@link TimeoutException} once the
     * given delay has elapsed; it acts as a sentinel / watcher for other futures.
     *
     * @param timeout delay before the future fails
     * @param unit    unit of {@code timeout}
     * @return a future that never completes normally
     */
    public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<>();
        scheduleTimeout(result, timeout, unit);
        return result;
    }

    /** Schedules the exceptional completion of {@code target} and returns the timer handle. */
    private static ScheduledFuture<?> scheduleTimeout(CompletableFuture<?> target, long timeout, TimeUnit unit) {
        return Delayer.delayer.schedule(
                () -> target.completeExceptionally(new TimeoutException("MultiTask timeOut :" + timeout)),
                timeout, unit);
    }

    /**
     * Singleton delay scheduler, used only for starting and cancelling tasks.
     */
    static final class Delayer {
        /** Creates daemon threads so the scheduler never keeps the JVM alive. */
        static final class DaemonThreadFactory implements ThreadFactory {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("CompletableFutureDelayScheduler");
                return t;
            }
        }

        static final ScheduledThreadPoolExecutor delayer;

        // Note: one thread is enough, because it never executes real requests; it only
        // completes a future with an exception.
        static {
            delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
            delayer.setRemoveOnCancelPolicy(true);
        }
    }
}