異步、線程池與CompletableFuture異步編排

一两残、創(chuàng)建線程的幾種方式

1.繼承thread類

public class ThreadTest {
    public static void main(String[] args) {
        Thread01 thread = new Thread01();
        thread.start();
    }
    public static class Thread01 extends Thread{
        @Override
        public void run() {
            System.out.println(""+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
        }
    }
}

2.實(shí)現(xiàn)runnable接口

public class ThreadTest {
    public static void main(String[] args) {
        Thread thread = new Thread(new Runnable01());
        thread.start();
    }

    public static class Runnable01 implements Runnable{
        @Override
        public void run() {
            System.out.println(""+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
        }
    }

}


3.實(shí)現(xiàn)Callable通過FutureTask創(chuàng)建線程

public class ThreadTest {
    public static void main(String[] args) {

        FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
        Thread thread = new Thread(futureTask);
        thread.start();
    }
    public static class Callable01 implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            System.out.println(""+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }
    }
}

4.線程池創(chuàng)建線程

七大參數(shù):
corePoolSize:固定線程數(shù)
maximumPoolSize:最大線程數(shù)
keepAliveTime:存活時(shí)間
unit:時(shí)間單位
BlockingQueue<Runnable>:阻塞隊(duì)列
ThreadFactory: 線程工廠
RejectedExecutionHandler:拒絕策略

疑問:拒絕策略丟棄的任務(wù)如何去解決

1.改寫拒絕策略,延遲任務(wù)重新投向線程池
2.打印對(duì)應(yīng)任務(wù)參數(shù)击碗,可以做塞回?cái)?shù)據(jù)庫(kù),或者打印出來方便排查問題
問題
Q:如何打印線程參數(shù)
A:RetryPolicy#rejectedExecution里面通過判斷runnable的類型们拙,然后進(jìn)行打印相關(guān)參數(shù)

Q:有沒有其他方案
A:有的稍途,比如說有些就不用使用延遲隊(duì)列,比如說我們是從數(shù)據(jù)庫(kù)讀取到的任務(wù)砚婆,執(zhí)行成功就修改執(zhí)行的標(biāo)識(shí)械拍,如果不成功或者任務(wù)被拒絕了,它下次掃描還是會(huì)繼續(xù)塞回去

Q:延遲隊(duì)列如果宕機(jī)的話射沟,任務(wù)也丟失了怎么辦
A:這里的打印日志就很重要了殊者,可以記錄起來,或者加個(gè)hook回調(diào)鉤子验夯,在宕機(jī)的時(shí)候?qū)⑦@些數(shù)據(jù)寫回?cái)?shù)據(jù)庫(kù)(kill -9 pid不會(huì)調(diào)用hook~)

5.CompletableFuture異步編排

創(chuàng)建異步對(duì)象:

image.png

1猖吴、runXxxx 都是沒有返回結(jié)果的,supplyXxx 都是可以獲取返回結(jié)果的
2挥转、可以傳入自定義的線程池海蔽,否則就用默認(rèn)的線程池;
列子:

