美團(tuán)動態(tài)線程池實(shí)踐思路開源項(xiàng)目(DynamicTp)篷就,線程池源碼解析及通知告警篇

大家好,這篇文章我們來聊下動態(tài)線程池開源項(xiàng)目(DynamicTp)的通知告警模塊近忙。目前項(xiàng)目提供以下通知告警功能竭业,每一個(gè)通知項(xiàng)都可以獨(dú)立配置是否開啟、告警閾值及舍、告警間隔時(shí)間未辆、平臺等,具體代碼請看core模塊notify包击纬。

1.核心參數(shù)變更通知

2.線程池活躍度告警

3.隊(duì)列容量告警

4.拒絕策略告警

5.任務(wù)執(zhí)行超時(shí)告警

6.任務(wù)排隊(duì)超時(shí)告警


DynamicTp項(xiàng)目地址

目前700star鼎姐,感謝你的star,歡迎pr更振,業(yè)務(wù)之余一起給開源貢獻(xiàn)一份力量

gitee地址https://gitee.com/yanhom/dynamic-tp

github地址https://github.com/lyh200/dynamic-tp


系列文章

美團(tuán)動態(tài)線程池實(shí)踐思路炕桨,開源了

動態(tài)線程池框架(DynamicTp),監(jiān)控及源碼解析篇

動態(tài)線程池(DynamicTp)肯腕,動態(tài)調(diào)整Tomcat献宫、Jetty、Undertow線程池參數(shù)篇


線程池解讀

[圖片上傳失敗...(image-d6e670-1649297221316)]

上篇文章里大概講到了JUC線程池的執(zhí)行流程实撒,我們這里再仔細(xì)回顧下姊途,上圖是JUC下線程池ThreadPoolExecutor類的繼承體系。

頂級接口Executor提供了一種方式知态,解耦任務(wù)的提交和執(zhí)行捷兰,只定義了一個(gè)execute(Runnable command)方法用來提交任務(wù),至于具體任務(wù)怎么執(zhí)行則交給他的實(shí)現(xiàn)者去自定義實(shí)現(xiàn)负敏。

ExecutorService接口繼承Executor贡茅,且擴(kuò)展了生命周期管理的方法、返回Futrue的方法其做、批量提交任務(wù)的方法

void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

AbstractExecutorService抽象類繼承ExecutorService接口顶考,對ExecutorService相關(guān)方法提供了默認(rèn)實(shí)現(xiàn),用RunnableFuture的實(shí)現(xiàn)類FutureTask包裝Runnable任務(wù)妖泄,交給execute()方法執(zhí)行驹沿,然后可以從該FutureTask阻塞獲取執(zhí)行結(jié)果,并且對批量任務(wù)的提交做了編排

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
    
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

