ExtensionLoader的使用
Dubbo中隨處可見這樣的代碼:
ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
這段代碼非常重要,了解它的實現是掌握dubbo源碼的重中之重络断,否則很多地方的源碼會不知其所以然,接下來以獲取ThreadPool為例答捕,講解dubbo如何通過ExtensionLoader根據url獲取具體的ThreadPool祷肯;
Provider&ThreadPool
Dubbo服務端接收到所有ChannelState類型的請求,都在com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler中處理国拇,以Provider端處理接收到的請求為例昙篙,處理的代碼如下:
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
getExecutorService()就是獲取連接池ExecutorService的秘豹;對應的就是父類WrappedChannelHandler中的申明
protected final ExecutorService executor
携御;在new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)時,調用父類WrappedChannelHandler的構造方法時初始化既绕,源碼如下:
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
}
從這段代碼可知啄刹,獲取ExecutorService的代碼就是ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
,接下來我們看看dubbo如何通過這一行代碼獲取具體的ThreadPool;
ExtensionLoader
這個類非常重要凄贩,幾乎貫穿在dubbo中每一個模塊誓军,例如Protocol,LoadBalance等疲扎;因為dubbo采用SPI微內核實現方式昵时,所以外部可以從容擴展自定義實現,例如自定義實現一種線程池AfeiThreadPool椒丧,而dubbo決定具體采用哪個實現類壹甥,都在這個類中決定,ExtensionLoader主要有兩大功能:1壶熏、獲取ExtensionLoader實例
句柠,2、獲取Adaptive實例
棒假。調用方得到Adaptive實例后就能調用具體業(yè)務實現溯职,例如FixedThreadPool,下面的分析以獲取ThreadPool的實現為例帽哑;
1. 獲取ExtensionLoader實例
首先根據ThreadPool得到ExtensionLoader谜酒,實現源碼如下:
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
// 獲取ExtensionLoader時參數即ThreadPool必須是接口類型
if(!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
// 獲取ExtensionLoader時參數即ThreadPool必須有SPI注解,否則不可以擴展
if(!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
// 將接口類型與其對應的ExtensionLoader本地緩存到`ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS`中
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
2. 獲取Adaptive實例
得到ExtensionLoader后祝拯,再得到具體的Adaptive實例甚带;其中cachedAdaptiveInstance定義為:Holder<Object> cachedAdaptiveInstance
她肯,如果本地沒有緩存實例,那么調用createAdaptiveExtension()獲取鹰贵,源碼如下:
public T getAdaptiveExtension() {
// 先從本地緩存中查詢能否得到Adaptive實例
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
// double-check方式保證線程安全(單實例的一種實現方式)
if(createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 獲取實例
instance = createAdaptiveExtension();
// 將實例本地緩存
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
}
else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
Hold申明如下晴氨,用volatile來保證數據的一致性:
/**
* Helper Class for hold a value.
*
* @author william.liangf
*/
public class Holder<T> {
private volatile T value;
public void set(T value) {
this.value = value;
}
public T get() {
return value;
}
}
如果本地還沒有緩存實例,那么調用createAdaptiveExtension()獲取Adaptive實例碉输,源碼如下:
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
}
}
getAdaptiveExtensionClass()源碼如下籽前,先調用getExtensionClasses();,如果不能得到Adaptive類敷钾,那么調用createAdaptiveExtensionClass()獲取Adaptive類枝哄;
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
接下來我們看getExtensionClasses()方法中如何得到Adaptive擴展類,其主要分為如下幾個步驟:
- 得到ThreadPool接口上申明的SPI注解的值("fixed")阻荒,就是默認名稱cachedDefaultName 挠锥;
- 遍歷類路徑下三個目錄("META-INF/dubbo/internal","META-INF/dubbo/"侨赡,"META-INF/services/")蓖租,查找所有命名為type.getName()即com.alibaba.dubbo.common.threadpool.ThreadPool的文件,得到文件中所有申明的實現集合羊壹;
- 查找有Adaptive注解的類蓖宦,就是要找的Adaptive類;
- 如果沒有Adaptive類油猫,那么嘗試能否通過Wrapper方式獲取類稠茂,如果可以獲取,那么緩存到
Set<Class<?>> cachedWrapperClasses;
中情妖;- 把com.alibaba.dubbo.common.threadpool.ThreadPool文件中得到的所有類緩存到
Holder<Map<String, Class<?>>> cachedClasses
中睬关;
private Map<String, Class<?>> loadExtensionClasses() {
// 如果入參類型申明了SPI注解,那么SPI注解中的value就是默認實現名稱毡证,即cachedDefaultName (SPI注解中value的值只能有1個共螺,超過1個會拋出異常);
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if(defaultAnnotation != null) {
String value = defaultAnnotation.value();
if(value != null && (value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if(names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if(names.length == 1) cachedDefaultName = names[0];
}
}
// 遍歷類路徑下的(DUBBO_INTERNAL_DIRECTORY情竹,DUBBO_DIRECTORY藐不,SERVICES_DIRECTORY)三個目錄,將符合type類型的所有實現存到Map中秦效;
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadFile(extensionClasses, DUBBO_DIRECTORY);
loadFile(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
2.2 創(chuàng)建Adaptive擴展類
由于getExtensionClasses()中不能得到ThreadPool類型的Adaptive類(帶有
Adaptive
注解)雏蛮,所以需要通過createAdaptiveExtensionClass()創(chuàng)建Adaptive擴展類,由源碼可知阱州,其創(chuàng)建Adaptive擴展類分為如下幾個步驟:
- 通過createAdaptiveExtensionClassCode()得到用于生成Adaptive擴展類的源代碼code
- 得到類加載器ClassLoader
- 通過擴展機制得到編譯器(默認用JavassistCompiler)將擴展類源代碼code編譯成字節(jié)碼從而得到Adaptive類
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
通過DEBUG方式得到ThreadPool的Adaptive擴展類的源代碼如下挑秉,由下面的源碼可知獲取ThreadPool類型的Adaptive類的步驟:
- 判斷請求URL中是否申明threadpool,如果沒有申明threadpool苔货,那么默認為fixed犀概,即擴展名稱立哑;
- 根據擴展名稱調用
ExtensionLoader.getExtension(extName)
得到具體的實現類
從用于生成Adaptive擴展類的源碼可以得知,URL中對threadpool申明的優(yōu)先級要高于SPI注解申明姻灶;所以如果我們想修改默認的ThreadPool實現方式铛绰,可以通過URL中指定threadpool;
package com.alibaba.dubbo.common.threadpool;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ThreadPool$Adpative implements com.alibaba.dubbo.common.threadpool.ThreadPool {
public java.util.concurrent.Executor getExecutor(com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("threadpool", "fixed");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.common.threadpool.ThreadPool) name from url(" + url.toString() + ") use keys([threadpool])");
com.alibaba.dubbo.common.threadpool.ThreadPool extension = (com.alibaba.dubbo.common.threadpool.ThreadPool)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.threadpool.ThreadPool.class).getExtension(extName);
return extension.getExecutor(arg0);
}
}
創(chuàng)建Adaptive擴展類后产喉,看.getExtension(String)
方法如何得到具體的實現類捂掰,實現源碼如下,即首先嘗試從本地緩存cachedInstances
中獲取曾沈,如果獲取不到这嚣,那么調用createExtension(name)
創(chuàng)建擴展類實例并本地緩存;這里dubbo的作者又用到了double-check方式保證線程安全實現單實例塞俱;
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension();
}
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
createExtension(String name)部分實現源碼如下姐帚,由于前面已經調用了getExtensionClasses()得到所有ThreadPool類型并本地緩存在Holder<Map<String, Class<?>>> cachedClasses
中,cachedClasses.get()得到保存的數據如下:
fixed=com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool
根據name的值就能夠取得ThreadPool的具體實現類障涯,例如默認fixed對應的具體實現類就是FixedThreadPool卧土;如果設置threadpool="limited",那么對應的具體實現類就是LimitedThreadPool像樊;如果擴展了自定義AfeiThreadPool并在META-INF/dubbo/com.alibaba.dubbo.common.threadpool.ThreadPool文件中申明afei=com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool,那么設置threadpool="afei"旅敷,就能得到自定義的AfeiThreadPool:
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
... ...
}
配置ThreadPool
得到具體的ThreadPool實現后生棍,dubbo根據url調整ThreadPoolExecutor,以FixedThreadPool為例(dubbo provider端默認采用的連接池處理方式)媳谁,源碼如下:
public class FixedThreadPool implements ThreadPool {
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
- 如果url中配置了線程名稱涂滴,那么進行設置,否則基于Constants.DEFAULT_THREAD_NAME得到默認線程名稱;
- 如果url中配置了線程數量晴音,那么進行設置柔纵,否則用默認線程數量:Constants.DEFAULT_THREADS,即默認200個線程锤躁;
- 如果url中配置了線程隊列搁料,那么進行設置,否則用默認線程隊列:Constants.DEFAULT_QUEUES系羞,即默認為0郭计;
例如我們可以在provider.xml中對線程池做如下自定義:
<dubbo:protocol threadpool="fixed" threads="288" queues="10"/>