多線程
進程 操作系統(tǒng)管理最小單元
線程 CPU調(diào)度最小單元
-
如何停止線程
使用violate boolean變量來標識線程是否停止
停止線程時髓帽,需要調(diào)用停止線程的interrupt()方法食侮,因為線程有可能在wait()或sleep(), 提高停止線程的即時性
對于blocking IO的處理剧罩,盡量使用InterruptibleChannel來代替blocking IO
sleep()方法由于中斷而拋出異常淮椰,此時,它會清除中斷標記,如果不加以處理,那么下一次循環(huán)開始時镊辕,就無法捕獲這個中斷,故在異常處理中蚁袭,再次設(shè)置中斷標記位
while (!Thread.currentThread().isInterrupted()) {
// ... do stuff ...
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
- 類鎖 對象鎖 顯示鎖
- 內(nèi)置鎖 synchronized 鎖對象class和obj 分別對應(yīng)類鎖和對象鎖
- 對象鎖的粒度要比類鎖的粒度要細征懈,引起線程競爭鎖的情況比類鎖要少的多,所以盡量別用類鎖揩悄,鎖的粒度越少越好卖哎。
- 顯示鎖 ReentrantLock
- 內(nèi)置鎖 synchronized 鎖對象class和obj 分別對應(yīng)類鎖和對象鎖
內(nèi)置鎖和讀寫鎖效率對比
//---------------內(nèi)置鎖------------------------
public synchronized String getStr(){
return "x";
}
public synchronized void setStr(String str){
//"x"
}
//----------------讀寫鎖----------------------------
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock getLock = lock.readLock();//讀鎖
private final Lock setLock = lock.readLock();//寫鎖
public String getStr(){
getLock.lock();
try{
return "x";
}finally{
getLock.unlock();
}
}
public void setStr(String str){
setLock.lock();
try{
//"x"
}finally{
setLock.unlock();
}
}
//讀寫鎖比互斥鎖允許對于共享數(shù)據(jù)更大程度的并發(fā)。每次只能有一個寫線程删性,但是同時可以有多個線程并發(fā)地讀數(shù)據(jù)亏娜。ReadWriteLock適用于讀多寫少的并發(fā)情況。
-
生產(chǎn)者消費者模式
第一版 使用synchronized 保證生產(chǎn)者 生產(chǎn)完后消費者才獲取到
導(dǎo)致問題: 生產(chǎn)者不斷生產(chǎn) 生產(chǎn)完后消費者才能消費 需要實現(xiàn): 生產(chǎn)者生產(chǎn)一個消費一個
public synchronized void put(String name){
id += 1;
//生產(chǎn)者生產(chǎn)一個完成
}
public synchronized void out(){
id -=1;
//消費者得到
}
第二版 增加標記
wait()蹬挺、notify()和notifyAll()方法為什么要在synchronized代碼塊中维贺?
在Object的wait()方法上面有這樣一行注釋:The current thread must own this object's monitor,意思是調(diào)用實例對象的wait()方法時巴帮,該線程必須擁有當前對象的monitor對象鎖溯泣,而要擁有monitor對象鎖就需要在synchronized修飾的方法或代碼塊中競爭并生成占用monitor對象鎖虐秋。而不使用synchronized修飾的方法或代碼塊就不會占有monitor對象鎖,所以在synchronized代碼塊之外調(diào)用會出現(xiàn)錯誤垃沦,錯誤提示為:
Exception in thread "main" java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at it.cast.basic.thread.SynchronizedTest.main(SynchronizedTest.java:27)
private boolean flag;
public synchronized void put(String name){
if(!flag){
id += 1;
//生產(chǎn)者生產(chǎn)一個完成
flag = true;
notifyAll();//喚醒wait()凍結(jié)的線程 如果沒有就是空喚醒
wait();//當前線程凍結(jié) 釋放CPU執(zhí)行權(quán)去執(zhí)行其它線程
}
}
public synchronized void out(){
if(flag){
id -=1;
//消費者得到
flag = false;
notifyAll();//喚醒wait()凍結(jié)的線程 如果沒有就是空喚醒
wait();//當前線程凍結(jié) 釋放CPU執(zhí)行權(quán)去執(zhí)行其它線程
}
}
管程法 生產(chǎn)者消費者和優(yōu)化
private ArrayList<Integer> array= new ArrayList<>();
private Object creatorLocker = new Object();
public void put(int num){
synchronized (creatorLocker) {//必須先獲得生產(chǎn)者鎖才能生產(chǎn)
synchronized (array) {
while(array.size()>=5){
try{
array.wait();
}
}
array.add(num);
array.notify();
}
}
}
private Object consumerLocker = new Object();
public void get(){
synchronized (consumerLocker) {
synchronized (array) {
while(array.size()<1){
try{
array.wait();
}
}
int a = array.get(0);
array.notify();
}
return a;
}
}
Lock 生產(chǎn)者和消費者
private int number =0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void put(){
lock.lock();
try{
while(number >= 5){
condition.await();
}
number++;
condition.signalAll();
}finally{
lock.unlock();
}
}
public void get(){
lock.lock();
try{
while(number <= 0){
condition.await();
}
number--;
condition.signalAll();
}finally{
lock.unlock();
}
}
- ThreadLocal
- CAS
鎖機制存在以下問題:
(1)在多線程競爭下客给,加鎖、釋放鎖會導(dǎo)致比較多的上下文切換和調(diào)度延時肢簿,引起性能問題靶剑。
(2)一個線程持有鎖會導(dǎo)致其它所有需要此鎖的線程掛起。
(3)如果一個優(yōu)先級高的線程等待一個優(yōu)先級低的線程釋放鎖會導(dǎo)致優(yōu)先級倒置池充,引起性能風(fēng)險抬虽。
volatile是不錯的機制,但是volatile不能保證原子性纵菌。因此對于同步最終還是要回到鎖機制上來。
獨占鎖是一種悲觀鎖休涤,synchronized就是一種獨占鎖咱圆,會導(dǎo)致其它所有需要鎖的線程掛起,等待持有鎖的線程釋放鎖功氨。而另一個更加有效的鎖就是樂觀鎖序苏。所謂樂觀鎖就是,每次不加鎖而是假設(shè)沒有沖突而去完成某項操作捷凄,如果因為沖突失敗就重試忱详,直到成功為止。樂觀鎖用到的機制就是CAS跺涤,Compare and Swap匈睁。
- 線程池
public ThreadPoolExecutor(int corePoolSize,//核心線程數(shù)
int maximumPoolSize,//最大線程數(shù) 核心線程和隊列線程滿了創(chuàng)建空閑線程
long keepAliveTime,//核心線程除外的空閑線程存活時間
TimeUnit unit,
BlockingQueue<Runnable> workQueue,//有界阻塞隊列 新任務(wù)放入等待調(diào)度
ThreadFactory threadFactory,//創(chuàng)建新線程的工廠類
RejectedExecutionHandler handler)//拒絕策略 達到最大線程數(shù)限制
工作隊列
- ArrayBlockingQueue 基于數(shù)組有界阻塞隊列 新任務(wù)加入隊尾 隊列滿則創(chuàng)建新線程到maximumPoolSizes數(shù)量
- LinkedBlockingQueue 最大容量為Interger.MAX 新任務(wù)一直存在該隊列 不會創(chuàng)建新線程到maximumPoolSizes數(shù)量 相當于maximumPoolSize參數(shù)不起作用
- SynchronousQueue 不緩存阻塞隊列 生產(chǎn)者放入一個任務(wù)必須等到消費者取出這個任務(wù) 沒有線程則創(chuàng)建線程到maximumPoolSizes數(shù)量
- PriorityBlockingQueue 優(yōu)先級隊列 優(yōu)先級通過Comparator實現(xiàn)
拒絕策略 工作隊列任務(wù)達到最大限制 超過maximumPoolSizes數(shù)量 則執(zhí)行拒絕策略
- CallerRunsPolicy 直接執(zhí)行被拒絕任務(wù)run方法 除非線程池已經(jīng)shutdown 則直接拋棄任務(wù)
- AbortPolicy 直接丟棄任務(wù) 并拋出RejectedExecutionExecption異常
- DiscardPolicy 直接丟棄任務(wù) 什么都不做
- DiscardOldestPolicy 拋棄進入最早的任務(wù) 嘗試將這次拒絕的任務(wù)放入隊列
線程池提交任務(wù)執(zhí)行順序
- 線程數(shù)<核心線程數(shù) 創(chuàng)建線程(線程為核心線程)
- 線程數(shù)>核心線程數(shù) 隊列未滿 將任務(wù)放入隊列中
- 線程數(shù)>核心線程數(shù) 隊列已滿
- 線程數(shù)<最大線程數(shù) 創(chuàng)建線程
- 線程數(shù)>最大線程數(shù) 啟用拒絕策略
- 一個線程執(zhí)行完 從隊列中取出一個任務(wù)執(zhí)行
- 線程執(zhí)行最后空閑線程根據(jù)空閑時間停掉 最終收縮到核心線程數(shù)
-
BlockingQueue阻塞隊列
- ArrayBlockingQueue
方式 拋出異常 有返回值不拋出異常 阻塞等待 超時等待 添加 add() offer() put() offer 移除 remove() poll() take() poll() 檢測隊首元素 element() peek() - - -
SynchronousQueue同步隊列
- 沒有容量 put進入一個元素 必須等待 take取出來 才能進行下一個put元素操
-
線程池的使用
降低資源的消耗
提高響應(yīng)速度
-
方便管理
線程復(fù)用 可以控制最大并發(fā)數(shù) 管理線程
Executors.newSingleThreadExecutor();//單線程 Executors.newFixedThreadPool(5);//固定線程池大小 Executors.newCachedThreadPool();//可伸縮 遇強則強 遇弱則弱 //使用完調(diào)用shutdown關(guān)閉
線程池不允許使用Executors去創(chuàng)建, 而是通過ThreadPoolExecutor的方式
-
FixedThreadPool和SingleThreadPool
允許的請求隊列長度為Integer.MAX_VALUE 可能會堆積大量的請求 從而導(dǎo)致OOM
-
CachedThreadPool和ScheduledTreadPool
允許的創(chuàng)建線程數(shù)量為Integer.MAX_VALUE, 可能會創(chuàng)建大量的線程, 從而導(dǎo)致OOM
- 自定義線程池 ThreadPoolExecutor
- 最大線程數(shù)如何定義
- CPU密集型 保持CPU效率最高
- Runtime.getRuntime().availableProcessors()
- IO密集型
- 設(shè)置數(shù)量為>程序中十分耗IO資源的線程數(shù)
- CPU密集型 保持CPU效率最高
- 最大線程數(shù)如何定義
-
函數(shù)式接口
- 一個有且僅有一個抽象方法,但是可以有多個非抽象方法的接口
//一個有且僅有一個抽象方法桶错,但是可以有多個非抽象方法的接口 @FunctionalInterface interface GreetingService{ void sayMessage(String message); }
-
Stream流式計算
/** * 現(xiàn)在有5個用戶 篩選 * 1 id必須偶數(shù) * 2 年齡>23 * 3 用戶名轉(zhuǎn)為大寫字母 * 4 用戶名字字母倒排 * 5 只輸出一個用戶 * @param args */ @RequiresApi(api = Build.VERSION_CODES.N) public static void main(String[] args) { User a = new User(1, "a", 21); User b = new User(2, "b", 22); User c = new User(3, "c", 23); User d = new User(4, "d", 24); User e = new User(5, "e", 25); User f = new User(6, "f", 26); List<User> users = Arrays.asList(a, b, c, d, e,f); //計算交給Stream users.stream() .filter(user -> user.id % 2 == 0) .filter(user -> user.age > 23) .map(user -> user.name.toUpperCase())//返回name // .sorted(Comparator.reverseOrder())//name調(diào)用 .sorted((name1, name2) -> name2.compareTo(name1))//name調(diào)用 .limit(1) .forEach(System.out::println); // .forEach(s -> System.out.println(s)); }
- ForkJoin
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
private Long temp = 10000L;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
@RequiresApi(api = Build.VERSION_CODES.N)
public static void main(String[] args) {
forkTest();
//stream并行流
stream();
}
@RequiresApi(api = Build.VERSION_CODES.N)
private static void stream() {
long sum3 = LongStream.rangeClosed(0L, 10000000L).parallel().reduce(0, Long::sum);
}
private static void forkTest() {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinDemo demo = new ForkJoinDemo(0L, 100000000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(demo);
try {
Long sum = submit.get();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//或者
forkJoinPool.execute(demo);
try {
Long sum2 = demo.get();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//計算方法
@Override
protected Long compute() {
if ((end-start)>temp){//超過分支
long mid = (start + end) / 2;//中間值
ForkJoinDemo demo1 = new ForkJoinDemo(start, mid);
demo1.fork();//拆分任務(wù) 將任務(wù)加入線程隊列
ForkJoinDemo demo2 = new ForkJoinDemo(mid + 1, end);
demo2.fork();
return demo1.join()+demo2.join();
}else {
long sum=0L;
for (Long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
}
}
- Future異步調(diào)用
public static void main(String[] args) {
//發(fā)起一個請求 無返回值 runAsync
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
System.out.println("runAsync");
});
try {
completableFuture.get();//獲取阻塞執(zhí)行結(jié)果
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
//線程順序執(zhí)行
Thread thread1 = new Thread(()-> System.out.println(Thread.currentThread().getName()),"thread-1");
Thread thread2 = new Thread(()-> System.out.println(Thread.currentThread().getName()),"thread-2");
Thread thread3 = new Thread(()-> System.out.println(Thread.currentThread().getName()),"thread-3");
CompletableFuture.runAsync(thread1::start).thenRun(thread2::start).thenRun(thread3::start);
//有返回值 supplyAsync
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync");
return 1;
});
//異步回調(diào)
System.out.println(CompletableFuture.whenComplete((t, u) -> {
System.out.println("whenComplete" + " " + t + " " + u);
}).exceptionally((e) -> {
return 400;
}).get());
}
- JMM java內(nèi)存模型
關(guān)于JMM的一些同步約定
1. 線程解鎖前 必須將共享變量立即刷回主存
2. 線程加鎖前 必須讀取主存中的最新值到工作內(nèi)存中
3. 加鎖和解鎖是同一把鎖
java內(nèi)存模型定義了8種操作來完成:
- lock(鎖定):作用于主內(nèi)存航唆,它把一個變量標記為一條線程獨占狀態(tài);
- unlock(解鎖):作用于主內(nèi)存院刁,它將一個處于鎖定狀態(tài)的變量釋放出來糯钙,釋放后的變量才能夠被其他線程鎖定;
- read(讀取):作用于主內(nèi)存退腥,它把變量值從主內(nèi)存?zhèn)魉偷骄€程的工作內(nèi)存中任岸,以便隨后的load動作使用;
- load(載入):作用于工作內(nèi)存狡刘,它把read操作的值放入工作內(nèi)存中的變量副本中;
- use(使用):作用于工作內(nèi)存享潜,它把工作內(nèi)存中的值傳遞給執(zhí)行引擎,每當虛擬機遇到一個需要使用這個變量的指令時候颓帝,將會執(zhí)行這個動作米碰;
- assign(賦值):作用于工作內(nèi)存窝革,它把從執(zhí)行引擎獲取的值賦值給工作內(nèi)存中的變量,每當虛擬機遇到一個給變量賦值的指令時候吕座,執(zhí)行該操作虐译;
- store(存儲):作用于工作內(nèi)存,它把工作內(nèi)存中的一個變量傳送給主內(nèi)存中吴趴,以備隨后的write操作使用漆诽;
- write(寫入):作用于主內(nèi)存,它把store傳送值放到主內(nèi)存中的變量中.
volatile的理解
volatile是輕量級同步機制 保證可見性 不保證原子性 禁止指令重排
拋出問題: 子線程不知道主存的變化
使用volatile關(guān)鍵字會強制將修改的值立即寫入主存锣枝,只要有一個線程將變量的值改了厢拭,馬上就會同步到內(nèi)存,其他的線程馬上就可以得到這個改過后的值
- CAS
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(1);
//參數(shù)1期望值 達到了就更新為 參數(shù)2更新值
atomicInteger.compareAndSet(1,2);
System.out.println(atomicInteger.get());
//自旋鎖
atomicInteger.getAndIncrement();
System.out.println(atomicInteger.get());
atomicInteger.compareAndSet(3,4);
System.out.println(atomicInteger.get());
//Unsafe類 通過這個類操作內(nèi)存
}
缺點:
1. 循環(huán)耗時
2. 一次性只能保證一個共享變量的原子性
3. ABA問題
帶版本號原子引用解決ABA問題
AtomicRefrence
AtomicStampedReference<Integer> reference = new AtomicStampedReference<>(初始值, 初始版本號);
-
各種鎖的理解
公平鎖 不插隊
-
非公平 可以插隊 (默認都是非公平)
public ReentrantLock(boolean fair){ sync = fair? new FairSync() : new NonfairSync(); }
可重入鎖
不可重入鎖
-
自旋鎖
AtomicReference<Thread> atomicReference = new AtomicReference<>(); //加鎖 public void myLock(){ Thread thread = Thread.currentThread(); //自旋鎖 while (!atomicReference.compareAndSet(null,thread)){ } } public void myUnlock(){ Thread thread = Thread.currentThread(); atomicReference.compareAndSet(thread,null); }
-
死鎖
jps查詢進程信息 jstack查詢堆棧信息排查死鎖原因
java.lang.management 接口 ThreadMXBean
線程管理類接口