ActiveMQ能做什么
大多數(shù)情況下ActiveMQ被用于做系統(tǒng)之間的數(shù)據(jù)交換赛蔫。
比如說幾個不同的系統(tǒng)之間需要進行業(yè)務(wù)的聯(lián)辦,這個時候我們就需要一個第三方消息中間件來做消息傳遞番川。而ActiveMQ就是這個中間件邑狸。
使用ActiveMQ的用途和優(yōu)點:
- 將數(shù)據(jù)從一個應(yīng)用程序傳送到另一個應(yīng)用程序懈糯,或者從軟件的一個模塊傳送到另外一個模塊;
- 負責(zé)建立網(wǎng)絡(luò)通信的通道单雾,進行數(shù)據(jù)的可靠傳送赚哗。
- 保證數(shù)據(jù)不重發(fā)她紫,不丟失
- 能夠?qū)崿F(xiàn)跨平臺操作,能夠為不同操作系統(tǒng)上的軟件集成數(shù)據(jù)傳送服務(wù)
安裝使用ActiveMQ
- 下載ActiveMQ
- 直接解壓屿储,啟動贿讹,在命令行cd到ActiveMQ/bin目錄下 ./activemq start
- 檢測是否已啟動
ActiveMQ默認采用61616端口提供JMS服務(wù),使用8161端口提供管理控制臺服務(wù)够掠,執(zhí)行以下命令便可以檢驗是否已經(jīng)成功啟動ActiveMQ服務(wù)
命令行中:netstat -an|find "61616"
直接訪問ActiveMQ管理頁面http://localhost:8161/admin/ 默認用戶名密碼admin/admin
- 關(guān)閉ActiveMQ可以使用./activemq stop
測試使用ActiveMQ
其實ActiveMQ是按照JMS(java消息服務(wù))規(guī)范實現(xiàn)
JMS(java消息服務(wù))就是典型的異步消息處理機制
ActiveMQ默認提供兩種通信模式
p2p(點對點)
p2p比較簡單民褂,一方發(fā)送消息,一方接收消息疯潭。相互通信的雙方是通過一個類似于隊列的方式來進行交流赊堪。而在p2p里一個queue只有一個發(fā)送者和一個接收者。queue之間是通過名字區(qū)別的
//發(fā)送消息
public class P2pSend {
public static void main(String[] args) throws JMSException, InterruptedException {
//創(chuàng)建鏈接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//創(chuàng)建連接
Connection connection = connectionFactory.createConnection();
connection.start();
//創(chuàng)建一個會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建生產(chǎn)者 發(fā)送消息的人
MessageProducer producer = session.createProducer(null);
Destination destination = session.createQueue("JOBS.1");
Message message = session.createObjectMessage(123);
System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination);
producer.send(destination, message);
producer.close();
session.close();
connection.close();
}
}
//接收消息
public class P2pReceiver {
public static void main(String[] args) throws JMSException, InterruptedException {
//創(chuàng)建鏈接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//創(chuàng)建連接
Connection connection = connectionFactory.createConnection();
connection.start();
//創(chuàng)建一個會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("JOBS.1");
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message arg0) {
try {
Thread.sleep(2000);
System.out.println("1" + " id:" + ((ObjectMessage)arg0).getObject());
} catch (Exception e) {
e.printStackTrace();
}
}
});
//session.close();
//connection.close();
}
}
這里主要就是通過session.createQueue("隊列名稱")中的隊列名稱來判斷有那個消費者進行消費的竖哩。
訪問localhost:8161/admin/后臺可以看到
publish-subscribe(發(fā)布/訂閱)
//消息的發(fā)送者
public class PubSend {
public static void main(String[] args) throws JMSException {
//創(chuàng)建鏈接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//創(chuàng)建連接
Connection connection = connectionFactory.createConnection();
connection.start();
//創(chuàng)建一個會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建生產(chǎn)者 發(fā)送消息的人
MessageProducer producer = session.createProducer(null);
Destination destination = session.createTopic("JOB01");
MapMessage message = session.createMapMessage();
message.setString("name", "shli");
message.setDouble("price", 1.00);
message.setBoolean("up", true);
producer.send(destination, message);
producer.close();
session.close();
connection.close();
}
}
//消息接收者
public class PubReceive {
public static void main(String[] args) throws JMSException {
//創(chuàng)建鏈接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//創(chuàng)建連接
Connection connection = connectionFactory.createConnection();
connection.start();
//創(chuàng)建一個會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("JOB01");
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message arg0) {
MapMessage map = (MapMessage)arg0;
try {
String shli = map.getString("name");
double price = map.getDouble("price");
boolean up = map.getBoolean("up");
System.out.println(shli + "----" + price +"------"+up);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
MessageConsumer messageConsumer1 = session.createConsumer(destination);
messageConsumer1.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message arg0) {
MapMessage map = (MapMessage)arg0;
try {
String shli = map.getString("name");
double price = map.getDouble("price");
boolean up = map.getBoolean("up");
System.out.println(shli + "-1-1-1-" + price +"-1--1-1--"+up);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//session.close();
//connection.close();
}
}
#######這里主要注意的地方就是
- 消費者只能接收到注冊之后哭廉,發(fā)送端發(fā)送的消息
- 發(fā)送端發(fā)送的消息會分發(fā)給所有注冊的消費者
- 注冊發(fā)送端的關(guān)鍵在于session.createTopic("Topic名稱")中的Topic名稱
訪問localhost:8161/admin/ 后臺可以看到
說明
我也是第一次學(xué)習(xí)MQ,記錄學(xué)習(xí)內(nèi)容相叁,難免會有錯誤的地方遵绰。如果發(fā)現(xiàn),請指出增淹,謝謝~