ActiveMQ高可用與集群搭建

ActiveMQ的高可用性架構(gòu)是基于Master/Slave 模型的,Master/Slave模式的搭建有好幾種方式:

  • 1)Shared File System Master Slave 使用共享文件系統(tǒng)的方式
  • 2)JDBC Master Slave 使用數(shù)據(jù)庫作為持久化存儲
  • 3)基于zookeeper 方式搭建

我們采取的是zookeeper的方式搭建Active高可用環(huán)境:

步驟一:

確保已經(jīng)安裝好jdk8的環(huán)境和zookeeper的環(huán)境.如果沒裝好的同學,找之前的文檔裝一裝.

步驟二:

正常來說,多個ActiveMQ應該配置在多臺服務器中的,但是做實驗我們就只在一臺CentOS中來模擬,每個ActiveMQ的端口不一致.在/usr/local目錄下創(chuàng)建mqcluster目錄,然后在該目錄中創(chuàng)建mq1,mq2,mq3的文件夾.將apache-activemq-5.15.3-bin.tar.gz壓縮包的內(nèi)容解壓到這三個目錄.


步驟三:

首先修改每個ActiveMQ的持久化方式(修改ACTIVEMQ_HOME/conf/activemq.xml文件),ActiveMQ默認使用的是KahaDB作為持久化存儲數(shù)據(jù)的,我們修改成levelDB.如下圖所示:

<persistenceAdapter>
     <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
     <replicatedLevelDB
            directory="${activemq.data}/leveldb"
            replicas="3"
            bind="tcp://0.0.0.0:0"
            zkAddress="192.168.122.129:2181"
            zkPassword=""
            hostname="localhost"
            sync="local_disk"
            zkPath="/activemq/leveldb-stores/group1"
    />
</persistenceAdapter>

directory:levelDB持久化路徑
replicas=“3”: ActiveMQ實例的節(jié)點數(shù),它需要滿足2N+1
bind:當這個節(jié)點成為Master, 它會綁定配置好的地址和端口來履行主從復制協(xié)議(官方建議不要改動)
zkAddress:zk的地址,這里可以配置集群
sync: 在認為消息被消費完成前, 同步信息所存貯的策略,如果有多種策略用逗號隔開, ActiveMQ會選擇較強的策略(local_mem, local_disk則肯定選擇存貯在本地硬盤)
zkPath : ZooKeeper選舉信息交換的存貯路徑

步驟四:

針對mq2和mq3的目錄需要修改一下對應的端口.端口分配如下所示:

節(jié)點 openwire端口 amqp端口 stomp端口 mqtt端口 ws端口 admin端口
mq1 61616 5672 61613 1883 61614 8161
mq2 61617 5682 61623 1903 61634 8162
mq3 61618 5692 61633 1923 61654 8163

vi /usr/local/mqcluster/mq1/conf/activemq.xml:配置如下:

<transportConnectors>
  <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

vi /usr/local/mqcluster/mq2/conf/activemq.xml:配置如下:

<transportConnectors>
  <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="amqp" uri="amqp://0.0.0.0:5682?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="stomp" uri="stomp://0.0.0.0:61623?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1903?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="ws" uri="ws://0.0.0.0:61634?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

vi /usr/local/mqcluster/mq3/conf/activemq.xml:配置如下:

<transportConnectors>
  <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="amqp" uri="amqp://0.0.0.0:5692?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="stomp" uri="stomp://0.0.0.0:61633?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1913?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="ws" uri="ws://0.0.0.0:61654?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
步驟五:

接著需要修改jetty的端口號(修改ACTIVEMQ_HOME/bin/jetty.xml文件).mq1依然使用默認端口8161伶选,mq2使用8162史飞,mq3使用8163端口:
vi /usr/local/mqcluster/mq1/conf/jetty.xml:配置如下:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort"
      init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8161"/>
</bean>

