0 緣起
Pulsar是一個支持多租戶的僚匆、高性能的消息中間件嘉抓。
2018年11月中旬開始初步在線上生產(chǎn)環(huán)境使用振湾。
pulsar官網(wǎng),部署教程文檔還不是很詳細跺涤,網(wǎng)絡(luò)上的教程基本都是官網(wǎng)的翻譯版光酣,對于一個沒有豐富經(jīng)驗的開發(fā)者還是會踩一些坑苔货,本文記錄一下相對詳細測試集群搭建狂男。
1 準備資源
一臺主機(本文以macOS為例)
java8運行環(huán)境
2 搭建集群的組成
zk集群(3個ZooKeeper節(jié)點組成)
bookie集群(3個BookKeeper節(jié)點組成)
broker集群(3個Pulsar節(jié)點組成)
3 zk集群-搭建
ZooKeeper 版本 3.4.12。
使用一臺機器脾拆,在該臺機器上運行多個ZooKeeper 服務(wù)進程馒索,搭建zk集群。
(3.1)下載zk名船,解壓绰上。
Zookeeper官網(wǎng)下載地址(zookeeper-3.4.12.tar.gz)
https://archive.apache.org/dist/zookeeper/zookeeper-3.4.12/
(3.2)將解壓好的zookeeper-3.4.12復制到新建文件zookeepers目錄下,重名為server1包帚。用server1復制出server2和server3,目錄結(jié)構(gòu)如圖所示运吓。
(3.3)對server1進行配置(先對zk的一個節(jié)點進行配置)渴邦。
(3.3.1)在server1目錄下疯趟,新建data和dataLog兩個文件夾。目錄結(jié)構(gòu)如下谋梭。
(3.3.2)將conf目錄下的zoo_sample.cfg文件重命名為zoo.cfg信峻。如圖所示。
(3.3.3)修改 zoo.cfg 文件內(nèi)容,主要修改以下5個配置參數(shù)瓮床,以及添加集群節(jié)點信息盹舞。
*修改如下5個參數(shù)
dataDir
dataLogDir
clientPort
admin.enableServer
admin.serverPort
*添加如下集群節(jié)點信息
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
我的server1的zoo.cfg 文件內(nèi)容如下:
The number of milliseconds of each tick
tickTime=2000
The number of ticks that the initial
synchronization phase can take
initLimit=10
The number of ticks that can pass between
sending a request and getting an acknowledgement
syncLimit=10
the directory where the snapshot is stored.
do not use /tmp for storage, /tmp here is just
example sakes.
dataDir=/Users/gaotianci/Softwares/zookeepers/server1/data
dataLogDir=/Users/gaotianci/Softwares/zookeepers/server1/dataLog
the port at which the clients will connect
clientPort=2181
the maximum number of client connections.
increase this if you need to handle more clients
maxClientCnxns=60
admin.enableServer=true
admin.serverPort=9181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
Be sure to read the maintenance section of the
administrator guide before turning on autopurge.
http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
Purge task interval in hours
Set to "0" to disable auto purge feature
autopurge.purgeInterval=1
對zoo.cfg 配置參數(shù)解釋:
tickTime:zookeeper中使用的基本時間單位, 毫秒值,默認2000ms隘庄。
initLimit:用來配置Zookeeper服務(wù)器集群中Follower服務(wù)器初始化連接Leader服務(wù)器時最長能忍受多少個tickTime踢步。這里設(shè)置為5表示最長容忍時間為10秒。
syncLimit:用來配置Leader與Follower之間發(fā)送消息丑掺、請求和應(yīng)答時間最長能忍受多少個tickTime获印。這里設(shè)置為2表示最長容忍時間為4秒。
dataDir:數(shù)據(jù)文件目錄街州。
dataLogDir:日志文件目錄兼丰。
clientPort:監(jiān)聽client連接的端口號。
server.{myid}={ip}:{leader服務(wù)器交換信息的端口}:{當leader服務(wù)器掛了后, 選舉leader的端口}
maxClientCnxns:對于一個客戶端的連接數(shù)限制唆缴,默認是60鳍征。
admin.enableServer:是否啟用zk管理后臺。
admin.serverPort:管理后臺端口號面徽。
在一臺服務(wù)器上艳丛,部署多個實例,需要指定不同的端口號斗忌。
[1]clientPort: 3個zk節(jié)點中分別配置為:2181,2182,2183
[2]admin.enableServer:3個zk節(jié)點中都配置為 true
[3]admin.serverPort:3個zk節(jié)點中分別配置為:9181,9182,9183
[4]集群節(jié)點信息:3個zk節(jié)點中都配置為:
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
(3.3.4)在server1/data/目錄下創(chuàng)建名字為myid文件质礼。
向myid文件中寫入內(nèi)容 1
對myid文件的解釋:
每一個zk節(jié)點的的myid內(nèi)容都不能一樣,它是不同節(jié)點的唯一標識织阳。
myid文件內(nèi)容,3個zk節(jié)點中分別為1,2,3
(3.3.5)server1配置完畢眶蕉,對server2,server3做類似配置。具體不再贅述唧躲。
(3.3.6)啟動zk集群,在終端分別用命令啟動zk節(jié)點造挽。
./bin/zkServer.sh start zk啟動命令(zkServer.sh start)
./bin/zkServer.sh status zk狀態(tài)查看命令(zkServer.sh status)
./bin/zkServer.sh stop zk關(guān)閉命令(zkServer.sh stop)
(3.3.7)./bin/zkServer.sh status 命令查看節(jié)點狀態(tài)。
(3.3.8)用zk客戶端命令(./bin/zkCli.sh -timeout 5000 -server 127.0.0.1:2181)連接zk節(jié)點,在這里連接server1弄痹。
(3.3.9)查看(ls /)zk集群當前內(nèi)容饭入。
(3.3.10)向zk集群寫入元數(shù)據(jù)內(nèi)容。用如下命令寫入肛真,后面解釋寫入內(nèi)容谐丢。
create /--cluster pulsar-cluster
create /--zookeeper 127.0.0.1:2181
create /--configuration-store 127.0.0.1:2181
create /--web-service-url http://pulsar.cluster.com:8080
create /--web-service-url-tls https://pulsar.cluster.com:8443
create /--broker-service-url pulsar://pulsar.cluster.com:6650
create /--broker-service-url-tls pulsar+ssl://pulsar.cluster.com:6651
zk集群寫入元數(shù)據(jù)內(nèi)容解釋:
--cluster
集群名稱
--zookeeper
ZooKeeper集群連接參數(shù),僅需要包含集群中的一個節(jié)點即可
--configuration-store
Pulsar實例的配置存儲集群(ZooKeeper),和-zookeeper參數(shù)一樣只需要包含集群中的一個節(jié)點即可
--web-service-url
集群Web服務(wù)的URL+端口乾忱,URL必須是一個i標準的DNS名稱讥珍,默認端口8080,不建議修改窄瘟。
--web-service-url-tls
集群Web提供TLS服務(wù)的URL+端口衷佃,端口默認8443,不建議修改蹄葱。
--broker-service-url
集群brokers服務(wù)URL氏义,URL中DNS的名稱和Web服務(wù)保持一致,URL使用pulsar替代http/http图云,端口默認6650惯悠,不建議修改。
--broker-service-url-tls
集群brokers提供TLS服務(wù)的URL琼稻,默認端口6551吮螺,不建議修改。
4 bookie集群-搭建
bookie 是bookkeeper 的別稱帕翻。版本 4.7.2鸠补。
使用一臺機器,在該臺機器上運行多個bookie 服務(wù)進程嘀掸,搭建bookie集群紫岩。
(4.1)下載zk,解壓睬塌。
bookkeeper官網(wǎng)下載地址(bookkeeper-server-4.7.2-bin.tar.gz)
https://bookkeeper.apache.org/releases/
(4.2)將解壓好的bookkeeper-server-4.7.2復制到新建文件bookkeepers目錄下泉蝌,重名為bookie1。用bookie1復制出bookie2和bookie3揩晴,目錄結(jié)構(gòu)如圖所示勋陪。
(4.3)對bookie1進行配置(先對bookkeeper的一個節(jié)點進行配置)。
(4.3.1)修改 bk_server.conf 文件內(nèi)容,主要修改以下3個端口號,以防在一臺主機上造成端口號沖突硫兰,以及添加zk集群節(jié)點信息诅愚。
*修改如下3個參數(shù)
bookiePort
httpServerPort
storageserver.grpc.port
*添加如下zk集群節(jié)點信息
zkServers=localhost:2181,localhost:2182,localhost:2183
在一臺服務(wù)器上,部署多個實例劫映,需要指定不同的端口號违孝。
[1]bookiePort: 3個bookie節(jié)點中分別配置為:3181,3182,3183
[2]httpServerPort: 3個bookie節(jié)點中都配置為:8050,8060,8070
[3]storageserver.grpc.port: 3個bookie節(jié)點中都配置為: 4181,4182,4183
[4]zk集群節(jié)點信息:3個bookie中都配置為:
zkServers=localhost:2181,localhost:2182,localhost:2183
(4.3.2)bookie1配置完畢,對bookie2,bookie3做類似配置泳赋。具體不再贅述雌桑。
(4.3.3)執(zhí)行初始化集群元數(shù)據(jù)命令(在一個bookie上執(zhí)行即可),命令如下。
./bin/bookkeeper shell metaformat
(4.3.4)在終端祖今,3個bookie下校坑,分別執(zhí)行命令啟動bookie,命令如下拣技。
./bin/bookkeeper bookie
(4.3.5)檢查bookie啟動狀態(tài),集群啟動成功會有"Bookie sanity test succeeded"日志輸出。在一個bookie 實例下執(zhí)行如下命令耍目。
./bin/bookkeeper shell bookiesanity
5 broker集群-搭建
broker 是pulsar實例別稱过咬。版本 2.2.0。
使用一臺機器制妄,在該臺機器上運行多個broker 服務(wù)進程,搭建broker集群泵三。
(5.1)下載pulsar耕捞,解壓。
pulsar官網(wǎng)下載地址(apache-pulsar-2.2.0-bin-tar.gz)
http://pulsar.apache.org/zh-CN/download/
(5.2)將解壓好的apache-pulsar-2.2.0復制到新建文件brokers目錄下烫幕,重名為broker1俺抽。用broker1復制出broker2和broker3,目錄結(jié)構(gòu)如圖所示较曼。
(5.3)對broker1進行配置(先對broker的一個節(jié)點進行配置)磷斧。
(5.3.1)修改 broker.conf 文件內(nèi)容,主要修改以下4個端口號,以防在一臺主機上造成端口號沖突,以及添加zk集群節(jié)點信息捷犹。
*修改如下4個參數(shù)
brokerServicePort
brokerServicePortTls
webServicePort
webServicePortTls
*添加如下zk集群節(jié)點信息
zookeeperServers=localhost:2181,localhost:2182,localhost:2183
configurationStoreServers=localhost:2181,localhost:2182,localhost:2183
在一臺服務(wù)器上弛饭,部署多個實例,需要指定不同的端口號萍歉。
[1]brokerServicePort: 3個broker節(jié)點中分別配置為:6650,6660,6670
[2]brokerServicePortTls: 3個broker節(jié)點中分別配置為:6651,6661,6671
[3]webServicePort: 3個broker節(jié)點中分別配置為:8080,8081,8082
[4]webServicePortTls: 3個broker節(jié)點中分別配置為:8443,8444,8445
[5]zk集群節(jié)點信息:3個broker中都配置為:
zookeeperServers=localhost:2181,localhost:2182,localhost:2183
configurationStoreServers=localhost:2181,localhost:2182,localhost:2183
(5.3.2)broker1配置完畢侣颂,對broker2,broker3做類似配置。具體不再贅述枪孩。
(5.3.3)在終端憔晒,3個broker下,分別執(zhí)行命令啟動broker,啟動成功后會有日志“PulsarService started”輸出,命令如下蔑舞。
./bin/pulsar broker
6 pulsar集群啟動完畢拒担,現(xiàn)用命令創(chuàng)建集群名,租戶名攻询,命名空間从撼,topic并行給出測試demo。
(6.1)依次創(chuàng)建集群蜕窿,租戶谋逻,命名空間,分區(qū)topic桐经,并為命名空間指定集群毁兆。
創(chuàng)建集群(集群名:pulsar-cluster)
./bin/pulsar-admin clusters create --url http://pulsar.cluster.com:8080 pulsar-cluster
創(chuàng)建租戶(租戶名:my-tenant)
./bin/pulsar-admin tenants create my-tenant
創(chuàng)建命名空間(命名空間名,指定了租戶my-tenant:my-tenant/my-namespace)
./bin/pulsar-admin namespaces create my-tenant/my-namespace
創(chuàng)建持久性分區(qū)topic(topic全名:persistent://my-tenant/my-namespace/my-topic)
./bin/pulsar-admin topics create-partitioned-topic persistent://my-tenant/my-namespace/my-topic -p 3
更新命名空間為其指定集群
./bin/pulsar-admin namespaces set-clusters my-tenant/my-namespace --clusters pulsar-cluster
(6.2)生產(chǎn)者,消費者測試demo阴挣。
(6.2.1)maven依賴
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.2.0</version>
</dependency>
(6.2.2)生產(chǎn)者
public class PulsarProducerDemo {
private static String localClusterUrl = "pulsar://localhost:6650";
public static void main(String[] args) {
try {
Producer<byte[]> producer = getProducer();
String msg = "hello world pulsar!";
Long start = System.currentTimeMillis();
MessageId msgId = producer.send(msg.getBytes());
System.out.println("spend=" + (System.currentTimeMillis() - start) + ";send a message msgId = " + msgId.toString());
} catch (Exception e) {
System.err.println(e);
}
}
public static Producer<byte[]> getProducer() throws Exception {
PulsarClient client;
client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
Producer<byte[]> producer = client.newProducer().topic("persistent://my-tenant/my-namespace/my-topic").producerName("producerName").create();
return producer;
}
}
(6.2.3)消費者
public class PulsarConsumerDemo {
private static String localClusterUrl = "pulsar://localhost:6650";
public static void main(String[] args) {
try {
//將訂閱消費者指定的主題和訂閱
Consumer<byte[]> consumer = getClient().newConsumer()
.topic("persistent://my-tenant/my-namespace/my-topic")
.subscriptionName("my-subscription")
.subscribe();
while (true) {
Message msg = consumer.receive();
System.out.printf("consumer-Message received: %s. \n", new String(msg.getData()));
// 確認消息气堕,以便broker刪除消息
consumer.acknowledge(msg);
}
} catch (Exception e) {
System.out.println(e);
}
}
public static PulsarClient getClient() throws Exception {
PulsarClient client;
client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
return client;
}
}