ThreadPoolExecutor繼承AbstractExecutorService蹈胡,采用池化思想管理一定數(shù)量的線程來調(diào)度執(zhí)行提交的任務(wù)渊季,且定義了一套線程池的生命周期狀態(tài)朋蔫,用一個(gè)ctl變量來同時(shí)保存當(dāng)前池狀態(tài)(高3位)和當(dāng)前池線程數(shù)(低29位)∷笥颍看過源碼的小伙伴會發(fā)現(xiàn)斑举,ThreadPoolExecutor類里的方法大量有同時(shí)需要獲取或更新池狀態(tài)和池當(dāng)前線程數(shù)的場景搅轿,放一個(gè)原子變量里病涨,可以很好的保證數(shù)據(jù)的一致性以及代碼的簡潔性。

  // 用此變量保存當(dāng)前池狀態(tài)(高3位)和當(dāng)前線程數(shù)(低29位)
  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
  private static final int COUNT_BITS = Integer.SIZE - 3;
  private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

  // runState is stored in the high-order bits
  // 可以接受新任務(wù)提交璧坟,也會處理任務(wù)隊(duì)列中的任務(wù)
  // 結(jié)果:111 00000000000000000000000000000
  private static final int RUNNING    = -1 << COUNT_BITS;
  
  // 不接受新任務(wù)提交既穆,但會處理任務(wù)隊(duì)列中的任務(wù)
  // 結(jié)果:000 00000000000000000000000000000
  private static final int SHUTDOWN   =  0 << COUNT_BITS;
  
  // 不接受新任務(wù),不執(zhí)行隊(duì)列中的任務(wù)雀鹃,且會中斷正在執(zhí)行的任務(wù)
  // 結(jié)果:001 00000000000000000000000000000
  private static final int STOP       =  1 << COUNT_BITS;
  
  // 任務(wù)隊(duì)列為空幻工,workerCount = 0,線程池的狀態(tài)在轉(zhuǎn)換為TIDYING狀態(tài)時(shí)黎茎,會執(zhí)行鉤子方法terminated()
  // 結(jié)果:010 00000000000000000000000000000
  private static final int TIDYING    =  2 << COUNT_BITS;
  
  // 調(diào)用terminated()鉤子方法后進(jìn)入TERMINATED狀態(tài)
  // 結(jié)果:010 00000000000000000000000000000
  private static final int TERMINATED =  3 << COUNT_BITS;

  // Packing and unpacking ctl
  // 低29位變?yōu)?囊颅,得到了線程池的狀態(tài)
  private static int runStateOf(int c)     { return c & ~CAPACITY; }
  // 高3位變?yōu)闉?,得到了線程池中的線程數(shù)
  private static int workerCountOf(int c)  { return c & CAPACITY; }
  private static int ctlOf(int rs, int wc) { return rs | wc; }

核心入口execute()方法執(zhí)行邏輯如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

可以總結(jié)出如下主要執(zhí)行流程傅瞻,當(dāng)然看上述代碼會有一些異常分支判斷踢代,可以自己順理加到下述執(zhí)行主流程里

1.判斷線程池的狀態(tài),如果不是RUNNING狀態(tài)嗅骄,直接執(zhí)行拒絕策略

2.如果當(dāng)前線程數(shù) < 核心線程池胳挎,則新建一個(gè)線程來處理提交的任務(wù)

3.如果當(dāng)前線程數(shù) > 核心線程數(shù)且任務(wù)隊(duì)列沒滿,則將任務(wù)放入任務(wù)隊(duì)列等待執(zhí)行

4.如果 核心線程池 < 當(dāng)前線程池?cái)?shù) < 最大線程數(shù)溺森,且任務(wù)隊(duì)列已滿慕爬,則創(chuàng)建新的線程執(zhí)行提交的任務(wù)

5.如果當(dāng)前線程數(shù) > 最大線程數(shù),且隊(duì)列已滿屏积,則拒絕該任務(wù)

