Java線程池使用與分析

1.前言

作為一名Android開發(fā)镣陕,工作中少不了使用線程津坑。最近因?yàn)樵趦?yōu)化廣告請求結(jié)構(gòu)绽媒,重新查看一遍線程相關(guān)的知識,在此做一個總結(jié)柱宦。

2.線程使用

在Java中開啟子線程些椒,執(zhí)行異步任務(wù)最簡單的做法是:

new Thread(new Runnable() {
    @Override
    public void run() {
        // TODO 相關(guān)業(yè)務(wù)處理
    }
}).start();

每當(dāng)需要一個線程的時候就new 一個線程出來操作相關(guān)業(yè)務(wù),但這樣做的話會存在一些問題:

  1. 每次new Thread新建對象性能差掸刊。
  2. 線程缺乏統(tǒng)一管理,可能無限制新建線程赢乓,相互之間競爭忧侧,及可能占用過多系統(tǒng)資源導(dǎo)致死機(jī)或oom。
  3. 缺乏更多功能牌芋,如定時執(zhí)行蚓炬、定期執(zhí)行、線程中斷躺屁。

作為一個有想法的開發(fā)肯夏,是應(yīng)該拒絕該方式使用線程的,此時線程池的概念就應(yīng)運(yùn)而來犀暑。

3. 線程池 Executors

線程池的定義:一種線程使用模式驯击。線程過多會帶來調(diào)度開銷,進(jìn)而影響緩存局部性和整體性能耐亏。而線程池維護(hù)著多個線程徊都,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。
在Java中系統(tǒng)會提供一下自帶的幾種線程池:

  • newFixedThreadPool:一個可以無限擴(kuò)大的線程池
  • newCachedThreadPool:固定大小的線程池广辰;corePoolSize和maximunPoolSize都為用戶設(shè)定的線程數(shù)量nThreads暇矫;keepAliveTime為0,
  • newScheduledThreadPool:一個主要接受定時任務(wù)的線程池择吊,有兩種提交任務(wù)的方式:scheduledAtFixedRate和scheduledWithFixedDelaySchduled
  • newSingleThreadExecutor:一個只會創(chuàng)建一條工作線程處理任務(wù)線程池

查看Executors 源碼可以看到有如下幾種方法:

Executors.png

分別對應(yīng)上述幾種線程池李根。
關(guān)于線程池的設(shè)計思想其根本是生產(chǎn)消費(fèi)者,線程池會維護(hù)兩個隊列

  1. 各個線程的集合
  2. 各個任務(wù)的集合

前者負(fù)責(zé)生產(chǎn)維護(hù)線程几睛,后者則消費(fèi)線程房轿。
通過源碼跟蹤,上述幾種創(chuàng)建線程最終都會調(diào)用ThreadPoolExecutor的構(gòu)造方法

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
                             }
  • int corePoolSize:核心線程池數(shù)量枉长,線程池創(chuàng)建后保持的數(shù)量
  • int maximumPoolSize:線程池最大線程數(shù)量冀续,大于這個數(shù)則只需Rejected策略
  • long keepAliveTime:空閑線程存活的最大時間,超過這個時間則回收
  • TimeUnit unit: 空閑線程存活的最大時間單位設(shè)置
  • BlockingQueue<Runnable> workQueue:core線程數(shù)滿后存放任務(wù)需要的阻塞隊列
  • ThreadFactory threadFactory: 線程創(chuàng)建工廠必峰,可以指定自定義的線程池名稱等
  • RejectedExecutionHandler handler:自定義拒絕策略洪唐,如果任務(wù)數(shù)量大于maximumPoolSize則執(zhí)行此策略。
    corePoolSize 和maximumPoolSize 以及workQueue 的長度會有以下幾種關(guān)系:
  • 如果正在運(yùn)行的線程數(shù)量小于 corePoolSize吼蚁,那么馬上創(chuàng)建線程運(yùn)行這個任務(wù)凭需;
  • 如果正在運(yùn)行的線程數(shù)量大于或等于 corePoolSize问欠,那么將這個任務(wù)放入隊列。
  • 如果這時候隊列滿了粒蜈,而且正在運(yùn)行的線程數(shù)量小于 maximumPoolSize顺献,那么還是要創(chuàng)建線程運(yùn)行這個任務(wù);
  • 如果隊列滿了枯怖,而且正在運(yùn)行的線程數(shù)量大于或等于 maximumPoolSize注整,那么線程池執(zhí)行拒絕策略

