ReturnListener
mandatory
This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message.
In other words, when mandatory is set and a message cannot be routed to any queue, the broker hands it back to the publisher through the return method. When mandatory is not set, the broker simply discards the unroutable message.
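The route / return / drop decision described above can be pictured with a toy in-memory model. This is only a sketch and does not use the RabbitMQ client API: ToyDirectExchange, its bind and publish methods, and the log.info routing key are hypothetical names invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types belong to the RabbitMQ client library.
class MandatoryFlagSketch {

    /** Hypothetical in-memory stand-in for a direct exchange on the broker. */
    static final class ToyDirectExchange {
        private final Map<String, List<String>> queuesByRoutingKey = new HashMap<>();
        private final List<String> returnedToPublisher = new ArrayList<>();

        /** Creates an empty queue bound with the given routing key. */
        void bind(String routingKey) {
            queuesByRoutingKey.computeIfAbsent(routingKey, key -> new ArrayList<>());
        }

        /** Publishes a message; returns true if it was routed to a queue. */
        boolean publish(String routingKey, boolean mandatory, String body) {
            List<String> queue = queuesByRoutingKey.get(routingKey);
            if (queue != null) {
                queue.add(body);               // routable: enqueue as usual
                return true;
            }
            if (mandatory) {
                returnedToPublisher.add(body); // unroutable + mandatory: hand back
            }
            // unroutable + not mandatory: the message is silently dropped
            return false;
        }

        int queued(String routingKey) {
            List<String> queue = queuesByRoutingKey.get(routingKey);
            return queue == null ? 0 : queue.size();
        }

        List<String> returnedMessages() {
            return returnedToPublisher;
        }
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        exchange.bind("log.info");

        exchange.publish("log.info", true, "routed");
        exchange.publish("log.aaa", true, "returned");
        exchange.publish("log.aaa", false, "dropped");

        System.out.println("queued on log.info: " + exchange.queued("log.info"));
        System.out.println("returned to publisher: " + exchange.returnedMessages());
    }
}
```

Only the message published with mandatory set to true and an unbound routing key ends up in the returned list; the same message without the flag leaves no trace, which is why a publisher that needs to know about routing failures has to set the flag.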
immediate
This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed.
In other words, when immediate is set and the message reaches a queue that has no consumer, the return method is triggered. When immediate is 0, the server enqueues the message but cannot guarantee that it will ever be consumed.
Note
The immediate flag is no longer supported as of RabbitMQ 3.x.
The flag breaks the decoupling between producers and consumers: a producer should not care whether a consumer has consumed its message, so the field was dropped.
RabbitMQ Java client API
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class Send {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Connect with an AMQP URI
        connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Invoked when the broker returns a mandatory message it could not route
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode,
                                     String replyText,
                                     String exchange,
                                     String routingKey,
                                     AMQP.BasicProperties properties,
                                     byte[] body)
                    throws IOException {
                System.out.println("=========handleReturn===method============");
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                // Decode with an explicit charset instead of the platform default
                System.out.println("message:" + new String(body, StandardCharsets.UTF_8));
            }
        });

        // deliveryMode(2) marks the message as persistent
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .build();
        // The third argument is the mandatory flag
        channel.basicPublish("zhihao.direct.exchange", "log.aaa", true, properties,
                "注冊驗證碼".getBytes(StandardCharsets.UTF_8));
        // Returns arrive asynchronously, so wait before closing the channel
        TimeUnit.SECONDS.sleep(10);
        channel.close();
        connection.close();
    }
}
Because the message cannot be routed to any queue, the ReturnListener callback is triggered, and the console output includes:
replyText:NO_ROUTE
exchange:zhihao.direct.exchange
routingKey:log.aaa
message:注冊驗證碼
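The handleReturn callback also receives a numeric replyCode alongside the replyText. As a small standalone sketch of how a listener might interpret it: AMQP 0-9-1 uses 312 for NO_ROUTE and 313 for NO_CONSUMERS (the latter relates to the immediate flag). The class and method names below are hypothetical and defined locally, so the sketch does not depend on the RabbitMQ client.

```java
// Standalone sketch: the constants mirror AMQP 0-9-1 reply codes,
// but the class itself is hypothetical and not part of any library.
class ReturnReplyCodes {
    static final int NO_ROUTE = 312;      // mandatory message could not be routed
    static final int NO_CONSUMERS = 313;  // immediate message had no ready consumer

    /** Maps a reply code from a returned message to a readable explanation. */
    static String describe(int replyCode) {
        switch (replyCode) {
            case NO_ROUTE:
                return "NO_ROUTE: no queue is bound for this routing key";
            case NO_CONSUMERS:
                return "NO_CONSUMERS: the queue has no consumer ready to receive";
            default:
                return "unexpected reply code " + replyCode;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(312));
        System.out.println(describe(313));
    }
}
```

Inside a real ReturnListener, a helper like this could be called with the replyCode argument to log a clearer message than the raw number.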
spring-amqp API
Configuration:
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
        // Enable publisher returns for channels created by this factory
        factory.setPublisherReturns(true);
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // Ask the broker to return messages it cannot route
        rabbitTemplate.setMandatory(true);
        //rabbitTemplate.setMandatoryExpression(new SpelExpressionParser().parseExpression("(1+2) > 3")); // the expression evaluates to false
        // Note: Spring AMQP 2.3 and later deprecate setReturnCallback in favor of setReturnsCallback
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message,
                                        int replyCode,
                                        String replyText,
                                        String exchange,
                                        String routingKey) {
                System.out.println("============returnedMessage==method=========");
                System.out.println("replyCode: " + replyCode);
                System.out.println("replyText: " + replyText);
                System.out.println("exchange: " + exchange);
                System.out.println("routingKey: " + routingKey);
            }
        });
        return rabbitTemplate;
    }
}
Sending a message
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.util.concurrent.TimeUnit;

@ComponentScan
public class Application {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        System.out.println(rabbitTemplate);

        // Note: these properties are built but not attached to the message sent below
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc", "message sent");
        messageProperties.getHeaders().put("token", "234sdfsdf3r342dsfd1232");

        // This routing key does not match any queue, so the message is unroutable
        rabbitTemplate.convertAndSend("zhihao.direct.exchange", "log.aaa", "hello welcome");
        // Returns arrive asynchronously, so wait before closing the context
        TimeUnit.SECONDS.sleep(10);
        context.close();
    }
}
The callback registered with rabbitTemplate.setReturnCallback is invoked, and the console prints:
============returnedMessage==method=========
replyCode: 312
replyText: NO_ROUTE
exchange: zhihao.direct.exchange
routingKey: log.aaa
Summary
- Set factory.setPublisherReturns(true);
- Set rabbitTemplate.setMandatory(true); or rabbitTemplate.setMandatoryExpression(new SpelExpressionParser().parseExpression("(1+2) > 2")); // the expression evaluates to true
Only when both are set is the returnCallback callback invoked.
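The two conditions in the summary combine as a simple conjunction, which the following sketch spells out as a truth table. It is a plain-Java illustration of the rule as stated above, not Spring AMQP code: the class and the returnCallbackFires method are hypothetical, and the routable parameter is an added assumption standing for "the broker found a queue for the message".

```java
// Hypothetical helper that encodes the summary rule; not part of Spring AMQP.
class ReturnCallbackConditions {

    /**
     * The return callback is expected to run only when publisher returns are
     * enabled on the connection factory, the message is mandatory, and the
     * broker could not route it to any queue.
     */
    static boolean returnCallbackFires(boolean publisherReturns,
                                       boolean mandatory,
                                       boolean routable) {
        return publisherReturns && mandatory && !routable;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        // Print every combination of the two settings for an unroutable message
        for (boolean publisherReturns : flags) {
            for (boolean mandatory : flags) {
                System.out.printf("publisherReturns=%b mandatory=%b -> callback=%b%n",
                        publisherReturns, mandatory,
                        returnCallbackFires(publisherReturns, mandatory, false));
            }
        }
    }
}
```

Only the last combination (both settings true) reports callback=true, which matches the configuration used in MQConfig.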