內(nèi)容來自:RabbitMQ Tutorials Java版
介紹
RabbitMQ是一個消息代理:它接受并轉(zhuǎn)發(fā)消息卦尊。你可以把它當成一個郵局:當你想郵寄信件的時候,你會把信件放在投遞箱中成黄,并確信郵遞員最終會將信件送到收件人的手里。在這個例子中稠炬,RabbitMQ就相當與投遞箱点额、郵局和郵遞員。
RabbitMQ與郵局的區(qū)別在于:RabbitMQ并不處理紙質(zhì)信件洁灵,而是接受、存儲并轉(zhuǎn)發(fā)二進制數(shù)據(jù)---消息掺出。
談到RabbitMQ的消息徽千,通常有幾個術(shù)語:
- 生產(chǎn)者:是指發(fā)送消息的程序
- 隊列:相當于RabbitMQ的投遞箱。盡管消息在RabbitMQ和你的應用之間傳遞汤锨,但是消息僅僅會在隊列之中存儲双抽。隊列只能存儲在內(nèi)存或磁盤中,本質(zhì)上是一個大的消息緩沖區(qū)闲礼。不同的生產(chǎn)者可以發(fā)送消息到同一個對隊列牍汹,不同的消費者也可以從同一個隊列中獲取消息铐维。
- 消費者:等待接受消息的程序。
注意慎菲,生產(chǎn)者嫁蛇、消費者以及RabbitMQ并不一定要在同一個主機上,在絕大部分的應用中它們都不在同一主機上露该。
在開始教程之前睬棚,請確保:你已經(jīng)安裝了RabbitMQ,并且在localhost
上運行起來(默認端口5672)解幼。如果你使用了不同的主機或端口抑党,請在下文中的連接設置中
更改相應的參數(shù)。
一撵摆、Hello World
在這一部分底靠,我們將會使用Java編寫兩個小程序:一個發(fā)送單個消息的生產(chǎn)者、一個接受消息并打印出消息的消費者特铝。這個消息就是Hello World
暑中。
下圖中,P代表生產(chǎn)者苟呐,C代表消費者痒芝,中間紅色的小箱子就代表隊列--RabbitMQ為了讓消費者收到消息而保持的消息緩沖區(qū)。
在這一部分牵素,只需要引入Java客戶端依賴即可:amqp-client.jar严衬,也可以通過maven的方式引入:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
1、生產(chǎn)者
我們將消息的發(fā)布者(生產(chǎn)者)命名為Send笆呆,將消息的消費者命名為Recv请琳。發(fā)布者將會連接到RabbitMQ,并且發(fā)送一條消息赠幕,然后退出俄精。
在Send.java
中,首先引入相關(guān)類:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
再定義隊列的名字:
private final static String QUEUE_NAME = "hello";
然后榕堰,創(chuàng)建一個連接到Rabbit服務器的連接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
上面的代碼中竖慧,connection是socket連接的抽象,為我們處理了通信協(xié)議版本協(xié)商以及認證等逆屡。這樣圾旨,我們就連接到了本地機器上的一個消息代理(broker)。如果想連接到其他機器上的broker魏蔗,只要修改IP即可砍的。
之后,我們又創(chuàng)建了一個通道(channel)莺治,大部分的API操作均在這里完成廓鞠。
對于Send來說帚稠,必須指明消息要發(fā)到哪個隊列:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
隊列的定義是冪等的,它僅僅在不存在時才會創(chuàng)建床佳。消息的內(nèi)容是一個字節(jié)數(shù)組滋早,所以你可以隨意編碼(encode)。
最后夕土,必須將通道和連接關(guān)閉馆衔。
channel.close();
connection.close();
完整代碼
//引入相關(guān)Class文件
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
//定義隊列名字
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//創(chuàng)建連接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//為通道指明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
//發(fā)布消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
//關(guān)閉連接
channel.close();
connection.close();
}
}
2、接收者(消費者)
消費者從RabbitMQ中取出消息怨绣。不同于發(fā)布者只發(fā)送一條消息就退出角溃,這里我們讓消費者一直監(jiān)聽消息,并把接受到的消息打印出來篮撑。
與Send.java類似减细,首先引入相關(guān)類:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
上面引入的DefaultConsumer
是Consumer
接口的實現(xiàn)類,我們使用它來緩沖從服務器push來的消息赢笨。
接下來的設置與發(fā)布者類似未蝌,打開連接和通道,聲明我們想消費的隊列茧妒。注意萧吠,這里的隊列的名字要與發(fā)布者中聲明的隊列的名字一致。
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, fasle, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}
注意桐筏,消費者同樣聲明了隊列纸型。這是因為,我們可能在啟動生產(chǎn)者之前啟動了消費者應用梅忌,我們想確保在從一個隊列消費消息之前狰腌,這個隊列是存在的。
接下來牧氮,告訴服務器(RabbitMQ)把隊列中的消息發(fā)過來琼腔。因為這個過程是異步的,可以通過DefaultConsumer
來進行回調(diào)踱葛。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
Consumer的完整代碼如下:
package com.maxwell.rabbitdemo;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
//建立連接和通道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//聲明要消費的隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//回調(diào)消費消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
這樣丹莲,消費者就會一直監(jiān)聽聲明的隊列。運行一次生產(chǎn)者(即Send.java
中的main
方法)尸诽,消費者就會打印出接受到的消息圾笨。
說明
①與原文略有出入,如有疑問逊谋,請參考原文。
②RabbitMQ的官方rabbitmq-tutorials的java示例中土铺,amqp-client
版本為3.5胶滋,我改為了4.1板鬓,否則后續(xù)的示例教程中會報錯說找不到文件。