配置
<?xml version="1.0" encoding="UTF-8"?>
<!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!-- 配置掃描路徑 -->
<context:component-scan base-package="cn.enjoyedu">
<context:exclude-filter type="annotation"
expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
<!-- ActiveMQ 連接工廠 -->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616" userName="" password=""/>
<!-- Spring Caching連接工廠 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<property name="sessionCacheSize" value="100"></property>
</bean>
<!-- Spring JmsTemplate 的消息生產(chǎn)者 start-->
<!-- 定義JmsTemplate的Queue類型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
<!-- 隊列模式-->
<property name="pubSubDomain" value="false"></property>
</bean>
<!-- 定義JmsTemplate的Topic類型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"></constructor-arg>
<!-- 發(fā)布訂閱模式-->
<property name="pubSubDomain" value="true"></property>
</bean>
<!--Spring JmsTemplate 的消息生產(chǎn)者 end-->
<!--接收消費者應(yīng)答的監(jiān)聽器-->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="tempqueue" ref="getResponse"></jms:listener>
</jms:listener-container>
<!-- 消息消費者 start-->
<!-- 定義Topic監(jiān)聽器 -->
<jms:listener-container destination-type="topic" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener>
<jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener>
</jms:listener-container>
<!-- 定義Queue監(jiān)聽器 -->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener>
<jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener>
</jms:listener-container>
<!-- 消息消費者 end -->
</beans>
生產(chǎn)者 topic
@Component("topicSender")
public class TopicSender {
@Autowired
@Qualifier("jmsTopicTemplate")
private JmsTemplate jmsTemplate;
public void send(String queueName, final String message) {
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(message);
return textMessage;
}
});
}
}
生產(chǎn)者 queue
@Component("queueSender")
public class QueueSender {
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;
@Autowired
private GetResponse getResponse;
//json
public void send(String queueName, final String message) {
jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage(message);
//配置,告訴消費者如何應(yīng)答
Destination tempDst = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDst);
responseConsumer.setMessageListener(getResponse);
msg.setJMSReplyTo(tempDst);
String uid = System.currentTimeMillis()+"";
msg.setJMSCorrelationID(uid);
return msg;
}
});
//發(fā)送MapMessage
/* jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage map = session.createMapMessage();
map.setString("id", "10000");
map.setString("name", "享學(xué)學(xué)員");
return map;
}
});*/
//發(fā)送ObjectMessage萧吠,被發(fā)送的實體類必須實現(xiàn)Serializable 接口
/* jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
User user = new User(10000,"享學(xué)學(xué)員");
ObjectMessage objectMessage
= session.createObjectMessage(user);
return objectMessage;
}
});*/
//發(fā)送BytesMessage
//protobuf,kyro,messgepack
/* jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("BytesMessage類型消息".getBytes());
return bytesMessage;
}
});*/
//發(fā)送StreamMessage
/* jmsTemplate.send(queueName, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("享學(xué)學(xué)員");
streamMessage.writeInt(10000);
//streamMessage.writeString(age);
return streamMessage;
}
});*/
}
}
接受應(yīng)答
@Component
public class GetResponse implements MessageListener {
public void onMessage(Message message) {
String textMsg = null;
try {
textMsg = ((TextMessage) message).getText();
System.out.println("GetResponse accept msg : " + textMsg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消費者Queue應(yīng)答
@Component
public class QueueReceiver1 implements MessageListener {
@Autowired
private ReplyTo replyTo;
public void onMessage(Message message) {
try {
String textMsg = ((TextMessage) message).getText();
System.out.println("QueueReceiver1 accept msg : " + textMsg);
// do business work;
replyTo.send(textMsg,message);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@Component
public class QueueReceiver2 implements MessageListener {
public void onMessage(Message message) {
try {
// 接收Text消息
if (message instanceof TextMessage) {
String textMsg = ((TextMessage) message).getText();
System.out.println("QueueReceiver2 accept msg : " + textMsg);
}
// 接收Map消息
if (message instanceof MapMessage) {
MapMessage mm = (MapMessage) message;
System.out.println("獲取 MapMessage: name:" + mm.getString("name")
+ " msg:" + mm.getString("msg"));
}
/* // 接收Object消息
if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage) message;
User user = (User) objectMessage.getObject();
System.out.println("獲取 ObjectMessage: "+user);
}*/
// 接收bytes消息
/* if (message instanceof BytesMessage) {
byte[] b = new byte[1024];
int len = -1;
BytesMessage bm = (BytesMessage) message;
while ((len = bm.readBytes(b)) != -1) {
System.out.println(new String(b, 0, len));
}
}*/
/* // 接收Stream消息
if (message instanceof StreamMessage) {
StreamMessage streamMessage = (StreamMessage) message;
System.out.println(streamMessage.readString());
System.out.println(streamMessage.readInt());
}*/
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@Component
public class ReplyTo {
@Autowired
@Qualifier("jmsConsumerQueueTemplate")
private JmsTemplate jmsTemplate;
public void send(final String consumerMsg, Message producerMessage)
throws JMSException {
jmsTemplate.send(producerMessage.getJMSReplyTo(),
new MessageCreator() {
public Message createMessage(Session session)
throws JMSException {
Message msg
= session.createTextMessage("ReplyTo " + consumerMsg);
return msg;
}
});
}
}
消費者topic
@Component
public class TopicReceiver1 implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println(((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}