一、消息隊(duì)列(MQ)概述
消息隊(duì)列(Message Queue)寇损,是分布式系統(tǒng)中重要的組件凸郑,其通用的使用場(chǎng)景可以簡(jiǎn)單地描述為:
當(dāng)不需要立即獲得結(jié)果,但是并發(fā)量又需要進(jìn)行控制的時(shí)候矛市,差不多就是需要使用消息隊(duì)列的時(shí)候芙沥。
消息隊(duì)列主要解決了應(yīng)用耦合、異步處理尘盼、流量削鋒等問(wèn)題憨愉。
當(dāng)前使用較多的消息隊(duì)列有RabbitMQ、RocketMQ卿捎、ActiveMQ配紫、Kafka、ZeroMQ午阵、MetaMq等躺孝,而部分?jǐn)?shù)據(jù)庫(kù)如Redis、Mysql以及phxsql也可實(shí)現(xiàn)消息隊(duì)列的功能底桂。
二 植袍、ActiveMQ
有那么多得消息隊(duì)列,各有各的有優(yōu)缺點(diǎn)籽懦,今天主要來(lái)聊聊ActiveMQ
ActiveMQ是由Apache出品于个,ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。它非衬核常快速厅篓,支持多種語(yǔ)言的客戶端和協(xié)議秀存,而且可以非常容易的嵌入到企業(yè)的應(yīng)用環(huán)境中,并有許多高級(jí)功能羽氮。
主要特性:
服從 JMS 規(guī)范:JMS 規(guī)范提供了良好的標(biāo)準(zhǔn)和保證或链,包括:同步或異步的消息分發(fā),一次和僅一次的消息分發(fā)档押,消息接收和訂閱等等澳盐。遵從 JMS 規(guī)范的好處在于,不論使用什么 JMS 實(shí)現(xiàn)提供者令宿,這些基礎(chǔ)特性都是可用的叼耙;
連接性:ActiveMQ 提供了廣泛的連接選項(xiàng),支持的協(xié)議有:HTTP/S粒没,IP 多播旬蟋,SSL,STOMP革娄,TCP,UDP冕碟,XMPP等等拦惋。對(duì)眾多協(xié)議的支持讓 ActiveMQ 擁有了很好的靈活性。
支持的協(xié)議種類(lèi)多:OpenWire安寺、STOMP厕妖、REST、XMPP挑庶、AMQP 言秸;
持久化插件和安全插件:ActiveMQ 提供了多種持久化選擇。而且迎捺,ActiveMQ 的安全性也可以完全依據(jù)用戶需求進(jìn)行自定義鑒權(quán)和授權(quán)举畸;
支持的客戶端語(yǔ)言種類(lèi)多:除了 Java 之外,還有:C/C++凳枝,.NET抄沮,Perl,PHP岖瑰,Python叛买,Ruby;
代理集群:多個(gè) ActiveMQ 代理可以組成一個(gè)集群來(lái)提供服務(wù)蹋订;
異常簡(jiǎn)單的管理:ActiveMQ 是以開(kāi)發(fā)者思維被設(shè)計(jì)的率挣。所以,它并不需要專(zhuān)門(mén)的管理員露戒,因?yàn)樗峁┝撕?jiǎn)單又使用的管理特性椒功。有很多中方法可以監(jiān)控 ActiveMQ 不同層面的數(shù)據(jù)捶箱,包括使用在 JConsole 或者 ActiveMQ 的Web Console 中使用 JMX,通過(guò)處理 JMX 的告警消息蛾茉,通過(guò)使用命令行腳本讼呢,甚至可以通過(guò)監(jiān)控各種類(lèi)型的日志。
使用ActiveMQ需要:
Java JDK
ActiveMQ安裝包
ActiveMQ可以運(yùn)行在Java語(yǔ)言所支持的平臺(tái)之上谦炬。
優(yōu)點(diǎn):
跨平臺(tái)(JAVA編寫(xiě)與平臺(tái)無(wú)關(guān)有悦屏,ActiveMQ幾乎可以運(yùn)行在任何的JVM上)
可以用JDBC:可以將數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)。雖然使用JDBC會(huì)降低ActiveMQ的性能键思,但是數(shù)據(jù)庫(kù)一直都是開(kāi)發(fā)人員最熟悉的存儲(chǔ)介質(zhì)础爬。將消息存到數(shù)據(jù)庫(kù),看得見(jiàn)摸得著吼鳞。而且公司有專(zhuān)門(mén)的DBA去對(duì)數(shù)據(jù)庫(kù)進(jìn)行調(diào)優(yōu)看蚜,主從分離;
支持JMS :支持JMS的統(tǒng)一接口;
支持自動(dòng)重連赔桌;
有安全機(jī)制:支持基于shiro供炎,jaas等多種安全配置機(jī)制,可以對(duì)Queue/Topic進(jìn)行認(rèn)證和授權(quán)疾党。
監(jiān)控完善:擁有完善的監(jiān)控音诫,包括Web Console,JMX雪位,Shell命令行竭钝,Jolokia的REST API;
界面友善:提供的Web Console可以滿足大部分情況雹洗,還有很多第三方的組件可以使用香罐,如hawtio;
缺點(diǎn):
社區(qū)活躍度不及RabbitMQ高时肿;
根據(jù)其他用戶反饋庇茫,會(huì)出莫名其妙的問(wèn)題,會(huì)丟失消息螃成;
目前重心放到activemq6.0產(chǎn)品-apollo港令,對(duì)5.x的維護(hù)較少;
不適合用于上千個(gè)隊(duì)列的應(yīng)用場(chǎng)景锈颗;
三顷霹、具體實(shí)例
首先我們?nèi)ス倬W(wǎng)下載MQ,官網(wǎng)地址:http://activemq.apache.org/,我目前下載的是5.8.0版本的,下載好后直接雙擊activemq.bat啟動(dòng)
啟動(dòng)成功后击吱,輸入默認(rèn)的訪問(wèn)地址:http://localhost:8161/admin/index.jsp
用戶名:admin? 密碼:admin
用戶名密碼設(shè)置具體配置文件在conf下jetty-realm.properties里面淋淀,jetty.xml文件修改端口號(hào)。
?然后點(diǎn)擊隊(duì)列(Queues)覆醇,輸入隊(duì)列名稱(chēng)(Queue Name)FirstQueue朵纷,然后點(diǎn)創(chuàng)建(Create)
下一步我們創(chuàng)建一個(gè)maven 項(xiàng)目炭臭,在pom.xml引入jar
<dependency>?
??<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
? <version>5.8.0</version>
</dependency>
新建文件 Procuder
package Mq.ActivityMq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
* @author 生產(chǎn)者
*
*/
public class Procuder {
? ? //默認(rèn)連接用戶名
? ? private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
? ? //默認(rèn)連接密碼
? ? private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
? ? //默認(rèn)連接地址
? ? private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
? ? public static void main(String[] args) {
? ? ? ? //連接工廠
? ? ? ? ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
? ? ? ? try {
? ? ? ? ? ? //連接
? ? ? ? ? ? Connection connection = connectionFactory.createConnection();
? ? ? ? ? ? //啟動(dòng)連接
? ? ? ? ? ? connection.start();
? ? ? ? ? ? //創(chuàng)建session
? ? ? ? ? ? Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
? ? ? ? ? ? //消息目的地
? ? ? ? ? ? Destination destination = session.createQueue("FirstQueue");
? ? ? ? ? ? //消息生產(chǎn)者
? ? ? ? ? ? MessageProducer producer = session.createProducer(destination);
? ? ? ? ? ? //設(shè)置不持久化,此處學(xué)習(xí)袍辞,實(shí)際根據(jù)項(xiàng)目決定
? ? ? ? ? ? producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
? ? ? ? ? ? //發(fā)送消息
? ? ? ? ? ? for (int i = 0; i < 5; i++) {
? ? ? ? ? ? ? ? //創(chuàng)建一條文本消息
? ? ? ? ? ? ? ? TextMessage message = session.createTextMessage("你好ActiveMQ: 這是第 " + i + " 條消息");
? ? ? ? ? ? ? ? //生產(chǎn)者發(fā)送消息
? ? ? ? ? ? ? ? producer.send(message);
? ? ? ? ? ? }
? ? ? ? ? ? session.commit();
? ? ? ? ? ? session.close();
? ? ? ? ? ? connection.close();
? ? ? ? } catch (JMSException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
}
新建文件Consumer
package Mq.ActivityMq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
* @author 消費(fèi)者
*
*/
public class Consumer {
? ? //默認(rèn)連接用戶名
? ? private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
? ? //默認(rèn)連接密碼
? ? private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
? ? //默認(rèn)連接地址
? ? private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
? ? public static void main(String[] args) {
? ? ? ? //連接工廠
? ? ? ? ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
? ? ? ? try {
? ? ? ? ? ? //連接
? ? ? ? ? ? Connection connection = connectionFactory.createConnection();
? ? ? ? ? ? //啟動(dòng)連接
? ? ? ? ? ? connection.start();
? ? ? ? ? ? //創(chuàng)建session
? ? ? ? ? ? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
? ? ? ? ? ? //消息目的地
? ? ? ? ? ? Destination destination = session.createQueue("FirstQueue");
? ? ? ? ? ? //消息消費(fèi)者
? ? ? ? ? ? MessageConsumer consumer = session.createConsumer(destination);
? ? ? ? ? ? while (true) {
? ? ? ? ? ? ? ? TextMessage message = (TextMessage) consumer.receive();
? ? ? ? ? ? ? ? if (message != null) {
? ? ? ? ? ? ? ? ? ? System.out.println("接收到消息: " + message.getText());
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? session.close();
? ? ? ? ? ? connection.close();
? ? ? ? } catch (JMSException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
}
測(cè)試結(jié)果
先運(yùn)行Producer(生產(chǎn)者)鞋仍,再運(yùn)行Consumer(消費(fèi)者)
這是我們查看MQ視圖
我們看到有0條等待消費(fèi)的消息,有1條消費(fèi)者搅吁,生產(chǎn)者發(fā)布了5條消息威创,消費(fèi)者消費(fèi)了5條消息。名詞解釋如下:
Messages Enqueued:表示生產(chǎn)了多少條消息谎懦,記做P
Messages Dequeued:表示消費(fèi)了多少條消息肚豺,記做C
Number Of Consumers:表示在該隊(duì)列上還有多少消費(fèi)者在等待接受消息
Number Of Pending Messages:表示還有多少條消息沒(méi)有被消費(fèi),實(shí)際上是表示消息的積壓程度界拦,就是P-C
四吸申、總結(jié)
如上所示只是ActiveMQ簡(jiǎn)單實(shí)例,實(shí)際的業(yè)務(wù)場(chǎng)景需要配置更多的參數(shù)享甸,比如消費(fèi)者消費(fèi)完畢后要通知生成者對(duì)消息進(jìn)行銷(xiāo)毀.....等等截碴,掌握了基本的原理后,其他都是結(jié)合具體業(yè)務(wù)場(chǎng)景水到渠成的事情蛉威。