一 :Work Queues
Work Queues— 工作隊(duì)列 (又稱任務(wù)隊(duì)列) 的主要思想是避免立即執(zhí)行資源密集型任務(wù)管宵,而不得不等待它完成。我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列,在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)箩朴。當(dāng)有多個(gè)工作線程時(shí)岗喉,這些工作線程將一起處理這些任務(wù)。
輪訓(xùn)分發(fā)消息
在這個(gè)案例中我們會(huì)啟動(dòng)兩個(gè)工作線程炸庞,一個(gè)消息發(fā)送線程钱床,我們來(lái)看看他們兩個(gè)工作線程是如何工作的。
package com.oddfar.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
//得到一個(gè)連接的 channel
public static Channel getChannel() throws Exception {
//創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("42.192.149.71");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
package com.oddfar.two;
import com.oddfar.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* 這是一個(gè)工作線程埠居,相當(dāng)于之前的消費(fèi)者
*
* @author zhiyuan
*/
public class Worker01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//消息接受
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:" + receivedMessage);
};
//消息被取消
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
System.out.println("C1 消費(fèi)者啟動(dòng)等待消費(fèi).................. ");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
public class Task01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息發(fā)送完成:" + message);
}
}
}
通過(guò)程序執(zhí)行發(fā)現(xiàn)生產(chǎn)者總共發(fā)送 4 個(gè)消息查牌,消費(fèi)者 1 和消費(fèi)者 2 分別分得兩個(gè)消息,并且是按照有序的一個(gè)接收一次消息滥壕。這就是輪訓(xùn)的效果
另外:輪詢分發(fā)并不是串行的輪詢分發(fā)纸颜,假設(shè)生產(chǎn)者連續(xù)發(fā)1 - 10 這10個(gè)消息,消費(fèi)者A會(huì)消費(fèi)1,3,5,7,9绎橘,消費(fèi)者2會(huì)消費(fèi)到2,4,6,8,10 胁孙。消費(fèi)者1比消費(fèi)者2的消費(fèi)速度快很多,當(dāng)消費(fèi)者1消費(fèi)完1的時(shí)候称鳞,消費(fèi)者2還在消費(fèi)2,此時(shí)消費(fèi)者1并不會(huì)等待消費(fèi)者1完成消息2涮较,消費(fèi)者1會(huì)快速的將3,5,7,9這4個(gè)消息快速消費(fèi)完,然后消費(fèi)者2再慢慢的消費(fèi)4,6,8,10冈止。
此時(shí)消費(fèi)者1它并不會(huì)去救濟(jì)消費(fèi)者2 并不會(huì)去幫消費(fèi)者2消費(fèi)消息
狂票,也就是說(shuō)它不會(huì)向forkJoin這種東西會(huì)動(dòng)態(tài)的去執(zhí)行任務(wù)。
- 按照消費(fèi)者消費(fèi)能力分發(fā)(在消費(fèi)者代碼中聲明)
//設(shè)置權(quán)重分發(fā) (消費(fèi)者速度快的機(jī)器會(huì)消費(fèi)到更多的消息熙暴,消費(fèi)速度慢的機(jī)器會(huì)消費(fèi)到更少的消息闺属,就是能者多勞)
channel.basicQos(1);
- 權(quán)重分發(fā)
也可以按照機(jī)器的能力進(jìn)行分發(fā),和nginx權(quán)重類似
二 消息應(yīng)答 (ack)
消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間周霉,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成了部分突然它掛掉了掂器,會(huì)發(fā)生什么情況。
RabbitMQ 一旦向消費(fèi)者傳遞了一條消息诗眨,便立即將該消息標(biāo)記為刪除 (自動(dòng)ack的情況)。在這種情況下孕讳,突然有個(gè)消費(fèi)者掛掉了匠楚,我們將丟失正在處理的消息,以及后續(xù)發(fā)送給該消費(fèi)者的消息厂财,因?yàn)樗鼰o(wú)法接收到芋簿。
為了保證消息在發(fā)送過(guò)程中不丟失,引入消息應(yīng)答機(jī)制璃饱,消息應(yīng)答就是:消費(fèi)者在接收到消息并且處理該消息之后与斤,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。(這就是手動(dòng)ack的情況)
public class Task02 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//聲明隊(duì)列
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
Scanner sc = new Scanner(System.in);
System.out.println("請(qǐng)輸入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
//發(fā)布消息
channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("生產(chǎn)者發(fā)出消息" + message);
}
}
}
public class Work03 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1 等待接收消息處理時(shí)間較 短");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接收到消息:" + message);
//todo 三個(gè)處理消息方法的api
/**
* 1.消息標(biāo)記 tag
* 2.是否批量應(yīng)答未應(yīng)答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
/**
* 第一個(gè)參數(shù):deliveryTag 消息的標(biāo)識(shí)
* 第二個(gè)參數(shù) multiple 是否批量確認(rèn)消息
* 第三個(gè)參數(shù):requeue 被Nack后 是否重新入隊(duì)撩穿,
* 如果是填false 那么就相當(dāng)于消息被消費(fèi)者自己丟棄了磷支,如果是true 那么相當(dāng)于重新入隊(duì),broker會(huì)交給其他的消費(fèi)者來(lái)消費(fèi)這個(gè)消息
* 如果消費(fèi)者僅有一個(gè) 然后Nack后的requeue參數(shù)又是true 那么就相當(dāng)于中國(guó)消息一直會(huì)發(fā)送給這個(gè)消費(fèi)者了食寡,相當(dāng)于消費(fèi)者死循環(huán)的消費(fèi)這個(gè)消息
*/
// channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false,false);
// basicNack和basicReject的區(qū)別僅僅是一個(gè)可以批量一個(gè)不能批量操作而已
// channel.basicReject();
};
CancelCallback cancelCallback = (s) -> {
System.out.println(s + "消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
//采用手動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
public class Work04 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1 等待接收消息處理時(shí)間較 長(zhǎng)");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("接收到消息:" + message);
/**
* 1.消息標(biāo)記 tag
* 2.是否批量應(yīng)答未應(yīng)答消息
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (s) -> {
System.out.println(s + "消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
//采用手動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
最終效果雾狈,如果我們演示了消費(fèi)消息較慢的消費(fèi)者(Work04 )消費(fèi)消息的時(shí)候 將程序斷掉,那么消息相當(dāng)于沒(méi)有給broker確認(rèn)ack抵皱,那么broker會(huì)將本應(yīng)該屬于Work04 的消息重新投遞給其他消費(fèi)者
另外:Nack和Reject的兩種情況在Work03中寫出來(lái)了
- 三 持久化
當(dāng) RabbitMQ 服務(wù)停掉以后善榛,默認(rèn)情況下會(huì)將隊(duì)列和消息都會(huì)刪除,
因此 呻畸,我們?cè)谏a(chǎn)者發(fā)消息的時(shí)候
1:要將隊(duì)列持久化移盆,2: 要將消息進(jìn)行持久化
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//1 設(shè)置隊(duì)列持久化
boolean durable = true;
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
Scanner sc = new Scanner(System.in);
System.out.println("請(qǐng)輸入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
//2 設(shè)置消息持久化
//發(fā)布消息
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN , message.getBytes("UTF-8"));
System.out.println("生產(chǎn)者發(fā)出消息" + message);
}
}