vi /usr/local/mqcluster/mq2/conf/jetty.xml:配置如下:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort"
      init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>

vi /usr/local/mqcluster/mq3/conf/jetty.xml:配置如下:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort"
      init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>
步驟六:

到這里為止,ActiveMQ的高可用就已經(jīng)配置好了,需要先把zookeeper先開啟.(注意要把防火墻關閉),分別啟動mq1,mq2,mq3程序.
/usr/local/mqcluster/mq1/bin/linux-x86-64/activemq start
/usr/local/mqcluster/mq2/bin/linux-x86-64/activemq start
/usr/local/mqcluster/mq3/bin/linux-x86-64/activemq start
可以登錄zookeeper客戶端可以查看到對應的節(jié)點.


其中elected不為空的節(jié)點表示為Master尖昏,由該activemq對外提供服務。
接著我們來測試一下:
生產(chǎn)端:

/**
 * Created by lanxw
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        //第一步:建立ConnectionFactory工廠對象,需要填入用戶名,密碼,以及要連接的地址.均使用默認即可,默認端口為tcp://localhost:61616
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                "lanxw",
                "lanxw",
                "failover:(tcp://192.168.122.129:61616,tcp://192.168.122.129:61617,tcp://192.168.122.129:61618)?randomize=false"
        );
        //第二步:通過ConnectionFactory工廠對象我們創(chuàng)建一個Connection連接,并且調(diào)用Connection的start方法開啟連接,Connection默認是關閉的.
        Connection connection = factory.createConnection();
        connection.start();
        //第三步:通過Connection對象創(chuàng)建Session會話(上下文環(huán)境對象),用于接受信息,參數(shù)配置1為是否啟動事務,參數(shù)配置2為簽收模式,一般我們設置自動簽收.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //第四步:通過session創(chuàng)建Destination對象,指的是一個客戶端用戶指定生產(chǎn)者消息目標和消費消息的來源的對象.在PTP模式中,Destination稱作Queue即隊列;在Pub/Sub模式,Destination被稱作Topic即主體.在程序中可以使用多個Queue和Topic.
        Queue destination = session.createQueue("test");
        //第五步:我們需要通過Session對象創(chuàng)建消息的發(fā)送和接受對象(生產(chǎn)者和消費者)MessageProducer/MessageConsumer
        MessageProducer producer = session.createProducer(null);
        TextMessage msg;
        for(int i = 0;i<3;i++){
            msg = session.createTextMessage("這是隊列消息" + i);
            //第六步:我們可以使用MessageProducer的setDeliveryMode方法為其設置持久化特性和非持久化特性(DeliveryMode),我們稍后詳細介紹.
            producer.send(destination,msg);
        }
        //session.commit();
        //第七步:最后我們使用JMS規(guī)范的TextMessage形式創(chuàng)建數(shù)據(jù)(通過Session對象),并用MessageProducer的send方法發(fā)送數(shù)據(jù).同理客戶端使用receive方法進行接接收數(shù)據(jù),最后不要忘記關閉Connection連接.
        if(connection!=null){
            connection.close();
        }
    }
}

消費端:

/**
 * Created by lanxw
 */
