一、場景描述
例如: 需要指定3個線程來處理1000條數(shù)據(jù), 其中線程數(shù)右蒲、數(shù)據(jù)量是可變的。
二赶熟、 思路
- 針對數(shù)據(jù)量進(jìn)行分片瑰妄, 片的數(shù)量即線程數(shù)。
- 控制線程數(shù)可通過CountDownLatch來處理钧大。
三翰撑、代碼
1. 程序執(zhí)行類
/**
* @ClassName : LimitThreadNumApp
* @Description : 限定線程的執(zhí)行數(shù),去執(zhí)行大數(shù)據(jù)
* @Author : hack2012
* @Date: 2020-12-16 22:53
*/
public class LimitThreadNumApp {
Logger log = LoggerFactory.getLogger(getClass());
static final ExecutorService executorService = Executors.newFixedThreadPool(3);
@Test
public void testProcess() throws Exception{
//數(shù)據(jù)量
int size = 100;
//定義執(zhí)行任務(wù)的線程數(shù)
int threadNum = 3;
process(threadNum, size);
}
public void process(int threadNum, int size) throws InterruptedException {
List<TaskModel> datas = initData(size);
//計算每個分片的大小
int shardSize = datas.size() / threadNum + 1;
CountDownLatch latch = new CountDownLatch(threadNum);
int cnt = 0;
for (int i = 0; i < threadNum; i++) {
List<TaskModel> processDatas = shardTask(datas, i, shardSize);
cnt += processDatas.size();
executorService.submit(new TaskCallable(processDatas, latch));
}
latch.await();
log.info("預(yù)期數(shù)據(jù)量:{};實際執(zhí)行數(shù)據(jù)量:{}", size, cnt);
}
/** 任務(wù)分片 */
private List<TaskModel> shardTask(List<TaskModel> datas, int tnum, int size){
int start = tnum * size;
int end = (tnum + 1) * size;
if (end > datas.size()) {
end = datas.size();
}
return datas.subList(start, end);
}
private List<TaskModel> initData(int size){
List<TaskModel> datas = Lists.newArrayList();
for (int i = 1; i <= size; i++) {
TaskModel data = new TaskModel();
data.setId(String.valueOf(i));
datas.add(data);
}
return datas;
}
}
2. 定義任務(wù)模型
/**
* @ClassName : TaskModel
* @Description : 數(shù)據(jù)模型
* @Author : hack2012
* @Date: 2020-12-16 22:55
*/
public class TaskModel {
private String id ;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
3. 定義任務(wù)執(zhí)行類
/**
* @ClassName : TaskCallable
* @Description : 任務(wù)執(zhí)行器
* @Author : hack2012
* @Date: 2020-12-16 22:58
*/
public class TaskCallable implements Callable<Boolean> {
Logger log = LoggerFactory.getLogger(getClass());
List<TaskModel> datas;
CountDownLatch latch;
TaskCallable(List<TaskModel> datas, CountDownLatch latch) {
this.datas = datas;
this.latch = latch;
}
@Override
public Boolean call() throws Exception {
try {
int cnt = 0;
for (TaskModel data : datas) {
cnt++;
}
log.info("[{}]-線程執(zhí)行數(shù)量:{}", Thread.currentThread().getName(), cnt);
latch.countDown();
return true;
} catch (Exception e) {
log.error(Thread.currentThread().getName() + "線程執(zhí)行失敗:{}", e.getMessage());
}
return false;
}
}
四、測試結(jié)果
1. 線程3眶诈, 數(shù)量 98
[2020-12-1623:36:55下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-1]-線程執(zhí)行數(shù)量:33
-[2020-12-1623:36:55下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-3]-線程執(zhí)行數(shù)量:32
-[2020-12-1623:36:55下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-2]-線程執(zhí)行數(shù)量:33
-[2020-12-1623:36:55下午]:INFOthread.LimitThreadNumApp.process(LimitThreadNumApp.java:44)預(yù)期數(shù)據(jù)量:98;實際執(zhí)行數(shù)據(jù)量:98
1. 線程3涨醋, 數(shù)量 99
[2020-12-1623:37:40下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-3]-線程執(zhí)行數(shù)量:31
-[2020-12-1623:37:40下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-1]-線程執(zhí)行數(shù)量:34
-[2020-12-1623:37:40下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-2]-線程執(zhí)行數(shù)量:34
-[2020-12-1623:37:40下午]:INFOthread.LimitThreadNumApp.process(LimitThreadNumApp.java:44)預(yù)期數(shù)據(jù)量:99;實際執(zhí)行數(shù)據(jù)量:99
1. 線程3, 數(shù)量 100
[2020-12-1623:38:18下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-1]-線程執(zhí)行數(shù)量:34
-[2020-12-1623:38:18下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-2]-線程執(zhí)行數(shù)量:34
-[2020-12-1623:38:18下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-3]-線程執(zhí)行數(shù)量:32
-[2020-12-1623:38:18下午]:INFOthread.LimitThreadNumApp.process(LimitThreadNumApp.java:44)預(yù)期數(shù)據(jù)量:100;實際執(zhí)行數(shù)據(jù)量:100