異步并行加載框架
一父泳、 需求
隨著業(yè)務(wù)越來(lái)越復(fù)雜澈吨,對(duì)應(yīng)的代碼也越來(lái)越復(fù)雜,耗時(shí)也越來(lái)越多纲熏,因此急需一套并行框架妆丘,通過(guò)搜索發(fā)現(xiàn)阿里提供了一個(gè)并行框架asyncLoad(https://github.com/alibaba/asyncload.git),但是此框架不支持注解的方式局劲,使的對(duì)應(yīng)的代碼侵入性很大勺拣,所以對(duì)asyncLoad框架進(jìn)行擴(kuò)展,使其能夠支持對(duì)應(yīng)的注解配置的方式來(lái)進(jìn)行異步并行鱼填。</pre>
二药有、 實(shí)現(xiàn)原理
1. 異步并行基本實(shí)現(xiàn)方式:
- 原理:基本就是線程池 + Future 的結(jié)合來(lái)實(shí)現(xiàn);
- 優(yōu)點(diǎn):開(kāi)發(fā)靈活
- 缺點(diǎn):需要開(kāi)發(fā)人員了解整個(gè)并行框架苹丸,且對(duì)代碼有侵入愤惰,而且對(duì)應(yīng)的返回結(jié)果是Future<對(duì)象> 和本身原來(lái)的返回結(jié)果<對(duì)象> 不一樣,那么在寫(xiě)代碼的時(shí)候就要考慮這些差異赘理;
為了解決異步并行基本實(shí)現(xiàn)方式的缺點(diǎn)宦言,就是不需要開(kāi)發(fā)人員為了實(shí)現(xiàn)異步并行使原來(lái)代碼實(shí)現(xiàn)的方式要進(jìn)行改變,因此需要另外一種異步并行更通用的實(shí)現(xiàn)方式商模,如asyncLoad這種;
2. 異步并行通用實(shí)現(xiàn)方式:
- 原理:線程池 + Future + Cglib 結(jié)合的方式來(lái)實(shí)現(xiàn)
- 優(yōu)點(diǎn):正好是異步并行基本實(shí)現(xiàn)方式對(duì)應(yīng)的缺點(diǎn)奠旺,不需要基礎(chǔ)開(kāi)發(fā)人員了解更多異步并行實(shí)現(xiàn)方式,可以讓基礎(chǔ)開(kāi)發(fā)人員還是按照原來(lái)開(kāi)發(fā)串行執(zhí)行代碼一樣進(jìn)行開(kāi)發(fā)施流,唯一的不同就是在需要異步并行執(zhí)行的方法上增加對(duì)應(yīng)的配置(xml响疚,注解等方式);
- 缺點(diǎn):cglib本身的方式帶來(lái)的缺點(diǎn)(通過(guò)實(shí)現(xiàn)對(duì)應(yīng)目標(biāo)類的子類來(lái)實(shí)現(xiàn)動(dòng)態(tài)代理,并且可以在生成子類的時(shí)候在對(duì)應(yīng)方法上進(jìn)行攔截瞪醋,增強(qiáng)子類方法的功能)忿晕,但是這種方式的代理本身不支持final類(因?yàn)閒inal類不支持生成對(duì)應(yīng)的子類),因此對(duì)應(yīng)的像基元類型就不支持银受;靈活性不如基礎(chǔ)的好践盼;
三、 增加注解配置
1. 設(shè)計(jì)
Asyncload原先整體設(shè)計(jì)對(duì)應(yīng)的類圖如下:
AsyncLoad 注解對(duì)應(yīng)的類圖如下:
2. 實(shí)現(xiàn)
2.1類功能簡(jiǎn)介
- 注解@AsyncClassDef
@Documented
@Retention(RUNTIME)
@Target({ TYPE })
@Inherited
/**
* @author yujiakui
*
* 下午3:06:31
*
*/
public @interface AsyncClassDef {
/**
* 異步方法列表
*
* @return
*/
AsyncMethodDef[] asyncMethods() default {};
/**
* 類級(jí)別線程池配置
*
* @return
*/
AsyncThreadPoolConfig classThreadPoolConf() default @AsyncThreadPoolConfig;
}
注解@AsyncClassDef表示的是對(duì)應(yīng)這個(gè)類需要對(duì)應(yīng)的異步宾巍,但是其中的屬性可以過(guò)濾指定的方法能夠進(jìn)行異步并行處理宏侍,如下是其對(duì)應(yīng)的屬性:- 注解@AsyncMethodDef
@Documented
@Retention(RUNTIME)
@Target({ METHOD, ElementType.ANNOTATION_TYPE })
/**
* @author yujiakui
*
* 下午3:02:52
*
* 異步并行方法對(duì)應(yīng)的注解
*
*/
public @interface AsyncMethodDef {
/**
* 方法匹配對(duì)應(yīng)的正則PatternMatchUtils
*
* 注意:將這個(gè)注解放在方法上則這個(gè)對(duì)應(yīng)的methodMatchRegex將不起作用,因?yàn)榇藭r(shí)就是對(duì)應(yīng)這個(gè)方法
*
* @return
*/
String[] methodMatchRegex() default {};
/**
* 排除方法匹配模式
*
* 注意:將這個(gè)注解放在方法上則這個(gè)對(duì)應(yīng)的methodMatchRegex將不起作用蜀漆,因?yàn)榇藭r(shí)就是對(duì)應(yīng)這個(gè)方法
*
* @return
*/
String[] excludeMethodMatchRegex() default {};
/**
* 默認(rèn)超時(shí)時(shí)間
*
* @return
*/
long timeout() default 1000;
/**
* 開(kāi)啟的異步線程池中的對(duì)應(yīng)執(zhí)行的線程是否繼承當(dāng)前線程的threadLocal谅河,默認(rèn)不繼承
*
* @return
*/
boolean inheritThreadLocal() default false;
/**
* 方法線程池配置
*
* @return
*/
AsyncThreadPoolConfig methodThreadPoolConf() default @AsyncThreadPoolConfig;
}
注解@AsyncMethodDef表示的是對(duì)應(yīng)的方法是可以進(jìn)行異步并行處理,并且可以指定對(duì)應(yīng)方法的線程池信息(注意:這個(gè)線程池信息可以覆蓋類上對(duì)應(yīng)的線程池信息,類上又可以覆蓋全局線程池信息绷耍,對(duì)應(yīng)的優(yōu)先級(jí)如下 方法上 > 類上 > 全局的)吐限,下表是對(duì)應(yīng)注解的屬性:- 注解@AsyncThreadPoolConfig
@Documented
@Retention(RUNTIME)
@Target({ ElementType.ANNOTATION_TYPE })
/**
* @author yujiakui
*
* 下午3:43:29
*
* 異步線程池配置
*/
public @interface AsyncThreadPoolConfig {
/**
* 是否生效,默認(rèn)為不生效(如果方法上的線程池不生效褂始,則找類上面的诸典,如果類上面的也不生效,則只有默認(rèn)全局的)
*
* @return
*/
boolean effect() default false;
/**
* 線程池核心線程數(shù)和最大線程數(shù)的大小崎苗,默認(rèn)是20
*
* @return
*/
int poolSize() default 20;
/**
* 隊(duì)列大小狐粱,默認(rèn)是100
*
* @return
*/
int queueSize() default 100;
/**
* 線程池拒絕處理策略
*
* @return
*/
PoolRejectHandleMode rejectPolicy() default PoolRejectHandleMode.CALLERRUN;
}
注解@AsyncThreadPoolConfig表示的線程池對(duì)應(yīng)的配置屬性信息,具體屬性信息如下表所示:其中對(duì)應(yīng)的PoolRejectHandleMode對(duì)象是一個(gè)枚舉類型胆数,目前主要提供了兩種類型:REJECT(線程池隊(duì)列滿了之后再來(lái)請(qǐng)求直接拒絕)和CALLERRUN(用于被拒絕任務(wù)的處理程序肌蜻,它直接在 execute 方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù);如果執(zhí)行程序已關(guān)閉必尼,則會(huì)丟棄該任務(wù))
- 注解@EnableAsyncClass
@Documented
@Retention(RUNTIME)
@Target({ TYPE })
@Inherited
/**
* @author yujiakui
*
* 上午11:35:46
*
*/
public @interface EnableAsyncClass {
/**
* 異步并行類方法信息列表
*
* @return
*/
EnableAsyncClassMethodInfo[] classMethodInfos() default { @EnableAsyncClassMethodInfo };
}
注解@EnableAsyncClass表示的是對(duì)應(yīng)的類開(kāi)啟對(duì)應(yīng)的異步并行蒋搜,也就是只要被這個(gè)類調(diào)用的方法,只要被調(diào)用方法支持對(duì)應(yīng)的異步并行調(diào)用判莉,在這個(gè)類中就可以被異步并行調(diào)用了豆挽,對(duì)應(yīng)的屬性如下表所示:- 注解@EnableAsyncClassMethodInfo
@Documented
@Retention(RUNTIME)
@Target(TYPE_USE)
/**
* @author yujiakui
*
* 上午11:44:26
*
* 開(kāi)啟異步并行類方法信息
*/
public @interface EnableAsyncClassMethodInfo {
/**
* 對(duì)應(yīng)類的全名,默認(rèn)是ALL券盅,就是全部(即是標(biāo)記了異步并行定義的類)
*
* @return
*/
String classFullName() default AsyncLoadAnnotationConstants.ALL_CLASSES;
/**
* 默認(rèn)全部標(biāo)記定義了所有異步并行加載的方法
*
* @return
*/
String[] methodMatchRegex() default { AsyncLoadAnnotationConstants.ALL_METHODS };
}
注解@EnableAsyncClassMethodInfo表示的是開(kāi)啟異步并行調(diào)用對(duì)應(yīng)的類和方法信息帮哈,就是開(kāi)啟哪些能夠被異步并行的方法信息(注意:一個(gè)是調(diào)用方A是開(kāi)啟異步并行調(diào)用,另一個(gè)是B和C定義了能夠被并行異步調(diào)用锰镀,對(duì)于定義了能夠進(jìn)行異步并行調(diào)用的方法但汞,只有在調(diào)用方開(kāi)啟了異步調(diào)用才起作用),對(duì)應(yīng)的注解屬性信息如下:- 注解@EnableAsyncMethod
@Documented
@Retention(RUNTIME)
@Target(METHOD)
/**
* @author yujiakui
*
* 下午3:54:02
*
*/
public @interface EnableAsyncMethod {
/**
* 異步并行類方法信息列表
*
* @return
*/
EnableAsyncClassMethodInfo[] classMethodInfos() default { @EnableAsyncClassMethodInfo };
}
注解@ EnableAsyncMethod開(kāi)啟異步并行調(diào)用的方法互站,即是被這個(gè)開(kāi)啟異步并行方法調(diào)用的方法開(kāi)啟對(duì)應(yīng)的異步并行調(diào)用,比如如果方法A開(kāi)啟了異步并行調(diào)用僵缺,并行方法A調(diào)用了方法B和C胡桃,且B和C都是定義了能夠進(jìn)行異步并行調(diào)用的,則可以在方法A中實(shí)現(xiàn)B和C對(duì)應(yīng)的異步并行方法調(diào)用(前提是A開(kāi)啟的方法對(duì)應(yīng)的屬性配置中包括B和C)磕潮,注解EnableAsyncMethod對(duì)應(yīng)的屬性如下表所示:- 類AsyncAnnotationParserFactory
類AsyncAnnotationParserFactory主要的功能則是解析注解AsyncClassDef翠胰,AsyncMethodDef和AsyncThreadPoolConfig,通過(guò)解析他們獲得那些定義了能夠進(jìn)行異步并行調(diào)用的配置信息自脯;并且此類還可以從全局的配置信息中獲取對(duì)應(yīng)的異步并行全局的線程池配置之景。 - 類AsyncLoadHandleFactory
類AsyncLoadHandleFactory主要的功能是根據(jù)類AsyncAnnotationParserFactory解析注解得到的異步并行調(diào)用的配置信息,來(lái)對(duì)具體方法調(diào)用進(jìn)行攔截并開(kāi)啟對(duì)應(yīng)的異步并行調(diào)度(開(kāi)啟線程池和cglib包裝對(duì)應(yīng)的返回結(jié)果)膏潮。 - 類AsyncLoadHandlerAdvice
類AsyncLoadHandlerAdvice主要的功能是攔截標(biāo)記了注解@AsyncClassDef的類锻狗,并對(duì)應(yīng)其中標(biāo)記了@AsyncMethodDef的方法進(jìn)行異步并行調(diào)用,在進(jìn)行異步并行調(diào)用之前,還要經(jīng)過(guò)是否開(kāi)啟異步并行調(diào)用的判斷轻纪,如果沒(méi)有開(kāi)啟油额,則不會(huì)進(jìn)行異步并行調(diào)用。 - 類AsyncLoadEnableAdvice
類AsyncLoadEnableAdvice主要的功能是攔截標(biāo)記了注解@ EnableAsyncClass的類刻帚,并對(duì)注解@EnableAsyncClassMethodInfo和@EnableAsyncMethod進(jìn)行解析獲得對(duì)應(yīng)的開(kāi)啟異步并行調(diào)用的配置信息潦嘶,這個(gè)配置信息通過(guò)ThreadLocal傳給類AsyncLoadHandlerAdvice進(jìn)行異步并行調(diào)用之前的開(kāi)啟功能判斷。
2.2對(duì)應(yīng)的處理流程
對(duì)應(yīng)整體調(diào)用的序列圖如下:3. 使用
- 定義可以進(jìn)行異步并行調(diào)用的方法崇众,如下所示:
@Component
@AsyncClassDef
public class AsyncLoadAnnotationTestServiceImpl extends AsyncLoadTestServiceImpl {
@Override
@AsyncMethodDef(timeout = 10)
public AsyncLoadTestModel getRemoteModel(String name, long sleep) {
return super.getRemoteModel(name, sleep);
}
}
- 開(kāi)啟對(duì)應(yīng)的異步并行調(diào)用掂僵,如下所示:
@Component
@EnableAsyncClass
public class AsyncLoadAnnotationMultiMethodTest {
@Autowired
private AsyncLoadAnnotationTestServiceImpl asyncLoadAnnotationTestServiceImpl;
@EnableAsyncMethod
public List<AsyncLoadTestModel> multiHandler(String name, long sleep) {
List<AsyncLoadTestModel> results = Lists.newArrayList();
for (int i = 0; i < 5; i++) {
AsyncLoadTestModel model = asyncLoadAnnotationTestServiceImpl.getRemoteModel(name,
sleep);
results.add(model);
}
return results;
}
}
- 測(cè)試
public class AsyncLoadAnnotationMultiTest {
public static void main(String[] args) {
AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(
"com.alibaba.asyncload.impl.annotation",
"com.alibaba.asyncload.annotation",
"com.alibaba.asyncload.domain");
// 執(zhí)行測(cè)試
AsyncLoadAnnotationMultiMethodTest service = annotationConfigApplicationContext
.getBean(AsyncLoadAnnotationMultiMethodTest.class);
List<AsyncLoadTestModel> models = service.multiHandler("xxx", 10000);
long start = 0, end = 0;
for (AsyncLoadTestModel model : models) {
start = System.currentTimeMillis();
System.out.println(model.getDetail());
end = System.currentTimeMillis();
System.out.println("costTime:" + (end - start));
}
}
}