Zookeeper學(xué)習(xí)-(原生API和Curator API)

原生API:

package com.ruozedata.zookeeper

import org.apache.zookeeper.Watcher.Event

import org.apache.zookeeper._

import scala.util.Random

import java.util.concurrent.CountDownLatch

import scala.collection.mutable._

//import org.apache.zookeeper.data.Stat

/**

* /consumers/G301/offsets/ruoze_offset_topic/partition/0

* /consumers/G301/offsets/ruoze_offset_topic/partition/1

* /consumers/G301/offsets/ruoze_offset_topic/partition/2

*/

object ZooKeeperApp {

//等待zk創(chuàng)建成功淆衷,配合Watcher里的process方法

? ? val connected =new CountDownLatch(1)

val zookeeper =new ZooKeeper("192.168.205.131:2181",3000,new Watcher {

override def process(event:WatchedEvent): Unit = {

println("create zookeeper client begin...")

if(Event.KeeperState.SyncConnected ==event.getState){

println("create zookeeper client end...")

connected.countDown()

}

}

})

def main(args:Array[String]): Unit = {

connected.await()

storeOffsets(getOffsetRanges,"G306")

val result =obtainOffsets("ruoze_offset_topic","G306")

for (map <-result){

println("topic:"+map._1.topic+"? ? partition:"+map._1.partition+"? ? ? offset:"+map._2)

}

}

/*

? ? ? ? 隨機生成偏移量記錄對象*/

? ? def getOffsetRanges():ArrayBuffer[OffsetRange]={

val array =new ArrayBuffer[OffsetRange]()

for(i<-0 to 2){

array += OffsetRange("ruoze_offset_topic",getRandomInt(3),0,getRandomInt(1000).toLong)

}

array

? ? }

def getRandomInt(n:Int): Int ={

Random.nextInt(n)

}

/*

? ? ? ? 存儲偏移量*/

? ? def storeOffsets(offsetsRanges:ArrayBuffer[OffsetRange],groupName:String) :Unit= {

for(or <-offsetsRanges){

val path ="/consumers/"+groupName+"/offsets/"+or.topic+"/partition/"+or.partition

? ? ? ? ? ? createOrExistsPath(path,zookeeper)

zookeeper.setData(path,(or.utilOffset+"").getBytes(),-1)

}

}

/*

? ? ? ? 根據(jù)topic和groupName查詢所屬分區(qū)和對應(yīng)偏移量*/

? ? def obtainOffsets(topic:String,groupName:String):Map[TopicAndPartition, Long] = {

val path ="/consumers/"+groupName+"/offsets/"+topic+"/partition"

? ? ? ? if(zookeeper.exists(path,false)==null){

Map(TopicAndPartition(topic,-1)-> -1)

}else {

val childs = zookeeper.getChildren(path,false)

val maps =new HashMap[TopicAndPartition,Long]()

for(i<-0 until childs.size()){

val partition =childs.get(i)

val partition_path =path+"/"+partition

? ? ? ? ? ? ? ? val result =new String(zookeeper.getData(partition_path,false,null))

maps(TopicAndPartition(topic,partition.toInt)) =result.toLong

? ? ? ? ? ? }

maps

? ? ? ? }

}

/*

? ? ? ? 檢查znode是否存在丛晌,如果不存在就創(chuàng)建,創(chuàng)建成功后返回創(chuàng)建路徑*/

? ? def createOrExistsPath(path:String,zooKeeper:ZooKeeper):String ={

println("check "+path)

val stat = zookeeper.exists(path,false)

if(stat!=null){

path

? ? ? ? }else{

if(path.lastIndexOf("/")!=0){

val paths =path.substring(0,path.lastIndexOf("/"))

createOrExistsPath(paths,zooKeeper)

}

zookeeper.create(path,"".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT)

}

}

}

/*

? ? 每次消費數(shù)據(jù)的記錄唉锌,包括消費的對象(對應(yīng)主題和分區(qū))和消費的數(shù)據(jù)偏移量? ? fromOffset:起始偏移量? ? utilOffset:終止偏移量*/

case class OffsetRange(val topic:String,val partition: Int,val fromOffset: Long,val utilOffset: Long)

/*

? ? 主題對應(yīng)的當(dāng)前偏移量*/

case class TopicAndPartition(topic:String,partition:Int)

Curator? API:

package com.ruozedata.zookeeper

import org.apache.curator.RetryPolicy

import org.apache.curator.framework.CuratorFramework

import org.apache.curator.framework.CuratorFrameworkFactory

import org.apache.curator.retry.ExponentialBackoffRetry

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

import scala.util.Random

/*

? ? 參考博客:http://www.reibang.com/p/70151fc0ef5d

*/

