同步容器用于解決并發(fā)情況下的容器線程安全問題凰狞,可以給多線程環(huán)境準(zhǔn)備一個線程安全的容器對象。
線程安全的容器對象: Vector, Hashtable框都。它們是使用synchronized方法實(shí)現(xiàn)的允坚。
concurrent包中的同步容器极谊,大多數(shù)是使用系統(tǒng)底層技術(shù)實(shí)現(xiàn)的線程安全。Java8中使用的是CAS碘赖。
1.1 ConcurrentHashMap/ConcurrentHashSet
底層哈希實(shí)現(xiàn)的同步Map(Set)驾荣。效率高,線程安全普泡。使用系統(tǒng)底層技術(shù)實(shí)現(xiàn)線程安全播掷。
量級較synchronized低。key和value不能為null撼班。
public class Test1_ConcurrentMap {
public static void main(String[] args) {
final Map<String, String> map = new Hashtable<>();// 效率低
// final Map<String, String> map = new ConcurrentHashMap<>();效率高于Hashtable
// final Map<String, String> map = new ConcurrentSkipListMap<>();元素有序歧匈,效率最低
final Random r = new Random();
Thread[] array = new Thread[100];
final CountDownLatch latch = new CountDownLatch(array.length);// 門閂
long begin = System.currentTimeMillis();
for (int i = 0; i < array.length; i++) {
array[i] = new Thread(new Runnable() {
public void run() {
for (int j = 0; j < 10000; j++) {
map.put("key" + r.nextInt(100000), "value" + r.nextInt(100000));
}
latch.countDown();
}
});
}
for (Thread t : array) {
t.start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("執(zhí)行時間為 : " + (end - begin) + "毫秒!");
}
}
1.2 ConcurrentSkipListMap/ConcurrentSkipListSet
底層跳表(SkipList)實(shí)現(xiàn)的同步Map(Set)砰嘁。有序件炉,效率比ConcurrentHashMap低。
2 CopyOnWriteArrayList
寫入數(shù)據(jù)時會復(fù)制集合橘洞。
寫入效率低捌袜,讀取效率高。
每次寫入數(shù)據(jù)炸枣,都會創(chuàng)建一個新的底層數(shù)組虏等,很浪費(fèi)空間,用于寫入少讀取多的情況适肠。
public class Test2_CopyOnWriteList {
public static void main(String[] args) {
// final List<String> list = new ArrayList<>();線程不安全
// final List<String> list = new Vector<>();
final List<String> list = new CopyOnWriteArrayList<>();
final Random r = new Random();
Thread[] array = new Thread[100];
final CountDownLatch latch = new CountDownLatch(array.length);
long begin = System.currentTimeMillis();
for (int i = 0; i < array.length; i++) {
array[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 1000; j++) {
list.add("value" + r.nextInt(100000));
}
latch.countDown();
}
});
}
for (Thread t : array) {
t.start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("執(zhí)行時間為 : " + (end - begin) + "毫秒霍衫!");
System.out.println("List.size() : " + list.size());
}
}
3.1 ConcurrentLinkedQueue
鏈表實(shí)現(xiàn)的同步隊(duì)列。
public class Test3_ConcurrentLinkedQueue {
public static void main(String[] args) {
Queue<String> queue = new ConcurrentLinkedQueue<>();
for (int i = 0; i < 10; i++) {
queue.offer("value" + i);// 添加數(shù)據(jù)
}
System.out.println(queue);
System.out.println(queue.size());
// peek() -> 查看queue中的首數(shù)據(jù)
System.out.println(queue.peek());
System.out.println(queue.size());
// poll() -> 刪除和獲取queue中的首數(shù)據(jù)
System.out.println(queue.poll());
System.out.println(queue.size());
}
}
3.2 LinkedBlockingQueue
阻塞隊(duì)列
執(zhí)行put方法時侯养, 隊(duì)列容量不足敦跌,自動阻塞。
執(zhí)行take方法逛揩, 隊(duì)列容量為0柠傍,自動阻塞麸俘。
示例:
public class Test4_LinkedBlockingQueue {
final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
final Random r = new Random();
public static void main(String[] args) {
final Test4_LinkedBlockingQueue t = new Test4_LinkedBlockingQueue();
// 一個生產(chǎn)者
new Thread(new Runnable() {
public void run() {
while (true) {
try {
t.queue.put("value" + t.r.nextInt(1000));
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "producer").start();
// 三個消費(fèi)者
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
public void run() {
while (true) {
try {
System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "consumer" + i).start();
}
}
}
3.3 ArrayBlockingQueue
數(shù)組實(shí)現(xiàn)的有界隊(duì)列。
執(zhí)行add方法時惧笛,數(shù)組容量不足从媚,拋出異常。
執(zhí)行put方法時患整,數(shù)組容量不足拜效,阻塞等待。
執(zhí)行offer方法時并级,
(1)單參數(shù)offer方法拂檩,不阻塞。數(shù)組容量不足時嘲碧,返回false稻励,放棄要新增的數(shù)據(jù)。
(2)三參數(shù)offer方法(offer(value,times,timeunit))愈涩,數(shù)組容量不足時望抽,阻塞times時長(單位為timeunit)。如果在阻塞時長內(nèi)履婉,數(shù)組容量出現(xiàn)空閑煤篙,新增數(shù)據(jù)返回true。如果阻塞時長范圍內(nèi)毁腿,無容量空閑辑奈,放棄要新增的數(shù)據(jù),返回false已烤。
它的其它方法和LinkedBlockingQueue類似鸠窗。
示例:
public class Test5_ArrayBlockingQueue {
final BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
public static void main(String[] args) {
final Test5_ArrayBlockingQueue t = new Test5_ArrayBlockingQueue();
for (int i = 0; i < 5; i++) {
// System.out.println("add method : " + t.queue.add("value" + i));執(zhí)行add方法時,數(shù)組容量不足胯究,拋出異常
/*try {
t.queue.put("put" + i);執(zhí)行put方法時稍计,數(shù)組容量不足,阻塞等待裕循。
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("put method : " + i);*/
//System.out.println("offer method : " + t.queue.offer("value"+i));單參數(shù)offer方法臣嚣,不阻塞。數(shù)組容量不足時剥哑,返回false硅则,放棄要新增的數(shù)據(jù)。
try {
// 三參數(shù)offer方法株婴,數(shù)組容量不足時抢埋,阻塞times時長(單位為timeunit)。
// 如果在阻塞時長內(nèi)督暂,數(shù)組容量出現(xiàn)空閑揪垄,新增數(shù)據(jù)返回true。如果阻塞時長范圍內(nèi)逻翁,無容量空閑饥努,放棄要新增的數(shù)據(jù),返回false八回。
System.out.println("offer method : " + t.queue.offer("value" + i, 1, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(t.queue);
}
}
3.4 DelayQueue
延時隊(duì)列酷愧,無界。
根據(jù)比較機(jī)制缠诅,實(shí)現(xiàn)自定義處理順序的隊(duì)列溶浴。常用于定時任務(wù),例如:定時關(guān)機(jī)管引。
隊(duì)列中存放的元素必須實(shí)現(xiàn)Delay接口士败。
示例:
public class Test6_DelayQueue {
static BlockingQueue<MyTask_06> queue = new DelayQueue<>();
public static void main(String[] args) throws InterruptedException {
long value = System.currentTimeMillis();
MyTask_06 task1 = new MyTask_06(value + 2000);
MyTask_06 task2 = new MyTask_06(value + 1000);
MyTask_06 task3 = new MyTask_06(value + 3000);
MyTask_06 task4 = new MyTask_06(value + 2500);
MyTask_06 task5 = new MyTask_06(value + 1500);
queue.put(task1);
queue.put(task2);
queue.put(task3);
queue.put(task4);
queue.put(task5);
System.out.println(queue);
System.out.println("value" + value);
for (int i = 0; i < 5; i++) {
System.out.println(queue.take());
}
}
}
class MyTask_06 implements Delayed {
private long compareValue;
public MyTask_06(long compareValue) {
this.compareValue = compareValue;
}
// 比較大小,實(shí)現(xiàn)自動升序褥伴,建議和getDelay方法配合使用
// 如果在DelayQueue中需要按時間完成計劃任務(wù)谅将,必須配合getDelay方法完成。
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
// 獲取計劃時長的方法重慢。 根據(jù)參數(shù)TimeUnit來決定饥臂,如何返回結(jié)果值。
public long getDelay(TimeUnit unit) {
return unit.convert(compareValue - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
public String toString() {
return "Task compare value is : " + this.compareValue;
}
}
3.5 LinkedTransferQueue
轉(zhuǎn)移隊(duì)列
執(zhí)行add方法時似踱,隊(duì)列保存數(shù)據(jù)隅熙,不會阻塞等待。
執(zhí)行transfer方法(TransferQueue接口的特有方法)時核芽,沒有消費(fèi)者阻塞(take方法的調(diào)用者)囚戚,發(fā)生阻塞;有消費(fèi)者阻塞狞洋,數(shù)據(jù)直接給到消費(fèi)者弯淘。
一般用于處理即時消息。
示例:
public class Test7_LinkedTransferQueue {
TransferQueue<String> queue = new LinkedTransferQueue<>();
public static void main(String[] args) {
final Test7_LinkedTransferQueue t = new Test7_LinkedTransferQueue();
// 消費(fèi)者阻塞吉懊,直接拿走transfer方法的數(shù)據(jù)
/*new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " thread begin " );
System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "output thread").start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
t.queue.transfer("test string");
} catch (InterruptedException e) {
e.printStackTrace();
}*/
// transfer方法阻塞庐橙,消費(fèi)者消費(fèi)時直接拿走數(shù)據(jù)
new Thread(new Runnable() {
public void run() {
try {
t.queue.transfer("test string");// 阻塞,直至消費(fèi)者來拿數(shù)據(jù)
// t.queue.add("test string");// 不阻塞借嗽,向TransferQueue中添加數(shù)據(jù)
System.out.println("ok");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " thread begin " );
System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "output thread").start();
}
}
3.6 SynchronusQueue
同步隊(duì)列态鳖,是一個容量為0的隊(duì)列,是一個特殊的TransferQueue恶导。
必須先有消費(fèi)線程阻塞浆竭,才能使用的隊(duì)列。
add方法,無阻塞能力邦泄。若沒有消費(fèi)線程阻塞等待數(shù)據(jù)删窒,則拋出異常。
put方法顺囊,有阻塞能力肌索。若沒有消費(fèi)線程阻塞等待數(shù)據(jù),則阻塞特碳。
示例:
public class Test8_SynchronusQueue {
BlockingQueue<String> queue = new SynchronousQueue<>();
public static void main(String[] args) {
final Test8_SynchronusQueue t = new Test8_SynchronusQueue();
new Thread(new Runnable() {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " thread begin ");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " - " + t.queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "output thread").start();
// try {
// TimeUnit.SECONDS.sleep(3);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// t.queue.add("test add");
try {
t.queue.put("test put");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " queue size : " + t.queue.size());// 輸出main queue size : 0
}
}