延遲隊(duì)列
在我們的上一篇文章使用delayedQueue實(shí)現(xiàn)你本地的延遲隊(duì)列
中提到了延遲隊(duì)列的作用.
但是我們知道鹏往,利用delayedQueue實(shí)現(xiàn)的是一個(gè)單機(jī)的,而且是內(nèi)存中的延遲隊(duì)列,他并沒(méi)有一個(gè)集群的支持藏雏,并且需要在對(duì)泵機(jī)的時(shí)候涯塔,消息消費(fèi)異常的時(shí)候做相應(yīng)的邏輯處理。
那么這樣做的話(huà)惯驼,我們需要的工作量還是很大的蹲嚣,有沒(méi)有什么東西是讓我們不做這一部分的工作也能實(shí)現(xiàn)延遲隊(duì)列的功能?
當(dāng)然有了祟牲。答案是:rabbitMq
利用rabbitMq來(lái)實(shí)現(xiàn)延遲隊(duì)列的功能
那么如何利用rabbitMq來(lái)實(shí)現(xiàn)延遲隊(duì)列的功能呢隙畜?
請(qǐng)先注意一點(diǎn),RabbitMQ本身沒(méi)有直接支持延遲隊(duì)列功能说贝,但是可以通過(guò)以下特性模擬出延遲隊(duì)列的功能议惰。那么這是通過(guò)哪些特性呢,那就讓我們來(lái)認(rèn)識(shí)一下這兩個(gè)特性吧.
-
Per-Queue Message TTL
RabbitMQ可以對(duì)消息和隊(duì)列設(shè)置TTL(過(guò)期時(shí)間)。
RabbitMQ針對(duì)隊(duì)列中的消息過(guò)期時(shí)間(Time To Live, TTL)有兩種方法可以設(shè)置乡恕。第一種方法是通過(guò)隊(duì)列屬性設(shè)置言询,隊(duì)列中所有消息都有相同的過(guò)期時(shí)間。第二種方法是對(duì)消息進(jìn)行單獨(dú)設(shè)置傲宜,每條消息TTL可以不同运杭。如果上述兩種方法同時(shí)使用,則消息的過(guò)期時(shí)間以?xún)烧咧gTTL較小的那個(gè)數(shù)值為準(zhǔn)函卒。消息在隊(duì)列的生存時(shí)間一旦超過(guò)設(shè)置的TTL值县习,就成為dead message,消費(fèi)者將無(wú)法再收到該消息。
-
Dead Letter Exchanges
利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信后躁愿,它能被重新publish到另一個(gè)Exchange叛本,這個(gè)Exchange就是DLX。消息變成死信一向有以下幾種情況:
- 消息被拒絕(basic.reject or basic.nack)并且requeue=false
- 消息TTL過(guò)期
- 隊(duì)列達(dá)到最大長(zhǎng)度
DLX也是一下正常的Exchange同一般的Exchange沒(méi)有區(qū)別彤钟,它能在任何的隊(duì)列上被指定来候,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性,當(dāng)這個(gè)隊(duì)列中有死信時(shí)逸雹,RabbitMQ就會(huì)自動(dòng)的將這個(gè)消息重新發(fā)布到設(shè)置的Exchange中去营搅,進(jìn)而被路由到另一個(gè)隊(duì)列,publish可以監(jiān)聽(tīng)這個(gè)隊(duì)列中消息做相應(yīng)的處理梆砸,這個(gè)特性可以彌補(bǔ)RabbitMQ 3.0.0以前支持的immediate參數(shù)中的向publish確認(rèn)的功能转质。
結(jié)合以上兩個(gè)特性,就可以模擬出延遲消息的功能.
基于x-dead-letter-routing-key的單條消息延遲隊(duì)列的java代碼實(shí)現(xiàn)
生產(chǎn)者(發(fā)送)端代碼:
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
//隊(duì)列名稱(chēng)
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception
{
/**
* 創(chuàng)建連接連接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置MabbitMQ所在主機(jī)ip或者主機(jī)名
factory.setHost("localhost");
//創(chuàng)建一個(gè)連接
Connection connection = factory.newConnection();
//創(chuàng)建一個(gè)頻道
Channel channel = connection.createChannel();
//指定一個(gè)隊(duì)列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//發(fā)送的消息
String message = "hello world!"+System.currentTimeMillis();;
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties properties = builder.expiration("2000").deliveryMode(2).build();
//往隊(duì)列中發(fā)出一條消息 這時(shí)候要發(fā)送的隊(duì)列不應(yīng)該是QUEUE_NAME帖世,這樣才能進(jìn)行轉(zhuǎn)發(fā)的
channel.basicPublish("", "DELAY_QUEUE", properties, message.getBytes());
System.out.println(" [x] Sent '" + message + "'" );
//關(guān)閉頻道和連接
channel.close();
connection.close();
}
}
消費(fèi)者(接受)端代碼:
import java.util.HashMap;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
// 隊(duì)列名稱(chēng)
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// channel.queueBind(QUEUE_NAME, "amq.direct", QUEUE_NAME);
HashMap<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "amq.direct");
arguments.put("x-dead-letter-routing-key", QUEUE_NAME);
channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments);
//聲明隊(duì)列休蟹,主要為了防止消息接收者先運(yùn)行此程序,隊(duì)列還不存在時(shí)創(chuàng)建隊(duì)列日矫。
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//創(chuàng)建隊(duì)列消費(fèi)者
QueueingConsumer consumer = new QueueingConsumer(channel);
//指定消費(fèi)隊(duì)列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
//nextDelivery是一個(gè)阻塞方法(內(nèi)部實(shí)現(xiàn)其實(shí)是阻塞隊(duì)列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'"+ "' [當(dāng)前系統(tǒng)時(shí)間戳]" +System.currentTimeMillis());
}
}
}
參考資料
http://www.rabbitmq.com/ttl.html
http://www.rabbitmq.com/dlx.html
https://www.cloudamqp.com/docs/delayed-messages.html
http://www.netfoucs.com/article/xtjsxtj/73636.html#