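Thread pools are among our most commonly used tools for concurrent programming. In the multi-core era, multithreading is the most common means of performance optimization, and a thread pool is the usual way to handle concurrent requests and tasks. Using a thread pool reduces the time spent creating and destroying threads and the associated system resource overhead, and it addresses under-utilization of system resources. Creating a thread pool to run tasks concurrently looks very simple, but its parameters deserve careful thought.
Taking Java as an example, a standard thread pool constructor looks like this: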
/** Thread Pool Executor */
public ThreadPoolExecutor(
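int corePoolSize, // number of core threads
int maxPoolSize, // maximum number of threads
long keepAliveTime, // keep-alive time: idle threads beyond corePoolSize are reclaimed after this period
TimeUnit unit, // unit of the keep-alive time
BlockingQueue<Runnable> workQueue, // blocking task queue
RejectedExecutionHandler handler // policy applied when the queue is full and the thread count has reached maxPoolSize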
) {...}
Although the JDK provides some ready-made factory methods, such as:
- static ExecutorService newCachedThreadPool()
- static ExecutorService newFixedThreadPool(int nThreads)
- static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
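these thread pools cannot satisfy every business scenario. We need to set more reasonable parameters on ThreadPoolExecutor to meet the performance requirements of the application.
1. Set reasonable parameters based on experience and a general formula
Take the number of threads. How many threads are appropriate depends on several factors:
- The server's CPU resources.
- The type of task and the resources it consumes.
If the task reads and writes a database, it depends on the number of connections in the database connection pool and on the load and performance of the database. If the task calls a third-party service over the network, it depends on the network load and on the load and performance of that service.
Generally speaking, CPU-bound tasks occupy the CPU for a long time, so the thread count can be set lower; I/O-bound tasks occupy the CPU only briefly, so the thread count can be set higher.
Our goal is to make full use of the CPU resources given to us. If tasks spend much of their time waiting, for example on disk or network I/O, use more threads; if the tasks themselves are compute-heavy and hold the CPU for a long time, use fewer.
This can be expressed with the following formula:
threads = CPU cores * target CPU utilization * (1 + wait time / compute time)
Suppose our server has a 4-core CPU and we want a thread pool that sends metrics data to a remote Kafka. Network latency is about 50 ms, parsing, encoding and compressing the data takes about 5 ms, and we want CPU utilization to stay within 10%. The calculation below gives 4.4, which rounds up to 5 threads: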
4 * 0.1 * (1 + 50 / 5) = 4.4
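The formula is easy to wrap in a small helper. The following is a minimal sketch, not part of the original code: the class name `ThreadCountEstimator` and its method are hypothetical, and rounding up with a floor of one thread is an assumption about how to treat fractional results.

```java
/** Hypothetical helper illustrating the sizing formula; not part of the original code. */
final class ThreadCountEstimator {
    private ThreadCountEstimator() {}

    /**
     * threads = cores * targetCpuUtilization * (1 + waitTime / computeTime)
     * The result is rounded up (assumption) and is never less than 1.
     */
    static int estimate(int cores, double targetCpuUtilization,
                        double waitMillis, double computeMillis) {
        if (cores <= 0 || computeMillis <= 0 || waitMillis < 0
                || targetCpuUtilization <= 0 || targetCpuUtilization > 1) {
            throw new IllegalArgumentException("invalid sizing input");
        }
        double raw = cores * targetCpuUtilization * (1 + waitMillis / computeMillis);
        return Math.max(1, (int) Math.ceil(raw));
    }

    public static void main(String[] args) {
        // The Kafka example from the text: 4 cores, 10% CPU, 50 ms wait, 5 ms compute.
        System.out.println(estimate(4, 0.1, 50, 5)); // prints 5

        // The same workload sized for the cores visible to this JVM.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(estimate(cores, 0.1, 50, 5));
    }
}
```

The wait and compute times are measurements you have to supply yourself, so the result is only a starting point for the metric-driven tuning described later.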
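So we set the parameters as follows:
Parameter | Value | Explanation |
---|---|---|
int corePoolSize | 5 | Number of core threads |
int maxPoolSize | 10 | Maximum number of threads |
long keepAliveTime | 5000 | Thread keep-alive time: idle threads beyond the core count are reclaimed after this period. A longer value helps avoid frequent thread creation and destruction |
TimeUnit unit | TimeUnit.MILLISECONDS | Unit of the keep-alive time, here milliseconds |
BlockingQueue<Runnable> workQueue | new LinkedBlockingQueue<>(500) | Blocking queue that holds pending tasks; for first-in, first-out behavior LinkedBlockingQueue is a good fit |
ThreadFactory threadfactory | Executors.defaultThreadFactory() | Factory that creates the threads |
RejectedExecutionHandler handler | new ThreadPoolExecutor.AbortPolicy() | Policy for refusing new tasks when the queue and the thread count are both at their limits, or when the pool is shut down. There are four built-in policies: 1) AbortPolicy, 2) CallerRunsPolicy, 3) DiscardPolicy, 4) DiscardOldestPolicy |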
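Put together, the values in the table map onto the seven-argument ThreadPoolExecutor constructor. This is a sketch under assumptions: the class name `MetricsSenderPool` is hypothetical, and the thread factory and rejection handler used here are the JDK's own defaults (`Executors.defaultThreadFactory()` and `ThreadPoolExecutor.AbortPolicy`), which may not be exactly what the table intends.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical factory that applies the parameter table to a ThreadPoolExecutor. */
final class MetricsSenderPool {
    private MetricsSenderPool() {}

    static ThreadPoolExecutor create() {
        return new ThreadPoolExecutor(
                5,                                      // corePoolSize, from the estimate of 4.4
                10,                                     // maximumPoolSize
                5000, TimeUnit.MILLISECONDS,            // keepAliveTime and its unit
                new LinkedBlockingQueue<>(500),         // bounded FIFO work queue
                Executors.defaultThreadFactory(),       // JDK default factory (assumption)
                new ThreadPoolExecutor.AbortPolicy());  // JDK default rejection policy (assumption)
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create();
        pool.execute(() ->
                System.out.println("sent by " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

Note that with a bounded queue of 500, the pool only grows beyond 5 threads once 500 tasks are already waiting, so the queue size and maxPoolSize should be chosen together.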
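2. Tune based on metrics
To measure thoroughly, we must record and display the various metrics of the thread pool.
First, a few metrics terms; for details see https://metrics.dropwizard.io/4.1.2/manual/core.html
MetricRegistry
The container for all metrics, similar to the Windows system registry; every metric can be registered in it.
Metric types
Gauge: an instantaneous value, such as the number of connections or the number of threads.
Counter: a value that changes continuously, such as the thread queue length; it does not jump, but is incremented or decremented.
Meter: measures the rate of events per unit of time, such as TPS (transactions per second) or DAU (daily active users).
Timer: measures the statistical distribution of elapsed time, such as how busy or idle the threads are, or the average response time.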
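Thread-related metrics
- Thread count: the maximum, minimum and current number of threads
- Queue length: the maximum allowed length and the current length
- Task throughput: the rates at which tasks are submitted and completed
- Number of running tasks
- Busy/idle ratio of the threads
- Number of rejected tasks
- Time tasks wait in the queue: the maximum and current wait time
- Number of tasks that exceeded the maximum wait time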
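Of the metrics listed above, queue wait time is the least obvious to collect, because the executor does not expose it. One common approach is to wrap each task and compare the time it was handed over with the time it starts running. The sketch below uses only the JDK; the class `QueueWaitProbe` and its methods are hypothetical names, and it tracks only the maximum wait rather than a full distribution.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical probe that records how long wrapped tasks wait before they start. */
final class QueueWaitProbe {
    private final AtomicLong maxWaitNanos = new AtomicLong();
    private final AtomicLong started = new AtomicLong();

    /** Wraps a task; the clock starts when the wrapper is created, i.e. at submission. */
    Runnable wrap(Runnable task) {
        final long enqueuedAt = System.nanoTime();
        return () -> {
            long waited = System.nanoTime() - enqueuedAt;
            maxWaitNanos.accumulateAndGet(waited, Math::max);
            started.incrementAndGet();
            task.run();
        };
    }

    long maxWaitMillis() { return TimeUnit.NANOSECONDS.toMillis(maxWaitNanos.get()); }

    long startedCount() { return started.get(); }

    /** Demo: with a single worker, the second task waits until the first is released. */
    static QueueWaitProbe demo() {
        QueueWaitProbe probe = new QueueWaitProbe();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(probe.wrap(() -> awaitQuietly(release))); // occupies the only worker
            pool.execute(probe.wrap(() -> { }));                   // has to sit in the queue
            Thread.sleep(50);
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return probe;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        QueueWaitProbe probe = demo();
        System.out.println("tasks started: " + probe.startedCount());
        System.out.println("max queue wait (ms): " + probe.maxWaitMillis());
    }
}
```

A count of tasks that exceeded a wait threshold could be added in the same wrapper by comparing `waited` with a limit.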
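How to measure and monitor the thread pool
Create the thread pool and register the metrics
Run the thread pool and collect the metrics
Observe the metrics and adjust the parameters accordingly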
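The three steps above can be tried out with nothing but the getters that ThreadPoolExecutor already provides. This is a simplified sketch that does not use a metrics library: the class `PoolSampler` and the metric names in the map are hypothetical, and samples are simply printed rather than stored.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sampler that reads the executor's built-in counters. */
final class PoolSampler {
    private PoolSampler() {}

    /** Takes one sample of the pool's current state. */
    static Map<String, Long> snapshot(ThreadPoolExecutor pool) {
        Map<String, Long> sample = new LinkedHashMap<>();
        sample.put("pool_size", (long) pool.getPoolSize());
        sample.put("active", (long) pool.getActiveCount());
        sample.put("queue_size", (long) pool.getQueue().size());
        sample.put("completed", pool.getCompletedTaskCount());
        return sample;
    }

    /** Creates a pool, samples it while it runs, and returns the final sample. */
    static Map<String, Long> demo() {
        // Step 1: create the pool (2 threads, queue of 10).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        try {
            for (int i = 0; i < 6; i++) {
                pool.execute(() -> sleepQuietly(20));
            }
            pool.shutdown();
            // Step 2: sample periodically until all tasks have finished.
            while (!pool.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                System.out.println(snapshot(pool));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Step 3: the final sample is what we would inspect before tuning.
        return snapshot(pool);
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("final: " + demo());
    }
}
```

In a real service the samples would go to a metrics registry or a monitoring system instead of standard output.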
An example of measuring and monitoring a thread pool
We can use the InstrumentedExecutorService class from the dropwizard metrics library (https://metrics.dropwizard.io/) to collect the metrics above. Some of its key code is shown below:
InstrumentedExecutorService
public class InstrumentedExecutorService implements ExecutorService {
private static final AtomicLong NAME_COUNTER = new AtomicLong();
private final ExecutorService delegate;
private final Meter submitted;
private final Counter running;
private final Meter completed;
private final Timer idle;
private final Timer duration;
public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry) {
this(delegate, registry, "instrumented-delegate-" + NAME_COUNTER.incrementAndGet());
}
public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name) {
this.delegate = delegate;
this.submitted = registry.meter(MetricRegistry.name(name, new String[]{"submitted"}));
this.running = registry.counter(MetricRegistry.name(name, new String[]{"running"}));
this.completed = registry.meter(MetricRegistry.name(name, new String[]{"completed"}));
this.idle = registry.timer(MetricRegistry.name(name, new String[]{"idle"}));
this.duration = registry.timer(MetricRegistry.name(name, new String[]{"duration"}));
}
//...
private class InstrumentedRunnable implements Runnable {
private final Runnable task;
private final Timer.Context idleContext;
InstrumentedRunnable(Runnable task) {
this.task = task;
this.idleContext = idle.time();
}
@Override
public void run() {
idleContext.stop();
running.inc();
try (Timer.Context durationContext = duration.time()) {
task.run();
} finally {
running.dec();
completed.mark();
}
}
}
}
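It wraps the original ExecutorService using the decorator pattern and records the submitted, running, completed, idle and duration metrics. We can record some additional metrics ourselves; part of the code is as follows:
1) First define a thread pool parameter object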
package com.github.walterfan.helloconcurrency;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import java.time.Duration;
/**
* @Author: Walter Fan
**/
@Getter
@Setter
@Builder
public class ThreadPoolParam {
private int minPoolSize;
private int maxPoolSize;
private Duration keepAliveTime;
private int queueSize;
private String threadPrefix;
private boolean daemon;
private MetricRegistry metricRegistry;
}
2) Then write a utility class that creates the thread pool:
- ThreadPoolUtil.java
package com.github.walterfan.helloconcurrency;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static com.codahale.metrics.MetricRegistry.name;
/**
* @Author: Walter Fan
**/
@Slf4j
public class ThreadPoolUtil {
/*
Similar to the built-in ThreadPoolExecutor.CallerRunsPolicy:
a rejected task is run on the thread that submitted it (unless the pool is shut down).
*/
public static class DiscardAndLogPolicy implements RejectedExecutionHandler {
final MetricRegistry metricRegistry;
final Meter rejectedMeter;
final Counter rejectedCounter;
public DiscardAndLogPolicy(String threadPrefix, MetricRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
this.rejectedMeter = metricRegistry.meter(threadPrefix + ".rejected-meter");
this.rejectedCounter = metricRegistry.counter(threadPrefix + ".rejected-counter");
}
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
rejectedMeter.mark();
rejectedCounter.inc();
if (!e.isShutdown()) {
log.warn("reject task and run {} directly", r);
r.run();
}
}
}
// create the thread pool executor and register several gauges for it
public static ThreadPoolExecutor createThreadExecutor(ThreadPoolParam threadPoolParam) {
MetricRegistry metricRegistry = threadPoolParam.getMetricRegistry();
metricRegistry.register(threadPoolParam.getThreadPrefix() + ".min", createIntGauge(() -> threadPoolParam.getMinPoolSize()));
metricRegistry.register(threadPoolParam.getThreadPrefix() + ".max", createIntGauge(() -> threadPoolParam.getMaxPoolSize()));
metricRegistry.register(threadPoolParam.getThreadPrefix() + ".queue_limitation", createIntGauge(() -> threadPoolParam.getQueueSize()));
ThreadPoolExecutor executor = new ThreadPoolExecutor(threadPoolParam.getMinPoolSize(),
threadPoolParam.getMaxPoolSize(),
threadPoolParam.getKeepAliveTime().toMillis(),
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(threadPoolParam.getQueueSize()),
createThreadFactory(threadPoolParam),
createRejectedExecutionHandler(threadPoolParam));
metricRegistry.register(threadPoolParam.getThreadPrefix() + ".pool_size", createIntGauge(() -> executor.getPoolSize()));
metricRegistry.register(threadPoolParam.getThreadPrefix() + ".queue_size", createIntGauge(() -> executor.getQueue().size()));
return executor;
}
// create the executor service, wrapped in InstrumentedExecutorService to measure the tasks
public static ExecutorService createExecutorService(ThreadPoolParam threadPoolParam) {
ThreadPoolExecutor executor = createThreadExecutor(threadPoolParam);
return new InstrumentedExecutorService(executor,
threadPoolParam.getMetricRegistry(),
threadPoolParam.getThreadPrefix());
}
private static Gauge<Integer> createIntGauge(Supplier<Integer> supplier) {
return () -> supplier.get();
}
public static ThreadFactory createThreadFactory(ThreadPoolParam threadPoolParam) {
return new ThreadFactoryBuilder()
.setDaemon(threadPoolParam.isDaemon())
.setNameFormat(threadPoolParam.getThreadPrefix() + "-%d")
.build();
}
public static RejectedExecutionHandler createRejectedExecutionHandler(ThreadPoolParam threadPoolParam) {
return new DiscardAndLogPolicy(threadPoolParam.getThreadPrefix(), threadPoolParam.getMetricRegistry());
}
}
Note: we have planted 12 metrics in this thread pool. See whether you can find where each one is set in the code.
- cards-thread-pool.completed
- cards-thread-pool.max
- cards-thread-pool.queue_limitation
- cards-thread-pool.rejected-meter
- cards-thread-pool.duration
- cards-thread-pool.min
- cards-thread-pool.queue_size
- cards-thread-pool.running
- cards-thread-pool.idle
- cards-thread-pool.pool_size
- cards-thread-pool.rejected-counter
- cards-thread-pool.submitted
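3) Use the thread pool to sort multiple decks of playing cards
Taking the familiar game of cards as an example, we sort a number of decks with bubble sort, insertion sort and the JDK's built-in TimSort. In total 30 tasks are created and submitted to the thread pool; with different thread pool parameters the results differ greatly.
3.1) The playing card class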
- Poker.java
package com.github.walterfan.helloconcurrency;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Author: Walter Fan
**/
public class Poker {
public static class Card {
enum Suite {
Spades(4), Hearts(3), Clubs(2), Diamonds(1);
int value;
Suite(int value) {
this.value = value;
}
private static Map<Integer, Suite> valueMap = new HashMap<>();
static {
for (Suite suite : Suite.values()) {
valueMap.put(suite.value, suite);
}
}
public static Suite valueOf(int pageType) {
return valueMap.get(pageType);
}
}
Suite suite;
//1~13
int point;
public Card(int suiteValue, int point) {
this.suite = Suite.valueOf(suiteValue);
this.point = point;
}
public String toString() {
String strPoint = Integer.toString(point);
if (point > 10) {
switch (point) {
case 11:
strPoint = "J";
break;
case 12:
strPoint = "Q";
break;
case 13:
strPoint = "K";
break;
}
}
return suite.name() + ":" + strPoint;
}
public int getScore() {
return suite.value * 100 + point;
}
}
public static List<Card> createCardList(int suiteCount) {
List<Card> cards = new ArrayList<>(52);
for(int i = 1; i < 5; i++) {
for(int j = 1; j < 14 ;++j) {
cards.add(new Card(i, j));
}
}
List<Card> totalCards = new ArrayList<>(suiteCount * cards.size());
for(int j = 0; j < suiteCount; j++) {
totalCards.addAll(new ArrayList<>(cards));
}
Collections.shuffle(totalCards);
return totalCards;
}
public static class CardComparator implements Comparator<Card> {
@Override
public int compare(Card o1, Card o2) {
return o1.getScore() - o2.getScore();
}
}
}
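3.2) The sorting task class
The task is simple: as in an ordinary card game, sort several decks, using one of three sorting methods
1) Bubble sort
2) Insertion sort
3) TimSort, an efficient hybrid of insertion sort and merge sort used by the JDK since version 7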
- SortCardTask.java
package com.github.walterfan.helloconcurrency;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* @Author: Walter Fan
**/
@Slf4j
public class SortCardTask implements Callable<Long> {
public enum SortMethod { BUBBLE_SORT, INSERT_SORT, TIM_SORT}
private final List<Poker.Card> cards;
private final SortMethod sortMethod;
private final int taskNumber;
private final AtomicInteger taskCounter;
public SortCardTask(List<Poker.Card> cards, SortMethod method, int taskNumber, AtomicInteger taskCounter) {
this.cards = cards;
this.sortMethod = method;
this.taskNumber = taskNumber;
this.taskCounter = taskCounter;
}
@Override
public Long call() {
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("* {} begin to sort {} cards by {}", this.taskNumber, cards.size(), sortMethod);
switch(sortMethod) {
case BUBBLE_SORT:
bubbleSort(cards, new Poker.CardComparator());
break;
case INSERT_SORT:
insertSort(cards, new Poker.CardComparator());
break;
case TIM_SORT:
timSort(cards, new Poker.CardComparator());
break;
}
stopwatch.stop();
long millis = stopwatch.elapsed(MILLISECONDS);
log.info("* {} end to sort {} cards sort by {} spend {} milliseconds - {}" , this.taskNumber, cards.size(), sortMethod, millis, stopwatch); // formatted string like "12.3 ms"
taskCounter.incrementAndGet();
return millis;
}
public static <T> void bubbleSort(List<T> aList, Comparator<T> comparator) {
boolean sorted = false;
int loopCount = aList.size() - 1;
while (!sorted) {
sorted = true;
for (int i = 0; i < loopCount; i++) {
if (comparator.compare(aList.get(i), aList.get(i + 1)) > 0) {
Collections.swap(aList, i, i + 1);
sorted = false;
}
}
}
}
public static <T> void insertSort(List<T> aList, Comparator<T> comparator) {
int size = aList.size();
for (int i = 1; i < size; ++i) {
T selected = aList.get(i);
if (size < 10) {
log.info("{} insert to {}", selected, aList.subList(0, i).stream().map(String::valueOf).collect(Collectors.joining(", ")));
}
int j = i - 1;
// find the insert position for the selected element in the sorted left part
while (j >= 0 && comparator.compare(selected, aList.get(j)) < 0) {
// nothing is lost: the first write goes to j + 1 == i, whose value is already saved in selected
aList.set(j + 1, aList.get(j));
j--;
}
aList.set(j + 1, selected);
}
}
public static <T> void timSort(List<T> aList, Comparator<T> comparator) {
// sort in place like the other two methods; the stream version discarded its sorted result
aList.sort(comparator);
}
@Override
public String toString() {
return "SortCardTask{" +
"taskNumber=" + taskNumber +
", sortMethod=" + sortMethod +
'}';
}
}
3.3) The thread pool demo class
- ThreadPoolDemo.java
package com.github.walterfan.helloconcurrency;
import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Slf4jReporter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: Walter Fan
**/
@Slf4j
public class ThreadPoolDemo {
private static AtomicInteger finishCounter = new AtomicInteger(0);
private AtomicInteger taskNumber = new AtomicInteger(0);
private ExecutorService executorService;
public ThreadPoolDemo(ThreadPoolParam threadPoolParam) {
executorService = ThreadPoolUtil.createExecutorService(threadPoolParam);
}
public Callable<Long> createTask(int cardSuiteCount, SortCardTask.SortMethod method) {
List<Poker.Card> cards = Poker.createCardList(cardSuiteCount);
return new SortCardTask(cards, method, taskNumber.incrementAndGet(), finishCounter);
}
public List<Future<Long>> exeucteTasks(List<Callable<Long>> tasks, Duration waitTime) {
try {
return this.executorService.invokeAll(tasks, waitTime.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("invokeAll interrupt", e);
return Collections.emptyList();
}
}
public void waitUntil(long ms) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(ms, TimeUnit.MILLISECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException ex) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
Stopwatch stopwatch = Stopwatch.createStarted();
log.info("--- start ---");
MetricRegistry metricRegistry = new MetricRegistry();
final CsvReporter csvReporter = CsvReporter.forRegistry(metricRegistry)
.formatFor(Locale.US)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build(new File("./"));
csvReporter.start(100, TimeUnit.MILLISECONDS);
/* final Slf4jReporter logReporter = Slf4jReporter.forRegistry(metricRegistry)
.outputTo(log)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build();
logReporter.start(1, TimeUnit.MINUTES);*/
ThreadPoolParam threadPoolParam = ThreadPoolParam.builder()
.minPoolSize(1)
.maxPoolSize(4)
.daemon(true)
.keepAliveTime(Duration.ofSeconds(1))
.queueSize(5)
.threadPrefix("cards-thread-pool")
.metricRegistry(metricRegistry)
.build();
ThreadPoolDemo demo = new ThreadPoolDemo(threadPoolParam);
List<Callable<Long>> tasks = new ArrayList<>();
//30 tasks, 10, 40, 90 ... 1000 suite cards
for(int i=1; i<=10; i++) {
tasks.add(demo.createTask(i*i*10, SortCardTask.SortMethod.BUBBLE_SORT));
tasks.add(demo.createTask(i*i*10, SortCardTask.SortMethod.INSERT_SORT));
tasks.add(demo.createTask(i*i*10, SortCardTask.SortMethod.TIM_SORT));
}
List<Future<Long>> results = demo.exeucteTasks(tasks, Duration.ofMinutes(1));
//logReporter.report();
stopwatch.stop();
log.info("--- end finish {}, spent {} ---", finishCounter.get(), stopwatch);
results.stream().filter(x -> !x.isDone()).forEach(x -> log.info("{} is not done", x));
}
}
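The code above has the thread pool run 30 sorting tasks, sorting up to 1000 decks (52,000 cards).
10 tasks use bubble sort, 10 use insertion sort and 10 use TimSort; the whole run took just under 20 seconds.
The output is as follows: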
17:09:47.341 [main] INFO com.github.walterfan.helloconcurrency.ThreadPoolDemo - --- start ---
17:09:47.497 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 1 begin to sort 520 cards (10 suite) by BUBBLE_SORT
17:09:47.496 [main] WARN com.github.walterfan.helloconcurrency.ThreadPoolUtil - reject task and run java.util.concurrent.FutureTask@61baa894 directly
17:09:47.497 [cards-thread-pool-1] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 7 begin to sort 4680 cards (90 suite) by BUBBLE_SORT
17:09:47.497 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 9 begin to sort 4680 cards (90 suite) by TIM_SORT
17:09:47.497 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 8 begin to sort 4680 cards (90 suite) by INSERT_SORT
17:09:47.498 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 10 begin to sort 8320 cards (160 suite) by BUBBLE_SORT
17:09:47.515 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 9 end to sort 4680 cards (90 suite)sort by TIM_SORT spend 17 milliseconds - 17.70 ms
17:09:47.520 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 2 begin to sort 520 cards (10 suite) by INSERT_SORT
17:09:47.520 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 1 end to sort 520 cards (10 suite)sort by BUBBLE_SORT spend 22 milliseconds - 22.91 ms
17:09:47.521 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 3 begin to sort 520 cards (10 suite) by TIM_SORT
17:09:47.522 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 3 end to sort 520 cards (10 suite)sort by TIM_SORT spend 0 milliseconds - 528.3 μs
17:09:47.522 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 4 begin to sort 2080 cards (40 suite) by BUBBLE_SORT
17:09:47.531 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 2 end to sort 520 cards (10 suite)sort by INSERT_SORT spend 10 milliseconds - 10.60 ms
17:09:47.531 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 5 begin to sort 2080 cards (40 suite) by INSERT_SORT
17:09:47.546 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 5 end to sort 2080 cards (40 suite)sort by INSERT_SORT spend 14 milliseconds - 14.85 ms
17:09:47.547 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 6 begin to sort 2080 cards (40 suite) by TIM_SORT
17:09:47.548 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 6 end to sort 2080 cards (40 suite)sort by TIM_SORT spend 1 milliseconds - 1.666 ms
17:09:47.560 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 8 end to sort 4680 cards (90 suite)sort by INSERT_SORT spend 63 milliseconds - 63.22 ms
17:09:47.579 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 4 end to sort 2080 cards (40 suite)sort by BUBBLE_SORT spend 57 milliseconds - 57.37 ms
17:09:47.715 [cards-thread-pool-1] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 7 end to sort 4680 cards (90 suite)sort by BUBBLE_SORT spend 218 milliseconds - 218.0 ms
17:09:48.171 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 10 end to sort 8320 cards (160 suite)sort by BUBBLE_SORT spend 672 milliseconds - 672.3 ms
17:09:48.171 [main] WARN com.github.walterfan.helloconcurrency.ThreadPoolUtil - reject task and run java.util.concurrent.FutureTask@b065c63 directly
17:09:48.171 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 16 begin to sort 18720 cards (360 suite) by BUBBLE_SORT
17:09:48.171 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 12 begin to sort 8320 cards (160 suite) by TIM_SORT
17:09:48.172 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 13 begin to sort 13000 cards (250 suite) by BUBBLE_SORT
17:09:48.171 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 11 begin to sort 8320 cards (160 suite) by INSERT_SORT
17:09:48.173 [cards-thread-pool-1] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 14 begin to sort 13000 cards (250 suite) by INSERT_SORT
17:09:48.178 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 12 end to sort 8320 cards (160 suite)sort by TIM_SORT spend 6 milliseconds - 6.314 ms
17:09:48.178 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 15 begin to sort 13000 cards (250 suite) by TIM_SORT
17:09:48.187 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 15 end to sort 13000 cards (250 suite)sort by TIM_SORT spend 8 milliseconds - 8.673 ms
17:09:48.228 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 11 end to sort 8320 cards (160 suite)sort by INSERT_SORT spend 56 milliseconds - 56.62 ms
17:09:48.333 [cards-thread-pool-1] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 14 end to sort 13000 cards (250 suite)sort by INSERT_SORT spend 159 milliseconds - 159.2 ms
17:09:49.595 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 13 end to sort 13000 cards (250 suite)sort by BUBBLE_SORT spend 1423 milliseconds - 1.424 s
17:09:50.520 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 16 end to sort 18720 cards (360 suite)sort by BUBBLE_SORT spend 2348 milliseconds - 2.348 s
17:09:50.520 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 17 begin to sort 18720 cards (360 suite) by INSERT_SORT
17:09:50.520 [cards-thread-pool-4] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 22 begin to sort 33280 cards (640 suite) by BUBBLE_SORT
17:09:50.521 [main] WARN com.github.walterfan.helloconcurrency.ThreadPoolUtil - reject task and run java.util.concurrent.FutureTask@449b2d27 directly
17:09:50.521 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 24 begin to sort 33280 cards (640 suite) by TIM_SORT
17:09:50.521 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 26 begin to sort 42120 cards (810 suite) by INSERT_SORT
17:09:50.521 [cards-thread-pool-6] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 25 begin to sort 42120 cards (810 suite) by BUBBLE_SORT
17:09:50.537 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 24 end to sort 33280 cards (640 suite)sort by TIM_SORT spend 16 milliseconds - 16.03 ms
17:09:50.537 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 18 begin to sort 18720 cards (360 suite) by TIM_SORT
17:09:50.545 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 18 end to sort 18720 cards (360 suite)sort by TIM_SORT spend 7 milliseconds - 7.866 ms
17:09:50.545 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 19 begin to sort 25480 cards (490 suite) by BUBBLE_SORT
17:09:50.772 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 17 end to sort 18720 cards (360 suite)sort by INSERT_SORT spend 251 milliseconds - 251.9 ms
17:09:50.772 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 20 begin to sort 25480 cards (490 suite) by INSERT_SORT
17:09:51.614 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 20 end to sort 25480 cards (490 suite)sort by INSERT_SORT spend 841 milliseconds - 841.2 ms
17:09:51.614 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 21 begin to sort 25480 cards (490 suite) by TIM_SORT
17:09:51.615 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 26 end to sort 42120 cards (810 suite)sort by INSERT_SORT spend 1093 milliseconds - 1.094 s
17:09:51.642 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 21 end to sort 25480 cards (490 suite)sort by TIM_SORT spend 28 milliseconds - 28.11 ms
17:09:51.643 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 23 begin to sort 33280 cards (640 suite) by INSERT_SORT
17:09:52.308 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 23 end to sort 33280 cards (640 suite)sort by INSERT_SORT spend 664 milliseconds - 664.9 ms
17:09:52.308 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 27 begin to sort 42120 cards (810 suite) by TIM_SORT
17:09:52.318 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 27 end to sort 42120 cards (810 suite)sort by TIM_SORT spend 9 milliseconds - 9.679 ms
17:09:52.318 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 28 begin to sort 52000 cards (1000 suite) by BUBBLE_SORT
17:09:55.392 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 19 end to sort 25480 cards (490 suite)sort by BUBBLE_SORT spend 4846 milliseconds - 4.847 s
17:09:55.392 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 29 begin to sort 52000 cards (1000 suite) by INSERT_SORT
17:09:56.511 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 29 end to sort 52000 cards (1000 suite)sort by INSERT_SORT spend 1119 milliseconds - 1.119 s
17:09:56.512 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 30 begin to sort 52000 cards (1000 suite) by TIM_SORT
17:09:56.523 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 30 end to sort 52000 cards (1000 suite)sort by TIM_SORT spend 11 milliseconds - 11.68 ms
17:09:58.528 [cards-thread-pool-4] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 22 end to sort 33280 cards (640 suite)sort by BUBBLE_SORT spend 8007 milliseconds - 8.008 s
17:10:02.026 [cards-thread-pool-6] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 25 end to sort 42120 cards (810 suite)sort by BUBBLE_SORT spend 11504 milliseconds - 11.50 s
17:10:07.127 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 28 end to sort 52000 cards (1000 suite)sort by BUBBLE_SORT spend 14808 milliseconds - 14.81 s
17:10:07.128 [main] INFO com.github.walterfan.helloconcurrency.ThreadPoolDemo - --- end finish 30, spent 19.79 s ---
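The log above shows tasks 7, 8 and 9 starting on newly created threads while task 10 is rejected and runs on main. That is consistent with how ThreadPoolExecutor admits work: core threads first, then the queue, then extra threads up to the maximum, and only then rejection. The sketch below reproduces this order with the same sizes as the demo (core 1, max 4, queue 5). The class name `PoolGrowthDemo` is hypothetical, and unlike DiscardAndLogPolicy the handler here only counts rejections instead of running the task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo of the order in which a ThreadPoolExecutor admits tasks. */
final class PoolGrowthDemo {
    private PoolGrowthDemo() {}

    /** Returns {poolSize, queueSize, rejectedCount} after submitting 10 blocking tasks. */
    static int[] run() {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        // Simplification: count the rejection instead of running the task in the caller.
        RejectedExecutionHandler countOnly = (task, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 1, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(5),
                countOnly);
        for (int i = 0; i < 10; i++) {
            // Every task blocks until released, so no thread becomes free during submission.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] observed = {pool.getPoolSize(), pool.getQueue().size(), rejected.get()};
        release.countDown();
        pool.shutdown();
        return observed;
    }

    public static void main(String[] args) {
        int[] r = run();
        // Task 1 takes the core thread, tasks 2-6 fill the queue,
        // tasks 7-9 create threads up to the maximum, task 10 is rejected.
        System.out.println("pool=" + r[0] + " queue=" + r[1] + " rejected=" + r[2]);
        // prints pool=4 queue=5 rejected=1
    }
}
```

This also explains why a larger queue delays the creation of extra threads: the pool only grows past the core size once the queue is full.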
I used CsvReporter to write the metrics to CSV files; there are 12 CSV files in total:
- cards-thread-pool.completed.csv
- cards-thread-pool.max.csv
- cards-thread-pool.queue_limitation.csv
- cards-thread-pool.rejected-meter.csv
- cards-thread-pool.duration.csv
- cards-thread-pool.min.csv
- cards-thread-pool.queue_size.csv
- cards-thread-pool.running.csv
- cards-thread-pool.idle.csv
- cards-thread-pool.pool_size.csv
- cards-thread-pool.rejected-counter.csv
- cards-thread-pool.submitted.csv
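From these metrics we can see the characteristics of the tasks and whether the thread pool parameters are reasonable.
1) Task execution time
The results show that the three sorting methods differ greatly in efficiency. When sorting only two decks the three are close, but when sorting 1000 decks (52,000 cards) TimSort took about 11 milliseconds, insertion sort took about 1 second, and bubble sort took more than 14 seconds.
For the task execution time, we can analyze the recorded metrics file and draw a simple line chart.
- CSV file content: cards-thread-pool.duration.csv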
t,count,max,mean,min,stddev,p50,p75,p95,p98,p99,p999,mean_rate,m1_rate,m5_rate,m15_rate,rate_unit,duration_unit
1585473845,0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,calls/second,milliseconds
1585473846,0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,calls/second,milliseconds
1585473846,8,63.069663,37.658003,2.579674,25.915099,39.007040,61.809917,63.069663,63.069663,63.069663,63.069663,33.614807,0.000000,0.000000,0.000000,calls/second,milliseconds
1585473846,8,63.069663,37.658003,2.579674,25.915099,39.007040,61.809917,63.069663,63.069663,63.069663,63.069663,23.777415,0.000000,0.000000,0.000000,calls/second,milliseconds
1585473846,8,63.069663,37.658003,2.579674,25.915099,39.007040,61.809917,63.069663,63.069663,63.069663,63.069663,18.448678,0.000000,0.000000,0.000000,calls/second,milliseconds
1585473846,9,384.357751,76.180197,2.579674,111.663094,58.345330,62.339820,384.357751,384.357751,384.357751,384.357751,16.884697,0.000000,0.000000,0.000000,calls/second,milliseconds
# remaining content omitted
- Python script for analyzing the task execution time
import matplotlib.pyplot as plt
import pandas as pd
durations = pd.read_csv('cards-thread-pool.duration.csv')
print(durations.head(1))
plt.plot(durations['t'], durations['max'], label = 'max')
plt.plot(durations['t'], durations['mean'], label = 'mean')
plt.plot(durations['t'], durations['min'], label = 'min')
plt.ylabel("milliSeconds")
plt.xlabel("timestamp")
plt.legend(prop = {'size': 10})
plt.show()
2) Changes in the number of threads in the pool
import matplotlib.pyplot as plt
import pandas as pd
durations = pd.read_csv('cards-thread-pool.pool_size.csv')
print(durations.head(1))
plt.plot(durations['t'], durations['value'], label = 'pool size')
plt.ylabel("thread count")
plt.xlabel("timestamp")
plt.legend(prop = {'size': 10})
plt.show()
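The number of threads in the pool should stay fairly stable, avoiding frequent creation and destruction of threads. This chart suggests that, if system resources allow, corePoolSize, maxPoolSize and keepAliveTime can be increased somewhat.
3) Thread pool queue length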
import matplotlib.pyplot as plt
import pandas as pd
durations = pd.read_csv('cards-thread-pool.queue_size.csv')
print(durations.head(1))
plt.plot(durations['t'], durations['value'], label = 'queue size')
plt.ylabel("queue size")
plt.xlabel("timestamp")
plt.legend(prop = {'size': 10})
plt.show()
4) Number of tasks rejected by the thread pool
import matplotlib.pyplot as plt
import pandas as pd
durations = pd.read_csv('cards-thread-pool.rejected-counter.csv')
print(durations.head(1))
plt.plot(durations['t'], durations['count'], label = 'rejected count')
plt.ylabel("rejected count")
plt.xlabel("timestamp")
plt.legend(prop = {'size': 10})
plt.show()
Judging by the number of rejected tasks, the core thread count and the queue length should clearly be increased.
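The thread counts of a running ThreadPoolExecutor can be changed through `setCorePoolSize` and `setMaximumPoolSize`, whereas the capacity of a LinkedBlockingQueue is fixed at construction, so enlarging the queue means creating a new pool. The sketch below shows one way to resize safely; the class `PoolResizer` is a hypothetical helper, and the ordering logic assumes a recent JDK that rejects a core size above the current maximum.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that changes the thread limits of a running pool. */
final class PoolResizer {
    private PoolResizer() {}

    static void resize(ThreadPoolExecutor pool, int newCore, int newMax) {
        if (newCore < 0 || newMax <= 0 || newCore > newMax) {
            throw new IllegalArgumentException("need 0 <= core <= max and max > 0");
        }
        // Keep core <= max at every step: newer JDKs throw IllegalArgumentException otherwise.
        if (newMax >= pool.getMaximumPoolSize()) {
            pool.setMaximumPoolSize(newMax); // growing: raise the ceiling first
            pool.setCorePoolSize(newCore);
        } else {
            pool.setCorePoolSize(newCore);   // shrinking: lower the core size first
            pool.setMaximumPoolSize(newMax);
        }
    }

    public static void main(String[] args) {
        // Same limits as the card-sorting demo: core 1, max 4, queue 5.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));
        resize(pool, 4, 8);
        System.out.println(pool.getCorePoolSize() + " / " + pool.getMaximumPoolSize()); // prints 4 / 8
        pool.shutdown();
    }
}
```

Whether 4 and 8 are the right values for the card-sorting workload is something only another measurement run can tell.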
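In practice, ConsoleReporter, Slf4jReporter and CsvReporter, the reporters bundled with metrics-core, all sample the metrics periodically and print them, which makes analysis and querying inconvenient.
Since they all record periodic samples of the thread pool metrics, we can instead store these metrics in a time series database (such as InfluxDB or Prometheus) and analyze them with dashboard tools (such as Grafana or Graphite).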