來一幅圖:
說明:可靠性和效率是不可兼得的,保證可靠得犧牲一部分效率。
為了保障消息成功從生產(chǎn)者投遞到broker:
采用comfirm確認(rèn)消息機制结笨,如果Broker端接受到消息,那么就會回送相應(yīng),然后生產(chǎn)者會監(jiān)聽Broker給的應(yīng)答气忠,流程圖:
實現(xiàn)方式:
代碼如下:
生產(chǎn)者:
public class Producer {
public static void main(String[] args) throws Exception {
//1 創(chuàng)建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 通過Connection創(chuàng)建一個新的Channel
Channel channel = connection.createChannel();
//4 指定我們的消息投遞模式: 消息的確認(rèn)模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//5 發(fā)送一條消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//6 添加一個確認(rèn)監(jiān)聽
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
}
}
隊列收到消息之后會自動ACK(或者在消費者端可以手動選擇channel的NCK),那么生產(chǎn)者端的監(jiān)聽就會收到回復(fù)未桥。
2.如果消息路由不到指定的隊列笔刹,處理路由不到的問題,那么方法一就可以使用Return Listener.
流程圖如下:
如果發(fā)送消息的時候冬耿,可能因為routingkey錯誤舌菜,或者隊列不存在,或者隊列名稱錯誤導(dǎo)致路由失敗亦镶。
使用方式:
使用mandatory參數(shù)(即發(fā)送消息時候的第三個參數(shù)設(shè)置為true)和ReturnListener日月,可以實現(xiàn)消息無法路由的時候返回給生產(chǎn)者。
核心代碼:
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
此時如果消息路由不到缤骨,生產(chǎn)者配置的監(jiān)聽會拿到該消息爱咬。
方法二是采用備份交換機(alternate-exchange),無法路由的消息會發(fā)送到這個交換機上绊起。
代碼如下:
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); // 指定交換機的備份交換機
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
3.確保消息成功從隊列投到消費者(注意是隊列到消費者):
采用ACK機制,在channel操作隊列和消費者時候即首先關(guān)閉autoAck,
// 手工簽收 必須要關(guān)閉 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
這里我使用的是自定義消費者精拟,自定義消費者代碼:
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
如果是第0條消息,那么就拒絕并將此消息重回隊列尾部虱歪,如果不是第0條那么就ACK蜂绎。
生產(chǎn)者的代碼如下,每次發(fā)送消息都有num標(biāo)識:
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i<5; i ++){
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
String msg = "Hello RabbitMQ ACK Message " + i;
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
開啟消費者和生產(chǎn)者,控制臺打印如下:
可以看出第0條消息一直被循環(huán)消費(因為這個隊列只綁定了一個消費者笋鄙,此消費者設(shè)置了第0條消息重回隊列师枣,那么就循環(huán)消費)。