zookeeper主要的功能是數(shù)據(jù)的存儲與分配,zookeeper的API可以根據(jù)開發(fā)者的需求設(shè)計不同的工作模式工扎,因此可以適用于很多場景之中。
這里用python寫了一個例子來實現(xiàn)簡單的發(fā)布訂閱功能。例子中實現(xiàn)了發(fā)布偿凭、監(jiān)視的功能齐蔽,完整的發(fā)布訂閱還有很多功能需要實現(xiàn)两疚,都可以通過zookeeper協(xié)助實現(xiàn),例子中實現(xiàn)了server發(fā)布和client接收的功能含滴。
- 首先我們需要搭建一個zookeeper集群诱渤,這里就不詳講了,可以參考http://www.reibang.com/p/b76628b82208谈况。例子中用了3臺虛擬機每臺分別搭建一個zookeeper服務(wù)來實現(xiàn)zookeeper集群勺美。
- 安裝kazoo
pip3 install kazoo
- 發(fā)布端代碼
#!/bin/usr/python
#_*_ coding: utf-8 _*_
import sys
from myzk import ZooKeeper
zk = ZooKeeper(hosts='192.168.137.129:2181,192.168.137.130:2181,192.168.137.131:2181')
path = "/monitor/" + sys.argv[1]
data = sys.argv[2]
zk.create_nodes(path, data)
- 接收端代碼
#!/bin/usr/python
#_*_ coding: utf-8 _*_
import sys
from myzk import ZooKeeper
zk = ZooKeeper(hosts='192.168.137.129:2181,192.168.137.130:2181,192.168.137.131:2181')
zk.watch_child_node("/monitor")
- zookeeper類
#!/bin/usr/python
#_*_ coding: utf-8 _*_
import time
from kazoo.client import KazooClient,ChildrenWatch,DataWatch
class ZooKeeper(KazooClient):
def __init__(self, timeout=15, *args, **kwargs):
super().__init__(timeout=timeout, *args, **kwargs)
self.start()
def __del__(self):
self.stop()
def create_path(self, zk_path):
self.ensure_path(zk_path)
return
def update_nodes_data(self, zk_path, data):
self.set(zk_path, bytes(data, encoding="utf8"))
return
def create_nodes(self, zk_path, data):
self.create(zk_path, bytes(data, encoding="utf8"))
return
def get_data(self, zk_path):
if self.exists(zk_path):
return self.get(zk_path)
else:
print("node not exists")
return
def watch_child_node(self, zk_path):
@ChildrenWatch(client=self, path=zk_path, send_event=True)
def get_changes(children, event):
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " node childs had been change")
print("now: ", children)
if event:
print(event)
return children
while True:
time.sleep(5)
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " watching...")
def watch_node_data(self, zk_path):
@DataWatch(client=self, path=zk_path)
def watch_node(data, stat):
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " data changed")
if data:
print("Version: %s, data: %s" % (stat.version, data.decode("utf8")))
while True:
time.sleep(5)
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " watching...")
把接收端和Zookeeper類的代碼放在三臺server上,然后均運行接收端代碼碑韵,然后隨便選一臺server運行發(fā)布端代碼赡茸,每創(chuàng)建一個新節(jié)點,三個接收端都會發(fā)現(xiàn)祝闻。
運行結(jié)果:
我們隨機找一臺服務(wù)器運行發(fā)布端代碼
server.png
接收端1
client1.png
接收端2
client2.png
接收端3
client3.png
可以看出每次新增一個znode占卧,接收端都能響應(yīng)。