1、聲明隊列
# Declare an exclusive queue with an empty name and keep the declare result.
result = channel.queue_declare(
    exclusive=True,
    queue='',
)
2、聲明exchange
exchange_type 類型: direct, topic, headers, fanout
路由模式使用:direct 準確匹配route_key路由
發布訂閱使用:fanout 綁定的所有隊列路由
topic模式使用:topic *,#模糊匹配到的隊列路由
# Declare the 'logs' exchange with exchange type 'fanout'.
channel.exchange_declare(exchange_type='fanout', exchange='logs')
4、綁定消息隊列:將隊列與exchange和route綁定,發送消息時通過兩者路由到對應的隊列
注意:
1、當exchange為空時，直接路由隊列
2、當exchange為direct時,準確匹配對應隊列路由隊列
3、當exchange為fanout時，路由綁定的所有隊列
4、當exchange為topic 時， *、#模糊匹配到的隊列路由
# Bind queue_name to exchange_name using the routing key 'black'.
channel.queue_bind(routing_key='black', queue=queue_name, exchange=exchange_name)
5、發送消息:通過exchange，轉route到對應的queue，若exchange未傳將route作為隊列名發送
# Publish `message` with an empty exchange name and the routing key 'hello'.
channel.basic_publish(body=message, routing_key='hello', exchange='')
6、消費消息:消費消息只針對相應的隊列，主要控制發送的exchange和route
# Register `callback` as the message handler for queue_name with auto_ack=True.
# (The receiver was misspelled `hannel`; the other snippets all use `channel`.)
channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True,
)
7、rpc模式
(1) RPCServer
import com.rabbitmq.client.*;
/**
 * Fibonacci RPC server.
 *
 * <p>Consumes requests from the {@code rpc_queue} queue, parses each body as a decimal
 * integer {@code n}, and publishes {@code fib(n)} (as text) to the queue named in the
 * request's {@code replyTo} property, echoing the request's correlation id. When the
 * request cannot be processed (unparsable or negative number) an empty body is sent back.
 */
public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    /**
     * Returns the n-th Fibonacci number ({@code fib(0) == 0}, {@code fib(1) == 1}).
     *
     * <p>Computed iteratively in O(n) steps; values wrap on {@code int} overflow exactly
     * as repeated {@code int} addition does.
     *
     * @param n index of the Fibonacci number, must be non-negative
     * @return the n-th Fibonacci number
     * @throws IllegalArgumentException if {@code n} is negative
     */
    private static int fib(int n) {
        if (n < 0) {
            // A negative index would otherwise recurse/loop without a base case.
            throw new IllegalArgumentException("n must be non-negative: " + n);
        }
        int previous = 0;
        int current = 1;
        for (int i = 0; i < n; i++) {
            int next = previous + current;
            previous = current;
            current = next;
        }
        return previous;
    }

    /**
     * Connects to the broker on {@code localhost}, declares and purges the request queue,
     * and serves requests until the main thread is interrupted.
     *
     * @param argv unused
     * @throws Exception if connecting to the broker or setting up the channel fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);
            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                // Echo the correlation id so the caller can match reply to request.
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response = Integer.toString(fib(n));
                } catch (RuntimeException e) {
                    // Bad input (not a number, or negative): log it and reply with "".
                    System.out.println(" [.] " + e.toString());
                } finally {
                    // Always reply and ack, even on failure, so the caller is not left waiting.
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps,
                            response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // Consumer thread notifies the thread blocked in main().
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));

            // Keep main() inside the try-with-resources block so the channel and connection
            // stay open; leave the loop only when this thread is interrupted.
            boolean interrupted = false;
            while (!interrupted) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }
        }
        // Channel and connection are closed at this point; restore the interrupt status
        // that was consumed by the InterruptedException.
        Thread.currentThread().interrupt();
    }
}
(2)RPCClient
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
 * Blocking RPC client for the {@code rpc_queue} request queue.
 *
 * <p>Each {@link #call(String)} publishes one request and blocks until a reply carrying
 * the same correlation id arrives on a reply queue declared for that call.
 */
public class RPCClient implements AutoCloseable {
    private final Connection connection;
    private final Channel channel;
    private final String requestQueueName = "rpc_queue";

    /**
     * Opens a connection to the broker on {@code localhost} and creates a channel on it.
     *
     * @throws IOException if the connection or channel cannot be created
     * @throws TimeoutException if connecting to the broker times out
     */
    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    /**
     * Requests {@code fib(0)} through {@code fib(31)} one after another and prints each reply.
     *
     * @param argv unused
     */
    public static void main(String[] argv) {
        try (RPCClient fibonacciRpc = new RPCClient()) {
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so the caller/JVM can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends {@code message} to the request queue and waits for the matching reply.
     *
     * @param message request body; sent UTF-8 encoded
     * @return the reply body decoded as UTF-8
     * @throws IOException if declaring the reply queue, publishing, consuming, or
     *     cancelling the consumer fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            // corrId is never null, so comparing from this side tolerates replies
            // that carry no correlation id instead of throwing NullPointerException.
            if (corrId.equals(delivery.getProperties().getCorrelationId())) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        try {
            return response.take();
        } finally {
            // Cancel the consumer even when take() is interrupted, so it does not leak.
            channel.basicCancel(ctag);
        }
    }

    /**
     * Closes the underlying connection.
     *
     * @throws IOException if closing the connection fails
     */
    @Override
    public void close() throws IOException {
        connection.close();
    }
}