rabbitmq分為發(fā)送端和接收端
發(fā)送端代碼
- 首先捐祠,引入依賴jar
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
- 先在application.properties里面配置rabbitmq地址和topic
spring.rabbitmq.addresses=IP
spring.rabbitmq.port=5672
spring.rabbitmq.username=canace(賬號)
spring.rabbitmq.password=123456(密碼)
spring.rabbitmq.connection-timeout=2000
rabbit_topickey=wsQueue
- 新建一個類玖瘸,發(fā)送消息
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${rabbit_topickey}")
private String rabbitmqtopic;
@GetMapping("/test")
public String toRabbitmq(String username) {
System.out.println("To Rabbitmq>>>>>>>>" + username);
// 第一個參數為剛剛定義的隊列名稱
this.rabbitTemplate.convertAndSend(rabbitmqtopic, username);
return "add User to rabbitmq Successful!!";
}
- 第二種發(fā)送,引入jar包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
- 創(chuàng)建RabbitmqConfig,鏈接rabbitmq
@Configuration
public class RabbitmqConfig {
private static String host = "IP";
private static String userName = "canace";
private static String passWord = "123456";
private static int port = 5672;
public static Channel getChannelInstance(String connectionDescription) {
try {
ConnectionFactory connectionFactory = getConnectionFactory();
Connection connection = connectionFactory.newConnection(connectionDescription);
return connection.createChannel();
} catch (Exception e) {
throw new RuntimeException("獲取Channel連接失敗", e);
}
}
@Bean
private static ConnectionFactory getConnectionFactory() {
ConnectionFactory connectionFactory = new ConnectionFactory();
// 配置連接信息
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(passWord);
connectionFactory.setVirtualHost("/");
connectionFactory.setConnectionTimeout(30000);
connectionFactory.setHandshakeTimeout(30000);
// 網絡異常自動連接恢復
connectionFactory.setAutomaticRecoveryEnabled(true);
// 每10秒嘗試重試連接一次
connectionFactory.setNetworkRecoveryInterval(10000);
return connectionFactory;
}
}
- 發(fā)送消息代碼
public String send(String exchange, String routingKey, String message) throws IOException {
if (channel == null || !channel.isOpen()) {
channel = RabbitmqConfig.getChannelInstance("隊列消息生產者");
}
message = URLEncoder.encode(message, "utf-8");
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType(
"UTF-8").build();
channel.basicPublish(exchange, routingKey, false, basicProperties, message.getBytes());
RedisUtil.set(message, message);
return "send ok";
}
接收端代碼
- 首先,引入依賴jar
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
- 先在application.properties里面配置一樣的rabbitmq地址,然后queues與發(fā)送端的key配置成一樣是目,group消費組可自行配置。
@Component
@RabbitListener(queues = "wsQueue", group = "wushaungRabit")
public class rabbitMQConsumer {
/**
* 消息消費
* @RabbitHandler 代表此方法為接受到消息后的處理方法
*/
@RabbitHandler
public void receive(String msg) {
System.out.println("[Rabbitmq] recieved message: " + msg);
}
}
- 第二種标捺,引入jar包spring-rabbit和spring-amqp
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.3.2</version>
</dependency>
- 配置rabbitmq.properties
rabbit.hosts=192.168.1.95
rabbit.username=canace
rabbit.password=123456
rabbit.port=5672
rabbit.virtualHost=/
- 配置amqp-share.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.3.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util-4.3.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<util:properties id="appConfig" location="classpath:rabbitmq.properties"></util:properties>
<rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}"
port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"
virtual-host="${rabbit.virtualHost}"
channel-cache-size="10"/>
<rabbit:admin connection-factory="connectionFactory"/>
<!--定義消息隊列-->
<rabbit:queue name="spittle.alert.queue.3" durable="true" auto-delete="false"/>
<!--交換機定義懊纳,綁定隊列-->
<rabbit:fanout-exchange id="spittle.fanout" name="spittle.fanout" durable="true">
<rabbit:bindings>
<rabbit:binding queue="spittle.alert.queue.3"></rabbit:binding>
</rabbit:bindings>
</rabbit:fanout-exchange>
</beans>
- 配置amqp-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<import resource="amqp-share.xml"/>
<!-- 配置監(jiān)聽器 -->
<rabbit:listener-container connection-factory="connectionFactory" type="simple">
<rabbit:listener ref="spittleListener" method="onMessage"
queues="spittle.alert.queue.3"/>
</rabbit:listener-container>
<bean id="spittleListener" class="com.pamirs.agent.rabbitmq2xc.demo.handler.SpittleAlertHandler"/>
</beans>
- 消費端接收端
public class SpittleAlertHandler implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String body = new String(message.getBody(), "UTF-8").replace("\"", "");
System.out.println("body>>>>>>>>>>>>>>>" + body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
- 第三種,引入jar包amqp-client
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.0</version>
</dependency>
- 消費端代碼
@Component
public class rabbitMGconsumer {
@PostConstruct
public void init() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.95");
factory.setPort(5672);
factory.setUsername("canace");
factory.setPassword("123456");
factory.setVirtualHost("/");
//// 網絡異常自動連接恢復
//factory.setAutomaticRecoveryEnabled(true);
//// 每10秒嘗試重試連接一次
//factory.setNetworkRecoveryInterval(10000);
Connection connect = factory.newConnection();
Channel channel = connect.createChannel();
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume("test", false, tag, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.printf("exchange : %s routingKey : %s consumer tag : %s thread : %s 消息內容 : %s%n", message.getEnvelope().getExchange(), message.getEnvelope().getRoutingKey(), consumerTag, Thread.currentThread().getName(), new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), true);
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("cancel" + consumerTag);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}