介紹
RabbitMQ是一種消息中間件璧疗。它的核心思想很簡(jiǎn)單坯辩,接收并傳遞消息。你可以把RabbitMQ想象成郵局:當(dāng)你把信扔進(jìn)信箱后崩侠,你十分確信郵遞員會(huì)準(zhǔn)確的把信交給收件人漆魔。在這個(gè)比喻里,RabbitMQ就是郵箱却音、郵局和郵遞員的集合改抡。
RabbitMQ和郵局主要的區(qū)別在于,RabbitMQ處理的不是紙質(zhì)郵件系瓢,而是二進(jìn)制的數(shù)據(jù)(Messages)
接下來用較為專業(yè)的術(shù)語解釋RabbitMQ以及消息傳遞阿纤。
Producing指的是只做發(fā)送操作,其余什么也不干八拱。自然而然,Producer就是只發(fā)消息的程序阵赠。我們用P指代Producer涯塔。
Queue等同于郵箱的意思,它存在于RabbitMQ當(dāng)中清蚀。當(dāng)消息穿過RabbitMQ到達(dá)你的應(yīng)用程序期間匕荸,它們?nèi)慷急4嬖赒ueue當(dāng)中。Queue的大小沒有限制枷邪,你想存多少就存多少-它基本等同于一個(gè)容量無限的緩存榛搔。大量Producer往里發(fā)送消息,大量Consumer從同一個(gè)隊(duì)列里取消息东揣。我們用個(gè)圖來展示下践惑。
Consuming與接收的含義非常接近。Consumer這類程序主要功能就是接收消息嘶卧。我們畫個(gè)C尔觉。
注意Producer,Consumer以及Broker可能不在同一臺(tái)主機(jī)中芥吟;實(shí)際大多數(shù)情況下侦铜,它們都分布在不同的主機(jī)中。
"Hello World"
Using the Java Client
在這部分內(nèi)容中钟鸵,我們會(huì)寫兩個(gè)JAVA程序钉稍;
第一個(gè)如下:
Producer發(fā)送一條消息,Consumer接收消息并打印棺耍。讓我們暫時(shí)忽視JAVA API的實(shí)現(xiàn)細(xì)節(jié)贡未,集中注意力從簡(jiǎn)單的調(diào)用開始,發(fā)一條“Hello World”的消息蒙袍。
在下方的圖中俊卤,“P”代表Producer,“C”代表Consumer左敌,中間的紅色方塊代表Queue(Rabbit為Consumer持有的消息緩存)
Java版RabbitMQ客戶端
RabbitMQ使用多種協(xié)議瘾蛋。在本部分示例中,采用的是AMQP 0-9-1協(xié)議矫限,它是一種開放的哺哼、多功能的消息傳遞協(xié)議。同時(shí)叼风,RabbitMQ客戶端的實(shí)現(xiàn)語言也種類繁多取董,在這里我們選用JAVA版本。
下載安裝包无宿,檢查簽名茵汰,解壓到你指定的路徑 巴拉巴拉~~~
現(xiàn)在我們開始寫代碼。
Sending
我們稱消息發(fā)送者為Send孽鸡,接收者為Recv蹂午。Send會(huì)連接RabbitMQ栏豺,發(fā)送一條簡(jiǎn)短的消息,然后退出豆胸。
我們需要導(dǎo)入以下class文件
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
建立類文件并且命名隊(duì)列奥洼。
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException {
...
}
}
然后我們創(chuàng)建一個(gè)到服務(wù)器的連接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Connection掩蓋了套接字實(shí)現(xiàn)的細(xì)節(jié),讓我們能專注于選擇協(xié)議版本和認(rèn)證以及其他重要的事情上晚胡。我們連接本機(jī)的broker灵奖,下文中我們簡(jiǎn)稱為localhost。我們只需要修改IP地址就能簡(jiǎn)單的連上其他主機(jī)上的broker估盘。
接下來我們創(chuàng)建一個(gè)channel瓷患,它包含大量我們常用的API。
為了發(fā)送消息遣妥,我們需要聲明一個(gè)隊(duì)列擅编;然后我們向隊(duì)列中發(fā)送消息。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
聲明隊(duì)列是冪等的燥透,只有當(dāng)隊(duì)列不存在時(shí)沙咏,它才會(huì)被創(chuàng)建(吐槽辨图,不就是單例嘛班套。。故河。)吱韭。消息內(nèi)容是字節(jié)數(shù)組,你可以隨心所以編碼它鱼的。
最后理盆,別忘記關(guān)掉連接。
channel.close();
connection.close();
以上就是整個(gè)Send.java的內(nèi)容凑阶。
發(fā)送無效怎么辦猿规?
如果你第一次使用RabbitMQ并且沒有看見發(fā)送的消息,你肯定會(huì)對(duì)這種不知所措的感覺印象深刻宙橱。也許是broker沒有足夠的磁盤空間(默認(rèn)需要1G)導(dǎo)致拒絕接收消息姨俩。通過檢查broker的日志文件來判斷是否需要降低磁盤需求。 configuration file documentation會(huì)教你怎樣設(shè)置disk_free_limit师郑。
Receving
接下來是reciver环葵,它被RabbitMQ塞入消息。同時(shí)宝冕,reciver實(shí)現(xiàn)起來也比sender復(fù)雜张遭,我們需要它監(jiān)聽消息,直到接收并打印出來地梨。
像寫sender一樣菊卷,這里我們也需要引入很多依賴的代碼缔恳。
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
引入的代碼中,DefaultConsumer是一個(gè)實(shí)現(xiàn)了Consumer接口的類洁闰,我們會(huì)用它來保存RabbitMQ推送的消息褐耳。
類似Sender一樣構(gòu)建代碼;我們打開Connection和Channel,聲明將要消費(fèi)的隊(duì)列渴庆。注意隊(duì)列名稱需要與發(fā)送的隊(duì)列相同铃芦。
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}
注意,我們?cè)谶@里也許要聲明隊(duì)列襟雷,是因?yàn)槲覀円苍S在啟動(dòng)sender前就啟動(dòng)了reciver刃滓。我們需要確保在消費(fèi)消息前,隊(duì)列已經(jīng)存在耸弄。
接下來我們要告訴RabbitMQ,讓它把隊(duì)列中的消息發(fā)給我們咧虎。由于它通過異步的方式推送消息,我們?cè)谛问缴舷扔靡粋€(gè)變量保存消息直到我們實(shí)際使用它计呈。這也是DefaultConsumer子類所做的工作砰诵。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
以上就是整個(gè)Recv.java的代碼。
Putting it all together
你可以將這兩段代碼同rabbitMQ客戶端代碼一起編譯捌显。
$ javac -cp rabbitmq-client.jar Send.java Recv.java
為了運(yùn)行他們茁彭,你需要rabbitmq-client.jar并且它依賴于classpath。在終端上扶歪,運(yùn)行sender:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send
然后理肺,運(yùn)行receiver;
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
在Windows環(huán)境下善镰,在classpath中用分號(hào)代替冒號(hào)分離它們
當(dāng)receiver從RabbitMQ中獲取消息后會(huì)將其打印出來妹萨。recevier會(huì)一直運(yùn)行并等待新的消息,因此我們?cè)诹硪粋€(gè)終端中啟動(dòng)sender炫欺。
如果你想檢查隊(duì)列乎完,嘗試使用 rabbitmqctl list_queues
hello
是時(shí)候看看Part2,構(gòu)建一個(gè)簡(jiǎn)單的工作隊(duì)列品洛。
小秘密:
為了減少打字树姨,你可以為classpath設(shè)置環(huán)境變量,例如
$ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
$ java -cp $CP Send
windows環(huán)境下
> set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar
> java -cp %CP% Send