一相恃、ActiveMQ的安裝
1.什么是ActiveMQ奋早?
ActiveMQ 是 Apache 出品夷蚊,最流行的娱颊,能力強(qiáng)勁的開源消息總線傲诵。ActiveMQ 是一個(gè) 完全支持 JMS1.1 和 J2EE 1.4 規(guī)范的 JMS Provider 實(shí)現(xiàn),盡管 JMS 規(guī)范出臺(tái)已經(jīng)是很久 的事情了箱硕,但是 JMS 在當(dāng)今的 J2EE 應(yīng)用中間仍然扮演著特殊的地位拴竹。
1.1什么是消息?
“消息”是在兩臺(tái)計(jì)算機(jī)間傳送的數(shù)據(jù)單位剧罩。消息可以非常簡(jiǎn)單栓拜,例如只包含文本字符串; 也可以更復(fù)雜惠昔,可能包含嵌入對(duì)象幕与。
1.2什么是隊(duì)列?
隊(duì)列的特點(diǎn)是先進(jìn)先出镇防。
- 什么是消息隊(duì)列啦鸣?
“消息隊(duì)列”是在消息的傳輸過(guò)程中保存消息的容器。
1.3常用的消息服務(wù)應(yīng)用:
(1)ActiveMQ
ActiveMQ 是 Apache 出品营罢,最流行的赏陵,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完 全支持 JMS1.1 和 J2EE 1.4 規(guī)范的 JMS Provider 實(shí)現(xiàn)饲漾。
(2)RabbitMQ
RabbitMQ 是一個(gè)在 AMQP 基礎(chǔ)上完成的蝙搔,可復(fù)用的企業(yè)消息系統(tǒng)。他遵循 Mozilla Public License 開源協(xié)議考传。開發(fā)語(yǔ)言為 Erlang吃型。
(3)RocketMQ
由阿里巴巴定義開發(fā)的一套消息隊(duì)列應(yīng)用服務(wù)。
2.消息服務(wù)的應(yīng)用場(chǎng)景:
消息隊(duì)列的主要特點(diǎn)是異步處理僚楞,主要目的是減少請(qǐng)求響應(yīng)時(shí)間和解耦勤晚。所以主要的使 用場(chǎng)景就是將比較耗時(shí)而且不需要即時(shí)(同步)返回結(jié)果的操作作為消息放入消息隊(duì)列。同 時(shí)由于使用了消息隊(duì)列泉褐,只要保證消息格式不變赐写,消息的發(fā)送方和接收方并不需要彼此聯(lián)系, 也不需要受對(duì)方的影響膜赃,即解耦和挺邀。
參考理解
- 異步處理:
用戶注冊(cè)流程:
(1)注冊(cè)處理以及寫數(shù)據(jù)庫(kù)
(2)發(fā)送注冊(cè)成功的手機(jī)短信
(3)發(fā)送注冊(cè)成功的郵件信息
如果用消息中間件:則可以創(chuàng)建兩個(gè)線程來(lái)做這些事情,直接發(fā)送消息給消息中間件, 然后讓郵件服務(wù)和短信服務(wù)自己去消息中間件里面去取消息端铛,然后取到消息后再自己做對(duì)應(yīng) 的業(yè)務(wù)操作泣矛。
- 訂單處理(解耦):
生成訂單流程:
(1)在購(gòu)物車中點(diǎn)擊結(jié)算
(2)完成支付
(3)創(chuàng)建訂單
(4)調(diào)用庫(kù)存系統(tǒng)
訂單完成后,訂單系統(tǒng)并不去直接調(diào)用庫(kù)存系統(tǒng)禾蚕,而是發(fā)送消息到消息中間件您朽,寫入一 個(gè)訂單信息。庫(kù)存系統(tǒng)自己去消息中間件上去獲取换淆,然后做發(fā)貨處理哗总,并更新庫(kù)存,這樣能 夠?qū)崿F(xiàn)互聯(lián)網(wǎng)型應(yīng)用追求的快這一個(gè)屬性产舞。而庫(kù)存系統(tǒng)讀取訂單后庫(kù)存應(yīng)用這個(gè)操作也是非 郴臧拢快的菠剩,所以有消息中間件對(duì)解耦來(lái)說(shuō)也是一個(gè)不錯(cuò)的方向易猫。
- 秒殺功能 ( 流量的削峰 ):
秒殺流程:
(1)用戶點(diǎn)擊秒殺
(2)發(fā)送請(qǐng)求到秒殺應(yīng)用
(3)在請(qǐng)求秒殺應(yīng)用之前將請(qǐng)求放入到消息隊(duì)列
(4)秒殺應(yīng)用從消息隊(duì)列中獲取請(qǐng)求并處理。
比如具壮,系統(tǒng)舉行秒殺活動(dòng)准颓,熱門商品。流量蜂擁而至 100 件商品棺妓,10 萬(wàn)人擠進(jìn)來(lái)怎么 辦攘已?10 萬(wàn)秒殺的操作,放入消息隊(duì)列怜跑。秒殺應(yīng)用處理消息隊(duì)列中的 10 萬(wàn)個(gè)請(qǐng)求中的前 100 個(gè)样勃,其他的打回,通知失敗性芬。流量峰值控制在消息隊(duì)列處峡眶,秒殺應(yīng)用不會(huì)瞬間被懟死。
3.JMS
3.1什么是JMS植锉?
JMS(Java Messag Service)是 Java 平臺(tái)上有關(guān)面向消息中間件的技術(shù)規(guī)范辫樱,它便于 消息系統(tǒng)中的 Java 應(yīng)用程序進(jìn)行消息交換,并且通過(guò)提供標(biāo)準(zhǔn)的產(chǎn)生、發(fā)送俊庇、接收消息的接 口狮暑,簡(jiǎn)化企業(yè)應(yīng)用的開發(fā)。
3.2JMS 模型:
- 點(diǎn)對(duì)點(diǎn)模型(Point To Point):
生產(chǎn)者發(fā)送一條消息到 queue辉饱,只有一個(gè)消費(fèi)者能收到搬男。
- 發(fā)布訂閱模型(Publish/Subscribe) :
發(fā)布者發(fā)送到 topic 的消息,只有訂閱了 topic 的訂閱者才會(huì)收到消息彭沼。
4.安裝ActiveMQ:
4.1下載資源:
ActiveMQ 官網(wǎng): http://activemq.apache.org ;
- 注意:
ActiveMQ5.10.x 以上版本必須使用 JDK1.8 才能正常使用缔逛。 ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。
4.2上傳并解壓:
將下載的資源上傳到Linux服務(wù)器并解壓;解壓命令“tar -zxf apache-activemq-5.9.0-bin.tar.gz ”译株;
將文件拷貝到指定目錄:“ cp apache-activemq-5.9.0 /usr/local/activemq -r ”瓜喇;
4.3檢查權(quán)限:
ls -al apache-activemq-5.9.0/bin 如果權(quán)限不足,則無(wú)法執(zhí)行,需要修改文件權(quán)限: chmod 755 activemq 曲掰;
- 啟動(dòng)ActiveMQ:
/usr/local/activemq/bin/activemq start 力九;
- 檢查進(jìn)程查看是否啟動(dòng)成功:
ps aux | grep activemq 喻括;下面表示啟動(dòng)成功非竿。
- 管理界面:
使用瀏覽器訪問(wèn) ActiveMQ 管理應(yīng)用, 地址如下: http://ip:8161/admin/ 吱肌;用戶名:admin诗鸭;密碼admin溢豆。
4.4修改訪問(wèn)端口:
修改 ActiveMQ 配置文件: /usr/local/activemq/conf/jetty.xml 烦粒;
配置文件修改完畢夯缺,保存并重新啟動(dòng) ActiveMQ 服務(wù)蚤氏。
- 修改用戶名和密碼:
修改 conf/users.properties 配置文件.內(nèi)容為: 用戶名=密碼;
保存并重啟 ActiveMQ 服務(wù)即可。
- 重啟和關(guān)閉ActiveMQ:
/usr/local/activemq/bin/activemq restart 踊兜;
/usr/local/activemq/bin/activemq stop 竿滨;
- 配置文件activemq.xml :
配置文件中,配置的是 ActiveMQ 的核心配置信息. 是提供服務(wù)時(shí)使用的配置. 可以修改 啟動(dòng)的訪問(wèn)端口. 即 java 編程中訪問(wèn) ActiveMQ 的訪問(wèn)端口. 默認(rèn)端口為 61616;
使用協(xié)議是: tcp 協(xié)議捏境;
修改端口后, 保存并重啟 ActiveMQ 服務(wù)即可于游。
4.5ActiveMQ目錄介紹:
(1)bin 存放的是腳本文件;
(2)conf 存放的是基本配置文件垫言;
(3)data 存放的是日志文件贰剥;
(4)docs 存放的是說(shuō)明文檔 ;
(5)examples 存放的是簡(jiǎn)單的實(shí)例筷频;
(6)lib 存放的是 activemq 所需 jar 包蚌成;
(7)webapps 用于存放項(xiàng)目的目錄 ;
5.ActiveMQ 術(shù)語(yǔ) :
(1)Destination :
目的地凛捏,JMS Provider(消息中間件)負(fù)責(zé)維護(hù)担忧,用于對(duì) Message 進(jìn)行管理的對(duì)象。 MessageProducer 需要指定 Destination 才能發(fā)送消息葵袭,MessageReceiver 需要指定 Destination 才能接收消息涵妥。
(2)Producer :
消息生成者,負(fù)責(zé)發(fā)送 Message 到目的地坡锡。
(3)Consumer | Receiver :
消息消費(fèi)者蓬网,負(fù)責(zé)從目的地中消費(fèi)【處理|監(jiān)聽|訂閱】Message。
(4)Message:
消息鹉勒,消息封裝一次通信的內(nèi)容帆锋。
二、ActiveMQ應(yīng)用:
1.ActiveMQ 常用 API 簡(jiǎn)介 :
API 都是接口類型,由定義在 javax.jms 包中禽额;
是 JMS 標(biāo)準(zhǔn)接口定義锯厢。
(1)ConnectionFactory:
鏈接工廠, 用于創(chuàng)建鏈接的工廠類型皮官;
(2)Connection :
鏈接. 用于建立訪問(wèn) ActiveMQ 連接的類型, 由鏈接工廠創(chuàng)建。
(3)Session
會(huì)話, 一次持久有效有狀態(tài)的訪問(wèn). 由鏈接創(chuàng)建实辑。
(4)Destination & Queue
目的地, 用于描述本次訪問(wèn) ActiveMQ 的消息訪問(wèn)目的地. 即 ActiveMQ 服務(wù)中的具體隊(duì) 列. 由會(huì)話創(chuàng)建. interface Queue extends Destination
(5)MessageProducer
消息生成者, 在一次有效會(huì)話中, 用于發(fā)送消息給 ActiveMQ 服務(wù)的工具. 由會(huì)話創(chuàng)建捺氢。
(6)MessageConsumer
消息消費(fèi)者【消息訂閱者,消息處理者】, 在一次有效會(huì)話中, 用于從 ActiveMQ 服務(wù)中 獲取消息的工具. 由會(huì)話創(chuàng)建剪撬。
(7)Message
消息, 通過(guò)消息生成者向 ActiveMQ 服務(wù)發(fā)送消息時(shí)使用的數(shù)據(jù)載體對(duì)象或消息消費(fèi)者 從 ActiveMQ 服務(wù)中獲取消息時(shí)使用的數(shù)據(jù)載體對(duì)象. 是所有消息【文本消息摄乒,對(duì)象消息等】 具體類型的頂級(jí)接口. 可以通過(guò)會(huì)話創(chuàng)建或通過(guò)會(huì)話從 ActiveMQ 服務(wù)中獲取。
2.使用ActiveMQ處理文本消息:
2.1創(chuàng)建消息生產(chǎn)者:
- 修改POM文件添加ActiveMQ坐標(biāo):
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
2.2編寫消息的生產(chǎn)者:
/**
* 消息的生產(chǎn)者
*
* @author zhang
*
*/
public class HelloProducer {
public static void testProducer(String msg) {
// 定義工廠
ConnectionFactory factory = null;
// 定義連接對(duì)象
Connection conn = null;
// 定義會(huì)話
Session session = null;
// 目的地
Destination des = null;
// 定義消息發(fā)送者
MessageProducer producer = null;
// 定義消息
Message message = null;
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
// 創(chuàng)建連接對(duì)象
conn = factory.createConnection();
// 啟動(dòng)連接
conn.start();
/*
* transacted:是否使用事務(wù) 可選值為: true|false true:使用事務(wù) 當(dāng)設(shè)置次變量 值残黑。
* Session.SESSION_TRANSACTED false:不適用事務(wù),設(shè)置次變量 則 acknowledgeMode 參數(shù)必須設(shè)置
* acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自動(dòng)消息確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn) 機(jī)制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)消息機(jī)制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地
des = session.createQueue("helloword-destination");
// 創(chuàng)建消息生產(chǎn)者
producer = session.createProducer(des);
// 創(chuàng)建消息對(duì)象
message = session.createTextMessage(msg);
// 發(fā)送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 關(guān)閉資源
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
2.3創(chuàng)建消息消費(fèi)者
- 修改 POM 文件添加 ActiveMQ 坐標(biāo):
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
2.4編寫消息的消費(fèi)者:
/**
* 消息的消費(fèi)者
* @author zhang
*
*/
public class HelloConsumer {
public static void testConsumer() {
// 定義工廠
ConnectionFactory factory = null;
// 定義連接對(duì)象
Connection conn = null;
// 定義會(huì)話
Session session = null;
// 目的地
Destination des = null;
// 定義消息消費(fèi)者
MessageConsumer consumer = null;
// 定義消息
Message message = null;
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
// 創(chuàng)建連接對(duì)象
conn = factory.createConnection();
// 啟動(dòng)連接
conn.start();
/*
* transacted:是否使用事務(wù) 可選值為: true|false true:使用事務(wù) 當(dāng)設(shè)置次變量 值馍佑。
* Session.SESSION_TRANSACTED false:不適用事務(wù),設(shè)置次變量 則 acknowledgeMode 參數(shù)必須設(shè)置
* acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自動(dòng)消息確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn) 機(jī)制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)消息機(jī)制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地
des = session.createQueue("helloword-destination");
// 創(chuàng)建消息生產(chǎn)者
consumer = session.createConsumer(des);
// 創(chuàng)建消息對(duì)象
message = consumer.receive();
//處理消息
String text = ((TextMessage)message).getText();
System.out.println("從ActiveMQ服務(wù)中獲取的文本信息"+text);
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 關(guān)閉資源
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
3.處理對(duì)象消息:
3.1定義消費(fèi)對(duì)象(生成getter和setter):
private int userid;
private String username;
private int userage;
3.2創(chuàng)建消息的生產(chǎn)者:
/**
* 消息的生產(chǎn)者
* @author zhang
*
*/
public class HelloProducer2 {
public static void testProducer(User user) {
// 定義工廠
ConnectionFactory factory = null;
// 定義連接對(duì)象
Connection conn = null;
// 定義會(huì)話
Session session = null;
// 目的地
Destination des = null;
// 定義消息發(fā)送者
MessageProducer producer = null;
// 定義消息
Message message = null;
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
// 創(chuàng)建連接對(duì)象
conn = factory.createConnection();
// 啟動(dòng)連接
conn.start();
/*
* transacted:是否使用事務(wù) 可選值為: true|false true:使用事務(wù) 當(dāng)設(shè)置次變量 值。
* Session.SESSION_TRANSACTED false:不適用事務(wù),設(shè)置次變量 則 acknowledgeMode 參數(shù)必須設(shè)置
* acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自動(dòng)消息確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn) 機(jī)制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)消息機(jī)制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地
des = session.createQueue("helloword-destination");
// 創(chuàng)建消息生產(chǎn)者
producer = session.createProducer(des);
// 創(chuàng)建消息對(duì)象
message = session.createObjectMessage(user);
// 發(fā)送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 關(guān)閉資源
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
3.2創(chuàng)建消息的消費(fèi)者:
/**
* 消息的消費(fèi)者
* @author zhang
*
*/
public class HelloConsumer2 {
public static void testConsumer() {
// 定義工廠
ConnectionFactory factory = null;
// 定義連接對(duì)象
Connection conn = null;
// 定義會(huì)話
Session session = null;
// 目的地
Destination des = null;
// 定義消息消費(fèi)者
MessageConsumer consumer = null;
// 定義消息
Message message = null;
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
// 創(chuàng)建連接對(duì)象
conn = factory.createConnection();
// 啟動(dòng)連接
conn.start();
/*
* transacted:是否使用事務(wù) 可選值為: true|false true:使用事務(wù) 當(dāng)設(shè)置次變量 值梨水。
* Session.SESSION_TRANSACTED false:不適用事務(wù),設(shè)置次變量 則 acknowledgeMode 參數(shù)必須設(shè)置
* acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自動(dòng)消息確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn) 機(jī)制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)消息機(jī)制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地
des = session.createQueue("helloword-destination");
// 創(chuàng)建消息生產(chǎn)者
consumer = session.createConsumer(des);
// 獲取消息對(duì)象
message = consumer.receive();
//處理消息
ObjectMessage objectMessage = (ObjectMessage)message;
User user = (User)objectMessage.getObject();
System.out.println("從ActiveMQ服務(wù)中獲取的文本信息"+user);
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 關(guān)閉資源
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
-
實(shí)現(xiàn):
4.實(shí)現(xiàn)隊(duì)列服務(wù)監(jiān)聽:
4.1創(chuàng)建消息生產(chǎn)者:
public class HelloProducer3 {
public static void testProducer(String msg) {
// 定義工廠
ConnectionFactory factory = null;
// 定義連接對(duì)象
Connection conn = null;
// 定義會(huì)話
Session session = null;
// 目的地
Destination des = null;
// 定義消息發(fā)送者
MessageProducer producer = null;
// 定義消息
Message message = null;
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
// 創(chuàng)建連接對(duì)象
conn = factory.createConnection();
// 啟動(dòng)連接
conn.start();
/*
* transacted:是否使用事務(wù) 可選值為: true|false true:使用事務(wù) 當(dāng)設(shè)置次變量 值拭荤。
* Session.SESSION_TRANSACTED false:不適用事務(wù),設(shè)置次變量 則 acknowledgeMode 參數(shù)必須設(shè)置
* acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自動(dòng)消息確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn) 機(jī)制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)消息機(jī)制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地
des = session.createQueue("02-message");
// 創(chuàng)建消息生產(chǎn)者
producer = session.createProducer(des);
// 創(chuàng)建消息對(duì)象
message = session.createTextMessage(msg);
// 發(fā)送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 關(guān)閉資源
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
4.2創(chuàng)建消息的消費(fèi)者:
public class HelloConsumer3 {
public static void testConsumer() {
// 定義工廠
ConnectionFactory factory = null;
// 定義連接對(duì)象
Connection conn = null;
// 定義會(huì)話
Session session = null;
// 目的地
Destination des = null;
// 定義消息消費(fèi)者
MessageConsumer consumer = null;
// 定義消息
Message message = null;
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
// 創(chuàng)建連接對(duì)象
conn = factory.createConnection();
// 啟動(dòng)連接
conn.start();
/*
* transacted:是否使用事務(wù) 可選值為: true|false true:使用事務(wù) 當(dāng)設(shè)置次變量 值呛谜。
* Session.SESSION_TRANSACTED false:不適用事務(wù),設(shè)置次變量 則 acknowledgeMode 參數(shù)必須設(shè)置
* acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自動(dòng)消息確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn) 機(jī)制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)消息機(jī)制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地
des = session.createQueue("02-message");
// 創(chuàng)建消息消費(fèi)者
consumer = session.createConsumer(des);
// 創(chuàng)建消息消費(fèi)者對(duì)象
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println(message);
// 處理消息
String text = null;
try {
text = ((TextMessage) message).getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println("從ActiveMQ服務(wù)中獲取的文本信息:" + text);
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
-
實(shí)現(xiàn):
5.Topic 模型 :
Publish/Subscribe 處理模式(Topic)
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到 topic 中琅关,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消 息。 和點(diǎn)對(duì)點(diǎn)方式不同冤荆,發(fā)布到 topic 的消息會(huì)被所有訂閱者消費(fèi)踊沸。 當(dāng)生產(chǎn)者發(fā)布消息歇终,不管是否有消費(fèi)者社证。都不會(huì)保存消息 一定要先有消息的消費(fèi)者逼龟,后有消息的生產(chǎn)者。
5.1創(chuàng)建生產(chǎn)者:
public class HelloProducerTopic {
public static void testProducer(String msg) {
// 定義工廠
ConnectionFactory factory = null;
// 定義連接對(duì)象
Connection conn = null;
// 定義會(huì)話
Session session = null;
// 目的地
Destination des = null;
// 定義消息發(fā)送者
MessageProducer producer = null;
// 定義消息
Message message = null;
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
// 創(chuàng)建連接對(duì)象
conn = factory.createConnection();
// 啟動(dòng)連接
conn.start();
/*
* transacted:是否使用事務(wù) 可選值為: true|false true:使用事務(wù) 當(dāng)設(shè)置次變量 值追葡。
* Session.SESSION_TRANSACTED false:不適用事務(wù),設(shè)置次變量 則 acknowledgeMode 參數(shù)必須設(shè)置
* acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自動(dòng)消息確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn) 機(jī)制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)消息機(jī)制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地
des = session.createTopic("topic");
// 創(chuàng)建消息生產(chǎn)者
producer = session.createProducer(des);
// 創(chuàng)建消息對(duì)象
message = session.createTextMessage(msg);
// 發(fā)送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 關(guān)閉資源
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
- 創(chuàng)建消費(fèi)者:
創(chuàng)建多個(gè)消費(fèi)者腺律;代碼相同;
public class HelloConsumerTopic implements Runnable{
public void testConsumer() {
// 定義工廠
ConnectionFactory factory = null;
// 定義連接對(duì)象
Connection conn = null;
// 定義會(huì)話
Session session = null;
// 目的地
Destination des = null;
// 定義消息消費(fèi)者
MessageConsumer consumer = null;
// 定義消息
Message message = null;
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.226.130:61616");
// 創(chuàng)建連接對(duì)象
conn = factory.createConnection();
// 啟動(dòng)連接
conn.start();
/*
* transacted:是否使用事務(wù) 可選值為: true|false true:使用事務(wù) 當(dāng)設(shè)置次變量 值宜肉。
* Session.SESSION_TRANSACTED false:不適用事務(wù),設(shè)置次變量 則 acknowledgeMode 參數(shù)必須設(shè)置
* acknowledgeMode: Session.AUTO_ACKNOWLEDGE:自動(dòng)消息確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn) 機(jī)制 Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)消息機(jī)制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地
des = session.createTopic("topic");
// 創(chuàng)建消息消費(fèi)者
consumer = session.createConsumer(des);
// 創(chuàng)建消息消費(fèi)者對(duì)象
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println(message);
// 處理消息
String text = null;
try {
text = ((TextMessage) message).getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println("從ActiveMQ服務(wù)中獲取的文本信息Topic1:" + text);
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void run() {
testConsumer();
}
}
6.測(cè)試:
6.1生產(chǎn)者測(cè)試:
public class Test {
public static void main(String[] args) {
// HelloProducer.testProducer("Hello");
// User user = new User();
// user.setUserid(01);
// user.setUsername("張三");
// user.setUserage(20);
// HelloProducer2.testProducer(user);
// HelloProducer3.testProducer("Dave");
HelloProducerTopic.testProducer("Dave");
}
}
-
實(shí)現(xiàn)效果:
6.2消費(fèi)者測(cè)試:
public class Test {
public static void main(String[] args) {
// HelloConsumer.testConsumer();
// HelloConsumer2.testConsumer();
// HelloConsumer3.testConsumer();
HelloConsumerTopic topic1 = new HelloConsumerTopic();
Thread thread = new Thread(topic1);
thread.start();
HelloConsumerTopic2 topic2 = new HelloConsumerTopic2();
Thread thread2 = new Thread(topic2);
thread2.start();
HelloConsumerTopic3 topic3 = new HelloConsumerTopic3();
Thread thread3 = new Thread(topic3);
thread3.start();
}
}
三匀钧、spring整合ActiveMQ
1.創(chuàng)建生產(chǎn)者項(xiàng)目:
- 修改POM文件:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.zlw</groupId>
<artifactId>11-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>com.zlw</groupId>
<artifactId>11spring-actviemq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<dependencies>
<!-- ActiveMQ客戶端完整jar包依賴 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
<!-- ActiveMQ和Spring整合配置文件標(biāo)簽處理jar包依賴 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
</dependency>
<!-- Spring-JMS插件相關(guān)jar包依賴 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
</dependency>
<!-- 單元測(cè)試 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- 日志處理 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>
<!-- JSP相關(guān) -->
<dependency>
<groupId>jstl</groupId>
<artifactId>jstl</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jsp-api</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 配置Tomcat插件 -->
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<configuration>
<path>/</path>
<port>8080</port>
</configuration>
</plugin>
</plugins>
</build>
</project>
1.1整合項(xiàng)目:
- applicationContext-service.xml:
<!-- 掃描bean對(duì)象 -->
<context:component-scan base-package="com.zlw.service"/>
- springmvc:
<!-- 掃描@Controller -->
<context:component-scan base-package="com.zlw.web.controller"></context:component-scan>
<!-- 注冊(cè)兩個(gè)新對(duì)象 主要是為了來(lái)處理springmvc 中的其他 anntation 如:@requestmapping -->
<mvc:annotation-driven></mvc:annotation-driven>
<!-- 視圖解析器 -->
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="prefix" value="/WEB-INF/jsp/"></property>
<property name="suffix" value=".jsp"></property>
</bean>
<!-- 配置靜態(tài)資源映射 -->
<mvc:resources location="/WEB-INF/css/" mapping="/css/**"/>
<mvc:resources location="/WEB-INF/js/" mapping="/js/**"/>
- web.xml:
<!-- 上下文參數(shù),告訴spring配置文件路徑 -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:applicationContext-*.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- 配置springmvc -->
<servlet>
<servlet-name>springmvc</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:springmvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>springmvc</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<filter>
<filter-name>encoding</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>utf-8</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encoding</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
- 整合ActiveMQ(applicationContext-jms):
<!-- 需要?jiǎng)?chuàng)建一個(gè)連接工廠,連接 ActiveMQ. ActiveMQConnectionFactory. 需要依賴 ActiveMQ 提供的 amq 標(biāo)簽 -->
<!-- amq:connectionFactory 是 bean 標(biāo)簽的子標(biāo)簽, 會(huì)在 spring 容器中創(chuàng)建一個(gè) bean 對(duì)象. 可以為對(duì)象命名.
類似: <bean id="" class="ActiveMQConnectionFactory"></bean> -->
<amq:connectionFactory
brokerURL="tcp://192.168.226.130:61616" userName="admin"
password="admin" id="amqConnectionFactory" />
<!-- spring 管理 JMS 相關(guān)代碼的時(shí)候,必須依賴 jms 標(biāo) 簽庫(kù). spring-jms 提供的標(biāo)簽庫(kù) -->
<!-- 定義 Spring-JMS 中的連接工廠對(duì)象 CachingConnectionFactory - spring 框架提供的 連接工廠對(duì)象.
不能真正的訪問(wèn) MOM 容器. 類似一個(gè)工廠的代理對(duì)象. 需要提供一個(gè)真實(shí)工 廠,實(shí)現(xiàn) MOM 容器的連接訪問(wèn). -->
<bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactoryBean">
<property name="connectionFactory" ref="amqConnectionFactory"></property>
<property name="maxConnections" value="10"></property>
</bean>
<!-- 配置有緩存的 ConnectionFactory谬返,session 的 緩存大小可定制之斯。 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"></property>
<property name="sessionCacheSize" value="3"></property>
</bean>
<!-- JmsTemplate 配置 -->
<bean id="template" class="org.springframework.jms.core.JmsTemplate">
<!-- 給定連接工廠, 必須是 spring 創(chuàng)建的連接工 廠. -->
<property name="connectionFactory" ref="connectionFactory"></property>
<!-- 可選 - 默認(rèn)目的地命名 -->
<property name="defaultDestinationName" value="test-spring"></property>
</bean>
1.2定義對(duì)象:
private int userid;
private String username;
private String usermail;
private int userage;
1.3創(chuàng)建發(fā)送消息的service:
@Service
public class UserServiceImpl implements UserService {
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void addUser(Users user) {
// 發(fā)送消息
jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
Message message = session.createObjectMessage(user);
return message;
}
});
}
}
1.4創(chuàng)建Controller:
@Controller
@RequestMapping("/user")
public class UserController {
@Autowired
private UserService userService;
@RequestMapping("addUser")
public String userAdd(Users users) {
userService.addUser(users);
return "ok";
}
}
- 添加JSP頁(yè)面:
<body>
<form action="/user/addUser" method="post">
<p>
ID:<input type="text" name="userid" />
</p>
<p>
姓名:<input type="text" name="username" />
</p>
<p>
年齡:<input type="text" name="userage" />
</p>
<p>
郵箱:<input type="text" name="usermail" />
</p>
<p>
<input type="submit" value="提交" />
</p>
</form>
</body>
2.創(chuàng)建消費(fèi)者項(xiàng)目:
2.1修改POM文件:
<dependencies>
<!-- activemq 客戶端 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
<!-- spring 框架對(duì) JMS 標(biāo)準(zhǔn)的支持 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<!-- ActiveMQ 和 spring 整合的插件 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
</dependencies>
2.2創(chuàng)建接受消息的service:
@Service
public class UserServiceImpl implements UserService {
@Override
public void showUser(Users user) {
System.out.println(user);
}
}
2.3創(chuàng)建Listener處理消息:
@Component(value = "myListener")
public class MyMessageListener implements MessageListener{
@Autowired
private UserService userService;
@Override
public void onMessage(Message message) {
//處理消息
ObjectMessage objectMessage = (ObjectMessage)message;
Users user = null;
try {
user = (Users) objectMessage.getObject();
} catch (JMSException e) {
e.printStackTrace();
}
this.userService.showUser(user);
}
}
2.4測(cè)試:
public class Test {
public static void main(String[] args) throws IOException {
ClassPathXmlApplicationContext cpa = new ClassPathXmlApplicationContext(new String[] {"classpath:applicationContext-jms.xml","classpath:applicationContext-service.xml"});
cpa.start();
System.err.println("spring容器啟動(dòng)!G猜痢佑刷!");
System.in.read();
}
}