CompletableFuture<Void> futureRunnable = CompletableFuture.runAsync(()->{
            System.out.println("啟動(dòng)執(zhí)行"+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
        },threadPoolExecutor);
        try {
            futureRunnable.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("啟動(dòng)執(zhí)行" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor).whenCompleteAsync((res,e)->{
            System.out.println("whenCompleteAsync===>:"+res+"===>"+e);
        },threadPoolExecutor);
        System.out.println("返回結(jié)果===》:"+ future.get());

計(jì)算完成時(shí)回調(diào):

image.png

whenComplete 可以處理正常和異常的計(jì)算結(jié)果绑谣,exceptionally 處理異常情況党窜。
whenComplete 和 whenCompleteAsync 的區(qū)別:
whenComplete:是執(zhí)行當(dāng)前任務(wù)的線程執(zhí)行繼續(xù)執(zhí)行 whenComplete 的任務(wù)。
whenCompleteAsync:是執(zhí)行把 whenCompleteAsync 這個(gè)任務(wù)繼續(xù)提交給線程池
來進(jìn)行執(zhí)行借宵。
方法不以 Async 結(jié)尾幌衣,意味著 Action 使用相同的線程執(zhí)行,而 Async 可能會(huì)使用其他線程
執(zhí)行(如果是使用相同的線程池壤玫,也可能會(huì)被同一個(gè)線程選中執(zhí)行)
列子:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("啟動(dòng)執(zhí)行" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor).whenCompleteAsync((res,e)->{
            System.out.println("whenCompleteAsync===>:"+res+"===>"+e);
        },threadPoolExecutor).exceptionally(throwable->{
            return 10;
        });
        System.out.println("返回結(jié)果===》:"+ future.get());

handle 方法:可改變返回值

image.png
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("啟動(dòng)執(zhí)行" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor).handle((res,e)->{ //第一個(gè)參數(shù) 線程返回值豁护,第二個(gè)是 異常返回
            if (res != null){
                return res*2;
            }
            if (e == null){
                return 0;
            }
            return -1;
        });

線程串行化方法

image.png

thenApply 方法:當(dāng)一個(gè)線程依賴另一個(gè)線程時(shí),獲取上一個(gè)任務(wù)返回的結(jié)果欲间,并返回當(dāng)前
任務(wù)的返回值楚里。
thenAccept 方法:消費(fèi)處理結(jié)果。接收任務(wù)的處理結(jié)果猎贴,并消費(fèi)處理班缎,無返回結(jié)果。
thenRun 方法:只要上面的任務(wù)執(zhí)行完成她渴,就開始執(zhí)行 thenRun达址,只是處理完任務(wù)后,執(zhí)行
thenRun 的后續(xù)操作
帶有 Async 默認(rèn)是異步執(zhí)行的趁耗。同之前苏携。
以上都要前置任務(wù)成功完成。
Function<? super T,? extends U>
T:上一個(gè)任務(wù)返回結(jié)果的類型
U:當(dāng)前任務(wù)的返回值類型

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor).thenApplyAsync(res -> {
            System.out.println("第二任務(wù)啟動(dòng)");
            return "hallo"+res;
        }, threadPoolExecutor);
==================================

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor).thenAcceptAsync(res -> {
            System.out.println("開啟另一個(gè)任務(wù)");
        }, threadPoolExecutor);

兩任務(wù)組合 - 都要完成

image.png

image.png

兩個(gè)任務(wù)必須都完成对粪,觸發(fā)該任務(wù)右冻。
thenCombine:組合兩個(gè) future装蓬,獲取兩個(gè) future 的返回結(jié)果,并返回當(dāng)前任務(wù)的返回值
thenAcceptBoth:組合兩個(gè) future纱扭,獲取兩個(gè) future 任務(wù)的返回結(jié)果牍帚,然后處理任務(wù),沒有
返回值乳蛾。
runAfterBoth:組合兩個(gè) future暗赶,不需要獲取 future 的結(jié)果,只需兩個(gè) future 處理完任務(wù)后肃叶,
處理該任務(wù)蹂随。

        CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果:" + i);
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程:" + Thread.currentThread().getId());
            int i = 10 / 5;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("結(jié)束當(dāng)前線程");
            return i;
        }, threadPoolExecutor);
        CompletableFuture<String> stringCompletableFuture = futur1.thenCombineAsync(futur2, (f1, f2) -> {
            System.out.println("全部結(jié)束:");
            return f1 + ":" + f2;
        }, threadPoolExecutor);

兩任務(wù)組合 - 一個(gè)完成

image.png

image.png