3.1 newFixedThreadPool

newFixedThreadPool.png
  • 無界任務(wù)隊列,有界的線程隊列
    查看源碼可以看到該線程的策略
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

只需要設(shè)置參數(shù)nThreads 度硝,參數(shù)workQueue 是一個空間無屆的隊列肿轨,基本可以認(rèn)為keepAliveTimemaximumPoolSize 之類的參數(shù)無效蕊程,因?yàn)榇藭r核心線程是滿負(fù)荷運(yùn)行椒袍,沒有線程會處于空閑狀態(tài)。使用方式如下:

public class NewFixedThreadPoolExample {
    public static void main(String[] args) {
        int coreSize = Runtime.getRuntime().availableProcessors();
        System.out.println("coreSize is :"+coreSize);
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
        for (int i =0;i<10;i++){
            int  index =i;
            executorService.execute(new Runnable() {
              @Override
              public void run() {
                  System.out.println("currentTime is :"+System.currentTimeMillis()+"---index is :"+index);
                  try {
                      Thread.sleep(2000);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          });
        }

    }

}

3.2 newCachedThreadPool

newCacheThreadPool.png

查看源碼可以看到該線程的策略

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());

corePoolSize為0藻茂,maximumPoolSize為無限大驹暑,意味著線程數(shù)量可以無限大,采用SynchronousQueue裝等待的任務(wù)辨赐,這個阻塞隊列沒有存儲空間优俘,這意味著只要有請求到來, 就必須要找到一條工作線程處理他肖油,如果當(dāng)前沒有空閑的線程兼吓,那么就會再創(chuàng)建一條新的線程。

  • 無界任務(wù)隊列森枪,無界線程隊列视搏。
    使用方式如下:
public class NewCachedThreadPoolExample {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i =0;i<10;i++){
            int index = i;
            try {
                Thread.sleep(1000*index);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("currentTime is :"+System.currentTimeMillis()+"---index is :"+index);
                }
            });
        }
    }
}

3.3 newScheduledThreadPool

newScheduledThreadPool.png

其源碼策略如下:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
  • 無界任務(wù)隊列
    該類型的線程池存在兩種調(diào)用:
    scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit);
    scheduleWithFixedDelay(Runnable command,long initialDelay, long delay, TimeUnit unit);
    調(diào)用如下:
public class NewScheduledThreadPoolExample {
    public static void main(String[] args) {
        int coreSize = Runtime.getRuntime().availableProcessors();
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
        for (int i= 0 ;i<10;i++){
            int index = i;
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println("currentTime is :"+System.currentTimeMillis()+"---index is :"+index);
                 }
            },1,3, TimeUnit.SECONDS);
//            executorService.scheduleWithFixedDelay()
        }
    }
}

3.4 newSingleThreadExecutor

newSingleThreadExecutor.png
  • 無界任務(wù)隊列,有界線程隊列
  • corePoolSize :1 線程隊列只有一個县袱,和單例模式相差無幾浑娜。它只會創(chuàng)建一條工作線程處理任務(wù);采用的阻塞隊列為LinkedBlockingQueue式散。
    使用方式如下:
