zookeeper常用的Java客戶端有三種:zookeeper原生的澎蛛、Apache Curator、開源的zkclient躏鱼。Curator官網(wǎng)上這么說
一般生產(chǎn)環(huán)境我們使用curator氮采,它主要解決了三類問題:
1.封裝ZooKeeper client與ZooKeeper server之間的連接處理,當會話超時時可自動重連染苛。
2.提供了一套流式風格的操作API
3.提供ZooKeeper各種分布式協(xié)調(diào)應用場景(recipe, 比如leader選舉鹊漠,分布式鎖主到,分布式緩存等
)的抽象封裝。
本文基于3.1.0版本結(jié)合curator的使用簡要介紹curator的啟動加載,會話管理躯概,通知方式和recipe功能的實現(xiàn)登钥。
curator的用法
curator組件如下:
其中curator-recipes是建立在Curator Framework之上實現(xiàn)的,提供了zookeeper分布式協(xié)調(diào)相關的技巧娶靡,大多時候我們只需要依賴這一個jar包即可牧牢。
啟動加載
//1.初始化過程
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectionString)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
// etc
.build();
//2.啟動過程
client.start();
可分為初始化過程和啟動過程。
初始化過程
通過以上builder模式即可創(chuàng)建一個CuratorFrameworkImpl客戶端實例姿锭,初始化方法主要為:
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
//定義創(chuàng)建原生客戶端實例zookeeper的工廠方法
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
//zookeeper的包裝類塔鳍,可處理curator較低層次的會話保持和同步請求等
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly(),
builder.getConnectionHandlingPolicy()
);
//用于判斷連接斷開和連接超時的狀態(tài),設置curator的連接狀態(tài)呻此,并通過connectionStateManager觸發(fā)連接事件狀態(tài)通知
internalConnectionHandler = isClassic ? new ClassicInternalConnectionHandler() : new StandardInternalConnectionHandler();
//接收事件的通知轮纫。后臺線程操作事件和連接狀態(tài)事件會觸發(fā)
listeners = new ListenerContainer<CuratorListener>();
//當后臺線程發(fā)生異常或者handler發(fā)生異常的時候會觸發(fā)
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
//后臺線程執(zhí)行的操作隊列
backgroundOperations = new DelayQueue<OperationAndData<?>>();
//命名空間
namespace = new NamespaceImpl(this, builder.getNamespace());
//線程工廠方法焚鲜,初始化后臺線程池時會使用
threadFactory = getThreadFactory(builder);
//負責連接狀態(tài)變化時的通知
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent());
//CuratorFrameworkImpl的狀態(tài)蜡感,調(diào)用start方法之前為 LATENT,調(diào)用start方法之后為 STARTED ,調(diào)用close()方法之后為STOPPED
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
//錯誤連接策略
connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
//有保障的執(zhí)行刪除操作恃泪,其實是不斷嘗試直到刪除成功郑兴,通過遞歸調(diào)用實現(xiàn)
failedDeleteManager = new FailedDeleteManager(this);
//有保障的執(zhí)行刪除watch操作
failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
//服務端可用節(jié)點的檢測器,第一次連接和重連成功之后都會觸發(fā)重新獲取服務端列表
ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider());
}
可以看出贝乎,主要初始化了zookeeper客戶端包裝實例CuratorZookeeperClient
情连,與后臺操作,連接事件览效,異常相關的listener
容器却舀,命名空間和負載均衡等。這些都是與curator 功能密切相關的實現(xiàn)锤灿。這里具體看下CuratorZookeeperClient
和ConnectionStateManager
的初始化過程挽拔。
CuratorZookeeperClient初始化
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
//3.0.0版本后默認為StandardInternalConnectionHandler,之前session Expired是由服務端告知才會觸發(fā)Expired事件但校,
//StandardConnectionHandler當收到Disconnect事件后螃诅,如果在規(guī)定時間內(nèi)沒有重連到服務器,則會主動觸發(fā)Expired事件
this.connectionHandlingPolicy = connectionHandlingPolicy;
if ( sessionTimeoutMs < connectionTimeoutMs )
{
log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
}
//重新嘗試連接的策略
retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
this.connectionTimeoutMs = connectionTimeoutMs;
//curator注冊到原生客戶端上的defaultWatcher,會收到和連接狀態(tài)有關的事件通知等状囱,負責超時重連
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
setRetryPolicy(retryPolicy);
}
可以看出他主要負責了連接的創(chuàng)建和保證連接正常术裸,此外如果直接同步調(diào)用客戶端與服務端操作,他也根據(jù)retryPolicy
負責同步操作時候的連接保證亭枷。ConnectionState
是注冊到原生客戶端上的defaultWatcher.
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
this.ensembleProvider = ensembleProvider;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.tracer = tracer;
this.connectionHandlingPolicy = connectionHandlingPolicy;
if ( parentWatcher != null )
{
//因為defaultWatcher只能有一個袭艺,通過parentWatchers可實現(xiàn)defaultWatcher接到事件通知時parentWatchers的回調(diào)
parentWatchers.offer(parentWatcher);
}
zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
ConnectionStateManager初始化
它主要負責curator相關連接狀態(tài)的處理和通知,如果我們想要監(jiān)聽連接狀態(tài)的改變叨粘,就需要向它的listeners
上注冊一個ConnectionStateListener
猾编。
//連接狀態(tài)事件通知隊列
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
//需要通知的listeners
private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
//ConnectionStateManager的運行狀態(tài)
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent)
{
this.client = client;
this.sessionTimeoutMs = sessionTimeoutMs;
this.sessionExpirationPercent = sessionExpirationPercent;
if ( threadFactory == null )
{
threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
}
//事件隊列處理線程池
service = Executors.newSingleThreadExecutor(threadFactory);
}
啟動過程
建立與服務端的會話連接和相關功能的啟動
CuratorFrameworkImpl.start
public void start()
{
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
{
throw new IllegalStateException("Cannot be started more than once");
}
try
{
//啟動connectionStateManager
connectionStateManager.start(); // ordering dependency - must be called before client.start()
final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
{
logAsErrorConnectionErrors.set(true);
}
}
};
this.getConnectionStateListenable().addListener(listener);
//建立與服務端的連接
client.start();
executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
backgroundOperationsLoop();
return null;
}
});
ensembleTracker.start();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(null, e);
}
}
可以看出分別啟動ConnectionStateManager和CuratorZookeeperClient瘤睹。啟動ConnectionStateManager可做好連接事件通知的準備,啟動CuratorZookeeperClient建立與服務端的會話連接答倡。
會話管理
curator的會話管理是在原生客戶端的會話管理基礎上包裝而來默蚌,在上面的啟動過程中我們介紹到ConnectionState會負責超時的重連,ConnectionStateManager會負責連接狀態(tài)的改變和通知苇羡, connectionHandlingPolicy會負責連接超時的主動觸發(fā)绸吸。此外,在客戶端執(zhí)行一些操作時如果感知到連接斷開设江,也可以主動進行連接重連锦茁。下面會介紹下curator如何在原生客戶端的會話管理基礎上進行會話狀態(tài)的通知
和會話超時的重連
。
我們知道會話連接狀態(tài)相關的事件類型為Watcher.Event.EventType.None
叉存,會通知到客戶端上所有的Watcher码俩,ConnectionState
作為defaultWatcher,它的事件回調(diào)如下:
public void process(WatchedEvent event)
{
if ( LOG_EVENTS )
{
log.debug("ConnectState watcher: " + event);
}
if ( event.getType() == Watcher.Event.EventType.None )
{
//isConnected:客戶當前的連接狀態(tài),true表示已連接(SyncConnected和ConnectedReadOnly狀態(tài))
boolean wasConnected = isConnected.get();
boolean newIsConnected = checkState(event.getState(), wasConnected);
if ( newIsConnected != wasConnected )
{
//如果連接狀態(tài)發(fā)生改變歼捏,則更新
isConnected.set(newIsConnected);
connectionStartMs = System.currentTimeMillis();
if ( newIsConnected )
{
//說明是重連稿存,更新會話超時協(xié)商時間
lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
}
}
}
//通知parentWatchers,注意初始化的時候其實傳入了一個parentWatcher,會調(diào)用CuratorFrameworkImpl.processEvent
for ( Watcher parentWatcher : parentWatchers )
{
TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
parentWatcher.process(event);
timeTrace.commit();
}
}
可以看到,對連接狀態(tài)事件的處理主要是checkState
方法
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
boolean isConnected = wasConnected;
boolean checkNewConnectionString = true;
switch ( state )
{
default:
case Disconnected:
{
isConnected = false;
break;
}
case SyncConnected:
case ConnectedReadOnly:
{
isConnected = true;
break;
}
case AuthFailed:
{
isConnected = false;
log.error("Authentication failed");
break;
}
case Expired:
{
isConnected = false;
checkNewConnectionString = false;
handleExpiredSession();
break;
}
case SaslAuthenticated:
{
// NOP
break;
}
}
if ( checkNewConnectionString )
{
//如果服務端列表發(fā)生變化瞳秽,則更新
String newConnectionString = zooKeeper.getNewConnectionString();
if ( newConnectionString != null )
{
handleNewConnectionString(newConnectionString);
}
}
return isConnected;
}
可以看到會根據(jù)不同的會話狀態(tài)判斷連接是否正常瓣履,isConnected = true
表示正常。當會話超時過期Expired
時练俐,會調(diào)用handleExpiredSession
進行reset
操作袖迎,也就是連接的關閉和重新建立新的會話連接。即會話超時的被動重連
腺晾。在連接過程中燕锥,會根據(jù)客戶端設置的連接重試機制retryPolicy
檢測重連是否超時。
- parentWatchers的回調(diào)
其實在CuratorFramework client
初始化時悯蝉,會初始化一個watcher添加到ConnectionState
的parentWatcher
中归形,負責連接狀態(tài)改變時的會話狀態(tài)改變。
//初始化的parentWatcher
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
}
會話狀態(tài)改變時調(diào)用上面watcher的process
方法鼻由,調(diào)用至CuratorFrameworkImpl.processEvent
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
//狀態(tài)轉(zhuǎn)換
validateConnection(curatorEvent.getWatchedEvent().getState());
}
//通知所有注冊的CuratorListener
listeners.forEach(new Function<CuratorListener, Void>()
{
@Override
public Void apply(CuratorListener listener)
{
try
{
TimeTrace trace = client.startTracer("EventListener");
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
trace.commit();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
logError("Event listener threw exception", e);
}
return null;
}
});
}
其中暇榴,validateConnection
負責連接狀態(tài)的轉(zhuǎn)換
void validateConnection(Watcher.Event.KeeperState state)
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
internalConnectionHandler.suspendConnection(this);
}
else if ( state == Watcher.Event.KeeperState.Expired )
{
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
可以看出ConnectionStateManager
負責curator連接狀態(tài)的更新。原生客戶端的連接狀態(tài)和curator包裝的連接狀態(tài)對應關系如下:
同時嗡靡,
ConnectionStateManager
會將當前狀態(tài)ConnectionState
放入自身的事件隊列中跺撼,通知所有注冊到自身listeners
的ConnectionStateListener
此外窟感,當相關操作(包括同步和后臺線程的操作讨彼,如getData)發(fā)現(xiàn)連接斷開了,也會調(diào)用client.getZooKeeper()
重連柿祈,(注意底層建立客戶端連接是加鎖的哈误,保證一個客戶端只有一個線程可以創(chuàng)建會話成功)哩至。如CuratorFrameworkImpl的后臺線程任務:
void performBackgroundOperation(OperationAndData<?> operationAndData)
{
try
{
if ( !operationAndData.isConnectionRequired() || client.isConnected() )
{
operationAndData.callPerformBackgroundOperation();
}
else
{
client.getZooKeeper(); // important - allow connection resets, timeouts, etc. to occur
if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() )
{
throw new CuratorConnectionLossException();
}
operationAndData.sleepFor(1, TimeUnit.SECONDS);
queueOperation(operationAndData);
}
}
catch ( Throwable e )
{
ThreadUtils.checkInterrupted(e);
/**
* Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
* when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
* and callbacks need to get invoked, etc.
*/
if ( e instanceof CuratorConnectionLossException )
{
WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null, null);
if ( checkBackgroundRetry(operationAndData, event) )
{
queueOperation(operationAndData);
}
else
{
logError("Background retry gave up", e);
}
}
else
{
handleBackgroundOperationException(operationAndData, e);
}
}
}
通知機制
通知機制其實就是在事件發(fā)生的地方觸發(fā)已經(jīng)注冊好的listerner相應的回調(diào)函數(shù)(觀察者模式)。CuratorFrameworkImpl client可注冊listener的方式有:
- 一次性watch
client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground().forPath(path);
同原生客戶端的watch蜜自,只能生效一次菩貌,需要反復注冊。
- 注冊CuratorListener
// this is one method of getting event/async notifications
CuratorListener listener = new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
// examine event for details
}
};
client.getCuratorListenable().addListener(listener);
其實是將listener添加到CuratorFrameworkImpl.listeners
中重荠。當后臺線程完成操作會觸發(fā)相應的事件通知該listener箭阶,如異步創(chuàng)建路徑會觸發(fā)CuratorEventType.CREATE事件。此外當連接狀態(tài)事件觸發(fā)時戈鲁,parentWatcher也會回調(diào)這些listeners.
- 注冊ConnectionStateListener
ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
//Some details
}
};
client.getConnectionStateListenable().addListener(connectionStateListener);
其實是將connectionStateListener添加到connectionStateManager.listeners
中仇参,在連接狀態(tài)發(fā)生改變時,會收到通知婆殿。
- 注冊UnhandledErrorListener
UnhandledErrorListener unhandledErrorListener = new UnhandledErrorListener() {
@Override
public void unhandledError(String message, Throwable e) {
//
}
};
client.getUnhandledErrorListenable().addListener(unhandledErrorListener);
其實是將unhandledErrorListener添加到CuratorFrameworkImpl.unhandledErrorListeners
中诈乒,當后臺線程操作發(fā)生異常或者handler發(fā)生異常的時候會觸發(fā)婆芦,收到通知怕磨。
- 后臺線程操作完成時的回調(diào)
public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
// this is another method of getting notification of an async completion
client.setData().inBackground(callback).forPath(path, payload);
}
對于不同的操作,如setData,可通過鏈式調(diào)用inBackground(callback)傳入回調(diào)函數(shù)callback消约,這樣當操作完成后肠鲫,會執(zhí)行回調(diào)函數(shù)。
- 緩存機制或粮,多次注冊
curator提供了三種類型的緩存方式:Path Cache,Node Cache 和Tree Cache滩届。相當于和服務端的節(jié)點數(shù)據(jù)進行對比,沒當數(shù)據(jù)不一致時被啼,會通過watch機制觸發(fā)帜消,后臺回調(diào)更新本地緩存數(shù)據(jù),同時再次注冊相應的watch浓体。同時泡挺,每次重連成功之后也會重新注冊watch,保證了watch不丟失命浴。
結(jié)合上面會話管理和通知機制的介紹娄猫,可以知道原生客戶端的watcher是同步通知的,當然可以在特定watcher中的處理做異步生闲。connectionStateManager.listeners
是由內(nèi)部的線程池做異步通知的媳溺,CuratorFrameworkImpl.listeners
對于連接狀態(tài)的通知是與watcher通知線程同步,由后臺線程通知時為異步碍讯。如果客戶端watcher注冊過多悬蔽,那么可能就會導致重連之后watch丟失(重連會清空sendThread的發(fā)送和接收隊列,可能會導致watch丟失)捉兴,甚至重連不成功(本文分析的版本3.1.0中只要調(diào)用client.getZooKeeper()
就會重連蝎困,和當時的連接狀態(tài)無關录语。所以我覺得看版本,如參考資料二)禾乘。
recipes功能
curator實現(xiàn)的recipes功能主要有:分布式鎖澎埠,Leader選舉,Barriers始藕,計數(shù)器蒲稳,緩存,隊列伍派,事務等弟塞。對于隊列和事務,我們可以使用其他中間件拙已,如kafka,TCC-Transaction等解決决记,這里不做介紹。
分布式鎖
類似于Java的j.u.c包中的鎖倍踪,recipes提供了分布式協(xié)調(diào)下(不同JVM)的互斥鎖(可重入/不可重入)系宫,可重入讀寫鎖,信號量和多鎖對象建车。
可重入互斥鎖
由InterProcessMutex
類實現(xiàn)扩借。基于臨時順序節(jié)點實現(xiàn)缤至,獲取鎖規(guī)則為:
1.當前存在的創(chuàng)建時間最早的臨時節(jié)點獲得鎖權限潮罪,再次進入可重新獲得鎖,內(nèi)部維持了一個ConcurrentMap<Thread, LockData> threadData
記錄鎖的重入次數(shù)领斥。
2.不是第一個節(jié)點的每個臨時順序節(jié)點都在前一個臨時順序節(jié)點上注冊節(jié)點watch
當前一個節(jié)點被刪除時嫉到,后一個節(jié)點會重新根據(jù)鎖規(guī)則競爭鎖。不可重入互斥鎖
由InterProcessSemaphoreMutex
實現(xiàn)月洛,相比InterProcessMutex
只是少了重入功能何恶,實現(xiàn)原理是在互斥鎖InterProcessMutex
的基礎上構(gòu)造一個租約,由InterProcessSemaphoreV2
類實現(xiàn)(下文的信號量)嚼黔。每次只允許客戶端獲得一個租約细层,重入InterProcessSemaphoreMutex
這個鎖就因為租約不夠而無法獲得。可重入讀寫鎖
類似JDK的ReentrantReadWriteLock唬涧,讀寫鎖擁有一個可重入讀鎖和可重入寫鎖疫赎。讀讀操作不互斥,涉及寫操作和其他讀寫操作都互斥碎节,寫鎖可降級為讀鎖捧搞。在使用寫鎖時,應該先獲取讀鎖,再釋放寫鎖实牡。由InterProcessReadWriteLock
類和InternalInterProcessMutex
類實現(xiàn)陌僵。
InterProcessReadWriteLock
類封裝了兩個由InternalInterProcessMutex
類實現(xiàn)的讀鎖readMutex
和寫鎖writeMutex
轴合。它們是在可重入互斥鎖InterProcessMutex
的基礎上創(chuàng)建的创坞。
客戶端在獲取讀鎖或?qū)戞i時均在相同的父路徑下創(chuàng)建臨時順序節(jié)點。獲取讀鎖時
受葛,如果是寫鎖線程或前面節(jié)點沒有寫鎖臨時節(jié)點题涨,可直接獲取讀鎖。否則讀鎖客戶端watch在前面節(jié)點最小的寫鎖上面总滩,直到前面沒有寫鎖節(jié)點時獲取鎖纲堵。獲取寫鎖時
,如果前面沒有臨時順序節(jié)點,則直接獲得闰渔,否則寫鎖客戶端watch在前面一個臨時順序節(jié)點上席函,直到成為第一個臨時順序節(jié)點時獲取到寫鎖。信號量
由InterProcessSemaphoreV2實現(xiàn)冈涧,可指定租約數(shù)茂附。在獲取租約時,先獲取由其維護的互斥鎖督弓,如果租約數(shù)量足夠(也就是與租約有關的臨時順序節(jié)點數(shù)目不到租約數(shù))营曼,可獲得租約,即獲取信號量愚隧,然后釋放互斥鎖進行資源的操作蒂阱,可保證其他線程可繼續(xù)獲取互斥鎖,然后獲取剩余的租約狂塘。同理录煤,歸還租約時,只需刪除與租約有關的臨時順序節(jié)點即可荞胡。多鎖對象
由InterProcessMultiLock
實現(xiàn)辐赞,內(nèi)部維護了一個互斥鎖(可為重入或非重入)列表locks
,只有同時獲得列表中所有的互斥鎖時硝训,才保證了獲取到了這個多鎖對象响委。釋放多鎖對象時同樣需要釋放列表中的每個互斥鎖。
需要注意的是窖梁,這些鎖內(nèi)部沒有使用ConnectionStateListener監(jiān)聽連接狀態(tài)赘风,也就是說當擁有鎖的客戶端會話過期服務端刪掉其臨時節(jié)點了,擁有鎖的客戶端并不能及時感知到這種變化纵刘。
Leader選舉
分布式場景下邀窃,通常選出一個leader負責任務的分派,數(shù)據(jù)的寫入等。此外瞬捕,當leader意外宕機鞍历,新的leader要被選舉出來。recipes提供了兩種選舉方式:Leader latch
和LeaderSelector
肪虎。
Leader latch
客戶端分別在相同的path下創(chuàng)建臨時順序節(jié)點劣砍,選舉規(guī)則為:
1.當前存在的創(chuàng)建時間最早的臨時節(jié)點獲得leader權限。
2.不是第一個節(jié)點的每個臨時順序節(jié)點都在前一個臨時順序節(jié)點上注冊節(jié)點watch
當前一個節(jié)點被刪除時扇救,后一個節(jié)點會重新根據(jù)選舉規(guī)則進行選舉刑枝。
此外,每個客戶端都會注冊一個ConnectionStateListener監(jiān)聽連接狀態(tài)迅腔。當連接異常時装畅,會根據(jù)當前連接策略決定是否釋放leader權限。重新連接上之后如果原來的leader喪失了權限沧烈,會刪除原來節(jié)點并創(chuàng)建新的節(jié)點掠兄,重新參與選舉。
LeaderSelector
基于互斥鎖InterProcessMutex
實現(xiàn)的锌雀,獲取鎖即擁有l(wèi)eader權限蚂夕,用完了會釋放鎖。如果連接異常汤锨,會中斷任務執(zhí)行線程双抽。與LeaderLatch相比, 通過LeaderSelectorListener
可以對領導權進行控制闲礼, 用完了就釋放領導權牍汹,這樣每個節(jié)點都有可能獲得領導權。 而LeaderLatch只有主動調(diào)用close方法才會釋放領導權柬泽。
Barriers
分布式柵欄會使等待在相同節(jié)點路徑上一批線程阻塞慎菲,直到某個條件滿足時,才允許他們繼續(xù)運行锨并。類似于j.u.c包中的CountDownlatch與CyclicBarrier露该,分為DistributedBarrier
與DistributedDoubleBarrier
兩類Barriers。前者線程間可使用一次等待第煮,后者可重復使用解幼。
DistributedBarrier
1.調(diào)用setBarrier
創(chuàng)建barrierPath持久節(jié)點
2.等待線程調(diào)用waitOnBarrier
分別在相同的barrierPath下注冊節(jié)點watch,然后阻塞
3.當調(diào)用釋放柵欄的removeBarrier
方法時,會刪掉持久節(jié)點包警,等待線程繼續(xù)運行
DistributedDoubleBarrier
允許一組固定數(shù)量的分布式進程撵摆,相互等待。直到最后一個進程到達害晦,才允許所有進程繼續(xù)運行特铝。同時離開的時候,也需相互等待,直到最后一個進程要離開鲫剿,才允許所有進程繼續(xù)運行鳄逾。
在分析之前先知道下文中watch的作用:
在節(jié)點上注冊的watch主要做的事情是:1.當節(jié)點被刪除或創(chuàng)建時锰镀,喚醒注冊watch的線程挟纱。2。當當前線程所在客戶端連接斷開時抚笔,喚醒注冊watch的線程笆呆。
1.調(diào)用enter
方法等待線程在barrierPath下創(chuàng)建臨時節(jié)點并在barrierPath/ready
節(jié)點下注冊節(jié)點watch请琳,如果此時臨時節(jié)點數(shù)未達到等待線程的數(shù)目,則阻塞粱挡。否則創(chuàng)建barrierPath/ready
節(jié)點赠幕,并放行。
2.調(diào)用leave
時询筏,將這一批等待線程按照臨時節(jié)點名字排序榕堰,最小節(jié)點線程不斷在最大節(jié)點上面注冊watch,而其他節(jié)點線程都在最小節(jié)點上面注冊watch,其他節(jié)點會主動刪除嫌套,然后阻塞直到只剩下最小節(jié)點的時候最小節(jié)點也刪除逆屡。此時當所有節(jié)點都刪掉了,所有線程可跳出無限循環(huán)踱讨,繼續(xù)運行魏蔗。
計數(shù)器
Curator有兩個計數(shù)器, SharedCount
是用int來計數(shù)痹筛,DistributedAtomicLong
是用long來計數(shù)莺治。
SharedCount
SharedCount管理一個相同path下的共享int值,start的時候在該path節(jié)點下注冊watch帚稠,能夠感知到節(jié)點數(shù)據(jù)的變化谣旁,并更新本地緩存的數(shù)據(jù)值和版本值。如果想改變共享值滋早,則通過本地版本號+最新值 去更新節(jié)點榄审,如果版本過期,則更新失敗并更新本地數(shù)據(jù)杆麸,否則更新成功搁进。類似樂觀鎖。
此外昔头,可以為它增加一個SharedCountListener饼问,可以接收到共享值和連接狀態(tài)的改變事件。
DistributedAtomicLong
DistributedAtomicLong管理一個相同path下的共享long值,調(diào)用trySet
修改時减细,首先會采用樂觀鎖的方式(版本+最新值)進行修改匆瓜,按照嘗試策略retryPolicy
修改直到成功。如果嘗試策略次數(shù)用完仍然失敗,如果允許使用悲觀鎖方式(初始化promotedToLock!=null驮吱,會初始化一個互斥鎖)修改茧妒,則會獲取互斥鎖,然后再修改左冬。
緩存
緩存是客戶端對服務端的數(shù)據(jù)的緩存桐筏,如果服務端數(shù)據(jù)發(fā)生變化,通過watch機制對客戶端通知拇砰,更新緩存數(shù)據(jù)并重新watch梅忌。可分為Path Cache除破、Node Cache和Tree Cache牧氮。
我們知道zookeeper原生的watch是一次性的,每次觸發(fā)之后服務端和客戶端都會清理掉瑰枫。watch分為三種:node watch踱葛,path watch和default watch。
node watch:當監(jiān)控路徑下的節(jié)點數(shù)據(jù)變化光坝,節(jié)點被創(chuàng)建和刪除時觸發(fā)尸诽,通過調(diào)用exists
,getData
方法可注冊該類watch盯另。
path watch:當監(jiān)控路徑下的節(jié)點被刪除性含,新增或刪除子節(jié)點時觸發(fā),通過調(diào)用getChildren
方法可注冊該類watch鸳惯。
default watch:每當連接狀態(tài)發(fā)生改變時商蕴,都會觸發(fā)。通過初始化zookeeper
的時候注冊悲敷。同時也可以作為node watch和path watch傳入exists
究恤,getData
和getChildren
方法中,此時對于監(jiān)控路徑是一次性的后德。
Path Cache
是對節(jié)點路徑下子節(jié)點的新增部宿,修改和刪除的監(jiān)控。當一個子節(jié)點增加瓢湃, 更新理张,刪除時, Path Cache會改變它的數(shù)據(jù)和狀態(tài)绵患。
在調(diào)用start方法
雾叭,連接異常時重連成功
(通過啟動時注冊connectionStateListener),每次收到pathwatch的回調(diào)
時都重新注冊path watch達到始終監(jiān)控子節(jié)點新增和刪除的效果落蝙。同時每次收到watch時织狐,都會獲得子節(jié)點的所有信息暂幼,更新緩存數(shù)據(jù)。如果是新增子節(jié)點移迫,則注冊node watch旺嬉,達到監(jiān)控子節(jié)點數(shù)據(jù)更新的效果,從而觸發(fā)對應的子節(jié)點增加厨埋, 更新邪媳,刪除事件,并通知注冊的PathChildrenCacheListener
荡陷。
Node Cache
NodeCache是對一個節(jié)點的監(jiān)控雨效。當節(jié)點數(shù)據(jù)內(nèi)容修改或者刪除節(jié)點時,都會觸發(fā)本地緩存的更新废赞。
node cache在調(diào)用start方法
徽龟,連接異常時重連成功
(通過啟動時注冊connectionStateListener),每次收到node watch的回調(diào)
時都重新注冊node watch達到始終監(jiān)控節(jié)點的效果蛹头。此外顿肺,每次注冊watch時是后臺線程發(fā)起的戏溺,會在他的響應數(shù)據(jù)上注冊一個回調(diào)函數(shù)渣蜗,負責獲得最新節(jié)點數(shù)據(jù),當節(jié)點數(shù)據(jù)與之前的本地緩存不一樣時旷祸,觸發(fā)ListenerContainer<NodeCacheListener> listeners
的回調(diào)耕拷,同時使注冊的NodeCacheListener感知到。
Tree Cache
Tree Cache是對監(jiān)控路徑下所有節(jié)點(一棵樹)的新增托享,修改和刪除的監(jiān)控骚烧。
首先對于樹上每個節(jié)點會把它當成一個TreeNode
,并在節(jié)點上注冊node watch 和 path watch闰围。當調(diào)用start
方法赃绊,連接異常時重連成功
(通過啟動時注冊connectionStateListener),每次收到節(jié)點上對應 watch的回調(diào)
時都會根據(jù)條件重新構(gòu)造相應TreeNode
羡榴,注冊對應的node watch 和 path watch碧查。同時樹上的節(jié)點變化會通知到注冊的TreeCacheListener。
參考資料:
ZooKeeper的Java客戶端使用
跟著實例學習ZooKeeper的用法: 文章匯總
Zookeeper Client架構(gòu)分析——ZK鏈接重連失敗排查
http://zookeeper.apache.org/doc/r3.4.9/recipes.html#sc_recipes_Locks
感謝您的閱讀校仑,歡迎關注我的公眾號:「碼農(nóng)知識點」,和我進一步交流學習~