大家好,這篇文章我們來聊下動態(tài)線程池開源項(xiàng)目(DynamicTp)的通知告警模塊近忙。目前項(xiàng)目提供以下通知告警功能竭业,每一個(gè)通知項(xiàng)都可以獨(dú)立配置是否開啟、告警閾值及舍、告警間隔時(shí)間未辆、平臺等,具體代碼請看core模塊notify包击纬。
1.核心參數(shù)變更通知
2.線程池活躍度告警
3.隊(duì)列容量告警
4.拒絕策略告警
5.任務(wù)執(zhí)行超時(shí)告警
6.任務(wù)排隊(duì)超時(shí)告警
DynamicTp項(xiàng)目地址
目前700star鼎姐,感謝你的star,歡迎pr更振,業(yè)務(wù)之余一起給開源貢獻(xiàn)一份力量
gitee地址:https://gitee.com/yanhom/dynamic-tp
github地址:https://github.com/lyh200/dynamic-tp
系列文章
美團(tuán)動態(tài)線程池實(shí)踐思路炕桨,開源了
動態(tài)線程池框架(DynamicTp),監(jiān)控及源碼解析篇
動態(tài)線程池(DynamicTp)肯腕,動態(tài)調(diào)整Tomcat献宫、Jetty、Undertow線程池參數(shù)篇
線程池解讀
[圖片上傳失敗...(image-d6e670-1649297221316)]
上篇文章里大概講到了JUC線程池的執(zhí)行流程实撒,我們這里再仔細(xì)回顧下姊途,上圖是JUC下線程池ThreadPoolExecutor類的繼承體系。
頂級接口Executor提供了一種方式知态,解耦任務(wù)的提交和執(zhí)行捷兰,只定義了一個(gè)execute(Runnable command)方法用來提交任務(wù),至于具體任務(wù)怎么執(zhí)行則交給他的實(shí)現(xiàn)者去自定義實(shí)現(xiàn)负敏。
ExecutorService接口繼承Executor贡茅,且擴(kuò)展了生命周期管理的方法、返回Futrue的方法其做、批量提交任務(wù)的方法
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
AbstractExecutorService抽象類繼承ExecutorService接口顶考,對ExecutorService相關(guān)方法提供了默認(rèn)實(shí)現(xiàn),用RunnableFuture的實(shí)現(xiàn)類FutureTask包裝Runnable任務(wù)妖泄,交給execute()方法執(zhí)行驹沿,然后可以從該FutureTask阻塞獲取執(zhí)行結(jié)果,并且對批量任務(wù)的提交做了編排
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
ThreadPoolExecutor繼承AbstractExecutorService蹈胡,采用池化思想管理一定數(shù)量的線程來調(diào)度執(zhí)行提交的任務(wù)渊季,且定義了一套線程池的生命周期狀態(tài)朋蔫,用一個(gè)ctl變量來同時(shí)保存當(dāng)前池狀態(tài)(高3位)和當(dāng)前池線程數(shù)(低29位)∷笥颍看過源碼的小伙伴會發(fā)現(xiàn)斑举,ThreadPoolExecutor類里的方法大量有同時(shí)需要獲取或更新池狀態(tài)和池當(dāng)前線程數(shù)的場景搅轿,放一個(gè)原子變量里病涨,可以很好的保證數(shù)據(jù)的一致性以及代碼的簡潔性。
// 用此變量保存當(dāng)前池狀態(tài)(高3位)和當(dāng)前線程數(shù)(低29位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 可以接受新任務(wù)提交璧坟,也會處理任務(wù)隊(duì)列中的任務(wù)
// 結(jié)果:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 不接受新任務(wù)提交既穆,但會處理任務(wù)隊(duì)列中的任務(wù)
// 結(jié)果:000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接受新任務(wù),不執(zhí)行隊(duì)列中的任務(wù)雀鹃,且會中斷正在執(zhí)行的任務(wù)
// 結(jié)果:001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 任務(wù)隊(duì)列為空幻工,workerCount = 0,線程池的狀態(tài)在轉(zhuǎn)換為TIDYING狀態(tài)時(shí)黎茎,會執(zhí)行鉤子方法terminated()
// 結(jié)果:010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 調(diào)用terminated()鉤子方法后進(jìn)入TERMINATED狀態(tài)
// 結(jié)果:010 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 低29位變?yōu)?囊颅,得到了線程池的狀態(tài)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 高3位變?yōu)闉?,得到了線程池中的線程數(shù)
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
核心入口execute()方法執(zhí)行邏輯如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以總結(jié)出如下主要執(zhí)行流程傅瞻,當(dāng)然看上述代碼會有一些異常分支判斷踢代,可以自己順理加到下述執(zhí)行主流程里
1.判斷線程池的狀態(tài),如果不是RUNNING狀態(tài)嗅骄,直接執(zhí)行拒絕策略
2.如果當(dāng)前線程數(shù) < 核心線程池胳挎,則新建一個(gè)線程來處理提交的任務(wù)
3.如果當(dāng)前線程數(shù) > 核心線程數(shù)且任務(wù)隊(duì)列沒滿,則將任務(wù)放入任務(wù)隊(duì)列等待執(zhí)行
4.如果 核心線程池 < 當(dāng)前線程池?cái)?shù) < 最大線程數(shù)溺森,且任務(wù)隊(duì)列已滿慕爬,則創(chuàng)建新的線程執(zhí)行提交的任務(wù)
5.如果當(dāng)前線程數(shù) > 最大線程數(shù),且隊(duì)列已滿屏积,則拒絕該任務(wù)
addWorker()方法邏輯
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取當(dāng)前池狀態(tài)
int rs = runStateOf(c);
// 1.判斷如果線程池狀態(tài) > SHUTDOWN医窿,直接返回false,否則2
// 2.如果線程池狀態(tài) = SHUTDOWN炊林,并且firstTask不為null則直接返回false姥卢,因?yàn)镾HUTDOWN狀態(tài)的線程池不能在接受新任務(wù),否則3
// 3.如果線程池狀態(tài) = SHUTDOWN铛铁,并且firstTask == null隔显,此時(shí)如果任務(wù)隊(duì)列為空,則直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 1.如果當(dāng)前線程池線程數(shù)大于等于CAPACITY(理論上的最大值5億)饵逐,則返回fasle
// 2.如果創(chuàng)建核心線程情況下當(dāng)前池線程數(shù) >= corePoolSize括眠,則返回false
// 3.如果創(chuàng)建非核心線程情況下當(dāng)前池線程數(shù) >= maximumPoolSize,則返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// cas 增加當(dāng)前池線程數(shù)量倍权,成功則退出循環(huán)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// cas 增加當(dāng)前池線程數(shù)量失斨啦颉(多線程并發(fā))捞烟,則重新獲取ctl,計(jì)算出當(dāng)前線程池狀態(tài)当船,如果不等于上述計(jì)算的狀態(tài)rs题画,則說明線程池狀態(tài)發(fā)生了改變,需要跳到外層循環(huán)重新進(jìn)行狀態(tài)判斷德频,否則執(zhí)行內(nèi)部循環(huán)
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 至此說明線程池狀態(tài)校驗(yàn)通過苍息,且增加池線程數(shù)量成功,則創(chuàng)建一個(gè)Worker線程來執(zhí)行任務(wù)
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 訪問worker set時(shí)需要獲取mainLock全局鎖
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 1.當(dāng)前池狀態(tài) < SHUTDOWN壹置,也就是RUNNING狀態(tài)竞思,如果已經(jīng)started,拋出異常
// 2.當(dāng)前池狀態(tài) = SHUTDOWN钞护,且firstTask == null盖喷,需要處理任務(wù)隊(duì)列中的任務(wù),如果已經(jīng)started难咕,拋出異常
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 剛創(chuàng)建線程添加到workers集合中
workers.add(w);
int s = workers.size();
// 判斷更新歷史最大線程數(shù)量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動新建線程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 啟動失敗课梳,workerCount--,workers里移除該worker
addWorkerFailed(w);
}
return workerStarted;
}
線程池中的線程并不是直接用的Thread類余佃,而是定義了一個(gè)內(nèi)部工作線程Worker類暮刃,實(shí)現(xiàn)了AQS以及Runnable接口,然后持有一個(gè)Thread類的引用及一個(gè)firstTask(創(chuàng)建后第一個(gè)要執(zhí)行的任務(wù))咙冗,每個(gè)Worker線程啟動后會執(zhí)行run()方法沾歪,該方法會調(diào)用執(zhí)行外層runWorker(Worker w)方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 1.如果task不為空,則作為該線程的第一個(gè)任務(wù)直接執(zhí)行
// 2.如果task為空雾消,則通過getTask()方法從任務(wù)隊(duì)列中獲取任務(wù)執(zhí)行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 線程池狀態(tài) >= STOP灾搏,則中斷線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 實(shí)際執(zhí)行任務(wù)前調(diào)用的鉤子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 實(shí)際執(zhí)行任務(wù)
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任務(wù)執(zhí)行后調(diào)用的鉤子方法
afterExecute(task, thrown);
}
} finally {
// 任務(wù)置為null,重新獲取新任務(wù)立润,完成數(shù)++
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 無任務(wù)可執(zhí)行狂窑,執(zhí)行worker銷毀邏輯
processWorkerExit(w, completedAbruptly);
}
}
getTask()方法邏輯
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 以下兩種情況遞減工作線程數(shù)量
// 1. rs >= STOP
// 2. rs == SHUTDOWN && workQueue.isEmpty()
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 允許核心線程超時(shí) 或者 當(dāng)前線程數(shù) > 核心線程數(shù),有可能發(fā)生超時(shí)關(guān)閉
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc什么情況 > maximumPoolSize桑腮,調(diào)用setMaximumPoolSize()方法將maximumPoolSize調(diào)小了泉哈,會發(fā)生這種情況,此時(shí)需要關(guān)閉多余線程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 阻塞隊(duì)列獲取任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 發(fā)生中斷破讨,進(jìn)行重試
timedOut = false;
}
}
}
以上內(nèi)容比較詳細(xì)的介紹了ThreadPoolExecutor的繼承體系丛晦,以及相關(guān)的核心源碼,基于此提陶,現(xiàn)在我們來看DynamicTp提供的告警通知能力烫沙。
核心參數(shù)變更通知
對應(yīng)配置中心的監(jiān)聽端監(jiān)聽到配置變更后,封裝到DtpProperties中然后交由DtpRegistry類中的refresh()方法去做配置更新隙笆,同時(shí)通知時(shí)會高亮顯示有變更的字段
[圖片上傳失敗...(image-b9dce5-1649297221316)]
線程池活躍度告警
活躍度 = activeCount / maximumPoolSize
服務(wù)啟動后會開啟一個(gè)定時(shí)監(jiān)控任務(wù)锌蓄,每隔一定時(shí)間(可配置)去計(jì)算線程池的活躍度升筏,達(dá)到配置的threshold閾值后會觸發(fā)一次告警,告警間隔內(nèi)多次觸發(fā)不會發(fā)送告警通知
[圖片上傳失敗...(image-f0a4a5-1649297221316)]
隊(duì)列容量告警
容量使用率 = queueSize / queueCapacity
服務(wù)啟動后會開啟一個(gè)定時(shí)監(jiān)控任務(wù)瘸爽,每隔一定時(shí)間去計(jì)算任務(wù)隊(duì)列的使用率您访,達(dá)到配置的threshold閾值后會觸發(fā)一次告警,告警間隔內(nèi)多次觸發(fā)不會發(fā)送告警通知
[圖片上傳失敗...(image-aca3b5-1649297221316)]
拒絕策略告警
/**
* Do sth before reject.
* @param executor ThreadPoolExecutor instance
*/
default void beforeReject(ThreadPoolExecutor executor) {
if (executor instanceof DtpExecutor) {
DtpExecutor dtpExecutor = (DtpExecutor) executor;
dtpExecutor.incRejectCount(1);
Runnable runnable = () -> AlarmManager.doAlarm(dtpExecutor, REJECT);
AlarmManager.triggerAlarm(dtpExecutor.getThreadPoolName(), REJECT.getValue(), runnable);
}
}
線程池線程數(shù)達(dá)到配置的最大線程數(shù)剪决,且任務(wù)隊(duì)列已滿灵汪,再提交任務(wù)會觸發(fā)拒絕策略。DtpExecutor線程池用到的RejectedExecutionHandler是經(jīng)過動態(tài)代理包裝過的昼捍,在執(zhí)行具體的拒絕策略之前會執(zhí)行RejectedAware類beforeReject()方法识虚,此方法會去做拒絕數(shù)量累加(總數(shù)值累加、周期值累加)妒茬。且判斷如果周期累計(jì)值達(dá)到配置的閾值,則會觸發(fā)一次告警通知(同時(shí)重置周期累加值為0及上次告警時(shí)間為當(dāng)前時(shí)間)蔚晨,告警間隔內(nèi)多次觸發(fā)不會發(fā)送告警通知
[圖片上傳失敗...(image-d784a9-1649297221316)]
任務(wù)隊(duì)列超時(shí)告警
重寫ThreadPoolExecutor的execute()方法和beforeExecute()方法乍钻,如果配置了執(zhí)行超時(shí)或排隊(duì)超時(shí)值,則會用DtpRunnable包裝任務(wù)铭腕,同時(shí)記錄任務(wù)的提交時(shí)間submitTime银择,beforeExecute根據(jù)當(dāng)前時(shí)間和submitTime的差值就可以計(jì)算到該任務(wù)在隊(duì)列中的等待時(shí)間,然后判斷如果差值大于配置的queueTimeout則累加排隊(duì)超時(shí)任務(wù)數(shù)量(總數(shù)值累加累舷、周期值累加)浩考。且判斷如果周期累計(jì)值達(dá)到配置的閾值,則會觸發(fā)一次告警通知(同時(shí)重置周期累加值為0及上次告警時(shí)間為當(dāng)前時(shí)間)被盈,告警間隔內(nèi)多次觸發(fā)不會發(fā)送告警通知
@Override
public void execute(Runnable command) {
if (CollUtil.isNotEmpty(taskWrappers)) {
for (TaskWrapper t : taskWrappers) {
command = t.wrap(command);
}
}
if (runTimeout > 0 || queueTimeout > 0) {
command = new DtpRunnable(command);
}
super.execute(command);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (!(r instanceof DtpRunnable)) {
super.beforeExecute(t, r);
return;
}
DtpRunnable runnable = (DtpRunnable) r;
long currTime = System.currentTimeMillis();
if (runTimeout > 0) {
runnable.setStartTime(currTime);
}
if (queueTimeout > 0) {
long waitTime = currTime - runnable.getSubmitTime();
if (waitTime > queueTimeout) {
queueTimeoutCount.incrementAndGet();
Runnable alarmTask = () -> AlarmManager.doAlarm(this, QUEUE_TIMEOUT);
AlarmManager.triggerAlarm(this.getThreadPoolName(), QUEUE_TIMEOUT.getValue(), alarmTask);
}
}
super.beforeExecute(t, r);
}
[圖片上傳失敗...(image-4708ec-1649297221316)]
任務(wù)執(zhí)行超時(shí)告警
重寫ThreadPoolExecutor的afterExecute()方法析孽,根據(jù)當(dāng)前時(shí)間和beforeExecute()中設(shè)置的startTime的差值即可算出任務(wù)的實(shí)際執(zhí)行時(shí)間,然后判斷如果差值大于配置的runTimeout則累加排隊(duì)超時(shí)任務(wù)數(shù)量(總數(shù)值累加只怎、周期值累加)袜瞬。且判斷如果周期累計(jì)值達(dá)到配置的閾值,則會觸發(fā)一次告警通知(同時(shí)重置周期累加值為0及上次告警時(shí)間為當(dāng)前時(shí)間)身堡,告警間隔內(nèi)多次觸發(fā)不會發(fā)送告警通知
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (runTimeout > 0) {
DtpRunnable runnable = (DtpRunnable) r;
long runTime = System.currentTimeMillis() - runnable.getStartTime();
if (runTime > runTimeout) {
runTimeoutCount.incrementAndGet();
Runnable alarmTask = () -> AlarmManager.doAlarm(this, RUN_TIMEOUT);
AlarmManager.triggerAlarm(this.getThreadPoolName(), RUN_TIMEOUT.getValue(), alarmTask);
}
}
super.afterExecute(r, t);
}
[圖片上傳失敗...(image-bc1744-1649297221316)]
告警通知相關(guān)配置項(xiàng)
如果想使用通知告警功能邓尤,配置文件必須要配置platforms字段,且可以配置多個(gè)平臺贴谎,如釘釘汞扎、企微等;notifyItems配置具體告警項(xiàng)擅这,包括閾值澈魄、平臺、告警間隔等蕾哟。
spring:
dynamic:
tp:
# 省略其他項(xiàng)
platforms: # 通知平臺
- platform: wechat
urlKey: 38a98-0c5c3b649c
receivers: test
- platform: ding
urlKey: f80db3e801d593604f4a08dcd6a
secret: SECb5444a6f375d5b9d21
receivers: 17811511815
executors: # 動態(tài)線程池配置一忱,都有默認(rèn)值莲蜘,采用默認(rèn)值的可以不配置該項(xiàng),減少配置量
- threadPoolName: dtpExecutor1
executorType: common # 線程池類型common帘营、eager:適用于io密集型
corePoolSize: 2
maximumPoolSize: 4
queueCapacity: 200
queueType: VariableLinkedBlockingQueue # 任務(wù)隊(duì)列票渠,查看源碼QueueTypeEnum枚舉類
rejectedHandlerType: CallerRunsPolicy # 拒絕策略,查看RejectedTypeEnum枚舉類
keepAliveTime: 50
allowCoreThreadTimeOut: false
threadNamePrefix: dtp1 # 線程名前綴
waitForTasksToCompleteOnShutdown: false # 參考spring線程池設(shè)計(jì)
awaitTerminationSeconds: 5 # 單位(s)
preStartAllCoreThreads: false # 是否預(yù)熱核心線程芬迄,默認(rèn)false
runTimeout: 200 # 任務(wù)執(zhí)行超時(shí)閾值问顷,目前只做告警用,單位(ms)
queueTimeout: 100 # 任務(wù)在隊(duì)列等待超時(shí)閾值禀梳,目前只做告警用杜窄,單位(ms)
taskWrapperNames: ["ttl"] # 任務(wù)包裝器名稱,集成TaskWrapper接口
notifyItems: # 報(bào)警項(xiàng)算途,不配置自動會按默認(rèn)值配置(變更通知塞耕、容量報(bào)警、活性報(bào)警嘴瓤、拒絕報(bào)警扫外、任務(wù)超時(shí)報(bào)警)
- type: capacity # 報(bào)警項(xiàng)類型,查看源碼 NotifyTypeEnum枚舉類
threshold: 80 # 報(bào)警閾值
platforms: [ding,wechat] # 可選配置廓脆,不配置默認(rèn)拿上層platforms配置的所以平臺
interval: 120 # 報(bào)警間隔(單位:s)
- type: change
- type: liveness
threshold: 80
interval: 120
- type: reject
threshold: 1
interval: 160
- type: run_timeout
threshold: 1
interval: 120
- type: queue_timeout
threshold: 1
interval: 140
總結(jié)
本文開頭介紹了線程池ThreadPoolExecutor的繼承體系筛谚,核心流程的源碼解讀。然后介紹了DynamicTp提供的以上6種告警通知能力停忿,希望通過監(jiān)控+告警可以讓我們及時(shí)感知到我們業(yè)務(wù)線程池的執(zhí)行負(fù)載情況驾讲,第一時(shí)間做出調(diào)整,防止事故的發(fā)生席赂。
聯(lián)系我
對項(xiàng)目有什么想法或者建議吮铭,可以加我微信交流,或者創(chuàng)建issues氧枣,一起完善項(xiàng)目
公眾號:CodeFox
微信:yanhom1314