addWorker()方法邏輯

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 獲取當(dāng)前池狀態(tài)
            int rs = runStateOf(c);

            // 1.判斷如果線程池狀態(tài) > SHUTDOWN医窿,直接返回false,否則2
            // 2.如果線程池狀態(tài) = SHUTDOWN炊林,并且firstTask不為null則直接返回false姥卢,因?yàn)镾HUTDOWN狀態(tài)的線程池不能在接受新任務(wù),否則3
            // 3.如果線程池狀態(tài) = SHUTDOWN铛铁,并且firstTask == null隔显,此時(shí)如果任務(wù)隊(duì)列為空,則直接返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 1.如果當(dāng)前線程池線程數(shù)大于等于CAPACITY(理論上的最大值5億)饵逐,則返回fasle
                // 2.如果創(chuàng)建核心線程情況下當(dāng)前池線程數(shù) >= corePoolSize括眠,則返回false
                // 3.如果創(chuàng)建非核心線程情況下當(dāng)前池線程數(shù) >= maximumPoolSize,則返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // cas 增加當(dāng)前池線程數(shù)量倍权,成功則退出循環(huán)    
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // cas 增加當(dāng)前池線程數(shù)量失斨啦颉(多線程并發(fā))捞烟,則重新獲取ctl,計(jì)算出當(dāng)前線程池狀態(tài)当船,如果不等于上述計(jì)算的狀態(tài)rs题画,則說明線程池狀態(tài)發(fā)生了改變,需要跳到外層循環(huán)重新進(jìn)行狀態(tài)判斷德频,否則執(zhí)行內(nèi)部循環(huán)
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 至此說明線程池狀態(tài)校驗(yàn)通過苍息,且增加池線程數(shù)量成功,則創(chuàng)建一個(gè)Worker線程來執(zhí)行任務(wù)
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 訪問worker set時(shí)需要獲取mainLock全局鎖
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // 1.當(dāng)前池狀態(tài) < SHUTDOWN壹置,也就是RUNNING狀態(tài)竞思,如果已經(jīng)started,拋出異常
                    // 2.當(dāng)前池狀態(tài) = SHUTDOWN钞护,且firstTask == null盖喷,需要處理任務(wù)隊(duì)列中的任務(wù),如果已經(jīng)started难咕,拋出異常
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 剛創(chuàng)建線程添加到workers集合中
                        workers.add(w);
                        int s = workers.size();
                        // 判斷更新歷史最大線程數(shù)量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 啟動新建線程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // 啟動失敗课梳,workerCount--,workers里移除該worker
                addWorkerFailed(w);
        }
        return workerStarted;
    }

線程池中的線程并不是直接用的Thread類余佃,而是定義了一個(gè)內(nèi)部工作線程Worker類暮刃,實(shí)現(xiàn)了AQS以及Runnable接口,然后持有一個(gè)Thread類的引用及一個(gè)firstTask(創(chuàng)建后第一個(gè)要執(zhí)行的任務(wù))咙冗,每個(gè)Worker線程啟動后會執(zhí)行run()方法沾歪,該方法會調(diào)用執(zhí)行外層runWorker(Worker w)方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 1.如果task不為空,則作為該線程的第一個(gè)任務(wù)直接執(zhí)行
        // 2.如果task為空雾消,則通過getTask()方法從任務(wù)隊(duì)列中獲取任務(wù)執(zhí)行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 線程池狀態(tài) >= STOP灾搏,則中斷線程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 實(shí)際執(zhí)行任務(wù)前調(diào)用的鉤子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 實(shí)際執(zhí)行任務(wù)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任務(wù)執(zhí)行后調(diào)用的鉤子方法
                    afterExecute(task, thrown);
                }
            } finally {
                // 任務(wù)置為null,重新獲取新任務(wù)立润,完成數(shù)++
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 無任務(wù)可執(zhí)行狂窑,執(zhí)行worker銷毀邏輯
        processWorkerExit(w, completedAbruptly);
    }
}

getTask()方法邏輯

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // 以下兩種情況遞減工作線程數(shù)量
        // 1. rs >= STOP
        // 2. rs == SHUTDOWN && workQueue.isEmpty()
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 允許核心線程超時(shí) 或者 當(dāng)前線程數(shù) > 核心線程數(shù),有可能發(fā)生超時(shí)關(guān)閉
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // wc什么情況 > maximumPoolSize桑腮,調(diào)用setMaximumPoolSize()方法將maximumPoolSize調(diào)小了泉哈,會發(fā)生這種情況,此時(shí)需要關(guān)閉多余線程
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 阻塞隊(duì)列獲取任務(wù)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 發(fā)生中斷破讨,進(jìn)行重試
            timedOut = false;
        }
    }
}

以上內(nèi)容比較詳細(xì)的介紹了ThreadPoolExecutor的繼承體系丛晦,以及相關(guān)的核心源碼,基于此提陶,現(xiàn)在我們來看DynamicTp提供的告警通知能力烫沙。


核心參數(shù)變更通知

對應(yīng)配置中心的監(jiān)聽端監(jiān)聽到配置變更后,封裝到DtpProperties中然后交由DtpRegistry類中的refresh()方法去做配置更新隙笆,同時(shí)通知時(shí)會高亮顯示有變更的字段

