1.Spring和ActiveMQ整合
第一步:在taotao-manager-services中引入jar包
<!-- Spring JMS module: provides the org.springframework.jms classes used in the bean configuration. -->
<!-- No <version> element is given; presumably it is inherited from a parent POM (TODO confirm). -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
</dependency>
<!-- Spring context-support module. -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
</dependency>
第二步:配置ActiveMQ整合spring
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
                           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- The real ConnectionFactory that creates connections; supplied by the JMS vendor (ActiveMQ). -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.208.40:61616"/>
    </bean>

    <!-- Spring's ConnectionFactory that manages the real one; targetConnectionFactory points at the vendor factory above. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- Producer side: JmsTemplate is Spring's JMS helper for sending and receiving messages. -->
    <!-- It is wired to the Spring-managed connectionFactory, not to the vendor factory directly. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <!-- Queue destination (point-to-point). -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>

    <!-- Topic destination (one-to-many). -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-change-topic"/>
    </bean>

</beans>
2.代码测试
/**
 * Loads the ActiveMQ Spring configuration and sends one text message
 * ("spring activemq test") to the bean named "queueDestination" via JmsTemplate.
 */
@Test
public void testQueueProducer() {
    // 1. Load the configuration. try-with-resources closes the context afterwards so the
    //    beans it created (including the JMS connection factory) are destroyed instead of leaked.
    try (ClassPathXmlApplicationContext applicationContext =
                 new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml")) {
        // 2. Obtain the JmsTemplate from the container.
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        // 3. Obtain the destination from the container.
        Queue queue = (Queue) applicationContext.getBean("queueDestination");
        // 4. Send the message; JmsTemplate needs to know the destination.
        jmsTemplate.send(queue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("spring activemq test");
            }
        });
    }
}
接收的时候我们在taotao-search-service中接收
第一步:把ActiveMQ相关的jar包添加到工程中(同上)
第二步:创建一个MessageListener的实现类。
**
* 接收ActiveMq消息
*/
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
//取消息內(nèi)容
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
第三步:配置spring和Activemq整合
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
                           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- The real ConnectionFactory that creates connections; supplied by the JMS vendor (ActiveMQ). -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.208.40:61616"/>
    </bean>

    <!-- Spring's ConnectionFactory that manages the real one; targetConnectionFactory points at the vendor factory above. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- Queue destination (point-to-point). -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>

    <!-- Topic destination (one-to-many). -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-change-topic"/>
    </bean>

    <!-- Receiving side: the listener bean that handles delivered messages. -->
    <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener"/>

    <!-- Listener container: delivers messages from queueDestination to myMessageListener. -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueDestination"/>
        <property name="messageListener" ref="myMessageListener"/>
    </bean>

</beans>
如何测试?我们已经在配置文件中配置了监听器相关的bean,直接加载就好了
/**
 * Starts the Spring container so the message listeners declared in the
 * configuration file are created, then blocks on standard input to keep it running.
 *
 * @throws Exception if reading from standard input fails
 */
@Test
public void testQueueConsumer() throws Exception {
    // Initialise the Spring container; try-with-resources closes it once the wait ends
    // so the listener container and JMS connection are shut down instead of leaked.
    try (ClassPathXmlApplicationContext applicationContext =
                 new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml")) {
        // Wait: keeps the container alive until input arrives on stdin.
        System.in.read();
    }
}
3.添加商品同步到索引库
3.1 生产者
这里我们用的是topic一对多的消息传送形式,因为添加商品涉及到同步缓存、同步索引库、添加静态页面等操作,也就是一个消息会有多个消费者。下面是配置
然后我们找到添加商品的方法,在添加完商品后,发送消息。这里需要考虑一个问题,那就是消息的内容应该是什么?既然是添加商品,消费者肯定是要知道添加的商品是哪个商品,同时本着简单的原则,我们只需要传新增商品的ID即可,如下图所示
完整的代码如下
/**
 * Item service backed by the MyBatis mappers. After a new item is stored,
 * its id is published as a text message to the "itemAddTopic" destination.
 */
@Service
public class ItemServiceImpl implements ItemService {
    @Autowired
    private TbItemMapper tbItemMapper;
    @Autowired
    private TbItemDescMapper itemDescMapper;
    @Autowired
    private JmsTemplate jmsTemplate;
    // Injected by bean name ("itemAddTopic").
    @Resource(name = "itemAddTopic")
    private Destination itemAddTopic;

    /**
     * Looks up one item by primary key.
     *
     * @param itemId the item's primary key
     * @return whatever the mapper returns for that key
     */
    @Override
    public TbItem getItemById(long itemId) {
        return tbItemMapper.selectByPrimaryKey(itemId);
    }

