異步調(diào)用如何使用最好?

一炫刷、異步調(diào)用方式分析

今天在寫代碼的時候擎宝,想要調(diào)用異步的操作,這里我是用的java8的流式異步調(diào)用浑玛,但是使用過程中呢绍申,發(fā)現(xiàn)這個異步方式有兩個方法,如下所示:

image.png

區(qū)別是一個 需要指定線程池顾彰,一個不需要极阅。

那么指定線程池有哪些好處呢?直觀的說有以下兩點好處:
1涨享、可以根據(jù)我們的服務(wù)器性能筋搏,通過池的管理更好的規(guī)劃我們的線程數(shù)。
2厕隧、可以對我們使用的線程自定義名稱奔脐,這里也是阿里java開發(fā)規(guī)范所提到的俄周。

1.1 java8異步調(diào)用默認線程池方式

當然常規(guī)使用默認的也沒什么問題。我們通過源碼分析下使用默認線程池的過程髓迎。

   public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

看下這個asyncPool是什么峦朗?如下所示,useCommonPool如果為真排龄,就使用ForkJoinPool.commonPool()波势,否則創(chuàng)建一個new ThreadPerTaskExecutor():

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

看看useCommonPool 是什么?

    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);
  /**
    * 公共池的目標并行度級別
    */
    public static int getCommonPoolParallelism() {
        return commonParallelism;
    }

最終這個并行級別并沒有給出默認值

static final int commonParallelism;

通過找到這個常量的調(diào)用橄维,我們看看是如何進行初始化的艰亮,在ForkJoinPool中有一個靜態(tài)代碼塊,啟動時會對commonParallelism進行初始化挣郭,我們只關(guān)注最后一句話就好了迄埃,:

    // Unsafe mechanics
    private static final sun.misc.Unsafe U;
    private static final int  ABASE;
    private static final int  ASHIFT;
    private static final long CTL;
    private static final long RUNSTATE;
    private static final long STEALCOUNTER;
    private static final long PARKBLOCKER;
    private static final long QTOP;
    private static final long QLOCK;
    private static final long QSCANSTATE;
    private static final long QPARKER;
    private static final long QCURRENTSTEAL;
    private static final long QCURRENTJOIN;

    static {
        // initialize field offsets for CAS etc
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ForkJoinPool.class;
            CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
            RUNSTATE = U.objectFieldOffset
                (k.getDeclaredField("runState"));
            STEALCOUNTER = U.objectFieldOffset
                (k.getDeclaredField("stealCounter"));
            Class<?> tk = Thread.class;
            PARKBLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            Class<?> wk = WorkQueue.class;
            QTOP = U.objectFieldOffset
                (wk.getDeclaredField("top"));
            QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
            QSCANSTATE = U.objectFieldOffset
                (wk.getDeclaredField("scanState"));
            QPARKER = U.objectFieldOffset
                (wk.getDeclaredField("parker"));
            QCURRENTSTEAL = U.objectFieldOffset
                (wk.getDeclaredField("currentSteal"));
            QCURRENTJOIN = U.objectFieldOffset
                (wk.getDeclaredField("currentJoin"));
            Class<?> ak = ForkJoinTask[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }

        commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
         // 即使線程被禁用也是1,至少是個1
        int par = common.config & SMASK;
        commonParallelism = par > 0 ? par : 1;
    }

如下所示兑障,默認是7:

image.png

所以接著下面的代碼看:

    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

這里一定是返回true侄非,證明當前是并行的。

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

上面會返回一個大小是七的的默認線程池

image.png

