This article was first published on 泊浮目's Jianshu: http://www.reibang.com/u/204b8aaab8ba
Version | Date | Notes
---|---|---
1.0 | 2020.6.14 | First published
1.1 | 2020.8.16 | Layout polished
1.2 | 2020.8.21 | Wording refined
1.3 | 2021.6.23 | Title changed from 深入淺出Zookeeper(七):Leader選舉 to 深入淺出Zookeeper源碼(七):Leader選舉
1. Preface
For a distributed cluster, the simplest way to guarantee write consistency is to rely on a single node to schedule and manage the others. In distributed systems we usually call that node the Leader.
Why is this the simplest way? Imagine writing data through the Leader: after the Leader writes its own copy, it may replicate the data to the Followers, and both the number of replicas and where they live are controlled by that Leader. With multi-Leader scheduling, on the other hand, you would also run into data partitioning, request load balancing, and similar problems.
Today, let's walk through ZK's election process together.
2. Election Algorithm Analysis: ZAB
This is a typical majority-vote algorithm, and as the name tells you, it was born for ZK (Zookeeper Atomic Broadcast). Its leader election mainly looks at a node's server ID and its data ID: the larger the data ID, the newer the data, and the higher the node's priority to become the master.
2.1 When an Election Is Triggered
Elections are commonly triggered in two scenarios; either way, at least two ZK machines are required.
- Startup trigger: as we know, every zk server must be configured with a distinct `myid`, and at first startup every server's `zxid` is necessarily 0. This means the zk node with the largest `myid` is chosen as leader.
- Leader-loss trigger: a zk node updates its `zxid` every time it processes a transaction, which means that the newer the data, the larger the `zxid`. In this election, the node with the largest `zxid` is chosen as leader.
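To make the rule concrete, here is a minimal, hypothetical sketch (not ZooKeeper source; the names are made up) of the comparison described above: with equal zxids the larger myid wins, otherwise the larger zxid wins.

```java
// Hypothetical illustration of the startup/leader-loss comparison rule.
public class VoteRuleDemo {
    // Returns true if the candidate (newId, newZxid) beats (curId, curZxid).
    static boolean beats(long newId, long newZxid, long curId, long curZxid) {
        return newZxid > curZxid || (newZxid == curZxid && newId > curId);
    }

    public static void main(String[] args) {
        // Startup: all zxids are 0, so the largest myid wins.
        System.out.println(beats(3, 0, 1, 0));   // true: node 3 beats node 1
        // Runtime: the node with newer data (larger zxid) wins.
        System.out.println(beats(3, 10, 1, 12)); // false: node 1's newer zxid wins
    }
}
```

(The real comparison, `totalOrderPredicate`, also takes the election epoch into account; we will meet it later in this article.)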
2.2 Anatomy of the ZK Election Process (with Source Code Analysis)
The core methods are `org.apache.zookeeper.server.quorum.QuorumPeer.startLeaderElection` and `org.apache.zookeeper.server.quorum.QuorumPeer.run`, and the source analysis below unfolds from them.
The source analysis below is based on version `3.5.7`.
2.2.1 Startup
We have to start from `QuorumPeerMain`, since it is the startup entry point:
```java
/**
*
* <h2>Configuration file</h2>
*
* When the main() method of this class is used to start the program, the first
* argument is used as a path to the config file, which will be used to obtain
* configuration information. This file is a Properties file, so keys and
* values are separated by equals (=) and the key/value pairs are separated
* by new lines. The following is a general summary of keys used in the
* configuration file. For full details on this see the documentation in
* docs/index.html
* <ol>
* <li>dataDir - The directory where the ZooKeeper data is stored.</li>
* <li>dataLogDir - The directory where the ZooKeeper transaction log is stored.</li>
* <li>clientPort - The port used to communicate with clients.</li>
* <li>tickTime - The duration of a tick in milliseconds. This is the basic
* unit of time in ZooKeeper.</li>
* <li>initLimit - The maximum number of ticks that a follower will wait to
* initially synchronize with a leader.</li>
* <li>syncLimit - The maximum number of ticks that a follower will wait for a
* message (including heartbeats) from the leader.</li>
* <li>server.<i>id</i> - This is the host:port[:port] that the server with the
* given id will use for the quorum protocol.</li>
* </ol>
* In addition to the config file. There is a file in the data directory called
* "myid" that contains the server id as an ASCII decimal value.
*
*/
@InterfaceAudience.Public
public class QuorumPeerMain {
private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);
private static final String USAGE = "Usage: QuorumPeerMain configfile";
protected QuorumPeer quorumPeer;
/**
* To start the replicated server specify the configuration file name on
* the command line.
* @param args path to the configfile
*/
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
LOG.error("Invalid arguments, exiting abnormally", e);
LOG.info(USAGE);
System.err.println(USAGE);
System.exit(2);
} catch (ConfigException e) {
LOG.error("Invalid config, exiting abnormally", e);
System.err.println("Invalid config, exiting abnormally");
System.exit(2);
} catch (DatadirException e) {
LOG.error("Unable to access datadir, exiting abnormally", e);
System.err.println("Unable to access datadir, exiting abnormally");
System.exit(3);
} catch (AdminServerException e) {
LOG.error("Unable to start AdminServer, exiting abnormally", e);
System.err.println("Unable to start AdminServer, exiting abnormally");
System.exit(4);
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
System.exit(1);
}
LOG.info("Exiting normally");
System.exit(0);
}
protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException
{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
// Start and schedule the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
// @VisibleForTesting
protected QuorumPeer getQuorumPeer() throws SaslException {
return new QuorumPeer();
}
}
```
Following the chain `QuorumPeerMain.main()` -> `main.initializeAndRun(args)` -> `runFromConfig` -> `quorumPeer.start()`, we move on to QuorumPeer.java (the class that manages election-related logic):
```java
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
loadDataBase();
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
startLeaderElection();
super.start();
}
```
Now we arrive at the core method `startLeaderElection`:
```java
synchronized public void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
// if (!getView().containsKey(myid)) {
// throw new RuntimeException("My id " + myid + " not in the peer list");
//}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(getQuorumAddress().getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
this.electionAlg = createElectionAlgorithm(electionType);
}
```
The logic is very simple: if the server is in the LOOKING state (the default right after startup), it creates the initial election vote and determines the election algorithm (since 3.4.0, FastLeaderElection is the only election algorithm left), then sends the vote out. The code is long, so it is not pasted here; interested readers can read `FastLeaderElection.Messenger.WorkerReceiver.run` for themselves. In essence it is a thread that takes votes off the vote queue and sends them.
A quick primer on server states:
- LOOKING: looking for a Leader. A server in this state believes the cluster currently has no Leader.
- FOLLOWING: the follower state; the server's current role is Follower.
- LEADING: the leading state; the server's current role is Leader.
- OBSERVING: the observing state; the server is an Observer.
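These states live in the ServerState enum inside QuorumPeer; in the 3.5.x source it is essentially:

```java
// ServerState as defined in QuorumPeer (3.5.x), shown here for reference.
public enum ServerState {
    LOOKING, FOLLOWING, LEADING, OBSERVING;
}
```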
Next, the relevant core code of `QuorumPeer`:
```java
@Override
public void run() {
updateThreadName();
LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
RemotePeerBean rBean = new RemotePeerBean(this, s);
try {
MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
jmxRemotePeerBean.put(s.id, rBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk =
new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
```
Only the LOOKING-related logic is excerpted here. The first half of the if handles the read-only server, which is used to serve read-only clients; the else branch is the ordinary case. But judging from the code block:
```java
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
```
the two branches differ very little. Next, let's look at `lookForLeader`; to keep the length down, we again excerpt only the LOOKING-related code:
```java
/**
* Starts a new round of leader election. Whenever our QuorumPeer
* changes its state to LOOKING, this method is invoked, and it
* sends notifications to all other peers.
*/
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
self.start_fle = Time.currentElapsedTime();
}
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
```
The Javadoc states it clearly: this method starts a new round of leader election. Whenever our server's state changes to LOOKING, the method is invoked, and it notifies all the other servers in the cluster that take part in the election. Within this logic, `recvqueue` holds the incoming election notifications, and one is taken off the queue. Two branches follow:
- The notification is null: find a way to notify the other servers (see the skeleton below).
- It is a valid vote (i.e., everyone is on the same election round): then the votes are pitted against each other (a vote "PK").
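The poll-then-branch shape of the loop can be sketched like this (a simplified skeleton; the class and helper behavior here are hypothetical, and the real code also re-checks connections and backs off the poll timeout):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified, hypothetical skeleton of the notification-receiving loop.
public class ReceiveLoopDemo {
    record Notification(long sid, long electionEpoch) {}

    private final LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<>();
    private final long logicalclock = 1; // our current election round

    void loopOnce(long notTimeout) throws InterruptedException {
        Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
        if (n == null) {
            // Nothing arrived: make sure our notifications actually went out.
            System.out.println("queue empty -> resend notifications");
        } else if (n.electionEpoch() == logicalclock) {
            // A vote from the same round: compare it with our proposal (vote PK).
            System.out.println("valid vote from sid " + n.sid() + " -> PK");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReceiveLoopDemo demo = new ReceiveLoopDemo();
        demo.loopOnce(200);                         // empty queue -> resend branch
        demo.recvqueue.put(new Notification(2, 1));
        demo.loopOnce(200);                         // same-round vote -> PK branch
    }
}
```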
Let's look at the `totalOrderPredicate` method:
```java
/**
* Check if a pair (server id, zxid) succeeds our
* current vote.
*
* @param id Server identifier
* @param zxid Last zxid observed by the issuer of this vote
*/
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
```
To restate the logic:
- If the new vote's election round is higher than our current vote's, we must switch our vote.
- If the rounds are equal and the external vote's ZXID is larger than our internal vote's, we must switch.
- If the rounds and ZXIDs are both equal and the external vote's SID is larger than our internal vote's, we must switch.
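To see the three rules in action, here is a standalone restatement of the predicate with a few concrete comparisons (illustration only, extracted from the logic above):

```java
// Standalone restatement of totalOrderPredicate's three rules, for illustration.
public class TotalOrderDemo {
    static boolean wins(long newEpoch, long newZxid, long newId,
                        long curEpoch, long curZxid, long curId) {
        return (newEpoch > curEpoch)
                || (newEpoch == curEpoch
                    && (newZxid > curZxid
                        || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        System.out.println(wins(2, 5, 1,  1, 9, 3)); // true: higher epoch wins outright
        System.out.println(wins(1, 9, 1,  1, 5, 3)); // true: same epoch, higher zxid wins
        System.out.println(wins(1, 5, 3,  1, 5, 1)); // true: same epoch and zxid, higher sid wins
    }
}
```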
Once this logic establishes that the external vote beats the internal one, i.e., its candidate is better suited to be Leader, the external vote overwrites the internal one and is sent out again:
```java
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
```
Next, it checks whether more than half of the servers in the cluster endorse this vote:
```java
/**
* Termination predicate. Given a set of votes, determines if have
* sufficient to declare the end of the election round.
*
* @param votes
* Set of votes
* @param vote
* Identifier of the vote received last
*/
protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
return voteSet.hasAllQuorums(); // do we have more than half?
}
```
Otherwise, it keeps collecting votes.
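Stripped of weights and the joint quorums used during reconfiguration, the "more than half" test boils down to a simple majority count, conceptually:

```java
import java.util.Set;

// Conceptual majority check; ignores weights and joint quorums (reconfig).
public class MajorityDemo {
    static boolean containsQuorum(Set<Long> ackers, int clusterSize) {
        return ackers.size() > clusterSize / 2;
    }

    public static void main(String[] args) {
        System.out.println(containsQuorum(Set.of(1L, 2L), 3)); // true: 2 of 3
        System.out.println(containsQuorum(Set.of(1L, 2L), 5)); // false: 2 of 5
    }
}
```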
After that comes updating the server state:
```java
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
```
2.2.2 Leader Loss
Earlier we mentioned QuorumPeer.java and its main loop, in which each role does its own work until the server exits. Here we take the Follower as an example:
```java
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
```
Jumping into `follower.followLeader()`:
```java
/**
* the main method called by the follower to follow the leader
*
* @throws InterruptedException
*/
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
QuorumServer leaderServer = findLeader();
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange())
throw new Exception("learned about role change");
//check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when following the leader", e);
try {
sock.close();
} catch (IOException e1) {
e1.printStackTrace();
}
// clear pending revalidations
pendingRevalidations.clear();
}
} finally {
zk.unregisterJMX((Learner)this);
}
}
```
And on to the core method `processPacket`:
```java
/**
* Examine the packet received in qp and dispatch based on its contents.
* @param qp
* @throws IOException
*/
protected void processPacket(QuorumPacket qp) throws Exception{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
if (hdr.getType() == OpCode.reconfig){
SetDataTxn setDataTxn = (SetDataTxn) txn;
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
self.setLastSeenQuorumVerifier(qv, true);
}
fzk.logRequest(hdr, txn);
break;
case Leader.COMMIT:
fzk.commit(qp.getZxid());
break;
case Leader.COMMITANDACTIVATE:
// get the new configuration from the request
Request request = fzk.pendingTxns.element();
SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
// get new designated leader from (current) leader's message
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
boolean majorChange =
self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
// commit (writes the new config to ZK tree (/zookeeper/config)
fzk.commit(qp.getZxid());
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
break;
case Leader.UPTODATE:
LOG.error("Received an UPTODATE message after Follower started");
break;
case Leader.REVALIDATE:
revalidate(qp);
break;
case Leader.SYNC:
fzk.sync();
break;
default:
LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
break;
}
}
```
In the `COMMITANDACTIVATE` case, we can see that when the follower receives a message about a leader change, it throws an exception. It will then switch itself to the `LOOKING` state and start an election.
So how do we decide that the leader is unavailable? The answer is heartbeats: if no heartbeat arrives from the leader within a certain window, the leader is considered unavailable.
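The idea behind this failure detection can be sketched as a plain timeout check (a hypothetical sketch; in the actual code the follower notices the silence as a socket read timeout inside readPacket, which breaks followLeader's loop):

```java
// Hypothetical sketch of heartbeat-timeout failure detection.
public class HeartbeatMonitor {
    private volatile long lastPingMillis = System.currentTimeMillis();
    private final long timeoutMillis;

    HeartbeatMonitor(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

    void onPing() { // call whenever a PING from the leader arrives
        lastPingMillis = System.currentTimeMillis();
    }

    boolean leaderSuspected() { // true if no PING arrived within the timeout
        return System.currentTimeMillis() - lastPingMillis > timeoutMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        HeartbeatMonitor m = new HeartbeatMonitor(300);
        m.onPing();
        Thread.sleep(500);
        System.out.println(m.leaderSuspected()); // true: no PING for over 300 ms
    }
}
```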
See `case Leader.PING` in `LearnerHandler.run`:
```java
case Leader.PING:
// Process the touches
ByteArrayInputStream bis = new ByteArrayInputStream(qp
.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
leader.zk.touch(sess, to);
}
break;
```
3. Other Common Election Algorithms
First, a point worth making: election algorithms are in essence consensus algorithms, and the vast majority of consensus algorithms were created to solve data consistency in distributed environments. The so-called leader, follower, and so on in zk are, after all, just states; only when everyone under zk's semantics (within its context) agrees that a leader is the leader does it become an effective consensus.
What common consensus algorithms are there? Current consensus algorithms can be grouped into three broad categories: public chains, consortium chains, and private chains. Their characteristics are as follows:
- Private chain: the consensus algorithms of private chains are the ones from traditional distributed systems before "blockchain" became a household word, e.g. zookeeper's ZAB protocol, a Paxos-like algorithm. Private chains generally assume the cluster has no malicious nodes, only nodes that fail because of system or network faults.
- Consortium chain: the classic representative project is Fabric under the Hyperledger organization; Fabric 0.6 used the PBFT algorithm. Besides faulty nodes, consortium chains must also consider malicious nodes in the cluster; every node that joins a consortium chain must be verified and vetted.
- Public chain: public chains must consider not only faulty nodes in the network but also malicious nodes, much like consortium chains. The biggest difference from consortium chains is that nodes in a public chain can join and leave freely, with no strict verification and vetting.
Quoted from: https://zhuanlan.zhihu.com/p/35847127; author: the Meitu technical team
For reasons of length, here is just a brief introduction to two fairly typical consensus algorithms.
3.1 Raft
Raft is a typical majority-vote election algorithm. Its election mechanism is similar to the democratic voting we know from everyday life, and its core idea is "the minority yields to the majority". That is, in Raft, the node that wins a majority of the votes becomes the leader.
Under Raft, cluster nodes take one of three roles:
- Leader, the master node; at any moment there is exactly one Leader, responsible for coordinating and managing the other nodes;
- Candidate, a candidate; any node can become a Candidate, and only in this role can a node be elected the new Leader;
- Follower, a follower of the Leader; it cannot initiate an election.
A Raft election proceeds in the following steps (see the sketch after this list):
- Initially, all nodes are in the Follower state.
- When leader election begins, every node's state changes from Follower to Candidate, and it sends vote requests to the other nodes.
- The other nodes reply whether they agree, in the order the election requests arrive. Note that within each election round, a node may cast only one vote.
- If a node that issued an election request receives more than half of the votes, it becomes the master node; its state changes to Leader, and the other nodes drop from Candidate back to Follower. Leader and Follower nodes then exchange periodic heartbeats to detect whether the master is still alive.
- When the Leader's term is up, i.e., it discovers the other servers beginning the next election round (or the master has crashed), its state drops from Leader to Follower, and a new round of election begins.
Compared with ZAB, this algorithm is easier to implement; but because of its heavier message traffic, it is better suited to small and medium-sized deployments than ZAB is.
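A minimal sketch of the voting rule just described (hypothetical code; it ignores Raft's log-freshness check and all RPC details):

```java
// Hypothetical sketch of Raft's one-vote-per-term rule plus the majority test.
public class RaftVoteDemo {
    private int currentTerm = 0;
    private Integer votedFor = null; // at most one vote per term

    // A follower grants its vote at most once per term.
    synchronized boolean requestVote(int term, int candidateId) {
        if (term > currentTerm) {    // newer term: adopt it and forget our old vote
            currentTerm = term;
            votedFor = null;
        }
        if (term == currentTerm && (votedFor == null || votedFor == candidateId)) {
            votedFor = candidateId;
            return true;
        }
        return false;
    }

    // A candidate becomes leader once it holds a strict majority.
    static boolean wonElection(int votesGranted, int clusterSize) {
        return votesGranted > clusterSize / 2;
    }

    public static void main(String[] args) {
        RaftVoteDemo follower = new RaftVoteDemo();
        System.out.println(follower.requestVote(1, 1)); // true: first vote of term 1
        System.out.println(follower.requestVote(1, 2)); // false: already voted this term
        System.out.println(wonElection(3, 5));          // true: 3 of 5 is a majority
    }
}
```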
3.2 PoW
PoW is a mechanism in which each node or server competes for the right to record (the "bookkeeping right") with its computing power, i.e. its "hashrate", making it a consensus algorithm based on proof of work. In other words, the stronger your compute (the faster you solve the puzzle), the better your odds of winning the right to record.
For example, when a transaction occurs, suppose three nodes (A, B, C) all receive the bookkeeping request. If node A solves the puzzle first, it notifies B and C to verify its solution; the puzzle is constructed so that finding a solution is far slower than verifying one. Once all nodes have verified it, the entry is recorded.
That sounds fair. But the PoW mechanism requires the whole network to take part in the computation for every round of consensus, which adds to every node's computational load. If the puzzle is too hard, solving it takes a long time and consumes a lot of resources; if it is too easy, many nodes obtain the bookkeeping right at the same time and conflicts multiply. All of this lengthens the time needed to reach consensus.
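The solve-slowly / verify-quickly asymmetry is easy to demonstrate with a toy hash puzzle (a sketch only; real PoW chains differ in many details):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Toy proof of work: find a nonce whose SHA-256 hash starts with n zero hex digits.
public class PowDemo {
    static String sha256Hex(String s) throws Exception {
        byte[] h = MessageDigest.getInstance("SHA-256")
                .digest(s.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : h) sb.append(String.format("%02x", b));
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        String block = "some transactions";
        String prefix = "0000"; // difficulty: four leading zero hex digits
        long nonce = 0;
        while (!sha256Hex(block + nonce).startsWith(prefix)) {
            nonce++; // the slow part: brute-force search
        }
        System.out.println("solved with nonce " + nonce);
        // Verification is a single hash, far cheaper than the search above.
        System.out.println(sha256Hex(block + nonce).startsWith(prefix));
    }
}
```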
4. Recap
In this article we first walked through zookeeper's leader election; the rough flow is as follows:
4.1 Election at Server Startup
- Each server sends out a vote
- Each server receives the other servers' votes
- The votes are processed (zxid and myid are compared)
- The votes are tallied, until more than half of the machines have received the same vote
- The server roles are updated
4.2 Election While the Servers Are Running
This is very much like the election at server startup, with just one extra state change: when the Leader dies, the remaining Followers all switch their server state to LOOKING and then enter the election flow.
4.3 Consistency Algorithms and Consensus Algorithms
We also touched on the notions of consistency algorithms and consensus algorithms. So what is the difference between consistency and consensus? The two are often conflated in everyday use, so let's be precise here:
- Consistency: across the multiple nodes of a distributed system, given a series of operations and under the guarantee of an agreed protocol, the data or state presented to the outside world is consistent.
- Consensus: the process by which the multiple nodes of a distributed system reach agreement on some state.
That is, consistency emphasizes the result, while consensus emphasizes the process of reaching agreement; consensus algorithms are the core technique for guaranteeing that a system satisfies a given degree of consistency.
Therefore, putting the previous article and this one together: ZAB should be classified as a consensus algorithm.