原生API:
package com.ruozedata.zookeeper
import org.apache.zookeeper.Watcher.Event
import org.apache.zookeeper._
import scala.util.Random
import java.util.concurrent.CountDownLatch
import scala.collection.mutable._
//import org.apache.zookeeper.data.Stat
/**
 * Demo that stores consumer offsets in ZooKeeper with the native client API and reads
 * them back. Offsets live under znodes laid out like:
 *
 * /consumers/G301/offsets/ruoze_offset_topic/partition/0
 * /consumers/G301/offsets/ruoze_offset_topic/partition/1
 * /consumers/G301/offsets/ruoze_offset_topic/partition/2
 */
object ZooKeeperApp {
  // Offsets are written as decimal text; pin the charset so that writers and readers
  // running with different platform defaults still agree on the bytes.
  private val Utf8 = java.nio.charset.StandardCharsets.UTF_8

  // Released by the watcher below once the session reports SyncConnected;
  // main() blocks on it before issuing any request.
  val connected = new CountDownLatch(1)

  val zookeeper = new ZooKeeper("192.168.205.131:2181", 3000, new Watcher {
    override def process(event: WatchedEvent): Unit = {
      println("create zookeeper client begin...")
      if (Event.KeeperState.SyncConnected == event.getState) {
        println("create zookeeper client end...")
        connected.countDown()
      }
    }
  })

  /**
   * Waits for the session to connect, stores a few random offsets for group "G306",
   * reads them back and prints them. The session is closed on the way out, even when
   * one of the ZooKeeper calls throws.
   */
  def main(args: Array[String]): Unit = {
    try {
      connected.await()
      storeOffsets(getOffsetRanges(), "G306")
      val result = obtainOffsets("ruoze_offset_topic", "G306")
      for ((topicAndPartition, offset) <- result) {
        // NOTE(review): the "? ?" separators look like mis-encoded whitespace; they are
        // runtime output, so they are kept byte-for-byte here.
        println(s"topic:${topicAndPartition.topic}? ? partition:${topicAndPartition.partition}? ? ? offset:$offset")
      }
    } finally {
      zookeeper.close()
    }
  }

  /**
   * Generates three random offset records for topic "ruoze_offset_topic".
   * Partitions are drawn from [0, 3) and may repeat; end offsets are drawn from [0, 1000).
   */
  def getOffsetRanges(): ArrayBuffer[OffsetRange] =
    ArrayBuffer.fill(3) {
      OffsetRange("ruoze_offset_topic", getRandomInt(3), 0, getRandomInt(1000).toLong)
    }

  /** Returns a random integer in [0, n). */
  def getRandomInt(n: Int): Int = Random.nextInt(n)

  /**
   * Stores the end offset of every range under
   * /consumers/<groupName>/offsets/<topic>/partition/<partition>, creating missing
   * znodes first. When several ranges share a partition, the last one wins.
   */
  def storeOffsets(offsetsRanges: ArrayBuffer[OffsetRange], groupName: String): Unit = {
    for (or <- offsetsRanges) {
      val path = s"/consumers/$groupName/offsets/${or.topic}/partition/${or.partition}"
      createOrExistsPath(path, zookeeper)
      // Version -1 matches any znode version, i.e. an unconditional overwrite.
      zookeeper.setData(path, or.utilOffset.toString.getBytes(Utf8), -1)
    }
  }

  /**
   * Looks up the stored offset of every partition of `topic` for `groupName`.
   *
   * @return a map from partition to offset; when the partition parent znode does not
   *         exist, a single sentinel entry TopicAndPartition(topic, -1) -> -1
   */
  def obtainOffsets(topic: String, groupName: String): Map[TopicAndPartition, Long] = {
    val path = s"/consumers/$groupName/offsets/$topic/partition"
    if (zookeeper.exists(path, false) == null) {
      Map(TopicAndPartition(topic, -1) -> -1L)
    } else {
      val children = zookeeper.getChildren(path, false)
      val offsets = new HashMap[TopicAndPartition, Long]()
      for (i <- 0 until children.size()) {
        val partition = children.get(i)
        val data = zookeeper.getData(s"$path/$partition", false, null)
        offsets(TopicAndPartition(topic, partition.toInt)) = new String(data, Utf8).toLong
      }
      offsets
    }
  }

  /**
   * Ensures that the znode `path` exists, creating missing ancestors first.
   *
   * @param path      absolute znode path
   * @param zooKeeper client used for every check and create issued by this call
   * @return `path` when the znode already existed, otherwise the path reported by create
   */
  def createOrExistsPath(path: String, zooKeeper: ZooKeeper): String = {
    println("check " + path)
    // Use the client that was passed in, not the object-level one.
    if (zooKeeper.exists(path, false) != null) {
      path
    } else {
      val lastSlash = path.lastIndexOf("/")
      // A slash at index 0 means the parent is the root, which always exists.
      if (lastSlash > 0) {
        createOrExistsPath(path.substring(0, lastSlash), zooKeeper)
      }
      zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
    }
  }
}
/**
 * Record of one consumption batch: what was consumed (topic and partition) and the
 * offsets that were covered.
 *
 * @param topic      topic that was consumed
 * @param partition  partition within the topic
 * @param fromOffset starting offset
 * @param utilOffset ending offset
 */
