這部分沒有涉及到交換機(jī),所以一個(gè)消息只能被消費(fèi)一次,多個(gè)消費(fèi)者之間是競爭關(guān)系
image.png
1、連接rabbitMq
pom文件
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
</dependencies>
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author: yokipang
* @Date: 2022/5/10
* 連接工廠創(chuàng)建信道
*/
public class RabbitMqUtils {
public static Channel getChannel() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.xx");
factory.setUsername("xxx");
factory.setPassword("xxxxx");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
2贤惯、具體方法
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import utils.RabbitMqUtils;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class ConfirmMessage {
public static final int message_count = 1000;
public static void main(String[] args) throws Exception {
//單個(gè)確認(rèn) 690ms
//publishMessage1();
//批量確認(rèn) 160ms
//publishMessage2();
//異步批量確認(rèn)
publishMessage3();
}
//單個(gè)確認(rèn)
public static void publishMessage1() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
/**
* 聲明隊(duì)列
* 參數(shù)1 隊(duì)列名稱
* 參數(shù)2 消息是否持久化
* 參數(shù)3 是否可以由多個(gè)消費(fèi)者消費(fèi)
* 參數(shù)4 是否自動(dòng)刪除
* 參數(shù)5 其他
*/
channel.queueDeclare(queueName,true,false,false,null);
//開啟發(fā)布、確認(rèn)
channel.confirmSelect();
//開始時(shí)間
long begin = System.currentTimeMillis();
for (int i = 0 ; i<message_count;i++){
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息發(fā)送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("所需時(shí)間:"+(end-begin)+"ms");
}
//批量發(fā)布確認(rèn)
public static void publishMessage2() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
/**
* 聲明隊(duì)列
* 參數(shù)1 隊(duì)列名稱
* 參數(shù)2 消息是否持久化
* 參數(shù)3 是否可以由多個(gè)消費(fèi)者消費(fèi)
* 參數(shù)4 是否自動(dòng)刪除
* 參數(shù)5 其他
*/
channel.queueDeclare(queueName,true,false,false,null);
//開啟發(fā)布棒掠、確認(rèn)
channel.confirmSelect();
//開始時(shí)間
long begin = System.currentTimeMillis();
//批量確認(rèn)數(shù)量
int batchSize = 100;
for (int i = 0 ; i<message_count;i++){
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
if( i % batchSize == 0){
channel.waitForConfirms();
}
}
long end = System.currentTimeMillis();
System.out.println("所需時(shí)間:"+(end-begin)+"ms");
}
//異步批量發(fā)布確認(rèn)
public static void publishMessage3() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
/**
* 聲明隊(duì)列
* 參數(shù)1 隊(duì)列名稱
* 參數(shù)2 消息是否持久化
* 參數(shù)3 是否可以由多個(gè)消費(fèi)者消費(fèi)
* 參數(shù)4 是否自動(dòng)刪除
* 參數(shù)5 其他
*/
channel.queueDeclare(queueName,true,false,false,null);
//開啟發(fā)布孵构、確認(rèn)
channel.confirmSelect();
/**
* 線程安全有序的哈希表 適用于高并發(fā)情況
* 1、輕松地將序號(hào)與消息進(jìn)行關(guān)聯(lián)
* 2烟很、輕松的批量刪除條目 只要給到序號(hào)
* 3颈墅、支持高并發(fā)(多線程)
*/
ConcurrentSkipListMap<Long,String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
//消息發(fā)送前準(zhǔn)備監(jiān)聽器 監(jiān)聽消息發(fā)送狀態(tài)
/**
* 消息確認(rèn)成功函數(shù)
* 1、消息標(biāo)記
* 2溯职、是否批量
*/
ConfirmCallback ackConfirmCallback =(deliveryTag,multiple)->{
//是否批量處理
if(multiple){
//刪除掉已確認(rèn)的消息 剩下的就是未成功發(fā)送的消息
ConcurrentNavigableMap<Long,String> confirmd = concurrentSkipListMap.headMap(deliveryTag);
confirmd.clear();
}else{
concurrentSkipListMap.remove(deliveryTag);
}
System.out.println("確認(rèn)成功的消息:"+ deliveryTag);
};
/**
* 消息確認(rèn)失敗函數(shù)
* 1精盅、消息標(biāo)記
* 2帽哑、是否批量
*/
ConfirmCallback notAckConfirmCallback =(deliveryTag,multiple)->{
String message = concurrentSkipListMap.get(deliveryTag);
System.out.println("確認(rèn)失敗的消息:"+message +" 消息標(biāo)記" + deliveryTag);
};
//開始時(shí)間
long begin = System.currentTimeMillis();
//監(jiān)聽器
channel.addConfirmListener(ackConfirmCallback,notAckConfirmCallback);
//發(fā)送消息
for (int i = 0 ; i<message_count;i++){
String message = i+"";
channel.basicPublish("",queueName,null,message.getBytes());
//記錄所有要發(fā)送的消息
concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("所需時(shí)間:"+(end-begin)+"ms");
}
}