關(guān)于線程池介紹叫乌,我不在此贅敘柴罐,請參考http://www.reibang.com/p/ade771d2c9c0
線程池中queue一般設(shè)置大小默認(rèn)是Integer.MAX_VALUE,如果設(shè)置了大小憨奸,就必須實(shí)現(xiàn)一個(gè)丟棄策略革屠,而默認(rèn)的丟棄策略居然是拋異常。
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
當(dāng)任務(wù)量超大膀藐,內(nèi)存被撐滿造成宕機(jī),會導(dǎo)致所有的任務(wù)都丟失了红省。當(dāng)然额各,可以使用MQ來解決類似的問題。在此我們只討論使用線程池本身來解決吧恃。
那能不能人為控制隊(duì)列大小虾啦,當(dāng)隊(duì)列達(dá)到該值,就不再往線程池隊(duì)列里提交任務(wù)呢痕寓?以下采用ReentrantLock可重入鎖機(jī)制來實(shí)現(xiàn)
/**
* Created on 2018/1/22 16:29
* <p>
* Description: [測試控制線程池隊(duì)列大小]
* <p>
* Company: [xxxx]
*
* @author [aichiyu]
*/
public class TestLockPool {
private int maxSize = 100 ;
private final ReentrantLock lock = new ReentrantLock();
private List<Condition> list = new LinkedList<>();
private ThreadPoolExecutor executor =new ThreadPoolExecutor(20, 100,
60L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
public void init(){
scheduledExecutorService.scheduleAtFixedRate(()->{
int queueSize = executor.getQueue().size();
//每秒檢查一次傲醉,當(dāng)隊(duì)列中任務(wù)被執(zhí)行完就解鎖一批任務(wù),繼續(xù)往隊(duì)列中加
if( queueSize < maxSize * 0.8 && list.size() > 0 ){
System.out.println("unlock !!~~");
lock.lock();
int i = 0 ;
Iterator<Condition> iterator = list.iterator();
while (i < maxSize-queueSize && iterator.hasNext()){
iterator.next().signal();
iterator.remove();
i++;
}
System.out.println("signal over!!~~,num="+(i));
lock.unlock();
}
},1,1, TimeUnit.SECONDS);
}
private void consume(){
try {
//當(dāng)隊(duì)列大小超過限制呻率,阻塞當(dāng)前線程硬毕,等待隊(duì)列空閑
if(executor.getQueue().size() >= maxSize ){
System.out.println(Thread.currentThread()+" wait !!~"+"pool queue size = "+executor.getQueue().size());
lock.lock();
Condition condition = lock.newCondition();
list.add(condition);
condition.await();
System.out.println(Thread.currentThread()+"wait over!~~");
lock.unlock();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.submit(()->{
System.out.println(Thread.currentThread()+" execute !!~~"+"pool queue size = "+executor.getQueue().size());
try {
//模擬任務(wù)阻塞
Thread.sleep(2500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
public static void main(String[] args) {
TestLockPool testLock = new TestLockPool();
testLock.init();
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 200; i++) {
service.submit(()->testLock.consume());
}
System.out.println("main over!~");
}
}