上一篇 <<<Rabbitmq示例之點(diǎn)對(duì)點(diǎn)簡(jiǎn)單隊(duì)列
下一篇 >>>Rabbitmq示例之發(fā)布訂閱模式
1.原理
a、channel.basicQos(1);----每次只會(huì)給消費(fèi)者推送1條消息蹲盘,等待手動(dòng)ack確認(rèn)后才會(huì)繼續(xù)發(fā)送
b频鉴、手動(dòng)確認(rèn)ack操作
優(yōu)點(diǎn):實(shí)現(xiàn)能者多勞的公平性了
2.消費(fèi)者接受消息代碼示例
/**
* 消費(fèi)者接受消息
* a溉知、channel.basicQos(1); 每次推送一個(gè)船万,消費(fèi)成功后再推送下一個(gè)
* b籽慢、channel.basicConsume(QUEUE_NAME, false, defaultConsumer); 改為手動(dòng)應(yīng)答模式
* c犯助、channel.basicAck(envelope.getDeliveryTag(),true); 手動(dòng)應(yīng)答
* 分別設(shè)置sleepTime開(kāi)啟多線程測(cè)試
*
*/
public class Worker {
private static int sleepTime = 2000;
private static final String QUEUE_NAME = "test1205";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("work:"+sleepTime+"啟動(dòng)");
Connection connection = RabitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println(msg+"--消費(fèi)成功");
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),true);
}
};
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
3.效果示例
推薦閱讀:
<<<消息中間件的核心思想
<<<消息中間件常見(jiàn)問(wèn)題匯總
<<<基于Netty簡(jiǎn)單手寫(xiě)消息中間件思路
<<<消息隊(duì)列常用名詞與中間件對(duì)比
<<<Rabbitmq基礎(chǔ)知識(shí)
<<<Rabbitmq示例之點(diǎn)對(duì)點(diǎn)簡(jiǎn)單隊(duì)列
<<<Rabbitmq示例之發(fā)布訂閱模式
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq示例之RPC模式
<<<Rabbitmq隊(duì)列模式總結(jié)
<<<Rabbitmq如何保證消息不丟失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保證冪等性
<<<Rabbitmq的重試策略
<<<Rabbitmq通過(guò)死信隊(duì)列實(shí)現(xiàn)過(guò)期監(jiān)聽(tīng)
<<<Rabbitmq解決分布式事務(wù)思路
<<<Rabbitmq解決分布式事務(wù)demo
<<<Rabbitmq環(huán)境安裝
<<<Kafka中的專業(yè)術(shù)語(yǔ)都有哪些
<<<Kafka的設(shè)計(jì)原理介紹
<<<Kafka集群如何實(shí)現(xiàn)相互感知
<<<Kafka如何實(shí)現(xiàn)分區(qū)及指定分區(qū)消費(fèi)
<<<Kafka如何保證消息順序消費(fèi)
<<<Kafka如何保證高吞吐量
<<<Kafka集群環(huán)境搭建
<<<RocketMQ架構(gòu)原理
<<<RocketMQ、RabbitMQ和Kafka的對(duì)比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保證順序消費(fèi)demo
<<<RocketMQ如何動(dòng)態(tài)擴(kuò)容和縮容
<<<RocketMQ如何解決分布式事務(wù)
<<<RocketMQ單機(jī)版本安裝
<<<RocketMQ集群環(huán)境程序啟用相關(guān)知識(shí)點(diǎn)
<<<RocketMQ單機(jī)做主備實(shí)操
<<<RocketMQ所有配置說(shuō)明