創建線程池的方式
- ThreadPoolExecutor
- ScheduledThreadPoolExecutor
- ForkJoinPool
ThreadPoolExecutor
- 案例 多線程處理大量數據
使用 CountDownLatch 來使主線程等待線程池中的線程執行完畢。
package com.example.demo.threadpool;
import cn.hutool.core.io.file.FileReader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demo controller that processes a large data set with a fixed-size thread pool.
 *
 * <p>The input is split into chunks of {@link #count} elements, every chunk is submitted to the
 * pool as one task, and the calling thread waits on a {@link CountDownLatch} until all tasks
 * have finished.
 *
 * <p>The working state ({@code mysqlData}, {@code threadsSignal}) lives in static fields, so the
 * public entry points are {@code synchronized}: only one run is in flight at a time and two
 * overlapping HTTP requests cannot overwrite each other's list or latch.
 *
 * @author rp
 */
@RestController
@RequestMapping("/vast")
public class VastDataController {

    /** Stand-in for the database table; non-null only while {@link #dataHandle()} is running. */
    private static List<String> mysqlData;

    /** Latch of the run in progress; holds one count per submitted task. */
    private static CountDownLatch threadsSignal;

    /** Guards every mutation of {@link #mysqlData}, which is a plain, non-thread-safe list. */
    private static final Object DATA_LOCK = new Object();

    /** Number of elements handled by a single task. */
    private static final int count = 10000;

    /** Names the pool threads with a running number starting at 1. */
    static class ThreadNameFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, threadNumber.getAndIncrement()+"線程");
        }
    }

    /**
     * Pool with 8 core and 8 maximum threads and an unbounded queue; each submitted task handles
     * up to {@link #count} elements.
     */
    private static ThreadPoolExecutor execPool = new ThreadPoolExecutor(8, 8,
            5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadNameFactory());

    /**
     * Simulated request: loads the test data, "inserts" it using the thread pool and reports how
     * long the whole run took.
     *
     * @return the elapsed time of the run, formatted as {@code "<millis>ms"}
     */
    @RequestMapping("/dataHandle")
    public static synchronized String dataHandle() {
        mysqlData = new ArrayList<>();
        try {
            long start = System.currentTimeMillis();
            batchAddData();
            long end = System.currentTimeMillis();
            System.out.println("==========");
            String s = end - start + "ms";
            System.out.println(s);
            return s;
        } finally {
            // Release the collected data even when the run fails half-way.
            mysqlData = null;
        }
    }

    /**
     * Reads the input, splits it into chunks, submits one task per chunk and blocks until every
     * task has counted the latch down.
     */
    public static synchronized void batchAddData() {
        // The data that has to be inserted into the database.
        List<String> limodel = readFile();
        try {
            if (limodel.size() <= count) {
                threadsSignal = new CountDownLatch(1);
                execPool.submit(new InsertDate(limodel));
            } else {
                List<List<String>> li = createList(limodel, count);
                threadsSignal = new CountDownLatch(li.size());
                for (List<String> liop : li) {
                    execPool.submit(new InsertDate(liop));
                }
            }
            threadsSignal.await();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Splits a list into consecutive chunks of at most {@code size} elements.
     *
     * @param targe the list to split
     * @param size  the maximum number of elements per chunk; must be positive
     * @return independent copies of the chunks, in original order (the last one may be shorter)
     * @throws IllegalArgumentException if {@code size} is not positive
     */
    private static List<List<String>> createList(List<String> targe, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive but was " + size);
        }
        List<List<String>> listArr = new ArrayList<>();
        for (int from = 0; from < targe.size(); from += size) {
            int to = from + Math.min(size, targe.size() - from);
            listArr.add(new ArrayList<>(targe.subList(from, to)));
        }
        return listArr;
    }

    /**
     * Task that stores one chunk of data. It extends {@link Thread} only for backward
     * compatibility; the pool runs it as a plain {@link Runnable} and never starts it as a thread.
     *
     * @author rp
     */
    static class InsertDate extends Thread {
        List<String> lienties = new ArrayList<>();

        InsertDate(List<String> listModel) {
            // Take a private copy; per-chunk processing or calculation could be done here.
            lienties.addAll(listModel);
        }

        @Override
        public void run() {
            try {
                // Simulate the database insert. ArrayList is not thread-safe, so concurrent
                // addAll calls from several pool threads must be serialized.
                synchronized (DATA_LOCK) {
                    mysqlData.addAll(lienties);
                }
            } finally {
                // Always release the waiting thread, even if the insert failed.
                threadsSignal.countDown();
            }
        }
    }

    /**
     * Reads the comma-separated test data file.
     *
     * @return the file content split on {@code ","}, as a fixed-size list
     */
    public static List<String> readFile() {
        // hutool's FileReader defaults to UTF-8; a second constructor argument selects another
        // charset. vast.txt is the test data file.
        FileReader fileReader = new FileReader("/Users/****/develop/***/interview/src/main/resources/vast.txt");
        String[] split = fileReader.readString().split(",");
        return Arrays.asList(split);
    }
}
ScheduledThreadPoolExecutor
package com.example.demo.threadpool;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Walk-through of the scheduling methods offered by {@link ScheduledThreadPoolExecutor}:
 * one-shot delayed tasks (with and without a result), fixed-rate and fixed-delay repetition.
 *
 * @author rp
 */
