RabbitListenerConfigurer詳解
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
public RabbitListenerConfigurer rabbitListenerConfigurer(){
return new RabbitListenerConfigurer() {
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
//endpoint設(shè)置zhihao.miao.order隊列的消息處理邏輯
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setId("10");
endpoint.setQueueNames("zhihao.miao.order");
endpoint.setMessageListener(message -> {
System.out.println("endpoint1處理消息的邏輯");
System.out.println(new String(message.getBody()));
});
//使用適配器來處理消息,設(shè)置了order,pay隊列的消息處理邏輯
SimpleRabbitListenerEndpoint endpoint2 = new SimpleRabbitListenerEndpoint();
endpoint2.setId("11");
endpoint2.setQueueNames("order","pay");
System.out.println("endpoint2處理消息的邏輯");
endpoint2.setMessageListener(new MessageListenerAdapter(new MessageHandler()));
//注冊二個endpoint
registrar.registerEndpoint(endpoint);
registrar.registerEndpoint(endpoint2);
}
};
}
}
消費端消息處理器
public class MessageHandler {
public void handleMessage(byte[] message){
System.out.println("消費消息");
System.out.println(new String(message));
}
}
消費端應(yīng)用啟動類
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;
@EnableRabbit
@ComponentScan
public class Application {
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
System.out.println("rabbit service startup");
TimeUnit.SECONDS.sleep(60);
context.close();
}
}
使用總結(jié)
- 實現(xiàn)
RabbitListenerConfigurer
接口瘩蚪,并把實現(xiàn)類托管到spring容器中 - 在spring容器中万牺,托管一個
RabbitListenerContainerFactory
的bean(SimpleRabbitListenerContainerFactory
) - 在啟動類上加上
@EnableRabbit
注解