讀寫(xiě)鎖 ReadWriteLock
-
概念
維護(hù)一對(duì)關(guān)聯(lián)鎖,一個(gè)只用于讀操作,一個(gè)只用于寫(xiě)操作;
讀鎖可以由多個(gè)讀線(xiàn)程同時(shí)持有本昏,寫(xiě)鎖是排他的。同一時(shí)間枪汪,兩把鎖不能被不同線(xiàn)程持有涌穆。
目的是為了將讀寫(xiě)分開(kāi),因?yàn)槿绻环珠_(kāi)的話(huà)雀久,那么多個(gè)讀鎖想要同時(shí)獲取的時(shí)候宿稀,還是需要等待,但是此時(shí)鎖住的內(nèi)容是沒(méi)有改變的赖捌,這樣就緹歐生了提升系統(tǒng)運(yùn)行的效率祝沸。
例子:
package lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLock_Demo {
//不希望讀寫(xiě)的內(nèi)容都不一樣,需要加入鎖機(jī)制
volatile long i = 0;
Lock lock = new ReentrantLock();
ReadWriteLock rwLock = new ReentrantReadWriteLock();
// 使用讀寫(xiě)鎖的方式
public void read() {
rwLock.readLock().lock();
long iValue = i;
rwLock.readLock().unlock();
}
public void write() {
rwLock.writeLock().lock();
i++;
rwLock.writeLock().unlock();
}
// 非讀寫(xiě)鎖的方式
// public void read() {
// lock.lock();
//
// long a = i;
//
// lock.unlock();
// }
//
// public void write() {
// lock.lock();
//
// i++;
//
// lock.unlock();
// }
}
-
適用場(chǎng)景
適合讀取操作多于寫(xiě)入操作的場(chǎng)景,改進(jìn)互斥鎖的性能罩锐,
比如:集合的并發(fā)線(xiàn)程安全性改造奉狈、緩存組件。
HashMap使用讀寫(xiě)鎖實(shí)現(xiàn)緩存唯欣,【讀多寫(xiě)少】的例子
package lock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @Author: Neco
* @Description: HashMap使用讀寫(xiě)鎖實(shí)現(xiàn)緩存嘹吨,讀多寫(xiě)少的例子
* @Date: create in 2022/6/20 17:59
*/
public class HashMap_Demo {
// 存放值的map
private final Map<String, Object> map = new HashMap<>();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 讀寫(xiě)鎖
private final Lock readLock = readWriteLock.readLock(); // 讀鎖
private final Lock writeLock = readWriteLock.writeLock(); // 寫(xiě)鎖
// 獲取數(shù)據(jù)
public Object get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
// 放入緩存
public void put(String key, Object object) {
writeLock.lock();
try {
map.put(key, object);
} finally {
writeLock.unlock();
}
}
// 獲取所有的key
public Object[] keys() {
readLock.lock();
try {
return map.keySet().toArray();
} finally {
readLock.unlock();
}
}
// 清空緩存
public void clear() {
writeLock.lock();
try {
map.clear();
} finally {
writeLock.unlock();
}
}
// 其他的一些方法略……
}
-
鎖降級(jí)
指的是寫(xiě)鎖降級(jí)成為讀鎖。持有寫(xiě)鎖的同時(shí)境氢,再獲取讀鎖蟀拷,隨后釋放寫(xiě)鎖的過(guò)程。寫(xiě)鎖是線(xiàn)程獨(dú)占萍聊,讀鎖是共享问芬,所以寫(xiě)->讀是降級(jí)。(讀->寫(xiě)寿桨,是不能實(shí)現(xiàn)的)
讀寫(xiě)鎖的實(shí)現(xiàn)代碼 - 理論上實(shí)現(xiàn)的一個(gè)例子
- 鎖的代碼
package lock.readwritelock1;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
/**
* @Author: Neco
* @Description:
* @Date: create in 2022/6/20 17:55
*/
public class NecoReadWriteLock {
// 一個(gè)read的count
AtomicInteger readCount = new AtomicInteger(0);
// 一個(gè)write的count
AtomicInteger writeCount = new AtomicInteger(0);
// 獨(dú)占鎖 擁有者
AtomicReference<Thread> owner = new AtomicReference<>();
// 等待隊(duì)列
public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<WaitNode>();
// 獲取獨(dú)占鎖
public void lock() {
int arg = 1;
// 如果嘗試此衅,如果成功則方法正常退出,否則進(jìn)行其他的處理
if (!tryLock(arg)) {
// 創(chuàng)建鎖節(jié)點(diǎn)亭螟,并且標(biāo)記為獨(dú)占鎖
WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg);
// 置入等待隊(duì)列
waiters.offer(waitNode);
// 循環(huán)嘗試獲取鎖
while (true) {
// 獲取頭部節(jié)點(diǎn)線(xiàn)程
WaitNode head = waiters.peek();
// 如果頭節(jié)點(diǎn)不為空且為當(dāng)前線(xiàn)程
if (head != null && head.thread == Thread.currentThread()) {
// 再次嘗試獲取 獨(dú)占鎖
if (!tryLock(arg)) {
// 若獲取失敗挡鞍,掛起線(xiàn)程
LockSupport.park();
//若成功獲取
} else {
// 將當(dāng)前線(xiàn)程從隊(duì)列頭部移除,并退出方法
waiters.poll();
return;
}
}
}
}
}
// 釋放獨(dú)占鎖
public void unlock() {
int arg = 1;
//嘗試釋放獨(dú)占鎖 若失敗返回true预烙,若失敗...
if (tryUnlock(arg)) {
// 取出隊(duì)列頭部的元素
WaitNode head = waiters.peek();
if (head != null) {
Thread th = head.thread;
// 喚醒隊(duì)列頭部的線(xiàn)程
LockSupport.unpark(th);
}
}
}
//嘗試獲取獨(dú)占鎖
public boolean tryLock(int acquires) {
// 如果 被讀鎖占用 readCount > 0墨微,則返回false
if (readCount.get() != 0) return false;
// 獲取 獨(dú)占鎖 狀態(tài) 0 未被占用
int wct = writeCount.get(); //
// wct == 0,表示未被占用
if (wct == 0) {
// 設(shè)置值扁掸,搶鎖
if (writeCount.compareAndSet(wct, wct + acquires)) {
// 搶鎖成功翘县,設(shè)置owner
owner.set(Thread.currentThread());
return true;
}
// 如果已經(jīng)被占用,且占用的線(xiàn)程為當(dāng)前線(xiàn)程谴分,則進(jìn)行重入操作
} else if (owner.get() == Thread.currentThread()) {
writeCount.compareAndSet(wct, wct + acquires);
return true;
}
// 否則搶鎖失敗
return false;
}
//嘗試釋放獨(dú)占鎖
public boolean tryUnlock(int releases) {
// 如果當(dāng)前線(xiàn)程沒(méi)有持有獨(dú)占鎖锈麸,則拋出異常
if (owner.get() != Thread.currentThread()) throw new IllegalMonitorStateException();
// 獲取 獨(dú)占鎖 狀態(tài) 0 未被占用
int wct = writeCount.get();
// 計(jì)算 獨(dú)占鎖剩余占用,即釋放了鎖之后新的 writeCount 應(yīng)該是多少
int nwct = wct - releases;
// 不管是否完全釋放牺蹄,都更新count值
writeCount.set(nwct);
// 如果完全釋放忘伞,則將owner設(shè)置為null
if (nwct == 0) {
owner.compareAndSet(Thread.currentThread(), null);
return true;
} else {
return false;
}
}
/************************* 共享鎖 *****************************/
// 獲取共享鎖
public void lockShared() {
int arg = 1;
if (!tryLockShared(arg)) { //如果tryAcquireShare失敗
// 創(chuàng)建鎖節(jié)點(diǎn),并且標(biāo)記為共享鎖
WaitNode node = new WaitNode(Thread.currentThread(), 1, arg);
// 將當(dāng)前進(jìn)程放入隊(duì)列
waiters.offer(node); //加入隊(duì)列
while (true) {
//若隊(duì)列頭部的元素是當(dāng)前線(xiàn)程
WaitNode head = waiters.peek();
if (head != null && head.thread == Thread.currentThread()) {
//嘗試獲取共享鎖沙兰,若成功
if (tryLockShared(arg)) {
//將當(dāng)前線(xiàn)程從隊(duì)列中移除
waiters.poll();
WaitNode next = waiters.peek();
if (next != null && next.type == 1) { // 如果下一個(gè)線(xiàn)程也是等待共享鎖
LockSupport.unpark(next.thread); // 將其喚醒
}
return; // 退出方法
} else { // 若嘗試失敗
LockSupport.park(); // 掛起線(xiàn)程
}
} else { // 若不是頭部元素
LockSupport.park();
}
}
}
}
// 釋放共享鎖
public boolean unLockShared() {
int arg = 1;
if (tryUnLockShared(arg)) { //當(dāng)read count變?yōu)?虑省,才叫release share成功
WaitNode next = waiters.peek();
if (next != null) {
LockSupport.unpark(next.thread);
}
return true;
}
return false;
}
// 嘗試獲取共享鎖
public boolean tryLockShared(int acquires) {
while (true) {
// 如果已經(jīng)被獨(dú)占鎖占用,且當(dāng)前獨(dú)占鎖所屬的線(xiàn)程不是當(dāng)前線(xiàn)程僧凰,加鎖失敗
if (writeCount.get() != 0 && owner.get() != Thread.currentThread()) {
return false;
}
// 獲取共享鎖狀態(tài)
int rct = readCount.get();
// 加鎖探颈,修改狀態(tài),并且返回值
if (readCount.compareAndSet(rct, rct + acquires)) {
return true;
}
}
}
//嘗試解鎖共享鎖
public boolean tryUnLockShared(int releases) {
while (true) {
int rct = readCount.get();
int nrct = rct - releases;
if (readCount.compareAndSet(rct, nrct)) {
return nrct == 0;
}
}
}
class WaitNode {
int type = 0; //0 為想獲取獨(dú)占鎖的線(xiàn)程训措, 1為想獲取共享鎖的線(xiàn)程
Thread thread = null;
int arg = 0;
public WaitNode(Thread thread, int type, int arg) {
this.thread = thread;
this.type = type;
this.arg = arg;
}
}
}
- 測(cè)試代碼
package lock.readwritelock1;
/**
* @Author: Neco
* @Description:
* @Date: create in 2022/6/20 22:02
*/
public class ReadWriteLockDemo {
static NecoReadWriteLock rwLock = new NecoReadWriteLock();
static volatile int i = 0;
static void add() {
i++;
}
public static void main(String args[]) throws InterruptedException {
long startTime = System.currentTimeMillis();
for (int a = 1; a <= 20000; a++) {
final int n = a;
new Thread(new Runnable() {
@Override
public void run() {
if (n % 5 == 0) {
rwLock.lock();
add();
rwLock.unlock();
} else {
rwLock.lockShared();
System.out.println("i=" + i);
int a = i;
rwLock.unLockShared();
}
}
}).start();
}
while (true) {
System.out.println("目前耗時(shí):" + (System.currentTimeMillis() - startTime) / 1000 + "s");
Thread.sleep(1000L);
System.out.println("i=" + i);
}
}
}
- 運(yùn)行結(jié)果伪节,截選
i=3996
i=3996
i=3996
i=3997
i=3997
i=3997
i=3997
i=3998
i=3998
i=3998
i=3998
i=3999
i=3999
目前耗時(shí):1s
i=3999
i=3999
i=4000
目前耗時(shí):2s
i=4000
目前耗時(shí):3s
當(dāng)然以上的代碼還有很多可以修改的內(nèi)容光羞,比如抽取公共的方法,又比如實(shí)現(xiàn)公平鎖和非公平鎖的相關(guān)內(nèi)容怀大,具體的實(shí)現(xiàn)可以根據(jù)需求來(lái)繼續(xù)擴(kuò)充纱兑,以上的代碼只是作為一個(gè)參考。
- 抽取重復(fù)代碼塊
package lock.readwritelock2;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
public class CommonMask { //重復(fù)的代碼提取到這里
volatile AtomicInteger readCount = new AtomicInteger(0);
AtomicInteger writeCount = new AtomicInteger(0);
//獨(dú)占鎖 擁有者
AtomicReference<Thread> owner = new AtomicReference<>();
//等待隊(duì)列
public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<WaitNode>();
class WaitNode {
int type = 0; //0 為想獲取獨(dú)占鎖的線(xiàn)程化借, 1為想獲取共享鎖的線(xiàn)程
Thread thread = null;
int arg = 0;
public WaitNode(Thread thread, int type, int arg) {
this.thread = thread;
this.type = type;
this.arg = arg;
}
}
//獲取獨(dú)占鎖
public void lock() {
int arg = 1;
//嘗試獲取獨(dú)占鎖潜慎,若成功,退出方法蓖康, 若失敗...
if (!tryLock(arg)) {
//標(biāo)記為獨(dú)占鎖
WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg);
waiters.offer(waitNode); //進(jìn)入等待隊(duì)列
//循環(huán)嘗試拿鎖
for (; ; ) {
//若隊(duì)列頭部是當(dāng)前線(xiàn)程
WaitNode head = waiters.peek();
if (head != null && head.thread == Thread.currentThread()) {
if (!tryLock(arg)) { //再次嘗試獲取 獨(dú)占鎖
LockSupport.park(); //若失敗铐炫,掛起線(xiàn)程
} else { //若成功獲取
waiters.poll(); // 將當(dāng)前線(xiàn)程從隊(duì)列頭部移除
return; //并退出方法
}
} else { //若不是隊(duì)列頭部元素
LockSupport.park(); //將當(dāng)前線(xiàn)程掛起
}
}
}
}
//釋放獨(dú)占鎖
public boolean unlock() {
int arg = 1;
//嘗試釋放獨(dú)占鎖 若失敗返回true,若失敗...
if (tryUnlock(arg)) {
WaitNode next = waiters.peek(); //取出隊(duì)列頭部的元素
if (next != null) {
Thread th = next.thread;
LockSupport.unpark(th); //喚醒隊(duì)列頭部的線(xiàn)程
}
return true; //返回true
}
return false;
}
//嘗試獲取獨(dú)占鎖
public boolean tryLock(int acquires) {
//如果read count 蒜焊!=0 返回false
if (readCount.get() != 0)
return false;
int wct = writeCount.get(); //拿到 獨(dú)占鎖 當(dāng)前狀態(tài)
if (wct == 0) {
if (writeCount.compareAndSet(wct, wct + acquires)) { //通過(guò)修改state來(lái)?yè)屾i
owner.set(Thread.currentThread()); // 搶到鎖后倒信,直接修改owner為當(dāng)前線(xiàn)程
return true;
}
} else if (owner.get() == Thread.currentThread()) {
writeCount.set(wct + acquires); //修改count值
return true;
}
return false;
}
//嘗試釋放獨(dú)占鎖
public boolean tryUnlock(int releases) {
//若當(dāng)前線(xiàn)程沒(méi)有 持有獨(dú)占鎖
if (owner.get() != Thread.currentThread()) {
throw new IllegalMonitorStateException(); //拋IllegalMonitorStateException
}
int wc = writeCount.get();
int nextc = wc - releases; //計(jì)算 獨(dú)占鎖剩余占用
writeCount.set(nextc); //不管是否完全釋放,都更新count值
if (nextc == 0) { //是否完全釋放
owner.compareAndSet(Thread.currentThread(), null);
return true;
} else {
return false;
}
}
//獲取共享鎖
public void lockShared() {
int arg = 1;
if (tryLockShared(arg) < 0) { //如果tryAcquireShare失敗
//將當(dāng)前進(jìn)程放入隊(duì)列
WaitNode node = new WaitNode(Thread.currentThread(), 1, arg);
waiters.offer(node); //加入隊(duì)列
for (; ; ) {
//若隊(duì)列頭部的元素是當(dāng)前線(xiàn)程
WaitNode head = waiters.peek();
if (head != null && head.thread == Thread.currentThread()) {
if (tryLockShared(arg) >= 0) { //嘗試獲取共享鎖泳梆, 若成功
waiters.poll(); //將當(dāng)前線(xiàn)程從隊(duì)列中移除
WaitNode next = waiters.peek();
if (next != null && next.type == 1) { //如果下一個(gè)線(xiàn)程也是等待共享鎖
LockSupport.unpark(next.thread); //將其喚醒
}
return; //退出方法
} else { //若嘗試失敗
LockSupport.park(); //掛起線(xiàn)程
}
} else { //若不是頭部元素
LockSupport.park();
}
}
}
}
//解鎖共享鎖
public boolean unLockShared() {
int arg = 1;
if (tryUnLockShared(arg)) { //當(dāng)read count變?yōu)?鳖悠,才叫release share成功
WaitNode next = waiters.peek();
if (next != null) {
LockSupport.unpark(next.thread);
}
return true;
}
return false;
}
//嘗試獲取共享鎖
public int tryLockShared(int acquires) {
for (; ; ) {
if (writeCount.get() != 0 &&
owner.get() != Thread.currentThread())
return -1;
int rct = readCount.get();
if (readCount.compareAndSet(rct, rct + acquires)) {
return 1;
}
}
}
//嘗試解鎖共享鎖
public boolean tryUnLockShared(int releases) {
for (; ; ) {
int rc = readCount.get();
int nextc = rc - releases;
if (readCount.compareAndSet(rc, nextc)) {
return nextc == 0;
}
}
}
}
AQS (AbstractQueuedSynchronizer)
實(shí)際上,AQS實(shí)現(xiàn)對(duì)我們同步狀態(tài)的管理优妙,以及阻塞線(xiàn)程進(jìn)行排隊(duì)等待通知的一系列底層實(shí)現(xiàn)的處理乘综,AQS核心其實(shí)就包含了以下幾個(gè)內(nèi)容:
- 同步隊(duì)列
- 鎖的釋放和獲取,包含了獨(dú)占鎖和共享鎖一系列的實(shí)現(xiàn)
- 本質(zhì)來(lái)說(shuō)套硼,跟上方的重復(fù)代碼塊的內(nèi)容(模板方法)類(lèi)似
- AQS中是使用鏈表來(lái)實(shí)現(xiàn)等待隊(duì)列的
AQS為一系列同步器依賴(lài)于一個(gè)單獨(dú)的原子變量(state)的同步器提供了一個(gè)非常有用的基礎(chǔ)卡辰。子類(lèi)們必須定義改變state變量的protected方法,這些方法定義了state是如何被獲取或釋放的熟菲。鑒于此,本類(lèi)中的其他方法執(zhí)行所有的排隊(duì)和阻塞機(jī)制朴恳。子類(lèi)也可以維護(hù)其他的state變量抄罕,但是為了保證同步,必須原子地操作這些變量于颖。
AQS 抽象隊(duì)列同步器
- 提供了對(duì)資源占用呆贿、釋放,線(xiàn)程的掛起、喚醒的邏輯森渐。
- 預(yù)留了各種 try方法給用戶(hù)實(shí)現(xiàn)
- 可以用在各種需要控制資源爭(zhēng)用的場(chǎng)景中做入。(ReentrantLock/CountDownLatch/Semphore)
此外 ReadWriteLock 使用了一個(gè)int 存儲(chǔ)了兩個(gè)count的值,以解決readCount和writeCount的在讀寫(xiě)線(xiàn)程同時(shí)操作時(shí)可能沖突的問(wèn)題
如果覺(jué)得有收獲就點(diǎn)個(gè)贊吧同衣,更多知識(shí)竟块,請(qǐng)點(diǎn)擊關(guān)注查看我的主頁(yè)信息哦~