介紹
rabbitmq是一個(gè)消息代理冒掌,它接收和轉(zhuǎn)發(fā)消息歪今,類似一個(gè)郵局萧锉,把你投遞的郵件送給指定收件人。
相關(guān)術(shù)語:
- producing: 消息生產(chǎn)者朗兵,用于發(fā)送消息
- queue: 隊(duì)列污淋,用于存儲消息
- consuming: 消息消費(fèi)者,用于接收消息
HelloWorld
P為生產(chǎn)者余掖,C是消費(fèi)者寸爆,中間的框是隊(duì)列,消息的緩沖區(qū)盐欺。
消息發(fā)送
send.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
// 創(chuàng)建一個(gè)連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個(gè)頻道赁豆,用于復(fù)用連接
Channel channel = connection.createChannel();
// 聲明消息發(fā)送的隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// 往隊(duì)列中發(fā)出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent'" + message + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
消息接收
Recv.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
// 創(chuàng)建一個(gè)連接
Connection connection = factory.newConnection();
// 創(chuàng)建一個(gè)頻道,用于復(fù)用連接
Channel channel = connection.createChannel();
// 指定發(fā)送的隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// 往隊(duì)列中發(fā)出一條消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent'" + message + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
Work Queues
在HelloWorld中寫了發(fā)送/接收消息的程序冗美,現(xiàn)在我們創(chuàng)建一個(gè)Work Queues(也稱為Task Queues)魔种,來在多個(gè)耗時(shí)的消息之間分配任務(wù)。
NewTask.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class NewTask {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// durable true 持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 發(fā)送當(dāng)前時(shí)間
String message = String.valueOf(System.currentTimeMillis());
// PERSISTENT_TEXT_PLAIN 持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("[x] Sent'" + message + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
聲明消息持久化后rabbitmq宕機(jī)也能從存儲中恢復(fù)消息粉洼。
Worker.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Worker {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 未回復(fù)消息處理完节预,消息隊(duì)列不會給它發(fā)新的消息
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 手動(dòng)確認(rèn)
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
private static void doWork () throws InterruptedException {
Thread.sleep(1000);
}
}
設(shè)置prefetchCount為1,在沒有處理完一條消息的時(shí)候属韧,消息隊(duì)列不會給它繼續(xù)下發(fā)消息安拟,在它確認(rèn)完消息后,消息隊(duì)列繼續(xù)下發(fā)新消息宵喂。
發(fā)送/訂閱
在上面我們把消息發(fā)給相同的一個(gè)消費(fèi)者糠赦,現(xiàn)在把消息發(fā)送給多個(gè)消費(fèi)者,這種模式稱為發(fā)布訂閱模式。為了演示這種模式愉棱,我們創(chuàng)建一個(gè)簡單的日志記錄系統(tǒng)唆铐,生產(chǎn)者發(fā)出日志,消費(fèi)者接收并打印它們奔滑,發(fā)布的消息將被廣播給所有消費(fèi)者。
Exchange (交換機(jī))
前面簡單的展示了如何接收發(fā)送消息顺少,現(xiàn)在介紹完整的rabbitmq概念朋其。簡單重復(fù)一下前面介紹的內(nèi)容:
- 生產(chǎn)者是發(fā)送消息的程序
- 隊(duì)列是消息的緩沖器
- 消費(fèi)者是接收處理消息的程序
rabbitmq消息模型的核心思想是,生產(chǎn)者從來不會直接發(fā)送消息給一個(gè)隊(duì)列脆炎。又或者說生產(chǎn)者甚至不知道它的消息將會發(fā)送到哪個(gè)隊(duì)列梅猿。
生產(chǎn)者只能發(fā)送消息給一個(gè)交換機(jī)。交換機(jī)是一個(gè)很簡單的概念秒裕。它接收生產(chǎn)者的消息袱蚓,然后推送消息到隊(duì)列中。交換機(jī)必須明確知道自己要對接收到的消息進(jìn)行何種處理: 是推送到特定隊(duì)列几蜻,還是推送到所有的隊(duì)列還是直接丟棄喇潘,這些規(guī)則由交換機(jī)的類型來定義。
有許多可供選擇的交換機(jī)類型: direct梭稚、topic颖低、headers、fanout弧烤。 我詳細(xì)介紹fanout忱屑。創(chuàng)建一個(gè)fanout類型的交換機(jī),稱它為logs暇昂。
EmitLog.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// fanout廣播模式莺戒,會廣播所有接收到的消息給所有它的已知隊(duì)列
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
ReveiveLogs.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 臨時(shí)隊(duì)列,非持久化的急波,唯一的从铲,斷開連接自動(dòng)刪除的并且隨機(jī)名稱的隊(duì)列
String queueName = channel.queueDeclare().getQueue();
// 綁定隊(duì)列和交換機(jī),告訴交換機(jī)給我們發(fā)送消息幔崖,如果沒有綁定到交換機(jī)上食店,消息會丟失
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}