一、ActiveMQ簡(jiǎn)介
1.什么是ActiveMQ
ActiveMQ是Apache出品,最流行的顽爹,能力強(qiáng)勁的開(kāi)源消息總線朱灿。ActiveMQ是一個(gè)完全支持JMS1.1 和J2EE1.4規(guī)范的JMS Provider實(shí)現(xiàn),盡管JMS規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。
2.什么是消息
“消息”是在兩臺(tái)計(jì)算機(jī)間傳送的數(shù)據(jù)單位拐云。消息可以非常簡(jiǎn)單完疫,例如只包含文本字符串库正,也可以更復(fù)雜,可以包含嵌入對(duì)象追他。
3.什么是隊(duì)列
是一種有序的,先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)钝荡,例如:生活中的排隊(duì)
4.什么是消息隊(duì)列
“消息隊(duì)列”是在消息的傳輸過(guò)程中保存消息的容器
5.常見(jiàn)消息服務(wù)應(yīng)用
- ActiveMQ
- RabbitMQ
- RocketMQ
二街立、消息服務(wù)的應(yīng)用場(chǎng)景
消息隊(duì)列的特點(diǎn)主要是異步處理,主要作用是減少消息請(qǐng)求和響應(yīng)的時(shí)間以及解耦埠通。所以主要用于比較耗時(shí)并且不需要即時(shí)(同步)返回結(jié)果的操作赎离。
2.1 異步處理
2.1.1 用戶注冊(cè)
用戶注冊(cè)流程:
- 注冊(cè)處理及寫(xiě)入數(shù)據(jù)庫(kù)
- 發(fā)送注冊(cè)成功的手機(jī)短信
- 發(fā)送注冊(cè)成功的郵件信息
如果使用消息中間件,則可以創(chuàng)建兩個(gè)線程來(lái)做這些事情植阴,直接發(fā)送消息給消息中間件蟹瘾,然后讓郵件服務(wù)和短信服務(wù)去消息中間件中取消息圾浅,取到消息后自己再做對(duì)應(yīng)的操作掠手。
2.2 應(yīng)用的解耦
2.2.1 訂單處理
生成訂單流程:
- 在購(gòu)物車中點(diǎn)擊結(jié)算
- 完成支付
- 創(chuàng)建訂單
- 調(diào)用庫(kù)存系統(tǒng)
訂單完成后,訂單系統(tǒng)不用直接取調(diào)用庫(kù)存系統(tǒng)狸捕,而是發(fā)送消息到消息中間件喷鸽,寫(xiě)入一個(gè)訂單信息。庫(kù)存系統(tǒng)自己去消息中間件中獲取灸拍,然后做發(fā)貨處理做祝,并更新庫(kù)存。
2.3流量的削峰
2.3.1 秒殺功能
秒殺流程
- 用戶點(diǎn)擊秒殺
- 發(fā)送請(qǐng)求到秒殺應(yīng)用
- 在請(qǐng)求秒殺應(yīng)用之前將請(qǐng)求放入到消息隊(duì)列
- 秒殺應(yīng)用從消息隊(duì)列中獲取請(qǐng)求并處理
系統(tǒng)舉行秒殺活動(dòng)鸡岗,流量蜂擁而至100件商品混槐,10萬(wàn)人擠進(jìn)來(lái)怎么辦?
將10萬(wàn)秒殺的操作轩性,放入消息隊(duì)列声登。秒殺應(yīng)用將10萬(wàn)個(gè)請(qǐng)求中的前100個(gè)進(jìn)行處理,其它的駁回通知失敗。這樣將流量控制在了消息隊(duì)列處悯嗓。秒殺應(yīng)用不會(huì)被懟死件舵。
三、JMS
1.什么是JMS
JMS(Java Message Service)是Java平臺(tái)上面向消息中間件的技術(shù)規(guī)范脯厨,它便于消息系統(tǒng)中的Java應(yīng)用程序進(jìn)行消息交換铅祸,并且提供標(biāo)準(zhǔn)的產(chǎn)生、發(fā)送合武、接收消息的接口临梗,簡(jiǎn)化企業(yè)應(yīng)用的開(kāi)發(fā)。
2.JMS模型
2.1 點(diǎn)對(duì)點(diǎn)模型(Point To Point)
生產(chǎn)者發(fā)送一條消息到queue眯杏,只有一個(gè)消費(fèi)者能收到夜焦。
2.2 發(fā)布訂閱模型(Publish/Subscribe)
發(fā)布者發(fā)送到topic的消息,只有訂閱了topic的訂閱者才會(huì)收到消息岂贩。
四茫经、ActiveMQ安裝
1.下載資源
ActiveMQ官網(wǎng):http://activemq.apache.org
1.1 版本說(shuō)明
ActiveMQ5.10.x 以上版本必須使用 JDK1.8 才能正常使用。
ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用萎津。
2.上傳至Linux服務(wù)器
3.解壓安裝文件
tar -zxf apache-activemq-5.9.0-bin.tar.gz
4.檢查權(quán)限
ls -al apache-activemq-5.9.0/bin
如果權(quán)限不足卸伞,則無(wú)法執(zhí)行,需要修改文件權(quán)限:
chmod 755 activemq
5.復(fù)制應(yīng)用至本地目錄
cp apache-activemq-5.9.0 /usr/local/activemq -r
6.啟動(dòng)ActiveMQ
/usr/local/activemq/bin/activemq start
7.測(cè)試ActiveMQ
7.1檢查進(jìn)程
ps aux|grep activemq
7.2管理界面
使用瀏覽器訪問(wèn)ActiveMQ管理應(yīng)用锉屈,地址如下:
http://ip:8161/admin/
用戶名:admin
密碼:admin
AcitveMQ使用的是Jetty提供的HTTP服務(wù)荤傲。啟動(dòng)稍慢,建議短暫等待再訪問(wèn)測(cè)試颈渊。
見(jiàn)到如下界面代表服務(wù)啟動(dòng)成功
7.3 修改訪問(wèn)端口(管理應(yīng)用監(jiān)聽(tīng)的端口)
修改ActiveMQ配置文件:
/usr/local/activemq/conf/jetty.xml
7.4 修改用戶名和密碼
修改conf/users.properties配置文件遂黍,內(nèi)容為:用戶名=密碼
保存并啟動(dòng)ActiveMQ服務(wù)即可。
8.重啟ActiveMQ
/usr/local/activemq/bin/activemq restart
9.關(guān)閉ActiveMQ
/usr/local/activemq/bin/activemq stop
10.配置文件activemq.xml
配置文件中俊嗽,配置的是ActiveMQ的核心配置信息雾家,是提供服務(wù)時(shí)使用的配置,可以修改啟動(dòng)的訪問(wèn)端口绍豁,即Java編程中訪問(wèn)ActiveMQ的訪問(wèn)端口
默認(rèn)端口:61616(編程時(shí)使用的端口)
使用協(xié)議:TCP協(xié)議
修改端口后芯咧,保存并重啟ActiveMQ服務(wù)即可
11.ActiveMQ目錄介紹
bin:可執(zhí)行的腳本文件
conf:相關(guān)的配置文件
data:存放的是日志文件
docs:存放的是相關(guān)文檔
examples:存放的是簡(jiǎn)單的實(shí)例
lib:相關(guān)的jar包
webapps:用于存放項(xiàng)目的目錄
五、ActiveMQ術(shù)語(yǔ)
1.Destination
目的地竹揍,JMS Provider(消息中間件)負(fù)責(zé)維護(hù)敬飒,用于對(duì)Message進(jìn)行管理的對(duì)象。MessageProducer需要指定Destination才能發(fā)送消息芬位,MessageReceiver需要指定Destination才能接收消息无拗。
2.Producer
消息生成者,負(fù)責(zé)發(fā)送Message到目的地昧碉。
3.Consumer|Receiver
消息消費(fèi)者英染,負(fù)責(zé)從目的地中消費(fèi)(處理/監(jiān)聽(tīng)/訂閱)Message
4.Message
消息阴孟,用于封裝一次通信的內(nèi)容
六、ActiveMQ應(yīng)用
1.ActiveMQ常用API簡(jiǎn)介
下述API都是接口類型税迷,定義在javax.jms包中
1.1 ConnectionFactory
連接工廠:用于創(chuàng)建連接的工廠類型
1.2 Connection
連接:用于建立訪問(wèn)ActiveMQ連接的類型永丝,由連接工廠創(chuàng)建
1.3 Session
會(huì)話:一次持久、有效箭养、有狀態(tài)的訪問(wèn)慕嚷,由連接創(chuàng)建
1.4 Destination & Queue
目的地:用于描述本次訪問(wèn)ActiveMQ的消息訪問(wèn)目的地,即ActiveMQ服務(wù)中的具體隊(duì)列毕泌,由會(huì)話創(chuàng)建
Interface Queue extends Destination
1.5 MessageProducer
消息生產(chǎn)者:在一次有效會(huì)話中喝检,用于發(fā)送消息給ActiveMQ的服務(wù)工具,由會(huì)話創(chuàng)建
1.6 MessageConsumer
消息消費(fèi)者:在一次有效會(huì)話中撼泛,用于從ActiveMQ中獲取消息的工具挠说,由會(huì)話創(chuàng)建
1.7 Message
消息:通過(guò)消息生產(chǎn)者向ActiveMQ服務(wù)發(fā)送消息時(shí)使用的數(shù)據(jù)載體對(duì)象或消息消費(fèi)者從ActiveMQ服務(wù)中獲取消息時(shí)使用的數(shù)據(jù)載體對(duì)象,是所有消息(文本消息愿题、對(duì)象消息等)具體類型的頂級(jí)接口损俭,可以通過(guò)會(huì)話創(chuàng)建或通過(guò)會(huì)話從ActiveMQ服務(wù)中獲取
2.JMS-HelloWorld
2.1 處理文本消息
2.1.1 創(chuàng)建消息生產(chǎn)者
創(chuàng)建工程
mq-producer
添加坐標(biāo)
<!--activeMQ-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq-all.version}</version>
</dependency>
編寫(xiě)消息生產(chǎn)者
public class HelloWorldProducer {
public void sendHelloWorldActiveMQ(String msgText){
//定義連接工廠
ConnectionFactory connectionFactory = null;
//定義連接對(duì)象
Connection connection = null;
//定義會(huì)話
Session session = null;
//目的地
Destination destination = null;
//消息生產(chǎn)者
MessageProducer producer = null;
//定義消息
Message message = null;
try {
//傳入的用戶名和密碼可以通過(guò)jetty-realm.properties文件修改
//brokerURL:訪問(wèn)activeMQ的連接地址,路徑結(jié)構(gòu)為:協(xié)議://主機(jī)地址:端口號(hào)
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");
//創(chuàng)建連接對(duì)象
connection = connectionFactory.createConnection();
//啟動(dòng)連接(此時(shí)才是真正創(chuàng)建連接)
connection.start();
/**
* 創(chuàng)建會(huì)話
* transacted:是否使用事務(wù)潘酗,可選值為true杆兵,false
* true:使用事務(wù),設(shè)置此變量值仔夺,Session.SESSION.TRANSACTION
* false:不使用事務(wù)琐脏,設(shè)置此變量 則acknowledgeMode必須設(shè)置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自動(dòng)確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn)機(jī)制(需手動(dòng)調(diào)用API)
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)機(jī)制(前兩種一旦收到消息確認(rèn)就會(huì)進(jìn)行刪除,這個(gè)則不會(huì))
*/
session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);
//創(chuàng)建目的地缸兔,即隊(duì)列的名稱日裙,消息消費(fèi)者需要通過(guò)此名稱訪問(wèn)對(duì)應(yīng)的隊(duì)列
destination = session.createQueue("helloworld-destination");
//創(chuàng)建消息生產(chǎn)者
producer = session.createProducer(destination);
//創(chuàng)建消息對(duì)象
message = session.createTextMessage(msgText);
//發(fā)送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
//回收消息發(fā)送者資源
if (producer != null){
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.1.2 創(chuàng)建消息消費(fèi)者
創(chuàng)建工程
mq-consumer
添加坐標(biāo)
<!--activeMQ-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
創(chuàng)建消息生產(chǎn)者
public class HelloWorldConsumer {
public void receiveHelloWorldActiveMQ() {
//定義連接工廠
ConnectionFactory connectionFactory = null;
//定義連接
Connection connection = null;
//定義會(huì)話
Session session = null;
//定義目的地
Destination destination = null;
//定義消息消費(fèi)者
MessageConsumer consumer = null;
//定義消息
Message message = null;
try {
//創(chuàng)建連接工廠
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
//創(chuàng)建連接對(duì)象
connection = connectionFactory.createConnection();
//開(kāi)啟連接
connection.start();
//創(chuàng)建會(huì)話
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
//創(chuàng)建目的地
destination = session.createQueue("helloworld-destination");
//創(chuàng)建消息消費(fèi)者
consumer = session.createConsumer(destination);
//接收消息
message = consumer.receive();
//獲取文本消息
String msg = ((TextMessage) message).getText();
System.out.println("從ActiveMQ中獲取的文本信息:" + msg);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.1.3 測(cè)試
測(cè)試生產(chǎn)者
public class Test {
public static void main(String[] args) {
HelloWorldProducer producer = new HelloWorldProducer();
producer.sendHelloWorldActiveMQ("HelloWorld");
}
}
測(cè)試消費(fèi)者
public class Test {
public static void main(String[] args) {
HelloWorldConsumer consumer = new HelloWorldConsumer();
consumer.receiveHelloWorldActiveMQ();
}
}
2.2 處理對(duì)象消息
2.2.1 創(chuàng)建對(duì)象
public class User implements Serializable {
private Integer userId;
private String userName;
private Integer userAge;
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public Integer getUserAge() {
return userAge;
}
public void setUserAge(Integer userAge) {
this.userAge = userAge;
}
@Override
public String toString() {
return "User{" +
"userId=" + userId +
", userName='" + userName + '\'' +
", userAge=" + userAge +
'}';
}
}
2.2.2 創(chuàng)建生產(chǎn)者
public class HelloWorldProducer2 {
public void sendHelloWorldActiveMQ(User user){
//定義連接工廠
ConnectionFactory connectionFactory = null;
//定義連接對(duì)象
Connection connection = null;
//定義會(huì)話
Session session = null;
//目的地
Destination destination = null;
//消息生產(chǎn)者
MessageProducer producer = null;
//定義消息
Message message = null;
try {
//傳入的用戶名和密碼可以通過(guò)jetty-realm.properties文件修改
//brokerURL:訪問(wèn)activeMQ的連接地址,路徑結(jié)構(gòu)為:協(xié)議://主機(jī)地址:端口號(hào)
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");
//創(chuàng)建連接對(duì)象
connection = connectionFactory.createConnection();
//啟動(dòng)連接(此時(shí)才是真正創(chuàng)建連接)
connection.start();
/**
* 創(chuàng)建會(huì)話
* transacted:是否使用事務(wù)惰蜜,可選值為true昂拂,false
* true:使用事務(wù),設(shè)置此變量值蝎抽,Session.SESSION.TRANSACTION
* false:不使用事務(wù)政钟,設(shè)置此變量 則acknowledgeMode必須設(shè)置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自動(dòng)確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn)機(jī)制(需手動(dòng)調(diào)用API)
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)機(jī)制(前兩種一旦收到消息確認(rèn)就會(huì)進(jìn)行刪除路克,這個(gè)則不會(huì))
*/
session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);
//創(chuàng)建目的地樟结,即隊(duì)列的名稱,消息消費(fèi)者需要通過(guò)此名稱訪問(wèn)對(duì)應(yīng)的隊(duì)列
destination = session.createQueue("my-user");
//創(chuàng)建消息生產(chǎn)者
producer = session.createProducer(destination);
//創(chuàng)建消息對(duì)象
message = session.createObjectMessage(user);
//發(fā)送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
//回收消息發(fā)送者資源
if (producer != null){
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.2.3 創(chuàng)建消費(fèi)者
public class HelloWorldConsumer2 {
public void receiveHelloWorldActiveMQ() {
//定義連接工廠
ConnectionFactory connectionFactory = null;
//定義連接
Connection connection = null;
//定義會(huì)話
Session session = null;
//定義目的地
Destination destination = null;
//定義消息消費(fèi)者
MessageConsumer consumer = null;
//定義消息
Message message = null;
try {
//創(chuàng)建連接工廠
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
//創(chuàng)建連接對(duì)象
connection = connectionFactory.createConnection();
//開(kāi)啟連接
connection.start();
//創(chuàng)建會(huì)話
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
//創(chuàng)建目的地
destination = session.createQueue("my-user");
//創(chuàng)建消息消費(fèi)者
consumer = session.createConsumer(destination);
//接收消息
message = consumer.receive();
Serializable obj = ((ObjectMessage) message).getObject();
User user = (User) obj;
System.out.println("從ActiveMQ中獲取的對(duì)象信息:" + user);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
2.2.4 測(cè)試
public class Test {
public static void main(String[] args) {
/*HelloWorldProducer producer = new HelloWorldProducer();
producer.sendHelloWorldActiveMQ("HelloWorld");*/
HelloWorldProducer2 producer2 = new HelloWorldProducer2();
producer2.sendHelloWorldActiveMQ(new User(1, "tom", 21));
}
}
public class Test {
public static void main(String[] args) {
/*HelloWorldConsumer consumer = new HelloWorldConsumer();
consumer.receiveHelloWorldActiveMQ();*/
HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
consumer2.receiveHelloWorldActiveMQ();
}
}
3.JMS-實(shí)現(xiàn)隊(duì)列服務(wù)監(jiān)聽(tīng)
隊(duì)列監(jiān)聽(tīng)使用了觀察者模式
3.1 創(chuàng)建消息生產(chǎn)者
public class HelloWorldProducer3 {
public void sendHelloWorldActiveMQ(User user){
//定義連接工廠
ConnectionFactory connectionFactory = null;
//定義連接對(duì)象
Connection connection = null;
//定義會(huì)話
Session session = null;
//目的地
Destination destination = null;
//消息生產(chǎn)者
MessageProducer producer = null;
//定義消息
Message message = null;
try {
//傳入的用戶名和密碼可以通過(guò)jetty-realm.properties文件修改
//brokerURL:訪問(wèn)activeMQ的連接地址精算,路徑結(jié)構(gòu)為:協(xié)議://主機(jī)地址:端口號(hào)
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");
//創(chuàng)建連接對(duì)象
connection = connectionFactory.createConnection();
//啟動(dòng)連接(此時(shí)才是真正創(chuàng)建連接)
connection.start();
/**
* 創(chuàng)建會(huì)話
* transacted:是否使用事務(wù)瓢宦,可選值為true,false
* true:使用事務(wù)灰羽,設(shè)置此變量值驮履,Session.SESSION.TRANSACTION
* false:不使用事務(wù)鱼辙,設(shè)置此變量 則acknowledgeMode必須設(shè)置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自動(dòng)確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn)機(jī)制(需手動(dòng)調(diào)用API)
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)機(jī)制(前兩種一旦收到消息確認(rèn)就會(huì)進(jìn)行刪除,這個(gè)則不會(huì))
*/
session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);
//創(chuàng)建目的地玫镐,即隊(duì)列的名稱倒戏,消息消費(fèi)者需要通過(guò)此名稱訪問(wèn)對(duì)應(yīng)的隊(duì)列
destination = session.createQueue("my-destination");
//創(chuàng)建消息生產(chǎn)者
producer = session.createProducer(destination);
//創(chuàng)建消息對(duì)象
message = session.createObjectMessage(user);
//發(fā)送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
//回收消息發(fā)送者資源
if (producer != null){
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
3.2 創(chuàng)建消息消費(fèi)者
public class HelloWorldConsumer3 {
public void receiveHelloWorldActiveMQ() {
//定義連接工廠
ConnectionFactory connectionFactory = null;
//定義連接
Connection connection = null;
//定義會(huì)話
Session session = null;
//定義目的地
Destination destination = null;
//定義消息消費(fèi)者
MessageConsumer consumer = null;
//定義消息
Message message = null;
try {
//創(chuàng)建連接工廠
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
//創(chuàng)建連接對(duì)象
connection = connectionFactory.createConnection();
//開(kāi)啟連接
connection.start();
//創(chuàng)建會(huì)話
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
//創(chuàng)建目的地
destination = session.createQueue("my-destination");
//創(chuàng)建消息消費(fèi)者
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
//ActiveMQ的回調(diào)方法,通過(guò)該方法將消息傳遞到consumer中
@Override
public void onMessage(Message message) {
Serializable obj = null;
try {
obj = ((ObjectMessage) message).getObject();
} catch (JMSException e) {
e.printStackTrace();
}
User user = (User) obj;
System.out.println("從ActiveMQ中獲取的對(duì)象信息:" + user);
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
4.Topic模型
4.1 Publish/Subscribe 處理模式(Topic)
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中恐似,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息杜跷。
和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到topic的消息會(huì)被所有訂閱者消費(fèi)
當(dāng)生產(chǎn)者發(fā)布消息矫夷,不管是否有消費(fèi)者葛闷,都不會(huì)保存消息
一定要先有消息的消費(fèi)者,后有消息生產(chǎn)者
4.2 創(chuàng)建消息生產(chǎn)者
public class HelloWorldProducerTopic {
public void sendHelloWorldActiveMQ(String msgText){
//定義連接工廠
ConnectionFactory connectionFactory = null;
//定義連接對(duì)象
Connection connection = null;
//定義會(huì)話
Session session = null;
//目的地
Destination destination = null;
//消息生產(chǎn)者
MessageProducer producer = null;
//定義消息
Message message = null;
try {
//傳入的用戶名和密碼可以通過(guò)jetty-realm.properties文件修改
//brokerURL:訪問(wèn)activeMQ的連接地址双藕,路徑結(jié)構(gòu)為:協(xié)議://主機(jī)地址:端口號(hào)
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");
//創(chuàng)建連接對(duì)象
connection = connectionFactory.createConnection();
//啟動(dòng)連接(此時(shí)才是真正創(chuàng)建連接)
connection.start();
/**
* 創(chuàng)建會(huì)話
* transacted:是否使用事務(wù)淑趾,可選值為true,false
* true:使用事務(wù)忧陪,設(shè)置此變量值扣泊,Session.SESSION.TRANSACTION
* false:不使用事務(wù),設(shè)置此變量 則acknowledgeMode必須設(shè)置
* acknowledgeMode:
* Session.AUTO_ACKNOWLEDGE:自動(dòng)確認(rèn)機(jī)制
* Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn)機(jī)制(需手動(dòng)調(diào)用API)
* Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)機(jī)制(前兩種一旦收到消息確認(rèn)就會(huì)進(jìn)行刪除嘶摊,這個(gè)則不會(huì))
*/
session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);
//創(chuàng)建目的地旷赖,即隊(duì)列的名稱,消息消費(fèi)者需要通過(guò)此名稱訪問(wèn)對(duì)應(yīng)的隊(duì)列
destination = session.createTopic("test-topic");
//創(chuàng)建消息生產(chǎn)者
producer = session.createProducer(destination);
//創(chuàng)建消息對(duì)象
message = session.createTextMessage(msgText);
//發(fā)送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
//回收消息發(fā)送者資源
if (producer != null){
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
4.3 創(chuàng)建消息消費(fèi)者
創(chuàng)建三份
public class HelloWorldConsumerTopic1 implements Runnable{
public void receiveHelloWorldActiveMQ() {
//定義連接工廠
ConnectionFactory connectionFactory = null;
//定義連接
Connection connection = null;
//定義會(huì)話
Session session = null;
//定義目的地
Destination destination = null;
//定義消息消費(fèi)者
MessageConsumer consumer = null;
//定義消息
Message message = null;
try {
//創(chuàng)建連接工廠
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
//創(chuàng)建連接對(duì)象
connection = connectionFactory.createConnection();
//開(kāi)啟連接
connection.start();
//創(chuàng)建會(huì)話
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
//創(chuàng)建目的地
destination = session.createTopic("test-topic");
//創(chuàng)建消息消費(fèi)者
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//獲取文本消息
String msg = null;
try {
msg = ((TextMessage) message).getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println("從ActiveMQ中獲取的文本信息----topic1:" + msg);
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void run() {
receiveHelloWorldActiveMQ();
}
}
4.4 測(cè)試
public class Test {
public static void main(String[] args) {
/*HelloWorldProducer producer = new HelloWorldProducer();
producer.sendHelloWorldActiveMQ("HelloWorld");*/
/*HelloWorldProducer2 producer2 = new HelloWorldProducer2();
producer2.sendHelloWorldActiveMQ(new User(1, "tom", 21));*/
/*HelloWorldProducer3 producer3 = new HelloWorldProducer3();
producer3.sendHelloWorldActiveMQ(new User(2,"alice",19));*/
HelloWorldProducerTopic topic = new HelloWorldProducerTopic();
topic.sendHelloWorldActiveMQ("Hello Topic");
}
}
public class Test {
public static void main(String[] args) {
/*HelloWorldConsumer consumer = new HelloWorldConsumer();
consumer.receiveHelloWorldActiveMQ();*/
/*HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
consumer2.receiveHelloWorldActiveMQ();*/
/*HelloWorldConsumer3 consumer3 = new HelloWorldConsumer3();
consumer3.receiveHelloWorldActiveMQ();*/
HelloWorldConsumerTopic1 topic1 = new HelloWorldConsumerTopic1();
Thread thread1 = new Thread(topic1);
thread1.start();
HelloWorldConsumerTopic2 topic2 = new HelloWorldConsumerTopic2();
Thread thread2 = new Thread(topic2);
thread2.start();
HelloWorldConsumerTopic3 topic3 = new HelloWorldConsumerTopic3();
Thread thread3 = new Thread(topic3);
thread3.start();
}
}
七更卒、Spring整合ActiveMQ
1.創(chuàng)建項(xiàng)目
創(chuàng)建spring-activemq-producer
1.1 添加坐標(biāo)
<dependencies>
<!--activeMQ-->
<!--ActiveMQ客戶端完整jar包依賴-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-context</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--ActiveMQ和Spring整合配置文件標(biāo)簽處理jar包依賴-->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
</dependency>
<!--Spring JMS插件相關(guān)的jar包依賴-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<!--Active Pool-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
</dependency>
<!-- 單元測(cè)試 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- 日志處理 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</dependency>
<!--javaee-->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jsp-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
</dependency>
</dependencies>
1.2 整合ActiveMQ
- web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
version="2.5">
<servlet>
<servlet-name>springmvc</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-*.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>springmvc</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
</web-app>
- spring-mvc.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!--掃包-->
<context:component-scan base-package="com.hxx.web.controller"/>
<!--添加注解驅(qū)動(dòng)-->
<mvc:annotation-driven/>
<!--配置視圖解析器-->
<bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="prefix" value="/WEB-INF/jsp/"/>
<property name="suffix" value=".jsp"/>
</bean>
<!--放行靜態(tài)資源-->
</beans>
- spring-service
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!--加載資源文件-->
<context:property-placeholder location="classpath:resource.properties"/>
<!--掃描bean對(duì)象-->
<context:component-scan base-package="com.hxx.service.impl"/>
</beans>
- spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!--創(chuàng)建一個(gè)連接工廠等孵,連接ActiveMQ,ActiveMQConnectionFactory,需要依賴ActiveMQ提供的amq標(biāo)簽-->
<!--amq:connectionFactory是bean的子標(biāo)簽蹂空,會(huì)在Spring容器中創(chuàng)建一個(gè)bean對(duì)象俯萌,
可以為對(duì)象名,類似:<bean id="" class="ActiveMQConnectionFactory"/>-->
<amq:connectionFactory brokerURL="tcp://192.168.254.128:61616"
userName="admin" password="admin" id="amqConnectionFactory"/>
<!--spring管理JMS相關(guān)代碼的時(shí)候上枕,必須依賴jms標(biāo)簽庫(kù)咐熙,Spring-jms提供標(biāo)簽庫(kù)-->
<!--
定義Spring-jms中的連接工廠對(duì)象
CachingConnectionFactory - spring框架提供的連接工廠對(duì)象,不能真正訪問(wèn)MOM容器辨萍,
類似一個(gè)工廠的代理對(duì)象棋恼,需要提供一個(gè)真實(shí)工廠,實(shí)現(xiàn)MOM容器的連接訪問(wèn)
-->
<bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
<property name="connectionFactory" ref="amqConnectionFactory"/>
<property name="maxConnections" value="10"/>
</bean>
<!--配置有緩存的ConnectionFactory锈玉,Session的緩存大小可定制-->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="3"/>
</bean>
<!--jmsTemplate配置-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!--給定連接工廠-->
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<!--默認(rèn)目的地命名-->
<property name="defaultDestinationName" value="test-spring"/>
</bean>
</beans>
2.創(chuàng)建項(xiàng)目
spring-activemq-consumer
2.1 添加依賴
<dependencies>
<!--activeMQ-->
<!--ActiveMQ客戶端完整jar包依賴-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<exclusions>
<exclusion>
<artifactId>spring-context</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--ActiveMQ和Spring整合配置文件標(biāo)簽處理jar包依賴-->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
</dependency>
<!--Spring JMS插件相關(guān)的jar包依賴-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<!--Active Pool-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
</dependency>
<!-- 單元測(cè)試 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- 日志處理 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
</dependency>
</dependencies>
2.2 整合ActiveMQ
- spring-service.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!--掃描bean對(duì)象-->
<context:component-scan base-package="com.hxx.service,com.hxx.listener"/>
</beans>
- spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
<!--創(chuàng)建一個(gè)連接工廠爪飘,連接ActiveMQ络断,ActiveMQConnectionFactory,需要依賴ActiveMQ提供的amq標(biāo)簽-->
<!--amq:connectionFactory是bean的子標(biāo)簽统锤,會(huì)在Spring容器中創(chuàng)建一個(gè)bean對(duì)象桩蓉,
可以為對(duì)象名扔水,類似:<bean id="" class="ActiveMQConnectionFactory"/>-->
<amq:connectionFactory brokerURL="tcp://192.168.254.128:61616"
userName="admin" password="admin" id="amqConnectionFactory"/>
<!--spring管理JMS相關(guān)代碼的時(shí)候遭商,必須依賴jms標(biāo)簽庫(kù)攘残,Spring-jms提供標(biāo)簽庫(kù)-->
<!--
定義Spring-jms中的連接工廠對(duì)象
CachingConnectionFactory - spring框架提供的連接工廠對(duì)象插佛,不能真正訪問(wèn)MOM容器产弹,
類似一個(gè)工廠的代理對(duì)象,需要提供一個(gè)真實(shí)工廠床估,實(shí)現(xiàn)MOM容器的連接訪問(wèn)
-->
<!--配置有緩存的ConnectionFactory含滴,Session的緩存大小可定制-->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="3"/>
</bean>
<!--注冊(cè)監(jiān)聽(tīng)器-->
<!--
開(kāi)始注冊(cè)監(jiān)聽(tīng)
需要的參數(shù)有:
acknowledge:消息確認(rèn)機(jī)制
container-type:simple|default
simple:SimpleMessageListenerContainer最簡(jiǎn)單的消息監(jiān)聽(tīng)器容器,只能處理固定數(shù)量的JMS會(huì)話
default:DefaultMessageListenerContainer是一種用于異步消息監(jiān)聽(tīng)的管理類丐巫,且支持事務(wù)
destination-type:目的地類型蛙吏,使用隊(duì)列作為目的地,
connection-factory:連接工廠鞋吉,spring-jms使用的工廠鸦做,必須是spring自主創(chuàng)建的
不能使用第三方工具創(chuàng)建工程,如:ActiveMQConnectionFactory
-->
<jms:listener-container acknowledge="auto" container-type="default"
destination-type="queue" connection-factory="cachingConnectionFactory">
<!--
在監(jiān)聽(tīng)器容器中注冊(cè)某監(jiān)聽(tīng)對(duì)象谓着,
destination - 設(shè)置目的地命名
ref - 指定監(jiān)聽(tīng)器對(duì)象
-->
<jms:listener destination="test-spring" ref="myListener"/>
</jms:listener-container>
</beans>
- 創(chuàng)建MyMessageListener
@Component(value = "myListener")
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
}
}
- 測(cè)試
public class TestActiveMQ {
public static void main(String[] args) throws IOException {
ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext(new String[]{"classpath:spring-jms.xml"
,"classpath:spring-service.xml"});
ac.start();
System.out.println("spring容器啟動(dòng)");
System.in.read();
}
}
3.測(cè)試整合
需求:
1.在Producer中創(chuàng)建User類
2.將User對(duì)象傳遞到ActiveMQ中
3.在Consumer中獲取User對(duì)象并在控制臺(tái)打印
3.1 Producer發(fā)送消息
@Service
public class UserServiceImpl implements UserService {
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void addUser(final User user) {
jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//發(fā)送消息
return session.createObjectMessage(user);
}
});
}
}
發(fā)送成功
3.2 Consumer接收消息
- userServiceImpl.java
@Service
public class UserServiceImpl implements UserService {
@Override
public void showUser(User user) {
System.out.println(user);
}
}
- MyMessageListener.java
@Component(value = "myListener")
public class MyMessageListener implements MessageListener {
@Autowired
private UserService userService;
@Override
public void onMessage(Message message) {
Serializable obj = null;
try {
obj = ((ObjectMessage) message).getObject();
} catch (JMSException e) {
e.printStackTrace();
}
User user = (User) obj;
userService.showUser(user);
}
}