volatile
是java虛擬機(jī)提供的輕量級的同步機(jī)制
- 保證可見性
- 不保證原子性
- 禁止指令重排
JMM
(java menory model java內(nèi)存模型持灰,是一種抽象概念并不真實(shí)存在嵌洼,它描述的是一組規(guī)范或規(guī)則,通過這組規(guī)范定義了程序中各個(gè)變量(包括實(shí)例字段,靜態(tài)字段和構(gòu)成數(shù)組對象的元素)的訪問方式)
- 線程解鎖前,必須把共享變量的值刷新回主內(nèi)存
- 線程加鎖前必須讀取主內(nèi)存的最新值到自己的工作內(nèi)存
- 加鎖解鎖是同一把鎖
CAS是什么
cas : 比較并更新
如果期望值與物理內(nèi)存的一樣則修改。
返回boolean
automicInteger.compareAndSet(5,1024);
CAS底層原理
Unsafe類
do while
比較當(dāng)前工作內(nèi)存中的值和主內(nèi)存中的值花盐,如果相同則執(zhí)行規(guī)定操作,否則繼續(xù)比較直到主內(nèi)存和工作內(nèi)存中的值一致為止圆米。
CAS缺點(diǎn)
如果CAS失敗卒暂,會一直進(jìn)行嘗試。如果CAS長時(shí)間一直不成功娄帖,可能會給CPU帶來了很大的開銷也祠。
ABA問題
貍貓換太子
線程t1把變量值由A該成B ,再由B改為了A 近速,對于t2線程來看以為沒有發(fā)生變化
解決ABA問題
新增版本號或時(shí)間戳
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100,1);
atomicStampedReference.compareAndSet(100,101,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
并發(fā)異常
java.util.ConcurrentModificationException 并發(fā)修改異常
使用線程安全的數(shù)據(jù)結(jié)構(gòu)
List
- new Vector();
- Collections.synchronizedList(new ArrayList());
- new CopyOnWriteArrayList();底層使用了ReentrantLock鎖诈嘿,在JUC包下
Set
- CopyOnWriteArraySet
Hashset底層是Hashmap
Map
- ConcurrentHashMap
鎖
- 公平鎖(按照先后)和非公平鎖(可插隊(duì))
- 可重入鎖(遞歸鎖)防止死鎖(synchronized和reentrantLock都是可重入鎖)
外層方法獲取了鎖,進(jìn)入內(nèi)層的方法會自動獲取鎖,是使用同一把鎖 - 自旋鎖
是指嘗試獲取鎖的線程不會立即阻塞削葱,而是采用循環(huán)的方式去嘗試獲取鎖奖亚,這樣的好處是減少線程上下切換的消耗,缺點(diǎn)是循環(huán)會消耗CPU
優(yōu)點(diǎn):不用阻塞
缺點(diǎn):耗費(fèi)性能
package com.dongge.volatiledemo;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class SpinLockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"\t come in ");
while(!atomicReference.compareAndSet(null,thread)){
}
}
public void myUnLock(){
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread,null);
System.out.println(Thread.currentThread().getName()+"\t invoked myUnLock");
}
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(()->{
spinLockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockDemo.myUnLock();
},"AA").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
spinLockDemo.myLock();
spinLockDemo.myUnLock();
},"BB").start();
}
}
- 獨(dú)占鎖(寫鎖)
ReentrantLock和Synchronized都是獨(dú)占鎖
寫操作:原子+獨(dú)占 整個(gè)過程必須是一個(gè)完整的統(tǒng)一體析砸,中間不許被分割昔字,被打斷 - 共享鎖(讀鎖)
是指該鎖可以被多個(gè)線程所持有
ReentrantReadWriteLock其讀鎖是共享鎖,其寫鎖是獨(dú)占鎖
package com.dongge.volatiledemo;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t正在寫入:" + key);
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t正在寫入完成:" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
public void get(String key) {
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t正在讀取:" + key);
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.get(key);
System.out.println(Thread.currentThread().getName() + "\t讀取完成:" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
}
public void clearMap(){
map.clear();
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.put(tempInt + "", tempInt + "");
}, String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.get(tempInt + "");
}, String.valueOf(i)).start();
}
}
}
CountDownLatch
規(guī)定的線程都完成后首繁,做減法 countDownLatch.countDown();
package com.dongge.volatiledemo;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t上完自習(xí)離開教室");
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t班長最后關(guān)門走人");
}
}
CyclicBarrier 做加法
都到齊了才開始
package com.dongge.volatiledemo;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召喚神龍");
});
for (int i = 1; i <= 7; i++) {
final int tempInt = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t收集到第"+tempInt+"龍珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
Semaphore信號量
主要用于兩個(gè)目的作郭,一個(gè)是用于多個(gè)共享資源的互斥使用,另一個(gè)用于并發(fā)線程數(shù)的控制弦疮。
package com.dongge.volatiledemo;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);//模擬3個(gè)停車位
for (int i = 1; i <= 6; i++) {//模擬6部汽車
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"\t搶到車位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"\t停車3秒后離開車位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
阻塞隊(duì)列BlockingQueue
當(dāng)阻塞隊(duì)列是空時(shí)夹攒,從隊(duì)列中獲取元素的操作將會被阻塞。
當(dāng)阻塞隊(duì)列是滿時(shí)胁塞,往隊(duì)列里添加元素的操作將會被阻塞咏尝。
多線程的判斷一定要用
while(boolean) 不能用if(boolean)
Synchronized與Lock的區(qū)別
synchronized是關(guān)鍵字屬于JVM層面压语,自動釋放
synchronized要么隨機(jī)喚醒一個(gè)線程,要么全部喚醒
monitorenter monitorexit
lock是API層面的鎖编检,需要主動釋放
lock可以幫多個(gè)Condition,可以精確喚醒線程
等待是否可中斷
synchronized不可中斷胎食,除非拋出異常或者正常運(yùn)行完成
ReetrantLock可中斷蒙谓,1.設(shè)置超時(shí)方法trylock(long timeout,TimeUnit unit)
2.lockInterruptibly()放代碼塊中斥季,調(diào)用interrupt()方法可中斷
生產(chǎn)者與消費(fèi)者 阻塞隊(duì)列版
package com.dongge.volatiledemo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class PrdConsumer_BlockQueueDemo {
public static void main(String[] args) throws Exception {
Resource myResource = new Resource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t生產(chǎn)線程啟動");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}, "prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t消費(fèi)線程啟動");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "consumer").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
myResource.stop();
}
}
class Resource {
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public Resource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws Exception {
String data = null;
boolean retValue;
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2l, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t插入隊(duì)列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t插入隊(duì)列" + data + "失敗");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "大老板叫停了训桶,表示FLAG=false,生產(chǎn)動作結(jié)束");
}
public void myConsumer() throws Exception {
String result = null;
while (FLAG) {
result = blockingQueue.poll(2l, TimeUnit.SECONDS);
if (null == result || result.equalsIgnoreCase("")) {
return;
}
System.out.println(Thread.currentThread().getName() + "\t消費(fèi)隊(duì)列" + result + "成功");
}
}
public void stop() throws Exception {
this.FLAG = false;
}
}
Callable
package com.dongge.volatiledemo;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 1024;
}
}
public class CallableDemo{
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
new Thread(futureTask,"AA").start();
while(true){
if(futureTask.isDone()){
System.out.println(futureTask.get());
break;
}
}
}
}
注意 :多個(gè)線程使用同一個(gè)FutureTask累驮,只能被一個(gè)線程執(zhí)行。
線程池
獲取當(dāng)前主機(jī)CPU核數(shù)
Runtime.getRuntime().availableProcessors()
使用線程池的優(yōu)點(diǎn):
- 提高響應(yīng)速度舵揭,當(dāng)任務(wù)到達(dá)時(shí)谤专,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
- 提高可管理性午绳,線程是稀缺資源置侍,如果無限的創(chuàng)建,不僅會消耗系統(tǒng)資源拦焚,還會降低系統(tǒng)穩(wěn)定性蜡坊。用完后可以還回池中再次使用。
ThreadPoolExecutor
固定線程數(shù)的線程池赎败,執(zhí)行一個(gè)長期的任務(wù)
Executors.newFixedThreadPool(int) 使用的是鏈表阻塞隊(duì)列
一池一個(gè)線程
Executors.newSingleThreadExecutor() 使用的是鏈表阻塞隊(duì)列
一池N個(gè)線程秕衙,執(zhí)行很短的任務(wù)
Executors.newCachedThreadPool() 使用的是同步隊(duì)列
case 1
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try{
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t辦理業(yè)務(wù)");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
線程池的7大參數(shù)
- corePoolSize:線程池中的常駐核心線程數(shù)
- maximumPoolSize:線程池能夠容納同時(shí)執(zhí)行的最大線程數(shù),此值必須大于等于1
- keepAliveTime: 多余空閑線程的存活時(shí)間
當(dāng)前線程池?cái)?shù)量超過corePoolSize時(shí)僵刮,當(dāng)空閑時(shí)間達(dá)到keepAliveTime值時(shí)据忘,多余空閑線程會被銷毀直到只剩下corePoolSize個(gè)線程為止。 - unit: keepAliveTime 的單位
- workQueue: 任務(wù)隊(duì)列搞糕,被提交但尚未被執(zhí)行的任務(wù)
- threadFactory: 表示生成線程池中工作線程的線程工廠勇吊,用于創(chuàng)建線程一般默認(rèn)即可
- handler: 拒絕策略,表示當(dāng)隊(duì)列滿了并且工作線程大于線程池的最大線程數(shù)(maximumPoolSize)時(shí)如何來拒絕
當(dāng)前工作線程大于corePoolSize窍仰,就會到workQueue(候客區(qū))汉规,并創(chuàng)建線程小于等于maximumPoolSize,當(dāng)workQueue也滿了就會根據(jù)拒絕策略處理驹吮。
線程池的拒絕策略
- AbortPolicy(默認(rèn))直接拋出RejectedExecutionException阻止系統(tǒng)正常運(yùn)行针史。
- CallerRunsPolicy:調(diào)用運(yùn)行,一種調(diào)節(jié)機(jī)制钥屈,該策略既不會拋棄任務(wù)悟民,也不會拋出異常,而是將某些任務(wù)回退到調(diào)用者(任務(wù)由調(diào)用者執(zhí)行)篷就。
- DiscardOldestPolicy:拋棄隊(duì)列中等待最久的任務(wù)射亏,然后把當(dāng)前任務(wù)加入隊(duì)列中嘗試再次提交當(dāng)前任務(wù)近忙。
- DiscardPolicy:直接丟棄任務(wù),不予任何處理也不拋出異常智润。如果允許任務(wù)丟失及舍,這是最好的一種方案。
注意:實(shí)際工作中不要Executors來創(chuàng)建線程池窟绷,需要手寫
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(2,5,
1l, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
}
}
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(2,5,
1l, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t辦理業(yè)務(wù)");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
合理的配置線程池線程數(shù)
- CUP密集型(計(jì)算密集型)
cup核數(shù)+1 - IO密集型
- cup核數(shù) * 2
- cup核數(shù) / 1 - 阻塞系數(shù) (0.8~0.9)
死鎖
線程各自持有對方的鎖锯玛,陷入相互等待。
package com.dongge.volatiledemo;
import java.util.concurrent.TimeUnit;
class HoldLockThread implements Runnable{
private String lockA;
private String lockB;
public HoldLockThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+"\t自己持有"+lockA+"\t嘗試獲得:"+lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName()+"\t自己持有"+lockB+"\t嘗試獲得:"+lockA);
}
}
}
}
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new HoldLockThread(lockA,lockB),"AAA").start();
new Thread(new HoldLockThread(lockB,lockA),"BBB").start();
}
}
找出死鎖
jps -l 找出出問題的進(jìn)程
jstack 9636(進(jìn)程編號)