線程池的監(jiān)控與優(yōu)化

多線程池是我們最常用的并行編程工具审编,多線程是性能優(yōu)化在多核處理器時代是最常用的手段。而線程池是處理并發(fā)請求和任務的常用方法叮趴,使用線程池可以減少在創(chuàng)建和銷毀線程上所花的時間以及系統(tǒng)資源的開銷割笙,解決系統(tǒng)資源利用不足的問題,創(chuàng)建一個線程池來并發(fā)的任務看起來非常簡單眯亦,其實線程池的參數(shù)是很有講究的伤溉。

以 Java 為例,一個標準的線程池創(chuàng)建方法如下:

/** Thread Pool Executor */
public ThreadPoolExecutor(
   int corePoolSize, //核心線程數(shù)
   int maxPoolSize, //最大線程數(shù)
   long keepAliveTime, //存活時間妻率,超過corePoolSize的空閑線程在此時間之后會被回收
   TimeUnit unit, //存活時間單位
   BlockingQueue<Runnable> workQueue//阻塞的任務隊列
   RejectedExecutionHandler handler //當隊列已滿乱顾,線程數(shù)已達maxPoolSize時的策略
) {...}

雖然JDK 提供了一些默認實現(xiàn),比如:

  • static ExecutorService newCachedThreadPool()
  • static ExecutorService newFixedThreadPool(int nThreads)
  • static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

這些線程池并不能滿足不了各種各樣的業(yè)務場景宫静,我們要為 ThreadPoolExecutor 設置更加合理的線程池參數(shù)來達到最優(yōu)走净,以滿足應用的性能需求。

1. 根據(jù)經(jīng)驗和通式公式按需求設置相對合理的參數(shù)

拿線程數(shù)來說孤里, 我們需要考慮線程數(shù)設置多少才合適伏伯, 這個取決于諸多因素:

  • 服務器的 CPU 資源。
  • 取決任務的類型和其消耗的資源情況捌袜。

如果任務是讀寫數(shù)據(jù)庫说搅, 那么它取決于數(shù)據(jù)庫連接池的連接數(shù)目, 以及數(shù)據(jù)庫的負載和性能虏等, 而如果任務是通過網(wǎng)絡訪問第三方服務弄唧,那么它取決于網(wǎng)絡負載大小适肠,以及第三方服務的負載和性能。
通常來說候引,CPU 密集型的任務占用CPU 時間較長侯养,線程數(shù)可以設置的小一點, I/O密集型的任務占用CPU時間較短澄干,線程數(shù)可以設的大一點逛揩。
我們的目的是充分利用給到我們的 CPU 資源,如果線程的任務有很多等待時間傻寂,比如等待磁盤和網(wǎng)絡I/O息尺,那么就把線程數(shù)設多一點携兵,如果任務本身非常耗費CPU的計算資源疾掰,CPU 處理時間較長,那么就把線程數(shù)設得小一點徐紧。

根據(jù)以下公式

線程數(shù) = CPU核數(shù) * 希望的CPU使用率 * (1 + 等待時間/處理時間)

假設我們的服務器為4核CPU静檬,我們要創(chuàng)建一個線程池來發(fā)送度量數(shù)據(jù)指標到遠端的 Kafka 上,網(wǎng)絡延遲約為50ms并级,數(shù)據(jù)解析編碼壓縮時間大約5ms拂檩,CPU占用率希望在10%之內(nèi)。根據(jù)下面的計算結果嘲碧,得出我們需要4.4, 約5個線程

4 * 0.1 * (1 + 50 / 5) = 4.4

于是稻励, 我們設置參數(shù)如下:

參數(shù) 賦值 解釋
int corePoolSize 5 核心線程數(shù)
int maxPoolSize 10 最大線程數(shù)
long keepAliveTime 5000 線程保活時間愈涩,超過核心線程數(shù)的空閑線程在此時間之后會被回收望抽,這個值設長一點有利于避免頻繁的創(chuàng)建和銷毀線程
TimeUnit unit TimeUnit.MILLISECOND 保活時間的單位, 這里用毫秒
BlockingQueue<Runnable> workQueue new LinkedBlockingQueue(500) 暫存線程任務的阻塞隊列履婉,先入先出的場景就用LinkedBlockingQueue 好了
ThreadFactory threadfactory new DefaultThreadFactory() 線程創(chuàng)建工廠
RejectedExecutionHandler handler new DefaultRejectedExecutionHandler() 當線程隊列和線程數(shù)已滿煤篙,或者線程池關閉,對新任務的拒絕服務策略毁腿,內(nèi)置的有4種策略:
1) AbortPolicy,
2) CallerRunsPolicy,
3) DiscardPolicy,
4) DiscardOldestPolicy

2. 根據(jù)度量指標進行調(diào)整