@Slf4j
public class ScheduledThreadPoolExecutorTest {

    /** Names the pool threads with a running number; the counter starts at 5. */
    static class ThreadNameFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(5);

        @Override
        public Thread newThread(Runnable r) {
            int id = threadNumber.getAndIncrement();
            return new Thread(r, id + "線程");
        }
    }

    /**
     * Builds a task that writes the given message to the log at INFO level each time it runs.
     *
     * @param message the text to log
     * @return a runnable that logs {@code message}
     */
    private static Runnable logging(final String message) {
        return new Runnable() {
            @Override
            public void run() {
                log.info(message);
            }
        };
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadFactory factory = new ThreadNameFactory();
        RejectedExecutionHandler rejection = new ThreadPoolExecutor.AbortPolicy();
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5, factory, rejection);

        // One-shot task, runs once after a 3 second delay.
        executor.schedule(logging("延遲3秒后執(zhí)行~~~"), 3, TimeUnit.SECONDS);

        // One-shot task with a result, runs after a 5 second delay.
        Callable<String> delayedResult = new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "獲得執(zhí)行結(jié)果";
            }
        };
        ScheduledFuture<String> schedule = executor.schedule(delayedResult, 5, TimeUnit.SECONDS);
        // get() blocks the main thread until the callable has produced its value.
        String result = schedule.get();
        log.info(result);

        // scheduleAtFixedRate: initial delay 0, then a period of 3 seconds between runs.
        long initialDelay = 0;
        long interval = 3;
        executor.scheduleAtFixedRate(logging("執(zhí)行任務(wù)"), initialDelay, interval, TimeUnit.SECONDS);

        // scheduleWithFixedDelay: initial delay 0, then 3 seconds after each run has finished.
        executor.scheduleWithFixedDelay(logging("執(zhí)行任務(wù)"), initialDelay, interval, TimeUnit.SECONDS);
    }
}
ForkJoinPool
package com.example.demo.threadpool;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
 * Sums the integers 1..1000 with a {@link ForkJoinPool} to demonstrate divide-and-conquer tasks.
 *
 * @author rp
 */