[圖片上傳失敗...(image-b9dce5-1649297221316)]


線程池活躍度告警

活躍度 = activeCount / maximumPoolSize

服務(wù)啟動后會開啟一個(gè)定時(shí)監(jiān)控任務(wù)锌蓄,每隔一定時(shí)間(可配置)去計(jì)算線程池的活躍度升筏,達(dá)到配置的threshold閾值后會觸發(fā)一次告警,告警間隔內(nèi)多次觸發(fā)不會發(fā)送告警通知

[圖片上傳失敗...(image-f0a4a5-1649297221316)]


隊(duì)列容量告警

容量使用率 = queueSize / queueCapacity

服務(wù)啟動后會開啟一個(gè)定時(shí)監(jiān)控任務(wù)瘸爽,每隔一定時(shí)間去計(jì)算任務(wù)隊(duì)列的使用率您访,達(dá)到配置的threshold閾值后會觸發(fā)一次告警,告警間隔內(nèi)多次觸發(fā)不會發(fā)送告警通知

[圖片上傳失敗...(image-aca3b5-1649297221316)]


拒絕策略告警

/**
 * Do sth before reject.
 * @param executor ThreadPoolExecutor instance
 */
default void beforeReject(ThreadPoolExecutor executor) {
    if (executor instanceof DtpExecutor) {
        DtpExecutor dtpExecutor = (DtpExecutor) executor;
        dtpExecutor.incRejectCount(1);
        Runnable runnable = () -> AlarmManager.doAlarm(dtpExecutor, REJECT);
        AlarmManager.triggerAlarm(dtpExecutor.getThreadPoolName(), REJECT.getValue(), runnable);
    }
}

線程池線程數(shù)達(dá)到配置的最大線程數(shù)剪决,且任務(wù)隊(duì)列已滿灵汪,再提交任務(wù)會觸發(fā)拒絕策略。DtpExecutor線程池用到的RejectedExecutionHandler是經(jīng)過動態(tài)代理包裝過的昼捍,在執(zhí)行具體的拒絕策略之前會執(zhí)行RejectedAware類beforeReject()方法识虚,此方法會去做拒絕數(shù)量累加(總數(shù)值累加、周期值累加)妒茬。且判斷如果周期累計(jì)值達(dá)到配置的閾值,則會觸發(fā)一次告警通知(同時(shí)重置周期累加值為0及上次告警時(shí)間為當(dāng)前時(shí)間)蔚晨,告警間隔內(nèi)多次觸發(fā)不會發(fā)送告警通知

[圖片上傳失敗...(image-d784a9-1649297221316)]


任務(wù)隊(duì)列超時(shí)告警

重寫ThreadPoolExecutor的execute()方法和beforeExecute()方法乍钻,如果配置了執(zhí)行超時(shí)或排隊(duì)超時(shí)值,則會用DtpRunnable包裝任務(wù)铭腕,同時(shí)記錄任務(wù)的提交時(shí)間submitTime银择,beforeExecute根據(jù)當(dāng)前時(shí)間和submitTime的差值就可以計(jì)算到該任務(wù)在隊(duì)列中的等待時(shí)間,然后判斷如果差值大于配置的queueTimeout則累加排隊(duì)超時(shí)任務(wù)數(shù)量(總數(shù)值累加累舷、周期值累加)浩考。且判斷如果周期累計(jì)值達(dá)到配置的閾值,則會觸發(fā)一次告警通知(同時(shí)重置周期累加值為0及上次告警時(shí)間為當(dāng)前時(shí)間)被盈,告警間隔內(nèi)多次觸發(fā)不會發(fā)送告警通知