case class OffsetRange(
  topic: String,
  partition: Int,
  fromOffset: Long,
  utilOffset: Long
)
/**
 * Identifies one partition of a topic; used as the key under which the current
 * offset of that partition is reported.
 *
 * @param topic     topic name
 * @param partition partition number within the topic
 */
case class TopicAndPartition(
  topic: String,
  partition: Int
)
Curator API:
package com.ruozedata.zookeeper
import org.apache.curator.RetryPolicy
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Random
/*
 * Reference blog: http://www.reibang.com/p/70151fc0ef5d
 *
 * Demo that stores consumer offsets in ZooKeeper through Curator and reads them back.
 */
object ZooKeeperCuratorApp {
  // Offsets are written as decimal text; pin the charset so that writers and readers
  // running with different platform defaults still agree on the bytes.
  private val Utf8 = java.nio.charset.StandardCharsets.UTF_8

  /**
   * Starts a Curator client, waits for the connection, stores a few random offsets for
   * group "G306", reads them back and prints them. The client is closed on the way out,
   * even when one of the operations throws.
   */
  def main(args: Array[String]): Unit = {
    val retryPolicy = new ExponentialBackoffRetry(1000, 3)
    val client = CuratorFrameworkFactory.newClient("192.168.205.131:2181", retryPolicy)
    try {
      client.start()
      // Wait for an actual connection instead of spinning on the lifecycle state string.
      client.blockUntilConnected()
      storeOffsets(getOffsetRanges(), "G306", client)
      val result = obtainOffsets("ruoze_offset_topic", "G306", client)
      for ((topicAndPartition, offset) <- result) {
        // NOTE(review): the "? ?" separators look like mis-encoded whitespace; they are
        // runtime output, so they are kept byte-for-byte here.
        println(s"topic:${topicAndPartition.topic}? ? partition:${topicAndPartition.partition}? ? ? offset:$offset")
      }
    } finally {
      client.close()
    }
  }

  /**
   * Stores the end offset of every range under
   * /consumers/<groupName>/offsets/<topic>/partition/<partition>, creating the znode
   * (and missing parents) first. When several ranges share a partition, the last one wins.
   */
  def storeOffsets(offsetsRanges: ArrayBuffer[OffsetRange], groupName: String, zookeeper: CuratorFramework): Unit = {
    for (or <- offsetsRanges) {
      val path = s"/consumers/$groupName/offsets/${or.topic}/partition/${or.partition}"
      if (zookeeper.checkExists().forPath(path) == null) {
        zookeeper.create().creatingParentContainersIfNeeded().forPath(path)
      }
      zookeeper.setData().forPath(path, or.utilOffset.toString.getBytes(Utf8))
    }
  }

  /**
   * Looks up the stored offset of every partition of `topic` for `groupName`.
   *
   * @return a map from partition to offset; empty when the partition parent znode
   *         does not exist
   */
  def obtainOffsets(topic: String, groupName: String, zookeeper: CuratorFramework): Map[TopicAndPartition, Long] = {
    val path = s"/consumers/$groupName/offsets/$topic/partition"
    if (zookeeper.checkExists().forPath(path) == null) {
      Map()
    } else {
      val offsets = new HashMap[TopicAndPartition, Long]()
      // getChildren returns a java.util.List of child znode names.
      val children = zookeeper.getChildren.forPath(path)
      for (i <- 0 until children.size()) {
        val partition = children.get(i)
        val data = new String(zookeeper.getData().forPath(s"$path/$partition"), Utf8)
        offsets(TopicAndPartition(topic, partition.toInt)) = data.toLong
      }
      offsets
    }
  }

  /**
   * Generates three random offset records for topic "ruoze_offset_topic".
   * Partitions are drawn from [0, 3) and may repeat; end offsets are drawn from [0, 1000).
   */
  def getOffsetRanges(): ArrayBuffer[OffsetRange] =
    ArrayBuffer.fill(3) {
      OffsetRange("ruoze_offset_topic", getRandomInt(3), 0, getRandomInt(1000).toLong)
    }

  /** Returns a random integer in [0, n). */
  def getRandomInt(n: Int): Int = Random.nextInt(n)
}
/**
 * Record of one consumption batch: what was consumed (topic and partition) and the
 * offsets that were covered.
 *
 * @param topic      topic that was consumed
 * @param partition  partition within the topic
 * @param fromOffset starting offset
 * @param utilOffset ending offset
 */
case class OffsetRange(
  topic: String,
  partition: Int,
  fromOffset: Long,
  utilOffset: Long
)
/**
 * Identifies one partition of a topic; used as the key under which the current
 * offset of that partition is reported.
 *
 * @param topic     topic name
 * @param partition partition number within the topic
 */
case class TopicAndPartition(
  topic: String,
  partition: Int
)