@Slf4j
public class ForkJoinPoolTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        try {
            ForkJoinTask<Integer> task = forkJoinPool.submit(new Task(1, 1000));
            Integer integer = task.get();
            log.info("求和結(jié)果:{}",integer);
        } finally {
            // Release the worker threads once the demo is done.
            forkJoinPool.shutdown();
        }
    }

    /** Recursive task that sums every integer in the inclusive range {@code [start, end]}. */
    static class Task extends RecursiveTask<Integer> {
        /**
         * First value of the range this task sums (inclusive).
         */
        private Integer start;
        /**
         * Last value of the range this task sums (inclusive).
         */
        private Integer end;
        /**
         * Ranges with {@code end - start} at or below this value are summed directly instead of
         * being split further.
         */
        private static final int threshold = 2;

        public Integer getStart() {
            return start;
        }

        public void setStart(Integer start) {
            this.start = start;
        }

        public Integer getEnd() {
            return end;
        }

        public void setEnd(Integer end) {
            this.end = end;
        }

        public Task(Integer start, Integer end) {
            this.start = start;
            this.end = end;
        }

        /**
         * Sums the range, splitting it in two halves while it is wider than the threshold.
         *
         * @return the sum of all integers from {@code start} to {@code end}; 0 for an empty range
         */
        @Override
        protected Integer compute() {
            // Unbox once so the arithmetic below works on primitives.
            int lo = start;
            int hi = end;

            if (hi - lo <= threshold) {
                int sum = 0;
                for (int i = lo; i <= hi; i++) {
                    sum += i;
                }
                return sum;
            }

            // lo + (hi - lo) / 2 avoids the overflow that (lo + hi) / 2 has for large bounds.
            int middle = lo + (hi - lo) / 2;
            Task leftTask = new Task(lo, middle);
            Task rightTask = new Task(middle + 1, hi);

            // Fork only one half and compute the other in the current worker: this keeps the
            // thread busy instead of parking it in join() while both halves wait in the queue.
            leftTask.fork();
            int rightSum = rightTask.compute();
            return leftTask.join() + rightSum;
        }
    }
}
線程池調優
調優工具類
package com.example.demo.threadpool.tuning;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Calculates the optimal thread pool boundaries. It takes the desired target utilization and the desired work
 * queue memory consumption as input and prints the recommended thread count and work queue capacity.
 *
 * @author Niklas Schlimm
 */
public abstract class PoolSizeCalculator {

    /**
     * The sample queue size used to calculate the size of a single {@link Runnable} element.
     */
    private static final int SAMPLE_QUEUE_SIZE = 1000;

    /**
     * Accuracy of a test run. It must finish within 20ms of the test time, otherwise the test is retried.
     * This could be configurable.
     */
    private static final int EPSYLON = 20;

    /**
     * Number of decimal places kept for the wait/compute ratio. Without an explicit scale the division would be
     * rounded to a whole number before it enters the formula.
     */
    private static final int RATIO_SCALE = 4;

    /**
     * Control variable for the CPU time investigation; set by the timer thread, polled by the measuring thread.
     */
    private volatile boolean expired;

    /**
     * Time (millis) of the test run in the CPU time calculation.
     */
    private final long testtime = 3000;

    /**
     * Calculates the boundaries of a thread pool for a given {@link Runnable} and prints them to standard output.
     *
     * @param targetUtilization    the desired utilization of the CPUs (0 <= targetUtilization <= 1)
     * @param targetQueueSizeBytes the desired maximum work queue size of the thread pool (bytes)
     * @throws IllegalStateException if a timed run cannot be completed accurately or no CPU time could be measured
     */
    protected void calculateBoundaries(BigDecimal targetUtilization, BigDecimal targetQueueSizeBytes) {
        calculateOptimalCapacity(targetQueueSizeBytes);
        Runnable task = creatTask();
        start(task);
        start(task); // warm up phase
        long cputime = getCurrentThreadCPUTime();
        start(task); // test interval
        cputime = getCurrentThreadCPUTime() - cputime;
        // Whatever part of the test interval was not spent on the CPU counts as wait time.
        long waittime = (testtime * 1000000) - cputime;
        calculateOptimalThreadCount(cputime, waittime, targetUtilization);
    }

