介紹
ThreadPool 我們在開發(fā)過程中經(jīng)常使用跺嗽,java線程池的相關(guān)知識見
線程池相關(guān)文章
dubbo也不例外會使用線程池,見dubbo線程池
看完本文章主要學(xué)習(xí):
- dubbo的線程池是如何實現(xiàn)
- dubbo線程如何配置
- 我們自己是先線程池注意事項
使用方式
需要通過不同的派發(fā)策略和不同的線程池配置的組合來應(yīng)對不同的場景:
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
服務(wù)提供者
<dubbo:provider>標簽下配置threadpool 屬性羔味,默認是fixed
詳細說明
源碼位置
繼承關(guān)系
Cached:緩存線程池乡革,空閑一分鐘自動刪除高帖,需要時重建
Fixed:固定大小的線程池族沃,默認
Limited:可伸縮線程池误辑,但池中的線程數(shù)只會增長不會收縮茉兰。只增長不收縮的目的是為了避免收縮時突然來了大流量引起的性能問題尤泽。
ThreadPool
/**
* ThreadPool
*
* @author william.liangf
*/
@SPI("fixed")
public interface ThreadPool {
/**
* 線程池
*
* @param url 線程參數(shù)
* @return 線程池
*/
@Adaptive({Constants.THREADPOOL_KEY})
Executor getExecutor(URL url);
}
ThreadPool$Adaptive
啟動的時候創(chuàng)建的適配對象
注:Adaptive注解在類上和方法上面表達的意義不一樣
注解在類上:代表人工實現(xiàn),實現(xiàn)一個裝飾類(設(shè)計模式中的裝飾模式)
注解在方法上:代表自動生成和編譯一個動態(tài)的Adpative類,它主要是用于SPI坯约,因為spi的類是不固定熊咽、未知的擴展類,所以設(shè)計了動態(tài)$Adaptive類.
public class ThreadPool$Adaptive implements com.alibaba.dubbo.common.threadpool.ThreadPool {
@Override
public java.util.concurrent.Executor getExecutor(com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("threadpool", "fixed");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.common.threadpool.ThreadPool) name from url(" + url.toString() + ") use keys([threadpool])");
ThreadPool extension = ExtensionLoader.getExtensionLoader(ThreadPool.class).getExtension(extName);
return extension.getExecutor(arg0);
}
}
FixedThreadPool
/**
* Creates a thread pool that reuses a fixed number of threads
* 固定線程池
*
* @see java.util.concurrent.Executors#newFixedThreadPool(int)
*/
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 線程名
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 固定線程數(shù)
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 隊列大小
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 線程池執(zhí)行器
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
CachedThreadPool
/**
* This thread pool is self-tuned. Thread will be recycled after idle for one minute, and new thread will be created for
* the upcoming request.
* 緩存線程池闹丐,空閑一分鐘自動刪除横殴,需要時重建
*
* @see java.util.concurrent.Executors#newCachedThreadPool()
*/
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 線程池名稱
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心線程池個數(shù)
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大線程池個數(shù)
int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
//隊列個數(shù)
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 存活時間,默認是毫秒數(shù)
int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
LimitedThreadPool
/**
* Creates a thread pool that creates new threads as needed until limits reaches. This thread pool will not shrink
* automatically.
* 可伸縮線程池妇智,但池中的線程數(shù)只會增長不會收縮滥玷。只增長不收縮的目的是為了避免收縮時突然來了大流量引起的性能問題。
*/
public class LimitedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 線程名
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核心線程池個數(shù)
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
// 最大線程池個數(shù)
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 隊列大小
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 線程池執(zhí)行器
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
AbortPolicyWithReport
帶打印異常堆棧的拒絕策略
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
// 將異常拋出去
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {
long now = System.currentTimeMillis();
//dump every 10 minutes 10分鐘打印一次
if (now - lastPrintTime < 10 * 60 * 1000) {
return;
}
if (!guard.tryAcquire()) {
return;
}
// 異步打印
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
//獲取打印的目錄
String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));
SimpleDateFormat sdf;
//系統(tǒng)
String OS = System.getProperty("os.name").toLowerCase();
// window system don't support ":" in file name
if(OS.contains("win")){
sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
}else {
sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
}
String dateStr = sdf.format(new Date());
FileOutputStream jstackStream = null;
try {
jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr));
// 打印 JStack
JVMUtil.jstack(jstackStream);
} catch (Throwable t) {
logger.error("dump jstack error", t);
} finally {
guard.release();
if (jstackStream != null) {
try {
jstackStream.flush();
jstackStream.close();
} catch (IOException e) {
}
}
}
lastPrintTime = System.currentTimeMillis();
}
});
}
總結(jié)
dubbo客戶端使用的是 shared類型的線程池
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException{
super(url, wrapChannelHandler(url, handler));
}
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
return ChannelHandlers.wrap(handler, url);
}
public static final String DEFAULT_CLIENT_THREADPOOL = "shared";
服務(wù)提供者使用的是fixed方式的線程池