object ZooKeeperCuratorApp {

def main(args:Array[String]): Unit = {

val retryPolicy =new ExponentialBackoffRetry(1000,3)

val client =CuratorFrameworkFactory.newClient("192.168.205.131:2181",retryPolicy)

client.start()

var status =""

? ? ? ? while (status !="STARTED"){

status =client.getState.toString

? ? ? ? }

storeOffsets(getOffsetRanges,"G306",client)

val result =obtainOffsets("ruoze_offset_topic","G306",client)

for (map <-result){

println("topic:"+map._1.topic+"? ? partition:"+map._1.partition+"? ? ? offset:"+map._2)

}

client.close()

}

def storeOffsets(offsetsRanges:ArrayBuffer[OffsetRange],groupName:String,zookeeper:CuratorFramework) :Unit= {

for(or <-offsetsRanges) {

val path ="/consumers/" +groupName +"/offsets/" +or.topic +"/partition/" +or.partition

? ? ? ? ? ? if(zookeeper.checkExists().forPath(path)==null){

zookeeper.create().creatingParentContainersIfNeeded().forPath(path)

}

zookeeper.setData().forPath(path,(or.utilOffset+"").getBytes())

}

}

def obtainOffsets(topic:String,groupName:String,zookeeper:CuratorFramework):Map[TopicAndPartition, Long] = {

val path ="/consumers/"+groupName+"/offsets/"+topic+"/partition"

? ? ? ? if(zookeeper.checkExists().forPath(path)==null){

Map()

}else{

val maps =new HashMap[TopicAndPartition,Long]()

val childs =zookeeper.getChildren.forPath(path)//應(yīng)答結(jié)果是List

? ? ? ? ? ? for(i <-0 until childs.size()){

val data =new String(zookeeper.getData().forPath(path+"/"+childs.get(i)))

maps(TopicAndPartition(topic,childs.get(i).toInt)) =data.toLong

? ? ? ? ? ? }

maps

? ? ? ? }

}

/*

? ? ? 隨機生成偏移量記錄對象*/

? ? def getOffsetRanges():ArrayBuffer[OffsetRange]={

val array =new ArrayBuffer[OffsetRange]()

for(i<-0 to 2){

array += OffsetRange("ruoze_offset_topic",getRandomInt(3),0,getRandomInt(1000).toLong)

}

array

? ? }

def getRandomInt(n:Int): Int ={

Random.nextInt(n)

}

}

/*

? ? 每次消費數(shù)據(jù)的記錄抖所,包括消費的對象(對應(yīng)主題和分區(qū))和消費的數(shù)據(jù)偏移量? ? fromOffset:起始偏移量? ? utilOffset:終止偏移量*/

case class OffsetRange(val topic:String,val partition: Int,val fromOffset: Long,val utilOffset: Long)

/*

? ? 主題對應(yīng)的當(dāng)前偏移量*/

case class TopicAndPartition(topic:String,partition:Int)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末饱搏,一起剝皮案震驚了整個濱河市叭披,隨后出現(xiàn)的幾起案子辽狈,更是在濱河造成了極大的恐慌慈参,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,454評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件刮萌,死亡現(xiàn)場離奇詭異驮配,居然都是意外死亡,警方通過查閱死者的電腦和手機着茸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評論 3 385
  • 文/潘曉璐 我一進店門壮锻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人涮阔,你說我怎么就攤上這事猜绣。” “怎么了敬特?”我有些...
    開封第一講書人閱讀 157,921評論 0 348
  • 文/不壞的土叔 我叫張陵掰邢,是天一觀的道長。 經(jīng)常有香客問我伟阔,道長尸变,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,648評論 1 284
  • 正文 為了忘掉前任减俏,我火速辦了婚禮召烂,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘娃承。我一直安慰自己奏夫,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,770評論 6 386
  • 文/花漫 我一把揭開白布历筝。 她就那樣靜靜地躺著酗昼,像睡著了一般。 火紅的嫁衣襯著肌膚如雪梳猪。 梳的紋絲不亂的頭發(fā)上麻削,一...
    開封第一講書人閱讀 49,950評論 1 291
  • 那天蒸痹,我揣著相機與錄音,去河邊找鬼呛哟。 笑死叠荠,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的扫责。 我是一名探鬼主播榛鼎,決...
    沈念sama閱讀 39,090評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼鳖孤!你這毒婦竟也來了者娱?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,817評論 0 268
  • 序言:老撾萬榮一對情侶失蹤苏揣,失蹤者是張志新(化名)和其女友劉穎黄鳍,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體平匈,經(jīng)...
    沈念sama閱讀 44,275評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡际起,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,592評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了吐葱。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片街望。...
    茶點故事閱讀 38,724評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖弟跑,靈堂內(nèi)的尸體忽然破棺而出灾前,到底是詐尸還是另有隱情,我是刑警寧澤孟辑,帶...
    沈念sama閱讀 34,409評論 4 333
  • 正文 年R本政府宣布哎甲,位于F島的核電站,受9級特大地震影響饲嗽,放射性物質(zhì)發(fā)生泄漏炭玫。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,052評論 3 316
  • 文/蒙蒙 一貌虾、第九天 我趴在偏房一處隱蔽的房頂上張望吞加。 院中可真熱鬧,春花似錦尽狠、人聲如沸衔憨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽践图。三九已至,卻和暖如春沉馆,著一層夾襖步出監(jiān)牢的瞬間码党,已是汗流浹背德崭。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留揖盘,地道東北人眉厨。 一個月前我還...
    沈念sama閱讀 46,503評論 2 361
  • 正文 我出身青樓,卻偏偏與公主長得像扣讼,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子缨叫,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,627評論 2 350

推薦閱讀更多精彩內(nèi)容