public class Cusumer {
    public static void main(String[] args) throws JMSException {
        //第一步:建立ConnectionFactory工廠對象,需要填入用戶名,密碼,以及要連接的地址.均使用默認即可,默認端口為tcp://localhost:61616
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                "lanxw",
                "lanxw",
                "failover:(tcp://192.168.122.129:61616,tcp://192.168.122.129:61617,tcp://192.168.122.129:61618)?randomize=false");
        //第二步:通過ConnectionFactory工廠對象我們創(chuàng)建一個Connection連接,并且調(diào)用Connection的start方法開啟連接,Connection默認是關閉的.
        Connection connection = factory.createConnection();
        connection.start();
        //第三步:通過Connection對象創(chuàng)建Session會話(上下文環(huán)境對象),用于接受信息,參數(shù)配置1為是否啟動事務,參數(shù)配置2為簽收模式,一般我們設置自動簽收.
        Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
        //第四步:通過session創(chuàng)建Destination對象,指的是一個客戶端用戶指定生產(chǎn)者消息目標和消費消息的來源的對象.在PTP模式中,Destination稱作Queue即隊列;在Pub/Sub模式,Destination被稱作Topic即主體.在程序中可以使用多個Queue和Topic.
        Destination destination = session.createQueue("test");
        //第五步:通過session創(chuàng)建消費者對象
        MessageConsumer consumer = session.createConsumer(destination);
        while (true){
            TextMessage msg = (TextMessage) consumer.receive();
            System.out.println(msg.getText());
        }
    }
}
測試:

我們停掉任何一臺ActiveMQ,發(fā)現(xiàn)依然可以發(fā)送消息和接收消息.說明我們已經(jīng)搭建好ActiveMQ的高可用的環(huán)境.

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末构资,一起剝皮案震驚了整個濱河市抽诉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌吐绵,老刑警劉巖朴则,帶你破解...
    沈念sama閱讀 222,681評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件廷没,死亡現(xiàn)場離奇詭異,居然都是意外死亡粤剧,警方通過查閱死者的電腦和手機槽唾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,205評論 3 399
  • 文/潘曉璐 我一進店門罢洲,熙熙樓的掌柜王于貴愁眉苦臉地迎上來窖铡,“玉大人琅豆,你說我怎么就攤上這事≡世郑” “怎么了?”我有些...
    開封第一講書人閱讀 169,421評論 0 362
  • 文/不壞的土叔 我叫張陵削咆,是天一觀的道長牍疏。 經(jīng)常有香客問我,道長拨齐,這世上最難降的妖魔是什么鳞陨? 我笑而不...
    開封第一講書人閱讀 60,114評論 1 300
  • 正文 為了忘掉前任,我火速辦了婚禮瞻惋,結(jié)果婚禮上厦滤,老公的妹妹穿的比我還像新娘。我一直安慰自己歼狼,他們只是感情好掏导,可當我...
    茶點故事閱讀 69,116評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著羽峰,像睡著了一般趟咆。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上梅屉,一...
    開封第一講書人閱讀 52,713評論 1 312
  • 那天值纱,我揣著相機與錄音,去河邊找鬼坯汤。 笑死虐唠,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的惰聂。 我是一名探鬼主播疆偿,決...
    沈念sama閱讀 41,170評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼咱筛,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了翁脆?” 一聲冷哼從身側(cè)響起眷蚓,我...
    開封第一講書人閱讀 40,116評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎反番,沒想到半個月后沙热,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,651評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡罢缸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,714評論 3 342
  • 正文 我和宋清朗相戀三年篙贸,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片枫疆。...
    茶點故事閱讀 40,865評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡爵川,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出息楔,到底是詐尸還是另有隱情寝贡,我是刑警寧澤,帶...
    沈念sama閱讀 36,527評論 5 351
  • 正文 年R本政府宣布值依,位于F島的核電站圃泡,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏愿险。R本人自食惡果不足惜颇蜡,卻給世界環(huán)境...
    茶點故事閱讀 42,211評論 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望辆亏。 院中可真熱鬧风秤,春花似錦、人聲如沸扮叨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,699評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽彻磁。三九已至甸鸟,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間兵迅,已是汗流浹背抢韭。 一陣腳步聲響...
    開封第一講書人閱讀 33,814評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留恍箭,地道東北人刻恭。 一個月前我還...
    沈念sama閱讀 49,299評論 3 379
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親鳍贾。 傳聞我的和親對象是個殘疾皇子鞍匾,可洞房花燭夜當晚...
    茶點故事閱讀 45,870評論 2 361

推薦閱讀更多精彩內(nèi)容