一、概括圖
二、程序示例
1. CountDownLatch
package test.java;
import java.util.concurrent.CountDownLatch;
/**
 * Demonstrates {@link CountDownLatch} used as a start gate and as an end gate.
 *
 * <p>All worker threads block on the start gate until the main thread opens it,
 * so they begin at (roughly) the same moment. The main thread then blocks on the
 * end gate until every worker has finished and reports the elapsed time.
 */
public class CountDownLatchTest {

    /**
     * Number of worker threads. The end gate is created with the same count, so
     * the latch count and the number of started threads cannot drift apart.
     */
    private static final int TASK_COUNT = 10;

    /**
     * Starts the workers, releases them together and prints how long they took.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while
     *         waiting for the workers to finish
     */
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(TASK_COUNT);

        for (int i = 0; i < TASK_COUNT; i++) {
            new Thread(() -> {
                try {
                    start.await();
                    System.out.println("Task is running...");
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can still see the interrupt.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } finally {
                    // Always count down, otherwise main would wait forever on a failed worker.
                    end.countDown();
                }
            }).start();
        }

        // nanoTime is monotonic; currentTimeMillis can jump when the wall clock is adjusted.
        long startNanos = System.nanoTime();
        start.countDown();
        end.await();
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
        System.out.println("All running time:" + elapsedMillis + " minis");
    }
}
2. Semaphore
package test.java;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
/**
 * A list whose size is capped by a {@link Semaphore}: every stored element
 * holds one permit, and the permit is handed back when the element is removed.
 *
 * @param <T> element type
 */
public class BoundList<T> {

    /** Backing storage, wrapped so individual list calls are synchronized. */
    private final List<T> items;

    /** One permit per free slot. */
    private final Semaphore permits;

    /**
     * Creates an empty list with the given number of slots.
     *
     * @param bound number of permits, i.e. maximum number of elements
     */
    public BoundList(int bound) {
        this.permits = new Semaphore(bound);
        this.items = Collections.synchronizedList(new ArrayList<T>());
    }

    /**
     * Appends an element, blocking until a slot is free.
     *
     * @param t element to append
     * @return the result of the underlying {@code List.add} call
     * @throws InterruptedException if interrupted while waiting for a free slot
     */
    public boolean add(T t) throws InterruptedException {
        permits.acquire();
        boolean stored = false;
        try {
            stored = items.add(t);
        } finally {
            // Give the slot back when nothing was stored (including when add threw).
            if (!stored) {
                permits.release();
            }
        }
        return stored;
    }

    /**
     * Removes one occurrence of the element and frees its slot.
     *
     * @param t element to remove
     * @return {@code true} if an element was removed
     */
    public boolean remove(T t) {
        if (!items.remove(t)) {
            return false;
        }
        permits.release();
        return true;
    }
}
3. CyclicBarrier
package test.java;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 * Demonstrates {@link CyclicBarrier}: a fixed number of worker threads each
 * print a message, wait for one another at the barrier, and continue only
 * after all of them have arrived.
 */
public class CyclicBarrierTest {

    /** Barrier shared by all workers; its action runs once per trip. */
    private final CyclicBarrier barrier;

    /** One worker per barrier party. */
    private final Worker[] workers;

    /**
     * Creates the barrier and one worker for each party.
     *
     * @param count number of workers, which is also the number of barrier parties
     * @throws IllegalArgumentException if {@code count} is less than 1
     *         (raised by the {@code CyclicBarrier} constructor)
     */
    public CyclicBarrierTest(int count) {
        this.barrier = new CyclicBarrier(count,
                () -> System.out.println("All thread have got the barrier..."));
        this.workers = new Worker[count];
        for (int i = 0; i < count; i++) {
            workers[i] = new Worker();
        }
    }

    /** Starts one new thread per worker; returns without waiting for them. */
    public void start() {
        for (Worker worker : workers) {
            new Thread(worker).start();
        }
    }

    /** Task that announces itself, waits at the shared barrier and announces its release. */
    private class Worker implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " is running...");
            try {
                barrier.await();
            } catch (InterruptedException e) {
                // Restore the flag instead of silently discarding the interrupt.
                Thread.currentThread().interrupt();
                return;
            } catch (BrokenBarrierException e) {
                // Another party failed or was interrupted; this worker cannot proceed either.
                return;
            }
            System.out.println(Thread.currentThread().getName() + " Out of barrier...");
        }
    }
}
4. Exchanger
package test.java;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Exchanger;
/**
 * Demonstrates {@link Exchanger}: a reader thread drains its queue while a
 * writer thread fills its own; when the reader's queue is empty and the
 * writer's queue is full, the two threads swap queues and carry on.
 */
public class ExchangerTest {

    /** Number of elements the writer accumulates before offering its queue for exchange. */
    private static final int BATCH_SIZE = 15;

    // The raw BlockingQueue type argument is kept because it is part of the public constructor signature.
    private final Exchanger<BlockingQueue> exchanger;

    /** Queue the reader thread starts with. */
    private final BlockingQueue<String> reader;

    /** Queue the writer thread starts with. */
    private final BlockingQueue<String> writer;

    /**
     * @param exchanger rendezvous point shared by the reader and writer threads
     * @param reader    initial queue of the reader thread
     * @param writer    initial queue of the writer thread
     */
    public ExchangerTest(Exchanger<BlockingQueue> exchanger,
                         BlockingQueue<String> reader,
                         BlockingQueue<String> writer) {
        this.exchanger = exchanger;
        this.reader = reader;
        this.writer = writer;
    }

    /**
     * Starts the reader thread and then the writer thread. Both loop until they
     * are interrupted; this method returns immediately.
     */
    public void exchangeQueue() {
        new Thread(this::readLoop).start();
        new Thread(this::writeLoop).start();
    }

    /** Reader side: removes elements one by one and swaps queues whenever the current one is empty. */
    private void readLoop() {
        // The current queue is thread-confined; only this thread ever reassigns it.
        BlockingQueue<String> current = reader;
        while (true) {
            if (current.isEmpty()) {
                try {
                    current = swap(current);
                    System.out.println("The reader is " + current.hashCode());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            } else {
                current.remove();
            }
        }
    }

    /** Writer side: appends elements and swaps queues once a full batch has accumulated. */
    private void writeLoop() {
        BlockingQueue<String> current = writer;
        while (true) {
            // ">=" rather than "==": a queue that already holds more than a batch
            // must still be handed over instead of growing without bound.
            if (current.size() >= BATCH_SIZE) {
                try {
                    current = swap(current);
                    System.out.println("The writer is " + current.hashCode());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            } else {
                current.add("123");
            }
        }
    }

    /**
     * Hands {@code mine} to the partner thread and returns the partner's queue.
     *
     * @throws InterruptedException if interrupted while waiting for the partner
     */
    @SuppressWarnings("unchecked") // the raw exchanger only ever carries the two BlockingQueue<String> instances
    private BlockingQueue<String> swap(BlockingQueue<String> mine) throws InterruptedException {
        return exchanger.exchange(mine);
    }
}
Exchanger類僅可用作兩個線程的信息交換,當超過兩個線程調用同一個exchanger對象時,得到的結果是隨機的,exchanger對象僅關心其包含的兩個“格子”是否已被填充數據,當兩個格子都填充數據完成時,該對象就認為線程之間已經配對成功,然后開始執行數據交換操作。而剩下的未得到配對的線程,則會被阻塞,永久等待,直到與之配對的線程到達位置。