利用zookeeper實現(xiàn)分布式隊列

一、zookeeper介紹

? ? ? ? zookeeper是源代碼開放的分布式協(xié)調(diào)服務(wù)鹊杖,由雅虎創(chuàng)建悴灵,是Google的開源實現(xiàn)。zookeeper是一個高性能的分布式數(shù)據(jù)一致性解決方案骂蓖,它將那些復雜的积瞒、容易出錯的分布式一致性服務(wù)封裝起來,構(gòu)成一個高效可靠的原語集涯竟,并提供一系列簡單易用的接口給用戶使用。

? ??????其本身提供了一致性保證空厌,特點如下:

????????(1)順序一致性:客戶端的更新順序與它們被發(fā)送的順序一致庐船。

????????(2)原子性:更新操作要么成功,要么失敗嘲更,沒有第三種結(jié)果筐钟。

????????(3)單系統(tǒng)鏡像:無論客戶端連接到哪一個服務(wù)器,他都將看到相同的zookeeper視圖赋朦。

????????(4)可靠性:一旦一個更新操作被應(yīng)用篓冲,那么在客戶端再次更新之前李破,其值不會再改變。

二壹将、應(yīng)用場景

? ? ? ? zookeeper可以應(yīng)用于很多的分布式服務(wù)場景嗤攻,包括:集群管理,master選舉诽俯,發(fā)布/訂閱妇菱,分布式鎖,分布式隊列暴区,分布式命名服務(wù)闯团,服務(wù)注冊于發(fā)現(xiàn),負載均衡等等仙粱。下面一個例子介紹zookeeper如何實現(xiàn)分布式隊列房交。

三、zookeeper分布式隊列實現(xiàn)

? ? ? ??zookeeper分布式隊列的實現(xiàn)完成以下幾個要素:

? ? ? ? (1)伐割、數(shù)據(jù)入隊候味,在一個節(jié)點下創(chuàng)建有序子節(jié)點,節(jié)點中設(shè)置需要入隊的數(shù)據(jù)口猜,完成數(shù)據(jù)的入隊操作负溪。

? ? ? ? (2)、數(shù)據(jù)出隊济炎,取出該節(jié)點下的所有子節(jié)點川抡,如果數(shù)量不為0,取出一個子節(jié)點须尚,并將子節(jié)點刪除崖堤。

? ? ? ? (3)、提供判斷是否有數(shù)據(jù)等的api耐床。

? ? ? ? 下面為具體代碼實現(xiàn)

1密幔、DistributedSimpleQueue類


public class DistributedSimpleQueue{

????????protected final ZkClient zkClient;

? ? ? ? protected final String root;

????????protected static final String Node_NAME = "n_";

????????public DistributedSimpleQueue(ZkClient zkClient, String root) {

????????????????this.zkClient = zkClient;

????????????????this.root = root;

? ? ? ? ?}

????????public int size() {

????????????return zkClient.getChildren(root).size();

????????}

????????public boolean isEmpty() {

????????????return zkClient.getChildren(root).size() == 0;

????????}?

? ? ? ? public boolean offer(T element) throws Exception{?

? ? ? ? ? ? ? ? String nodeFullPath = root .concat( "/" ).concat( Node_NAME );

?????????????????try {?

?????????????????????????zkClient.createPersistentSequential(nodeFullPath , element);?

?????????????????}catch (ZkNoNodeException e) {?

?????????????????????????zkClient.createPersistent(root);?

?????????????????????????offer(element);?

?????????????????} catch (Exception e) {?

?????????????????????throw ExceptionUtil.convertToRuntimeException(e);?

?????????????????}?

?????????????????return true;?

? ? ? ? ? ? ?}

????????????@SuppressWarnings("unchecked")

? ? ? ? ? ? public T poll() throws Exception {

? ? ? ? ? ? ? ? ? ?try {

? ? ? ? ? ? ? ? ? ? ? ? ? ?List? list = zkClient.getChildren(root);

? ? ? ? ? ? ? ? ? ? ? ? ? ?if (list.size() == 0) {

????????????????????????????????return null;

????????????????????????????}

????????????????????????????Collections.sort(list, new Comparator() {

????????????????????????????????????public int compare(String lhs, String rhs) {

????????????????????????????????????????????return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));

????????????????????????????????????}

????????????????????????????});

????????????????????????????for ( String nodeName : list ){

????????????????????????????????????String nodeFullPath = root.concat("/").concat(nodeName);

????????????????????????????????????try {

????????????????????????????????????????????T node = (T) zkClient.readData(nodeFullPath);

????????????????????????????????????????????zkClient.delete(nodeFullPath);

????????????????????????????????????????????return node;

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?} catch (ZkNoNodeException e) {

????????????????????????????????????????????????// ignore

????????????????????????????????????????}

????????????????????????????}

????????????????????????????return null;

? ? ? ? ? ? ? ? ? ? } catch (Exception e) {

????????????????????????????throw ExceptionUtil.convertToRuntimeException(e);

????????????????????}

????????????}

????????????private String getNodeNumber(String str, String nodeName) {

????????????????????int index = str.lastIndexOf(nodeName);

????????????????????if (index >= 0) {

????????????????????????index += Node_NAME.length();

????????????????????????return index <= str.length() ? str.substring(index) : "";

????????????????????}

????????????????????return str;

????????????}

}

2、阻塞隊列實現(xiàn)類

