Master選舉實(shí)現(xiàn)
思路:選擇一個(gè)根節(jié)點(diǎn),例如/master_select酬诀,多臺(tái)機(jī)器同時(shí)向該節(jié)點(diǎn)創(chuàng)建一個(gè)子節(jié)點(diǎn) /master_select/lock鸳碧,利用zk的特性报强,最終只有一臺(tái)機(jī)器能夠創(chuàng)建成功,這臺(tái)機(jī)器就是master
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.namespace("master_select")
.build();
public static void main(String[] args) throws InterruptedException {
zkFluentClient.start();
String selectPath = "/master_select";
LeaderSelector selector = new LeaderSelector(zkFluentClient, selectPath, new LeaderSelectorListenerAdapter() {
@Override
// 需要注意的是爬虱,一旦執(zhí)行完這個(gè)方法隶债,curator就會(huì)立即釋放Master的權(quán)利,然后重新開始新一輪的Master選舉
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println("Be a Leader");
TimeUnit.SECONDS.sleep(3);
System.out.println("釋放 Leader ");
}
});
selector.autoRequeue();
selector.start();
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}
如果同時(shí)有2個(gè)請(qǐng)求跑筝,可以看到交替執(zhí)行死讹,創(chuàng)建2個(gè)臨時(shí)節(jié)點(diǎn):
[zk: localhost:2181(CONNECTED) 21] ls /master_select/master_select
[_c_93265fd6-4b11-4668-baf8-e4211a8d1b5f-lock-0000000067, _c_36a0c859-efb1-442a-9dff-26121e7a1a7e-lock-0000000068]
這里的臨時(shí)節(jié)點(diǎn),在master失效的時(shí)候就會(huì)被刪除曲梗。
一旦takeLeaderShip執(zhí)行結(jié)束赞警,master的就會(huì)被釋放妓忍,然后重新開始新一輪的master選舉。
分布式鎖
使用InterProcessMutex來(lái)做分布式鎖處理
public class DistributeLockTest {
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.namespace("lock")
.build();
public static void main(String[] args) {
zkFluentClient.start();
final InterProcessMutex lock = new InterProcessMutex(zkFluentClient, "/distribute_lock");
final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 30; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
latch.await();
lock.acquire(); // 獲取鎖
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
String orderNo = sdf.format(Date.from(Instant.now()));
System.out.println("OrderNo is:" + orderNo);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();
}
latch.countDown();// 這里很有意思愧旦,在主線程啟動(dòng)了幾十個(gè)線程之后世剖,這些線程都是hold住的(通過(guò) countDownLatch.await()方法)
// 然后主線程處理latch.countDown(),導(dǎo)致所有子線程同時(shí)滿足觸發(fā)條件,同時(shí)執(zhí)行笤虫,保證并發(fā)旁瘫。不過(guò)僅用在測(cè)試環(huán)節(jié)比較合適。
// 其實(shí)latch可以去掉琼蚯,只是這樣并發(fā)沒(méi)有那么集中酬凳。
}
}
分布式計(jì)數(shù)器
思路很類似,用上述分布式鎖的思路遭庶。
比如統(tǒng)計(jì)在線人數(shù)宁仔,指定zk的一個(gè)數(shù)據(jù)節(jié)點(diǎn)作為計(jì)數(shù)器,多個(gè)應(yīng)用實(shí)例在分布式鎖的控制下峦睡,通過(guò)更新該數(shù)據(jù)節(jié)點(diǎn)的內(nèi)容來(lái)實(shí)現(xiàn)計(jì)數(shù)功能台诗。
public class DistributeCounterTest {
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.namespace("counter")
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(new ExponentialBackoffRetry(800, 5))
.build();
public static void main(String[] args) throws Exception {
zkFluentClient.start();
// 計(jì)數(shù)器
DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(zkFluentClient, "/adder",
new RetryNTimes(3, 1000));
AtomicValue<Integer> rc = atomicInteger.add(8);
rc = atomicInteger.increment();
rc = atomicInteger.decrement();
atomicInteger.increment();
System.out.println("Result:" + rc.succeeded());
System.out.println("preValue:" + rc.preValue() + ",postValue:" + rc.postValue());
System.out.println();
String value = new String(zkFluentClient.getData().forPath("/adder"));
System.out.println(value);
// 試著重新取
DistributedAtomicInteger newAtomicInteger = new DistributedAtomicInteger(zkFluentClient, "/adder", new RetryNTimes(3, 800));
System.out.println(newAtomicInteger.get().preValue() + "_" + newAtomicInteger.get().postValue());
}
}
可以看到,只要同一個(gè)路徑下赐俗,對(duì)應(yīng)的DistributeAtomicInteger的對(duì)象值都是同一個(gè)拉队,可以隨時(shí)創(chuàng)建一個(gè)對(duì)象直接使用。
分布式Barrier
先看一個(gè)JDK自帶的CyclicBarrier阻逮,先看下CyclicBarrier的說(shuō)明:
CyclicBarrier 的字面意思是可循環(huán)(Cyclic)使用的屏障(Barrier)粱快。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞叔扼,直到最后一個(gè)線程到達(dá)屏障時(shí)事哭,屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)干活瓜富。線程進(jìn)入屏障通過(guò)CyclicBarrier的await()方法鳍咱。
CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量与柑,每個(gè)線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障谤辜,然后當(dāng)前線程被阻塞。
實(shí)現(xiàn)原理:在CyclicBarrier的內(nèi)部定義了一個(gè)Lock對(duì)象价捧,每當(dāng)一個(gè)線程調(diào)用CyclicBarrier的await方法時(shí)丑念,將剩余攔截的線程數(shù)減1,然后判斷剩余攔截?cái)?shù)是否為0结蟋,如果不是脯倚,進(jìn)入Lock對(duì)象的條件隊(duì)列等待。如果是嵌屎,執(zhí)行barrierAction對(duì)象的Runnable方法推正,然后將鎖的條件隊(duì)列中的所有線程放入鎖等待隊(duì)列中恍涂,這些線程會(huì)依次的獲取鎖、釋放鎖植榕,接著先從await方法返回乳丰,再?gòu)腃yclicBarrier的await方法中返回。
CyclicBarrier主要用于一組線程之間的相互等待内贮,而CountDownLatch一般用于一組線程等待另一組線程。實(shí)際上可以通過(guò)CountDownLatch的countDown()和await()來(lái)實(shí)現(xiàn)CyclicBarrier的功能汞斧。即 CountDownLatch中的countDown()+await() = CyclicBarrier中的await()夜郁。注意:在一個(gè)線程中先調(diào)用countDown(),然后調(diào)用await()粘勒。
先看代碼
public class DistributeCyclicBarrierTest {
static CyclicBarrier jdkBarrier = new CyclicBarrier(3);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.execute(new Thread(new JdkBasedRuner("jinsiyu")));
executorService.execute(new Thread(new JdkBasedRuner("AMANDA")));
executorService.execute(new Thread(new JdkBasedRuner("QQ")));
}
static class JdkBasedRuner implements Runnable {
private String name;
public JdkBasedRuner(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(name + " Ready!!!");
try {
jdkBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(name + " GO!!");
}
}
}
結(jié)果如下:
jinsiyu Ready!!!
AMANDA Ready!!!
QQ Ready!!!QQ GO!!
jinsiyu GO!!
AMANDA GO!!
可以看到竞端,只有當(dāng)CyclicBarrier中的值為0時(shí),才會(huì)統(tǒng)一執(zhí)行其后的操作庙睡,也就是“XXX GO”的語(yǔ)句打印事富。
而如果這里jdkBarrier如果設(shè)置的為4,那么下面三句“XXX GO”的語(yǔ)句根本不會(huì)打印乘陪,會(huì)一直等待统台。
ZK下的實(shí)現(xiàn):
static DistributedBarrier distributedBarrier;
static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectionTimeoutMs(3000)
.sessionTimeoutMs(5000)
.connectString("localhost:32770")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("cyclicBarrier").build();
.......
zkClient.start();
// distribute
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
distributedBarrier = new DistributedBarrier(zkClient, "/barrier");
System.out.println(Thread.currentThread().getName() + "號(hào)barrier設(shè)置");
try {
distributedBarrier.setBarrier(); // 看實(shí)現(xiàn),就是在create節(jié)點(diǎn)
distributedBarrier.waitOnBarrier(); // 等待啡邑,直到remove
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "Starting...");
}
}).start();
}
TimeUnit.SECONDS.sleep(2);
distributedBarrier.removeBarrier(); // delete節(jié)點(diǎn)
ZKPaths & EnsurePath
public class ZKPathsTest {
static String path = "/zkpath_sample";
static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.build();
public static void main(String[] args) throws Exception {
zkClient.start();
System.out.println(ZKPaths.fixForNamespace(path,"/sub"));
System.out.println(ZKPaths.makePath(path,"/sub"));
System.out.println(ZKPaths.getNodeFromPath("/zkpath_sample/sub1")); // 不存在節(jié)點(diǎn)也不會(huì)報(bào)錯(cuò)贱勃,從路徑str中截取
ZKPaths.PathAndNode pn = ZKPaths.getPathAndNode("/zkpath_sample/sub1"); // 獲取節(jié)點(diǎn),不存在也不會(huì)報(bào)錯(cuò),只是從路徑上截取
System.out.println(pn.getPath());
System.out.println(pn.getNode());
// 獲取zookeeper谤逼,這個(gè)是干啥的贵扰?
ZooKeeper zooKeeper = zkClient.getZookeeperClient().getZooKeeper();
String dir1 = path + "/child1";
String dir2 = path + "/child2";
ZKPaths.mkdirs(zooKeeper,dir1); // 創(chuàng)建目錄,如果存在不會(huì)報(bào)錯(cuò)流部,也不會(huì)拋異常
ZKPaths.mkdirs(zooKeeper,dir2);
System.out.println(ZKPaths.getSortedChildren(zooKeeper,path)); // 獲取已排序的子節(jié)點(diǎn)
ZKPaths.deleteChildren(zooKeeper,path,false); // 刪除子節(jié)點(diǎn)戚绕,如果最后一個(gè)參數(shù)為true,會(huì)刪除本身
}
}
public class EnsurePathTest {
static String path = "/path2";
static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectionTimeoutMs(3000)
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.connectString("localhost:32770")
.namespace("ensure")
.build();
public static void main(String[] args) throws Exception {
zkClient.start();
EnsurePath ensurePath = new EnsurePath(path); // 這里跟namespace沒(méi)關(guān)系枝冀,只會(huì)從根目錄下開始建舞丛,所以是絕對(duì)路徑了
ensurePath.ensure(zkClient.getZookeeperClient());
EnsurePath ensurePath1 = zkClient.newNamespaceAwareEnsurePath("/c2"); // 用這個(gè)方法,namespace生效
ensurePath1.ensure(zkClient.getZookeeperClient());
}
}
不過(guò)EnsurePath貌似已經(jīng)不推薦使用了果漾。
順序節(jié)點(diǎn)
static CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.namespace("sequence-jin")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
zkClient.start();
// 創(chuàng)建順序節(jié)點(diǎn)
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
.forPath("/seq");
}
執(zhí)行多次后瓷马,結(jié)果如下:
ls /sequence-jin
[seq0000000001, seq0000000000, seq0000000002]
可以看到,順序節(jié)點(diǎn)跨晴。欧聘。。就是如此
可以通過(guò)臨時(shí)節(jié)點(diǎn)來(lái)代替心跳端盆,來(lái)判斷client端是否存在怀骤。