/**
 * Returns the shared executor, building it the first time it is requested.
 *
 * <p>The pool is configured with zero core threads, an upper bound of
 * {@code Integer.MAX_VALUE} threads, a 60-second keep-alive and a
 * {@link SynchronousQueue}, so each submitted task is handed directly to an
 * idle worker or to a newly created one. The method is {@code synchronized},
 * so the lazy initialisation runs under the instance monitor.
 *
 * @return the lazily created executor
 */
public synchronized ExecutorService executorService() {
    if (executorService != null) {
        return executorService;
    }
    // NOTE(review): Util.threadFactory is defined elsewhere; presumably the
    // arguments are the thread name and a daemon flag - confirm.
    executorService = new ThreadPoolExecutor(
            0,
            Integer.MAX_VALUE,
            60,
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            Util.threadFactory("OkHttp Dispatcher", false));
    return executorService;
}
ThreadPoolExecutor的參數
- corePoolSize：0 核心并發數，就是在線程池不飽和時，線程池可擁有的線程數。如果是0的話，空閑一段時間后所有線程將全部被銷毀。
- maximumPoolSize：線程池最大線程容量。
- keepAliveTime：當總線程數大于核心線程數corePoolSize時，
超出的那部分空閑線程的存活時間。
- BlockingQueue<Runnable>：
這個參數被稱為阻塞隊列（生產者消費者模型）
1.ArrayBlockingQueue
2.LinkedBlockingQueue
以上兩個要注意指定最大容量（ArrayBlockingQueue構造時必須指定容量；LinkedBlockingQueue不指定時默認容量為Integer.MAX_VALUE）。如果生產者的效率很高，會把隊列緩存占滿，而沒有指定最大值時則會不斷消耗內存
3.PriorityBlockingQueue
4.DelayQueue
5.SynchronousQueue
它是一個不存儲元素的阻塞隊列。每個插入操作必須等待另一個線程的移除操作，同樣每個移除操作也必須等待另一個線程的插入操作。因此隊列中不會存儲任何元素。（多線程打印出0101010101）
練習下隊列，看有毛用
/**
 * Producer/consumer demo on a bounded {@link ArrayBlockingQueue}.
 *
 * <p>One producer thread keeps putting the value {@code 1} into a queue of
 * capacity {@link #QUEUESIZE}; one consumer thread keeps taking from it.
 * Both block inside the queue whenever it is full or empty.
 *
 * <p>The workers run until they are interrupted. The test lets them run for
 * {@link #RUN_MILLIS} milliseconds, then interrupts and joins them so the
 * test method does not return while its threads are still alive.
 */
public class BlockQueueTest {
    /** Capacity of the shared queue. */
    private static final int QUEUESIZE = 1;
    /** How long the demo threads are allowed to run before being stopped. */
    private static final long RUN_MILLIS = 200L;

    /** Queue shared by the producer and the consumer; created per test run. */
    private ArrayBlockingQueue<Integer> integers;

    /** Starts one consumer and one producer, observes them briefly, then stops both. */
    @Test
    public void BlockQueueTest() {
        integers = new ArrayBlockingQueue<>(QUEUESIZE);
        Consumer consumer = new Consumer();
        Producer producer = new Producer();
        consumer.start();
        producer.start();
        stopAfterDelay(consumer, producer);
    }