為了進行充分的度量辑奈,我們必需對線程池的各種指標進行記錄和展示。
先來簡單了解一些度量術語已烤,詳情參見https://metrics.dropwizard.io/4.1.2/manual/core.html

MetricRegistry

各種度量數(shù)據(jù)的容器鸠窗,類似于 windows 的系統(tǒng)注冊表,各項度量數(shù)據(jù)都可以在其中進行注冊胯究。

度量類型

  • Gauge 計量器稍计,它代表一個瞬時變化的值,比如連接數(shù)唐片,線程數(shù)等

  • Counter 計數(shù)器丙猬,它代表一個連續(xù)變化的值涨颜,比如線程隊列長度,不會突變茧球,但是會遞增或遞減

  • Meter 測量儀, 它用來統(tǒng)計基于時間單位的處理速率庭瑰,比如TPS(每秒事務數(shù)), DAU(日均活躍用戶)等

  • Timer 計時器抢埋,它用來統(tǒng)計所花費時間的統(tǒng)計分布值弹灭,比如線程的忙閑程度,平均響應時間等

線程相關度量指標

  1. 線程數(shù): 最大揪垄,最小和實時的線程數(shù)
  2. 線程隊列長度: 最大長度限制和實時長度
  3. 任務處理速率:任務提交與完成速度
  4. 任務運行數(shù)量
  5. 線程的忙閑比
  6. 任務被拒絕的數(shù)量
  7. 任務在隊列中等待的時間:最大和實時的等待時間
  8. 超過最大等待時間的任務數(shù)量

線程的度量與監(jiān)控的方法

  1. 創(chuàng)建線程池并注冊各項度量指標

  2. 運行線程池并收集度量指標

  3. 觀察度量指標并相應地調(diào)整參數(shù)

線程的度量與監(jiān)控的實例

我們可以應用 dropwizard 的 metrics 庫中的 https://metrics.dropwizard.io/ 類庫 InstrumentedExecutorService 來幫助我們進行上述指標的統(tǒng)計穷吮,部分關鍵代碼如下:

InstrumentedExecutorService

public class InstrumentedExecutorService implements ExecutorService {
    private static final AtomicLong NAME_COUNTER = new AtomicLong();
    private final ExecutorService delegate;
    private final Meter submitted;
    private final Counter running;
    private final Meter completed;
    private final Timer idle;
    private final Timer duration;

    public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry) {
        this(delegate, registry, "instrumented-delegate-" + NAME_COUNTER.incrementAndGet());
    }

    public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name) {
        this.delegate = delegate;
        this.submitted = registry.meter(MetricRegistry.name(name, new String[]{"submitted"}));
        this.running = registry.counter(MetricRegistry.name(name, new String[]{"running"}));
        this.completed = registry.meter(MetricRegistry.name(name, new String[]{"completed"}));
        this.idle = registry.timer(MetricRegistry.name(name, new String[]{"idle"}));
        this.duration = registry.timer(MetricRegistry.name(name, new String[]{"duration"}));
    }
    //...
    private class InstrumentedRunnable implements Runnable {
        private final Runnable task;
        private final Timer.Context idleContext;

        InstrumentedRunnable(Runnable task) {
            this.task = task;
            this.idleContext = idle.time();
        }

        @Override
        public void run() {
            idleContext.stop();
            running.inc();
            try (Timer.Context durationContext = duration.time()) {
                task.run();
            } finally {
                running.dec();
                completed.mark();
            }
        }
    }
}

它通過裝飾器模式對原來的 Executor Service 進行包裝,記錄了 submited, running, completed, idle , duration 這些指標饥努,我們可以另外再記錄一些指標捡鱼,部分代碼如下:

1) 先定義一個線程池參數(shù)對象

package com.github.walterfan.helloconcurrency;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

import java.time.Duration;


/**
 * @Author: Walter Fan
 **/
@Getter
@Setter
@Builder
public class ThreadPoolParam {
    private int minPoolSize;
    private int maxPoolSize;
    private Duration keepAliveTime;
    private int queueSize;
    private String threadPrefix;
    private boolean daemon;
    private MetricRegistry metricRegistry;

}

2) 再寫一個創(chuàng)建線程池的工具類:

  • ThreadPoolUtil.java

package com.github.walterfan.helloconcurrency;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.Meter;

import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import java.util.function.Supplier;

import static com.codahale.metrics.MetricRegistry.name;

/**
 * @Author: Walter Fan
 **/
@Slf4j
public class ThreadPoolUtil {
    /*
    和系統(tǒng)內(nèi)置的 ThreadPoolExecutor.CallerRunsPolicy 差不多,
    如果被拒絕酷愧,就用提交任務的線程來執(zhí)行任務.
    */
    public static class DiscardAndLogPolicy implements RejectedExecutionHandler {
        final MetricRegistry metricRegistry;
        final Meter rejectedMeter;
        final Counter rejectedCounter;