public class NewSingleThreadExecutorExample {
    public static void main(String[] args) {
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            singleThreadExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("currentTime is :" + System.currentTimeMillis() + "---index is :" + index);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

以上是系統(tǒng)自帶的線程池創(chuàng)建方式筋遭,其根本設(shè)計思想生產(chǎn)者--消費(fèi)者,其中任務(wù)隊列和線程隊列的長度如果采用無界的話暴拄,就會存在OOM的風(fēng)險漓滔。阿里巴巴Java開發(fā)手冊中強(qiáng)制不允許使用Executors創(chuàng)建線程池。

Executors-OOM.png

鑒于上述原因乖篷,我們在某些場景下需要自定義線程池响驴。

4.自定義線程池

  1. ThreadPoolExecutor 自定義線程池
public class CustomThreadPool {
    //以下相關(guān)參數(shù)可以考慮根據(jù)不同設(shè)備不同環(huán)境計算得到
    private static final int CORE_POOL_SIZE = 1;
    private static final int MAX_POOL_SIZE = 1;
    private static final int KEEP_ALIVE_TIME = 30;
    private static final int CAPACITY = 2;
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(CAPACITY), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                //線程創(chuàng)建的次數(shù)同核心線程數(shù),最大線程數(shù)撕蔼,任務(wù)數(shù)有關(guān)系
                Thread thread = new Thread(r);
                System.out.println("newThread==="+thread.getName());
                return thread;
            }

        }, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                //線程拒絕之后的策略,可以延時將該任務(wù)再次加入到隊列中
                System.out.println("rejectedExecution===="+((MyRunnable)r).Name);

            }
        });
        MyRunnable t1 = new MyRunnable("T1");
        MyRunnable t2 = new MyRunnable("T2");
        MyRunnable t3 = new MyRunnable("T3");
        MyRunnable t4 = new MyRunnable("T4");
        MyRunnable t5 = new MyRunnable("T5");
        MyRunnable t6 = new MyRunnable("T6");
        MyRunnable t7 = new MyRunnable("T7");
        // 將線程放入池中進(jìn)行執(zhí)行
        threadPoolExecutor.execute(t1);
        threadPoolExecutor.execute(t2);
        threadPoolExecutor.execute(t3);
        threadPoolExecutor.execute(t4);
        threadPoolExecutor.execute(t5);
        threadPoolExecutor.execute(t6);
        threadPoolExecutor.execute(t7);
    }
    static class  MyRunnable implements Runnable {
        public MyRunnable(String name) {
            Name = name;
        }

        public String Name ;
        @Override
        public void run() {
            System.out.println(Name + "正在執(zhí)行豁鲤。秽誊。。");
        }
    }
}

custom-thread-pool.png

拒絕了3個任務(wù)琳骡,因?yàn)?code>run方法耗時不同锅论,核心線程執(zhí)行效率不同所致,運(yùn)行結(jié)果回不一致楣号。

4.2 自定義ThreadPoolExecutor

1.自定義ThreadPoolExecutor 最易,可以自定義核心線程數(shù)等相關(guān)參數(shù)。

public class MyExecutorPools extends ThreadPoolExecutor {
    public MyExecutorPools(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                           BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * 線程執(zhí)行后進(jìn)行調(diào)用
     */
    protected void afterExecute(Runnable r, Throwable t) {
        MyRunnable myr = (MyRunnable) r;
        System.out.println(myr.getMyname() + "..執(zhí)行完畢");

        }

    /**
     * 重寫執(zhí)行方法,線程池執(zhí)行前進(jìn)行調(diào)用
     */
    protected void beforeExecute(Thread t, Runnable r) {
        MyRunnable myr = (MyRunnable) r;
        System.out.println(myr.getMyname() + "..準(zhǔn)備執(zhí)行");
    }
}
  1. ThreadFactory
public class MyThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread td = new Thread(r);
        td.setName("myfactory-" + td.getId());
        td.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                //自定義異常捕獲機(jī)制..
                System.out.println("thread execute error");
            }
        });
        return td;
    }
}
  1. Runnable
public class MyRunnable implements Runnable {

    public MyRunnable(String name) {
        this.myname = name;
    }

    private String myname;