@Override
public void execute(Runnable command) {
    if (CollUtil.isNotEmpty(taskWrappers)) {
        for (TaskWrapper t : taskWrappers) {
            command = t.wrap(command);
        }
    }

    if (runTimeout > 0 || queueTimeout > 0) {
        command = new DtpRunnable(command);
    }
    super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
    if (!(r instanceof DtpRunnable)) {
        super.beforeExecute(t, r);
        return;
    }
    DtpRunnable runnable = (DtpRunnable) r;
    long currTime = System.currentTimeMillis();
    if (runTimeout > 0) {
        runnable.setStartTime(currTime);
    }
    if (queueTimeout > 0) {
        long waitTime = currTime - runnable.getSubmitTime();
        if (waitTime > queueTimeout) {
            queueTimeoutCount.incrementAndGet();
            Runnable alarmTask = () -> AlarmManager.doAlarm(this, QUEUE_TIMEOUT);
            AlarmManager.triggerAlarm(this.getThreadPoolName(), QUEUE_TIMEOUT.getValue(), alarmTask);
        }
    }

    super.beforeExecute(t, r);
}

[圖片上傳失敗...(image-4708ec-1649297221316)]


任務(wù)執(zhí)行超時(shí)告警

重寫ThreadPoolExecutor的afterExecute()方法析孽,根據(jù)當(dāng)前時(shí)間和beforeExecute()中設(shè)置的startTime的差值即可算出任務(wù)的實(shí)際執(zhí)行時(shí)間,然后判斷如果差值大于配置的runTimeout則累加排隊(duì)超時(shí)任務(wù)數(shù)量(總數(shù)值累加只怎、周期值累加)袜瞬。且判斷如果周期累計(jì)值達(dá)到配置的閾值,則會觸發(fā)一次告警通知(同時(shí)重置周期累加值為0及上次告警時(shí)間為當(dāng)前時(shí)間)身堡,告警間隔內(nèi)多次觸發(fā)不會發(fā)送告警通知

@Override
protected void afterExecute(Runnable r, Throwable t) {

    if (runTimeout > 0) {
        DtpRunnable runnable = (DtpRunnable) r;
        long runTime = System.currentTimeMillis() - runnable.getStartTime();
        if (runTime > runTimeout) {
            runTimeoutCount.incrementAndGet();
            Runnable alarmTask = () -> AlarmManager.doAlarm(this, RUN_TIMEOUT);
            AlarmManager.triggerAlarm(this.getThreadPoolName(), RUN_TIMEOUT.getValue(), alarmTask);
        }
    }

    super.afterExecute(r, t);
}

[圖片上傳失敗...(image-bc1744-1649297221316)]


告警通知相關(guān)配置項(xiàng)

如果想使用通知告警功能邓尤,配置文件必須要配置platforms字段,且可以配置多個(gè)平臺贴谎,如釘釘汞扎、企微等;notifyItems配置具體告警項(xiàng)擅这,包括閾值澈魄、平臺、告警間隔等蕾哟。