        public DiscardAndLogPolicy(String threadPrefix, MetricRegistry metricRegistry) {
            this.metricRegistry = metricRegistry;
            this.rejectedMeter =  metricRegistry.meter(threadPrefix + ".rejected-meter");
            this.rejectedCounter = metricRegistry.counter(threadPrefix + ".rejected-counter");
        }


        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            rejectedMeter.mark();
            rejectedCounter.inc();
            if (!e.isShutdown()) {
                log.warn("reject task and run {} directly", r);
                r.run();

            }
        }
    }

    //創(chuàng)建線程執(zhí)行器驾诈,注冊了幾個度量指標
    public static ThreadPoolExecutor createThreadExecutor(ThreadPoolParam threadPoolParam) {
        MetricRegistry metricRegistry = threadPoolParam.getMetricRegistry();

        metricRegistry.register(threadPoolParam.getThreadPrefix() + ".min", createIntGauge(() -> threadPoolParam.getMinPoolSize()));
        metricRegistry.register(threadPoolParam.getThreadPrefix() + ".max", createIntGauge(() -> threadPoolParam.getMaxPoolSize()));
        metricRegistry.register(threadPoolParam.getThreadPrefix() + ".queue_limitation", createIntGauge(() -> threadPoolParam.getQueueSize()));


        ThreadPoolExecutor executor = new ThreadPoolExecutor(threadPoolParam.getMinPoolSize(),
                threadPoolParam.getMaxPoolSize(),
                threadPoolParam.getKeepAliveTime().toMillis(),
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(threadPoolParam.getQueueSize()),
                createThreadFactory(threadPoolParam),
                createRejectedExecutionHandler(threadPoolParam));

        metricRegistry.register(threadPoolParam.getThreadPrefix() + ".pool_size", createIntGauge(() -> executor.getPoolSize()));
        metricRegistry.register(threadPoolParam.getThreadPrefix() + ".queue_size", createIntGauge(() -> executor.getQueue().size()));
        return executor;
    }

    //創(chuàng)建線程執(zhí)行服務,用 InstrumentedExecutorService 來包裝和度量線程任務
    public static ExecutorService createExecutorService(ThreadPoolParam threadPoolParam) {
        ThreadPoolExecutor executor = createThreadExecutor(threadPoolParam);

        return new InstrumentedExecutorService(executor,
                threadPoolParam.getMetricRegistry(),
                threadPoolParam.getThreadPrefix());
    }

    private static Gauge<Integer> createIntGauge(Supplier<Integer> suppier) {
        return () -> suppier.get();
    }

    public static ThreadFactory createThreadFactory(ThreadPoolParam threadPoolParam) {
        return new ThreadFactoryBuilder()
                .setDaemon(threadPoolParam.isDaemon())
                .setNameFormat(threadPoolParam.getThreadPrefix() + "-%d")
                .build();
    }

    public static RejectedExecutionHandler createRejectedExecutionHandler(ThreadPoolParam threadPoolParam) {
        return new DiscardAndLogPolicy(threadPoolParam.getThreadPrefix(), threadPoolParam.getMetricRegistry());
    }
}

注意: 我們在這個線程池中埋設了12個度量指標溶浴,看你能不能在代碼中找出來設置的地方 乍迄。

  1. cards-thread-pool.completed
  2. cards-thread-pool.max
  3. cards-thread-pool.queue_limitation
  4. cards-thread-pool.rejected-meter
  5. cards-thread-pool.duration
  6. cards-thread-pool.min
  7. cards-thread-pool.queue_size
  8. cards-thread-pool.running
  9. cards-thread-pool.idle
  10. cards-thread-pool.pool_size
  11. cards-thread-pool.rejected-counter
  12. cards-thread-pool.submitted

3)用線程池執(zhí)行多副撲克牌的排序任務

以我們最常用的打撲克牌為例,分別用冒泡排序士败,插入排序和 JDK 自帶的 TimSort 來對若干副牌排序闯两,總共創(chuàng)建20個任務,都放入線程池中執(zhí)行谅将,當我們采用不同的線程池參數(shù)時漾狼,效果大不相同。

3.1) 撲克牌對象類

  • Poker.java
package com.github.walterfan.helloconcurrency;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @Author: Walter Fan
 **/
public class Poker {
    public static class Card {
        enum Suite {
            Spades(4), Hearts(3), Clubs(2), Diamonds(1);
            int value;

            Suite(int value) {
                this.value = value;

            }

            private static Map<Integer, Suite> valueMap = new HashMap<>();

            static {
                for (Suite suite : Suite.values()) {
                    valueMap.put(suite.value, suite);
                }
            }

            public static Suite valueOf(int pageType) {
                return valueMap.get(pageType);
            }

        }
        Suite suite;
        //1~13
        int point;

