注:這是RabbitMQ-java版Client的指導(dǎo)教程翻譯系列文章读整,歡迎大家批評指正
第一篇Hello Word了解RabbitMQ的基本用法
第二篇Work Queues介紹隊(duì)列的使用
第三篇Publish/Subscribe介紹轉(zhuǎn)換器以及其中fanout類型
第四篇Routing介紹direct類型轉(zhuǎn)換器
第五篇Topics介紹topic類型轉(zhuǎn)換器
第六篇RPC介紹遠(yuǎn)程調(diào)用
預(yù)備條件
這篇指導(dǎo)教程的前提是已經(jīng)下載了RabbitMQ并且運(yùn)行在本機(jī)上默認(rèn)端口號5672俯萎。如果你使用不同的主機(jī)昭卓,端口號或者相關(guān)認(rèn)證骤星,連接設(shè)置需要做一些調(diào)整。
尋求幫助
如果你在閱讀這個(gè)系列指導(dǎo)教程時(shí)有任何的問題脑慧,可以通過郵件聯(lián)系我們抡驼。
介紹(Introduction)
RabbitMQ是一個(gè)消息中間件:它接受并轉(zhuǎn)發(fā)消息。你可以把它看成是一個(gè)郵局:當(dāng)你把想投遞的郵件放在郵箱中時(shí)晰奖,知道郵遞員終會(huì)把郵件派送給收件人谈撒。這個(gè)比喻中,RabbitMQ是郵箱匾南,郵局和郵遞員啃匿。
RabbitMQ和郵局之間最大的不同是前者不需要處理紙張,就可以接受蛆楞,存儲(chǔ)并且傳發(fā)二進(jìn)制數(shù)據(jù)的消息溯乒。
通常,RabbitMQ和消息傳送會(huì)有一些專業(yè)術(shù)語豹爹。
生產(chǎn)和發(fā)送的意義是一樣的裆悄,一個(gè)應(yīng)用發(fā)送消息就是生產(chǎn)者:
隊(duì)列類似郵局中的郵箱存在于RabbitMQ中,盡管消息是在RabbitMQ和應(yīng)用間傳送臂聋,但消息只存儲(chǔ)在隊(duì)列中光稼。隊(duì)列的大小只受限于主機(jī)的內(nèi)存或者硬盤的大小,本質(zhì)上是有無限大的緩存區(qū)間孩等。許多生產(chǎn)者可以發(fā)送消息到一個(gè)隊(duì)列中艾君,當(dāng)然需要消費(fèi)者也可以從一個(gè)隊(duì)列中接受消息。我們用下列圖形代表隊(duì)列:
消費(fèi)和接受有著同樣的意思肄方,一個(gè)應(yīng)用常在等待接受消息就是消費(fèi)者:
注解:生產(chǎn)者冰垄,消費(fèi)者以及消息中間件并不會(huì)存在于同一個(gè)主機(jī)上,且大部分應(yīng)用確實(shí)也不會(huì)這樣做权她。
"Hello World"
(using the java Client ,我是搞Android開發(fā)的播演,所以語言沒得選:java是當(dāng)今世界上最流行的語言)
在這篇指導(dǎo)教程中,我們將用Java寫兩個(gè)應(yīng)用伴奥,發(fā)送一條簡單消息的生產(chǎn)者写烤,和接受消息并且將消息打印出來的消費(fèi)者。我們將會(huì)省略掉部分Java API的具體細(xì)節(jié)拾徙,專注于開始學(xué)習(xí)最簡單的"Hello World"消息傳遞洲炊。
在下面的圖表中,"P"表示生產(chǎn)者和"C"表示消費(fèi)者,中間的盒子表示隊(duì)列-消費(fèi)者的消息緩存在RabbitMQ中暂衡。
MabbitMQ的java版本客戶端的依賴包(The Java Client library)
RabbitMQ支持多種協(xié)議询微,這篇指導(dǎo)教程中使用AMQP協(xié)議,這是一個(gè)開源狂巢,多用途的消息傳遞協(xié)議撑毛。針對不同的語言,RabbitMQ提供專門的客戶端版本唧领,目前我們使用的是Java版本藻雌。
下載Java Client library并且依賴于SLF4J API和SLF4J Simple,拷貝這些文件到你的工作目錄下斩个,跟其它的java文件一塊放胯杭。
請注意SLF4J Simple只是在指導(dǎo)教程中使用,而在真正的生產(chǎn)項(xiàng)目中受啥,應(yīng)該使用更強(qiáng)大的日志包做个,像Logback。
現(xiàn)在我們有了Java版本的客戶端RabbitMQ和依賴包滚局,可以寫些代碼了居暖。
發(fā)送(Sending)
我們將稱消息發(fā)布者(發(fā)送者)為Send,消息消費(fèi)者(接受者)為Recv藤肢。發(fā)布者將會(huì)連接上服務(wù)端RabbitMQ膝但,發(fā)送一條簡單的消息,然后退出谤草。
在Send.java中跟束,我們需要引入一些類:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
創(chuàng)建類并且給隊(duì)列命名:
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws java.io.IOException {
...
}
}
接著我們連接服務(wù)端:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost"); //factory可以設(shè)置主機(jī)Ip,端口號丑孩,認(rèn)證信息等連接服務(wù)端
Connection connection = factory.newConnection(); //創(chuàng)建連接
Channel channel = connection.createChannel(); //創(chuàng)建通道
這是抽象的Socket(套接宇)連接方式冀宴,注意協(xié)議版本的差異和驗(yàn)證等等都取決于我們自己。我們連接到本機(jī)的RabbitMQ上温学,所以才是localhost略贮。如果我們想連接到不同的機(jī)器上的RabbitMQ上,可以簡單說明該機(jī)器的名稱和IP地址仗岖。(下面代碼是我擅自添加的)
factory.setPort(8080);
factory.setUsername("admin");
factory.setPassword("password"):
下一步創(chuàng)建通道(channel),大部分的事情都是在這里處理逃延。
我們必須先聲明發(fā)送消息去的隊(duì)列,然后發(fā)送消息到隊(duì)列中:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
聲明隊(duì)列是很重要的轧拄,它不存在時(shí)才會(huì)被創(chuàng)建揽祥,消息體是字節(jié)數(shù)組類型,因此可以在這里編碼你需要的類型檩电。
最后拄丰,關(guān)閉通道和連接:
channel.close();
connection.close();
這里是Send.java的源碼(這么簡答的代碼府树,不想貼)
發(fā)送失敗
如果這是你第一次使用RabbitMQ,并且你沒有看到發(fā)送的消息料按,可能傷腦筋了:哪里出了問題奄侠?可能是消息中間件開始的時(shí)候可用硬盤空間不足(默認(rèn)的至少剩余200MB),因此會(huì)拒絕接收消息载矿。查看消息中間件的日志文件和如果有必要的話減少這些限制垄潮。這篇配置文檔將會(huì)告訴你如何去設(shè)置硬盤剩余空間的限制。
接受(Receiving)
發(fā)布者相對應(yīng)的就是我們的接受者闷盔,接受者是接受從RabbitMQ推送過來的消息弯洗,而不像發(fā)布者是發(fā)布消息到RabbitMQ中。我們設(shè)置了對消息的監(jiān)聽馁筐,并且打印出消息:
Recv.java的引入類和Send.java有三個(gè)是一樣的:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
這個(gè)額外的DefaultConsumer是一個(gè)實(shí)現(xiàn)了Consumer接口的類,我們用來緩存由服務(wù)端推送給接受者的消息坠非。
和發(fā)布者開始的創(chuàng)建是類似的敏沉,打開連接(connection)和通道(channel),并且聲明一條可以消費(fèi)消息的隊(duì)列。注意這個(gè)隊(duì)列是匹配send發(fā)布消息的隊(duì)列:
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}
請注意消費(fèi)者也聲明了隊(duì)列炎码,因?yàn)槲覀兛梢栽趧?chuàng)建發(fā)布者之前先創(chuàng)建消費(fèi)者盟迟。我們像確保這些隊(duì)列已經(jīng)存在了,然后就可以從隊(duì)列中消費(fèi)消息潦闲。
將要告訴 服務(wù)端要從隊(duì)列中分發(fā)消費(fèi)者的消息攒菠,然后就會(huì)異步的推送消息給消費(fèi)者。我們提供了一個(gè)callBack的表單對象用于緩存消息直到消費(fèi)者已經(jīng)獲取到它們歉闰。這個(gè)就是DefaultConsumer子類的工作:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
綜合
你可以在RabbitMQ java客戶端編譯這些類:
javac -cp amqp-client-4.0.2.jar Send.java Recv.java
在一個(gè)終端上運(yùn)行消費(fèi)者辖众,你需要rabbitmq-client.jar和一些依賴:
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Recv
接著運(yùn)行生產(chǎn)者:
java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Send
在Windows系統(tǒng)上,使用分好代替冒號去分割每一個(gè)條目和敬。
通過RabbitMQ凹炸,消費(fèi)者將會(huì)打印出從生產(chǎn)者接受的消息,并且一直運(yùn)行等待著接受消息(可以使用Ctrl +C去停止運(yùn)行)昼弟,因此可以嘗試從另外一個(gè)終端來運(yùn)行生產(chǎn)者啤它。
第一節(jié)的內(nèi)容大致翻譯完了接奈,這里是原文的鏈接捉兴。接著進(jìn)入下一節(jié):Work Queues。
終篇是我對RabbitMQ使用理解的總結(jié)文章肢娘,歡迎討教芭逝。
--謝謝--