當(dāng)兩個(gè)任務(wù)中,任意一個(gè) future 任務(wù)完成的時(shí)候因惭,執(zhí)行任務(wù)岳锁。
applyToEither:兩個(gè)任務(wù)有一個(gè)執(zhí)行完成,獲取它的返回值蹦魔,處理任務(wù)并有新的返回值激率。
acceptEither:兩個(gè)任務(wù)有一個(gè)執(zhí)行完成,獲取它的返回值勿决,處理任務(wù)乒躺,沒有新的返回值。
runAfterEither:兩個(gè)任務(wù)有一個(gè)執(zhí)行完成低缩,不需要獲取 future 的結(jié)果嘉冒,處理任務(wù),也沒有返
回值咆繁。

        CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程futur1:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果futur1:" + i);
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程futur2:" + Thread.currentThread().getId());
            int i = 10 / 5;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("結(jié)束當(dāng)前線程futur2");
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> future = futur1.applyToEitherAsync(futur2, (s) -> {
            System.out.println("第三個(gè)任務(wù)future");
            return s;
        }, threadPoolExecutor);

多任務(wù)組合

image.png

allOf:等待所有任務(wù)完成
anyOf:只要有一個(gè)任務(wù)完成

        CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程futur1:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("運(yùn)行結(jié)果futur1:" + i);
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("當(dāng)前線程futur2:" + Thread.currentThread().getId());
            int i = 10 / 5;
            try {
                Thread.sleep(7000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("結(jié)束當(dāng)前線程futur2");
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Void> futureAllOf = CompletableFuture.allOf(futur1, futur2);
        System.out.println(futureAllOf.get());
        CompletableFuture<Object> future = CompletableFuture.anyOf(futur1, futur2);
        System.out.println(future.get());
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末健爬,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子么介,更是在濱河造成了極大的恐慌,老刑警劉巖蜕衡,帶你破解...
    沈念sama閱讀 221,273評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件壤短,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡慨仿,警方通過查閱死者的電腦和手機(jī)久脯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來镰吆,“玉大人帘撰,你說我怎么就攤上這事⊥蛎螅” “怎么了摧找?”我有些...
    開封第一講書人閱讀 167,709評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵核行,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我蹬耘,道長(zhǎng)芝雪,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,520評(píng)論 1 296
  • 正文 為了忘掉前任综苔,我火速辦了婚禮惩系,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘如筛。我一直安慰自己堡牡,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評(píng)論 6 397
  • 文/花漫 我一把揭開白布杨刨。 她就那樣靜靜地躺著晤柄,像睡著了一般。 火紅的嫁衣襯著肌膚如雪拭嫁。 梳的紋絲不亂的頭發(fā)上可免,一...
    開封第一講書人閱讀 52,158評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音做粤,去河邊找鬼浇借。 笑死,一個(gè)胖子當(dāng)著我的面吹牛怕品,可吹牛的內(nèi)容都是我干的妇垢。 我是一名探鬼主播,決...
    沈念sama閱讀 40,755評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼肉康,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼闯估!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起吼和,我...
    開封第一講書人閱讀 39,660評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤涨薪,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后炫乓,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體刚夺,經(jīng)...
    沈念sama閱讀 46,203評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評(píng)論 3 340
  • 正文 我和宋清朗相戀三年末捣,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了侠姑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,427評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡箩做,死狀恐怖莽红,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情邦邦,我是刑警寧澤安吁,帶...
    沈念sama閱讀 36,122評(píng)論 5 349
  • 正文 年R本政府宣布醉蚁,位于F島的核電站,受9級(jí)特大地震影響柳畔,放射性物質(zhì)發(fā)生泄漏馍管。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評(píng)論 3 333
  • 文/蒙蒙 一薪韩、第九天 我趴在偏房一處隱蔽的房頂上張望确沸。 院中可真熱鬧,春花似錦俘陷、人聲如沸罗捎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)桨菜。三九已至,卻和暖如春捉偏,著一層夾襖步出監(jiān)牢的瞬間倒得,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工夭禽, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留霞掺,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,808評(píng)論 3 376
  • 正文 我出身青樓讹躯,卻偏偏與公主長(zhǎng)得像菩彬,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子潮梯,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容