    /**
     * Returns one page of items for an EasyUI data grid.
     *
     * @param page page number passed to PageHelper
     * @param rows page size passed to PageHelper
     * @return the total count reported by PageInfo plus the rows of the requested page
     */
    @Override
    public EasyUIDataGridResult getItemList(int page, int rows) {
        // 1. Set the paging conditions before running the query (PageHelper static method).
        PageHelper.startPage(page, rows);
        // 2. Run the query with an empty example (no criteria).
        TbItemExample tbItemExample = new TbItemExample();
        List<TbItem> list = tbItemMapper.selectByExample(tbItemExample);
        // 3. Wrap the list in a PageInfo to read the total.
        PageInfo<TbItem> pageInfo = new PageInfo<>(list);
        EasyUIDataGridResult result = new EasyUIDataGridResult();
        result.setTotal(pageInfo.getTotal());
        result.setRows(list);
        return result;
    }

    /**
     * Stores a new item and its description, then publishes the new item's id.
     *
     * @param item the item to insert; its id, created, updated and status fields are overwritten here
     * @param desc the item description text
     * @return {@code TaotaoResult.ok()}
     */
    @Override
    public TaotaoResult addItem(TbItem item, String desc) {
        final long id = IDUtils.genItemId();
        // One timestamp for both rows so created/updated agree.
        Date now = new Date();

        item.setId(id);
        item.setCreated(now);
        item.setUpdated(now);
        item.setStatus((byte) 1);
        tbItemMapper.insert(item);

        TbItemDesc itemDesc = new TbItemDesc();
        // Link the description to the item; without this the description row carries no
        // item id and the search mapper's join on item_id cannot match it.
        itemDesc.setItemId(id);
        itemDesc.setItemDesc(desc);
        itemDesc.setCreated(now);
        itemDesc.setUpdated(now);
        itemDescMapper.insert(itemDesc);

        // Publish the new item's id via ActiveMQ.
        // NOTE(review): the send happens inside this method; if it runs in a transaction, consumers
        // may get the id before the commit is visible (ItemAddMessageListener waits for that reason).
        jmsTemplate.send(itemAddTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(id + "");
            }
        });
        return TaotaoResult.ok();
    }
}
3.2 消费者
我们的思路是,生产者传一个商品的id过来,消费者通过id查询到商品所需要的信息,再添加到索引库中去;所以我们要在Mapper中增加一个通过id查询商品的方法
完整代码
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.taotao.search.mapper.SearchItemMapper">
<!-- getItemList: selects id, title, sell_point, price, image, category name (aliased category_name) -->
<!-- and item_desc for every tb_item row with status = 1. Category and description are LEFT JOINed, -->
<!-- so items without a matching row are still returned with those columns null. -->
<select id="getItemList" resultType="com.taotao.common.pojo.SearchItem">
SELECT
a.id,
a.title,
a.sell_point,
a.price,
a.image,
b. NAME category_name,
c.item_desc
FROM
tb_item a
LEFT JOIN tb_item_cat b ON a.cid = b.id
LEFT JOIN tb_item_desc c ON a.id = c.item_id
WHERE
a.`status` = 1
</select>
<!-- getItemById: same columns and joins as getItemList, restricted to the item whose id equals -->
<!-- the long parameter; the status = 1 filter still applies. -->
<select id="getItemById" parameterType="long" resultType="com.taotao.common.pojo.SearchItem">
SELECT
a.id,
a.title,
a.sell_point,
a.price,
a.image,
b. NAME category_name,
c.item_desc
FROM
tb_item a
LEFT JOIN tb_item_cat b ON a.cid = b.id
LEFT JOIN tb_item_desc c ON a.id = c.item_id
WHERE
a.`status` = 1
AND
a.id = #{itemId}
</select>
</mapper>
package com.taotao.search.listener;
import com.taotao.search.service.SearchItemService;
import com.taotao.search.service.SearchService;
import org.springframework.beans.factory.annotation.Autowired;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * Listens for item-added messages whose text payload is the new item's id
 * and asks {@link SearchItemService} to add that item to the index.
 */
public class ItemAddMessageListener implements MessageListener {
    /** Pause before querying, giving the producer's transaction time to commit. */
    private static final long COMMIT_WAIT_MILLIS = 1000L;

    @Autowired
    private SearchItemService searchItemService;

