一两残、創(chuàng)建線程的幾種方式
1.繼承thread類
public class ThreadTest {
public static void main(String[] args) {
Thread01 thread = new Thread01();
thread.start();
}
public static class Thread01 extends Thread{
@Override
public void run() {
System.out.println(""+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運(yùn)行結(jié)果:" + i);
}
}
}
2.實(shí)現(xiàn)runnable接口
public class ThreadTest {
public static void main(String[] args) {
Thread thread = new Thread(new Runnable01());
thread.start();
}
public static class Runnable01 implements Runnable{
@Override
public void run() {
System.out.println(""+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運(yùn)行結(jié)果:" + i);
}
}
}
3.實(shí)現(xiàn)Callable通過FutureTask創(chuàng)建線程
public class ThreadTest {
public static void main(String[] args) {
FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
Thread thread = new Thread(futureTask);
thread.start();
}
public static class Callable01 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println(""+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運(yùn)行結(jié)果:" + i);
return i;
}
}
}
4.線程池創(chuàng)建線程
七大參數(shù):
corePoolSize:固定線程數(shù)
maximumPoolSize:最大線程數(shù)
keepAliveTime:存活時(shí)間
unit:時(shí)間單位
BlockingQueue<Runnable>:阻塞隊(duì)列
ThreadFactory: 線程工廠
RejectedExecutionHandler:拒絕策略
疑問:拒絕策略丟棄的任務(wù)如何去解決
1.改寫拒絕策略,延遲任務(wù)重新投向線程池
2.打印對(duì)應(yīng)任務(wù)參數(shù)击碗,可以做塞回?cái)?shù)據(jù)庫(kù),或者打印出來方便排查問題
問題
Q:如何打印線程參數(shù)
A:RetryPolicy#rejectedExecution里面通過判斷runnable的類型们拙,然后進(jìn)行打印相關(guān)參數(shù)
Q:有沒有其他方案
A:有的稍途,比如說有些就不用使用延遲隊(duì)列,比如說我們是從數(shù)據(jù)庫(kù)讀取到的任務(wù)砚婆,執(zhí)行成功就修改執(zhí)行的標(biāo)識(shí)械拍,如果不成功或者任務(wù)被拒絕了,它下次掃描還是會(huì)繼續(xù)塞回去
Q:延遲隊(duì)列如果宕機(jī)的話射沟,任務(wù)也丟失了怎么辦
A:這里的打印日志就很重要了殊者,可以記錄起來,或者加個(gè)hook回調(diào)鉤子验夯,在宕機(jī)的時(shí)候?qū)⑦@些數(shù)據(jù)寫回?cái)?shù)據(jù)庫(kù)(kill -9 pid不會(huì)調(diào)用hook~)
5.CompletableFuture異步編排
創(chuàng)建異步對(duì)象:
1猖吴、runXxxx 都是沒有返回結(jié)果的,supplyXxx 都是可以獲取返回結(jié)果的
2挥转、可以傳入自定義的線程池海蔽,否則就用默認(rèn)的線程池;
列子:
CompletableFuture<Void> futureRunnable = CompletableFuture.runAsync(()->{
System.out.println("啟動(dòng)執(zhí)行"+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運(yùn)行結(jié)果:" + i);
},threadPoolExecutor);
try {
futureRunnable.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("啟動(dòng)執(zhí)行" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("運(yùn)行結(jié)果:" + i);
return i;
}, threadPoolExecutor).whenCompleteAsync((res,e)->{
System.out.println("whenCompleteAsync===>:"+res+"===>"+e);
},threadPoolExecutor);
System.out.println("返回結(jié)果===》:"+ future.get());
計(jì)算完成時(shí)回調(diào):
whenComplete 可以處理正常和異常的計(jì)算結(jié)果绑谣,exceptionally 處理異常情況党窜。
whenComplete 和 whenCompleteAsync 的區(qū)別:
whenComplete:是執(zhí)行當(dāng)前任務(wù)的線程執(zhí)行繼續(xù)執(zhí)行 whenComplete 的任務(wù)。
whenCompleteAsync:是執(zhí)行把 whenCompleteAsync 這個(gè)任務(wù)繼續(xù)提交給線程池
來進(jìn)行執(zhí)行借宵。
方法不以 Async 結(jié)尾幌衣,意味著 Action 使用相同的線程執(zhí)行,而 Async 可能會(huì)使用其他線程
執(zhí)行(如果是使用相同的線程池壤玫,也可能會(huì)被同一個(gè)線程選中執(zhí)行)
列子:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("啟動(dòng)執(zhí)行" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("運(yùn)行結(jié)果:" + i);
return i;
}, threadPoolExecutor).whenCompleteAsync((res,e)->{
System.out.println("whenCompleteAsync===>:"+res+"===>"+e);
},threadPoolExecutor).exceptionally(throwable->{
return 10;
});
System.out.println("返回結(jié)果===》:"+ future.get());
handle 方法:可改變返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("啟動(dòng)執(zhí)行" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運(yùn)行結(jié)果:" + i);
return i;
}, threadPoolExecutor).handle((res,e)->{ //第一個(gè)參數(shù) 線程返回值豁护,第二個(gè)是 異常返回
if (res != null){
return res*2;
}
if (e == null){
return 0;
}
return -1;
});
線程串行化方法
thenApply 方法:當(dāng)一個(gè)線程依賴另一個(gè)線程時(shí),獲取上一個(gè)任務(wù)返回的結(jié)果欲间,并返回當(dāng)前
任務(wù)的返回值楚里。
thenAccept 方法:消費(fèi)處理結(jié)果。接收任務(wù)的處理結(jié)果猎贴,并消費(fèi)處理班缎,無返回結(jié)果。
thenRun 方法:只要上面的任務(wù)執(zhí)行完成她渴,就開始執(zhí)行 thenRun达址,只是處理完任務(wù)后,執(zhí)行
thenRun 的后續(xù)操作
帶有 Async 默認(rèn)是異步執(zhí)行的趁耗。同之前苏携。
以上都要前置任務(wù)成功完成。
Function<? super T,? extends U>
T:上一個(gè)任務(wù)返回結(jié)果的類型
U:當(dāng)前任務(wù)的返回值類型
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運(yùn)行結(jié)果:" + i);
return i;
}, threadPoolExecutor).thenApplyAsync(res -> {
System.out.println("第二任務(wù)啟動(dòng)");
return "hallo"+res;
}, threadPoolExecutor);
==================================
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運(yùn)行結(jié)果:" + i);
return i;
}, threadPoolExecutor).thenAcceptAsync(res -> {
System.out.println("開啟另一個(gè)任務(wù)");
}, threadPoolExecutor);
兩任務(wù)組合 - 都要完成
兩個(gè)任務(wù)必須都完成对粪,觸發(fā)該任務(wù)右冻。
thenCombine:組合兩個(gè) future装蓬,獲取兩個(gè) future 的返回結(jié)果,并返回當(dāng)前任務(wù)的返回值
thenAcceptBoth:組合兩個(gè) future纱扭,獲取兩個(gè) future 任務(wù)的返回結(jié)果牍帚,然后處理任務(wù),沒有
返回值乳蛾。
runAfterBoth:組合兩個(gè) future暗赶,不需要獲取 future 的結(jié)果,只需兩個(gè) future 處理完任務(wù)后肃叶,
處理該任務(wù)蹂随。
CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運(yùn)行結(jié)果:" + i);
return i;
}, threadPoolExecutor);
CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
int i = 10 / 5;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("結(jié)束當(dāng)前線程");
return i;
}, threadPoolExecutor);
CompletableFuture<String> stringCompletableFuture = futur1.thenCombineAsync(futur2, (f1, f2) -> {
System.out.println("全部結(jié)束:");
return f1 + ":" + f2;
}, threadPoolExecutor);
兩任務(wù)組合 - 一個(gè)完成
當(dāng)兩個(gè)任務(wù)中,任意一個(gè) future 任務(wù)完成的時(shí)候因惭,執(zhí)行任務(wù)岳锁。
applyToEither:兩個(gè)任務(wù)有一個(gè)執(zhí)行完成,獲取它的返回值蹦魔,處理任務(wù)并有新的返回值激率。
acceptEither:兩個(gè)任務(wù)有一個(gè)執(zhí)行完成,獲取它的返回值勿决,處理任務(wù)乒躺,沒有新的返回值。
runAfterEither:兩個(gè)任務(wù)有一個(gè)執(zhí)行完成低缩,不需要獲取 future 的結(jié)果嘉冒,處理任務(wù),也沒有返
回值咆繁。
CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程futur1:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運(yùn)行結(jié)果futur1:" + i);
return i;
}, threadPoolExecutor);
CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程futur2:" + Thread.currentThread().getId());
int i = 10 / 5;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("結(jié)束當(dāng)前線程futur2");
return i;
}, threadPoolExecutor);
CompletableFuture<Integer> future = futur1.applyToEitherAsync(futur2, (s) -> {
System.out.println("第三個(gè)任務(wù)future");
return s;
}, threadPoolExecutor);
多任務(wù)組合
allOf:等待所有任務(wù)完成
anyOf:只要有一個(gè)任務(wù)完成
CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程futur1:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運(yùn)行結(jié)果futur1:" + i);
return i;
}, threadPoolExecutor);
CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
System.out.println("當(dāng)前線程futur2:" + Thread.currentThread().getId());
int i = 10 / 5;
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("結(jié)束當(dāng)前線程futur2");
return i;
}, threadPoolExecutor);
CompletableFuture<Void> futureAllOf = CompletableFuture.allOf(futur1, futur2);
System.out.println(futureAllOf.get());
CompletableFuture<Object> future = CompletableFuture.anyOf(futur1, futur2);
System.out.println(future.get());