上一節(jié)使用zookeeper實(shí)現(xiàn)分布式鎖,這節(jié)使用ZKClient實(shí)現(xiàn)master選舉
ZKClient使用比zookeeper要方便些誉碴,API更加簡(jiǎn)單。
master選舉實(shí)現(xiàn)原理
- 每一臺(tái)服務(wù)器都去競(jìng)爭(zhēng)創(chuàng)建/master節(jié)點(diǎn)
- 如果創(chuàng)建成功塔橡,那么master就是當(dāng)前創(chuàng)建的服務(wù)器
- 創(chuàng)建失敗拟淮,則監(jiān)聽(tīng)/master節(jié)點(diǎn)的刪除事件
- 一旦/master刪除环疼,所有客戶(hù)端再次去競(jìng)爭(zhēng)創(chuàng)建/master節(jié)點(diǎn)蹋嵌。誰(shuí)創(chuàng)建成功芽隆,誰(shuí)就是主節(jié)點(diǎn)
master選擇類(lèi)
package com.app.master;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
/**
* @Author: chao.zhu
* @Email: chao.zhu@rograndec.com
* @CreateDate: 2018/08/06
* @Version: 1.0
*/
public class MasterSelector {
private ZkClient zkClient;
//當(dāng)前服務(wù)器信息
private ServiceNodeInfo serviceNodeInfo;
//所有進(jìn)程爭(zhēng)搶的節(jié)點(diǎn)
private String MASTER_PATH = "/master";
//超時(shí)時(shí)間
private Integer SESSION_OUT=5000;
//master節(jié)點(diǎn)
private ServiceNodeInfo masterNode;
//服務(wù)器是否啟動(dòng)
private volatile boolean running = false;
//master監(jiān)聽(tīng)事件
IZkDataListener dataListener;
//任務(wù)調(diào)度類(lèi)询微。模擬心跳動(dòng)作崖瞭,每隔一小段時(shí)間當(dāng)前serviceNodeInfo去和zookeeper交互一次,看當(dāng)前master是否掛掉
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
public MasterSelector(ZkClient zkClient,ServiceNodeInfo serviceNodeInfo){
this.zkClient = zkClient;
this.serviceNodeInfo = serviceNodeInfo;
dataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
tryGetMaster();
}
};
}
public void star(){
if(running){
System.out.println("服務(wù)已經(jīng)啟動(dòng)");
return;
}
running = true;
//監(jiān)聽(tīng)MASTER_PATH節(jié)點(diǎn)撑毛,如果節(jié)點(diǎn)發(fā)送變化书聚,執(zhí)行dataListener
zkClient.subscribeDataChanges(MASTER_PATH,dataListener);
tryGetMaster();
}
public void stop(){
if(!running){
System.out.println("服務(wù)器停止");
return;
}
running = false;
scheduledExecutorService.shutdown();
//關(guān)閉當(dāng)前zkclien的監(jiān)聽(tīng)事件
zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener);
//如果當(dāng)前服務(wù)器是主服務(wù)器,那么需要釋放掉MASTER_PATH
releaseMaster();
}
public void releaseMaster(){
if(checkMaster()){
System.out.println(DateToString(System.currentTimeMillis())+"釋放master"+serviceNodeInfo.toString());
zkClient.delete(MASTER_PATH);
}
}
public void tryGetMaster(){
//如果服務(wù)器未啟動(dòng)藻雌,則直接返回雌续。表示需要先執(zhí)行star方法
if(!running){
return;
}
//基本原理就是當(dāng)前服務(wù)器1去創(chuàng)建MASTER_PATH,如果創(chuàng)建成功胯杭,那么服務(wù)器1就是master節(jié)點(diǎn)驯杜。創(chuàng)建失敗,捕捉ZkNodeExistsException異常
try{
zkClient.createEphemeral(MASTER_PATH,serviceNodeInfo);
//沒(méi)有拋出異常做个,則當(dāng)前服務(wù)器1就是master服務(wù)器
masterNode = serviceNodeInfo;
System.out.println(DateToString(System.currentTimeMillis())+":master節(jié)點(diǎn)是:"+masterNode.toString());
//定時(shí)5秒鐘釋放節(jié)點(diǎn)鸽心。為了模擬服務(wù)器故障
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
//檢查一下當(dāng)前服務(wù)器1是否還是主服務(wù)器,如果是居暖,那么刪除/master節(jié)點(diǎn)再悼。即:釋放主服務(wù)
boolean flag = checkMaster();
if(flag){
zkClient.delete(MASTER_PATH);
}
}
},5,TimeUnit.SECONDS);
}
catch (ZkNodeExistsException e){
//如果節(jié)點(diǎn)已經(jīng)存在,獲得master節(jié)點(diǎn)
ServiceNodeInfo serviceNodeInfo1 = zkClient.readData(MASTER_PATH);
//如果在讀取的時(shí)候masterNode為空膝但,則重新去搶master節(jié)點(diǎn)
if(serviceNodeInfo1 == null){
tryGetMaster();
}else{
masterNode = serviceNodeInfo1;
}
}
}
public boolean checkMaster(){
try{
ServiceNodeInfo m = zkClient.readData(MASTER_PATH);
if(m.getServiceId().intValue() == serviceNodeInfo.getServiceId().intValue()){
return true;
}
}catch (ZkNoNodeException e) {
return false;
} catch (ZkInterruptedException e) {
return checkMaster();
} catch (ZkException e) {
return false;
}
return false;
}
public static String DateToString(long time){
return DateFormatUtils.format(time,"yyyy-MM-dd hh:mm:ss");
}
}
服務(wù)器信息類(lèi)
package com.app.master;
import java.io.Serializable;
/**
* @Author: chao.zhu
* @discription: 服務(wù)器信息
* @CreateDate: 2018/08/06
* @Version: 1.0
*/
public class ServiceNodeInfo implements Serializable {
private Integer serviceId;
private String serviceName;
public ServiceNodeInfo(Integer serviceId, String serviceName) {
this.serviceId = serviceId;
this.serviceName = serviceName;
}
public Integer getServiceId() {
return serviceId;
}
public void setServiceId(Integer serviceId) {
this.serviceId = serviceId;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
@Override
public String toString() {
return "ServiceNodeInfo{" +
"serviceId=" + serviceId +
", serviceName='" + serviceName + '\'' +
'}';
}
}
測(cè)試客戶(hù)端
package com.app.master;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import org.I0Itec.zkclient.ZkClient;
/**
* @Author: chao.zhu
* @description:
* @CreateDate: 2018/08/06
* @Version: 1.0
*/
public class Service1 {
//zk連接串
private static final String ZK_CONNECTION = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
public static void main(String[] args) throws Exception{
List<ZkClient> zkClientList = new ArrayList<ZkClient>(10);
List<MasterSelector> masterSelectorList = new ArrayList<MasterSelector>(10);
try{
for(int i = 0 ; i < 10 ; i++){
ZkClient zkClient = new ZkClient(ZK_CONNECTION,5000);
zkClientList.add(zkClient);
ServiceNodeInfo serviceNodeInfo = new ServiceNodeInfo(i,"service"+i);
MasterSelector masterSelector = new MasterSelector(zkClient,serviceNodeInfo);
masterSelector.star();
}
System.out.println("敲回車(chē)鍵退出冲九!\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();
}finally {
System.out.println("Shutting down...");
for (MasterSelector workServer : masterSelectorList) {
try {
workServer.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
for (ZkClient client : zkClientList) {
try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
測(cè)試客戶(hù)端代碼說(shuō)明:
- 啟動(dòng)10個(gè)客戶(hù)端
- 然后每個(gè)客戶(hù)端開(kāi)始爭(zhēng)搶/master節(jié)點(diǎn)
- 搶到/master節(jié)點(diǎn)后,5秒鐘釋放master節(jié)點(diǎn)跟束。因?yàn)橹?0個(gè)客戶(hù)端都注冊(cè)了/master刪除事件莺奸,所以當(dāng)釋放master之后,這10個(gè)客戶(hù)端再次去爭(zhēng)搶/master節(jié)點(diǎn)