一句話(huà)告訴我Curator是什么?
ph-quote.png
(Zookeeper的背景就不在這里介紹了.)
Curator是對(duì)ZK的高階封裝. 與操作原生的Zookeeper相比, 它提供了對(duì)ZK的完美封裝, 簡(jiǎn)化了對(duì)集群的連接, 錯(cuò)誤的處理; 實(shí)現(xiàn)了一系列經(jīng)典"模式", 比如分布式鎖, Leader選舉等.
既然是為了簡(jiǎn)單
首先, 獲取一個(gè)連接實(shí)例(CuratorFramework). 對(duì)于每個(gè)集群, 只需要?jiǎng)?chuàng)建一個(gè)實(shí)例.
CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)
這里我們需要指定一個(gè)retryPolicy, 這里用一個(gè)指數(shù)補(bǔ)償策略:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
然后我們就可以調(diào)用Zookeeper的一系列操作了, 比如創(chuàng)建:
client.create().forPath("/my/path", myData)
這里如果出現(xiàn)了連接問(wèn)題,curator也會(huì)幫你自動(dòng)重連. 所以, 很簡(jiǎn)單咯?
我們繼續(xù), C/R/U/D:
// this will create the given ZNode with the given data
client.create().forPath(path, payload);
// set data for the given node
client.setData().forPath(path, payload);
// set data for the given node asynchronously. The completion notification
// is done via the CuratorListener.
client.setData().inBackground().forPath(path, payload);
// delete the given node
client.delete().forPath(path);
public static List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception
{
/**
* Get children and set a watcher on the node. The watcher notification will come through the
* CuratorListener (see setDataAsync() above).
*/
return client.getChildren().watched().forPath(path);
}
模式
分布式鎖
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
try
{
// do some work inside of the critical section here
}
finally
{
lock.release();
}
}
Leader選舉
LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
{
public void takeLeadership(CuratorFramework client) throws Exception
{
// this callback will get called when you are the leader
// do whatever leader work you need to and only exit
// this method when you want to relinquish leadership
}
}
LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue(); // not required, but this is behavior that you will probably expect
selector.start();
事務(wù)
Collection<CuratorTransactionResult> results = client.inTransaction()
.create().forPath("/a/path", "some data".getBytes())
.and()
.setData().forPath("/another/path", "other data".getBytes())
.and()
.delete().forPath("/yet/another/path")
.and()
.commit(); // IMPORTANT! The transaction is not submitted until commit() is called
附錄
模塊 | 簡(jiǎn)介 |
---|---|
Framework | 簡(jiǎn)化Zookeeper的使用,提供了一系列高階API, 在Zookeeper集群連接的管理, 以及重連操作上化繁為簡(jiǎn). |
Receipts | 實(shí)現(xiàn)了一些ZooKeeper的基本"模式", 該實(shí)現(xiàn)基于Curator Framework. |
Utilities | 一系列工具 |
Client | ZooKeeper類(lèi)的替代 |
Errors | 定義了curator如何處理錯(cuò)誤,連接問(wèn)題,可恢復(fù)的異常等. |
Extensions | 實(shí)現(xiàn)了一些Zookeeper文檔中提及的通用模式. |
版本
- Curator 2.x.x - ZooKeeper 3.4.x, ZooKeeper 3.5.x
- Curator 3.x.x - 只兼容ZooKeeper 3.5.x; 支持dynamic reconfiguration等新feature.