curator創(chuàng)建zookeeper連接
1.初始化CuratorFramework client = CuratorFrameworkFactory.newClient()
1)入?yún)ⅲ悍?wù)器IP地址,session超時(shí)時(shí)間澜汤,連接超時(shí)時(shí)間,重試策略
2)初始化ZookeeperFactory,實(shí)現(xiàn)newZooKeeper方法,該方法實(shí)現(xiàn)zookeeper的連接創(chuàng)建
3)初始化CuratorZookeeperClient居灯,傳入Watcher香浩,但這個(gè)并不是傳給zookeeper的,zookeeper回調(diào)時(shí)恭垦,會(huì)調(diào)到這個(gè)watcher,這個(gè)watcher會(huì)回調(diào)我們創(chuàng)建連接時(shí)的client.getCuratorListenable().addListener(new LcpCuratorListener(ip));這個(gè)操作格嗅》Γ可以用于我們對(duì)事件的監(jiān)控與處理(eventReceived(CuratorFramework client, CuratorEvent event))。
4)初始化ConnectionState屯掖,將3)中的watcher傳給ConnectionState的parentWatchers屬性玄柏,ConnectionState也是最終傳給zookeeper的watcher
2.創(chuàng)建連接client.start();
1)連接狀態(tài)管理 connectionStateManager.start()
connectionStateManager中,BlockingQueue<ConnectionState>保存著連接狀態(tài)的狀態(tài)變化值贴铜,
start()方法中循環(huán)獲取隊(duì)列中的狀態(tài)值粪摘,然后執(zhí)行在創(chuàng)建client時(shí) client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));進(jìn)行狀態(tài)變化的監(jiān)控或處理
2)連接創(chuàng)建 client.start();
該方法主要執(zhí)行的是ConnectionState的reset()方法,reset ()主要完成老連接的關(guān)閉绍坝,和新連接的創(chuàng)建徘意,此處創(chuàng)建連接即調(diào)用初始化ZookeeperFactory實(shí)現(xiàn)的newZooKeeper方法
3.zookeeper回調(diào)watcher
傳入zookeeper的watcher是ConnectionState對(duì)象,則回調(diào)時(shí)轩褐,則先調(diào)用ConnectionState中的process方法椎咧,此處會(huì)判斷連接狀態(tài),SyncConnected灾挨,ConnectedReadOnly則連接成功邑退,Expired連接過(guò)期,則重新調(diào)用2中的reset()方法,其他狀態(tài)則連接失敗劳澄。如果連接狀態(tài)有變化則通過(guò)AtomicBoolean進(jìn)行保存地技。
此處還會(huì)調(diào)用之前初始化進(jìn)去的parentWatchers,回調(diào)到初始化CuratorZookeeperClient時(shí)傳入的watcher秒拔。此處莫矗,會(huì)校驗(yàn)連接狀態(tài),并將連接狀態(tài)加入ConnectionStateManager狀態(tài)管理器重進(jìn)行管理(ConnectionStateManager的BlockingQueue<ConnectionState>)。加入ConnectionStateManager管理的狀態(tài)會(huì)在connectionStateManager.start()中獲取到作谚,并可以通過(guò) client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));來(lái)監(jiān)控或處理
//創(chuàng)建Curator連接
public void init(){
Assert.hasText(zkServer,"zkServer is empty");
Assert.hasText(zkPath,"zkPath is empty");
ip = IpUtils.getOneIpV4();
//RetryNTimes 重試策略三娩,
client = CuratorFrameworkFactory.newClient(zkServer,sessionTimeoutMs,connectionTimeoutMs,new RetryNTimes(maxRetries,sleepMsBetweenRetries));
client.getCuratorListenable().addListener(new LcpCuratorListener(ip));
client.getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));
client.getUnhandledErrorListenable().addListener(new LcpErrorListener(ip));
client.start();
}
client = CuratorFrameworkFactory.newClient(zkServer,sessionTimeoutMs,connectionTimeoutMs,new RetryNTimes(maxRetries,sleepMsBetweenRetries));
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
build();
}
//返回一個(gè)CuratorFrameworkImpl對(duì)象
public CuratorFramework build()
{
return new CuratorFrameworkImpl(this);
}
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
//初始化ZookeeperFactory
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
初始化ConnectionState,HandleHolder
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
builder.getWaitForShutdownTimeoutMs(),
//這個(gè)watcher并不是真正傳給zookeeper的watcher妹懒,傳給zookeeper的是ConnectionState雀监,
//ConnectionState中重寫process(WatchedEvent event)方法中,會(huì)調(diào)用這個(gè)Watcher
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
//在這里眨唬,實(shí)現(xiàn)CuratorListener接口的listener重寫eventReceived方法会前,接收zk事件信息
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly(),
builder.getConnectionHandlingPolicy()
);
//zk連接狀態(tài)的管理類, 狀態(tài)發(fā)生變化時(shí)匾竿,回掉listener的
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
//K節(jié)點(diǎn)默認(rèn)值為本機(jī)IP瓦宜,ZK本身是不允許創(chuàng)建沒(méi)有value的節(jié)點(diǎn)的,但curator允許岭妖,就是使用了該默認(rèn)值
byte[] builderDefaultData = builder.getDefaultData();
//省略其他變量賦值
}
CuratorFramework.start();
@Override
public void start()
{
try
{
//開啟 連接狀態(tài)管理
connectionStateManager.start();
//CuratorZookeeperClient中的start方法临庇,真正與ZK建立連接
client.start();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(null, e);
}
}
CuratorZookeeperClient
public void start() throws Exception
{
log.debug("Starting");
if ( !started.compareAndSet(false, true) )
{
throw new IllegalStateException("Already started");
}
//ConnectionState 的start方法
state.start();
}
ConnectionState類
void start() throws Exception
{
log.debug("Starting");
ensembleProvider.start();
reset();
}
synchronized void reset() throws Exception
{
log.debug("reset");
//用來(lái)記錄zookeeper實(shí)例創(chuàng)建次數(shù)
instanceIndex.incrementAndGet();
isConnected.set(false);
//連接開始時(shí)間
connectionStartMs = System.currentTimeMillis();
//HandleHolder 關(guān)閉老的zookeeper實(shí)例,重新構(gòu)建新的helper
zooKeeper.closeAndReset();
//調(diào)用zookeeperFactory.newZooKeeper創(chuàng)建原生zookeeper連接
zooKeeper.getZooKeeper();
}
HandleHolder類
void closeAndReset() throws Exception
{
//如果有的話關(guān)閉之前的zookeeper實(shí)例昵慌,重構(gòu)HandleHolder
internalClose(0);
helper = new Helper()
{
private volatile ZooKeeper zooKeeperHandle = null;
private volatile String connectionString = null;
@Override
public ZooKeeper getZooKeeper() throws Exception
{
synchronized(this)
{
if ( zooKeeperHandle == null )
{
connectionString = ensembleProvider.getConnectionString();
//這里創(chuàng)建zookeeper連接假夺,傳入的watcher就是 ConnectionState
zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
}
helper = new Helper()
{
@Override
public ZooKeeper getZooKeeper() throws Exception
{
return zooKeeperHandle;
}
@Override
public String getConnectionString()
{
return connectionString;
}
@Override
public int getNegotiatedSessionTimeoutMs()
{
return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
}
};
return zooKeeperHandle;
}
}
@Override
public String getConnectionString()
{
return connectionString;
}
@Override
public int getNegotiatedSessionTimeoutMs()
{
return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
}
};
}
//通過(guò)上面的helper實(shí)現(xiàn)知道這里真正與ZK建立連接
ZooKeeper getZooKeeper() throws Exception
{
return (helper != null) ? helper.getZooKeeper() : null;
}
創(chuàng)建zookeeper連接之后,watcher接收zookeeper返回的連接事件并進(jìn)行處理废离,這里的watcher就是ConnectionState類侄泽,執(zhí)行其中的process方法
@Override
public void process(WatchedEvent event)
{
//這里為None說(shuō)明收到的事件是ZK連接狀態(tài)改變的事件
if ( event.getType() == Watcher.Event.EventType.None )
{
boolean wasConnected = isConnected.get();
boolean newIsConnected = checkState(event.getState(), wasConnected);
//連接狀態(tài)發(fā)生變化
if ( newIsConnected != wasConnected )
{
isConnected.set(newIsConnected);
//記錄連接開始時(shí)間
connectionStartMs = System.currentTimeMillis();
//連接狀態(tài)變化為已連接則記錄新協(xié)商的回話超市時(shí)間
if ( newIsConnected )
{
//重置session超時(shí)時(shí)間
lastNegotiatedSessionTimeoutMs.set(zooKeeper.getNegotiatedSessionTimeoutMs());
log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
}
}
}
//回調(diào)CuratorZookeeperClient創(chuàng)建時(shí)的watcher,
for ( Watcher parentWatcher : parentWatchers )
{
OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
//回調(diào)CuratorZookeeperClient創(chuàng)建時(shí)的watcher
parentWatcher.process(event);
trace.commit();
}
}
// 獲取當(dāng)前連接狀態(tài)
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
// AtomicBoolean isConnected = new AtomicBoolean(false); 原子boolean保存連接狀態(tài)
boolean isConnected = wasConnected;
boolean checkNewConnectionString = true;
switch ( state )
{
default:
//連接斷開
case Disconnected:
{
isConnected = false;
break;
}
//連接成功
case SyncConnected:
case ConnectedReadOnly:
{
isConnected = true;
break;
}
//權(quán)限驗(yàn)證失敗連接失敗
case AuthFailed:
{
isConnected = false;
log.error("Authentication failed");
break;
}
//連接過(guò)期
case Expired:
{
isConnected = false;
checkNewConnectionString = false;
//處理連接過(guò)期
//調(diào)用ConnectionState.reset() 重新構(gòu)建zookeeper連接
handleExpiredSession();
break;
}
case SaslAuthenticated:
{
// NOP
break;
}
}
//當(dāng)連接狀態(tài)發(fā)生改變且不是會(huì)話過(guò)期時(shí)蜻韭,檢查ZK地址是否發(fā)生變化
if ( checkNewConnectionString )
{
String newConnectionString = zooKeeper.getNewConnectionString();
if ( newConnectionString != null )
{ //處理ZK地址發(fā)生變化
handleNewConnectionString(newConnectionString);
}
}
return isConnected;
}
parentWatcher.process(event);回調(diào)初始化CuratorZookeeperClient時(shí)傳入的watcher
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{ //將zookeeper的event包裝成CuratorEvent
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
}
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
//校驗(yàn)連接狀態(tài),并將狀態(tài)加入connectionStateManager進(jìn)行管理
validateConnection(curatorEvent.getWatchedEvent().getState());
}
listeners.forEach(new Function<CuratorListener, Void>()
{
@Override
public Void apply(CuratorListener listener)
{
try
{
OperationTrace trace = client.startAdvancedTracer("EventListener");
//去回調(diào)創(chuàng)建client時(shí)的client.getCuratorListenable().addListener(new LcpCuratorListener(ip));
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
trace.commit();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
logError("Event listener threw exception", e);
}
return null;
}
});
}
void validateConnection(Watcher.Event.KeeperState state)
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
internalConnectionHandler.suspendConnection(this);
}
else if ( state == Watcher.Event.KeeperState.Expired )
{ //將狀態(tài)加入 阻塞隊(duì)列中柿扣,在connectionStateManager.start()中循環(huán)獲取該隊(duì)列中的狀態(tài)數(shù)據(jù)肖方,并執(zhí)行我們初始化client時(shí)的getConnectionStateListenable().addListener(new LcpConnectionStateListener(ip));
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
unSleepBackgroundOperations();
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
connectionStateManager.start(); 開啟連接狀態(tài)管理
public void start()
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
processEvents();
return null;
}
}
);
}
private void processEvents()
{ //注意是個(gè)循環(huán),一直在獲取 上面那個(gè)阻塞隊(duì)列中的狀態(tài)值
while ( state.get() == State.STARTED )
{
try
{
//第一次ZK還沒(méi)有建立連接未状,這里得到的就是用戶指定的會(huì)話超時(shí)時(shí)間
int useSessionTimeoutMs = getUseSessionTimeoutMs();
long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
long pollMaxMs = useSessionTimeoutMs - elapsedMs;
//這個(gè)隊(duì)列就是剛才放進(jìn)去事件的隊(duì)列
final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
if ( newState != null )
{
if ( listeners.size() == 0 )
{
log.warn("There are no ConnectionStateListeners registered.");
}
//這里僅僅就是回調(diào)監(jiān)聽器StandardListenerManager<ConnectionStateListener>
//client.getConnectionStateListenable().addListener(new ConnectionStateListener());
//連接狀態(tài)變化
listeners.forEach(listener -> listener.stateChanged(client, newState));
}
//該值默認(rèn)100俯画,如果長(zhǎng)時(shí)間沒(méi)有收到事件變化就判斷下會(huì)話是否過(guò)期
else if ( sessionExpirationPercent > 0 )
{
synchronized(this)
{
checkSessionExpiration();
}
}
}
catch ( InterruptedException e )
{
// swallow the interrupt as it's only possible from either a background
// operation and, thus, doesn't apply to this loop or the instance
// is being closed in which case the while test will get it
}
}
}