        public Card(int suiteValue, int point) {
            this.suite = Suite.valueOf(suiteValue);
            this.point = point;
        }

        public String toString() {
            String strPoint = Integer.toString(point);
            if (point > 10) {
                switch (point) {
                    case 11:
                        strPoint = "J";
                        break;
                    case 12:
                        strPoint = "Q";
                        break;
                    case 13:
                        strPoint = "K";
                        break;

                }
            }

            return suite.name() + ":" + strPoint;
        }

        public int getScore() {
            return suite.value * 100 + point;
        }
    }




    public static List<Card> createCardList(int suiteCount) {
        List<Card> cards = new ArrayList<>(52);
        for(int i = 1; i < 5; i++) {
            for(int j = 1; j < 14 ;++j) {
                cards.add(new Card(i, j));
            }
        }

        List<Card> totalCards = new ArrayList<>(suiteCount );

        for(int j = 0; j < suiteCount; j++) {
            totalCards.addAll(new ArrayList<>(cards));
        }

        Collections.shuffle(totalCards);
        return totalCards;
    }

    public static class CardComparator implements Comparator<Card> {

        @Override
        public int compare(Card o1, Card o2) {
            return o1.getScore() - o2.getScore();
        }
    }

}


3.2) 排序任務類

任務很簡單戏自,就象我們平常打撲克那樣邦投,將幾副牌排序,可用三種排序方法
1)冒泡排序
2)插入排序
3)Tim 排序擅笔,JDK7 中用的一種結合了插入排序和歸并排序的高效排序方法

  • SortCardTask.java
package com.github.walterfan.helloconcurrency;

import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
 * @Author: Walter Fan
 **/
@Slf4j
public class SortCardTask implements Callable<Long> {
    public enum SortMethod { BUBBLE_SORT, INSERT_SORT, TIM_SORT}
    private final List<Poker.Card> cards;
    private final SortMethod sortMethod;
    private final int taskNumber;

    private final AtomicInteger taskCounter;

    public SortCardTask(List<Poker.Card> cards, SortMethod method, int taskNumber, AtomicInteger taskCounter) {
        this.cards = cards;
        this.sortMethod = method;
        this.taskNumber = taskNumber;
        this.taskCounter = taskCounter;
    }

    @Override
    public Long call() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        log.info("* {} begin to sort {} cards by {}", this.taskNumber, cards.size(), sortMethod);
        switch(sortMethod) {
            case BUBBLE_SORT:
                bubbleSort(cards, new Poker.CardComparator());
                break;
            case INSERT_SORT:
                insertSort(cards, new Poker.CardComparator());
                break;
            case TIM_SORT:
                timSort(cards, new Poker.CardComparator());
                break;
        }

        stopwatch.stop();

        long millis = stopwatch.elapsed(MILLISECONDS);
        log.info("* {} end to sort {} cards sort by {} spend {} milliseconds - {}" , this.taskNumber, cards.size(), sortMethod, millis, stopwatch); // formatted string like "12.3 ms"
        taskCounter.incrementAndGet();
        return millis;
    }

    public static <T> void bubbleSort(List<T> aList, Comparator<T> comparator) {
        boolean sorted = false;
        int loopCount = aList.size() - 1;
        while (!sorted) {
            sorted = true;
            for (int i = 0; i < loopCount; i++) {
                if (comparator.compare(aList.get(i), aList.get(i + 1)) > 0) {
                    Collections.swap(aList, i, i + 1);
                    sorted = false;
                }
            }
        }
    }

    public static <T> void insertSort(List<T> aList, Comparator<T> comparator) {
        int size = aList.size();
        for (int i = 1; i < size; ++i) {
            T selected = aList.get(i);

            if (size < 10) {
                log.info("{} insert to {}", selected, aList.subList(0, i).stream().map(String::valueOf).collect(Collectors.joining(", ")));
            }

            int j = i - 1;
            //find a position for insert currentElement in the left sorted collection
            while (j >= 0 && comparator.compare(selected, aList.get(j)) < 0) {
                //it does not overwrite existed element because the j+1=i that is currentElement at beginging
                aList.set(j + 1, aList.get(j));
                j--;
            }
            aList.set(j + 1, selected);

        }
    }

    public static <T> void timSort(List<T> aList, Comparator<T> comparator) {
        aList.stream().sorted(comparator).collect(Collectors.toList());
    }

    @Override
    public String toString() {
        return "SortCardTask{" +
                "taskNumber=" + taskNumber +
                ", sortMethod=" + sortMethod +
                '}';
    }
}

3.3) 線程池演示類

  • ThreadPoolDemo.java
package com.github.walterfan.helloconcurrency;


import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Slf4jReporter;

import com.codahale.metrics.MetricRegistry;

import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author: Walter Fan
 **/
@Slf4j
public class ThreadPoolDemo {
    private static AtomicInteger finishCounter = new AtomicInteger(0);

