當(dāng)分布式部署的時(shí)候, 簡(jiǎn)單的本地鎖是沒辦法滿足需求的. 實(shí)現(xiàn)分布式鎖的方法多樣, 比如基于Mysql或Redis的. 本文介紹基于MongoDB的分布式互斥鎖. 實(shí)現(xiàn)中, 采用Mongoose, 若是直接MongoDB, 也是差不多的.
我們將使用以下Mongoose的Schema在MongoDB中描述鎖:
const LockSchema = new mongoose.Schema({
_id: String, // 鎖名
acquirer: String, // 分布式結(jié)點(diǎn)的uuid
acquiredAt: Date, // 獲取鎖時(shí)的時(shí)間
updatedAt: { type: Date, expires: 2 * 60, default: Date.now } // 更新時(shí)間, 2分鐘后過期自動(dòng)刪除
});
- _id: 這個(gè)_id直接用于存儲(chǔ)鎖名, 直接利用MongoDB中_id的唯一性來保證鎖的唯一
- acquirer: 這種用于保存分布式結(jié)點(diǎn)的uuid, 這樣方便在數(shù)據(jù)中查看是誰(shuí)在使用這把鎖, 以及刪除的時(shí)候, 聯(lián)查這個(gè)屬性, 避免刪錯(cuò)
- acquiredAt: 獲取到鎖的時(shí)候, 存入獲取時(shí)間到這個(gè)屬性, 這樣可以和updatedAt想減, 可得知正常使用的這個(gè)鎖的節(jié)點(diǎn)已經(jīng)使用了的時(shí)長(zhǎng).
-
updatedAt: 更新時(shí)間,
- 初始時(shí)和acquiredAt一致. 然后節(jié)點(diǎn)在使用時(shí), 如果執(zhí)行時(shí)間比較長(zhǎng), 則每隔一段時(shí)間調(diào)用renew函數(shù)更新一次這個(gè)屬性, 避免使用時(shí)長(zhǎng)過長(zhǎng), 導(dǎo)致超過了expires時(shí)間, 而被迫釋放鎖.
- 設(shè)置了自動(dòng)過期時(shí)間, 也就是expires屬性, 這個(gè)屬性對(duì)應(yīng)mongoDB中的expireafterseconds的屬性. 避免節(jié)點(diǎn)獲取鎖后, 掛掉, 從而導(dǎo)致死鎖. 超時(shí)后, MongoDB會(huì)自動(dòng)刪除. 注意: MongoDB的expire調(diào)度是每分鐘一次, 所以不是一過期就立馬刪除的
具體實(shí)現(xiàn)demo
首先dblock.js實(shí)現(xiàn)如下:
// dblock.js
const mongoose = require('mongoose');
mongoose.connect(
'mongodb://127.0.0.1:27017/test',
{ useNewUrlParser: true }
);
const LockSchema = new mongoose.Schema({
_id: String,
acquirer: String,
acquiredAt: Date,
updatedAt: { type: Date, expires: 10, default: Date.now }
});
const Lock = mongoose.model('Lock', LockSchema);
class DBLock {
constructor() {
this._uuid = this.uuid(); // 分布式節(jié)點(diǎn)的uuid
console.log(this._uuid);
}
// 基于時(shí)間戳生成的uuid
uuid() {
var d = Date.now();
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(
c
) {
var r = (d + Math.random() * 16) % 16 | 0;
d = Math.floor(d / 16);
return (c === 'x' ? r : (r & 0x3) | 0x8).toString(16);
});
}
// 獲取一次鎖
async acquire(name) {
try {
const lock = new Lock({
_id: name,
acquirer: this._uuid,
acquiredAt: new Date(),
updatedAt: new Date()
});
await lock.save();
return true;
} catch (e) {
console.log('cannot acquire');
return false;
}
}
// 獲取鎖, 每3s重試一次
async lock(name, retryInterval = 3000) {
while (true) {
if (await this.acquire(name)) {
break;
} else {
await this.sleep(retryInterval);
}
}
}
// 解鎖
async unlock(name) {
await Lock.deleteMany({ _id: name, acquirer: this._uuid });
}
// 續(xù)期
async renew(name) {
let result = await Lock.updateOne(
{ _id: name, acquirer: this._uuid },
{
updatedAt: new Date()
}
);
console.log('renew');
}
// 睡眠
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
let instance = new DBLock();
module.exports = instance;
然后測(cè)試?yán)觤ain.js:
// main.js
const dblock = require('./dblock');
async function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async function main() {
while(true) {
try {
await dblock.lock('send_sms');
console.log('Locked');
await sleep(15 * 1000);
await dblock.renew('send_sms');
await sleep(15 * 1000);
console.log('unlock');
await sleep(3 * 1000);
} finally {
await dblock.unlock('send_sms');
}
}
}
main();
分布式測(cè)試的話, 可以手動(dòng)多開幾個(gè)shell, 同時(shí)運(yùn)行這個(gè)main.js, 即可模擬分布式中的鎖的爭(zhēng)搶及使用.