    public String getMyname() {
        return myname;
    }

    public void setMyname(String myname) {
        this.myname = myname;
    }

    @Override
    public void run() {
        System.out.println("myname is :" + myname);
        try {
            Thread.sleep(5 * 100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 拒絕策略 實(shí)現(xiàn)
/**
 * 拒絕策略
 */
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        MyRunnable my = (MyRunnable)r;
        System.out.println("拒絕執(zhí)行...:"+my.getMyname());
    }
}
  1. 測試
public class MyExecuterPoolsTest {
    public static void main(String[] args) {
        MyExecutorPools ex = new MyExecutorPools(3, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                new MyThreadFactory(), new RejectedExecutionHandlerImpl());
        for (int i = 0; i < 20; i++) {
            ex.execute(new MyRunnable("n" + i));
        }
    }
}

以上是兩種自定義線程池的方式炫狱,根據(jù)不同場景選擇使用耘纱,但是究其根本還是維護(hù)兩個隊列

  • 線程隊列
  • 任務(wù)對列

5 拒絕策略

上述是自定義一個拒絕策略,當(dāng)該任務(wù)被拒絕之后毕荐,多久再次加入到任務(wù)隊列中。其中Java 自帶幾種策略:

  • AbortPolicy 當(dāng)任務(wù)添加到線程池中被拒絕時艳馒,它將拋出 RejectedExecutionException 異常憎亚。
  • CallerRunsPolicy 當(dāng)任務(wù)添加到線程池中被拒絕時,會在線程池當(dāng)前正在運(yùn)行的Thread線程池中處理被拒絕的任務(wù)弄慰。
  • DiscardOldestPolicy 當(dāng)任務(wù)添加到線程池中被拒絕時第美,線程池會放棄等待隊列中最舊的未處理任務(wù),然后將被拒絕的任務(wù)添加到等待隊列中陆爽。
  • DiscardPolicy 當(dāng)任務(wù)添加到線程池中被拒絕時什往,線程池將丟棄被拒絕的任務(wù)。
    關(guān)于不同策略的使用可以自行搜索慌闭。
    上面是Java相關(guān)的線程池的使用和分析别威,后面結(jié)合項目中廣告任務(wù)相關(guān)業(yè)務(wù),采用自定義線程池的方式進(jìn)行改造驴剔。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末省古,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子丧失,更是在濱河造成了極大的恐慌豺妓,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件布讹,死亡現(xiàn)場離奇詭異琳拭,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)描验,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進(jìn)店門白嘁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人挠乳,你說我怎么就攤上這事权薯」枚悖” “怎么了?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵盟蚣,是天一觀的道長黍析。 經(jīng)常有香客問我,道長屎开,這世上最難降的妖魔是什么阐枣? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮奄抽,結(jié)果婚禮上蔼两,老公的妹妹穿的比我還像新娘。我一直安慰自己逞度,他們只是感情好额划,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著档泽,像睡著了一般俊戳。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上馆匿,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天抑胎,我揣著相機(jī)與錄音,去河邊找鬼渐北。 笑死阿逃,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的赃蛛。 我是一名探鬼主播恃锉,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼焊虏!你這毒婦竟也來了淡喜?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤诵闭,失蹤者是張志新(化名)和其女友劉穎炼团,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體疏尿,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡瘟芝,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了褥琐。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锌俱。...
    茶點(diǎn)故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖敌呈,靈堂內(nèi)的尸體忽然破棺而出贸宏,到底是詐尸還是另有隱情造寝,我是刑警寧澤,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布吭练,位于F島的核電站诫龙,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏鲫咽。R本人自食惡果不足惜签赃,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望分尸。 院中可真熱鬧锦聊,春花似錦、人聲如沸箩绍。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽材蛛。三九已至史飞,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間仰税,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工抽诉, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留陨簇,地道東北人。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓迹淌,卻偏偏與公主長得像河绽,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子唉窃,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內(nèi)容