public class DistributedBlockingQueue? extends DistributedSimpleQueue{?

?????????public DistributedBlockingQueue(ZkClient zkClient, String root) {?

?????????????????super(zkClient, root);

????????}?

? ? ? ? @Override

? ? ? ? public T poll() throws Exception {

????????????????while (true){

????????????????????????final CountDownLatch latch = new CountDownLatch(1);

????????????????????????final IZkChildListener childListener = new IZkChildListener() {

????????????????????????????????public void handleChildChange(String parentPath, List currentChilds) throws Exception {

????????????????????????????????????????latch.countDown();

????????????????????????????????}

????????????????????????};

????????????????????????zkClient.subscribeChildChanges(root, childListener);

????????????????????????try{

????????????????????????????????T node = super.poll();

? ? ? ? ? ????????????????????? if ( node != null ){

? ? ? ? ? ? ? ? ????????????????????????return node;

? ? ? ? ? ????????????????????? }else{

? ? ? ? ? ????????????????????????????? latch.await();

? ? ? ? ? ????????????????????? }

????????????????????????}finally{

????????????????????????????????zkClient.unsubscribeChildChanges(root, childListener);

????????????????????????}

????????????????}

????????????}

}

? ? ? ? 阻塞隊列的實現(xiàn)利用了CountDownLatch 的特性撩轰。當子節(jié)點數(shù)量為0時胯甩,即隊列中沒有元素,這是線程在此等待堪嫂,同時監(jiān)聽子節(jié)點的變化偎箫,如果有數(shù)據(jù)入隊,則從等待返回皆串,取出數(shù)據(jù)淹办。

3、測試類

public class TestDistributedBlockingQueue {

????????????public static void main(String[] args) {

????????????????????ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);

????????????????????int delayTime = 5;

? ? ? ? ? ? ? ? ? ? ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());

????????????????????final DistributedBlockingQueuequeue = new DistributedBlockingQueue(zkClient,"/Queue");

????????????????????final User user1 = new User();

????????????????????user1.setId("1");

????????????????????user1.setName("xiao wang");

????????????????????final User user2 = new User();

????????????????????user2.setId("2");

????????????????????user2.setName("xiao wang");

????????????????????try {

????????????????????????????delayExector.schedule(new Runnable() {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? public void run() {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?try {

????????????????????????????????????????????????queue.offer(user1);

????????????????????????????????????????????????queue.offer(user2);

????????????????????????????????????????????} catch (Exception e) {

????????????????????????????????????????????????????e.printStackTrace();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? }

????????????????????????????????????????}

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?}, delayTime , TimeUnit.SECONDS);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?System.out.println("ready poll!");

????????????????????????????????User u1 = (User) queue.poll();

????????????????????????????????User u2 = (User) queue.poll();

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){

????????????????????????????????????????System.out.println("Success!");

????????????????????????????????}

????????????????????} catch (Exception e) {

????????????????????????????e.printStackTrace();

????????????????????} finally{

????????????????????????????????delayExector.shutdown();

????????????????????try {

????????????????????????????????delayExector.awaitTermination(2, TimeUnit.SECONDS);

????????????????????} catch (InterruptedException e) {

????????????????????}

????????????}

????????}

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末恶复,一起剝皮案震驚了整個濱河市怜森,隨后出現(xiàn)的幾起案子速挑,更是在濱河造成了極大的恐慌,老刑警劉巖副硅,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件姥宝,死亡現(xiàn)場離奇詭異,居然都是意外死亡想许,警方通過查閱死者的電腦和手機伶授,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來流纹,“玉大人糜烹,你說我怎么就攤上這事∈” “怎么了疮蹦?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長茸炒。 經(jīng)常有香客問我愕乎,道長,這世上最難降的妖魔是什么壁公? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任感论,我火速辦了婚禮,結(jié)果婚禮上紊册,老公的妹妹穿的比我還像新娘比肄。我一直安慰自己,他們只是感情好囊陡,可當我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布芳绩。 她就那樣靜靜地躺著,像睡著了一般撞反。 火紅的嫁衣襯著肌膚如雪妥色。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天遏片,我揣著相機與錄音嘹害,去河邊找鬼。 笑死吮便,一個胖子當著我的面吹牛笔呀,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播线衫,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼凿可,長吁一口氣:“原來是場噩夢啊……” “哼惑折!你這毒婦竟也來了授账?” 一聲冷哼從身側(cè)響起枯跑,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎白热,沒想到半個月后敛助,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡屋确,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年纳击,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片攻臀。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡焕数,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出刨啸,到底是詐尸還是另有隱情堡赔,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布设联,位于F島的核電站善已,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏离例。R本人自食惡果不足惜换团,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望宫蛆。 院中可真熱鬧艘包,春花似錦、人聲如沸洒扎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽袍冷。三九已至磷醋,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間胡诗,已是汗流浹背邓线。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留煌恢,地道東北人骇陈。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像瑰抵,于是被迫代替她去往敵國和親你雌。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)婿崭,斷路器拨拓,智...
    卡卡羅2017閱讀 134,656評論 18 139
  • 1. Java基礎(chǔ)部分 基礎(chǔ)部分的順序:基本語法,類相關(guān)的語法氓栈,內(nèi)部類的語法渣磷,繼承相關(guān)的語法,異常的語法授瘦,線程的語...
    子非魚_t_閱讀 31,631評論 18 399
  • ... 一醋界、相關(guān)概念 中間件:為分布式系統(tǒng)提供協(xié)調(diào)服務(wù)的組件,如專門用于計算服務(wù)的機器就是一個計算型中間件提完,還有專...
    帥可兒妞閱讀 475評論 0 0
  • 一徒欣、基本數(shù)據(jù)類型 注釋 單行注釋:// 區(qū)域注釋:/* */ 文檔注釋:/** */ 數(shù)值 對于byte類型而言...
    龍貓小爺閱讀 4,261評論 0 16
  • 石min閱讀 118評論 0 0