其實這個默認值是當前cpu的核心數(shù)流译,我的電腦是八核逞怨,在代碼中默認會將核心數(shù)減一,所以顯示是七個線程福澡。

        if (parallelism < 0 && //默認是1叠赦,小于核心數(shù)
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

下面我們寫個main方法測試一下,10個線程革砸,每個阻塞10秒除秀,看結(jié)果:

    public static void main(String[] args) {
        // 創(chuàng)建10個任務(wù),每個任務(wù)阻塞10秒
        for (int i = 0; i < 10; i++) {
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(10000);
                    System.out.println(new Date() + ":" + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

結(jié)果如下所示算利,前面七個任務(wù)先完成册踩,另外三個任務(wù)被阻塞10秒后,才完成:

Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-5
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-4
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-2
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-7
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-3
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-6
Mon Sep 13 11:20:57 CST 2021:ForkJoinPool.commonPool-worker-1
-----------------------------------------------------------  
Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-2
Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-5
Mon Sep 13 11:21:07 CST 2021:ForkJoinPool.commonPool-worker-4

結(jié)論:當我們使用默認的線程池進行異步調(diào)用時效拭,如果異步任務(wù)是一個IO密集型暂吉,簡單說處理時間占用長,將導致其他使用共享線程池的任務(wù)阻塞缎患,造成系統(tǒng)性能下降甚至異常慕的。甚至當一部調(diào)用接口時,如果接口超時挤渔,那么也會阻塞與超時市場相同的時間肮街。實際在計算密集的場景下使用是能提高性能的。

二蚂蕴、使用自定義的線程池

上面說到如果是IO密集型的場景低散,在異步調(diào)用時還是使用自定義線程池比較好俯邓。

針對開篇提到的兩個顯而易見的好處,此處新增一條:
1熔号、可以根據(jù)我們的服務(wù)器性能稽鞭,通過池的管理更好的規(guī)劃我們的線程數(shù)。
2引镊、可以對我們使用的線程自定義名稱朦蕴,這里也是阿里java開發(fā)規(guī)范所提到的。
3弟头、不會因為阻塞導致使用共享線程池的其他線程阻塞甚至異常吩抓。

我們自定義下面的線程池:

import cn.hutool.core.thread.NamedThreadFactory;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @description: 全局通用線程池
 * @author:weirx
 * @date:2021/9/9 18:09
 * @version:3.0
 */
@Slf4j
public class GlobalThreadPool {

    /**
     * 核心線程數(shù)
     */
    public final static int CORE_POOL_SIZE = 10;

    /**
     * 最大線程數(shù)
     */
    public final static int MAX_NUM_POOL_SIZE = 20;

    /**
     * 任務(wù)隊列大小
     */
    public final static int BLOCKING_QUEUE_SIZE = 30;

    /**
     * 線程池實例
     */
    private final static ThreadPoolExecutor instance = getInstance();


    /**
     * description: 初始化線程池
     *
     * @return: java.util.concurrent.ThreadPoolExecutor
     * @author: weirx
     * @time: 2021/9/10 9:49
     */
    private synchronized static ThreadPoolExecutor getInstance() {
        // 生成線程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_NUM_POOL_SIZE,
                60,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE),
                new NamedThreadFactory("Thread-Inbox-Model-", false));
        return executor;
    }

    private GlobalThreadPool() {
    }

    public static ThreadPoolExecutor getExecutor() {
        return instance;
    }
}

調(diào)用:

    public static void main(String[] args) {
        // 創(chuàng)建10個任務(wù),每個任務(wù)阻塞10秒
        for (int i = 0; i < 10; i++) {
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(10000);
                    System.out.println(new Date() + ":" + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },GlobalThreadPool.getExecutor());
        }

        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

輸出我們指定線程名稱的線程:

Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-1
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-10
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-2
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-9
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-5
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-6
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-3
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-7
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-8
Mon Sep 13 11:32:35 CST 2021:Thread-Inbox-Model-4

三赴恨、題外話疹娶,動態(tài)線程池

在我們使用線程池的時候,是否有的時候很糾結(jié)伦连,到底設(shè)置多大的線程池參數(shù)是最合適的呢雨饺?如果不夠用了怎么辦,要改代碼重新部署嗎惑淳?

其實是不需要的额港,記得當初看過美團的一篇文章,真的讓人茅塞頓開啊歧焦,動態(tài)線程池移斩。

ThreadPoolExecutor這個類其實是提供對于線程池的屬性進行修改的,支持我們動態(tài)修改一下的屬性:

image.png

從上至下分別是線程工廠(用于指定線程名稱)绢馍、核心線程數(shù)向瓷、最大線程數(shù)、活躍時間痕貌、拒絕策略风罩。

在美團的文章當中呢,是監(jiān)控服務(wù)器線程的使用lv舵稠,當達到閾值就進行告警,然后通過配置中心去動態(tài)修改這些數(shù)值入宦。

我們也可以這么做哺徊,使用@RefreshScope加nacos就可以實現(xiàn)了。

我這呢寫了一個定時任務(wù)監(jiān)控當前服務(wù)的線程使用率乾闰,小了就擴容落追,一段時間后占用率下降,就恢復初始值涯肩。其實沒有任何難度啊轿钠,當然還有很多地方需要改進的巢钓,請大家多提意見,話不多說:

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @description: 全局線程池守護進程
 * @author:weirx
 * @date:2021/9/10 16:32
 * @version:3.0
 */
@Slf4j
@Component
public class DaemonThreadTask {

    /**
     * 服務(wù)支持最大線程數(shù)
     */
    public final static int SERVER_MAX_SIZE = 50;

    /**
     * 最大閾值Maximum threshold疗垛,百分比
     */
    private final static int MAXIMUM_THRESHOLD = 8;

    /**
     * 每次遞增最大線程數(shù)
     */
    private final static int INCREMENTAL_MAX_NUM = 10;

    /**
     * 每次遞增核心線程數(shù)
     */
    private final static int INCREMENTAL_CORE_NUM = 5;

    /**
     * 當前線程數(shù)
     */
    private static int currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE;

    /**
     * 當前核心線程數(shù)
     */
    private static int currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE;

    @Scheduled(cron = "0 */5 * * * ?")
    public static void execute() {
        threadMonitor();
    }


    /**
     * description: 動態(tài)監(jiān)控并設(shè)置線程參數(shù)
     *
     * @return: void
     * @author: weirx
     * @time: 2021/9/10 13:20
     */
    private static void threadMonitor() {
        ThreadPoolExecutor instance = GlobalThreadPool.getExecutor();
        int activeCount = instance.getActiveCount();
        int size = instance.getQueue().size();
        log.info("GlobalThreadPool: the active thread count is {}", activeCount);
        // 線程數(shù)不足症汹,增加線程
        if (activeCount > GlobalThreadPool.MAX_NUM_POOL_SIZE % MAXIMUM_THRESHOLD
                && size >= GlobalThreadPool.BLOCKING_QUEUE_SIZE) {
            currentSize = currentSize + INCREMENTAL_MAX_NUM;
            currentCoreSize = currentCoreSize + INCREMENTAL_CORE_NUM;
            //當前設(shè)置最大線程數(shù)小于服務(wù)最大支持線程數(shù)才可以繼續(xù)增加線程
            if (currentSize <= SERVER_MAX_SIZE) {
                instance.setMaximumPoolSize(currentSize);
                instance.setCorePoolSize(currentCoreSize);
                log.info("this max thread size is {}", currentSize);
            } else {
                log.info("current size is more than server max size, can not add");
            }
        }
        // 線程數(shù)足夠,降低線程數(shù)贷腕,當前活躍數(shù)小于默認核心線程數(shù)
        if (activeCount < GlobalThreadPool.MAX_NUM_POOL_SIZE
                && size == 0
                && currentSize > GlobalThreadPool.MAX_NUM_POOL_SIZE) {
            currentSize = GlobalThreadPool.MAX_NUM_POOL_SIZE;
            currentCoreSize = GlobalThreadPool.CORE_POOL_SIZE;
            instance.setMaximumPoolSize(currentSize);
            instance.setCorePoolSize(currentCoreSize);
        }
    }
}

本文的簡單分析就結(jié)束了背镇,看到這了就給點個三連一下,點贊關(guān)注轉(zhuǎn)發(fā)唄泽裳。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末瞒斩,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子涮总,更是在濱河造成了極大的恐慌胸囱,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件瀑梗,死亡現(xiàn)場離奇詭異烹笔,居然都是意外死亡,警方通過查閱死者的電腦和手機夺克,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進店門箕宙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人铺纽,你說我怎么就攤上這事柬帕。” “怎么了狡门?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵陷寝,是天一觀的道長。 經(jīng)常有香客問我其馏,道長凤跑,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任叛复,我火速辦了婚禮仔引,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘褐奥。我一直安慰自己咖耘,他們只是感情好,可當我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布撬码。 她就那樣靜靜地躺著儿倒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪呜笑。 梳的紋絲不亂的頭發(fā)上夫否,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天彻犁,我揣著相機與錄音,去河邊找鬼凰慈。 笑死汞幢,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的溉瓶。 我是一名探鬼主播急鳄,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼堰酿!你這毒婦竟也來了疾宏?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤触创,失蹤者是張志新(化名)和其女友劉穎坎藐,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體哼绑,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡岩馍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了抖韩。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蛀恩。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖茂浮,靈堂內(nèi)的尸體忽然破棺而出双谆,到底是詐尸還是另有隱情,我是刑警寧澤席揽,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布顽馋,位于F島的核電站,受9級特大地震影響幌羞,放射性物質(zhì)發(fā)生泄漏寸谜。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一属桦、第九天 我趴在偏房一處隱蔽的房頂上張望熊痴。 院中可真熱鬧,春花似錦聂宾、人聲如沸愁拭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至盏混,卻和暖如春蔚鸥,著一層夾襖步出監(jiān)牢的瞬間惜论,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工止喷, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留馆类,地道東北人。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓弹谁,卻偏偏與公主長得像乾巧,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子预愤,可洞房花燭夜當晚...
    茶點故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容