ReentrantLock限制線程池隊(duì)列大小

關(guān)于線程池介紹叫乌,我不在此贅敘柴罐,請參考http://www.reibang.com/p/ade771d2c9c0
線程池中queue一般設(shè)置大小默認(rèn)是Integer.MAX_VALUE,如果設(shè)置了大小憨奸,就必須實(shí)現(xiàn)一個(gè)丟棄策略革屠,而默認(rèn)的丟棄策略居然是拋異常。

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

當(dāng)任務(wù)量超大膀藐,內(nèi)存被撐滿造成宕機(jī),會導(dǎo)致所有的任務(wù)都丟失了红省。當(dāng)然额各,可以使用MQ來解決類似的問題。在此我們只討論使用線程池本身來解決吧恃。
那能不能人為控制隊(duì)列大小虾啦,當(dāng)隊(duì)列達(dá)到該值,就不再往線程池隊(duì)列里提交任務(wù)呢痕寓?以下采用ReentrantLock可重入鎖機(jī)制來實(shí)現(xiàn)

/**
 * Created on 2018/1/22 16:29
 * <p>
 * Description: [測試控制線程池隊(duì)列大小]
 * <p>
 * Company: [xxxx]
 *
 * @author [aichiyu]
 */
public class TestLockPool {

    private int maxSize = 100 ;

    private final ReentrantLock lock = new ReentrantLock();
    private List<Condition> list = new LinkedList<>();

    private ThreadPoolExecutor executor =new ThreadPoolExecutor(20, 100,
                                      60L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());

    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);


    public void init(){
        scheduledExecutorService.scheduleAtFixedRate(()->{
            int queueSize = executor.getQueue().size();
            //每秒檢查一次傲醉,當(dāng)隊(duì)列中任務(wù)被執(zhí)行完就解鎖一批任務(wù),繼續(xù)往隊(duì)列中加
            if( queueSize < maxSize * 0.8 && list.size() > 0  ){
                System.out.println("unlock !!~~");
                lock.lock();
                int i = 0 ;
                Iterator<Condition> iterator = list.iterator();
                while (i < maxSize-queueSize && iterator.hasNext()){
                    iterator.next().signal();
                    iterator.remove();
                    i++;
                }
                System.out.println("signal over!!~~,num="+(i));
                lock.unlock();
            }
        },1,1, TimeUnit.SECONDS);
    }

    private void consume(){

        try {
            //當(dāng)隊(duì)列大小超過限制呻率,阻塞當(dāng)前線程硬毕,等待隊(duì)列空閑
            if(executor.getQueue().size() >= maxSize ){
                System.out.println(Thread.currentThread()+" wait !!~"+"pool queue size = "+executor.getQueue().size());
                lock.lock();
                Condition condition = lock.newCondition();
                list.add(condition);
                condition.await();
                System.out.println(Thread.currentThread()+"wait over!~~");
                lock.unlock();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executor.submit(()->{
            System.out.println(Thread.currentThread()+" execute !!~~"+"pool queue size = "+executor.getQueue().size());


            try {
                //模擬任務(wù)阻塞
                Thread.sleep(2500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    public static void main(String[] args)  {
        TestLockPool testLock = new TestLockPool();
        testLock.init();
         ExecutorService service = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 200; i++) {
            service.submit(()->testLock.consume());
        }

        System.out.println("main over!~");
    }

}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市礼仗,隨后出現(xiàn)的幾起案子吐咳,更是在濱河造成了極大的恐慌,老刑警劉巖元践,帶你破解...
    沈念sama閱讀 212,599評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件韭脊,死亡現(xiàn)場離奇詭異,居然都是意外死亡单旁,警方通過查閱死者的電腦和手機(jī)沪羔,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,629評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來象浑,“玉大人蔫饰,你說我怎么就攤上這事∮洳颍” “怎么了死嗦?”我有些...
    開封第一講書人閱讀 158,084評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長粒氧。 經(jīng)常有香客問我越除,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,708評論 1 284
  • 正文 為了忘掉前任摘盆,我火速辦了婚禮翼雀,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘孩擂。我一直安慰自己狼渊,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,813評論 6 386
  • 文/花漫 我一把揭開白布类垦。 她就那樣靜靜地躺著狈邑,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蚤认。 梳的紋絲不亂的頭發(fā)上米苹,一...
    開封第一講書人閱讀 50,021評論 1 291
  • 那天,我揣著相機(jī)與錄音砰琢,去河邊找鬼蘸嘶。 笑死,一個(gè)胖子當(dāng)著我的面吹牛陪汽,可吹牛的內(nèi)容都是我干的训唱。 我是一名探鬼主播,決...
    沈念sama閱讀 39,120評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼挚冤,長吁一口氣:“原來是場噩夢啊……” “哼况增!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起训挡,我...
    開封第一講書人閱讀 37,866評論 0 268
  • 序言:老撾萬榮一對情侶失蹤巡通,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后舍哄,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體宴凉,經(jīng)...
    沈念sama閱讀 44,308評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,633評論 2 327
  • 正文 我和宋清朗相戀三年表悬,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了弥锄。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,768評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蟆沫,死狀恐怖籽暇,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情饭庞,我是刑警寧澤戒悠,帶...
    沈念sama閱讀 34,461評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站舟山,受9級特大地震影響绸狐,放射性物質(zhì)發(fā)生泄漏卤恳。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,094評論 3 317
  • 文/蒙蒙 一寒矿、第九天 我趴在偏房一處隱蔽的房頂上張望突琳。 院中可真熱鬧,春花似錦符相、人聲如沸拆融。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,850評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽镜豹。三九已至,卻和暖如春蓝牲,著一層夾襖步出監(jiān)牢的瞬間趟脂,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,082評論 1 267
  • 我被黑心中介騙來泰國打工搞旭, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留散怖,地道東北人菇绵。 一個(gè)月前我還...
    沈念sama閱讀 46,571評論 2 362
  • 正文 我出身青樓肄渗,卻偏偏與公主長得像,于是被迫代替她去往敵國和親咬最。 傳聞我的和親對象是個(gè)殘疾皇子翎嫡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,666評論 2 350

推薦閱讀更多精彩內(nèi)容