    private AtomicInteger taskNumber = new AtomicInteger(0);

    private ExecutorService executorService;

    public ThreadPoolDemo(ThreadPoolParam threadPoolParam) {
        executorService = ThreadPoolUtil.createExecutorService(threadPoolParam);

    }

    public Callable<Long> createTask(int cardSuiteCount, SortCardTask.SortMethod method) {
        List<Poker.Card> cards = Poker.createCardList(cardSuiteCount);
        return new SortCardTask(cards, method, taskNumber.incrementAndGet(), finishCounter);

    }

    public List<Future<Long>> exeucteTasks(List<Callable<Long>> tasks, Duration waitTime)  {
        try {
            return this.executorService.invokeAll(tasks, waitTime.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.warn("invokeAll interrupt", e);
            return Collections.emptyList();
        }
    }

    public void waitUntil(long ms) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(ms, TimeUnit.MILLISECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException ex) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Stopwatch stopwatch = Stopwatch.createStarted();
        log.info("--- start ---");
        MetricRegistry metricRegistry = new MetricRegistry();

        final CsvReporter csvReporter = CsvReporter.forRegistry(metricRegistry)
                .formatFor(Locale.US)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build(new File("./"));
        csvReporter.start(100, TimeUnit.MILLISECONDS);

/*        final Slf4jReporter logReporter = Slf4jReporter.forRegistry(metricRegistry)
                .outputTo(log)
                .convertRatesTo(TimeUnit.SECONDS)
                .convertDurationsTo(TimeUnit.MILLISECONDS)
                .build();

        logReporter.start(1, TimeUnit.MINUTES);*/

        ThreadPoolParam threadPoolParam = ThreadPoolParam.builder()
                .minPoolSize(1)
                .maxPoolSize(4)
                .daemon(true)
                .keepAliveTime(Duration.ofSeconds(1))
                .queueSize(5)
                .threadPrefix("cards-thread-pool")
                .metricRegistry(metricRegistry)
                .build();

        ThreadPoolDemo demo = new ThreadPoolDemo(threadPoolParam);
        List<Callable<Long>> tasks = new ArrayList<>();
        //30 tasks, 10, 40, 90 ... 1000 suite cards
        for(int i=1; i<=10; i++) {
            tasks.add(demo.createTask(i*i*10, SortCardTask.SortMethod.BUBBLE_SORT));
            tasks.add(demo.createTask(i*i*10, SortCardTask.SortMethod.INSERT_SORT));
            tasks.add(demo.createTask(i*i*10, SortCardTask.SortMethod.TIM_SORT));
        }



        List<Future<Long>> results = demo.exeucteTasks(tasks, Duration.ofMinutes(1));

        //logReporter.report();
        stopwatch.stop();
        log.info("--- end finish {}, spent {} ---", finishCounter.get(), stopwatch);
        results.stream().filter(x -> !x.isDone()).forEach(x -> log.info("{} is not done", x));


    }
}

上述代碼讓線程池執(zhí)行了30個排序任務志衣,最多排序1000副牌(52000張),
10任務用冒泡排序,10個任務用插入排序猛们,10個任務用 Tim 排序, 總共花了18秒多
執(zhí)行結果如下:


