ActiveMQ (一)
1.啟動ActiveMQ
1.png
2.編寫Sernder
package test.activemp;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
/*
ActiveMQ Hello World
ActiveMQ德崭,需要實現(xiàn)接收者和發(fā)送者兩部分代碼的編寫。
Sender/Receiver
step1:建立ConnectionFactory工廠對象,需要填入用戶名辱姨、密碼、以及要連接的地址,均使用默認即可施禾,
ActiveMQ默認的 TCP連接地址為:"tcp://localhost:61616"祷嘶。
step2:通過ConnectionFactory工廠對象我們創(chuàng)建一個Connection對象,并且調(diào)用Connection的start方法開啟連接脖捻,
Connection默認是關(guān)閉的阔逼。
step3:通過Connection對象創(chuàng)建Session會話(上下文環(huán)境對象),用于接收消息地沮,參數(shù)配置1為是否啟用事務(wù)嗜浮,
參數(shù)配置2為簽收模式,一般我們設(shè)置自動簽收摩疑。
step4:通過Session創(chuàng)建Destination對象危融,指的是一個客戶端用來指定生產(chǎn)消息目標和消費消息來源的對象,
在PTP模式中未荒,Destination被稱作Queue即隊列专挪;在Pub/Sub模式,Destination被稱作Topic即主題。
在程序中可以使用多個Queue和Topic寨腔。
step5:我們需要通過Session對象創(chuàng)建消息的發(fā)送和接收對象(生產(chǎn)者和消費者)MessageProducer/MessageConsumer速侈。
step6:我們可以使用MessageProducer的setDeliveryMode方法為其設(shè)置持久化特性和非持久化特性(DeliveryMode)。
step7:最后我們使用JMS規(guī)范的TextMessage形式創(chuàng)建數(shù)據(jù)(通過Session對象)迫卢,并用MessageProducer的send方法發(fā)送數(shù)據(jù)倚搬。
同理客戶端使用receive方法進行接收數(shù)據(jù),最后不要忘記關(guān)閉Connection連接乾蛤。
*/
public static void main(String[] args){
// step1:建立ConnectionFactory工廠對象每界,需要填入用戶名、密碼家卖、以及要連接的地址,均使用默認即可眨层,
// ActiveMQ默認的 TCP連接地址為:"tcp://localhost:61616",
// 這個可以在路徑:D:\Program Files\Java\ActiveMq\apache-activemq-5.6.0\conf 下的 activemq.xml文件中看到如下:
// <transportConnectors>
// <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
// </transportConnectors>
/*
* ConnectionFactory 是JMS提供的接口
* ActiveMQConnectionFactory 是apache提供的ConnectionFactory接口的實現(xiàn)上荡。
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
// step2:通過ConnectionFactory工廠對象我們創(chuàng)建一個Connection對象趴樱,并且調(diào)用Connection的start方法開啟連接,Connection默認是關(guān)閉的酪捡。
Connection connection = connectionFactory.createConnection();
connection.start();
// step3:通過Connection對象創(chuàng)建Session會話(上下文環(huán)境對象)叁征,用于接收消息,參數(shù)配置1為是否啟用事務(wù)逛薇,參數(shù)配置2為簽收模式捺疼,一般我們設(shè)置自動簽收。
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// step4:通過Session創(chuàng)建Destination對象永罚,指的是一個客戶端用來指定生產(chǎn)消息目標和消費消息來源的對象啤呼,
// 在PTP模式中,Destination被稱作Queue即隊列呢袱;在Pub/Sub模式媳友,Destination被稱作Topic即主題。
// 在程序中可以使用多個Queue和Topic产捞。
Destination destination = session.createQueue("queue1");
// step5:我們需要通過Session對象創(chuàng)建消息的發(fā)送和接收對象(生產(chǎn)者和消費者)MessageProducer/MessageConsumer。
MessageProducer messageProducer = session.createProducer(destination);
// step6:我們可以使用MessageProducer的setDeliveryMode方法為其設(shè)置持久化特性和非持久化特性(DeliveryMode)哼御。
/*
* ActiveMQ 默認是將生產(chǎn)的消息存放在 kahadab中的坯临,也可以自己放在levelDB或者JDBC中等。
* 也可以設(shè)置非持久化存儲恋昼,這種就是把ActiveMQ關(guān)閉后看靠,在重新啟動時,數(shù)據(jù)就都丟失了液肌。
*
* DeliveryMode.NON_PERSISTENT : 設(shè)置為非持久化存儲挟炬。
*/
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// step7:最后我們使用JMS規(guī)范的TextMessage形式創(chuàng)建數(shù)據(jù)(通過Session對象),并用MessageProducer的send方法發(fā)送數(shù)據(jù)。
// 同理客戶端使用receive方法進行接收數(shù)據(jù)谤祖,最后不要忘記關(guān)閉Connection連接婿滓。
for(int i=0;i<5;i++){
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我是消息內(nèi)容,id為"+i);
messageProducer.send(textMessage);
System.out.println("生產(chǎn)者:"+textMessage.getText());
}
if(connection != null){
//關(guān)閉掉connection之后,它自己內(nèi)部會遞歸的去關(guān)閉掉它下級的節(jié)點粥喜。
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
3. 運行:run as javaapplication.
2.png
4.打開瀏覽器凸主,輸入網(wǎng)址:http://localhost:8161/admin/ , 然后點擊 "Queues",可以查看生產(chǎn)的消息。
3.png
5.編寫接受者 receiver
package test.activemp;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver {
public static void main(String[] args){
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue1");
MessageConsumer messageConsumer = session.createConsumer(destination);
// //由于是取消息额湘,所以不需要設(shè)置是否持久化
// messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
while(true){
TextMessage msg = (TextMessage)messageConsumer.receive();
if(msg == null) break;
System.out.println("收到的內(nèi)容:"+msg.getText());
}
if(connection != null){
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
6.運行 receiver
4.png