緩存一致性問題
計(jì)算機(jī)在執(zhí)行程序時(shí),每條指令都是在CPU中執(zhí)行的合溺,而執(zhí)行指令過程中昭躺,勢(shì)必涉及到數(shù)據(jù)的讀取和寫入则果。由于程序運(yùn)行過程中的臨時(shí)數(shù)據(jù)是存放在主存(物理內(nèi)存)當(dāng)中的幔翰,這時(shí)就存在一個(gè)問題,由于CPU執(zhí)行速度很快西壮,而從內(nèi)存讀取數(shù)據(jù)和向內(nèi)存寫入數(shù)據(jù)的過程跟CPU執(zhí)行指令的速度比起來要慢的多遗增,因此如果任何時(shí)候?qū)?shù)據(jù)的操作都要通過和內(nèi)存的交互來進(jìn)行,會(huì)大大降低指令執(zhí)行的速度款青。因此在CPU里面就有了高速緩存做修。
也就是,當(dāng)程序在運(yùn)行過程中抡草,會(huì)將運(yùn)算需要的數(shù)據(jù)從主存復(fù)制一份到CPU的高速緩存當(dāng)中饰及,那么CPU進(jìn)行計(jì)算時(shí)就可以直接從它的高速緩存讀取數(shù)據(jù)和向其中寫入數(shù)據(jù),當(dāng)運(yùn)算結(jié)束之后康震,再將高速緩存中的數(shù)據(jù)刷新到主存當(dāng)中燎含。
在多核CPU中,每條線程可能運(yùn)行于不同的CPU中腿短,因此每個(gè)線程運(yùn)行時(shí)有自己的高速緩存屏箍。被多個(gè)線程訪問的共享變量在多個(gè)CPU中都存在緩存,這里那么就可能存在緩存不一致的問題
所以就出現(xiàn)了緩存一致性協(xié)議橘忱。最出名的就是Intel 的MESI協(xié)議赴魁,MESI協(xié)議保證了每個(gè)緩存中使用的共享變量的副本是一致的。它核心的思想是:當(dāng)CPU寫數(shù)據(jù)時(shí)钝诚,如果發(fā)現(xiàn)操作的變量是共享變量颖御,即在其他CPU中也存在該變量的副本,會(huì)發(fā)出信號(hào)通知其他CPU將該變量的緩存行置為無效狀態(tài)凝颇,因此當(dāng)其他CPU需要讀取這個(gè)變量時(shí)潘拱,發(fā)現(xiàn)自己緩存中緩存該變量的緩存行是無效的,那么它就會(huì)從內(nèi)存重新讀取祈噪。
線程安全性
- 原子性
即一個(gè)操作或者多個(gè)操作 要么全部執(zhí)行并且執(zhí)行的過程不會(huì)被任何因素打斷泽铛,要么就都不執(zhí)行
思考:?int long double讀寫操作的原子性
思考:辑鲤?int i++的原子性
- 可見性
可見性是指當(dāng)多個(gè)線程訪問同一個(gè)變量時(shí)盔腔,一個(gè)線程修改了這個(gè)變量的值,其他線程能夠立即看得到修改的值
與緩存相關(guān),某線程改變了數(shù)據(jù)弛随,其他線程沒有立即看到修改后的值
- 有序性
即程序執(zhí)行的順序按照代碼的先后順序執(zhí)行
與指令重排序有關(guān)瓢喉。一般來說,處理器為了提高程序運(yùn)行效率舀透,可能會(huì)對(duì)輸入代碼進(jìn)行優(yōu)化栓票,它不保證程序中各個(gè)語句的執(zhí)行先后順序同代碼中的順序一致,但是它會(huì)保證程序最終執(zhí)行結(jié)果和代碼順序執(zhí)行的結(jié)果是一致的愕够。
Java內(nèi)存模型具備一些先天的“有序性”走贪,即不需要通過任何手段就能夠得到保證的有序性,這個(gè)通常也稱為 happens-before 原則惑芭。如果兩個(gè)操作的執(zhí)行次序無法從happens-before原則推導(dǎo)出來坠狡,那么它們就不能保證它們的有序性,虛擬機(jī)可以隨意地對(duì)它們進(jìn)行重排序遂跟。
- 程序次序規(guī)則:一個(gè)線程內(nèi)逃沿,按照代碼順序,書寫在前面的操作先行發(fā)生于書寫在后面的操作
- 鎖定規(guī)則:一個(gè)unLock操作先行發(fā)生于后面對(duì)同一個(gè)鎖額lock操作
- volatile變量規(guī)則:對(duì)一個(gè)變量的寫操作先行發(fā)生于后面對(duì)這個(gè)變量的讀操作
- 傳遞規(guī)則:如果操作A先行發(fā)生于操作B幻锁,而操作B又先行發(fā)生于操作C凯亮,則可以得出操作A先行發(fā)生于操作C
- 線程啟動(dòng)規(guī)則:Thread對(duì)象的start()方法先行發(fā)生于此線程的每個(gè)一個(gè)動(dòng)作
- 線程中斷規(guī)則:對(duì)線程interrupt()方法的調(diào)用先行發(fā)生于被中斷線程的代碼檢測(cè)到中斷事件的發(fā)生
- 線程終結(jié)規(guī)則:線程中所有的操作都先行發(fā)生于線程的終止檢測(cè),我們可以通過Thread.join()方法結(jié)束哄尔、Thread.isAlive()的返回值手段檢測(cè)到線程已經(jīng)終止執(zhí)行
- 對(duì)象終結(jié)規(guī)則:一個(gè)對(duì)象的初始化完成先行發(fā)生于他的finalize()方法的開始
當(dāng)多個(gè)線程訪問某個(gè)類時(shí)假消,不管運(yùn)行時(shí)環(huán)境采用何種調(diào)度方式或者這些線程將如何交替執(zhí)行,并且在主調(diào)代碼中不需要任何額外的同步或協(xié)同究飞,這個(gè)類都能表現(xiàn)出正確的行為置谦,那么就稱這個(gè)類是線程安全的。
對(duì)象的共享
加鎖與volatile
加鎖機(jī)制既可以確币诟担可見性媒峡,又可以確保原子性,而volatile變量只能確笨妫可見性谅阿。
發(fā)布與逸出
發(fā)布一個(gè)對(duì)象的意思是指,使對(duì)象能夠在當(dāng)前作用域之外的代碼中使用酬滤。
發(fā)布內(nèi)部狀態(tài)可能會(huì)破壞封裝性签餐,并使得程序難以維持不變性條件。
當(dāng)某個(gè)不應(yīng)該發(fā)布的對(duì)象被發(fā)布時(shí)盯串,這種情況就被稱之為逸出氯檐。
當(dāng)一個(gè)對(duì)象發(fā)布時(shí),在該對(duì)象的非私有域中引用的所有對(duì)象同樣會(huì)被發(fā)布体捏。一般來說冠摄,如果一個(gè)已經(jīng)發(fā)布的對(duì)象能夠通過非私有的變量引用和方法調(diào)用到達(dá)其他的對(duì)象糯崎,那么這些對(duì)象也都會(huì)被發(fā)布。
不要在構(gòu)造過程中使this引用逸出河泳。
線程封閉
如果僅在單線程內(nèi)訪問數(shù)據(jù)沃呢,就不需要同步。
- Ad-hoc線程封閉(脆弱)
- 棧封閉
- ThreadLocal類
不變性
滿足同步需求的另一種方法是使用不可變對(duì)象(Immutable Object)
不可變對(duì)象:
- 對(duì)象創(chuàng)建以后其狀態(tài)就不能修改
- 對(duì)象的所有域都是final類型
- 對(duì)象是正確創(chuàng)建的
安全發(fā)布
- 在靜態(tài)初始化函數(shù)中初始化一個(gè)對(duì)象引用
- 將對(duì)象的引用保存到volatile類型的域或者AtomicReferance對(duì)象中
- 將對(duì)象的引用保存到某個(gè)正確構(gòu)造對(duì)象的final類型域中
- 將對(duì)象的引用保存到一個(gè)由鎖保護(hù)的域中
線程安全庫的容器類:
HashTable拆挥、synchronizedMap薄霜、ConcurrentMap
Vector、CopyOnWriteArrayList纸兔、CopyOnWriteArraySet惰瓜、synchronizedList、synchronizedSet
BlockingQueue食拜、ConcurrentLinkedQueue
事實(shí)不可變對(duì)象(Effectively Immutable Object):如果對(duì)象從技術(shù)上看是可變的鸵熟,但其狀態(tài)在發(fā)布后不會(huì)再改變副编,那么把這種對(duì)象稱為事實(shí)不可變對(duì)象负甸。
- 不可變對(duì)象可以通過任意機(jī)制發(fā)布
- 事實(shí)不可變對(duì)象必須通過安全方式發(fā)布
- 可變對(duì)象必須通過安全方式來發(fā)布,并且必須是線程安全的或者由某個(gè)鎖保護(hù)起來
安全地共享對(duì)象
- 線程封閉
- 只讀共享
- 線程安全共享
- 保護(hù)共享
對(duì)象的組合
設(shè)計(jì)線程安全的類
在設(shè)計(jì)線程安全類的過程中痹届,需要包含以下三個(gè)基本要素:
- 找出構(gòu)成對(duì)象狀態(tài)的所有變量
- 找出約束狀態(tài)變量的不變性條件
- 建立對(duì)象狀態(tài)的并發(fā)訪問管理策略
Java監(jiān)視器模式:對(duì)于任何一種鎖對(duì)象呻待,自始至終都使用該鎖對(duì)象,都可以用來保護(hù)對(duì)象的狀態(tài)
@NotThreadSafe
public class MutablePoint {
public int x,y;
public MutablePoint() {
x = 0;
y = 0;
}
public MutablePoint(MutablePoint p) {
this.x = p.x;
this.y = p.y;
}
}
@ThreadSafe
public class MonitorVehicleTracker {
@GuardedBy("this")
private final Map<String,MutablePoint> locations;
public MonitorVehicleTracker(Map<String, MutablePoint> locations) {
this.locations = deepCopy(locations);
}
public synchronized Map<String,MutablePoint> getLocations(){
return deepCopy(locations);
}
public synchronized MutablePoint getLocation(String id){
MutablePoint loc = locations.get(id);
return loc == null ? null : new MutablePoint(loc);
}
public synchronized void setLocation(String id,int x,int y){
MutablePoint loc = locations.get(id);
if(loc == null){
throw new IllegalArgumentException("No such ID:" + id);
}
loc.x = x;
loc.y = y;
}
private static Map<String,MutablePoint> deepCopy(Map<String,MutablePoint> m){
Map<String,MutablePoint> result = new HashMap<>();
for(String id:m.keySet()){
result.put(id,new MutablePoint(m.get(id)));
}
return Collections.unmodifiableMap(result);
}
}
線程安全性的委托
如果一個(gè)類是由多個(gè)獨(dú)立且線程安全的狀態(tài)變量組成队腐,并且在所有的操作中都不包含無效狀態(tài)轉(zhuǎn)換蚕捉,那么可以將線程安全性委托給底層的狀態(tài)變量。
public class VisualComponent {
private final List<KeyListener> keyListeners = new CopyOnWriteArrayList<>();
private final List<MouseListener> mouseListeners = new CopyOnWriteArrayList<>();
public void addKeyListener(KeyListener listener){
keyListeners.add(listener);
}
public void addMouseListener(MouseListener listener){
mouseListeners.add(listener);
}
public void removeKeyListener(KeyListener listener){
keyListeners.remove(listener);
}
public void removeMouseListener(MouseListener listener){
mouseListeners.remove(listener);
}
}
Java里的基礎(chǔ)構(gòu)建模塊
同步容器類
Vector HashTable Collections.synchronizedXxx工廠方法
- 同步容器的線程安全問題
public static <T> T getLast(Vector<T> vector){
int lastIndex = vector.size() - 1;
return vector.get(lastIndex);
}
public static <T> T deleteLast(Vector<T> vector){
int lastIndex = vector.size() - 1;
return vector.remove(lastIndex);
}
在多線程中上述方法是不安全的柴淘,雖然Vector是安全的容器迫淹,但size()方法和get()或者remove()同時(shí)使用,存在“先檢查再運(yùn)行”操作为严,就會(huì)拋出異常(ArrayIndexOutOfBoundsException),所以需要在客戶端加鎖
public static <T> T getLast(Vector<T> vector) {
synchronized (vector) {
int lastIndex = vector.size() - 1;
return vector.get(lastIndex);
}
}
public static <T> T deleteLast(Vector<T> vector) {
synchronized (vector) {
int lastIndex = vector.size() - 1;
return vector.remove(lastIndex);
}
}
- 迭代器與ConcurrentModificationException
在設(shè)計(jì)同步容器的迭代器時(shí)并沒有考慮并發(fā)修改的問題敛熬,它們表現(xiàn)出的行為是及時(shí)失敗(fail-fast)第股。
List<Widget> widgeList = Collections.synchronizedList(new ArrayList<Widget>());
//可能拋出ConcurrentModificationException
for(Widget w:widgeList){
doSomeThing(w);
}
解決方法有兩種:一是加鎖应民,但可能會(huì)產(chǎn)生死鎖;二是克隆夕吻,這里的性能開銷也很大诲锹。
- 隱藏迭代器
容器的toString()、hashCode()涉馅、equals()归园、containsAll()、removeAll()稚矿、retainAll()以及把容器作為參數(shù)的構(gòu)造方法庸诱,都會(huì)對(duì)容器進(jìn)行迭代悬钳。這些操作都有可能拋出ConcurrentModificationException
并發(fā)容器
- ConcurrentHashMap 替代同步Map
使用分段鎖
迭代器具有弱一致性(可以容忍并發(fā)修改,但并不能保證在迭代器被構(gòu)造后將修改操作反映給容器偶翅,所以size()和isEmpty()的語義被略微減弱了默勾。 - CopyOnWriteArrayList/CopyOnWriteArraySet
"寫入時(shí)復(fù)制"容器,每次修改時(shí)聚谁,都會(huì)創(chuàng)建并重新發(fā)布一個(gè)新的容器副本
/**
* Replaces the element at the specified position in this list with the
* specified element.
*
* @throws IndexOutOfBoundsException {@inheritDoc}
*/
public E set(int index, E element) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
E oldValue = get(elements, index);
if (oldValue != element) {
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len);
newElements[index] = element;
setArray(newElements);
} else {
// Not quite a no-op; ensures volatile write semantics
setArray(elements);
}
return oldValue;
} finally {
lock.unlock();
}
}
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
容器規(guī)模較大時(shí)母剥,底層復(fù)制需要一定的開銷。僅當(dāng)?shù)僮鬟h(yuǎn)遠(yuǎn)多于修改操作時(shí)形导,才使用"寫入時(shí)復(fù)制"容器环疼。
- ConcurrentSkipListMap 替代同步的SortedMap
- ConcurrentSkipListSet 替代同步的SortedSet
- Queue ConcurrentLinkedQueue(先進(jìn)先出) PriorityQueue(優(yōu)先隊(duì)列)
- BlockingQueue 阻塞隊(duì)列 LinkedBlockingQueue/ArrayBlockingQueue(FIFO) PriorityBlockingQueue(優(yōu)先隊(duì)列) SynchronousQueue(維護(hù)一組線程,不維護(hù)存儲(chǔ)空間朵耕,直接交付)
put()和take()是可阻塞的
支持生產(chǎn)者消費(fèi)者模式
支持有界或者無界隊(duì)列 - Deque BlockDeque 雙端隊(duì)列和工作密取
每個(gè)消費(fèi)者都有自己的雙端隊(duì)列炫隶,消費(fèi)完自己的任務(wù),就去其他隊(duì)列的末尾秘密的獲取工作
阻塞方法中斷方法
某方法拋出InterruptedException時(shí)阎曹,表示該方法是一個(gè)阻塞方法伪阶。
捕獲異常,恢復(fù)中斷
try {
processTask(fileQueue.take())
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
同步工具類
- CountDownLatch
是一個(gè)或多個(gè)線程等待一組事件發(fā)生
public class TestHarness {
public static long timeTasks(int nThreads,final Runnable task) throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for(int i = 0;i < nThreads;i++){
// Runnable t = new Runnable() {
// @Override
// public void run() {
// try {
// startGate.await();
// try {
// task.run();
// } finally {
// endGate.countDown();
// }
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// };
Thread t = new Thread(){
@Override
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
public static void main(String[] args) throws InterruptedException {
Runnable a = new Runnable() {
@Override
public void run() {
int sum = 0;
for(int i = 0; i < 1000000; i++){
sum += i;
}
System.out.println(sum);
}
};
System.out.println( timeTasks(100,a) );
}
}
- FutureTask
異步獲取執(zhí)行的結(jié)果 - 信號(hào)量 Semaphore
控制同時(shí)訪問某個(gè)特定資源的操作數(shù)量或者同時(shí)執(zhí)行某個(gè)制定操作的數(shù)量
實(shí)現(xiàn)資源池
對(duì)容器施加邊界 - 柵欄 CyclicBarrier Exchanger
簡(jiǎn)單的可伸縮性緩存
public class Momoizerl<A,V> implements Computable<A,V> {
private final ConcurrentMap<A,Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A,V> c;
public Momoizerl(Computable<A, V> c) {
this.c = c;
}
@Override
public V compute(final A arg) throws InterruptedException {
while (true){
Future<V> f = cache.get(arg);
if(f == null){
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(arg);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(arg,ft);
if(f == null){
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
基礎(chǔ)小結(jié)
- 可變狀態(tài)越少处嫌,越容易確保線程安全性
- 盡量將域聲明為final類型
- 不可變對(duì)象一定是線程安全的
- 使用所來保護(hù)可變變量
- 當(dāng)保護(hù)同一個(gè)不變性條件中的所有變量時(shí)栅贴,使用同一個(gè)鎖
- 復(fù)合操作,使用鎖
- 安全的適當(dāng)?shù)氖褂貌l(fā)容器和同步工具