文章參考:Rabbit實戰(zhàn)指南
發(fā)送消息
? 如果要發(fā)送一個消息,可以使用Channel類的basicPublish方法武鲁,比如發(fā)送一條內(nèi)容為“Hello World”的消息蝠检,參考如下:
byte[] messageBodyBytes = "Hello,world!".getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);
為了更好地可控制發(fā)送叹谁,可以使用mandatory這個參數(shù),或者可以發(fā)送一些特定屬性的信息:
channel.basicPublish(exchangeName,routingKey,mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
下面這行代碼發(fā)送了一條消息憔涉,這條消息的投遞模式(delivery mode)設(shè)置為2析苫,即消息會被持久化(存入磁盤中)在服務(wù)器中衩侥。同時這條消息的優(yōu)先級(priority)設(shè)置為1,content-type為“text/plain”跪但÷臀可以自己設(shè)定消息的屬性:
channel.basicPublish(exchangeName,routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("hidden")
.build(),
messageBodyBytes
);
也可以發(fā)送一條帶有headers的消息:
Map<String,Object> headers = new HashMap<>();
headers.put("localtion","here");
headers.put("time","today");
channel.basicPublish(exchangeName,routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build(),
messageBodyBytes
);
也可以發(fā)送一條帶有過期時間(expiration)的消息
channel.basicPublish(exchangeName,routingKey,
new AMQP.BasicProperties.Builder()
.expiration("6000")
.build(),
messageBodyBytes
);
basicPublish的重載方法:
void basicPublish(String exchange,String routingKey,
BasicProperties props,
byte[] body) throws IOException;
void basicPublish(String exchange,String routingKey,
boolean mandatory,
BasicProperties props,
byte[] body) throws IOException;
void basicPublish(String exchange,String routingKey,
boolean mandatory,boolean imediate,
BasicProperties props,
byte[] body) throws IOException;
具體參數(shù)解釋如下:
- exchange:交換器的名稱被环,指明消息需要發(fā)送到哪個交換器中蛤售。如果設(shè)置為空字符串悴能,則消息會發(fā)送到RabbitMQ默認(rèn)的交換器中雳灾。
- routingKey:路由鍵,交換器根據(jù)路由鍵將消息存儲到相應(yīng)的隊列炒嘲。
- props:消息的基本屬性集匈庭,其包含14個屬性成員阱持,分別有contentType、contentEncoding、header(Map<Stirng,Object>)蒜绽、deliveryMode躲雅、priority骡和。correlationId、replyTo慰于、expiration东囚、messageId、timestamp桨嫁、type、urserId璃吧、appId畜挨、clusterId噩凹。
- byte[] body:消息體(payload),真正需要發(fā)送的消息逮刨。
- mandatory和immediate
消費消息
RabbitMQ的消費模式分為兩種:推(Push)模式和拉(Pull)模式堵泽。推模式采用Basic.Consume進(jìn)行消費,而拉模式則調(diào)用Basic.Get進(jìn)行消費睬愤。
推模式
在推模式中尤辱,可以通過持續(xù)訂閱的方式來消費信息,使用到的相關(guān)類有:
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeafultConsumer;
接收消息一般通過實現(xiàn)Consumer接口或者繼承DefaultConsumer類來實現(xiàn)啥刻。當(dāng)調(diào)用與Consumer類來實現(xiàn)可帽。當(dāng)調(diào)用與Consumer相關(guān)的API方法窗怒,不同的訂閱采用不同的消費者標(biāo)簽來區(qū)分彼此,在同一個Channel中的消費者也需要通過唯一的消費者標(biāo)簽以作區(qū)別努隙。代碼如下:
boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName,autoAck,"myConsumerTag",
new DefaultConsumer(channel){
@Override
public void handleDevlivery(String consumerTag,
Envelop envelope,
AMQP.BasicProperties properties,
byte[] bode
)throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
//(process the message components here ...)
channel.basicAck(deliveryTag,false);
}
}
)
上面代碼顯式地設(shè)置autoAck為false荸镊,然后在接受到消息之后進(jìn)行顯式ack操作(channel.basicAck)躬存,對應(yīng)消費者來說這個設(shè)置是非常必要的舀锨,可以防止消息不必要的丟失。
? Channel類中basicConsume方法有如下幾種形式:
String basicConsume(String queue,Consumer callback) throws IOException
String basicConsume(String queue,boolean autoAck,Consumer callback) throws IOException
String basicConsume(String queue,boolean autoAck,Map<String,Object> arguments,Consumer callback) throws IOException
String basicConsume(String queue,boolean autoAck,String consumerTag,Consumer callback) throws IOException
-
String basicConsume(String queue,boolean autoAck,String consumerTag,boolean noLocal,boolean exclusive,Map<String,Object> arguments,Consumer callback) throws IOException
對應(yīng)參數(shù)說明:
- queue:隊列的名稱
- autoAck:設(shè)置是否自動確認(rèn)盾剩。建議設(shè)置為false
- consumerTag:消費者標(biāo)簽告私,用來區(qū)分多個消費者德挣;
- noLocal:設(shè)置為true則表示不能將同一個Connection中生產(chǎn)者發(fā)送的消息傳送給這個Connection中的消費者
- exclusive:設(shè)置是否排他快毛;
- arguments:設(shè)置消費者的其他參數(shù)
- callback:設(shè)置消費者的回調(diào)函數(shù)。用來處理RabbitMQ推送過來的消息屯掖,比如DefaultConsumer襟衰,使用時需要客戶端重寫其中的方法。
對于消費者客戶端來說绍坝,重寫handleDelivery方法是十分方便的。更復(fù)雜的消費者客戶端會重寫更多的方法椎咧,具體如下:
void handleConsumeOk(String consumerTag); void handleCancelOk(String consumerTag); void handleCancel(String consumerTag) throws IOException; void handleShutdownSignal(String consumerTag,ShutdowSignalException sig); void handleRecoverOk(String consumerTag);
比如handleShutdownSignal方法把介,當(dāng)Channel或者Connection關(guān)閉的時候回調(diào)用。再者脚牍,handleConsumeOk方法會在其他方法之前調(diào)用巢墅,返回消費者標(biāo)簽。
重寫handleCancelOk和handleCancel方法驯遇,這樣消費端可以再顯示地或者隱式地取消訂閱的時候調(diào)用妹懒。也可以通過channel.basicCancel方法來顯式地取消一個消費者的訂閱:
?
channel.basicCancel(consumerTag)
注意上面這行代碼會首先觸發(fā)handleConsumerOk方法双吆,之后觸發(fā)handleDelivery方法,最后才觸發(fā)handleCanceOk方法匾竿。
和生產(chǎn)者一樣岭妖,消費者客戶端同樣需要考慮線程安全的問題反璃。消費者客戶端的這些callback會被分配到與Channel不同的線程池上,這意味著消費者客戶端可以安全地調(diào)用這些阻塞方法斋攀,如channe.queueDeclare梧田、channel.basicCancel等。
每個Channel都擁有自己獨立的線程鹉梨。最常用的做法是一個Channel對應(yīng)一個消費者。意味著消費者彼此之間沒有任何關(guān)聯(lián)晌坤。也可以在一個Channel中維持多個消費者泡仗,但是要注意一個問題猜憎,如果Channel中的一個消費者一直在運行胰柑,那么其他消費者的callback會被“耽擱”爬泥。
拉模式
通過channel.basicGet方法可以單條的獲取消息,其返回值是GetResponse踩官。Channel類的basicGet方法沒有其他重載方法境输,只有:
GetResponse basicGet(String queue,boolean autoAck) throws IOException;
其中queue代表隊列名稱嗅剖,如果設(shè)置autoAck為false信粮,那么同樣需要調(diào)用channel.basicAck來確認(rèn)消息被成功接收黔攒。
拉模式的關(guān)鍵代碼如下所示:
GetResponse response = channel.basicGet(QUEUE_NAME,false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
注意:
Basic.Consume將信道(Channel)置為接收模式督惰,知道取消隊列的訂閱為止赏胚。在接收模式期間辞友,RabbitMQ會不斷地推送消息給消費者,當(dāng)然推送消息的個數(shù)還是會受到Basic.Qos的限制戳晌。
如果只想從隊列獲取單挑消息而不是持續(xù)訂閱沦偎,建議使用Basic.Get進(jìn)行消費。但不能將Basic.Get放在一個循環(huán)里來代替Basic.Consume,這樣做會嚴(yán)重影響RabbitMQ的性能豪嚎。消費者理應(yīng)使用Basic.Consume方法侈询。