常用消息中間件有activeMQ,RabbitMQ,Kafka穴翩,ZeroMQ氨菇,MetaMQ,RocketMQ响牛,本文這里只介紹第一個(gè)玷禽。
一、linux上安裝
下載tar.gz包 解壓tar -zxvf
在MQ目錄下bin/linux-x86-64下 ./activemq start ./activemq stop
http://IP:8161/admin
默認(rèn)用戶名密碼 admin/admin
三呀打、發(fā)布和消費(fèi)消息(這里以P2P為例)
1矢赁,寫一個(gè)發(fā)消息服務(wù)如下:
@Service("notifyService")
public class NotifyServiceImpl implements INotifyService{
private static final Logger logger = LoggerFactory.getLogger(NotifyServiceImpl.class);
@Resource(name="jmsTemplate")
private JmsTemplate jmsTemplate;
@Resource(name = "awardMsgDestinationQueue")
private Destination awardMsgDestinationQueue;
@Override
public void sendAwardMsg(final String msg) {
try {
Destination destination = this.awardMsgDestinationQueue;
jmsTemplate.send(destination,new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage();
textMessage.setText(msg);
return textMessage;
}
});
} catch (Exception ex) {
ex.printStackTrace();
logger.error("向默認(rèn)隊(duì)列發(fā)送消息失敗");
}
}
}
2,寫一個(gè)消費(fèi)監(jiān)聽器
public class AwardMsgQueueListener implements MessageListener{
@Override
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
msg = tm.getText();
} catch (JMSException e1) {
logger.error("從消息隊(duì)列獲取消息出現(xiàn)異常贬丛,請檢查");
e1.printStackTrace();
return;
}
......//根據(jù)實(shí)際業(yè)務(wù)拿到msg并處理
}
}
由上可以看出撩银,我們發(fā)布消息注入了jms模板和queue,這就需要我們先進(jìn)行配置
二豺憔、整合spring
基礎(chǔ)配置文件如下:
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"><value>tcp://192.168.28.2:61616?</value></property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!--使用緩存可以提升效率-->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="100"/>
</bean>
<!-- 配置JMS模板(Queue)额获,Spring提供的JMS工具類够庙,它發(fā)送、接收消息抄邀。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<!-- 定義推送中獎(jiǎng)消息隊(duì)列(Queue) -->
<bean id="awardMsgDestinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="awardMsgDestinationQueue"/>
</bean>
<!-- 配置監(jiān)聽者(Queue) -->
<bean id="awardMsgQueueListener" class="com.latech.notify.consumer.AwardMsgQueueListener" />
<!-- 配置多個(gè)消息監(jiān)聽容器耘眨,配置連接工廠,監(jiān)聽的目標(biāo)是defaultDestinationQueue境肾,監(jiān)聽器是上面定義的監(jiān)聽器 -->
<bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="awardMsgDestinationQueue" />
<property name="messageListener" ref="awardMsgQueueListener" />
</bean>