17:09:47.341 [main] INFO com.github.walterfan.helloconcurrency.ThreadPoolDemo - --- start ---
17:09:47.497 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 1 begin to sort 520 cards (10 suite) by BUBBLE_SORT
17:09:47.496 [main] WARN com.github.walterfan.helloconcurrency.ThreadPoolUtil - reject task and run java.util.concurrent.FutureTask@61baa894 directly
17:09:47.497 [cards-thread-pool-1] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 7 begin to sort 4680 cards (90 suite) by BUBBLE_SORT
17:09:47.497 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 9 begin to sort 4680 cards (90 suite) by TIM_SORT
17:09:47.497 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 8 begin to sort 4680 cards (90 suite) by INSERT_SORT
17:09:47.498 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 10 begin to sort 8320 cards (160 suite) by BUBBLE_SORT
17:09:47.515 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 9 end to sort 4680 cards (90 suite)sort by TIM_SORT spend 17 milliseconds - 17.70 ms
17:09:47.520 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 2 begin to sort 520 cards (10 suite) by INSERT_SORT
17:09:47.520 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 1 end to sort 520 cards (10 suite)sort by BUBBLE_SORT spend 22 milliseconds - 22.91 ms
17:09:47.521 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 3 begin to sort 520 cards (10 suite) by TIM_SORT
17:09:47.522 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 3 end to sort 520 cards (10 suite)sort by TIM_SORT spend 0 milliseconds - 528.3 μs
17:09:47.522 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 4 begin to sort 2080 cards (40 suite) by BUBBLE_SORT
17:09:47.531 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 2 end to sort 520 cards (10 suite)sort by INSERT_SORT spend 10 milliseconds - 10.60 ms
17:09:47.531 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 5 begin to sort 2080 cards (40 suite) by INSERT_SORT
17:09:47.546 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 5 end to sort 2080 cards (40 suite)sort by INSERT_SORT spend 14 milliseconds - 14.85 ms
17:09:47.547 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 6 begin to sort 2080 cards (40 suite) by TIM_SORT
17:09:47.548 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 6 end to sort 2080 cards (40 suite)sort by TIM_SORT spend 1 milliseconds - 1.666 ms
17:09:47.560 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 8 end to sort 4680 cards (90 suite)sort by INSERT_SORT spend 63 milliseconds - 63.22 ms
17:09:47.579 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 4 end to sort 2080 cards (40 suite)sort by BUBBLE_SORT spend 57 milliseconds - 57.37 ms
17:09:47.715 [cards-thread-pool-1] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 7 end to sort 4680 cards (90 suite)sort by BUBBLE_SORT spend 218 milliseconds - 218.0 ms
17:09:48.171 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 10 end to sort 8320 cards (160 suite)sort by BUBBLE_SORT spend 672 milliseconds - 672.3 ms
17:09:48.171 [main] WARN com.github.walterfan.helloconcurrency.ThreadPoolUtil - reject task and run java.util.concurrent.FutureTask@b065c63 directly
17:09:48.171 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 16 begin to sort 18720 cards (360 suite) by BUBBLE_SORT
17:09:48.171 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 12 begin to sort 8320 cards (160 suite) by TIM_SORT
17:09:48.172 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 13 begin to sort 13000 cards (250 suite) by BUBBLE_SORT
17:09:48.171 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 11 begin to sort 8320 cards (160 suite) by INSERT_SORT
17:09:48.173 [cards-thread-pool-1] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 14 begin to sort 13000 cards (250 suite) by INSERT_SORT
17:09:48.178 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 12 end to sort 8320 cards (160 suite)sort by TIM_SORT spend 6 milliseconds - 6.314 ms
17:09:48.178 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 15 begin to sort 13000 cards (250 suite) by TIM_SORT
17:09:48.187 [cards-thread-pool-2] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 15 end to sort 13000 cards (250 suite)sort by TIM_SORT spend 8 milliseconds - 8.673 ms
17:09:48.228 [cards-thread-pool-3] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 11 end to sort 8320 cards (160 suite)sort by INSERT_SORT spend 56 milliseconds - 56.62 ms
17:09:48.333 [cards-thread-pool-1] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 14 end to sort 13000 cards (250 suite)sort by INSERT_SORT spend 159 milliseconds - 159.2 ms
17:09:49.595 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 13 end to sort 13000 cards (250 suite)sort by BUBBLE_SORT spend 1423 milliseconds - 1.424 s
17:09:50.520 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 16 end to sort 18720 cards (360 suite)sort by BUBBLE_SORT spend 2348 milliseconds - 2.348 s
17:09:50.520 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 17 begin to sort 18720 cards (360 suite) by INSERT_SORT
17:09:50.520 [cards-thread-pool-4] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 22 begin to sort 33280 cards (640 suite) by BUBBLE_SORT
17:09:50.521 [main] WARN com.github.walterfan.helloconcurrency.ThreadPoolUtil - reject task and run java.util.concurrent.FutureTask@449b2d27 directly
17:09:50.521 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 24 begin to sort 33280 cards (640 suite) by TIM_SORT
17:09:50.521 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 26 begin to sort 42120 cards (810 suite) by INSERT_SORT
17:09:50.521 [cards-thread-pool-6] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 25 begin to sort 42120 cards (810 suite) by BUBBLE_SORT
17:09:50.537 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 24 end to sort 33280 cards (640 suite)sort by TIM_SORT spend 16 milliseconds - 16.03 ms
17:09:50.537 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 18 begin to sort 18720 cards (360 suite) by TIM_SORT
17:09:50.545 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 18 end to sort 18720 cards (360 suite)sort by TIM_SORT spend 7 milliseconds - 7.866 ms
17:09:50.545 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 19 begin to sort 25480 cards (490 suite) by BUBBLE_SORT
17:09:50.772 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 17 end to sort 18720 cards (360 suite)sort by INSERT_SORT spend 251 milliseconds - 251.9 ms
17:09:50.772 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 20 begin to sort 25480 cards (490 suite) by INSERT_SORT
17:09:51.614 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 20 end to sort 25480 cards (490 suite)sort by INSERT_SORT spend 841 milliseconds - 841.2 ms
17:09:51.614 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 21 begin to sort 25480 cards (490 suite) by TIM_SORT
17:09:51.615 [main] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 26 end to sort 42120 cards (810 suite)sort by INSERT_SORT spend 1093 milliseconds - 1.094 s
17:09:51.642 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 21 end to sort 25480 cards (490 suite)sort by TIM_SORT spend 28 milliseconds - 28.11 ms
17:09:51.643 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 23 begin to sort 33280 cards (640 suite) by INSERT_SORT
17:09:52.308 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 23 end to sort 33280 cards (640 suite)sort by INSERT_SORT spend 664 milliseconds - 664.9 ms
17:09:52.308 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 27 begin to sort 42120 cards (810 suite) by TIM_SORT
17:09:52.318 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 27 end to sort 42120 cards (810 suite)sort by TIM_SORT spend 9 milliseconds - 9.679 ms
17:09:52.318 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 28 begin to sort 52000 cards (1000 suite) by BUBBLE_SORT
17:09:55.392 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 19 end to sort 25480 cards (490 suite)sort by BUBBLE_SORT spend 4846 milliseconds - 4.847 s
17:09:55.392 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 29 begin to sort 52000 cards (1000 suite) by INSERT_SORT
17:09:56.511 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 29 end to sort 52000 cards (1000 suite)sort by INSERT_SORT spend 1119 milliseconds - 1.119 s
17:09:56.512 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 30 begin to sort 52000 cards (1000 suite) by TIM_SORT
17:09:56.523 [cards-thread-pool-5] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 30 end to sort 52000 cards (1000 suite)sort by TIM_SORT spend 11 milliseconds - 11.68 ms
17:09:58.528 [cards-thread-pool-4] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 22 end to sort 33280 cards (640 suite)sort by BUBBLE_SORT spend 8007 milliseconds - 8.008 s
17:10:02.026 [cards-thread-pool-6] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 25 end to sort 42120 cards (810 suite)sort by BUBBLE_SORT spend 11504 milliseconds - 11.50 s
17:10:07.127 [cards-thread-pool-0] INFO com.github.walterfan.helloconcurrency.SortCardTask - * 28 end to sort 52000 cards (1000 suite)sort by BUBBLE_SORT spend 14808 milliseconds - 14.81 s
17:10:07.128 [main] INFO com.github.walterfan.helloconcurrency.ThreadPoolDemo - --- end finish 30, spent 19.79 s ---

