隊列是JAVA開發(fā)過程中一種非常常見的數據結構,尤其是需要再使用生產者-消費者的業(yè)務模型中颗味,Queue常常作為多線程執(zhí)行任務的數據交界點直奋,從而保證生產者產生的數據能夠依次被消費诗越。
阻塞隊列的選擇
阻塞隊列的實現包括ArrayBlockingQueue與LinkedBlockingQueue厘熟。相同點不做贅述,區(qū)別有以下幾點:
1.初始化時嘀趟,ArrayBlockingQueue必須指定隊列最大容量脐区,LinkedBlockingQueue不強制指定,若不指定她按,默認Interger.Max為最大容量牛隅。
2.ArrayBlockingQueue內部數據結構是數組:Element[],通過putIndex和takeIndex下標的循環(huán)移動控制隊首和隊尾酌泰;LinkedBlockingQueue內部結構是鏈表:Node<Element>媒佣,通過head 和 tail節(jié)點控制隊首和隊尾。
3.ArrayBlockingQueue生產與消費之間共用一把鎖陵刹,而LinkedBlockingQueue生產與消費時用不同的鎖競爭默伍。
對于阻塞隊列的選擇,一方面考慮吞吐性能,另一方面考慮內存占用也糊。
我們可以看到上面說的第三點炼蹦,可以確定的是,在多生產者與多消費者的情況下狸剃,LinkedBlockingQueue的吞吐性能肯定是要更高的掐隐,而且ArrayBlockingQueue在初始化時直接就申請了一片連續(xù)的內存空間。所以在實際生產使用環(huán)境中钞馁,沒有特殊限制考慮瑟枫,我們在使用阻塞隊列時往往用LinkedBlockingQueue。
那什么場景下我們會偏向于使用ArrayBlockingQueue呢指攒?
- 生產者與消費者之間沒有太大競爭,傾向于單消費者僻焚,單生產者允悦,且兩者之間沖突較小,這種情況下數組尋址是明顯要比鏈表去指向next的操作要更快的
- 基本可以確定隊列大小虑啤,且隊列大小穩(wěn)定在一定的數量隙弛,這個時候數組占用內存是比鏈表小的
阻塞隊列與非阻塞隊列的選擇
首先,ConcurrentLinkedQueue相對阻塞隊列來說狞山,采用的是CAS無鎖操作全闷,沒有take和put方法,主用poll與offer萍启,無界总珠。有人說,既然此隊列內部進隊和出隊操作采用的是無鎖勘纯,那性能肯定比有鎖的BlockingQueue強局服,那BlockingQueue還有啥用武之地,其實不然驳遵,有些時候我們就需要線程進入阻塞狀態(tài)而非不斷自旋消耗CPU淫奔,我們可以歸類以下場景:
- 數據入隊速度過快,出隊速度過慢堤结,這個時候ConcurrentLinkedQueue如果不借助其他限制手段唆迁,隨著時間的推移,JVM必然會進行頻繁的FULL GC ,嚴重的情況下甚至會發(fā)生OOM竞穷。使用BlockingQueue可以更好的控制內存的狀況唐责。
- 數據入隊速度過慢,出隊速度過快来庭,這個時候消費者線程如果一定想要拿到數據而不進行阻塞妒蔚,將進入大量時間的自旋狀態(tài),白白浪費CPU資源。
- 入隊與出隊速度相仿肴盏。
這時候要考慮速度科盛,有多少個線程在同時做操作,線程操作的頻率如何菜皂?
在大部分場景下贞绵,ConcurrentLinkedQueue的性能是要比BlockingQueue要好的,注意是大部分恍飘,如果線程之間的競爭足夠又高又快榨崩,CAS操作的CPU消耗以及線程操作的成功率是極低的,這個時候是會反而不如用鎖競爭控制效率來的高章母。
我們寫了個測試類可以大致看下觀感下母蛛,在同樣的環(huán)境下,消費者與生產者在不斷對隊列進行操作乳怎,然后不斷增加消費者與生產者內部線程的數量彩郊。
package algorithm;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.LongAdder;
public class TestBlockingQueue {
BlockingQueue<Data> linkedBlockingQueue = new LinkedBlockingQueue<>(100);
// BlockingQueue<Data> linkedBlockingQueue = new ArrayBlockingQueue<>(100);
LongAdder longAdder = new LongAdder();
Producer producer;
Consumer consumer;
static class Data {
String msg;
public Data(String msg) {
this.msg = msg;
}
}
TestBlockingQueue(int size) {
producer = new Producer(size);
consumer = new Consumer(size);
}
public void startTest() {
producer.startProduce();
consumer.startConsume();
}
public long stopTestAndReturn() {
producer.stopProduce();
consumer.stopConsume();
return longAdder.longValue();
}
class Producer{
ExecutorService service;
List<ProduceWorker> workers = new LinkedList<>();
Producer(int concurrentNum) {
service = Executors.newFixedThreadPool(concurrentNum);
for(int i = 0; i < concurrentNum; i++) {
workers.add(new ProduceWorker());
}
}
public void startProduce() {
workers.forEach(worker -> service.execute(worker));
}
public void stopProduce() {
service.shutdownNow();
}
class ProduceWorker implements Runnable {
@Override
public void run() {
for(;;) {
if(!Thread.currentThread().isInterrupted()) {
try {
linkedBlockingQueue.put(new Data(randomString(10)));
} catch (InterruptedException e) {
// e.printStackTrace();
}
} else {
break;
}
}
}
}
}
class Consumer{
ExecutorService service;
List<ConsumeWork> workers = new LinkedList<>();
Consumer(int concurrentNum) {
service = Executors.newFixedThreadPool(concurrentNum);
for(int i = 0; i < concurrentNum; i++) {
workers.add(new ConsumeWork());
}
}
class ConsumeWork implements Runnable {
@Override
public void run() {
for(;;) {
if(!Thread.currentThread().isInterrupted()) {
try {
Data data = linkedBlockingQueue.take();
if (null != data) {
longAdder.increment();
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
} else {
break;
}
}
}
}
public void startConsume() {
workers.forEach(worker -> service.execute(worker));
}
public void stopConsume() {
service.shutdownNow();
}
}
public static void main(String[] args) {
long timeStart = System.currentTimeMillis();
System.out.println(timeStart);
TestBlockingQueue testQueue = new TestBlockingQueue(4);
testQueue.startTest();
try {
Thread.currentThread().sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
long result = testQueue.stopTestAndReturn();
System.out.println("最終結果為; " + result);
long timeEnd = System.currentTimeMillis();
System.out.println(timeEnd);
//計算每s吞吐
double average = (result / (timeEnd -timeStart)) * 1000;
System.out.println("1每秒吞吐: " + average);
}
static String randomString(int strLength) {
Random rnd = ThreadLocalRandom.current();
StringBuilder ret = new StringBuilder();
for (int i = 0; i < strLength; i++) {
boolean isChar = (rnd.nextInt(2) % 2 == 0);// 輸出字母還是數字
if (isChar) { // 字符串
int choice = rnd.nextInt(2) % 2 == 0 ? 65 : 97; // 取得大寫字母還是小寫字母
ret.append((char) (choice + rnd.nextInt(26)));
} else { // 數字
ret.append(Integer.toString(rnd.nextInt(10)));
}
}
return ret.toString();
}
}
ConcurrentQueue無法用put和take方法,需要用poll和offer,其他代碼一致蚪缀,不同的地方在于
Data data = concurrentLinkedQueue.poll();
if (null != data) {
longAdder.increment();
}
concurrentLinkedQueue.offer(new Data(randomString(10)));
可以先試試同時有4個生產者和消費者在不斷進行隊列操作秫逝,然后再試試1000個生產者與消費者在不斷進行隊列操作。
![ConcurrentQueue-4](https://upload-images.jianshu.io/upload_images/14745967-9f2a71o z66c7c3f219.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
這個情況下可以看出無鎖操作是遠高于有鎖操作的
負載過高的情況下询枚,CAS效率低下违帆,反而不如有鎖操作