在傳統(tǒng)單機(jī)項目中為了解決并發(fā)訪問的問題鳖链,我們通常都會使用鎖來實現(xiàn)诚纸。一般使用synchronized關(guān)鍵字來鎖住臨界區(qū)鞠评,當(dāng)然也可以使用Lock鎖驰凛。但是現(xiàn)在很多項目都使用分布式集群方式部署非驮,synchronized關(guān)鍵字無法實現(xiàn)跨機(jī)器之間的鎖交汤,Lock也是如此,為了解決這種問題劫笙,需要設(shè)計一種新的分布式鎖芙扎。
分布式鎖除了具有分布式情況下控制并發(fā)的問題,同時還需要考慮以下問題:
1.較高的性能填大,如果實現(xiàn)分布式鎖的方式太差戒洼,會影響并發(fā)訪問。
2.可重入性栋盹,同一個線程可多次獲取鎖施逾。
3.鎖失效機(jī)制,如果一個線程在獲取鎖后掛掉了例获,需要釋放鎖汉额,否則其他線程將一直獲取不到鎖。
4.如果用戶在失效時間內(nèi)沒有完成任務(wù)榨汤,鎖如何續(xù)期蠕搜,該續(xù)期多長時間。
基于以上問題收壕,共設(shè)計了三種分布式鎖
①基于數(shù)據(jù)庫實現(xiàn)分布式鎖
基于數(shù)據(jù)庫實現(xiàn)分布式鎖主要是利用數(shù)據(jù)庫唯一索引的排斥性和天然支持原子操作有關(guān)妓灌,可以設(shè)計如下的表結(jié)構(gòu):
//基于數(shù)據(jù)庫創(chuàng)建分布式鎖
DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
`id` bigint(0) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`method_name` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '臨界區(qū)方法',
`machine_id` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '機(jī)器id',
`thread_id` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '線程id',
`invalid_time` datetime(0) NOT NULL COMMENT '失效時間',
`desc` varchar(255) CHARACTER SET utf8mb4 NULL DEFAULT NULL COMMENT '描述',
`update_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '更新時間',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE INDEX `idx_method_name`(`method_name`) USING BTREE COMMENT '方法名主鍵'
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '分布式鎖表' ROW_FORMAT = Dynamic;
加鎖:insert into method_lock(method_name, machine_id, thread_id, invalid_time)values("addUser", "001", "thread-01", "2022-01-01 01:01:01")
釋放鎖:delete from method_lock where method_name = "addUser" and machine_id = "001" and thread_id = "thread-01"
說明:客戶端1加鎖成功后,客戶端再執(zhí)行加鎖語句就會因為method_name創(chuàng)建了唯一索引而失敗蜜宪,這樣就達(dá)到不同機(jī)器并發(fā)訪問的問題虫埂。
以上是使用mysql自身的鎖來完成,mysql的鎖是一種排他鎖圃验,其實也可以利用樂觀鎖來完成掉伏。即通過version來完成鎖控制。
但是還需要考慮上面分布式鎖的四個問題?
1.由于數(shù)據(jù)庫的查詢性能問題斧散,需要使用集群部署或者主從復(fù)制
2.為了完成可重入性供常,我們在表結(jié)構(gòu)中添加了machine_id和thread_id來實現(xiàn)。如果客戶端1再次執(zhí)行加鎖過程鸡捐,需要先根據(jù)機(jī)器和線程信息查詢栈暇,如果查詢到則直接返回。如果沒有則說明沒有加鎖成功箍镜,則嘗試加鎖或者失敗源祈。為了查詢是否已經(jīng)加鎖,需要使用select * from table for update
語句鹿寨,此語句可以避免臟讀新博。
3.為了客戶端1線程宕掉后薪夕,不影響其他客戶端操作脚草,需要存儲線程的失效時間,并利用定時任務(wù)清理失效鎖
4.如果線程在規(guī)定時間內(nèi)沒有完成任務(wù)原献,需要利用定時任務(wù)在查詢失效鎖的同時查詢線程狀態(tài)馏慨,如果線程存活就對鎖進(jìn)行續(xù)期。note:這種方式需要考慮的問題很多姑隅,同時性能也不太理想写隶,所以使用這種方式的很少。
②基于redis實現(xiàn)分布式鎖
基于redis實現(xiàn)分布式鎖主要是利用redis的性能和某些特性讲仰。
加鎖:利用redis的SETNX命令和EXPIRE命令完成setnx key val
是如果key不存在則val設(shè)置成功慕趴,返回1;否則val設(shè)置失敗鄙陡,返回0冕房。以下語句可以完成。
key是加鎖的方法趁矾,value是線程的唯一標(biāo)識 set nx key value
存在的問題:
①假設(shè)線程獲取了鎖之后耙册,在執(zhí)行任務(wù)的過程中掛掉,但是線程還沒有執(zhí)行del命令釋放鎖毫捣,那么競爭該鎖的線程都會執(zhí)行不了详拙,產(chǎn)生死鎖的情況。
②釋放鎖的時候為了保證只釋放自己的鎖蔓同,需要先判斷加鎖的value饶辙,如果不等于自己才能刪除。但是這兩部操作并不是原子性的斑粱,所以釋放鎖需要使用lua腳本弃揽。
解決方案:如下代碼所示:
//加鎖過程 public Boolean getLock(String key,String value,Long timeout){ boolean result = this.redisTemplate.opsForValue().setIfAbsent(key,value, Duration.ofSeconds(timeout)); return result; } //釋放鎖過程 public Long releaseLock(String key,String value){ String script= "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; RedisScript<Long> redisScript = new DefaultRedisScript<>(script,Long.class); Long result= (Long)this.redisTemplate.execute(redisScript, Collections.singletonList(key),value); return result; } //調(diào)用過程 //添加自旋邏輯 boolean result = false; do{ result = redisService.getLock(key, value, timeout); }while(!result) try{ ...... //業(yè)務(wù)代碼 }catch(Exception e){ }finally{ //最后釋放鎖的邏輯 redisService.releaseLock(key, value); }
存在的問題:這個方案雖然解決了過期和釋放鎖的問題,但是沒有實現(xiàn)可重入性蹋宦,當(dāng)已經(jīng)加鎖的線程再次加鎖時就會失敗披粟。
解決方案:需要在加鎖失敗后判斷加鎖線程是否是自己,如果是自己則+1返回成功冷冗,否則返回失敗守屉。這樣加鎖和釋放鎖都需要執(zhí)行l(wèi)ua腳本完成多步操作。
local key = KEYS[1]; -- 第1個參數(shù),鎖的key local value = ARGV[1]; -- 第2個參數(shù),鎖的值蒿辙,可以是線程的唯一標(biāo)識 local timeout = ARGV[2]; -- 第3個參數(shù),鎖的自動釋放時間 -- 判斷鎖是否已存在 if (redis.call('exists', key) == 0) then -- 不存在, 則獲取鎖 redis.call('hset', key, value, '1'); redis.call('expire', key, timeout); return 1; end ; -- 鎖已存在拇泛,判斷擁有者是否是自己 if (redis.call('hexists', key, value) == 1) then -- 如果是自己,則重入次數(shù)+1 redis.call('hincrby', key, value, '1'); redis.call('expire', key, timeout); return 1; end ; return 0; -- 鎖已經(jīng)被獲取思灌,并且不是自己俺叭,返回獲取鎖失敗
local key = KEYS[1]; -- 第1個參數(shù),鎖的key local value = ARGV[1]; -- 第2個參數(shù),鎖的值,可以是線程的唯一標(biāo)識 -- 判斷鎖是否已存在 if (redis.call('exists', key) == 0) then return 0; end ; -- 鎖已存在泰偿,判斷擁有者是否是自己 if (redis.call('hexists', key, value) == 0) then return 0; end ; local count = redis.call('hincrby', key, value, '-1'); -- 當(dāng)可重入次數(shù)為0時熄守,需要刪除key if (tonumber(count) == 0) then redis.call('del', key); return 1; end ; return 1;
然而上面的代碼還是有問題,就是續(xù)期的問題耗跛。因為如果線程A在獲取到鎖之后裕照,執(zhí)行業(yè)務(wù)時間較長,鎖過期了调塌。那么其他線程就可以獲得鎖了晋南,所以還需要寫一個定時任務(wù)定時查詢鎖是否快要過期,如果快過期了就續(xù)期一段時間羔砾。
Redisson實現(xiàn)
針對上面這些問題负间,redisson提供了實現(xiàn),它將加鎖和解鎖的lua腳本姜凄,以及續(xù)期功能功能(看門狗)都統(tǒng)一到了一個簡單地邏輯中@Bean public RedissonClient getRedisson() { Config config = new Config(); // 修改看門狗默認(rèn)的鎖過期時間 config.setLockWatchdogTimeout(100); config.useSingleServer().setAddress("redis://localhost:6379").setPassword("123456"); return Redisson.create(config); }
RLock lock = redissonClient.getLock(key); //key和上面的redis鍵意思相同 try { lock.lock(); ..... //業(yè)務(wù)代碼 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); }
上面代碼就完成了分布式鎖的功能政溃,而且加鎖和解鎖都是通過lua腳本來完成的。簡單方便檀葛,lock.lock()默認(rèn)加鎖時間為30s玩祟, 后臺默認(rèn)會啟動一個WatchDog的看門狗線程,該線程每10s去掃描一下該鎖是否被釋放屿聋,如果沒有釋放那么就延長至30s空扎,這個機(jī)制就是看門狗機(jī)制。如果客戶端請求沒有獲取到鎖润讥,那么它將while循環(huán)獲取繼續(xù)嘗試加鎖转锈。網(wǎng)上關(guān)于redisson的描述圖:
//該方式下會使用默認(rèn)的加鎖時間,即30s楚殿。但是如果自定義程序中自定義加鎖時間撮慨,則看門狗線程不會進(jìn)行鎖續(xù)期。 void lock(); void lock(long leaseTime, TimeUnit unit); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
單機(jī)模式下,redisson還有一個問題:就是線程A如果剛獲取到redis的鎖砌溺,此時redis宕機(jī)了影涉,重新啟動后鎖過期了,然后線程B這時過來是可以加鎖的规伐,然后A線程也認(rèn)為自己還在加鎖蟹倾,這種問題怎么解決?
redis宕機(jī)的時候猖闪,別那么快去重啟redis鲜棠,可以在你的方法里面設(shè)置事務(wù)的超時時間,等到事務(wù)超時再去重啟redis應(yīng)用培慌。
在redis集群環(huán)境下豁陆,redission實際還存在一些問題,就是線程A如果剛獲取到master節(jié)點的鎖吵护,還沒有同步到slave節(jié)點盒音,此時master節(jié)點給掛了,然后線程B這時過來是可以加鎖的何址,但是實際上master已經(jīng)被A加鎖過了里逆,這就是集群環(huán)境下可能出現(xiàn)的問題进胯?
Redlock 是 Redis 的作者 antirez 給出的集群模式的 Redis 分布式鎖用爪,它基于 N 個完全獨立的 Redis 節(jié)點。
算法描述:
①客戶端嘗試獲取 N 個節(jié)點的鎖胁镐,(每個節(jié)點獲取鎖的方式和前面說的緩存鎖一樣)偎血,N 個節(jié)點以相同的 key 和 value 獲取鎖。而且客戶端需要設(shè)置接口訪問超時盯漂,接口超時時間需要遠(yuǎn)遠(yuǎn)小于鎖超時時間颇玷,這樣可以在有 redis 節(jié)點宕機(jī)后,訪問該節(jié)點時能盡快超時就缆,而減小鎖的正常使用帖渠。
②客戶端需要獲得了超過 半數(shù) 節(jié)點的鎖,而且獲取鎖的時間小于鎖的超時時間竭宰,客戶端才獲得了分布式鎖空郊。
③客戶端獲取的鎖的時間為設(shè)置的鎖超時時間減去獲取鎖花費時間。
④如果客戶端獲取鎖失敗了切揭,會依次請求所有節(jié)點刪除所有的鎖狞甚。
note:使用 Redlock 算法,可以保證在掛掉最多 不超過半數(shù)節(jié)點的時候廓旬,分布式鎖服務(wù)仍然能工作哼审,這相比之前的單機(jī)模式大大提高了可用性。有關(guān)redisson分布式鎖問題的問題可以查看基于Redission實現(xiàn)分布式鎖
③基于zookeeper實現(xiàn)分布式鎖
zookeeper是分布式系統(tǒng)協(xié)調(diào)服務(wù),可以解決分布式系統(tǒng)下數(shù)據(jù)同步問題涩盾。我們可以在上面創(chuàng)建樹形結(jié)構(gòu)的節(jié)點十气,相當(dāng)于一個分布式文件系統(tǒng)+watch服務(wù)。而且同一目錄下只能有唯一的文件名春霍。
zookeeper的節(jié)點分為兩類:
1)持久節(jié)點 :節(jié)點一旦創(chuàng)建桦踊,除非被主動刪除,否則一直存在终畅;
2)臨時節(jié)點 :一旦創(chuàng)建該節(jié)點的客戶端會話失效籍胯,則所有該客戶端創(chuàng)建的臨時節(jié)點都會被刪除。
原理: 就是利用zookeeper 的臨時有序節(jié)點和 watcher 機(jī)制來實現(xiàn)离福≌壤牵客戶端創(chuàng)建zookeeper的臨時有序節(jié)點,如果該節(jié)點序號最小則返回成功妖爷,如果不是則監(jiān)聽前一個節(jié)點的蝶涩,一旦客戶端會話結(jié)束,臨時節(jié)點就會釋放絮识,后一個節(jié)點就會監(jiān)聽到該事件并重新獲取鎖绿聘。
面對上面提到的死鎖,可重入次舌,續(xù)期以及非阻塞問題熄攘,zookeeper該如何解決?
死鎖:客戶端會在ZK中創(chuàng)建一個臨時節(jié)點彼念,一旦客戶端獲取到鎖之后突然掛掉(Session連接斷開)挪圾,那么這個臨時節(jié)點就會自動刪除掉。其他客戶端就可以再次獲得鎖逐沙。
不可重入:客戶端在創(chuàng)建節(jié)點的時候哲思,可以把當(dāng)前客戶端的主機(jī)信息和線程信息直接寫入到節(jié)點中,下次想要獲取鎖的時候和當(dāng)前最小的節(jié)點中的數(shù)據(jù)比對一下就可以了吩案。如果相同棚赔,那么自己直接獲取到鎖,如果不一樣就再創(chuàng)建一個臨時的順序節(jié)點徘郭,參與排隊靠益。
續(xù)期問題:zookeeper的臨時節(jié)點不存在過期時間,所以只有在客戶端主動刪除節(jié)點或者斷開連接的情況下才會失效崎岂,所以不存在續(xù)期問題捆毫。
非阻塞鎖:客戶端可以通過在節(jié)點上添加watch監(jiān)聽,一旦節(jié)點有變化冲甘,zookeeper會通知客戶端绩卤。不需要空轉(zhuǎn)途样。
springboot代碼如下:
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </dependency>
ZKLockService zkLockService = new ZKLockService(zooKeeper); try { zkLockService.tryLock(name); //xxxxxxx // 業(yè)務(wù)邏輯 } catch (Exception e) { e.printStackTrace(); } finally { zkLockService.unLock(); }
public class ZKLockService implements AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback, Watcher { private ZooKeeper zooKeeper; private String nodeName; private CountDownLatch countDownLatch = new CountDownLatch(1); private final static String PARENT_NODE = "/zkLock"; public ZKLockService(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } // 創(chuàng)建節(jié)點之后需要阻塞等待,因為zk獲取子節(jié)點的方式是通過回調(diào)的方式濒憋。 public void tryLock(String value) throws InterruptedException { zooKeeper.create(PARENT_NODE + "/lock", value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "orderLock"); countDownLatch.await(); } public void unLock() throws InterruptedException, KeeperException { zooKeeper.delete(nodeName, -1); } //創(chuàng)建子節(jié)點的回調(diào)函數(shù)何暇,獲取子節(jié)點信息 @Override public void processResult(int i, String s, Object o, String s1) { this.nodeName = s1; zooKeeper.getChildren(PARENT_NODE, false, this, o); } //獲取子節(jié)點信息的回調(diào)函數(shù),判斷是否加鎖成功凛驮;如果加鎖失敗裆站,則監(jiān)聽前一個節(jié)點 @Override public void processResult(int i, String s, Object o, List<String> list, Stat stat) { if (list == null || list.isEmpty()) { System.out.println("children is null......"); return; } Collections.sort(list); String minPath = list.get(0); String tempNodeName = this.nodeName.substring(8); if (minPath.equals(tempNodeName)) { System.out.println("加鎖成功"); countDownLatch.countDown(); } else { System.out.println("加鎖失敗"); int index = list.indexOf(tempNodeName); // 只監(jiān)聽前一個結(jié)點,當(dāng)前一個節(jié)點掛掉黔夭,只需要通過后一個節(jié)點即可宏胯。能避免羊群效應(yīng) zooKeeper.exists(PARENT_NODE + "/" + list.get(index - 1), this, this, o); } } //監(jiān)聽前一個節(jié)點的方法實現(xiàn),當(dāng)前一個節(jié)點被刪除后本姥,重新獲取子節(jié)點信息比較判斷 @Override public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getType()) { case NodeDeleted: zooKeeper.getChildren(PARENT_NODE, false, this, "orderLock"); break; default: break; } } @Override public void processResult(int i, String s, Object o, Stat stat) { } }
note:上段代碼create和getChildren方法使用的都是zookeeper的異步方法肩袍,如果使用同步方法,可以這樣寫:
public class ZKLockServiceNew implements AsyncCallback.StatCallback, Watcher { private ZooKeeper zooKeeper; private String nodeName; private final static String PARENT_NODE = "/zkLock"; private CountDownLatch countDownLatch = new CountDownLatch(1); public ZKLockServiceNew(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } public void tryLock(String value) throws InterruptedException, KeeperException { String path = zooKeeper.create(PARENT_NODE + "/lock", value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); nodeName = path; boolean flag = false; do { flag = verifyMinNode(path); } while (!flag); } public void unLock() throws InterruptedException, KeeperException { zooKeeper.delete(nodeName, -1); } @Override public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getType()) { case NodeDeleted: countDownLatch.countDown(); break; default: break; } } @Override public void processResult(int i, String s, Object o, Stat stat) { } private boolean verifyMinNode(String path) throws KeeperException, InterruptedException { List<String> children = zooKeeper.getChildren(PARENT_NODE, false); if (children == null || children.isEmpty()) { System.out.println("children is null......"); return false; } Collections.sort(children); String minPath = children.get(0); String tempNodeName = this.nodeName.substring(8); if (minPath.equals(tempNodeName)) { System.out.println("加鎖成功"); return true; } else { System.out.println("加鎖失敗"); int index = children.indexOf(tempNodeName); zooKeeper.exists(PARENT_NODE + "/" + children.get(index - 1), this, this, "AAA"); countDownLatch.await(); } return false; } }
Curator是已經(jīng)封裝好的zookeeper類婚惫,通過它可以更方便的完成分布式鎖的功能氛赐,以下是示例代碼:
public class CuratorTest { public static void main(String[] args) throws Exception { CuratorFramework zkClient = getClient(); InterProcessMutex zkMutex = new InterProcessMutex(zkClient, "/test/mutex"); ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { final int temp = i; executor.submit(() -> { System.out.println("線程" + temp + "啟動"); try { zkMutex.acquire(); //阻塞等待,也可超時等待 System.out.println("線程" + temp + "獲取到鎖"); Thread.sleep(3000); zkMutex.release(); System.out.println("線程" + temp + "釋放鎖"); } catch (Exception e) { throw new RuntimeException(e); } }); } TimeUnit.SECONDS.sleep(60); System.out.println("end----"); } public static CuratorFramework getClient() { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework zkClient = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy); zkClient.start(); return zkClient; } }
參照:
實現(xiàn)分布式鎖的三種方式
實現(xiàn)分布式鎖的方式
分布式鎖
elk
elk基礎(chǔ)
分布式基礎(chǔ)
分布式基礎(chǔ)理論
Redis可重入鎖的實現(xiàn)設(shè)計
Zookeeper介紹
zookeeper介紹
zookeeper分布式鎖