上一篇 <<<Rabbitmq基礎(chǔ)知識(shí)
下一篇 >>>Rabbitmq示例之工作(公平)隊(duì)列
1.特點(diǎn)
默認(rèn)的傳統(tǒng)隊(duì)列是為均攤消費(fèi)讯检,存在不公平性加酵;
如果每個(gè)消費(fèi)者速度不一樣的情況下旁钧,均攤消費(fèi)是不公平的顽铸,應(yīng)該是能者多勞突雪。
2.程序代碼示例
2.1依賴包
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5 </version>
</dependency>
</dependencies>
2.2 連接類
public class RabitMQConnection {
public static Connection getConnection() throws IOException, TimeoutException {
// 1.創(chuàng)建我們的連接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.設(shè)置我們的連接地址
connectionFactory.setHost("10.211.55.16");
// 3.設(shè)置我們的端口號(hào)
connectionFactory.setPort(5672);
// 4.設(shè)置賬號(hào)和密碼
connectionFactory.setUsername("jiang");
connectionFactory.setPassword("123456");
// 5.設(shè)置VirtualHost
connectionFactory.setVirtualHost("/mytest1205");
return connectionFactory.newConnection();
}
}
2.3 生產(chǎn)者
public class Producer {
private static final String QUEUE_NAME = "test1205";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("生產(chǎn)者啟動(dòng)成功..");
// 1.創(chuàng)建我們的連接
Connection connection = RabitMQConnection.getConnection();
// 2.創(chuàng)建我們通道
Channel channel = connection.createChannel();
for (int i = 0; i < 10; i++) {
String msg = "發(fā)送測(cè)試內(nèi)容-" + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("生產(chǎn)者發(fā)送消息成功:" + msg);
}
channel.close();
connection.close();
}
}
2.4 消費(fèi)者
public class Consumer {
private static final String QUEUE_NAME = "test1205";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.創(chuàng)建我們的連接
Connection connection = RabitMQConnection.getConnection();
// 2.創(chuàng)建我們通道
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消費(fèi)消息msg:" + msg);
}
};
// 3.創(chuàng)建我們的監(jiān)聽的消息
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
3.測(cè)試效果
消費(fèi)者開多個(gè)時(shí)触机,默認(rèn)是采用輪詢(均攤)機(jī)制
推薦閱讀:
<<<消息中間件的核心思想
<<<消息中間件常見問題匯總
<<<基于Netty簡單手寫消息中間件思路
<<<消息隊(duì)列常用名詞與中間件對(duì)比
<<<Rabbitmq基礎(chǔ)知識(shí)
<<<Rabbitmq示例之工作(公平)隊(duì)列
<<<Rabbitmq示例之發(fā)布訂閱模式
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq示例之RPC模式
<<<Rabbitmq隊(duì)列模式總結(jié)
<<<Rabbitmq如何保證消息不丟失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保證冪等性
<<<Rabbitmq的重試策略
<<<Rabbitmq通過死信隊(duì)列實(shí)現(xiàn)過期監(jiān)聽
<<<Rabbitmq解決分布式事務(wù)思路
<<<Rabbitmq解決分布式事務(wù)demo
<<<Rabbitmq環(huán)境安裝
<<<Kafka中的專業(yè)術(shù)語都有哪些
<<<Kafka的設(shè)計(jì)原理介紹
<<<Kafka集群如何實(shí)現(xiàn)相互感知
<<<Kafka如何實(shí)現(xiàn)分區(qū)及指定分區(qū)消費(fèi)
<<<Kafka如何保證消息順序消費(fèi)
<<<Kafka如何保證高吞吐量
<<<Kafka集群環(huán)境搭建
<<<RocketMQ架構(gòu)原理
<<<RocketMQ盛嘿、RabbitMQ和Kafka的對(duì)比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保證順序消費(fèi)demo
<<<RocketMQ如何動(dòng)態(tài)擴(kuò)容和縮容
<<<RocketMQ如何解決分布式事務(wù)
<<<RocketMQ單機(jī)版本安裝
<<<RocketMQ集群環(huán)境程序啟用相關(guān)知識(shí)點(diǎn)
<<<RocketMQ單機(jī)做主備實(shí)操
<<<RocketMQ所有配置說明