ActiveMQ安裝
下載地址: apache-activemq-5.15.12-bin.tar.gz
解壓和啟動
tar -zxvf apache-activemq-5.15.12-bin.tar.gz
cd apache-activemq-5.15.12/bin
./activemq start
# 查看啟動端口
netstat -anp|grep 61616
# 關閉
./activemq stop
image.png
服務器防火墻配置
1、ECS服務器需要先開啟8161(web管理頁面端口)、61616(activemq服務監控端口) 兩個端口
image.png
2、打開linux防火墻端口(我這裡是CentOS 7)
firewall-cmd --zone=public --add-port=8161/tcp --permanent
firewall-cmd --zone=public --add-port=61616/tcp --permanent
# 重新載入
firewall-cmd --reload
# 查看
# firewall-cmd --zone=public --query-port=8161/tcp
# 刪除
# firewall-cmd --zone=public --remove-port=61616/tcp --permanent
測試啟動
訪問地址:http://ip:8161/admin
認證:admin/admin
image.png
消息發送至MQ測試
1.引入依賴
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.12</version>
</dependency>
2.編寫邏輯
package com.liuyun;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import sun.security.util.Password;
import javax.jms.*;
/**
 * Minimal ActiveMQ producer demo: connects to the broker, sends ten text
 * messages to a queue inside a single transaction, commits, and disconnects.
 *
 * Created by wwx193433
 * 2020-04-16 18:39
 */
public class Producer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // NOTE(review): "ip" looks like a placeholder for the broker host — replace before running.
    private static final String BROKEURL = "failover://tcp://ip:61616";
    // Alternative for a local broker:
    // private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Queue name; kept identical to the literal used by the Consumer class so both sides meet on the same queue.
    private static final String QUEUE_NAME = "消息發(fā)送";

    // Number of demo messages sent per run.
    private static final int MESSAGE_COUNT = 10;

    /**
     * Sends {@code MESSAGE_COUNT} text messages to {@code QUEUE_NAME} and commits them.
     * Any failure is printed to stderr; resources are always released.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        Session session = null;
        MessageProducer messageProducer = null; // message producer
        try {
            // 1. Connect to the broker
            System.out.println(BROKEURL);
            ConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
            connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: the acknowledge-mode argument is ignored when
            // transacted == true, so pass SESSION_TRANSACTED to state the intent.
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(QUEUE_NAME); // message destination
            messageProducer = session.createProducer(destination);

            // 2. Send the messages
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String txt = "Hello World, this is " + i;
                TextMessage textMessage = session.createTextMessage(txt);
                messageProducer.send(textMessage);
            }
            // Nothing is visible to consumers until the transaction is committed.
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 3. Disconnect
            closeAll(messageProducer, session, connection);
        }
    }

    /**
     * Closes the JMS resources innermost-first. Each resource may be null (setup
     * failed before it was created) and is closed independently so that one failing
     * close does not prevent the others.
     */
    private static void closeAll(MessageProducer producer, Session session, Connection connection) {
        if (producer != null) {
            try {
                producer.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
-
發送驗證
image.png
image.png
消息消費
package com.liuyun;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Minimal ActiveMQ consumer demo: connects to the broker, receives up to five
 * text messages from a queue inside a single transaction, prints them, commits,
 * and disconnects.
 *
 * Created by wwx193433
 * 2020-04-16 18:39
 */
public class Consumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // NOTE(review): "ip" looks like a placeholder for the broker host — replace before running.
    private static final String BROKEURL = "failover://tcp://ip:61616";
    // Alternative for a local broker:
    // private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    // Queue name; kept identical to the literal used by the Producer class so both sides meet on the same queue.
    private static final String QUEUE_NAME = "消息發(fā)送";

    // Number of messages consumed per run.
    private static final int MESSAGE_COUNT = 5;

    /**
     * Receives up to {@code MESSAGE_COUNT} text messages from {@code QUEUE_NAME},
     * prints each body to stdout and commits the transaction.
     * Any failure is printed to stderr; resources are always released.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        Session session = null;
        MessageConsumer messageConsumer = null; // message consumer
        try {
            // 1. Connect to the broker
            System.out.println(BROKEURL);
            ConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
            connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: the acknowledge-mode argument is ignored when
            // transacted == true, so pass SESSION_TRANSACTED to state the intent.
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(QUEUE_NAME); // message source
            messageConsumer = session.createConsumer(destination);

            // 2. Receive the messages
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // receive() blocks until a message arrives; it returns null only
                // if the consumer is closed while waiting.
                TextMessage textMessage = (TextMessage) messageConsumer.receive();
                if (textMessage == null) {
                    break;
                }
                System.out.println(textMessage.getText());
            }
            // Received messages are acknowledged only when the transaction commits.
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 3. Disconnect
            closeAll(messageConsumer, session, connection);
        }
    }

    /**
     * Closes the JMS resources innermost-first. Each resource may be null (setup
     * failed before it was created) and is closed independently so that one failing
     * close does not prevent the others.
     */
    private static void closeAll(MessageConsumer consumer, Session session, Connection connection) {
        if (consumer != null) {
            try {
                consumer.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
image.png
image.png