ActiveMQ簡(jiǎn)介
ActiveMQ 是Apache出品蜗元,最流行的掌敬,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)芯杀,盡管JMS規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。
ActiveMQ特性
- 多語言和協(xié)議編寫客戶端揭厚。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP却特。應(yīng)用協(xié)議:
OpenWire,Stomp REST,WS Notification,XMPP,AMQP。 - 完全支持JMS1.1和J2EE 1.4規(guī)范(持久化筛圆、XA消息裂明,事物)。
- 對(duì)Spring的支持太援,ActiveMQ可以很容易嵌套到使用Spring的系統(tǒng)里面去闽晦,而且也支持Spring2.0的特征。
- 通過了常見的J2EE服務(wù)器(如Geronimo,JBoss 4,GlassFish,WebLogic)的測(cè)試,其中通過JCA 1.5提岔。
resource adaptors的配置仙蛉,可以讓ActiveMQ可以自動(dòng)的部署到任何兼容J2EE 1.4 商業(yè)服務(wù)器上。 - 支持多種傳送協(xié)議: in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA碱蒙。
- 支持通過JDBC和journal提供高速的消息持久化捅儒。
- 從設(shè)計(jì)上保證了高性能的集群,客戶端-服務(wù)器振亮,點(diǎn)對(duì)點(diǎn)巧还。
- 支持Ajax。
- 支持和Axis的整合坊秸。
- 可以很容易的調(diào)用內(nèi)嵌JMS provider麸祷,進(jìn)行測(cè)試。
什么情況下使用ActiveMQ?
- 多個(gè)項(xiàng)目之間集成
跨平臺(tái)
多語言
多項(xiàng)目 - 降低系統(tǒng)間模塊的耦合度褒搔,解耦
軟件擴(kuò)展性 - 系統(tǒng)前后端隔離
前后端隔離阶牍,屏蔽高安全區(qū)
ActiveMQ安裝
在官方下載ActiveMQ: http://activemq.apache.org/download.html
這次選擇的是Unix版本,解壓安裝包星瘾,啟動(dòng)ActiveMQ:
root@ubuntu:~/apache-activemq-5.15.3/bin# activemq start
訪問ActiveMQ監(jiān)控界面:
SpringBoot整合ActiveMQ
- pom文件添加依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- 在application.yml中加入activemq的配置:
spring:
activemq:
broker-url: tcp://192.168.0.197:61616
user: admin
password: admin
pool:
enabled: false
- 創(chuàng)建一個(gè)消息生產(chǎn)者:
@Service
public class JMSProducer {
@Autowired
private JmsTemplate jmsTemplate;
// 發(fā)送消息走孽,destination是發(fā)送到的隊(duì)列,message是待發(fā)送的消息
public void sendMessage(Destination destination, final String message){
jmsTemplate.convertAndSend(destination, message);
}
}
- 創(chuàng)建一個(gè)消息消費(fèi)者:
@Component
public class JMSConsumer {
private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class);
@JmsListener(destination = "springboot.queue.test")
public void receiveQueue(String msg) {
logger.info("接收到消息:{}",msg);
}
}
- 測(cè)試:
@Autowired
private JMSProducer jmsProducer;
@Test
public void testJms() {
Destination destination = new ActiveMQQueue("springboot.queue.test");
for (int i=0;i<10;i++) {
jmsProducer.sendMessage(destination,"hello,world!" + i);
}
}
注:后面多加了兩個(gè)消費(fèi)者琳状。
可以看到磕瓷,在ActiveMQ監(jiān)控界面上,已經(jīng)存在前面定義的隊(duì)列“springboot.queue.test”念逞。
支持同時(shí)發(fā)送和接收queue/topic
- 新建一個(gè)JMS的配置類:
@Configuration
public class JmsConfig {
public final static String TOPIC = "springboot.topic.test";
public final static String QUEUE = "springboot.queue.test";
@Bean
public Queue queue() {
return new ActiveMQQueue(QUEUE);
}
@Bean
public Topic topic() {
return new ActiveMQTopic(TOPIC);
}
// topic模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
// queue模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}
- 新增消息消費(fèi)者JMSConsumer3 困食,指定ConnectionFactory:
@Component
public class JMSConsumer3 {
private final static Logger logger = LoggerFactory.getLogger(JMSConsumer3.class);
@JmsListener(destination = JmsConfig.TOPIC,containerFactory = "jmsListenerContainerTopic")
public void onTopicMessage(String msg) {
logger.info("接收到topic消息:{}",msg);
}
@JmsListener(destination = JmsConfig.QUEUE,containerFactory = "jmsListenerContainerQueue")
public void onQueueMessage(String msg) {
logger.info("接收到queue消息:{}",msg);
}
}
- 測(cè)試:
@Autowired
private Topic topic;
@Autowired
private Queue queue;
@Test
public void testJms2() {
for (int i=0;i<10;i++) {
jmsProducer.sendMessage(queue,"queue,world!" + i);
jmsProducer.sendMessage(topic, "topic,world!" + i);
}
}