RabbitMQ-消費(fèi)消息
Address[] addresses = new Address[] {new Address(IP_ADDRESS, PORT)};
/**
* 1.建立連接工廠
*/
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(PASSWORD);
/**
* 網(wǎng)絡(luò)故障自動(dòng)連接恢復(fù)
*/
connectionFactory.setAutomaticRecoveryEnabled(true);
/**
* 2.創(chuàng)建連接 和生產(chǎn)者有一點(diǎn)不同
*/
Connection connection = connectionFactory.newConnection(addresses);
/**
* 3.創(chuàng)建信道
*/
final Channel channel = connection.createChannel();
/**
* 4.設(shè)置客戶端最多接收示被ack的消息個(gè)數(shù)
*/
channel.basicQos(64);
Consumer consumer =new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
System.out.println("接收消息 : "+new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//消息確認(rèn)
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
/**
* 回調(diào)
*/
channel.basicConsume(QUEUR_NAME, consumer);
/**
* 關(guān)閉資源
*/
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
basicConsume方法
String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
- queue 隊(duì)列名
- autoAck 是否自動(dòng)確認(rèn)消息,true自動(dòng)確認(rèn),false 不自動(dòng)要手動(dòng)調(diào)用,建立設(shè)置為false
//消息確認(rèn)
channel.basicAck(envelope.getDeliveryTag(), false);
- consumerTag 消費(fèi)者標(biāo)簽缓升,用來區(qū)分多個(gè)消費(fèi)者
- noLocal 設(shè)置為true吠各,表示 不能將同一個(gè)Conenction中生產(chǎn)者發(fā)送的消息傳遞給這個(gè)Connection中 的消費(fèi)者
- exclusive 是否排他
- arguments 消費(fèi)者的參數(shù)
- callback 消費(fèi)者 DefaultConsumer建立使用,重寫其中的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
- 重寫的方法
@Override
public void handleConsumeOk(String consumerTag) {
this._consumerTag = consumerTag;
}
@Override
public void handleCancelOk(String consumerTag) {
// no work to do
}
@Override
public void handleCancel(String consumerTag) throws IOException {
// no work to do
}
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
// no work to do
}
@Override
public void handleRecoverOk(String consumerTag) {
// no work to do
}
- handleShutdownSignal方法 當(dāng)Channel與Conenction關(guān)閉的時(shí)候會(huì)調(diào)用欠动,
- handleCancelOk方法會(huì)在其它方法之前調(diào)用永乌,返回消費(fèi)者標(biāo)簽
*handleCancelOk與handleCancel消費(fèi)者可以顯式或者隱式的取水訂單的時(shí)候調(diào)用惑申,也可以通過
channel.basicCancel方法來顯式的取消一個(gè)消費(fèi)者訂閱
會(huì)首先觸發(fā)handleConsumeOk方法,之后觸發(fā)handleDelivery方法翅雏,最后才觸發(fā)handleCancelOk方法
channel.basicAck();確認(rèn)消息
deliveryTag:該消息的index
multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息圈驼。