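Introduction
I started using ZooKeeper a long time ago and found it very handy from the start. With it, I turned a general-purpose file collector that supported neither distributed deployment nor task distribution into a distributed one. Later I read "From Paxos to ZooKeeper: Distributed Consistency Principles and Practice" and then debugged the standalone ZooKeeper source code through several different scenarios. I took no notes at the time, so after a while I had forgotten many of the details. I recently read through the ZooKeeper source again, learned a lot, and am recording it here.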
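Server startup process
1. Argument parsing
We start with the startup process of the standalone ZooKeeper server. The entry class is org.apache.zookeeper.server.ZooKeeperServerMain, which takes the path to zoo.cfg as its startup argument. The parameters configured in zoo.cfg are parsed into the fields of ServerConfig. Two of them deserve a mention: minSessionTimeout and maxSessionTimeout. When they are not set explicitly, they are derived from tickTime (2 * tickTime and 20 * tickTime respectively).
ServerConfig has the following fields.
The most important ones are annotated here: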
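- clientPortAddress: the address and port on which the server listens for client connections
- dataDir: where the server persists ZooKeeper node data (snapshots)
- dataLogDir: where the server writes its transaction log
- tickTime: the base time unit in milliseconds; minSessionTimeout and maxSessionTimeout are derived from it
- maxClientCnxns: the maximum number of connections a single IP address may open
- listenBacklog: the maximum length of the socket's pending-connection (backlog) queue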
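The relationship between tickTime and the two session timeout bounds can be sketched in a few lines. This is only an illustration of the defaults (2 * tickTime and 20 * tickTime when the bounds are not configured), not ZooKeeper's own parsing code; the class and method names are hypothetical, and the configuration text is a made-up example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of ZooKeeper: derives the session timeout
// bounds from a zoo.cfg-style text the way the defaults are documented.
class SessionTimeoutSketch {
    static int[] sessionTimeoutBounds(String cfgText) {
        Properties cfg = new Properties();
        try {
            cfg.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int tickTime = Integer.parseInt(cfg.getProperty("tickTime", "3000"));
        // -1 stands for "not configured" in this sketch
        int min = Integer.parseInt(cfg.getProperty("minSessionTimeout", "-1"));
        int max = Integer.parseInt(cfg.getProperty("maxSessionTimeout", "-1"));
        if (min == -1) {
            min = tickTime * 2;
        }
        if (max == -1) {
            max = tickTime * 20;
        }
        return new int[] {min, max};
    }

    public static void main(String[] args) {
        int[] bounds = sessionTimeoutBounds("tickTime=2000\ndataDir=/tmp/zookeeper\nclientPort=2181\n");
        System.out.println("min=" + bounds[0] + "ms max=" + bounds[1] + "ms");
    }
}
```

With tickTime=2000 the bounds come out as 4000 ms and 40000 ms. A session timeout requested by a client is expected to be clamped into this range.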
runFromConfig
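runFromConfig starts the ZooKeeper server from the parsed parameters.
- Create FileTxnSnapLog
FileTxnSnapLog holds two important fields, txnLog and snapLog, which refer to ZooKeeper's transaction log and to its persisted data (snapshot) files respectively.
- Create ZooKeeperServer
ZooKeeperServer is the class that represents the server side.
- Create adminServer
adminServer runs on an embedded Jetty server and listens on port 8080 by default. The commands ZooKeeper supports can be listed at http://ip:8080/commands
- Create ServerCnxnFactory
ServerCnxnFactory manages the server-side connections. It has two implementations, NIOServerCnxnFactory and NettyServerCnxnFactory.
NIOServerCnxnFactory is the default.
Let's look at the configure method of NIOServerCnxnFactory.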
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();
    maxClientCnxns = maxcc;
    initMaxCnxns();
    // sessionlessCnxnTimeout: timeout (ms) for connections that have not yet established a session
    sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    // container that tracks the expiry of every connection
    cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    // thread that closes expired connections
    expirerThread = new ConnectionExpirerThread();
    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    // derive the number of selector threads from the number of available processors
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }
    // number of worker threads that handle I/O events
    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
    String logMsg = "Configuring NIO connection handler with "
        + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
        + numSelectorThreads + " selector thread(s), "
        + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
        + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
    LOG.info(logMsg);
    // create numSelectorThreads SelectorThread instances
    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }
    listenBacklog = backlog;
    // open the server-side ServerSocketChannel and bind it to the given address
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port {}", addr);
    if (listenBacklog == -1) {
        ss.socket().bind(addr);
    } else {
        ss.socket().bind(addr, listenBacklog);
    }
    // put the ServerSocketChannel into non-blocking mode
    ss.configureBlocking(false);
    // create the thread that accepts client connections
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
The configure method of NIOServerCnxnFactory above creates three kinds of threads:
- ConnectionExpirerThread
- SelectorThread
- AcceptThread
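The default thread counts chosen in configure can be checked in isolation. The sketch below copies only the two expressions from the method (the selector-thread formula and 2 * numCores for workers) into a hypothetical helper class and evaluates them for a few core counts; it ignores the system properties that can override both values.

```java
// Hypothetical helper class; only the two expressions are taken from configure().
class NioThreadDefaults {
    // Same expression as in NIOServerCnxnFactory.configure: sqrt(cores / 2), at least 1.
    static int selectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((float) numCores / 2), 1);
    }

    // Default worker pool size: twice the number of cores.
    static int workerThreads(int numCores) {
        return 2 * numCores;
    }

    public static void main(String[] args) {
        for (int cores : new int[] {1, 2, 8, 32, 64}) {
            System.out.println(cores + " cores -> " + selectorThreads(cores)
                + " selector thread(s), " + workerThreads(cores) + " worker threads");
        }
    }
}
```

For 32 cores the formula yields 4 selector threads, which matches the "sweet spot" comment in the source. The selector count grows with the square root of the core count, so it stays small even on large machines, while the worker pool grows linearly.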
SelectorThread
Let's look at its constructor.
public SelectorThread(int id) throws IOException {
    super("NIOServerCxnFactory.SelectorThread-" + id);
    this.id = id;
    // every accepted SocketChannel assigned to this thread is added to acceptedQueue
    acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
    // While handling I/O, ZooKeeper changes the interest ops registered on a SelectionKey.
    // A key whose interest ops need to be updated is put on updateQueue.
    updateQueue = new LinkedBlockingQueue<SelectionKey>();
}
AcceptThread
Let's look at its constructor.
public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {
    // the superclass constructor creates the selector associated with this AcceptThread
    super("NIOServerCxnFactory.AcceptThread:" + addr);
    this.acceptSocket = ss;
    // register the server's ServerSocketChannel with the selector for OP_ACCEPT, ready to accept client connections
    this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
    // every accepted connection is assigned one SelectorThread from selectorThreads;
    // that SelectorThread handles all I/O events of the connection
    this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
    selectorIterator = this.selectorThreads.iterator();
}
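ZooKeeper server startup
The threads created above have not been started yet, and some threads have not even been created.
NIOServerCnxnFactory.startup(ZooKeeperServer, boolean) creates the remaining threads and starts them all, which completes the startup of the ZooKeeper server.
NIOServerCnxnFactory.startup source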
@Override
public void startup(ZooKeeperServer zks, boolean startServer) throws IOException, InterruptedException {
    // start the acceptThread, selectorThreads, etc. that were created earlier
    start();
    setZooKeeperServer(zks);
    if (startServer) {
        // data recovery: restore node data and session information into memory
        // from the persisted snapshot files and the transaction log
        zks.startdata();
        // start the ZooKeeperServer itself (session tracker, request processors, ...)
        zks.startup();
    }
}
start()
public void start() {
    stopped = false;
    if (workerPool == null) {
        // create the thread pool that processes I/O events
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }
    // start the SelectorThreads
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // start the acceptThread
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    // start the connection-expiry thread
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}
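The `getState() == Thread.State.NEW` checks in start() guard against starting a thread twice, which would otherwise throw IllegalThreadStateException. A minimal sketch of the idiom, with a hypothetical helper name, looks like this:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of the "start only if NEW" idiom used in start().
class StartOnceSketch {
    // Starts the thread only if it has never been started; returns whether it did.
    static boolean startIfNew(Thread t) {
        if (t.getState() == Thread.State.NEW) {
            t.start();
            return true;
        }
        return false;
    }

    // Calls startIfNew twice and reports how many times the thread body ran.
    static int runDemo() {
        AtomicInteger runs = new AtomicInteger();
        Thread t = new Thread(runs::incrementAndGet, "start-once-demo");
        startIfNew(t);
        startIfNew(t); // no-op: the thread is no longer in state NEW
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("thread body ran " + runDemo() + " time(s)");
    }
}
```

The check and the start() call are not atomic, so on its own the idiom only makes repeated calls from one caller idempotent; concurrent callers would still need external synchronization.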
zookeeperServer.startup
public synchronized void startup() {
    if (sessionTracker == null) {
        // create the sessionTracker, which manages session expiry
        createSessionTracker();
    }
    // start the sessionTracker
    startSessionTracker();
    // set up the server's request processor chain
    // for standalone ZooKeeper the chain consists of three processors:
    // PrepRequestProcessor --> SyncRequestProcessor --> FinalRequestProcessor
    // PrepRequestProcessor and SyncRequestProcessor each run on their own thread and pass requests along through queues
    setupRequestProcessors();
    // create and start the RequestThrottler, which limits the number of
    // in-flight requests (rate limiting)
    startRequestThrottler();
    // register JMX monitoring
    registerJMX();
    // start the JVM pause monitor
    startJvmPauseMonitor();
    // register metrics
    registerMetrics();
    // mark the ZooKeeper server as RUNNING
    setState(State.RUNNING);
    requestPathMetricsCollector.start();
    localSessionEnabled = sessionTracker.isLocalSessionsEnabled();
    notifyAll();
}
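The comments in startup() describe a three-stage request processor chain in which the first two stages run on their own threads and hand requests over through queues. The following is a toy model of that shape only, not ZooKeeper's implementation: the Processor interface, the QueuedStage class, the string requests and the STOP marker are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of a queue-linked processor chain: prep -> sync -> final.
class ProcessorChainSketch {
    static final String STOP = "__stop__"; // hypothetical shutdown marker

    interface Processor {
        void process(String request);
    }

    // A stage that owns a thread and a queue, then forwards to the next stage.
    static class QueuedStage extends Thread implements Processor {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        private final String tag;
        private final Processor next;

        QueuedStage(String tag, Processor next) {
            super(tag);
            this.tag = tag;
            this.next = next;
        }

        @Override
        public void process(String request) {
            queue.add(request); // the caller returns immediately
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String request = queue.take();
                    if (STOP.equals(request)) {
                        next.process(STOP); // propagate shutdown down the chain
                        return;
                    }
                    next.process(request + "|" + tag);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static List<String> runDemo() {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        // The final stage runs on the thread of the stage that calls it.
        Processor finalStage = request -> {
            if (!STOP.equals(request)) {
                handled.add(request);
            }
        };
        QueuedStage sync = new QueuedStage("sync", finalStage);
        QueuedStage prep = new QueuedStage("prep", sync);
        sync.start();
        prep.start();
        prep.process("create /a");
        prep.process("setData /a");
        prep.process(STOP);
        try {
            prep.join();
            sync.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(handled);
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because each stage drains a single FIFO queue on a single thread, requests reach the final stage in the order they entered the chain, which is the property the real chain relies on for ordered processing.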
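Starting ContainerManager
ContainerManager manages ZooKeeper's container nodes. A container node serves as a container that holds other nodes. Once all children of a container node have been deleted, ContainerManager, which runs at a fixed check interval, finds these empty container nodes and deletes them.
That is the overall startup process of the ZooKeeper server. Next I will go through the most important server-side threads in detail: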
- acceptThread
- selectorThread
acceptThread
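After startup, the server listens for client connection requests, by default on port 2181 (the clientPort value in the sample configuration). This is implemented by acceptThread, which accepts each client connection and assigns a selectorThread to handle it: a typical reactor model. Let's look at the source of acceptThread.run.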
// As described for the AcceptThread constructor above, the server's ServerSocketChannel
// is registered with the selector for OP_ACCEPT in that constructor
public void run() {
    try {
        while (!stopped && !acceptSocket.socket().isClosed()) {
            try {
                // select() holds the core logic of acceptThread
                select();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
    } finally {
        closeSelector();
        // This will wake up the selector threads, and tell the
        // worker thread pool to begin shutdown.
        if (!reconfiguring) {
            NIOServerCnxnFactory.this.stop();
        }
        LOG.info("accept thread exitted run method");
    }
}
- acceptThread.select
private void select() {
    try {
        // wait for a client connection event
        selector.select();
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped && selectedKeys.hasNext()) {
            SelectionKey key = selectedKeys.next();
            selectedKeys.remove();
            if (!key.isValid()) {
                continue;
            }
            // connection events are handled by doAccept
            if (key.isAcceptable()) {
                if (!doAccept()) {
                    // If unable to pull a new connection off the accept
                    // queue, pause accepting to give us time to free
                    // up file descriptors and so the accept thread
                    // doesn't spin in a tight loop.
                    pauseAccept(10);
                }
            } else {
                LOG.warn("Unexpected ops in accept select {}", key.readyOps());
            }
        }
    } catch (IOException e) {
        LOG.warn("Ignoring IOException while selecting", e);
    }
}
- acceptThread.doAccept()
private boolean doAccept() {
    boolean accepted = false;
    SocketChannel sc = null;
    try {
        // obtain the SocketChannel of the client connection
        sc = acceptSocket.accept();
        accepted = true;
        if (limitTotalNumberOfCnxns()) {
            throw new IOException("Too many connections max allowed is " + maxCnxns);
        }
        InetAddress ia = sc.socket().getInetAddress();
        // getClientCnxnCount returns the number of connections already open from the same IP
        int cnxncount = getClientCnxnCount(ia);
        // if this IP has already reached the configured per-client limit (maxClientCnxns),
        // fail with an error and reject the connection
        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
            throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
        }
        LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());
        sc.configureBlocking(false);
        // pick a SelectorThread from the list in round-robin order and assign it to this connection
        // Round-robin assign this connection to a selector thread
        if (!selectorIterator.hasNext()) {
            selectorIterator = selectorThreads.iterator();
        }
        SelectorThread selectorThread = selectorIterator.next();
        // add this connection to the queue of the assigned SelectorThread
        if (!selectorThread.addAcceptedConnection(sc)) {
            throw new IOException("Unable to add connection to selector queue"
                + (stopped ? " (shutdown in progress)" : ""));
        }
        acceptErrorLogger.flush();
    } catch (IOException e) {
        // accept, maxClientCnxns, configureBlocking
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
        fastCloseSock(sc);
    }
    return accepted;
}
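The round-robin assignment in doAccept relies on nothing more than an iterator over an immutable list that is re-created whenever it is exhausted. A minimal sketch of that technique, with strings standing in for SelectorThread objects and hypothetical class and method names:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the selector assignment in AcceptThread.
class RoundRobinSketch {
    private final List<String> selectorThreads;
    private Iterator<String> selectorIterator;

    RoundRobinSketch(List<String> names) {
        // Same shape as the AcceptThread constructor: immutable copy plus an iterator.
        this.selectorThreads = Collections.unmodifiableList(new ArrayList<>(names));
        this.selectorIterator = this.selectorThreads.iterator();
    }

    // Returns the next selector, wrapping around when the iterator is exhausted.
    String nextSelector() {
        if (!selectorIterator.hasNext()) {
            selectorIterator = selectorThreads.iterator();
        }
        return selectorIterator.next();
    }

    // Assigns the given number of connections and returns the chosen selectors in order.
    static List<String> assign(int connections, List<String> names) {
        RoundRobinSketch rr = new RoundRobinSketch(names);
        List<String> chosen = new ArrayList<>();
        for (int i = 0; i < connections; i++) {
            chosen.add(rr.nextSelector());
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add("selector-0");
        names.add("selector-1");
        names.add("selector-2");
        System.out.println(assign(5, names));
    }
}
```

No locking is needed here because, as in ZooKeeper, only the single accept thread ever advances the iterator.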
selectorThread
Now let's analyze the source of SelectorThread.run.
// SelectorThread.run calls three important methods:
// select(), processAcceptedConnections(), processInterestOpsUpdateRequests()
public void run() {
    try {
        while (!stopped) {
            try {
                select();
                processAcceptedConnections();
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
        // Close connections still pending on the selector. Any others
        // with in-flight work, let drain out of the work queue.
        for (SelectionKey key : selector.keys()) {
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
            }
            cleanupSelectionKey(key);
        }
        SocketChannel accepted;
        while ((accepted = acceptedQueue.poll()) != null) {
            fastCloseSock(accepted);
        }
        updateQueue.clear();
    } finally {
        closeSelector();
        // This will wake up the accept thread and the other selector
        // threads, and tell the worker thread pool to begin shutdown.
        NIOServerCnxnFactory.this.stop();
        LOG.info("selector thread exitted run method");
    }
}
Of the three methods called in selectorThread.run, select(), processAcceptedConnections() and processInterestOpsUpdateRequests(), we cover processAcceptedConnections() first because it makes the rest easier to follow.
Before that, let's go back to selectorThread.addAcceptedConnection(), which AcceptThread.doAccept calls above.
public boolean addAcceptedConnection(SocketChannel accepted) {
    // a connection accepted by acceptThread is added to this selectorThread's acceptedQueue;
    // the selectorThread is then woken up from selector.select()
    if (stopped || !acceptedQueue.offer(accepted)) {
        return false;
    }
    wakeupSelector();
    return true;
}
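The hand-off in addAcceptedConnection combines a queue with Selector.wakeup(): the producer enqueues the item and then wakes the consumer, which is otherwise blocked in select(). The sketch below reproduces just that interaction with a bare Selector and a string in place of a SocketChannel; the class name and the demo item are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo of "offer to a queue, then wake the selector thread".
class WakeupHandoffSketch {
    static String runDemo() {
        try (Selector selector = Selector.open()) {
            BlockingQueue<String> acceptedQueue = new LinkedBlockingQueue<>();
            String[] seen = new String[1];
            Thread selectorThread = new Thread(() -> {
                try {
                    while (seen[0] == null) {
                        selector.select();              // blocks until wakeup() is called
                        seen[0] = acceptedQueue.poll(); // then drains the hand-off queue
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, "selector-sketch");
            selectorThread.start();

            // Producer side, mirroring addAcceptedConnection: enqueue first, then wake up.
            acceptedQueue.offer("connection-1");
            selector.wakeup();

            selectorThread.join(5000);
            return seen[0];
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("selector thread received: " + runDemo());
    }
}
```

The order matters: the item is enqueued before wakeup() is called, and a wakeup() issued while the consumer is not yet inside select() makes the next select() return immediately, so the hand-off cannot be missed.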
- processAcceptedConnections
processAcceptedConnections handles the SocketChannels that were added to acceptedQueue.
private void processAcceptedConnections() {
    SocketChannel accepted;
    // take the next SocketChannel from acceptedQueue
    while (!stopped && (accepted = acceptedQueue.poll()) != null) {
        SelectionKey key = null;
        try {
            // register the SocketChannel for OP_READ on this selectorThread's selector
            key = accepted.register(selector, SelectionKey.OP_READ);
            // wrap the SocketChannel in a NIOServerCnxn, the connection object managed by the ZooKeeper server
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            key.attach(cnxn);
            // put the new NIOServerCnxn under the server's management;
            // from now on its expiry is checked periodically via cnxnExpiryQueue
            addCnxn(cnxn);
        } catch (IOException e) {
            // register, createConnection
            cleanupSelectionKey(key);
            fastCloseSock(accepted);
        }
    }
}
After processAcceptedConnections has run, the SocketChannel is registered for OP_READ on the selector of its assigned selectorThread. Now let's look at the implementation of select.
private void select() {
    try {
        // wait for ready events
        selector.select();
        Set<SelectionKey> selected = selector.selectedKeys();
        ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
        Collections.shuffle(selectedList);
        Iterator<SelectionKey> selectedKeys = selectedList.iterator();
        while (!stopped && selectedKeys.hasNext()) {
            SelectionKey key = selectedKeys.next();
            selected.remove(key);
            if (!key.isValid()) {
                cleanupSelectionKey(key);
                continue;
            }
            if (key.isReadable() || key.isWritable()) {
                // handleIO processes the ready event
                handleIO(key);
            } else {
                LOG.warn("Unexpected ops in select {}", key.readyOps());
            }
        }
    } catch (IOException e) {
        LOG.warn("Ignoring IOException while selecting", e);
    }
}
handleIO implementation
private void handleIO(SelectionKey key) {
    // ZooKeeper wraps the I/O event on a SocketChannel into an IOWorkRequest task,
    // which is then submitted to the workerPool thread pool
    IOWorkRequest workRequest = new IOWorkRequest(this, key);
    NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
    // Stop selecting this key while processing on its
    // connection
    cnxn.disableSelectable();
    // Clear the key's interest set so the selector reports nothing further for this channel.
    // This is how ZooKeeper processes the I/O events of a single connection one at a time, in order:
    // once an event has been picked up, the key is interested in no operation, so later
    // events on the channel are not reported to the selector thread until the current one
    // has been handled and the key's interest ops have been registered again.
    key.interestOps(0);
    // refresh the connection's expiry time
    touchCnxn(cnxn);
    // submit the IOWorkRequest to the I/O worker pool
    workerPool.schedule(workRequest);
}
As described above, ZooKeeper ensures that the I/O events of one SocketChannel are processed in order by clearing the interest set of its key. So how does the SocketChannel register its interest ops with the selector again once an I/O event has been handled?
That is the job of processInterestOpsUpdateRequests.
Let's look at its source.
// after an IOWorkRequest has finished handling an I/O event, it puts the connection's SelectionKey on updateQueue
private void processInterestOpsUpdateRequests() {
    SelectionKey key;
    // drain every SelectionKey from updateQueue
    while (!stopped && (key = updateQueue.poll()) != null) {
        if (!key.isValid()) {
            cleanupSelectionKey(key);
        }
        // get the NIOServerCnxn connection object from the key's attachment
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
        if (cnxn.isSelectable()) {
            // re-register the key's interest ops; cnxn.getInterestOps() derives them
            // from the connection's current read/write state
            key.interestOps(cnxn.getInterestOps());
        }
    }
}
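The pause-and-resume effect of key.interestOps(0) followed by a later key.interestOps(...) can be observed without any network code. The sketch below is an assumption-laden stand-in: it uses a java.nio.channels.Pipe instead of a client socket, and the class name is hypothetical. It writes some bytes, lets the selector report the channel once, clears the interest set, and then restores it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

// Hypothetical demo: a Pipe source channel stands in for a client SocketChannel.
class InterestOpsSketch {
    // Returns the number of ready keys: {first select, while paused, after resume}.
    static int[] runDemo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try {
                pipe.source().configureBlocking(false);
                SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
                pipe.sink().write(ByteBuffer.wrap("req-1".getBytes(StandardCharsets.UTF_8)));

                // The pending bytes make the key readable.
                int first = selector.select(5000);
                selector.selectedKeys().clear();

                // Like handleIO: no interest ops, so the unread data is not reported.
                key.interestOps(0);
                int whilePaused = selector.selectNow();

                // Like processInterestOpsUpdateRequests: restore the interest ops.
                key.interestOps(SelectionKey.OP_READ);
                int afterResume = selector.selectNow();

                return new int[] {first, whilePaused, afterResume};
            } finally {
                pipe.sink().close();
                pipe.source().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = runDemo();
        System.out.println("first=" + r[0] + " paused=" + r[1] + " resumed=" + r[2]);
    }
}
```

The data stays in the channel the whole time; only its visibility to the selector changes. That is why a worker thread can process one event undisturbed and the next event is still picked up as soon as the interest ops are restored.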
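That covers how ZooKeeper accepts connections and handles the I/O events on them. The figure below summarizes the process.
The server also starts threads that manage session and connection expiry. They are not covered in detail here; see another article in this series:
zookeeper expiring-object management implementation -- ExpiryQueue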