ActiveMQ的高可用性架構(gòu)是基于Master/Slave 模型的,Master/Slave模式的搭建有好幾種方式:
- 1)Shared File System Master Slave 使用共享文件系統(tǒng)的方式
- 2)JDBC Master Slave 使用數(shù)據(jù)庫作為持久化存儲
- 3)基于zookeeper 方式搭建
我們采取的是zookeeper的方式搭建Active高可用環(huán)境:
步驟一:
確保已經(jīng)安裝好jdk8的環(huán)境和zookeeper的環(huán)境.如果沒裝好的同學,找之前的文檔裝一裝.
步驟二:
正常來說,多個ActiveMQ應該配置在多臺服務器中的,但是做實驗我們就只在一臺CentOS中來模擬,每個ActiveMQ的端口不一致.在/usr/local目錄下創(chuàng)建mqcluster目錄,然后在該目錄中創(chuàng)建mq1,mq2,mq3的文件夾.將apache-activemq-5.15.3-bin.tar.gz壓縮包的內(nèi)容解壓到這三個目錄.
步驟三:
首先修改每個ActiveMQ的持久化方式(修改ACTIVEMQ_HOME/conf/activemq.xml
文件),ActiveMQ默認使用的是KahaDB作為持久化存儲數(shù)據(jù)的,我們修改成levelDB.如下圖所示:
<persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="192.168.122.129:2181"
zkPassword=""
hostname="localhost"
sync="local_disk"
zkPath="/activemq/leveldb-stores/group1"
/>
</persistenceAdapter>
directory
:levelDB持久化路徑
replicas=“3”
: ActiveMQ實例的節(jié)點數(shù),它需要滿足2N+1
bind
:當這個節(jié)點成為Master, 它會綁定配置好的地址和端口來履行主從復制協(xié)議(官方建議不要改動)
zkAddress
:zk的地址,這里可以配置集群
sync
: 在認為消息被消費完成前, 同步信息所存貯的策略,如果有多種策略用逗號隔開, ActiveMQ會選擇較強的策略(local_mem, local_disk則肯定選擇存貯在本地硬盤)
zkPath
: ZooKeeper選舉信息交換的存貯路徑
步驟四:
針對mq2和mq3的目錄需要修改一下對應的端口.端口分配如下所示:
節(jié)點 | openwire端口 | amqp端口 | stomp端口 | mqtt端口 | ws端口 | admin端口 |
---|---|---|---|---|---|---|
mq1 | 61616 | 5672 | 61613 | 1883 | 61614 | 8161 |
mq2 | 61617 | 5682 | 61623 | 1903 | 61634 | 8162 |
mq3 | 61618 | 5692 | 61633 | 1923 | 61654 | 8163 |
vi /usr/local/mqcluster/mq1/conf/activemq.xml
:配置如下:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
vi /usr/local/mqcluster/mq2/conf/activemq.xml
:配置如下:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5682?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61623?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1903?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61634?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
vi /usr/local/mqcluster/mq3/conf/activemq.xml
:配置如下:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5692?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61633?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1913?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61654?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
步驟五:
接著需要修改jetty的端口號(修改ACTIVEMQ_HOME/bin/jetty.xml
文件).mq1依然使用默認端口8161
伶选,mq2使用8162
史飞,mq3使用8163
端口:
vi /usr/local/mqcluster/mq1/conf/jetty.xml
:配置如下:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort"
init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>
vi /usr/local/mqcluster/mq2/conf/jetty.xml
:配置如下:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort"
init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
vi /usr/local/mqcluster/mq3/conf/jetty.xml
:配置如下:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort"
init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
步驟六:
到這里為止,ActiveMQ的高可用就已經(jīng)配置好了,需要先把zookeeper先開啟.(注意要把防火墻關閉),分別啟動mq1,mq2,mq3程序.
/usr/local/mqcluster/mq1/bin/linux-x86-64/activemq start
/usr/local/mqcluster/mq2/bin/linux-x86-64/activemq start
/usr/local/mqcluster/mq3/bin/linux-x86-64/activemq start
可以登錄zookeeper客戶端可以查看到對應的節(jié)點.
其中elected不為空的節(jié)點表示為Master尖昏,由該activemq對外提供服務。
接著我們來測試一下:
生產(chǎn)端
:
/**
* Created by lanxw
*/
public class Producer {
public static void main(String[] args) throws Exception {
//第一步:建立ConnectionFactory工廠對象,需要填入用戶名,密碼,以及要連接的地址.均使用默認即可,默認端口為tcp://localhost:61616
ConnectionFactory factory = new ActiveMQConnectionFactory(
"lanxw",
"lanxw",
"failover:(tcp://192.168.122.129:61616,tcp://192.168.122.129:61617,tcp://192.168.122.129:61618)?randomize=false"
);
//第二步:通過ConnectionFactory工廠對象我們創(chuàng)建一個Connection連接,并且調(diào)用Connection的start方法開啟連接,Connection默認是關閉的.
Connection connection = factory.createConnection();
connection.start();
//第三步:通過Connection對象創(chuàng)建Session會話(上下文環(huán)境對象),用于接受信息,參數(shù)配置1為是否啟動事務,參數(shù)配置2為簽收模式,一般我們設置自動簽收.
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//第四步:通過session創(chuàng)建Destination對象,指的是一個客戶端用戶指定生產(chǎn)者消息目標和消費消息的來源的對象.在PTP模式中,Destination稱作Queue即隊列;在Pub/Sub模式,Destination被稱作Topic即主體.在程序中可以使用多個Queue和Topic.
Queue destination = session.createQueue("test");
//第五步:我們需要通過Session對象創(chuàng)建消息的發(fā)送和接受對象(生產(chǎn)者和消費者)MessageProducer/MessageConsumer
MessageProducer producer = session.createProducer(null);
TextMessage msg;
for(int i = 0;i<3;i++){
msg = session.createTextMessage("這是隊列消息" + i);
//第六步:我們可以使用MessageProducer的setDeliveryMode方法為其設置持久化特性和非持久化特性(DeliveryMode),我們稍后詳細介紹.
producer.send(destination,msg);
}
//session.commit();
//第七步:最后我們使用JMS規(guī)范的TextMessage形式創(chuàng)建數(shù)據(jù)(通過Session對象),并用MessageProducer的send方法發(fā)送數(shù)據(jù).同理客戶端使用receive方法進行接接收數(shù)據(jù),最后不要忘記關閉Connection連接.
if(connection!=null){
connection.close();
}
}
}
消費端
:
/**
* Created by lanxw
*/
public class Cusumer {
public static void main(String[] args) throws JMSException {
//第一步:建立ConnectionFactory工廠對象,需要填入用戶名,密碼,以及要連接的地址.均使用默認即可,默認端口為tcp://localhost:61616
ConnectionFactory factory = new ActiveMQConnectionFactory(
"lanxw",
"lanxw",
"failover:(tcp://192.168.122.129:61616,tcp://192.168.122.129:61617,tcp://192.168.122.129:61618)?randomize=false");
//第二步:通過ConnectionFactory工廠對象我們創(chuàng)建一個Connection連接,并且調(diào)用Connection的start方法開啟連接,Connection默認是關閉的.
Connection connection = factory.createConnection();
connection.start();
//第三步:通過Connection對象創(chuàng)建Session會話(上下文環(huán)境對象),用于接受信息,參數(shù)配置1為是否啟動事務,參數(shù)配置2為簽收模式,一般我們設置自動簽收.
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
//第四步:通過session創(chuàng)建Destination對象,指的是一個客戶端用戶指定生產(chǎn)者消息目標和消費消息的來源的對象.在PTP模式中,Destination稱作Queue即隊列;在Pub/Sub模式,Destination被稱作Topic即主體.在程序中可以使用多個Queue和Topic.
Destination destination = session.createQueue("test");
//第五步:通過session創(chuàng)建消費者對象
MessageConsumer consumer = session.createConsumer(destination);
while (true){
TextMessage msg = (TextMessage) consumer.receive();
System.out.println(msg.getText());
}
}
}
測試:
我們停掉任何一臺ActiveMQ,發(fā)現(xiàn)依然可以發(fā)送消息和接收消息.說明我們已經(jīng)搭建好ActiveMQ的高可用的環(huán)境.