1 資料集合
Fork/Join框架(一)引言
Fork/Join框架(二)創(chuàng)建一個Fork/Join池
Fork/Join框架(三)加入任務的結(jié)果
Fork/Join框架(四)異步運行任務
Fork/Join框架(五)在任務中拋出異常
Fork/Join框架(六)取消任務
全部摘自并發(fā)編程網(wǎng)莫其,特別好的翻譯和校對蔽挠,可以看特別多的內(nèi)容毛俏,感謝方騰飛,感謝并發(fā)編程網(wǎng) 希望你們越做越好溢十。
2 為什么要用?——業(yè)務需求
有200萬左右的歷史用戶需要贈送會員并根據(jù)用戶行為給予30天或365天的期限瓢喉。
因為有業(yè)務需求摹蘑,所以無法直接走DB變更。
因為用戶量并沒有特別大扰法,也沒用zk或者其他分布式服務均分用戶去做(服務器同時啟動對DB壓力會很大)蛹含。
最后準備用預發(fā)單節(jié)點去刷200萬的用戶。很適合用多線程去處理塞颁。
3 Fork/Join 框架
多線程經(jīng)驗并不豐富浦箱,但是知道用線程池來管理線程比較好,查詢一些Java并發(fā)的類祠锣,很多都是集中在concurrent包中酷窥,F(xiàn)ork/Join框架也在其中(Duog Lea 大神威武)。
既然是一個框架锤岸,自然已經(jīng)處理了線程池和待任務竖幔。使用過程主要依賴ForkJoinPool 和 ForkJoinTask的兩個子類RecursiveTask和RecursiveAction;ForkJoinPool繼承自AbstractExecutorService是偷,F(xiàn)orkJoinTask實現(xiàn)了Future接口拳氢。
典型ForkJoinPool的使用:
// 聲明pool
ForkJoinPool pool = new ForkJoinPool();
// 執(zhí)行task
pool.execute(task);
// 觀察執(zhí)行中
do {
System.out.printf("******************************************\n");
System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
System.out.printf("******************************************\n");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());
// 關閉池
pool.shutdown();
// 等待關閉
try {
pool.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
典型ForkJoinTask的設計使用:
If (problem size < default size){
tasks=divide(task);
execute(tasks);
} else {
resolve problem using another algorithm;
}
ForkJoinTask簡單實例(重寫compute方法):
@Override
protected Integer compute() {
int result = 0;
if (end - start < 10) {
result = processLines(document, start, end, word);
} else {
int mid = (start + end) / 2;
DocumentTask task1 = new DocumentTask(document, start, mid, word);
DocumentTask task2 = new DocumentTask(document, mid, end, word);
invokeAll(task1, task2);
try {
result = groupResults(task1.get(), task2.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
4 結(jié)合業(yè)務場景使用 Fork/Join
Main類使用ForkJoinPool:
RefreshAgentToMemberTask task = new RefreshAgentToMemberTask(startId, endId, applicationContext);
ForkJoinPool pool = new ForkJoinPool();
pool.execute(task);
do {
logger.info("Thread Count: {}", pool.getActiveThreadCount());
logger.info("Thread Steal: {}", pool.getStealCount());
logger.info("Parallelism: {}", pool.getParallelism());
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());
pool.shutdown();
if (task.isCompletedNormally()) {
logger.info("The process has completed normally.");
}
是不是很類似于之前的例子募逞?其實幾乎所有對ForkJoinPool使用方式都類似。
ForkJoinTask 類設計:
public class RefreshAgentToMemberTask extends RecursiveAction {
private static final long serialVersionUID = -7434939182574934204L;
private Logger logger = LoggerFactory.getLogger(RefreshAgentToMemberTask.class);
private long startId;
private long endId;
private ApplicationContext applicationContext; // 用于獲取spring bean
private static final int THRESHOLD = 100;//10000; TODO test=100, prod=10000
private Map<String, Long> times = new HashMap<>();
RefreshAgentToMemberTask(long startId, long endId, ApplicationContext applicationContext) {
this.startId = startId;
this.endId = endId;
this.applicationContext = applicationContext;
}
@Override
protected void compute() {
if (endId - startId <= THRESHOLD) {
doAgentToMember();
} else {
long middleId = (startId + endId) / 2;
RefreshAgentToMemberTask task1 = new RefreshAgentToMemberTask(startId, middleId, applicationContext);
RefreshAgentToMemberTask task2 = new RefreshAgentToMemberTask(middleId, endId, applicationContext);
invokeAll(task1, task2);
}
}
/**
* 到達閥值以內(nèi)的處理方法
*/
private void doAgentToMember() {
logger.info("task doAgentToMember range: startId={}, endId={}.", startId, endId);
long start = System.currentTimeMillis();
......
logger.info("startId={}, endId={}. total cost={}", startId, endId, (System.currentTimeMillis() - start));
}
}
5 測試結(jié)果
很遺憾馋评,沒有在線上環(huán)境使用放接,因為功能被大老板砍了。沒想到技術沒干成的事最后是老板砍了留特,估計產(chǎn)品也沒想到纠脾。
不過在測試環(huán)境還是做了嘗試,利用本機(Mac JVM 能拿到的CPU核數(shù)是4)蜕青,數(shù)據(jù)量1.8萬苟蹈,兩次執(zhí)行遍歷所有用時分別為330s,360s右核。全部遍歷大概需要 38,888s ~ 10小時慧脱。挺好的,預發(fā)機器估計比我性能好贺喝,上班點一下菱鸥,下班了就可以驗證。
6 感想
業(yè)務有需要躏鱼,加上一點點平時的積累氮采,沒準就能學到一些有用的新技術。為了技術點贊~染苛。