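Summary
The previous posts summarized the path from server startup to leader election. We now turn to the startup interaction between leader and follower, which first requires covering ZooKeeperServer.
Part of it was touched on in section 25 of this source-reading series; this post goes through the ZooKeeperServer source in detail.
[figure: ZooKeeperServer inheritance hierarchy]
This post covers the following: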
Parent interfaces
  ServerStats.Provider
  SessionTracker.SessionExpirer
Fields
Inner classes
  DataTreeBuilder && BasicDataTreeBuilder
  MissingSessionException
  ChangeRecord
  State
Methods
  Constructors
  Data-loading methods
  Startup methods
  Methods implementing the SessionTracker.SessionExpirer interface
  Client reconnection methods
  Other methods
Thoughts
Parent interfaces
ServerStats.Provider
Covered in section 24 of this source-reading series; not repeated here.
SessionTracker.SessionExpirer
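An inner interface of SessionTracker:
public static interface SessionExpirer {
void expire(Session session);// expire a session
long getServerId();// get the serverId
}
Fields
[figure: ZooKeeperServer fields]
Excluding the logging and JMX-related parts, the source is as follows: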
public static final int DEFAULT_TICK_TIME = 3000;// default tick length, in milliseconds
protected int tickTime = DEFAULT_TICK_TIME;// tick length
/** value of -1 indicates unset, use default */
protected int minSessionTimeout = -1;// minimum session timeout
/** value of -1 indicates unset, use default */
protected int maxSessionTimeout = -1;// maximum session timeout
protected SessionTracker sessionTracker;// session tracker
private FileTxnSnapLog txnLogFactory = null;// transaction log and snapshot files
private ZKDatabase zkDb;// the database
private final AtomicLong hzxid = new AtomicLong(0);// the zxid
public final static Exception ok = new Exception("No prob");
protected RequestProcessor firstProcessor;// first request processor in the chain
protected volatile State state = State.INITIAL;// starts in the INITIAL state
/**
* This is the secret that we use to generate passwords, for the moment it
* is more of a sanity check.
*/
static final private long superSecret = 0XB3415C00L;// used for password generation
private final AtomicInteger requestsInProcess = new AtomicInteger(0);// number of requests currently being processed
final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();// list of pending changes
// this data structure must be accessed under the outstandingChanges lock
final HashMap<String, ChangeRecord> outstandingChangesForPath =// map from changed path to its change record
new HashMap<String, ChangeRecord>();
private ServerCnxnFactory serverCnxnFactory;// connection factory
private final ServerStats serverStats;// server statistics
private final ZooKeeperServerListener listener;// listener that handles the server's stop state
private ZooKeeperServerShutdownHandler zkShutdownHandler;// the server's shutdown handler
ChangeRecord is an inner class of ZooKeeperServer and is described below.
ServerStats and ZooKeeperServerListener were both covered in the section 25 source walkthrough.
Inner classes
DataTreeBuilder && BasicDataTreeBuilder
This class is not actually called anywhere, so it can be ignored.
/**
* The server delegates loading of the tree to an instance of the interface
*/
public interface DataTreeBuilder {
public DataTree build();
}
static public class BasicDataTreeBuilder implements DataTreeBuilder {
public DataTree build() {
return new DataTree();
}
}
MissingSessionException
Defines an exception:
public static class MissingSessionException extends IOException {
private static final long serialVersionUID = 7467414635467261007L;
public MissingSessionException(String msg) {
super(msg);
}
}
ChangeRecord
This data structure exists so that PrepRequestProcessor and FinalRequestProcessor can share information; it is covered further when we get to the request-processing chain.
static class ChangeRecord {
ChangeRecord(long zxid, String path, StatPersisted stat, int childCount,
List<ACL> acl) {
this.zxid = zxid;
this.path = path;
this.stat = stat;
this.childCount = childCount;
this.acl = acl;
}
long zxid;
String path;// path
// persisted stat
StatPersisted stat; /* Make sure to create a new object when changing */
int childCount;
List<ACL> acl; /* Make sure to create a new object when changing */
@SuppressWarnings("unchecked")
ChangeRecord duplicate(long zxid) {// make a copy that carries the given zxid
StatPersisted stat = new StatPersisted();
if (this.stat != null) {
DataTree.copyStatPersisted(this.stat, stat);
}
return new ChangeRecord(zxid, path, stat, childCount,
acl == null ? new ArrayList<ACL>() : new ArrayList(acl));
}
}
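To illustrate why duplicate builds fresh objects instead of sharing them, here is a minimal sketch. RecordSketch and its String-based acl list are hypothetical simplifications, not ZooKeeper types; the only point is that the copy can be changed without touching the original.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for ChangeRecord; not ZooKeeper code.
class RecordSketch {
    final long zxid;
    final String path;
    final List<String> acl; // the real class holds List<ACL>

    RecordSketch(long zxid, String path, List<String> acl) {
        this.zxid = zxid;
        this.path = path;
        this.acl = acl;
    }

    // Copy the record under a new zxid, giving the copy its own list.
    RecordSketch duplicate(long newZxid) {
        List<String> aclCopy =
                acl == null ? new ArrayList<String>() : new ArrayList<String>(acl);
        return new RecordSketch(newZxid, path, aclCopy);
    }

    public static void main(String[] args) {
        RecordSketch original =
                new RecordSketch(1L, "/app", new ArrayList<String>(Arrays.asList("world:anyone")));
        RecordSketch copy = original.duplicate(2L);
        copy.acl.add("digest:user");
        // The original list is unaffected by the change made to the copy.
        System.out.println(original.acl.size() + " vs " + copy.acl.size());
    }
}
```

Sharing the same list between the two records would let a later change leak back into an earlier pending record, which is what the "create a new object when changing" comments in the listing warn about.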
StatPersisted was covered in part 7 of this series, in the discussion of DataNode.
State
Describes the state the server is currently in:
protected enum State {
INITIAL, RUNNING, SHUTDOWN, ERROR;
}
Methods
Constructors
The two underlying constructors, which the others delegate to, are listed here:
/**
* Creates a ZooKeeperServer instance. Nothing is setup, use the setX
* methods to prepare the instance (eg datadir, datalogdir, ticktime,
* builder, etc...)
*
* @throws IOException
*/
public ZooKeeperServer() {// the other fields are set afterwards through the setX methods
serverStats = new ServerStats(this);
listener = new ZooKeeperServerListenerImpl(this);
}
/**
* Creates a ZooKeeperServer instance. It sets everything up, but doesn't
* actually start listening for clients until run() is invoked.
*
* @param dataDir the directory to put the data
*/
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
int minSessionTimeout, int maxSessionTimeout,
DataTreeBuilder treeBuilder, ZKDatabase zkDb) {// sets up the fields; clients are only accepted once run() is invoked
serverStats = new ServerStats(this);
this.txnLogFactory = txnLogFactory;
this.zkDb = zkDb;
this.tickTime = tickTime;
this.minSessionTimeout = minSessionTimeout;
this.maxSessionTimeout = maxSessionTimeout;
listener = new ZooKeeperServerListenerImpl(this);
LOG.info("Created server with tickTime " + tickTime
+ " minSessionTimeout " + getMinSessionTimeout()
+ " maxSessionTimeout " + getMaxSessionTimeout()
+ " datadir " + txnLogFactory.getDataDir()
+ " snapdir " + txnLogFactory.getSnapDir());
}
Data loading
Startup involves loading the db's data. Here too there are two cases, cluster and standalone, with the following call order.
Cluster call order:
Leader#lead
ZooKeeperServer#loadData
Standalone call order:
ServerCnxnFactory#startup
ZooKeeperServer#startdata
ZooKeeperServer#loadData
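The key point for a cluster is that loadData is only called by the leader, after the servers have finished electing one.
The walkthrough below starts from the standalone startdata method.
startdata
Initializes zkDb and completes data loading: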
public void startdata()
throws IOException, InterruptedException {
//check to see if zkDb is not null
if (zkDb == null) {
zkDb = new ZKDatabase(this.txnLogFactory);// construct one if it does not exist
}
if (!zkDb.isInitialized()) {// not initialized yet, so load the data to initialize it
loadData();
}
}
loadData
Restores sessions and data. It is called on standalone startup, or in a cluster when lead() runs after leader election.
Its main job is to set the zxid and kill sessions that are no longer valid.
/**
* Restore sessions and data
*/
public void loadData() throws IOException, InterruptedException {// set the zxid, then kill stale sessions found in the db
/*
* When a new leader starts executing Leader#lead, it
* invokes this method. The database, however, has been
* initialized before running leader election so that
* the server could pick its zxid for its initial vote.
* It does it by invoking QuorumPeer#getLastLoggedZxid.
* Consequently, we don't need to initialize it once more
* and avoid the penalty of loading it a second time. Not
* reloading it is particularly important for applications
* that host a large database.
*
* The following if block checks whether the database has
* been initialized or not. Note that this method is
* invoked by at least one other method:
* ZooKeeperServer#startdata.
*
* See ZOOKEEPER-1642 for more detail.
*/
if(zkDb.isInitialized()){
setZxid(zkDb.getDataTreeLastProcessedZxid());// set this server's zxid
}
else {
setZxid(zkDb.loadDataBase());// load the database, then set this server's zxid
}
// Clean up dead sessions
LinkedList<Long> deadSessions = new LinkedList<Long>();
for (Long session : zkDb.getSessions()) {// sessions that still have ephemeral records
if (zkDb.getSessionWithTimeOuts().get(session) == null) {// the tracker's expiry check has already dropped this session; presumably expiry was detected but the asynchronous close request had not completed yet
deadSessions.add(session);
}
}
zkDb.setDataTreeInit(true);// mark the data tree as initialized
for (long session : deadSessions) {
// XXX: Is lastProcessedZxid really the best thing to use?
killSession(session, zkDb.getDataTreeLastProcessedZxid());
}
}
Why this cleanup is needed is discussed in the Thoughts section below.
The method calls setZxid (not expanded here) and killSession.
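The dead-session scan in loadData can be sketched in isolation as follows. DeadSessionScan and findDeadSessions are hypothetical names; the two collections stand in for what zkDb.getSessions() and zkDb.getSessionWithTimeOuts() return in the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the scan in loadData; not ZooKeeper code.
class DeadSessionScan {
    /**
     * @param sessionsInDb         session ids that still have ephemeral records in the db
     * @param sessionsWithTimeouts session id -> timeout, as seen by the session tracker
     * @return ids present in the db but no longer known to the tracker
     */
    static List<Long> findDeadSessions(Set<Long> sessionsInDb,
                                       Map<Long, Integer> sessionsWithTimeouts) {
        List<Long> dead = new ArrayList<Long>();
        for (Long id : sessionsInDb) {
            if (sessionsWithTimeouts.get(id) == null) {
                dead.add(id); // collect first, kill afterwards, as loadData does
            }
        }
        return dead;
    }

    public static void main(String[] args) {
        Set<Long> inDb = new TreeSet<Long>(Arrays.asList(1L, 2L, 3L));
        Map<Long, Integer> tracked = new HashMap<Long, Integer>();
        tracked.put(1L, 30000);
        tracked.put(3L, 30000);
        System.out.println(findDeadSessions(inDb, tracked)); // prints [2]
    }
}
```

Collecting the ids into a separate list before killing them keeps the loop from modifying the collection it is iterating over.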
killSession
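Removes the session's ephemeral records from the db and removes the session from the session tracker:
protected void killSession(long sessionId, long zxid) {
zkDb.killSession(sessionId, zxid);// remove the session's ephemeral records from the db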
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"ZooKeeperServer --- killSession: 0x"
+ Long.toHexString(sessionId));
}
if (sessionTracker != null) {
sessionTracker.removeSession(sessionId);// remove the session from the session tracker
}
}
Startup
The entry point is ZooKeeperServer#startup. Once the db data has been loaded as described above, zkServer calls startup to finish starting.
startup
The startup entry point:
public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();// create the session tracker
}
startSessionTracker();// start session tracking
setupRequestProcessors();// set up the request-processing chain
registerJMX();// register with JMX
setState(State.RUNNING);// start serving
notifyAll();
}
It calls createSessionTracker and several other methods, described below.
createSessionTracker & startSessionTracker
createSessionTracker creates the session tracker:
protected void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
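tickTime, 1, getZooKeeperServerListener());// 1 is the sid used by the default standalone implementation
}
This is the default standalone implementation. In a cluster each role has its own implementation; the main difference is that the sid argument is the configured sid rather than 1.
startSessionTracker starts the session tracker: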
protected void startSessionTracker() {
((SessionTrackerImpl)sessionTracker).start();
}
setState
Sets the server's running state; for the ERROR and SHUTDOWN states the corresponding action is taken:
protected void setState(State state) {// the state is handled by zkShutdownHandler
this.state = state;
// Notify server state changes to the registered shutdown handler, if any.
if (zkShutdownHandler != null) {
zkShutdownHandler.handle(state);// for an error state, this performs the shutdown handling
} else {
LOG.error("ZKShutdownHandler is not registered, so ZooKeeper server "
+ "won't take any action on ERROR or SHUTDOWN server state changes");
}
}
This was covered in part 25 of this series (server error alerting and shutdown mechanism); not repeated here.
setupRequestProcessors
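Sets up the request-processing chain, in the order PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
The details come later, in the post on the request-processing chain.
protected void setupRequestProcessors() {// set up the request-processing chain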
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);// chain order: PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
((PrepRequestProcessor)firstProcessor).start();// start the chain
}
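As a rough illustration of how such a chain is wired back to front, here is a minimal sketch. ChainSketch, Processor, and Stage are hypothetical names, not ZooKeeper classes. The listing above calls start() on the first two processors; the sketch skips any threading and simply calls each stage synchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical chain-of-responsibility sketch; not ZooKeeper code.
class ChainSketch {
    interface Processor {
        void process(String request);
    }

    // A stage records that it saw the request, then hands it to the next stage.
    static class Stage implements Processor {
        private final String name;
        private final Processor next; // null for the last stage
        private final List<String> trace;

        Stage(String name, Processor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        public void process(String request) {
            trace.add(name + ":" + request);
            if (next != null) {
                next.process(request);
            }
        }
    }

    // Build the chain last-to-first, as setupRequestProcessors does, then run one request.
    static List<String> run(String request) {
        List<String> trace = new ArrayList<String>();
        Processor finalStage = new Stage("final", null, trace);
        Processor syncStage = new Stage("sync", finalStage, trace);
        Processor firstStage = new Stage("prep", syncStage, trace);
        firstStage.process(request);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("create")); // prints [prep:create, sync:create, final:create]
    }
}
```

Each stage needs a reference to its successor at construction time, which is why the last stage has to be created first.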
Methods implementing the SessionTracker.SessionExpirer interface
The two methods are getServerId and expire:
public long getServerId() {// default implementation; each role's implementation returns a different value
return 0;
}
public void expire(Session session) {// expire a session
long sessionId = session.getSessionId();
LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
+ ", timeout of " + session.getTimeout() + "ms exceeded");
close(sessionId);
}
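The close method is covered below.
Client reconnection
processConnectRequest handles a client's connect request and is not expanded here.
What is worth noting is the reconnect call it makes:
[figure: the reconnect call in processConnectRequest]
It is expanded below.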
reopenSession
The core reconnect method:
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
int sessionTimeout) throws IOException {
if (!checkPasswd(sessionId, passwd)) {// wrong password
finishSessionInit(cnxn, false);// finish initialization with valid = false
} else {
revalidateSession(cnxn, sessionId, sessionTimeout);// password is correct, so check the session tracker's record
}
}
checkPasswd
Verifies that the sessionId and the supplied password match:
protected boolean checkPasswd(long sessionId, byte[] passwd) {
return sessionId != 0
&& Arrays.equals(passwd, generatePasswd(sessionId));
}
generatePasswd
Generates a password from the sessionId:
byte[] generatePasswd(long id) {
Random r = new Random(id ^ superSecret);
byte p[] = new byte[16];
r.nextBytes(p);
return p;
}
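Because the password depends only on the sessionId and superSecret, it can be regenerated at any time. The sketch below copies the two methods from the listings above into a hypothetical standalone class, PasswdSketch, to show that.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical standalone copy of the password logic shown above; not the ZooKeeper class itself.
class PasswdSketch {
    static final long SUPER_SECRET = 0XB3415C00L;

    // Seed a Random with id ^ secret and take its first 16 bytes.
    static byte[] generatePasswd(long id) {
        Random r = new Random(id ^ SUPER_SECRET);
        byte[] p = new byte[16];
        r.nextBytes(p);
        return p;
    }

    // A session id of 0 never passes; otherwise the bytes must match exactly.
    static boolean checkPasswd(long sessionId, byte[] passwd) {
        return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
    }

    public static void main(String[] args) {
        long sessionId = 42L;
        byte[] passwd = generatePasswd(sessionId);
        System.out.println(checkPasswd(sessionId, passwd));       // regenerated bytes match
        System.out.println(checkPasswd(0L, generatePasswd(0L)));  // id 0 is always rejected
    }
}
```

The same seed always yields the same byte sequence from java.util.Random, so the server does not need to store the password anywhere.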
revalidateSession
Checks with the SessionTracker whether the session is still valid:
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {// check whether the session tracker still has a record, then pass the result to finishSessionInit
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(sessionId) +
" is valid: " + rc);
}
finishSessionInit(cnxn, rc);
}
finishSessionInit
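Completes session initialization. The valid parameter indicates whether authentication passed, and decides whether the server accepts the connect request or sends a closeConn request. It is not expanded here; the important part is as follows:
[figure: key part of finishSessionInit]
Other methods
Setting aside the get, set, JMX, and shutdown-related methods, the important remaining methods are as follows:
[figure: remaining ZooKeeperServer methods]
Some of them are listed below.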
getNextZxid
Gets the server's next zxid; callers are responsible for ordering under concurrency:
long getNextZxid() {
return hzxid.incrementAndGet();
}
close
ZooKeeperServer#expire above calls close, described here.
This method submits a request to close the given sessionId:
private void close(long sessionId) {
submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
}
submitRequest
There are two overloads:
private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
int xid, ByteBuffer bb, List<Id> authInfo) {
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);// build the request
submitRequest(si);// submit it
}
public void submitRequest(Request si) {
if (firstProcessor == null) {// wait until the first processor has been constructed
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
touch(si.cnxn);// update the SessionTracker's record for this session
boolean validpacket = Request.isValid(si.type);// validate the Request's OpCode
if (validpacket) {
firstProcessor.processRequest(si);// supported types go to the first processor in the chain
if (si.cnxn != null) {
incInProcess();// number of requests in process +1
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
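The waiting logic at the top of submitRequest pairs with the notifyAll() at the end of startup. Below is a minimal sketch of that handshake, assuming a hypothetical StartupGate class that keeps only the state field. Unlike the listing, which throws RuntimeException("Not started"), the sketch returns a boolean so the outcome is easy to inspect.

```java
// Hypothetical stand-in for the startup handshake; not ZooKeeper code.
class StartupGate {
    enum State { INITIAL, RUNNING, SHUTDOWN }

    private State state = State.INITIAL;

    // Mirrors the end of startup(): flip the state, then wake any waiters.
    synchronized void startup() {
        state = State.RUNNING;
        notifyAll();
    }

    // Mirrors the top of submitRequest(): block while the state is still INITIAL.
    // Returns true only if the state ended up RUNNING.
    synchronized boolean awaitRunning() {
        try {
            while (state == State.INITIAL) {
                wait(1000); // re-check at least once per second
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        }
        return state == State.RUNNING;
    }

    public static void main(String[] args) {
        StartupGate gate = new StartupGate();
        new Thread(gate::startup).start();       // startup finishes on another thread
        System.out.println(gate.awaitRunning()); // the caller blocks until then
    }
}
```

The loop around wait() matters: a thread can wake up without the state having changed, so the condition is re-checked every time.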
The rest
Method | Notes |
---|---|
public void dumpConf(PrintWriter pwriter) | When a ServerCnxn receives the "conf" command, writes the configuration to the PrintWriter |
void removeCnxn(ServerCnxn cnxn) | Removes this ServerCnxn from the db (clears its Watcher records) |
public void takeSnapshot() | Takes a snapshot, saving the sessions and dataTree to it |
void touch(ServerCnxn cnxn) | Touches the ServerCnxn's sessionId to update the SessionTracker's record; called from submitRequest when a request is submitted |
processPacket | Handles ordinary requests; apart from sasl and auth, most go on to submitRequest |
dumpEphemerals | When a ServerCnxn receives the "dump" command, writes the ephemeral node information to the PrintWriter |
processSasl | Handles the sasl part of a request |
processTxn | Applies the transaction at the end, after the processing chain has finished |
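Thoughts
Why can data become inconsistent so that ZooKeeperServer#loadData has to clean up sessions?
Section 21 of this series (session management) explained session cleanup. The record in sessionTracker is removed immediately, whereas the session's ephemeral records in the DataTree are removed step by step through the processing chain. The two steps are not synchronized, so if the server's state changes in between, the two can end up inconsistent.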
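When does the requestsInProcess field change?
requestsInProcess is the number of requests currently being processed.
On increment, the call chain is
ZooKeeperServer#submitRequest(org.apache.zookeeper.server.Request), which calls
ZooKeeperServer#incInProcess
On decrement, the call chain is
FinalRequestProcessor#processRequest
ZooKeeperServer#decInProcess
In other words, requestsInProcess is incremented when a request is submitted and decremented when the request finally completes, which ties it to the request-processing chain.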
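The counting pattern described above can be sketched as follows. InFlightCounter and its submit/complete methods are hypothetical names; the sketch also ignores the si.cnxn != null guard that surrounds incInProcess in the submitRequest listing.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the requestsInProcess bookkeeping; not ZooKeeper code.
class InFlightCounter {
    private final AtomicInteger requestsInProcess = new AtomicInteger(0);

    // Called when a request enters the processing chain.
    void submit() {
        requestsInProcess.incrementAndGet();
    }

    // Called when the last stage of the chain has finished with the request.
    void complete() {
        requestsInProcess.decrementAndGet();
    }

    int inProcess() {
        return requestsInProcess.get();
    }

    public static void main(String[] args) {
        InFlightCounter counter = new InFlightCounter();
        counter.submit();
        counter.submit();
        counter.complete();
        System.out.println(counter.inProcess()); // prints 1
    }
}
```

An AtomicInteger suits this case because the increment and the decrement happen on different threads when the chain stages run asynchronously.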
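How is each connection's password generated?
ZooKeeperServer#checkPasswd calls
ZooKeeperServer#generatePasswd
The supplied password must match the first 16 bytes produced by a Random seeded with sessionId^superSecret.
The password is not chosen by the client; it is derived from the sessionId.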
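How does client reconnection work?
ZooKeeperServer#processConnectRequest calls reopenSession,
which was covered above. The core is:
1. First check whether the password is correct.
2. Then check whether the session is still valid in the SessionTracker.
3. Reconnection is allowed only if both checks pass.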
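The three steps above can be sketched as a single decision. ReopenSketch is a hypothetical class: its liveSessions set stands in for the SessionTracker, and a boolean return value stands in for the valid flag passed to finishSessionInit. The password logic is copied from the generatePasswd listing.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Hypothetical sketch of the reconnect decision; not ZooKeeper code.
class ReopenSketch {
    static final long SUPER_SECRET = 0XB3415C00L;

    // Stand-in for the SessionTracker: ids of sessions that have not expired.
    private final Set<Long> liveSessions = new HashSet<Long>();

    void track(long sessionId) {
        liveSessions.add(sessionId);
    }

    static byte[] generatePasswd(long id) {
        Random r = new Random(id ^ SUPER_SECRET);
        byte[] p = new byte[16];
        r.nextBytes(p);
        return p;
    }

    // Returns the value that would be passed on as "valid".
    boolean reopenSession(long sessionId, byte[] passwd) {
        // Step 1: the password must match the one derived from the session id.
        boolean passwordOk = sessionId != 0
                && Arrays.equals(passwd, generatePasswd(sessionId));
        if (!passwordOk) {
            return false;
        }
        // Step 2: the session must still be known to the tracker.
        // Step 3: only when both checks pass is the reconnect accepted.
        return liveSessions.contains(sessionId);
    }

    public static void main(String[] args) {
        ReopenSketch server = new ReopenSketch();
        server.track(7L);
        System.out.println(server.reopenSession(7L, generatePasswd(7L))); // tracked, right password
        System.out.println(server.reopenSession(8L, generatePasswd(8L))); // right password, not tracked
    }
}
```

In the real method the second check goes through sessionTracker.touchSession, which, as its name suggests, also touches the session rather than only looking it up; the sketch reduces that to a membership test.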
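Open questions
How does ChangeRecord help PrepRequestProcessor and FinalRequestProcessor share information?
I have not looked into this in depth yet, so it stays open for now.
Gripes
There are too many details that need thinking through,
such as why loadData can see inconsistent data (mentioned in Thoughts), which is the handling of an abnormal case.
As for the client reconnection code,
why not put it in a separate class?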