最近遇到一個(gè)需求舅桩,一批數(shù)據(jù),要去請求A B C 多個(gè)接口寨蹋。不同接口返回不同字段的值松蒜。然后設(shè)置到原來的對象中。
其中 A BC 接口每次請求都對數(shù)量有限制已旧。
好了秸苗,使用CompleteFuture來解決
代碼:
@GetMapping("/user2")
public List<User> getData() throws Exception {
List<User> userList = new ArrayList<>(2000);
for (int j = 0; j < 1000; j++) {
User u1 = new User();
u1.setId(j + "");
u1.setAddress("地址:" + j);
u1.setAge(j + "");
userList.add(u1);
}
long l = System.currentTimeMillis();
// getOtherInfo(userList);
// getUserName(userList);
// getMobile(userList);
CompletableFuture<Void> otherTask = CompletableFuture.runAsync(() -> {
getOtherInfo(userList);
}, poolExecutor);
CompletableFuture<Void> nameTask = CompletableFuture.runAsync(() -> {
getUserName(userList);
}, poolExecutor);
CompletableFuture<Void> mobileTask = CompletableFuture.runAsync(() -> {
getMobile(userList);
}, poolExecutor);
try {
CompletableFuture.allOf(nameTask, mobileTask, otherTask).join();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("================調(diào)用第三方耗時(shí):" + (System.currentTimeMillis() - l) + " 毫秒");
return userList;
}
原始請求接口 A B C
/**
* 模擬調(diào)用第三方,獲取其他信息
*
* @param users
*/
private void getOtherInfo(List<User> users) {
long beginTime = System.currentTimeMillis();
List<List<User>> subUsers = new ArrayList<>();
List<List<User>> partitionList = getPartitionList(users, subUsers, 50);
// partitionList.forEach(list -> {
// sendRequestToService(100);
// list.forEach(user -> {
// user.setBirth("生日: " + user.getId());
// user.setSex("性別:" + user.getId());
// });
// });
List<CompletableFuture> futures=new ArrayList<>();
for (int i = 0; i < partitionList.size(); i++) {
List<User> list = partitionList.get(i);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 正在執(zhí)行任務(wù)");
sendRequestToService(100);
list.forEach(user -> {
user.setBirth("生日: " + user.getId());
user.setSex("性別:" + user.getId());
});
});
futures.add(future);
}
CompletableFuture allFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
allFuture.join();
// System.out.println(Thread.currentThread().getName() + " 正在執(zhí)行任務(wù)");
System.out.println("獲取其他信息耗時(shí):" + (System.currentTimeMillis() - beginTime) + " 毫秒");
}
/**
* 獲取用戶姓名
*
* @param users
*/
private void getUserName(List<User> users) {
long beginTime = System.currentTimeMillis();
List<List<User>> subUsers = new ArrayList<>();
List<List<User>> partitionList = getPartitionList(users, subUsers, 200);
partitionList.forEach(list -> {
sendRequestToService(130);
list.forEach(user -> {
user.setUsername("姓名:" + user.getId());
});
});
System.out.println(Thread.currentThread().getName() + " 正在執(zhí)行任務(wù)");
System.out.println("獲取用戶姓名 耗時(shí):" + (System.currentTimeMillis() - beginTime) + " 毫秒");
}
/**
* 獲取手機(jī)
* @param users
*/
private void getMobile(List<User> users) {
long beginTime = System.currentTimeMillis();
List<List<User>> subUsers = new ArrayList<>();
List<List<User>> partitionList = getPartitionList(users, subUsers, 400);
partitionList.forEach(list -> {
sendRequestToService(150);
list.forEach(user -> {
user.setMobile("手機(jī):" + user.getId());
});
});
System.out.println(Thread.currentThread().getName() + " 正在執(zhí)行任務(wù)");
System.out.println("獲取用戶姓名 耗時(shí):" + (System.currentTimeMillis() - beginTime) + " 毫秒");
}
數(shù)據(jù)進(jìn)行分片 和模擬時(shí)間
/**
* 根據(jù)分片來獲取數(shù)據(jù)
*
* @param users
* @param subUsers
* @param count
* @return
*/
private List<List<User>> getPartitionList(List<User> users, List<List<User>> subUsers, int count) {
if (users.size() >= count) {
subUsers = Lists.partition(users, count);
} else {
subUsers.add(users);
}
return subUsers;
}
/**
* 根據(jù)傳入時(shí)間來判斷暫停接口多少毫秒
*
* @param i
*/
private void sendRequestToService(int count) {
try {
// 模擬請求對面接口count毫秒
TimeUnit.MILLISECONDS.sleep(count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Bean
@Qualifier(value = "MyThread")
private ThreadPoolTaskExecutor poolExecutor(){
ThreadPoolTaskExecutor executor =new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(10);
executor.setThreadNamePrefix("Pool-Nexus");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
最終結(jié)果:
不使用 future
http-nio-8009-exec-1 正在執(zhí)行任務(wù)
獲取其他信息耗時(shí):2012 毫秒
http-nio-8009-exec-1 正在執(zhí)行任務(wù)
獲取用戶姓名 耗時(shí):652 毫秒
http-nio-8009-exec-1 正在執(zhí)行任務(wù)
獲取用戶姓名 耗時(shí):454 毫秒
================調(diào)用第三方耗時(shí):3118 毫秒
使用3個(gè)
Pool-Nexus3 正在執(zhí)行任務(wù)
獲取用戶姓名 耗時(shí):453 毫秒
Pool-Nexus2 正在執(zhí)行任務(wù)
獲取用戶姓名 耗時(shí):654 毫秒
Pool-Nexus1 正在執(zhí)行任務(wù)
獲取其他信息耗時(shí):2011 毫秒
================調(diào)用第三方耗時(shí):2014 毫秒
使用3個(gè)再為了獲取其他信息接口再套3個(gè)
ForkJoinPool.commonPool-worker-1 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-2 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-4 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-3 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-5 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-3 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-4 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-2 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-5 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-1 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-3 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-4 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-2 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-1 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-5 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-4 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-2 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-3 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-5 正在執(zhí)行任務(wù)
ForkJoinPool.commonPool-worker-1 正在執(zhí)行任務(wù)
獲取其他信息耗時(shí):406 毫秒
Pool-Nexus3 正在執(zhí)行任務(wù)
獲取用戶姓名 耗時(shí):454 毫秒
Pool-Nexus2 正在執(zhí)行任務(wù)
獲取用戶姓名 耗時(shí):656 毫秒
================調(diào)用第三方耗時(shí):660 毫秒
1個(gè)異步加多個(gè)completeFuture