1.發(fā)送消息
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
exchange:交換器的名稱
routingKey:路由鍵
props:消息的基本屬性集:messageProperties.MINIMAL_BASIC
messageProperties.MINIMAL_PERSISTENT_BASIC
messageProperties.BASIC
messageProperties.TEXT_PLAIN
messageProperties.PERSISTENT_TEXT_PLAIN
byte[] body:消息體,需要發(fā)送的內(nèi)容
示例:
String message = "hello!";
channel.basicPublish(EXCHANGE_NAME, "item.insert", MessageProperties.TEXT_PLAIN, message.getBytes());
上行代碼向EXCHANGE_NAME交換器發(fā)送了一條非持久化的String類型的消息蕊玷。
2.消費(fèi)消息
RabbitMQ的消費(fèi)模式分兩種:推模式和拉模式。推模式采用basicConsume進(jìn)行消費(fèi)逢渔,拉模式通過basicGet進(jìn)行消費(fèi)
2.1.推模式
在推模式中,可以通過持續(xù)訂閱的方式獲得消息斋射。
Channel類中basicConsume重載形式:
String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
參數(shù)說明:
queue:隊(duì)列名稱
autoAck:設(shè)置是否自動(dòng)確認(rèn)晰洒。建議設(shè)成false。
consumerTag:消費(fèi)者標(biāo)簽崭庸,用來區(qū)分多個(gè)消費(fèi)者
noLocal:設(shè)置為true,則表示不能將同一個(gè)Connection中生產(chǎn)者發(fā)送的消息傳遞給這個(gè)Connection中的消費(fèi)者。
exlusive:設(shè)置是否排他怕享。
arguments:設(shè)置消費(fèi)者的其他參數(shù)
callback:設(shè)置消費(fèi)者的回調(diào)函數(shù)执赡。用來處理RabbitMQ推送過來的消息。比如DefaultConsumer函筋,使用時(shí)沙合,需要客戶端重寫其中的方法。對(duì)于消費(fèi)者客戶端來說跌帐,重寫handleDelivery方法是十分方便的首懈,更復(fù)雜的消費(fèi)者需要重寫更多的方法。
使用示例:
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息谨敛,并且處理究履,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候佣盒,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
System.out.println(" [消費(fèi)者2] received : " + msg + "!");
}
};
// 監(jiān)聽隊(duì)列挎袜,自動(dòng)ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
首先我們聲明了一個(gè)DefaultConsumer的子類,并且重寫了它的handleDelivery方法肥惭。
2.2.拉模式
通過channel.baiscGet方法可以單條地獲取消息,其返回值是GetResponse紊搪。沒有方法重載蜜葱,只有:
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
queue:隊(duì)列名稱
autoAck:是否自動(dòng)確認(rèn),如果是false耀石,則同樣需要channel.basicAck來自動(dòng)確認(rèn)牵囤。
使用實(shí)例:
GetResponse response=channel.basicGet(QUEUE_NAME,true);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);