一裳仆、快速開始:
(1)概述:Zookeeper是Hadoop的一個子項目溉箕,它是分布式系統(tǒng)中的協(xié)調(diào)系統(tǒng)晦墙,可提供的服務(wù)主要有:配置服務(wù)、名字服務(wù)肴茄、分布式同步晌畅、組服務(wù)等。?
(2)使用常見:1寡痰,統(tǒng)一配置:把配置放在ZooKeeper的節(jié)點(diǎn)中維護(hù)抗楔,當(dāng)配置變更時,客戶端可以收到變更的通知拦坠,并應(yīng)用最新的配置连躏。2,集群管理:集群中的節(jié)點(diǎn)贞滨,創(chuàng)建ephemeral的節(jié)點(diǎn)入热,一旦斷開連接,ephemeral的節(jié)點(diǎn)會消失疲迂,其它的集群機(jī)器可以收到消息才顿。3,分布式鎖:多個客戶端發(fā)起節(jié)點(diǎn)創(chuàng)建操作尤蒿,只有一個客戶端創(chuàng)建成功郑气,從而獲得鎖。?
(3)安裝和配置:通過官方下載鏈接zookeeper?進(jìn)行下載腰池,解壓后進(jìn)入conf目錄尾组,新建一個zoo.conf文件,配置內(nèi)容如下:
tickTime=2000? ?
dataDir=/Users/lsq/Documents/zookeeper/zookeeper0/data
dataLogDir=/Users/lsq/Documents/zookeeper/zookeeper0/dataLog
clientPort=4399
initLimit=5
syncLimit=2
tickTime: ZooKeeper基本時間單位(ms)?
initLimit: 指定了啟動zookeeper時示弓,zookeeper實例中的隨從實例同步到領(lǐng)導(dǎo)實例的初始化連接時間限制讳侨,超出時間限制則連接失敗(以tickTime為時間單位)奏属;?
syncLimit: 指定了zookeeper正常運(yùn)行時跨跨,主從節(jié)點(diǎn)之間同步數(shù)據(jù)的時間限制,若超過這個時間限制囱皿,那么隨從實例將會被丟棄?
dataDir: zookeeper存放數(shù)據(jù)的目錄勇婴;?
clientPort: 用于連接客戶端的端口
接下來進(jìn)入bin目錄啟動ZooKeeper實例以及客戶端連接:
./zkServer.sh start
./zkCli.sh -server localhost:4399
接下來看看集群如何配置,其實跟單機(jī)差不多嘱腥,這里我們把剛剛下載的Zookeeper復(fù)制多兩份耕渴,一共是三個,配置信息如下:
tickTime=2000? ?
dataDir=/Users/lsq/Documents/zookeeper/zookeeper0/data
dataDir=/Users/lsq/Documents/zookeeper/zookeeper0/dataLog
clientPort=4399
initLimit=5
syncLimit=2
server.1=127.0.0.1:8880:9990
server.2=127.0.0.1:8881:9991
server.3=127.0.0.1:8882:9992
三個文件夾下面的zoo.conf都是這個格式齿兔,需要修改dataDir橱脸,dataDir础米,clientPort,?
然后在dataDir所指向的目錄下面新建一個myid文件添诉,對應(yīng)server.x屁桑,比如第一個文件夾下面的myid就填入一個1,第二個就填入一個2栏赴,以此類推掏颊。接著依次啟動即可“剩可以采用下面的命令
echo "1" > myid
二乌叶、使用java來操作ZooKeeper實例?
一門技術(shù)最重要的就算實戰(zhàn)了,接下來的內(nèi)容將圍繞這一部分來講柒爸。?
(1)首先是Znode的創(chuàng)建和刪除?
Znode有兩種類型:短暫的和持久的准浴。短暫的znode在創(chuàng)建的客戶端與服務(wù)器端斷開(無論是明確的斷開還是故障斷開)連接時,該znode都會被刪除捎稚;相反乐横,持久的znode則不會
public class CreateGroup implements Watcher {
? ? //會話延時
? ? private static final int SESSION_TIMEOUT = 1000;
? ? //zk對象
? ? private ZooKeeper zk = null;
? ? //同步計數(shù)器
? ? private CountDownLatch countDownLatch = new CountDownLatch(1);
? ? //客戶端連接到服務(wù)器時會觸發(fā)觀察者進(jìn)行調(diào)用
? ? public void process(WatchedEvent event) {
? ? ? ? if(event.getState() == KeeperState.SyncConnected){
? ? ? ? ? ? countDownLatch.countDown();//計數(shù)器減一
? ? ? ? }
? ? }
? ? public void connect(String hosts) throws IOException, InterruptedException {
? ? ? ? zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
? ? ? ? countDownLatch.await();//阻塞程序繼續(xù)執(zhí)行
? ? }
? ? //創(chuàng)建GROUP
? ? public void create(String groupName) throws KeeperException, InterruptedException{
? ? ? ? String path = "/" + groupName;
? ? ? ? //允許任何客戶端對該znode進(jìn)行讀寫,以及znode進(jìn)行持久化
? ? ? ? String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
? ? ? ? System.out.println("Created "+createPath);
? ? }
? ? //關(guān)閉zk
? ? public void close() throws InterruptedException{
? ? ? ? if(zk != null){
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? zk.close();
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? throw e;
? ? ? ? ? ? }finally{
? ? ? ? ? ? ? ? zk = null;
? ? ? ? ? ? ? ? System.gc();
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? //測試主類
? ? public static void main(String args[]){
? ? ? ? String host = "127.0.0.1:4399";
? ? ? ? String groupName = "test";
? ? ? ? CreateGroup createGroup = new CreateGroup();
? ? ? ? try {
? ? ? ? ? ? createGroup.connect(host);
? ? ? ? ? ? createGroup.create(groupName);
? ? ? ? ? ? createGroup.close();
? ? ? ? ? ? createGroup = null;
? ? ? ? ? ? System.gc();
? ? ? ? } catch (IOException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } catch (KeeperException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }?
? ? }
}
接下來把創(chuàng)建和銷毀分離出來作為一個獨(dú)立的類,以后相關(guān)操作可以直接使用
public class ConnetctionWatcher implements Watcher {
? ? private static final int SESSION_TIMEOUT = 5000;
? ? protected ZooKeeper zk = null;
? ? private CountDownLatch countDownLatch = new CountDownLatch(1);
? ? public void process(WatchedEvent event) {
? ? ? ? KeeperState state = event.getState();
? ? ? ? if(state == KeeperState.SyncConnected){
? ? ? ? ? ? countDownLatch.countDown();
? ? ? ? }
? ? }
? ? public void connection(String hosts) throws IOException, InterruptedException {
? ? ? ? zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
? ? ? ? countDownLatch.await();
? ? }
? ? public void close() throws InterruptedException {
? ? ? ? if (null != zk) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? zk.close();
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? throw e;
? ? ? ? ? ? }finally{
? ? ? ? ? ? ? ? zk = null;
? ? ? ? ? ? ? ? System.gc();
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
接下來我們看看節(jié)點(diǎn)如何刪除
public class DeleteGroup extends ConnetctionWatcher {
? ? public void delete(String groupName) {
? ? ? ? String path = "/" + groupName;
? ? ? ? try {
? ? ? ? ? ? List<String> children = zk.getChildren(path, false);
? ? ? ? ? ? for(String child : children){
? ? ? ? ? ? ? ? zk.delete(path + "/" + child, -1);
? ? ? ? ? ? }
? ? ? ? ? ? zk.delete(path, -1);//版本號為-1,
? ? ? ? } catch (KeeperException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
}
git上面還有官方給的例子的代碼今野,放在類Executor.java葡公,DataMonitor.java,DataMonitorListener.java条霜。有興趣可以看一下催什,試著自己實現(xiàn)一下。
三宰睡、利用java實現(xiàn)分布式Barrier?
Barrier是一種控制和協(xié)調(diào)多個任務(wù)觸發(fā)次序的機(jī)制蒲凶。簡單來說就是用一個屏障把將要執(zhí)行的任務(wù)攔住,等待所有任務(wù)都處于可運(yùn)行狀態(tài)才放開屏障拆内,其實在單機(jī)上我們可以利用CyclicBarrier來實現(xiàn)這個機(jī)制旋圆,但是在分布式環(huán)境下,我們可以利用ZooKeeper可以派上用場麸恍,我們可以利用一個Node來作為Barrier的實體灵巧,然后要Barrier的任務(wù)通過調(diào)用exists檢測是否Node存在,當(dāng)需要打開Barrier時候抹沪,刪除這個Node刻肄,這樣ZooKeeper的watch機(jī)制會通知到各個任務(wù)可以開始執(zhí)行。接下來看代碼:
public class Barrier extends SyncPrimitive {
? ? int size;
? ? String name;
? ? Barrier(String address, String root, int size) {
? ? ? ? super(address);
? ? ? ? this.root = root;
? ? ? ? this.size = size;
? ? ? ? //創(chuàng)建Barrier的Node
? ? ? ? if (zk != null) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Stat s = zk.exists(root, false);
? ? ? ? ? ? ? ? if (s == null) {
? ? ? ? ? ? ? ? ? ? zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (KeeperException e) {
? ? ? ? ? ? ? ? System.out.println("Keeper exception when instantiating queue: " + e.toString());
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? System.out.println("Interrupted exception");
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? try {
? ? ? ? ? ? name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
? ? ? ? } catch (UnknownHostException e) {
? ? ? ? ? ? System.out.println(e.toString());
? ? ? ? }
? ? }
? ? /**
? ? * 加入Barrier等待
? ? */
? ? boolean enter() throws KeeperException, InterruptedException{
? ? ? ? zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
? ? ? ? while (true) {
? ? ? ? ? ? synchronized (mutex) {
? ? ? ? ? ? ? ? List<String> list = zk.getChildren(root, true);
? ? ? ? ? ? ? ? if (list.size() < size) {
? ? ? ? ? ? ? ? ? ? mutex.wait();
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? /**
? ? * 一直等待知道指定數(shù)量節(jié)點(diǎn)到達(dá)
? ? */
? ? boolean leave() throws KeeperException, InterruptedException{
? ? ? ? zk.delete(root + "/" + name, 0);
? ? ? ? while (true) {
? ? ? ? ? ? synchronized (mutex) {
? ? ? ? ? ? ? ? List<String> list = zk.getChildren(root, true);
? ? ? ? ? ? ? ? ? ? if (list.size() > 0) {
? ? ? ? ? ? ? ? ? ? ? ? mutex.wait();
? ? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? }
}
父類代碼如下:
public class SyncPrimitive implements Watcher {
? ? static ZooKeeper zk = null;
? ? static Integer mutex;
? ? //根節(jié)點(diǎn)
? ? String root;
? ? SyncPrimitive(String address) {
? ? ? ? if(zk == null){
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? System.out.println("Starting ZK:");
? ? ? ? ? ? ? ? zk = new ZooKeeper(address, 3000, this);
? ? ? ? ? ? ? ? mutex = new Integer(-1);
? ? ? ? ? ? ? ? System.out.println("Finished starting ZK: " + zk);
? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? System.out.println(e.toString());
? ? ? ? ? ? ? ? zk = null;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? //else mutex = new Integer(-1);
? ? }
? ? synchronized public void process(WatchedEvent event) {
? ? ? ? synchronized (mutex) {
? ? ? ? ? ? System.out.println("Process: " + event.getType());
? ? ? ? ? ? mutex.notify();
? ? ? ? }
? ? }
? ? public static void queueTest(String args[]) {
? ? ? ? Queue q = new Queue(args[1], "/app1");
? ? ? ? System.out.println("Input: " + args[1]);
? ? ? ? int i;
? ? ? ? Integer max = new Integer(args[2]);
? ? ? ? if (args[3].equals("p")) {
? ? ? ? ? ? System.out.println("Producer");
? ? ? ? ? ? for (i = 0; i < max; i++)
? ? ? ? ? ? ? ? try{
? ? ? ? ? ? ? ? ? ? q.produce(10 + i);
? ? ? ? ? ? ? ? } catch (KeeperException e){
? ? ? ? ? ? ? ? } catch (InterruptedException e){
? ? ? ? ? ? ? ? }
? ? ? ? } else {
? ? ? ? ? ? System.out.println("Consumer");
? ? ? ? ? ? for (i = 0; i < max; i++) {
? ? ? ? ? ? ? ? try{
? ? ? ? ? ? ? ? ? ? int r = q.consume();
? ? ? ? ? ? ? ? ? ? System.out.println("Item: " + r);
? ? ? ? ? ? ? ? } catch (KeeperException e){
? ? ? ? ? ? ? ? ? ? i--;
? ? ? ? ? ? ? ? } catch (InterruptedException e){
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public static void barrierTest(String args[]) {
? ? ? ? Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
? ? ? ? try{
? ? ? ? ? ? boolean flag = b.enter();
? ? ? ? ? ? System.out.println("Entered barrier: " + args[2]);
? ? ? ? ? ? if(!flag) System.out.println("Error when entering the barrier");
? ? ? ? } catch (KeeperException e){
? ? ? ? } catch (InterruptedException e){
? ? ? ? }
? ? ? ? Random rand = new Random();
? ? ? ? int r = rand.nextInt(100);
? ? ? ? for (int i = 0; i < r; i++) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Thread.sleep(100);
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? try{
? ? ? ? ? ? b.leave();
? ? ? ? } catch (KeeperException e){
? ? ? ? } catch (InterruptedException e){
? ? ? ? }
? ? ? ? System.out.println("Left barrier");
? ? }
? ? //測試用的主類
? ? public static void main(String args[]) {
? ? ? ? /*
? ? ? ? args =new String[] {"qTest","localhost:4399","3","c"};
? ? ? ? if (args[0].equals("qTest"))
? ? ? ? ? ? queueTest(args);
? ? ? ? else
? ? ? ? ? ? barrierTest(args);
? ? ? ? */
? ? }
}
(四)分布式隊列(Queue)?
在分布式環(huán)境下采够,實現(xiàn)Queue需要高一致性來保證肄方,那么我們可以這樣來設(shè)計冰垄。把一個Node當(dāng)成一個隊列蹬癌,然后children用來存儲內(nèi)容权她,利用ZooKeeper提供的順序遞增的模式(會自動在name后面加入一個遞增的數(shù)字來插入新元素)。于是在offer時候我們可以使用create逝薪,take時候按照順序把children第一個delete就可以了隅要。ZooKeeper保證了各個server上數(shù)據(jù)是一致的。廢話不多說了董济,直接看代碼
/**
* 一個消費(fèi)者-生產(chǎn)者模式的消息隊列
*/
public class Queue extends SyncPrimitive {
? ? Queue(String address, String name) {
? ? ? ? super(address);
? ? ? ? this.root = name;
? ? ? ? if (zk != null) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Stat s = zk.exists(root, false);
? ? ? ? ? ? ? ? if (s == null) {
? ? ? ? ? ? ? ? ? ? zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (KeeperException e) {
? ? ? ? ? ? ? ? System.out.println("Keeper exception when instantiating queue: " + e.toString());
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? System.out.println("Interrupted exception");
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? /**
? ? * 隊列中插入數(shù)據(jù)
? ? */
? ? boolean produce(int i) throws KeeperException, InterruptedException{
? ? ? ? ByteBuffer b = ByteBuffer.allocate(4);
? ? ? ? byte[] value;
? ? ? ? b.putInt(i);
? ? ? ? value = b.array();
? ? ? ? zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);
? ? ? ? return true;
? ? }
? ? /**
? ? * 把元素從隊列中移除
? ? */
? ? int consume() throws KeeperException, InterruptedException{
? ? ? ? int retvalue = -1;
? ? ? ? Stat stat = null;
? ? ? ? //得到現(xiàn)在隊列中首個可用的節(jié)點(diǎn)
? ? ? ? while (true) {
? ? ? ? ? ? synchronized (mutex) {
? ? ? ? ? ? ? ? List<String> list = zk.getChildren(root, true);
? ? ? ? ? ? ? ? if (list.size() == 0) {
? ? ? ? ? ? ? ? ? ? System.out.println("Going to wait");
? ? ? ? ? ? ? ? ? ? mutex.wait();
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? Integer min = new Integer(list.get(0).substring(7));
? ? ? ? ? ? ? ? ? ? for(String s : list){
? ? ? ? ? ? ? ? ? ? ? ? Integer tempValue = new Integer(s.substring(7));
? ? ? ? ? ? ? ? ? ? ? ? //System.out.println("Temporary value: " + tempValue);
? ? ? ? ? ? ? ? ? ? ? ? if(tempValue < min) min = tempValue;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? System.out.println("Temporary value: " + root + "/element" + min);
? ? ? ? ? ? ? ? ? ? byte[] b = zk.getData(root + "/element" + min, false, stat);
? ? ? ? ? ? ? ? ? ? zk.delete(root + "/element" + min, 0);
? ? ? ? ? ? ? ? ? ? ByteBuffer buffer = ByteBuffer.wrap(b);
? ? ? ? ? ? ? ? ? ? retvalue = buffer.getInt();
? ? ? ? ? ? ? ? ? ? return retvalue;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
}