問題
企業(yè)中使用消息中間件面臨的常見問題:
1.消息莫名其妙的沒了旭旭,也不知道什么情況盖彭,有丟消息的問題。
2.發(fā)送者沒法確認(rèn)是否發(fā)送成功蚀乔,消費(fèi)者處理失敗也無法反饋烁竭。
消息可靠性的二種方式
1.事務(wù),利用AMQP協(xié)議的一部分吉挣,發(fā)送消息前設(shè)置channel為tx模式(channel.txSelect();)派撕,如果txCommit提交成功了,則消息一定到達(dá)了broker了睬魂,如果在txCommit執(zhí)行之前broker異常崩潰或者由于其他原因拋出異常终吼,這個(gè)時(shí)候我們便可以捕獲異常通過txRollback回滾事務(wù)了。(大大得削弱消息中間件的性能)
2.消息確認(rèn)(publish confirms)氯哮,設(shè)置管道為confirmSelect模式(channel.confirmSelect();)
生產(chǎn)者與broker之間的消息確認(rèn)稱為public confirms际跪,public confirms機(jī)制用于解決生產(chǎn)者與Rabbitmq服務(wù)器之間消息可靠傳輸,它在消息服務(wù)器持久化消息后通知消息生產(chǎn)者發(fā)送成功喉钢。
發(fā)送確認(rèn)(publisher confirms)
RabbitMQ java Client實(shí)現(xiàn)發(fā)送確認(rèn)
deliveryTag(投遞的標(biāo)識(shí))垫卤,當(dāng)Channel設(shè)置成confirm模式時(shí),發(fā)布的每一條消息都會(huì)獲得一個(gè)唯一的deliveryTag出牧,任何channel上發(fā)布的第一條消息的deliveryTag為1,此后的每一條消息都會(huì)加1歇盼,deliveryTag在channel范圍內(nèi)是唯一的舔痕。
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
public class Send {
static Long id = 0L;
static TreeSet<Long> tags = new TreeSet<>();
public static Long send(Channel channel,byte[] bytes) throws Exception{
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).
contentEncoding("UTF-8").build();
channel.basicPublish("zhihao.direct.exchange","zhihao.miao.order",properties,bytes);
return ++id;
}
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//是當(dāng)前的channel處于確認(rèn)模式
channel.confirmSelect();
//使當(dāng)前的channel處于事務(wù)模式,與上面的使channel處于確認(rèn)模式使互斥的
//channel.txSelect();
/**
* deliveryTag 消息id
* multiple 是否批量
* 如果是true豹缀,就意味著伯复,小于等于deliveryTag的消息都處理成功了
* 如果是false,只是成功了deliveryTag這一條消息
*/
channel.addConfirmListener(new ConfirmListener() {
//消息發(fā)送成功并且在broker落地邢笙,deliveryTag是唯一標(biāo)志符啸如,在channek上發(fā)布的消息的deliveryTag都會(huì)比之前加1
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("=========deliveryTag==========");
System.out.println("deliveryTag: "+deliveryTag);
System.out.println("multiple: "+multiple);
//處理成功發(fā)送的消息
if(multiple){
//批量操作
for(Long _id:new TreeSet<>(tags.headSet(deliveryTag+1))){
tags.remove(_id);
}
}else{
//單個(gè)確認(rèn)
tags.remove(deliveryTag);
}
System.out.println("未處理的消息: "+tags);
}
/**
* deliveryTag 消息id
* multiple 是否批量
* 如果是true,就意味著氮惯,小于等于deliveryTag的消息都處理失敗了
* 如果是false叮雳,只是失敗了deliveryTag這一條消息
*/
//消息發(fā)送失敗或者落地失敗
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("===========handleNack===========");
System.out.println("deliveryTag: "+deliveryTag);
System.out.println("multiple: "+multiple);
}
});
/**
* 當(dāng)Channel設(shè)置成confirm模式時(shí)想暗,發(fā)布的每一條消息都會(huì)獲得一個(gè)唯一的deliveryTag
* deliveryTag在basicPublish執(zhí)行的時(shí)候加1
*/
Long id = send(channel,"你的外賣已經(jīng)送達(dá)".getBytes());
tags.add(id);
//channel.waitForConfirms();
id =send(channel,"你的外賣已經(jīng)送達(dá)".getBytes());
tags.add(id);
//channel.waitForConfirms();
id = send(channel,"呵呵,不接電話".getBytes());
tags.add(id);
//channel.waitForConfirms();
TimeUnit.SECONDS.sleep(10);
channel.close();
connection.close();
}
}
channel.waitForConfirms():表示等待已經(jīng)發(fā)送給broker的消息act或者nack之后才會(huì)繼續(xù)執(zhí)行帘不。
channel.waitForConfirmsOrDie():表示等待已經(jīng)發(fā)送給broker的消息act或者nack之后才會(huì)繼續(xù)執(zhí)行说莫,如果有任何一個(gè)消息觸發(fā)了nack則拋出IOException。
總結(jié)
生產(chǎn)者與broker之間的消息可靠性保證的基本思路就是
- 當(dāng)消息發(fā)送到broker的時(shí)候寞焙,會(huì)執(zhí)行監(jiān)聽的回調(diào)函數(shù)储狭,其中deliveryTag是消息id(在同一個(gè)channel中這個(gè)數(shù)值是遞增的,而multiple表示是否批量確認(rèn)消息捣郊。
- 在生產(chǎn)端要維護(hù)一個(gè)消息發(fā)送的表辽狈,消息發(fā)送的時(shí)候記錄消息id,在消息成功落地broker磁盤并且進(jìn)行回調(diào)確認(rèn)(ack)的時(shí)候呛牲,根據(jù)本地消息表和回調(diào)確認(rèn)的消息id進(jìn)行對(duì)比刮萌,這樣可以確保生產(chǎn)端的消息表中的沒有進(jìn)行回調(diào)確認(rèn)(或者回調(diào)確認(rèn)時(shí)網(wǎng)絡(luò)問題)的消息進(jìn)行補(bǔ)救式的重發(fā),當(dāng)然不可避免的就會(huì)在消息端可能會(huì)造成消息的重復(fù)消息侈净。針對(duì)消費(fèi)端重復(fù)消息尊勿,在消費(fèi)端進(jìn)行冪等處理。(丟消息和重復(fù)消息是不可避免的二個(gè)極端畜侦,比起丟消息元扔,重復(fù)消息還有補(bǔ)救措施,而消息丟失就真的丟失了旋膳。
Spring AMQP實(shí)現(xiàn)實(shí)現(xiàn)發(fā)送確認(rèn)
示列
定義消息內(nèi)容
public class Order {
private String orderId;
private String createTime;
private double price;
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getCreateTime() {
return createTime;
}
public void setCreateTime(String createTime) {
this.createTime = createTime;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}
配置項(xiàng):
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
factory.setPublisherConfirms(true);
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 唯一標(biāo)識(shí)澎语,有了這個(gè)唯一標(biāo)識(shí),我們就知道可以確認(rèn)(失斞榘谩)哪一條消息了
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("=====消息進(jìn)行消費(fèi)了======");
if(ack){
System.out.println("消息id為: "+correlationData+"的消息擅羞,已經(jīng)被ack成功");
}else{
System.out.println("消息id為: "+correlationData+"的消息,消息nack义图,失敗原因是:"+cause);
}
}
});
return rabbitTemplate;
}
}
啟動(dòng)應(yīng)用類:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ComponentScan
public class Application {
public static Order createOrder(){
Order order = new Order();
order.setOrderId(UUID.randomUUID().toString());
order.setCreateTime(LocalDateTime.now().toString());
order.setPrice(100L);
return order;
}
public static void saveOrder(Order order){
//入庫操作
System.out.println("入庫操作");
}
public static void main(String[] args) throws Exception{
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
Order order = createOrder();
saveOrder(order);
ObjectMapper objectMapper = new ObjectMapper();
byte[] body = objectMapper.writeValueAsBytes(order);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("json");
Message message = new Message(body,messageProperties);
System.out.println("id: "+order.getOrderId());
//指定correlationData的值
rabbitTemplate.send("zhihao.direct.exchange","zhihao.miao.order",message,new CorrelationData(order.getOrderId().toString()));
TimeUnit.SECONDS.sleep(10);
context.close();
}
}
控制臺(tái)打釉群濉:
入庫操作
id: 11bc9eb3-fbcb-4777-9596-b6f6db81cafc
十月 22, 2017 7:14:14 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
信息: Created new connection: connectionFactory#50ad3bc1:0/SimpleConnection@4efc180e [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 61095]
=====消息進(jìn)行消費(fèi)了======
消息id為: CorrelationData [id=11bc9eb3-fbcb-4777-9596-b6f6db81cafc]的消息,已經(jīng)被ack成功
原理其實(shí)和java client是一樣的舰讹,我們?cè)诎l(fā)送消息的時(shí)候落地本地的消息表(有表示confirm字段)北苟,然后進(jìn)行回調(diào)確認(rèn)的方法中進(jìn)行狀態(tài)的更新,最后輪詢表中狀態(tài)不正確的消息進(jìn)行輪詢重發(fā)怕篷。
步驟
- 在容器中的ConnectionFactory實(shí)例中加上setPublisherConfirms屬性
factory.setPublisherConfirms(true); - 在RabbitTemplate實(shí)例中增加setConfirmCallback回調(diào)方法历筝。
- 發(fā)送消息的時(shí)候,需要指定CorrelationData廊谓,用于標(biāo)識(shí)該發(fā)送的唯一id梳猪。
對(duì)比與java client的publisher confirm:
1.spring amqp不支持批量確認(rèn),底層的rabbitmq java client方式支持批量確認(rèn)蒸痹。
2.spring amqp提供的方式更加的簡單明了春弥。
參考資料
關(guān)于另外一種Publisher Confirms事務(wù)機(jī)制可以參考下面這篇博客呛哟,很是簡單
深入學(xué)習(xí)RabbitMQ(二):AMQP事務(wù)機(jī)制