我們在上一篇文章中已經(jīng)實現(xiàn)了ActiveMQ的高可用集群,但是無法做到負(fù)載均衡.接下來我們通過修改配置文件實現(xiàn)支持負(fù)載均衡的集群.
步驟一:
基于上一篇文章的環(huán)境下,我們還是在同一臺CentOS中來完成這個實驗.需要創(chuàng)建6個ActiveMQ的節(jié)點.要實現(xiàn)多主多從,我們把他們分成兩組(Group)
group1
Master Broker(61616)+Slave Borker(61617)+Slave Borker(61618)
group2
Master Broker(61619)+Slave Borker(61620)+Slave Borker(61621)
目錄創(chuàng)建如下:
步驟二:
因為我們是在一臺機(jī)器上做實驗,所以不同的ActiveMQ需要有不同的端口配置,具體分配如下:
節(jié)點 | openwire端口 | amqp端口 | stomp端口 | mqtt端口 | ws端口 | admin端口 |
---|---|---|---|---|---|---|
mq1 | 61616 | 5672 | 61613 | 1883 | 61614 | 8161 |
mq2 | 61617 | 5682 | 61623 | 1903 | 61634 | 8162 |
mq3 | 61618 | 5692 | 61633 | 1923 | 61654 | 8163 |
mq4 | 61619 | 5702 | 61643 | 1933 | 61664 | 8165 |
mq5 | 61620 | 5712 | 61653 | 1943 | 61674 | 8166 |
mq6 | 61621 | 5722 | 61673 | 1963 | 61694 | 8167 |
具體配置的修改同學(xué)們就自己去修改,我在文章中就不貼出來了.
步驟三:
我們要實現(xiàn)的效果是,我們可以往兩個Group中的Master Broker節(jié)點發(fā)送任意消息.
然后消費(fèi)端連接任意一臺機(jī)器都可以獲取到所有的消息.
這時候我們需要在配置文件中配置,讓兩組的Group之間的機(jī)器是可見的.
1 ) 在group1中的所有節(jié)點需要修改activemq.xml文件,添加如下配置:
vi /usr/local/mqcluster/mq1/conf/activemq.xml
:這段配置添加在節(jié)點persistenceAdapter
前
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.122.129:61619,tcp://192.168.122.129:61620,tcp://192.168.122.129:61621)" duplex="true"/>
</networkConnectors>
- 在group2中的所有節(jié)點需要修改
activemq.xml
文件,添加如下配置:
vi /usr/local/mqcluster/mq4/conf/activemq.xml
:這段配置添加在節(jié)點persistenceAdapter
前
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.122.129:61616,tcp://192.168.122.129:61617,tcp://192.168.122.129:61618)" duplex="true"/>
</networkConnectors>
在group2中的所有節(jié)點還需要修改activemq.xml中的persistenceAdapter
內(nèi)容:
<persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="192.168.122.129:2181"
zkPassword=""
hostname="localhost"
sync="local_disk"
zkPath="/activemq/leveldb-stores/group2"
/>
</persistenceAdapter>
修改zkPath="/activemq/leveldb-stores/group2"
,其他不用變.
好,到這一步我們所有的配置都配置好了.接下來就做測試了,測試的代碼還是原來那套.
測試:
- 將6臺ActiveMQ的示例啟動起來.
- 使用group1的生產(chǎn)者發(fā)送3條消息,生產(chǎn)者的地址配置:
ConnectionFactory factory = new ActiveMQConnectionFactory(
"lanxw",
"lanxw",
"failover:(tcp://192.168.122.129:61616,tcp://192.168.122.129:61617,tcp://192.168.122.129:61618)?randomize=false"
);
- 使用group2的生產(chǎn)者發(fā)送3條消息,生產(chǎn)者地址配置:
ConnectionFactory factory = new ActiveMQConnectionFactory(
"lanxw",
"lanxw",
"failover:(tcp://192.168.122.129:61619,tcp://192.168.122.129:61620,tcp://192.168.122.129:61621)?randomize=false"
);
4.運(yùn)行消費(fèi)端,是可以把group1和group2的消息消費(fèi)掉的.消費(fèi)端地址配置:
ConnectionFactory factory = new ActiveMQConnectionFactory(
"lanxw",
"lanxw",
"failover:(tcp://192.168.122.129:61616,tcp://192.168.122.129:61617,tcp://192.168.122.129:61618," +
"tcp://192.168.122.129:61619,tcp://192.168.122.129:61620,tcp://192.168.122.129:61621)?randomize=false");