我用 CsvReporter 把若干度量指標打印到Csv文件中念脯,共有如下12個 CSV 文件

  1. cards-thread-pool.completed.csv
  2. cards-thread-pool.max.csv
  3. cards-thread-pool.queue_limitation.csv
  4. cards-thread-pool.rejected-meter.csv
  5. cards-thread-pool.duration.csv
  6. cards-thread-pool.min.csv
  7. cards-thread-pool.queue_size.csv
  8. cards-thread-pool.running.csv
  9. cards-thread-pool.idle.csv
  10. cards-thread-pool.pool_size.csv
  11. cards-thread-pool.rejected-counter.csv
  12. cards-thread-pool.submitted.csv

基于這些度量指標,我們可以看到任務特點和線程池的參數(shù)是否合理

1) 線程任務執(zhí)行時間

看結果三種排序方法的效率差別很大弯淘,只排兩副牌時绿店,三種方法差不太多,而排序1000副牌(52000張)時, TimSort 花了大約11 毫秒假勿, InsertSort 花了大約 1 秒 借嗽,而 BubbleSort 花了14 秒多。

對于任務執(zhí)行時間转培,我們可以通過記錄的度量指標文件來作一個分析恶导,簡單畫一個線形圖,

  • csv 文件內(nèi)容 cards-thread-pool.duration.csv
t,count,max,mean,min,stddev,p50,p75,p95,p98,p99,p999,mean_rate,m1_rate,m5_rate,m15_rate,rate_unit,duration_unit
1585473845,0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,calls/second,milliseconds
1585473846,0,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,calls/second,milliseconds
1585473846,8,63.069663,37.658003,2.579674,25.915099,39.007040,61.809917,63.069663,63.069663,63.069663,63.069663,33.614807,0.000000,0.000000,0.000000,calls/second,milliseconds
1585473846,8,63.069663,37.658003,2.579674,25.915099,39.007040,61.809917,63.069663,63.069663,63.069663,63.069663,23.777415,0.000000,0.000000,0.000000,calls/second,milliseconds
1585473846,8,63.069663,37.658003,2.579674,25.915099,39.007040,61.809917,63.069663,63.069663,63.069663,63.069663,18.448678,0.000000,0.000000,0.000000,calls/second,milliseconds
1585473846,9,384.357751,76.180197,2.579674,111.663094,58.345330,62.339820,384.357751,384.357751,384.357751,384.357751,16.884697,0.000000,0.000000,0.000000,calls/second,milliseconds
# 省略余下內(nèi)容
  • 分析任務執(zhí)行時間的 Python 腳本
import matplotlib.pyplot as plt
import pandas as pd