spring:
  dynamic:
    tp:
      # 省略其他項(xiàng)
      platforms:                         # 通知平臺
        - platform: wechat
          urlKey: 38a98-0c5c3b649c
          receivers: test
        - platform: ding
          urlKey: f80db3e801d593604f4a08dcd6a
          secret: SECb5444a6f375d5b9d21
          receivers: 17811511815
      executors:                                   # 動態(tài)線程池配置一忱,都有默認(rèn)值莲蜘,采用默認(rèn)值的可以不配置該項(xiàng),減少配置量
        - threadPoolName: dtpExecutor1
          executorType: common                          # 線程池類型common帘营、eager:適用于io密集型
          corePoolSize: 2
          maximumPoolSize: 4
          queueCapacity: 200
          queueType: VariableLinkedBlockingQueue       # 任務(wù)隊(duì)列票渠,查看源碼QueueTypeEnum枚舉類
          rejectedHandlerType: CallerRunsPolicy        # 拒絕策略,查看RejectedTypeEnum枚舉類
          keepAliveTime: 50
          allowCoreThreadTimeOut: false
          threadNamePrefix: dtp1                         # 線程名前綴
          waitForTasksToCompleteOnShutdown: false        # 參考spring線程池設(shè)計(jì)
          awaitTerminationSeconds: 5                     # 單位(s)
          preStartAllCoreThreads: false                  # 是否預(yù)熱核心線程芬迄,默認(rèn)false
          runTimeout: 200                                # 任務(wù)執(zhí)行超時(shí)閾值问顷,目前只做告警用,單位(ms)
          queueTimeout: 100                              # 任務(wù)在隊(duì)列等待超時(shí)閾值禀梳,目前只做告警用杜窄,單位(ms)
          taskWrapperNames: ["ttl"]                      # 任務(wù)包裝器名稱,集成TaskWrapper接口
          notifyItems:                     # 報(bào)警項(xiàng)算途,不配置自動會按默認(rèn)值配置(變更通知塞耕、容量報(bào)警、活性報(bào)警嘴瓤、拒絕報(bào)警扫外、任務(wù)超時(shí)報(bào)警)
            - type: capacity               # 報(bào)警項(xiàng)類型,查看源碼 NotifyTypeEnum枚舉類
              threshold: 80                # 報(bào)警閾值
              platforms: [ding,wechat]     # 可選配置廓脆,不配置默認(rèn)拿上層platforms配置的所以平臺
              interval: 120                # 報(bào)警間隔(單位:s)
            - type: change
            - type: liveness
              threshold: 80
              interval: 120
            - type: reject
              threshold: 1
              interval: 160
            - type: run_timeout
              threshold: 1
              interval: 120
            - type: queue_timeout
              threshold: 1
              interval: 140

總結(jié)

本文開頭介紹了線程池ThreadPoolExecutor的繼承體系筛谚,核心流程的源碼解讀。然后介紹了DynamicTp提供的以上6種告警通知能力停忿,希望通過監(jiān)控+告警可以讓我們及時(shí)感知到我們業(yè)務(wù)線程池的執(zhí)行負(fù)載情況驾讲,第一時(shí)間做出調(diào)整,防止事故的發(fā)生席赂。


聯(lián)系我

對項(xiàng)目有什么想法或者建議吮铭,可以加我微信交流,或者創(chuàng)建issues氧枣,一起完善項(xiàng)目

公眾號:CodeFox

微信:yanhom1314

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末沐兵,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子便监,更是在濱河造成了極大的恐慌扎谎,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件烧董,死亡現(xiàn)場離奇詭異毁靶,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)逊移,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門预吆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人胳泉,你說我怎么就攤上這事拐叉⊙乙牛” “怎么了?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵凤瘦,是天一觀的道長宿礁。 經(jīng)常有香客問我,道長蔬芥,這世上最難降的妖魔是什么梆靖? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮笔诵,結(jié)果婚禮上返吻,老公的妹妹穿的比我還像新娘。我一直安慰自己乎婿,他們只是感情好测僵,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著次酌,像睡著了一般恨课。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上岳服,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天,我揣著相機(jī)與錄音希俩,去河邊找鬼吊宋。 笑死,一個(gè)胖子當(dāng)著我的面吹牛颜武,可吹牛的內(nèi)容都是我干的璃搜。 我是一名探鬼主播,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼鳞上,長吁一口氣:“原來是場噩夢啊……” “哼这吻!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起篙议,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤唾糯,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后鬼贱,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體移怯,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年这难,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了舟误。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,117評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡姻乓,死狀恐怖嵌溢,靈堂內(nèi)的尸體忽然破棺而出眯牧,到底是詐尸還是另有隱情,我是刑警寧澤赖草,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布学少,位于F島的核電站,受9級特大地震影響疚顷,放射性物質(zhì)發(fā)生泄漏旱易。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一腿堤、第九天 我趴在偏房一處隱蔽的房頂上張望阀坏。 院中可真熱鬧,春花似錦笆檀、人聲如沸忌堂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽士修。三九已至,卻和暖如春樱衷,著一層夾襖步出監(jiān)牢的瞬間棋嘲,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工矩桂, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留沸移,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓侄榴,卻偏偏與公主長得像雹锣,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子癞蚕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容