Queue模式
生產(chǎn)者
public class MyProducer {
private static final String url = "tcp://127.0.0.1:61616";
private static final String queueName = "queue-demo";
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
//創(chuàng)建會話
// 1.是否在事務(wù)中處理 2.連接的應(yīng)答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//自動應(yīng)答
//創(chuàng)建目的地
Destination destination = session.createQueue(queueName);
//創(chuàng)建生產(chǎn)者
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < 10; i++) {
TextMessage textMessage = session.createTextMessage("發(fā)送消息--" + i);
producer.send(textMessage);
System.out.println("發(fā)送消息--" + i);
}
//關(guān)閉連接
connection.close();
}
}
消費者
public class MyCustomer {
private static final String url = "tcp://127.0.0.1:61616";
private static final String queueName = "queue-demo";
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start();
//創(chuàng)建會話
// 1.是否在事務(wù)中處理 2.連接的應(yīng)答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//自動應(yīng)答
//創(chuàng)建目的地
Destination destination = session.createQueue(queueName);
//創(chuàng)建消費者
MessageConsumer consumer = session.createConsumer(destination);
//創(chuàng)建監(jiān)聽器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(("接收:" + textMessage.getText()));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
topic模式
topic模式只需要在前面代碼上修改一行
Destination destination = session.createQueue(queueName);
改為
Destination destination = session.createTopic(topicName);
spring集成
common.xml
<!--開啟注解-->
<context:annotation-config/>
<!--activemq 提供的ConnectionFactory-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
<!--spring jsm 提供的ConnectionFactory-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!--隊列目的地-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!--隊列名字-->
<constructor-arg value="queue"/>
</bean>
<!--主題目的地 發(fā)布訂閱模式-->
<bean id="targetDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic"/>
</bean>
consumer.xml
<import resource="common.xml"/>
<!--消息監(jiān)聽器-->
<bean id="consumerMessageListener" class="com.reige.jmsdemo.springtest.consumer.ConsumerMessageListener"/>
<!--配置消息監(jiān)聽容器-->
<bean id="jsmContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<!--監(jiān)聽的目的地-->
<property name="destination" ref="queueDestination"/>
<!--監(jiān)聽器-->
<property name="messageListener" ref="consumerMessageListener"/>
</bean>
producer.xml
<import resource="common.xml"/>
<!--jmsTemplate 用于發(fā)送消息-->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<bean id="producerServiceImpl" class="com.reige.jmsdemo.springtest.producer.ProducerServiceImpl"/>
ProducerService
public interface ProducerService {
void sendMessage(String message);
}
public class ProducerServiceImpl implements ProducerService {
@Autowired
private JmsTemplate jmsTemplate;
@Resource(name = "queueDestination")
private Destination destination;
@Override
public void sendMessage(final String message) {
jmsTemplate.send(destination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(message);
return textMessage;
}
});
System.out.println(("發(fā)送消息:" + message));
}
}
ProducerService發(fā)送消息測試
public class MyProducer {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
ProducerService service = context.getBean(ProducerService.class);
for (int i = 0; i < 10; i++) {
service.sendMessage("這是第" + i + "條消息");
}
context.close();
}
}
ConsumerMessageListener .java
public class ConsumerMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(("接收消息" + textMessage.getText()));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接收消息測試
public class MyConsumer {
public static void main(String[] args){
ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
}
}