durations = pd.read_csv('cards-thread-pool.duration.csv')
print(durations.head(1))

plt.plot(durations['t'], durations['max'], label = 'max')
plt.plot(durations['t'], durations['mean'], label = 'mean')
plt.plot(durations['t'], durations['min'], label = 'min')



plt.ylabel("milliSeconds")
plt.xlabel("timestamp")
plt.legend(prop = {'size': 10}) 

plt.show()
排序任務時間

2) 線程池中的線程數(shù)變化

import matplotlib.pyplot as plt
import pandas as pd

durations = pd.read_csv('cards-thread-pool.pool_size.csv')
print(durations.head(1))

plt.plot(durations['t'], durations['value'], label = 'pool size')


plt.ylabel("thread count")
plt.xlabel("timestamp")
plt.legend(prop = {'size': 10}) 

plt.show()
線程池中的線程數(shù)

線程池的線程數(shù)應該比較平穩(wěn)浸须,避免頻繁的創(chuàng)建和銷毀線程惨寿,這張圖揭示如果系統(tǒng)資源足夠的話,corePoolSize, maxPoolSize 和 keepAliveTime 時間可以適當調(diào)大删窒。

線程池隊列長度

import matplotlib.pyplot as plt
import pandas as pd

durations = pd.read_csv('cards-thread-pool.queue_size.csv')
print(durations.head(1))

plt.plot(durations['t'], durations['value'], label = 'queue size')


plt.ylabel("queue size")
plt.xlabel("timestamp")
plt.legend(prop = {'size': 10}) 

plt.show()
線程池隊列長度

線程池拒絕的任務數(shù)

import matplotlib.pyplot as plt
import pandas as pd

durations = pd.read_csv('cards-thread-pool.rejected-counter.csv')
print(durations.head(1))

plt.plot(durations['t'], durations['count'], label = 'rejected count')


plt.ylabel("rejected count")
plt.xlabel("timestamp")
plt.legend(prop = {'size': 10}) 

plt.show()
被拒絕的任務數(shù)

基于被拒絕的任務數(shù)來看裂垦,顯然核心線程數(shù)和隊列長度應該增大。

在實際工作中肌索, ConsoleReporter, Slf4jReporter蕉拢,還是CsvReporter 這些 metrics-core 自帶的報告器都是定時采樣并打印度量指標,分析查詢很不方便驶社。

它們都是對線程池的度量指標定時采樣記錄企量,我們可以利用一些時間序列數(shù)據(jù)庫(例如 InfluxDB,Promethues 等)將這些指標保存起來亡电,再利用報表分析工具(Grafana, Graphite等)對它們進行分析。

完整源代碼參見 https://github.com/walterfan/helloworld/tree/master/helloconcurrency/src/main/java/com/github/walterfan/helloconcurrency

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
禁止轉載硅瞧,如需轉載請通過簡信或評論聯(lián)系作者份乒。
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市腕唧,隨后出現(xiàn)的幾起案子或辖,更是在濱河造成了極大的恐慌,老刑警劉巖枣接,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件颂暇,死亡現(xiàn)場離奇詭異,居然都是意外死亡但惶,警方通過查閱死者的電腦和手機耳鸯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來膀曾,“玉大人县爬,你說我怎么就攤上這事√硪辏” “怎么了财喳?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我耳高,道長扎瓶,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任泌枪,我火速辦了婚禮栗弟,結果婚禮上,老公的妹妹穿的比我還像新娘工闺。我一直安慰自己乍赫,他們只是感情好,可當我...
    茶點故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布陆蟆。 她就那樣靜靜地躺著雷厂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪叠殷。 梳的紋絲不亂的頭發(fā)上改鲫,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天,我揣著相機與錄音林束,去河邊找鬼像棘。 笑死,一個胖子當著我的面吹牛壶冒,可吹牛的內(nèi)容都是我干的缕题。 我是一名探鬼主播,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼胖腾,長吁一口氣:“原來是場噩夢啊……” “哼烟零!你這毒婦竟也來了?” 一聲冷哼從身側響起咸作,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤锨阿,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后记罚,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體墅诡,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年桐智,在試婚紗的時候發(fā)現(xiàn)自己被綠了末早。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡酵使,死狀恐怖荐吉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情口渔,我是刑警寧澤样屠,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響痪欲,放射性物質(zhì)發(fā)生泄漏悦穿。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一业踢、第九天 我趴在偏房一處隱蔽的房頂上張望栗柒。 院中可真熱鬧,春花似錦知举、人聲如沸瞬沦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽逛钻。三九已至,卻和暖如春锰提,著一層夾襖步出監(jiān)牢的瞬間曙痘,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工立肘, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留边坤,地道東北人。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓谅年,卻偏偏與公主長得像茧痒,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子踢故,可洞房花燭夜當晚...
    茶點故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內(nèi)容