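Environment setup
1. Download ActiveMQ from the official ActiveMQ website for your operating system. This article uses Windows, so the zip archive is all you need.
2. Unzip. Extract the downloaded archive; the resulting directory structure is shown in Figure 1.
The directories in Figure 1 serve the following purposes.
bin: scripts for running ActiveMQ.
conf: ActiveMQ configuration files.
data: ActiveMQ runtime data, such as logs.
docs: ActiveMQ getting-started documentation.
examples: demos that use ActiveMQ to pass messages over each supported messaging protocol.
lib: the jar files ActiveMQ depends on.
The remaining directories and files are self-explanatory and are not covered here.
3. Start the ActiveMQ broker by running the activemq script under bin.
4. Open the ActiveMQ Web UI. In this article the address is http://localhost:8161, and the default username and password are both admin. After logging in, the UI looks like Figure 2.
Writing the code
ActiveMQ clients are available for many languages; this article uses Java. There is plenty of code online to borrow from, but much of it is boilerplate code, which is what I dislike most.
1. Create a new Maven project and add the following dependencies to the pom.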
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.15.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
The final project structure for this article is shown in Figure 3.
2. Write the template code, moving the JMS boilerplate into JMSTemplate.
package com.activemq.template;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;

public abstract class JMSTemplate {

    private final ConnectionFactory connectionFactory;

    public JMSTemplate(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public ConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /**
     * Implements the JMS task itself, such as producing a message to the
     * broker or consuming a message from the broker.
     *
     * @param session
     *            javax.jms.Session object
     */
    public abstract void doTask(Session session) throws JMSException;

    /**
     * Indicates whether the client is a message consumer.
     *
     * @return true if the client is a message consumer, false otherwise.
     */
    public abstract boolean isConsumer();

    /**
     * Starts the task based on JMS.
     */
    public final void startTask() throws JMSException {
        Connection connection = null;
        Session session = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session; messages are acknowledged automatically.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            doTask(session);
        } finally {
            // A consumer keeps its connection open so its listener keeps receiving.
            if (!isConsumer()) {
                releaseConnection(connection, session);
            }
        }
    }

    public void releaseConnection(Connection connection, Session session) throws JMSException {
        try {
            if (null != session) {
                session.close();
            }
        } finally {
            // Close the connection even if closing the session failed.
            if (null != connection) {
                connection.close();
            }
        }
    }
}
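The control flow of startTask can be looked at in isolation with a JMS-free sketch. Everything named below (FakeResource, TaskTemplate, OneShotTask, ListeningTask) is a hypothetical stand-in invented for illustration, not part of the project: FakeResource plays the role of the JMS Connection and Session and only records what happened, so the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a JMS Connection/Session: it only records events.
class FakeResource {
    final List<String> events = new ArrayList<>();

    void open() { events.add("open"); }

    void close() { events.add("close"); }
}

// Hypothetical, JMS-free mirror of JMSTemplate's template-method structure.
abstract class TaskTemplate {
    // The step that varies between subclasses.
    abstract void doTask(FakeResource resource);

    // Long-lived consumers return true so the resource stays open.
    abstract boolean isConsumer();

    // The fixed skeleton: open, run the task, release unless this is a consumer.
    final FakeResource startTask() {
        FakeResource resource = new FakeResource();
        try {
            resource.open();
            doTask(resource);
        } finally {
            if (!isConsumer()) {
                resource.close();
            }
        }
        return resource;
    }
}

// Mirrors the producer: do one thing, then release.
class OneShotTask extends TaskTemplate {
    @Override void doTask(FakeResource r) { r.events.add("send"); }

    @Override boolean isConsumer() { return false; }
}

// Mirrors the consumer: register interest and keep the resource open.
class ListeningTask extends TaskTemplate {
    @Override void doTask(FakeResource r) { r.events.add("listen"); }

    @Override boolean isConsumer() { return true; }
}
```

Running new OneShotTask().startTask() records open, send, close, while new ListeningTask().startTask() records only open and listen. That is the trade-off of the isConsumer() switch: subclasses write only the varying step, but a consumer's resources stay open until something else closes them.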
3. Next, move the code that obtains the ActiveMQConnectionFactory into ActiveMQUtil.
package com.activemq.template;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ActiveMQUtil {

    private static final Logger logger = LoggerFactory.getLogger(ActiveMQUtil.class);

    public static ActiveMQConnectionFactory getActiveMQConnectionFactory() throws IOException {
        // load the ActiveMQ configuration
        Properties props = new Properties();
        try (InputStream inStream = ActiveMQUtil.class.getClassLoader()
                .getResourceAsStream("activemqcofig.properties")) {
            if (null == inStream) {
                // getResourceAsStream returns null when the file is missing
                logger.warn("activemqcofig.properties not found on the classpath, using ActiveMQ defaults");
            } else {
                logger.debug("load the ActiveMQ config file:activemqcofig.properties");
                props.load(inStream);
            }
        }
        // fall back to the ActiveMQ defaults for any missing or empty value
        String brokerURL = props.getProperty("activemq.url");
        if (null == brokerURL || 0 == brokerURL.length()) {
            brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
        }
        String username = props.getProperty("activemq.username");
        if (null == username || 0 == username.length()) {
            username = ActiveMQConnectionFactory.DEFAULT_USER;
        }
        String password = props.getProperty("activemq.password");
        if (null == password || 0 == password.length()) {
            password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
        }
        return new ActiveMQConnectionFactory(username, password, brokerURL);
    }
}
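ActiveMQUtil loads activemqcofig.properties from src/main/resources to build the ActiveMQ connection factory. The file currently has just three lines, and each key means what its name says. A value left empty falls back to the ActiveMQ default.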
activemq.url=
activemq.username=
activemq.password=
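The "empty means default" rule can be tried out without a broker or a file on disk. The following is a minimal sketch under stated assumptions: ConfigFallbackDemo, valueOrDefault and parse are hypothetical names, the fallback strings (tcp://localhost:61616, guest, secret) are placeholder values rather than ActiveMQ's real defaults, and unlike ActiveMQUtil the helper also trims whitespace.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo class, not part of the project.
class ConfigFallbackDemo {

    // Returns the configured value, or the fallback when the key is missing or blank.
    static String valueOrDefault(Properties props, String key, String fallback) {
        String value = props.getProperty(key);
        return (value == null || value.trim().isEmpty()) ? fallback : value.trim();
    }

    // Parses properties from a string so the demo needs no file on disk.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // url is present but empty, username is set, password is missing entirely.
        Properties props = parse("activemq.url=\nactivemq.username=admin\n");
        System.out.println(valueOrDefault(props, "activemq.url", "tcp://localhost:61616"));
        System.out.println(valueOrDefault(props, "activemq.username", "guest"));
        System.out.println(valueOrDefault(props, "activemq.password", "secret"));
    }
}
```

With this input, the empty url and the missing password both take their fallback, and only username keeps its configured value.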
4. Implement the demo.
(1) Message producer
package com.activemq.demos;

import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import com.activemq.template.JMSTemplate;

public class ActiveMQProducerTest extends JMSTemplate {

    public ActiveMQProducerTest(ConnectionFactory connectionFactory) {
        super(connectionFactory);
    }

    @Override
    public void doTask(Session session) throws JMSException {
        Destination destination = session.createQueue(ActiveMQNameUtil.TEST_QUEUE);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = session.createTextMessage("Hello,JMS and ActiveMQ");
        producer.send(message);
    }

    @Override
    public boolean isConsumer() {
        return false;
    }
}
(2) Message consumer
package com.activemq.demos;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.activemq.template.JMSTemplate;

public class ActiveMQConsumerTest extends JMSTemplate implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(ActiveMQConsumerTest.class);

    public ActiveMQConsumerTest(ConnectionFactory connectionFactory) {
        super(connectionFactory);
    }

    @Override
    public void doTask(Session session) throws JMSException {
        Destination destination = session.createQueue(ActiveMQNameUtil.TEST_QUEUE);
        MessageConsumer consumer = session.createConsumer(destination);
        // Messages are delivered asynchronously to onMessage below.
        consumer.setMessageListener(this);
    }

    @Override
    public void onMessage(Message message) {
        // Guard the cast: the queue may carry non-text messages.
        if (!(message instanceof TextMessage)) {
            logger.warn("ignored non-text message: {}", message);
            return;
        }
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("received msg:" + textMessage.getText());
        } catch (JMSException e) {
            logger.error(e.getMessage(), e);
        }
    }

    @Override
    public boolean isConsumer() {
        return true;
    }
}
The queue and topic names used by the producer and consumer are kept here.
package com.activemq.demos;

public class ActiveMQNameUtil {
    public static final String TEST_QUEUE = "testQueue";
    public static final String TEST_TOPIC = "testTopic";
}
(3) Run the demo
package com.activemq.demos;

import java.io.IOException;

import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.activemq.template.ActiveMQUtil;
import com.activemq.template.JMSTemplate;

public class ActiveMQTest {

    private static final Logger logger = LoggerFactory.getLogger(ActiveMQTest.class);

    public static void main(String[] args) {
        try {
            ActiveMQConnectionFactory activeMQConnectionFactory = ActiveMQUtil.getActiveMQConnectionFactory();
            // Start the consumer first so it is already listening when the message is sent.
            JMSTemplate consumer = new ActiveMQConsumerTest(activeMQConnectionFactory);
            consumer.startTask();
            JMSTemplate producer = new ActiveMQProducerTest(activeMQConnectionFactory);
            producer.startTask();
        } catch (IOException | JMSException e) {
            logger.error(e.getMessage(), e);
        }
    }
}
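If all goes well, messages should now be sent and received.
Note that the ActiveMQ consumer in this article keeps listening on the queue and never releases its connection on its own (it is written with production use in mind), so when you need to release the consumer's connection, kill the process manually.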
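As an alternative to relying on the kill alone, a JVM shutdown hook can close the consumer's resources when the process exits normally or is stopped with Ctrl+C or a plain kill (a forced kill such as kill -9 skips hooks). The sketch below is an illustration under assumptions, not part of the original project: ResourceCloser is a hypothetical helper, and it works on AutoCloseable so it can run without a broker. A JMS session or connection could be registered by wrapping its close call, for example closer.register(() -> connection.close()).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper: closes registered resources in reverse order of registration.
class ResourceCloser {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();
    private int failures = 0;

    // Register in creation order, e.g. the connection first, then the session.
    synchronized void register(AutoCloseable resource) {
        resources.push(resource);
    }

    // Closes everything, last registered first; one failure does not stop the rest.
    // Returns the number of close calls that threw.
    synchronized int closeAll() {
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                failures++;
            }
        }
        return failures;
    }

    // Runs closeAll() when the JVM shuts down.
    void installShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::closeAll, "resource-closer"));
    }
}
```

Closing in reverse order means the session is closed before the connection that created it, matching the order used by releaseConnection in JMSTemplate.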
Finally, if you would rather not copy the code by hand, message me on WeChat!