在如今服務(wù)器的開發(fā)與部署時(shí)瞭吃,往往考慮的不是單機(jī)服務(wù)的承載力了郁副,而是更高一階,如何設(shè)計(jì)出高可用笼才,高負(fù)載尾膊,高容量的服務(wù)架構(gòu)媳危。并且業(yè)務(wù)開發(fā)不是簡(jiǎn)單的將所有業(yè)務(wù)糅合在單臺(tái)服務(wù)上,而是分模塊冈敛,分功能分而治理待笑,才有了微服務(wù)架構(gòu)。然后微服務(wù)設(shè)計(jì)中的每一個(gè)模塊都是可以設(shè)計(jì)為一個(gè)集群抓谴,例如常用的暮蹂,用戶模塊,權(quán)限模塊癌压,商品模塊仰泻,訂單模塊,支付模塊滩届,供應(yīng)鏈等等我纪。服務(wù)器的設(shè)計(jì)越來越復(fù)雜,對(duì)于開發(fā)人員的技術(shù)能力提出更高要求丐吓。
先如今微服務(wù)浅悉,分布式,集群的架構(gòu)思想中券犁,zookeeper成為了大多首選并且非常重要的中間件术健,例如我們熟悉的Hadoop,dubbo中粘衬,都出現(xiàn)了zookeeper 荞估。zookeeper有很多特性咳促,例如有注冊(cè)訂閱,分布式鎖勘伺,隊(duì)列等等功能跪腹。但是zookeeper中提供的java api 并不是很友好,使用起來容易踩坑飞醉。例如創(chuàng)建path時(shí)冲茸,需要判斷path的parent是否存在,必須先創(chuàng)建parent path才能創(chuàng)建子路徑缅帘。還有在添加watcher事件時(shí)轴术,一旦該事件觸發(fā)一次后,如果沒有主動(dòng)將事件重新設(shè)置钦无,他不會(huì)收到第二次逗栽。還有其他一些不太友好的api開發(fā)就不在贅述。
所以才引入了curator工具失暂,他實(shí)際是更高級(jí)的api彼宠,使用起來更加方便。但內(nèi)部核心也是使用zookeeper提供的api弟塞,只是在開發(fā)中不那么繁瑣而已凭峡。
舉個(gè)創(chuàng)建path例子:
public void createPath() {
String host = "127.0.0.1:2181";
String path = root + "/my_path";
CuratorFramework curator = CuratorClient.create(host);
try {
String last = curator.create().creatingParentsIfNeeded().forPath(path, "123".getBytes());
logger.info("創(chuàng)建路徑完成 " + last);
} catch (Exception e) {
e.printStackTrace();
logger.info("創(chuàng)建路徑失敗 異常類型:" + e.getClass().getName() + ", message:" + e.getMessage());
}
}
創(chuàng)建路徑,是不是很簡(jiǎn)單宣肚,不需要關(guān)心root是否已經(jīng)創(chuàng)建想罕。curator自己會(huì)去做驗(yàn)證判斷是否需要?jiǎng)?chuàng)建root路徑悠栓。然而霉涨,我們?cè)偕A一下,既然zookeeper提供了很多特性惭适,那么curator是否也能足夠支撐呢笙瑟?在curator組件中,recipes模塊中癞志,可以了解到很多有意思的地方:
可以看到他提供了很多功能往枷,例如原子計(jì)算,柵欄凄杯,緩存错洁,選舉,鎖戒突,隊(duì)列等屯碴,提供了很豐富的功能。 那么我們來分析一下 curator如何利用zookeeper的特性膊存,實(shí)現(xiàn)這些功能的导而。
首先需要了解一些基本嘗試忱叭,例如zookeeper中Watcher有哪些事件
public enum EventType {
None (-1),
NodeCreated (1),
NodeDeleted (2),
NodeDataChanged (3),
NodeChildrenChanged (4);
}
包含了節(jié)點(diǎn)創(chuàng)建,節(jié)點(diǎn)刪除今艺,節(jié)點(diǎn)數(shù)據(jù)變革和子節(jié)點(diǎn)變更韵丑,這些是zookeeper自己的watcher事件類型。
那么curator組件還會(huì)提出哪些自己的事件呢虚缎?
public enum CuratorEventType
{
/**
* Corresponds to {@link CuratorFramework#create()}
*/
CREATE,
/**
* Corresponds to {@link CuratorFramework#delete()}
*/
DELETE,
/**
* Corresponds to {@link CuratorFramework#checkExists()}
*/
EXISTS,
/**
* Corresponds to {@link CuratorFramework#getData()}
*/
GET_DATA,
/**
* Corresponds to {@link CuratorFramework#setData()}
*/
SET_DATA,
/**
* Corresponds to {@link CuratorFramework#getChildren()}
*/
CHILDREN,
//....后面還有很多 事件
}
這些事件與zookeeper沒有直接關(guān)系撵彻,而是curator通過調(diào)用相應(yīng)api后,會(huì)觸發(fā)相應(yīng)的事件遥巴,例如調(diào)用create()方法千康,會(huì)觸發(fā)CREATE事件。如果調(diào)用checkExists方法铲掐,會(huì)觸發(fā)EXISTES事件拾弃。
cache包
該包內(nèi)主要熟悉NodeCache和PathChildrenCache
? NodeCache,是指可以從本地cache中得到節(jié)點(diǎn)數(shù)據(jù)摆霉,并且該node可以增加watcher事件豪椿,例如節(jié)點(diǎn)的 更新/創(chuàng)建/刪除。然后重新拉取數(shù)據(jù)携栋,然后通過本地注冊(cè)的listeners搭盾,他們會(huì)得到變更通知。
private final CuratorFramework client;
private final String path;
private final boolean dataIsCompressed;
private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
private final AtomicBoolean isConnected = new AtomicBoolean(true);
private ConnectionStateListener connectionStateListener = new ConnectionStateListener();
private Watcher watcher = new Watcher();
這些是NodeCache的基本屬性婉支,
listeners是存儲(chǔ)了節(jié)點(diǎn)緩存變更的監(jiān)聽器鸯隅。
data是當(dāng)前節(jié)點(diǎn)的存儲(chǔ)的數(shù)據(jù),從zookeeper節(jié)點(diǎn)上緩存在本地的數(shù)據(jù)
connectionStateListener是連接狀態(tài)變更監(jiān)聽器向挖,例如重連蝌以,掉線等事件
watcher就是與zookeeper中的一樣,針對(duì)路徑進(jìn)行監(jiān)聽何之。
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)
{
this.client = client;
this.path = PathUtils.validatePath(path);
this.dataIsCompressed = dataIsCompressed;
}
普通的構(gòu)造器跟畅,最重要的是一個(gè)path路徑和client,當(dāng)聲明對(duì)象后溶推,就要啟動(dòng)該NodeCache徊件。
public void start(boolean buildInitial) throws Exception
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
client.getConnectionStateListenable().addListener(connectionStateListener);
if ( buildInitial )
{
client.checkExists().creatingParentContainersIfNeeded().forPath(path);
internalRebuild();
}
reset();
}
首先會(huì)將connectionStateListener狀態(tài)監(jiān)聽器添加到client狀態(tài)監(jiān)聽列表中。如果buildInitial=true蒜危,需要初始化虱痕,那么嘗試創(chuàng)建parent,然后獲取zk上的節(jié)點(diǎn)數(shù)據(jù)辐赞。最終執(zhí)行reset方法部翘。
private void reset() throws Exception
{
if ( (state.get() == State.STARTED) && isConnected.get() )
{
client.checkExists().creatingParentContainersIfNeeded()
.usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
}
reset方法其實(shí)將watcher添加path路徑中,并且針對(duì)checkExist方法增加回調(diào)方法backgroundCallback占拍,那么該回調(diào)拿到的CuratorEvent事件肯定是EXIST事件略就。其實(shí)rest并沒有獲取節(jié)點(diǎn)數(shù)據(jù)捎迫。
看一下最終調(diào)用方法processBackgroundResult()方法:
private void processBackgroundResult(CuratorEvent event) throws Exception
{
switch ( event.getType() )
{
case GET_DATA:
{
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
ChildData childData = new ChildData(path, event.getStat(), event.getData());
setNewData(childData);
}
break;
}
case EXISTS:
{
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
setNewData(null);
}
else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
if ( dataIsCompressed )
{
client.getData().decompressed().usingWatcher(watcher)
.inBackground(backgroundCallback).forPath(path);
}
else {
client.getData().usingWatcher(watcher)
.inBackground(backgroundCallback).forPath(path);
}
}
break;
}
}
}
當(dāng)限制任然是EXISTS事件時(shí),判斷是否有節(jié)點(diǎn)表牢,如果沒有則setNewData方法窄绒,即設(shè)置Node的data數(shù)據(jù)為空崔兴。那么如果存在節(jié)點(diǎn)敲茄,他然后沒有去主動(dòng)獲取得到data數(shù)據(jù)位谋,怎么做的?看一下他執(zhí)行了getData()方法掏父,并且添加了watcher事件秆剪,但是任然通過回調(diào)方法赊淑,那么此時(shí)回調(diào)方法是GET_DATA事件了。最終processBackgroundResult方法是執(zhí)行了case GET_DATA這塊代碼陶缺。因?yàn)榇藭r(shí)發(fā)起獲取數(shù)據(jù)時(shí)饱岸,會(huì)將數(shù)據(jù)添加到CuratorEvent中徽千,此時(shí)生成了ChildData對(duì)象,包括了path罐栈,stat和節(jié)點(diǎn)數(shù)據(jù)等信息荠诬。
考慮一下zookeeper中Watcher是什么時(shí)候才能觸發(fā)事件柑贞,當(dāng)然是節(jié)點(diǎn)刪除聂抢,更新琳疏,或者創(chuàng)建才會(huì)發(fā)起,那么怎么才能使用在NodeCache中新荤。再來看一下NodeCache自帶的屬性watcher實(shí)現(xiàn)類
private Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
try
{
reset();
}
catch(Exception e)
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
}
};
其實(shí)他任然調(diào)用reset方法苛骨,就是這么簡(jiǎn)單苟呐。還是先是發(fā)起EXISTS事件牵素,然后GET_DATA事件笆呆。但是考慮清楚,watcher事件是一次性觸發(fā)功能单起,不會(huì)執(zhí)行第二次嘀倒,所以在reset中测蘑,都會(huì)對(duì)path添加watcher事件康二。
那么NodeCache節(jié)點(diǎn)變更沫勿,如何通知添加在監(jiān)聽容器內(nèi)的監(jiān)聽器的产雹?
private void setNewData(ChildData newData) throws InterruptedException
{
ChildData previousData = data.getAndSet(newData);
if ( !Objects.equal(previousData, newData) )
{
listeners.forEach
(
new Function<NodeCacheListener, Void>()
{
@Override
public Void apply(NodeCacheListener listener)
{
try
{
listener.nodeChanged();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
log.error("Calling listener", e);
}
return null;
}
}
);
//.....
}
在setNewData方法中蔓挖,通過與原子應(yīng)用的data中之前存儲(chǔ)的previousData比較瘟判,如果不同。則那么需要遍歷容器內(nèi)的監(jiān)聽器了开镣,最終執(zhí)行nodeChanged方法咽扇。
? PathChildrenCache 子路徑緩存
考慮一下质欲,既然curator能在集群中使用嘶伟,那么舉個(gè)最簡(jiǎn)單的例子九昧,在集群中,增加或者減少服務(wù)癌别,需要及時(shí)發(fā)現(xiàn)才能防止繼續(xù)調(diào)用該服務(wù)展姐。那么在curator如何使用圾笨?當(dāng)然在集群中同等服務(wù)功能中每臺(tái)服務(wù)都是作為一個(gè)節(jié)點(diǎn)角色使用的擂达,那好胶滋,只要監(jiān)聽節(jié)點(diǎn)的變化例如節(jié)點(diǎn)移除镀钓,或者節(jié)點(diǎn)增加了丁溅。就能知道服務(wù)集群中的變更窟赏,那么節(jié)點(diǎn)該有哪些標(biāo)識(shí)涯穷,可以用服務(wù)器的ip和端口組成唯一標(biāo)識(shí)。
所以作煌,引申出來幾個(gè)概念粟誓,
1.需要監(jiān)聽的節(jié)點(diǎn)都是某個(gè)業(yè)務(wù)下parent的子節(jié)點(diǎn)children鹰服,2.針對(duì)添加子節(jié)點(diǎn)悲酷,任然必須在parent下變更设易;
在curator中引入了子節(jié)點(diǎn)管理的幾個(gè)事件
PathChildrenCacheEvent下的Type類型:
public enum Type
{
/**
* A child was added to the path
*/
CHILD_ADDED,
/**
* A child's data was changed
*/
CHILD_UPDATED,
/**
* A child was removed from the path
*/
CHILD_REMOVED,
//還有其他事件
}
例如子節(jié)點(diǎn)新增亡嫌,變更掘而,刪除等其他事件袍睡。在curator采用了大量的異步調(diào)用線程斑胜,并且在PathChildrenCache中通過推送事件方式通知節(jié)點(diǎn)狀態(tài)變更的止潘。
RefreshOperation 刷新事件凭戴,主要調(diào)用PathChildrenCache中refresh方法;
GetDataOperation 獲取節(jié)點(diǎn)數(shù)據(jù)事件者冤,主要調(diào)用getDataAndStat方法()涉枫,異步方式得到節(jié)點(diǎn)數(shù)據(jù)
EventOperation 推送事件愿汰,推送給記錄在事件容器中的監(jiān)聽器衬廷,發(fā)起childEvent方法
那么在PathChildrenCache 分為2種形式的基本路徑泵督,parentPath路徑小腊,和childPath路徑秩冈,在zk中入问,已經(jīng)提過,當(dāng)對(duì)parentPath進(jìn)行監(jiān)聽楣黍,如果parentPath新增節(jié)點(diǎn)租漂,就會(huì)觸發(fā)children事件哩治。所以PathChildrenCache也是利用了這點(diǎn)业筏。
在refresh方法中
void refresh(final RefreshMode mode) throws Exception
{
ensurePath();
final BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if (PathChildrenCache.this.state.get().equals(State.CLOSED)) {
// This ship is closed, don't handle the callback
return;
}
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
processChildren(event.getChildren(), mode);
}
}
};
client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
}
對(duì)client添加了childrenWatcher蒜胖,然后內(nèi)部有個(gè)回調(diào)類callback翠勉,他接受的CuratorEventType類型肯定是CHILDREN事件对碌。在processChildren方法中朽们,
private void processChildren(List<String> children, RefreshMode mode) throws Exception
{
Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
for ( String child : children ) {
removedNodes.remove(ZKPaths.makePath(path, child));
}
for ( String fullPath : removedNodes )
{
remove(fullPath);
}
for ( String name : children )
{
String fullPath = ZKPaths.makePath(path, name);
if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
{
getDataAndStat(fullPath);
}
updateInitialSet(name, NULL_CHILD_DATA);
}
maybeOfferInitializedEvent(initialSet.get());
}
其中children 是當(dāng)前parent地下所有的子路徑的名字(不是完整的路徑)。與本地記錄的當(dāng)前數(shù)據(jù)苍糠,比較出需要移除的節(jié)點(diǎn)岳瞭,發(fā)送EventOperation事件中的子節(jié)點(diǎn)移除事件瞳筏。然后通過RefreshMode模式或者當(dāng)前currentData中沒有保護(hù)子節(jié)點(diǎn)的全路徑姚炕,那么需要獲取數(shù)據(jù)柱宦。
在getDataAndStat()方法中
void getDataAndStat(final String fullPath) throws Exception
{
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
applyNewData(fullPath, event.getResultCode(), event.getStat(), cacheData ? event.getData() : null);
}
};
if ( USE_EXISTS && !cacheData )
{
client.checkExists().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
}
else
{
// always use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
if ( dataIsCompressed && cacheData )
{
client.getData().decompressed().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
}
else
{
client.getData().usingWatcher(dataWatcher).inBackground(callback).forPath(fullPath);
}
}
}
通過是否需要保持節(jié)點(diǎn)數(shù)據(jù)執(zhí)行相應(yīng)的方法摊沉,但是這里都有個(gè)共同的是说墨,對(duì)該子節(jié)點(diǎn)全路徑添加dataWatcher事件尼斧,那么該路徑的刪除或者變更棺棵,都會(huì)通知到dataWatcher事件中。
在PathChildrenCache中存在2中Watcher事件對(duì)象
private volatile Watcher childrenWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
}
};
private volatile Watcher dataWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
try
{
if ( event.getType() == Event.EventType.NodeDeleted )
{
remove(event.getPath());
}
else if ( event.getType() == Event.EventType.NodeDataChanged )
{
offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
}
};
有針對(duì)parent路徑添加childrenWatcher事件,基本上子節(jié)點(diǎn)變更苹熏,都會(huì)觸發(fā)轨域,然后通過刷新事件干发,異步方式重新執(zhí)行refresh方法枉长。
由這對(duì)具體的child路徑田間dataWatcher事件必峰,主要是子節(jié)點(diǎn)路徑刪除自点,或者數(shù)據(jù)變更桂敛,做出相應(yīng)的移除事件或者獲取數(shù)據(jù)事件動(dòng)作术唬。
熟悉了PathChildrenPath的工作原理粗仓,那么在工作中如何整合借浊。首先我們要聲明自己的PathChildrenCacheListener 監(jiān)聽器實(shí)現(xiàn)類蚂斤,有了他曙蒸,才能知道節(jié)點(diǎn)的變更情況。 假設(shè)有個(gè)業(yè)務(wù)功能是多組服務(wù)器支撐提供肖油,需要保證他能可動(dòng)態(tài)調(diào)整服務(wù)器資源森枪。那么上游調(diào)用者就可以通過PathChildrenPath工具監(jiān)聽當(dāng)前提供服務(wù)器組有哪些凶朗,而不用實(shí)時(shí)關(guān)心棚愤,在發(fā)起調(diào)用時(shí),去判斷當(dāng)前存在的服務(wù)器信息了瘸洛。
locks包
在分布式系統(tǒng)中反肋,如果要使用公用某一資源時(shí)候石蔗,往往會(huì)申請(qǐng)一個(gè)分布式鎖养距。curator也提供了分布式鎖,利用了zk的特性耘纱。使用方式很簡(jiǎn)單:
public void testDistributeLock() throws Exception {
String host = "127.0.0.1:2181";
String path = root + "/lock_test";
CuratorFramework curator = CuratorClient.create(host);
InterProcessMutex mutex = new InterProcessMutex(curator, path);
if(mutex.acquire(10, TimeUnit.SECONDS)) {
try {
// 業(yè)務(wù)邏輯
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
}
}
}
path就是比作資源束析,鎖針對(duì)path就行資源獲取畸陡,然后執(zhí)行業(yè)務(wù)邏輯丁恭,最終都需要release鎖資源牲览。觀察一下InterProcessMutex工作原理第献。
首先通過acquire方法了解internalLock方法
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
為什么通過當(dāng)前線程得知鎖的狀態(tài)數(shù)據(jù)仔拟,加入從threadData拿到了lockData數(shù)據(jù)利花,說明在這個(gè)線程之前就已經(jīng)獲取鎖資源了炒事,如果重復(fù)獲取同一個(gè)鎖挠乳,那么只要記錄lockCount數(shù)量即可睡扬。當(dāng)前線程沒有保存的鎖資源威蕉,需要通過internals內(nèi)置鎖工具嘗試獲取鎖韧涨,最終得到一個(gè)lockPath,然后進(jìn)行封裝成LockData保存在threadData中宪哩,沒有返回路徑锁孟,說明獲取鎖失敗了。
看一下LockInternals 類attemptLock方法甜熔,是如何嘗試獲取鎖
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases)
{
this.driver = driver;
this.lockName = lockName;
this.maxLeases = maxLeases;
this.client = client;
this.basePath = PathUtils.validatePath(path);
this.path = ZKPaths.makePath(path, lockName);
}
在構(gòu)造器中腔稀,有兩個(gè)路徑焊虏,basePath基本路徑诵闭,還有path這個(gè)路徑是通過basePath與lockName(默認(rèn)名字為lock-)組合起來涂圆,說明這里的path是basePath的子節(jié)點(diǎn)路徑模狭。還有一個(gè)參數(shù) driver嚼鹉,默認(rèn)通過StandardLockInternalsDriver類實(shí)現(xiàn)的锚赤,該類主要負(fù)責(zé)創(chuàng)建路徑线脚,判斷是否能獲取鎖浑侥。看一下StandardLockInternalsDriver創(chuàng)建路徑代碼:
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
其中path路徑時(shí)基本路徑與lockName組合起來的最終路徑名稱寓落,但是創(chuàng)建的時(shí)候括丁,采用了EPHEMERAL_SEQUENTIAL模式得到的路徑,首先路徑是非永久狀態(tài)存儲(chǔ)的伶选,如果連接端口史飞,該ourPath就會(huì)刪除尖昏。然后還有特點(diǎn)是有序的,就是ourPath的路徑是path路徑與順序編號(hào)組合在一起的构资,并且是有序遞增編號(hào)的路徑,例如 /test/lock-000001,test/lock-000002蚯窥。每次創(chuàng)建都會(huì)增加編號(hào)掸鹅,而且不會(huì)重復(fù),這是zookeeper中一個(gè)特性拦赠。
再來熟悉LockInternals中的attemptLock方法巍沙。
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try {
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e ) {
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
isDone = false;
}
else {
throw e;
}
}
}
if ( hasTheLock ) {
return ourPath;
}
return null;
}
為什么在一個(gè)循環(huán)體內(nèi),是為了容錯(cuò)荷鼠,在創(chuàng)建ourPath失敗時(shí)句携,進(jìn)行重復(fù)嘗試。通過driver創(chuàng)建了一個(gè)非持久的并且有序編號(hào)的ourPath路徑允乐,那么考慮一下矮嫉,因?yàn)槁窂綍r(shí)的編號(hào)是遞增的,那么編號(hào)越小牍疏,那么他獲得鎖的概率應(yīng)該是最大的蠢笋,因?yàn)樗亲钤鐒?chuàng)建路徑,也就分配的編號(hào)小了鳞陨。當(dāng)定義完這個(gè)獲取鎖的規(guī)則后昨寞,后續(xù)就方便很多了。
在internalLockLoop方法中厦滤,如何與等待時(shí)間相結(jié)合援岩,當(dāng)獲得鎖后,就可以成功呢掏导?
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
if ( revocable.get() != null ) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() ) {
haveTheLock = true;
}
else {
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null ) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 ) {
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else {
wait();
}
}
catch ( KeeperException.NoNodeException e ) {
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
在while循環(huán)體內(nèi)享怀,獲取了所有basePath下的children節(jié)點(diǎn)名稱,并且進(jìn)行從小到大編號(hào)排序趟咆。然后通過driver中的getsTheLock方法得到我節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)名稱添瓷,如果不存在,說明我的節(jié)點(diǎn)是第一個(gè)忍啸,那么我就能獲取鎖仰坦。
如果存在前一個(gè)節(jié)點(diǎn),構(gòu)建完整的路徑previousSequencePath计雌,并對(duì)該路徑進(jìn)行監(jiān)聽悄晃,增加watcher事件。為什么要這么?還是那個(gè)問題妈橄,編號(hào)是有序遞增的庶近,只有當(dāng)我前一個(gè)節(jié)點(diǎn)釋放鎖了,下一個(gè)是我,我就能得到鎖。那么前一個(gè)節(jié)點(diǎn)如何釋放鎖册烈,可以主動(dòng)刪除節(jié)點(diǎn)砚婆,或者掉線系統(tǒng)自動(dòng)刪除潮模。在對(duì)previousSequencePath添加watcher事件后,進(jìn)入等待,那么當(dāng)前線程等待時(shí)通過誰來喚醒呢?當(dāng)然是通過watcher來喚醒投队,通過調(diào)用notifyAll方式喚醒線程,然后重新執(zhí)行循環(huán)爵川,知道超時(shí)敷鸦,或者得到鎖。這里有個(gè)需要考慮寝贡,在超時(shí)時(shí)扒披,將doDelete標(biāo)記為刪除,然后再finally方法中通過這個(gè)狀態(tài)去刪除ourPath節(jié)點(diǎn)圃泡,為什么要這樣呢碟案?因?yàn)槌瑫r(shí)情況下,認(rèn)定是沒有獲取鎖洞焙,但是路徑我已經(jīng)創(chuàng)建了蟆淀,如果不去主動(dòng)刪除,那么他會(huì)一直占用澡匪,在ourPath后面的路徑就會(huì)一直等著他主動(dòng)刪除。在考慮一下褒链,這里為什么會(huì)存在 KeeperException.NoNodeException異常呢唁情?因?yàn)樵趯?duì)previousSequencePath進(jìn)行監(jiān)聽時(shí),假設(shè)這個(gè)鎖剛好釋放了甫匹,已經(jīng)刪除了previousSequencePath路徑甸鸟,那么當(dāng)前去監(jiān)聽時(shí),路徑就會(huì)不存在兵迅,然后會(huì)拋出節(jié)點(diǎn)不存在的異常抢韭。
這是一個(gè)完整的獲取鎖的流程,也很嚴(yán)謹(jǐn)?shù)奶幚砀鞣N出現(xiàn)異常時(shí)的邏輯恍箭。當(dāng)然獲取鎖刻恭,用完就要進(jìn)行釋放。
在Mutex中的release方法中
public void release() throws Exception
{
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData == null ) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 ) {
return;
}
if ( newLockCount < 0 ) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
internals.releaseLock(lockData.lockPath);
}
finally
{
threadData.remove(currentThread);
}
}
首先釋放時(shí),從LockData中對(duì)lockCount進(jìn)行扣減鳍贾,如果任然大于0鞍匾,就return了,表示該鎖沒有用完骑科。最終通過internals執(zhí)行releaseLock方法橡淑,然后移除掉threadData中的currentThread數(shù)據(jù),在LockInternal中釋放就很簡(jiǎn)單了咆爽,就是調(diào)用刪除路徑功能梁棠,達(dá)到釋放資源效果。那么監(jiān)聽此時(shí)的lockPath時(shí)斗埂,就能監(jiān)聽到刪除事件掰茶,就會(huì)獲取鎖。
在java并發(fā)包中存在讀寫鎖蜜笤,那么在curator中也存在這樣的讀寫分布式鎖-InterProcessReadWriteLock濒蒋。
大致與InterProcessMutex思想一致的,但是內(nèi)部有兩個(gè)InterProcessMutex組成把兔,一個(gè)是readMutex沪伙,一個(gè)是writeMutex。與Java中的ReentrantReadWriteLock一樣县好,如果當(dāng)前是read獲取到了資源围橡,那么另外一個(gè)read線程也能獲取資源,都是read資源是不進(jìn)行互斥的缕贡,但是如果有write資源翁授,那么就會(huì)互斥。寫與寫資源也是存在互斥的晾咪。所以InterProcessReadWriteLock是如何實(shí)現(xiàn)功能的收擦?
無論read,write鎖谍倦,他們的basePath肯定是一致的塞赂,而且在zk中,創(chuàng)建子節(jié)點(diǎn)為序列化的時(shí)候昼蛀,不會(huì)因?yàn)樽庸?jié)點(diǎn)的名稱不一樣宴猾,編號(hào)會(huì)重置。而且同等對(duì)待叼旋,編號(hào)永遠(yuǎn)是有序遞增的仇哆。那么好了,writeMutex就是互斥鎖夫植,與什么請(qǐng)求資源無關(guān)讹剔,readMutex只要判斷在之前的節(jié)點(diǎn)中存在write路徑,那么就需要等待。那么怎么判斷呢辟拷。在StandardLockInternalsDriver中有個(gè)getsTheLock方法撞羽,該方法返回的PredicateResults結(jié)果才能知道是否能拿到鎖,或者對(duì)前一個(gè)路徑進(jìn)行監(jiān)聽衫冻。
在readMutex鎖重新了該方法
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
{
if ( writeMutex.isOwnedByCurrentThread() )
{
return new PredicateResults(null, true);
}
int index = 0;
int firstWriteIndex = Integer.MAX_VALUE;
int ourIndex = -1;
for ( String node : children )
{
if ( node.contains(WRITE_LOCK_NAME) )
{
firstWriteIndex = Math.min(index, firstWriteIndex);
}
else if ( node.startsWith(sequenceNodeName) )
{
ourIndex = index;
break;
}
++index;
}
StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = (ourIndex < firstWriteIndex);
String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
return new PredicateResults(pathToWatch, getsTheLock);
}
首先判斷是否是重復(fù)鎖诀紊,然后查詢第一個(gè)firstWriteIndex寫路徑的位置,與ourIndex自己的位置隅俘,進(jìn)行比較邻奠。如果ourIndex小,那么就可以獲得鎖了为居,如果大碌宴,那么需要監(jiān)聽firstWriteIndex的對(duì)應(yīng)的路徑了。