    /**
     * Sleeps for {@link #RUN_MILLIS}, then interrupts every worker and waits
     * for each of them to finish.
     *
     * @param workers threads to stop
     */
    private static void stopAfterDelay(Thread... workers) {
        try {
            Thread.sleep(RUN_MILLIS);
        } catch (InterruptedException e) {
            // Keep the caller's interrupt status and still stop the workers below.
            Thread.currentThread().interrupt();
        }
        for (Thread worker : workers) {
            worker.interrupt();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Takes elements from the queue and prints them until interrupted. */
    class Consumer extends Thread {
        @Override
        public void run() {
            try {
                while (true) {
                    Integer take = integers.take();
                    System.out.println("消費元素:" + take);
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: leave the loop, keep the flag set.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Puts the value 1 into the queue and reports it until interrupted. */
    class Producer extends Thread {
        @Override
        public void run() {
            try {
                while (true) {
                    integers.put(1);
                    System.out.println("生產(chǎn)元素int 1");
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: leave the loop, keep the flag set.
                Thread.currentThread().interrupt();
            }
        }
    }
}
打印010101 順序一個沒亂
package com.system.bhouse.bhouse.Queue;
import org.junit.Test;
import java.util.concurrent.SynchronousQueue;
/**
* Created by wz on 2018-11-11.
*/
public class SynchronousQueueTest {
private static final int QUEUESIZE= 1;
private SynchronousQueue<Integer> integers;
private volatile boolean isConsumer = false;
@Test
public void BlockQueueTest(){
integers = new SynchronousQueue<>();
Consumer consumer = new Consumer();
Producer producer = new Producer();
consumer.start();
producer.start();
}
class Consumer extends Thread{
@Override
public void run() {
super.run();
synchronized (SynchronousQueueTest.this) {
while (true)
{
if (isConsumer) {
System.out.println("消費元素:" + 1);
isConsumer = !isConsumer;
SynchronousQueueTest.this.notify();
}else {
try {
SynchronousQueueTest.this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
super.run();
synchronized (SynchronousQueueTest.this) {
while (true)
{
if (!isConsumer) {
System.out.println("生產(chǎn)元素int 0");
isConsumer = !isConsumer;
SynchronousQueueTest.this.notify();
}else {
try {
SynchronousQueueTest.this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}
以為SynchronousQueue可以一對一通信，應該是打印01010101的最佳配置，結果發現你無法控制隊列里的同步機制。代碼運行到隊列操作那里就停止了。
package com.system.bhouse.bhouse.Queue;
import org.junit.Test;
import java.util.concurrent.SynchronousQueue;
/**
* Created by wz on 2018-11-11.
*/
/**
 * Prints a strictly alternating {@code 0 1 0 1 ...} sequence using a
 * {@link SynchronousQueue} hand-off plus a turn flag.
 *
 * <p>Per round: the producer prints {@code 0}, sets {@link #isConsumer} and
 * blocks in {@code put} until the consumer takes the element; the consumer
 * prints the taken value ({@code 1}) and only then clears the flag, which is
 * what allows the producer to start the next round. The producer prints
 * <em>before</em> the hand-off: printing after {@code put} returns would
 * race with the consumer's print and break the ordering.
 *
 * <p>Each flag state has a single writer (the producer only sets it to
 * {@code true}, the consumer only to {@code false}), so a {@code volatile}
 * field is sufficient. Both threads busy-spin while it is not their turn.
 *
 * <p>The workers run until interrupted. The test lets them run for
 * {@link #RUN_MILLIS} milliseconds, then interrupts and joins them.
 */
public class SynchronousQueueTest2 {
    /** How long the demo threads are allowed to run before being stopped. */
    private static final long RUN_MILLIS = 200L;

    /** Hand-off channel between producer and consumer; created per test run. */
    private SynchronousQueue<Integer> integers;
    /** {@code true} while it is the consumer's turn. */
    private volatile boolean isConsumer = false;

    /** Starts one consumer and one producer, observes them briefly, then stops both. */
    @Test
    public void BlockQueueTest() {
        integers = new SynchronousQueue<>();
        Consumer consumer = new Consumer();
        Producer producer = new Producer();
        consumer.start();
        producer.start();
        stopAfterDelay(consumer, producer);
    }

    /**
     * Sleeps for {@link #RUN_MILLIS}, then interrupts every worker and waits
     * for each of them to finish.
     *
     * @param workers threads to stop
     */
    private static void stopAfterDelay(Thread... workers) {
        try {
            Thread.sleep(RUN_MILLIS);
        } catch (InterruptedException e) {
            // Keep the caller's interrupt status and still stop the workers below.
            Thread.currentThread().interrupt();
        }
        for (Thread worker : workers) {
            worker.interrupt();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Takes the handed-off element, prints it, then returns the turn to the producer. */
    class Consumer extends Thread {
        @Override
        public void run() {
            try {
                // The interrupt check covers the spinning phase; take() covers the blocked phase.
                while (!Thread.currentThread().isInterrupted()) {
                    if (isConsumer) {
                        Integer take = integers.take();
                        System.out.println(take);
                        // Clear the flag last so the producer cannot print before we did.
                        isConsumer = false;
                    }
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: leave the loop, keep the flag set.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Prints 0, claims the consumer's turn, then hands the element 1 to the consumer. */
    class Producer extends Thread {
        @Override
        public void run() {
            try {
                // The interrupt check covers the spinning phase; put() covers the blocked phase.
                while (!Thread.currentThread().isInterrupted()) {
                    if (!isConsumer) {
                        // Print before the hand-off so this line always precedes the consumer's.
                        System.out.println(0);
                        isConsumer = true;
                        integers.put(1);
                    }
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: leave the loop, keep the flag set.
                Thread.currentThread().interrupt();
            }
        }
    }
}
言歸正傳
/**
 * Lazily creates and returns the shared executor.
 *
 * <p>Configuration: core size 0, maximum size {@code Integer.MAX_VALUE},
 * idle workers are kept for 60 seconds, and tasks are passed through a
 * {@link SynchronousQueue} (no task is ever stored in the queue). The
 * {@code synchronized} modifier makes the check-then-create step run under
 * the instance monitor.
 *
 * @return the executor, created on first call
 */
public synchronized ExecutorService executorService() {
    if (executorService != null) {
        return executorService;
    }
    // NOTE(review): Util.threadFactory is defined elsewhere; presumably the
    // arguments are the thread name and a daemon flag - confirm.
    executorService = new ThreadPoolExecutor(
            0,
            Integer.MAX_VALUE,
            60,
            TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            Util.threadFactory("OkHttp Dispatcher", false));
    return executorService;
}
來測試一下這個線程池特點
/**
 * Experiment showing how a cached-style {@link ThreadPoolExecutor} (core
 * size 0, {@link SynchronousQueue} hand-off) creates, reuses and retires
 * worker threads.
 *
 * <p>The test submits three batches of three tasks and prints the pool
 * statistics after each batch. The pool is shut down and awaited at the end
 * so the test does not return while tasks are still running and does not
 * leave worker threads behind.
 */
public class CacheTheadPool {
    /** Pause before the last batch; longer than the pool's 5-second keep-alive. */
    private static final long IDLE_WAIT_MILLIS = 8000L;
    /** Upper bound for waiting on the pool to drain during cleanup. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10L;

    /** Lazily created pool shared by all instances; reset to null after shutdown. */
    private static ThreadPoolExecutor executorService;

    /**
     * Submits three batches of tasks and prints the pool state after each.
     *
     * <p>Batches one and two are submitted back to back, so the first three
     * workers are still busy (each task sleeps 2 s) when the second batch
     * arrives. The third batch follows an 8-second pause, which exceeds the
     * 5-second keep-alive of idle workers.
     */
    @Test
    public void cacheTheadPool() {
        try {
            // Variant from the original notes: submit 100 tasks in a loop instead.
            submitCalls(1, 3);
            System.out.println("先開3個吱肌，按書上講會有3個是新建線程");
            printPoolState();

            // Variant from the original notes: sleep ~500 ms here before the next batch.
            submitCalls(4, 6);
            System.out.println("再開3個痘拆，按書上講會有3個是,看看是不是復(fù)用");
            printPoolState();

            try {
                Thread.sleep(IDLE_WAIT_MILLIS);
            } catch (InterruptedException e) {
                // Keep going with the last batch, but preserve the interrupt status.
                Thread.currentThread().interrupt();
            }

            for (int i = 7; i <= 9; i++) {
                newCachedThreadPool().execute(new AsyncCallSleep("thread" + i));
            }
            System.out.println("再開3個,按書上講會有3個是,看看是不是新建");
            printPoolState();
        } finally {
            shutdownPool();
        }
    }

    /** Submits one {@link AsyncCall} per index in {@code [from, to]}. */
    private void submitCalls(int from, int to) {
        for (int i = from; i <= to; i++) {
            newCachedThreadPool().execute(new AsyncCall("thread" + i));
        }
    }

    /** Prints core size, current pool size and queued-task count of the pool. */
    private static void printPoolState() {
        System.out.println("線程池核心:" + executorService.getCorePoolSize());
        System.out.println("線程池數(shù)目:" + executorService.getPoolSize());
        System.out.println("隊列任務(wù)數(shù)目:" + executorService.getQueue().size());
    }

    /**
     * Shuts the pool down, waits for submitted tasks to finish, and clears the
     * static reference so a later call to {@link #newCachedThreadPool()}
     * builds a fresh pool instead of returning a terminated one.
     */
    private void shutdownPool() {
        ThreadPoolExecutor pool;
        synchronized (this) {
            pool = executorService;
            executorService = null;
        }
        if (pool == null) {
            return;
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the shared pool, creating it on first use: core size 0, at most
     * 6 threads, 5-second keep-alive, {@link SynchronousQueue} hand-off. With
     * this configuration a task submitted while all 6 workers are busy is
     * rejected rather than queued.
     *
     * <p>The original note states that the created threads are user
     * (non-daemon) threads. NOTE(review): this depends on
     * {@code Util.threadFactory(name, false)}, defined elsewhere; presumably
     * the second argument is the daemon flag - confirm.
     *
     * @return the lazily created pool
     */
    public synchronized ExecutorService newCachedThreadPool() {
        if (executorService == null) {
            executorService = new ThreadPoolExecutor(0, 6, 5, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
    }

    /** Task that prints the name of the executing thread and then sleeps for 2 seconds. */
    final class AsyncCall extends NamedRunnable {
        private AsyncCall(Object... arg) {
            super("OkHttp %s", arg);
        }

        @Override
        protected void execute() {
            String name = Thread.currentThread().getName();
            System.out.println("當前處理的線程名是:" + name);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Interrupted (e.g. by shutdownNow): finish early, keep the flag set.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Task that prints the name of the executing thread and then sleeps for 3 seconds. */
    final class AsyncCallSleep extends NamedRunnable {
        private AsyncCallSleep(Object... arg) {
            super("OkHttpSleep %s", arg);
        }

        @Override
        protected void execute() {
            String name = Thread.currentThread().getName();
            System.out.println("當前處理的隨眠線程名是:" + name);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                // Interrupted (e.g. by shutdownNow): finish early, keep the flag set.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Runnable that renames the executing thread for the duration of
     * {@link #execute()} and restores the previous name afterwards.
     */
    public abstract class NamedRunnable implements Runnable {
        /** Thread name used while the task runs. */
        protected final String name;

        /**
         * @param format {@link String#format(String, Object...)} pattern for the thread name
         * @param args   arguments for {@code format}
         */
        public NamedRunnable(String format, Object... args) {
            this.name = String.format(format, args);
        }

        @Override public final void run() {
            String oldName = Thread.currentThread().getName();
            Thread.currentThread().setName(name);
            try {
                execute();
            } finally {
                // Restore the name even if execute() throws.
                Thread.currentThread().setName(oldName);
            }
        }

        /** The task body; runs with the thread renamed to {@link #name}. */
        protected abstract void execute();
    }
}