    /**
     * Handles one delivered message.
     *
     * @param message expected to be a {@link TextMessage} containing the item id as decimal text;
     *                other message types are reported on stderr and skipped
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            System.err.println("Ignoring non-text JMS message: " + message);
            return;
        }
        try {
            // Read the item id from the message.
            String text = ((TextMessage) message).getText();
            Long itemId = Long.parseLong(text);
            // The producer's transaction may not have committed yet; wait so the
            // lookup does not miss the freshly inserted item.
            Thread.sleep(COMMIT_WAIT_MILLIS);
            // Query the item and add it to the index.
            searchItemService.addDocument(itemId);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
在SearchItemService中添加一个方法
完整代码
package com.taotao.search.service.impl;
import com.taotao.common.pojo.SearchItem;
import com.taotao.common.pojo.TaotaoResult;
import com.taotao.search.mapper.SearchItemMapper;
import com.taotao.search.service.SearchItemService;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.common.SolrInputDocument;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
 * Writes item data obtained from {@link SearchItemMapper} into the Solr index,
 * either for every item at once or for a single item id.
 */
@Service
public class SearchItemServiceImpl implements SearchItemService {
    // SolrServer is injected; per the original note it is wired up as a bean elsewhere.
    @Autowired
    private SolrServer solrServer;
    @Autowired
    private SearchItemMapper searchItemMapper;

    /**
     * Adds every item returned by the mapper to the index and commits once at the end.
     *
     * @return {@code TaotaoResult.ok()}
     * @throws Exception if the mapper or the Solr client fails
     */
    @Override
    public TaotaoResult addSerchItem() throws Exception {
        // Query all items.
        List<SearchItem> searchItemList = searchItemMapper.getItemList();
        // Add one document per item.
        for (SearchItem searchItem : searchItemList) {
            solrServer.add(toDocument(searchItem));
        }
        // Commit the changes.
        solrServer.commit();
        return TaotaoResult.ok();
    }

    /**
     * Looks up one item by id, adds it to the index and commits.
     *
     * @param itemId id of the item to index
     * @return {@code TaotaoResult.ok()}
     * @throws IllegalStateException if the mapper returns no item for {@code itemId}
     * @throws Exception if the mapper or the Solr client fails
     */
    @Override
    public TaotaoResult addDocument(Long itemId) throws Exception {
        // 1. Query the item by id.
        SearchItem searchItem = searchItemMapper.getItemById(itemId);
        if (searchItem == null) {
            // Fail with a clear message instead of a NullPointerException further down.
            throw new IllegalStateException("No indexable item found for id " + itemId);
        }
        // 2. Build the document and write it to the index.
        solrServer.add(toDocument(searchItem));
        solrServer.commit();
        // 3. Report success.
        return TaotaoResult.ok();
    }

    /** Copies the fields of one SearchItem into a new SolrInputDocument. */
    private static SolrInputDocument toDocument(SearchItem searchItem) {
        SolrInputDocument document = new SolrInputDocument();
        document.addField("id", searchItem.getId());
        document.addField("item_title", searchItem.getTitle());
        document.addField("item_sell_point", searchItem.getSell_point());
        document.addField("item_price", searchItem.getPrice());
        document.addField("item_image", searchItem.getImage());
        document.addField("item_category_name", searchItem.getCategory_name());
        document.addField("item_desc", searchItem.getItem_desc());
        return document;
    }
}
为什么我们要等待一秒呢?因为有可能商品添加的事务还没有提交的时候消息就已经传递过来,并且消费者马上消费了消息,这样的话会造成一个问题,就是查询不到商品的信息。所以我们一般会设置一个等待时间
Spring中配置监听
完整配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
                           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
                           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- The real ConnectionFactory that creates connections; supplied by the JMS vendor (ActiveMQ). -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.208.40:61616"/>
    </bean>

    <!-- Spring's ConnectionFactory that manages the real one; targetConnectionFactory points at the vendor factory above. -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- Queue destination (point-to-point). -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="spring-queue"/>
    </bean>

    <!-- Topic destination (one-to-many). -->
    <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="item-change-topic"/>
    </bean>

    <!-- Receiving side: listener for the test queue. -->
    <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener"/>

    <!-- Listener container: delivers messages from queueDestination to myMessageListener. -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueDestination"/>
        <property name="messageListener" ref="myMessageListener"/>
    </bean>

    <!-- Listener for item-added messages. -->
    <bean id="itemAddMessageListener" class="com.taotao.search.listener.ItemAddMessageListener"/>

    <!-- Listener container: delivers messages from itemAddTopic to itemAddMessageListener. -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="itemAddTopic"/>
        <property name="messageListener" ref="itemAddMessageListener"/>
    </bean>

</beans>
4.测试结果
启动所有服务,然后添加商品
然后在前台搜索“少儿”可以看到效果