    /**
     * Prints how many queue elements fit into the given memory budget.
     *
     * @param targetQueueSizeBytes memory budget of the work queue (bytes)
     */
    private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) {
        long mem = calculateMemoryUsage();
        System.out.println("Target queue memory usage (bytes): " + targetQueueSizeBytes);
        System.out.println("createTask() produced " + creatTask().getClass().getName() + " which took " + mem
                + " bytes in a queue");
        if (mem <= 0) {
            // Heap noise can make the measured element size zero or negative; dividing by it would either
            // throw or recommend a nonsensical capacity.
            System.out.println("* Recommended queue capacity (bytes): n/a (measured element size was " + mem
                    + " bytes, run again)");
            return;
        }
        BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(mem), RoundingMode.HALF_UP);
        System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem);
        System.out.println("* Recommended queue capacity (bytes): " + queueCapacity);
    }

    /**
     * Prints the result of Brian Goetz' optimal thread count formula, see 'Java Concurrency in Practice'
     * (chapter 8.2).
     *
     * @param cpu               cpu time consumed by considered task (nanos)
     * @param wait              wait time of considered task (nanos)
     * @param targetUtilization target utilization of the system
     * @throws IllegalStateException if {@code cpu} is not positive
     */
    private void calculateOptimalThreadCount(long cpu, long wait, BigDecimal targetUtilization) {
        int numberOfCPU = Runtime.getRuntime().availableProcessors();
        BigDecimal optimalthreadcount = optimalThreadCount(numberOfCPU, cpu, wait, targetUtilization);
        System.out.println("Number of CPU: " + numberOfCPU);
        System.out.println("Target utilization: " + targetUtilization);
        System.out.println("Elapsed time (nanos): " + (testtime * 1000000));
        System.out.println("Compute time (nanos): " + cpu);
        System.out.println("Wait time (nanos): " + wait);
        System.out.println("Formula: " + numberOfCPU + " * " + targetUtilization + " * (1 + " + wait + " / "
                + cpu + ")");
        System.out.println("* Optimal thread count: " + optimalthreadcount);
    }

    /**
     * Evaluates {@code cpuCount * targetUtilization * (1 + wait / cpu)} and rounds the result to a whole number
     * of threads. The ratio is kept with {@link #RATIO_SCALE} decimals so that only the final result is rounded.
     *
     * @param cpuCount          number of available processors
     * @param cpu               measured compute time; must be positive
     * @param wait              measured wait time; negative values (measurement noise) are treated as 0
     * @param targetUtilization target utilization of the CPUs
     * @return the recommended thread count, rounded half-up to scale 0
     * @throws IllegalStateException if {@code cpu} is not positive
     */
    static BigDecimal optimalThreadCount(int cpuCount, long cpu, long wait, BigDecimal targetUtilization) {
        if (cpu <= 0) {
            throw new IllegalStateException("Measured CPU time must be positive but was " + cpu
                    + "; check getCurrentThreadCPUTime()");
        }
        BigDecimal ratio = new BigDecimal(Math.max(wait, 0L))
                .divide(new BigDecimal(cpu), RATIO_SCALE, RoundingMode.HALF_UP);
        return new BigDecimal(cpuCount)
                .multiply(targetUtilization)
                .multiply(BigDecimal.ONE.add(ratio))
                .setScale(0, RoundingMode.HALF_UP);
    }

    /** Names the timer thread with a running number starting at 1. */
    static class ThreadNameFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, threadNumber.getAndIncrement() + "線程");
        }
    }

    /**
     * Runs the {@link Runnable} repeatedly over a period defined in {@link #testtime}. Based on Heinz Kabutz'
     * ideas (http://www.javaspecialists.eu/archive/Issue124.html).
     *
     * @param task the runnable under investigation
     * @throws IllegalStateException if five attempts in a row miss the test time by more than the tolerance
     */
    public void start(Runnable task) {
        long elapsed = 0;
        int runs = 0;
        do {
            if (++runs > 5) {
                throw new IllegalStateException("Test not accurate");
            }
            expired = false;
            long begin = System.currentTimeMillis();
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
                    1,
                    new ThreadNameFactory(),
                    new ThreadPoolExecutor.AbortPolicy()
            );
            try {
                // The timer flips the flag after testtime millis; until then the task runs back to back.
                executor.schedule(
                        new Runnable() {
                            @Override
                            public void run() {
                                expired = true;
                            }
                        },
                        testtime,
                        TimeUnit.MILLISECONDS
                );
                while (!expired) {
                    task.run();
                }
                elapsed = System.currentTimeMillis() - begin;
            } finally {
                // Also stops the timer thread when the task under test throws.
                executor.shutdownNow();
            }
        } while (Math.abs(elapsed - testtime) > EPSYLON);
        collectGarbage(3);
    }

    /**
     * Requests a garbage collection the given number of times, sleeping 10 ms after each request. Stops early
     * and restores the interrupt flag if the thread is interrupted.
     *
     * @param times number of collection requests
     */
    private void collectGarbage(int times) {
        for (int i = 0; i < times; i++) {
            System.gc();
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Calculates the memory usage of a single element in a work queue. Based on Heinz Kabutz' ideas
     * (http://www.javaspecialists.eu/archive/Issue029.html).
     *
     * @return memory usage of a single {@link Runnable} element in the thread pool's work queue (bytes); may be
     *         zero or negative when the heap measurement is disturbed
     */
    public long calculateMemoryUsage() {
        // First pass: its result is discarded (presumably a warm-up so that one-time allocations are not
        // attributed to the measured queue).
        BlockingQueue<Runnable> queue = createWorkQueue();
        fillWithSamples(queue);
        queue = null;
        collectGarbage(15);

        // Second pass: used heap before and after filling a fresh queue.
        long before = usedMemory();
        queue = createWorkQueue();
        fillWithSamples(queue);
        collectGarbage(15);
        long after = usedMemory();
        return (after - before) / SAMPLE_QUEUE_SIZE;
    }

    /** Adds {@link #SAMPLE_QUEUE_SIZE} freshly created tasks to the queue. */
    private void fillWithSamples(BlockingQueue<Runnable> queue) {
        for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {
            queue.add(creatTask());
        }
    }

    /** Returns the currently used heap (total minus free memory) in bytes. */
    private static long usedMemory() {
        Runtime runtime = Runtime.getRuntime();
        return runtime.totalMemory() - runtime.freeMemory();
    }

    /**
     * Create your runnable task here.
     *
     * @return an instance of your runnable task under investigation
     */
    protected abstract Runnable creatTask();

    /**
     * Return an instance of the queue used in the thread pool.
     *
     * @return queue instance
     */
    protected abstract BlockingQueue<Runnable> createWorkQueue();

    /**
     * Calculate current cpu time. Various frameworks may be used here, depending on the operating system in use
     * (e.g. http://www.hyperic.com/products/sigar). The more accurate the CPU time measurement, the more accurate
     * the results for thread count boundaries.
     *
     * @return current cpu time of current thread
     */
    protected abstract long getCurrentThreadCPUTime();
}
調優案例
package com.example.demo.threadpool.tuning;
import lombok.extern.slf4j.Slf4j;
import java.lang.management.ManagementFactory;
import java.math.BigDecimal;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Thread pool tuning example.
 *
 * <p>Rules of thumb for the thread count:
 * <ol>
 *   <li>CPU-bound tasks: N + 1</li>
 *   <li>IO-bound tasks: 2N</li>
 *   <li>mixed tasks: N * U * (1 + WT / ST)</li>
 * </ol>
 * where N is the number of CPU cores, U the target CPU utilization, WT the time a thread spends
 * waiting and ST the time it spends running.
 *
 * @author rp
 */
@Slf4j
public class MyPoolSizeCalculator extends PoolSizeCalculator {

    /**
     * Runs the calculation; the recommended tuning parameters are printed to standard output.
     */
    public static void main(String[] args) {
        MyPoolSizeCalculator calculator = new MyPoolSizeCalculator();
        calculator.calculateBoundaries(
                // Target CPU utilization (1 = 100%). BigDecimal.ONE avoids the double constructor
                // and prints exactly like the previous value.
                BigDecimal.ONE,
                // Memory the BlockingQueue may occupy, in bytes.
                new BigDecimal(100000));
    }

    /**
     * @return the CPU time of the current thread as reported by the JVM's thread MX bean
     */
    @Override
    protected long getCurrentThreadCPUTime() {
        return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();
    }

    /**
     * Supplies the task whose cost is measured; replace the empty body with the real workload.
     *
     * @return a new instance of the task under investigation
     */
    @Override
    protected Runnable creatTask() {
        return new Runnable() {
            @Override
            public void run() {
                // Put the work that the production thread pool will execute here.
            }
        };
    }

    /**
     * Supplies the queue type whose per-element memory usage is measured.
     *
     * @return a new, empty work queue
     */
    @Override
    protected BlockingQueue<Runnable> createWorkQueue() {
        return new LinkedBlockingQueue<>();
    }
}
運行結果
- Recommended queue capacity (bytes): 2500
對應隊列數 new LinkedBlockingQueue<>(2500)
- Formula: 12 * 1 * (1 + 9672000 / 2990328000)
對應公式 N * U * (1 + WT/ST)
- Optimal thread count: 12
對應線程數 corePoolSize=12,maximumPoolSize=12