最近的一個(gè)計(jì)費(fèi)項(xiàng)目段磨,在rpc調(diào)用和流式處理之間徘徊了許久镇饮,后來選擇流式處理蜓竹。一是可以增加吞吐量,二是事務(wù)的控制相比于rpc要容易很多。
確定了流式處理的方式俱济,后續(xù)是技術(shù)的選型嘶是。剛開始傾向于用storm,無奈文檔實(shí)在太少蛛碌,折騰起來著實(shí)費(fèi)勁聂喇。最終放棄,改用消息隊(duì)列+微服務(wù)的方式實(shí)現(xiàn)左医。
消息隊(duì)列的選型上授帕,有activemq同木,rabbitmq浮梢,kafka等。最開始傾向于用activemq彤路,因?yàn)橐郧暗捻?xiàng)目用過秕硝,很多代碼都是可直接復(fù)用的。后來看了不少文章對(duì)比洲尊,發(fā)現(xiàn)rabbitmq對(duì)多語言的支持更好一點(diǎn)远豺,同時(shí)相比于kafka,犧牲了部分的性能換取了更好的穩(wěn)定性安全性以及持久化坞嘀。
最終決定使用rabbitmq躯护。
rabbitmq的官網(wǎng)如下:
對(duì)rabbitmq的封裝,有幾個(gè)目標(biāo):
1 提供send接口
2 提供consume接口
3 保證消息的事務(wù)性處理
所謂事務(wù)性處理丽涩,是指對(duì)一個(gè)消息的處理必須嚴(yán)格可控棺滞,必須滿足原子性,只有兩種可能的處理結(jié)果:
(1) 處理成功矢渊,從隊(duì)列中刪除消息
(2) 處理失敗(網(wǎng)絡(luò)問題继准,程序問題,服務(wù)掛了)矮男,將消息重新放回隊(duì)列
為了做到這點(diǎn)移必,我們使用rabbitmq的手動(dòng)ack模式,這個(gè)后面細(xì)說毡鉴。
1 send接口
public interface MessageSender {
DetailRes send(Object message);
}
send接口相對(duì)簡單崔泵,我們使用spring的RabbitTemplate來實(shí)現(xiàn),代碼如下:
//1 構(gòu)造template, exchange, routingkey等
//2 設(shè)置message序列化方法
//3 設(shè)置發(fā)送確認(rèn)
//4 構(gòu)造sender方法
public MessageSender buildMessageSender(final String exchange, final String routingKey, final String queue) throws IOException, TimeoutException {
Connection connection = connectionFactory.createConnection();
//1
buildQueue(exchange, routingKey, queue, connection);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setExchange(exchange);
rabbitTemplate.setRoutingKey(routingKey);
//2
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//3
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.info("send message failed: " + cause); //+ correlationData.toString());
throw new RuntimeException("send error " + cause);
}
}
});
//4
return new MessageSender() {
@Override
public DetailRes send(Object message) {
try {
rabbitTemplate.convertAndSend(message);
} catch (RuntimeException e) {
e.printStackTrace();
log.info("send failed " + e);
try {
//retry
rabbitTemplate.convertAndSend(message);
} catch (RuntimeException error) {
error.printStackTrace();
log.info("send failed again " + error);
return new DetailRes(false, error.toString());
}
}
return new DetailRes(true, "");
}
};
}
2 consume接口
public interface MessageConsumer {
DetailRes consume();
}
在consume接口中猪瞬,會(huì)調(diào)用用戶自己的MessageProcess憎瘸,接口定義如下:
public interface MessageProcess<T> {
DetailRes process(T message);
}
consume的實(shí)現(xiàn)相對(duì)來說復(fù)雜一點(diǎn),代碼如下:
//1 創(chuàng)建連接和channel
//2 設(shè)置message序列化方法
//3 構(gòu)造consumer
public <T> MessageConsumer buildMessageConsumer(String exchange, String routingKey,
final String queue, final MessageProcess<T> messageProcess) throws IOException {
final Connection connection = connectionFactory.createConnection();
//1
buildQueue(exchange, routingKey, queue, connection);
//2
final MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
final MessageConverter messageConverter = new Jackson2JsonMessageConverter();
//3
return new MessageConsumer() {
QueueingConsumer consumer;
{
consumer = buildQueueConsumer(connection, queue);
}
@Override
//1 通過delivery獲取原始數(shù)據(jù)
//2 將原始數(shù)據(jù)轉(zhuǎn)換為特定類型的包
//3 處理數(shù)據(jù)
//4 手動(dòng)發(fā)送ack確認(rèn)
public DetailRes consume() {
QueueingConsumer.Delivery delivery = null;
Channel channel = consumer.getChannel();
try {
//1
delivery = consumer.nextDelivery();
Message message = new Message(delivery.getBody(),
messagePropertiesConverter.toMessageProperties(delivery.getProperties(), delivery.getEnvelope(), "UTF-8"));
//2
@SuppressWarnings("unchecked")
T messageBean = (T) messageConverter.fromMessage(message);
//3
DetailRes detailRes = messageProcess.process(messageBean);
//4
if (detailRes.isSuccess()) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} else {
log.info("send message failed: " + detailRes.getErrMsg());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
return detailRes;
} catch (InterruptedException e) {
e.printStackTrace();
return new DetailRes(false, "interrupted exception " + e.toString());
} catch (IOException e) {
e.printStackTrace();
retry(delivery, channel);
log.info("io exception : " + e);
return new DetailRes(false, "io exception " + e.toString());
} catch (ShutdownSignalException e) {
e.printStackTrace();
try {
channel.close();
} catch (IOException io) {
io.printStackTrace();
} catch (TimeoutException timeout) {
timeout.printStackTrace();
}
consumer = buildQueueConsumer(connection, queue);
return new DetailRes(false, "shutdown exception " + e.toString());
} catch (Exception e) {
e.printStackTrace();
log.info("exception : " + e);
retry(delivery, channel);
return new DetailRes(false, "exception " + e.toString());
}
}
};
}
3 保證消息的事務(wù)性處理
rabbitmq默認(rèn)的處理方式為auto ack撑螺,這意味著當(dāng)你從消息隊(duì)列取出一個(gè)消息時(shí)含思,ack自動(dòng)發(fā)送,mq就會(huì)將消息刪除。而為了保證消息的正確處理含潘,我們需要將消息處理修改為手動(dòng)確認(rèn)的方式饲做。
(1) sender的手工確認(rèn)模式
首先將ConnectionFactory的模式設(shè)置為publisherConfirms,如下
connectionFactory.setPublisherConfirms(true);
之后設(shè)置rabbitTemplate的confirmCallback遏弱,如下:
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.info("send message failed: " + cause); //+ correlationData.toString());
throw new RuntimeException("send error " + cause);
}
}
});
(2) consume的手工確認(rèn)模式
首先在queue創(chuàng)建中指定模式
channel.exchangeDeclare(exchange, "direct", true, false, null);
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
channel.queueDeclare(queue, true, false, false, null);
只有在消息處理成功后發(fā)送ack確認(rèn)盆均,或失敗后發(fā)送nack使信息重新投遞
if (detailRes.isSuccess()) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} else {
log.info("send message failed: " + detailRes.getErrMsg());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
4 自動(dòng)重連機(jī)制
為了保證rabbitmq的高可用性,我們使用rabbitmq Cluster模式漱逸,并配合haproxy泪姨。這樣,在一臺(tái)機(jī)器down掉時(shí)或者網(wǎng)絡(luò)發(fā)生抖動(dòng)時(shí)饰抒,就會(huì)發(fā)生當(dāng)前連接失敗的情況肮砾,如果不對(duì)這種情況做處理,就會(huì)造成當(dāng)前的服務(wù)不可用袋坑。
在spring-rabbitmq中仗处,已實(shí)現(xiàn)了connection的自動(dòng)重連,但是connection重連后枣宫,channel的狀態(tài)并不正確婆誓。因此我們需要自己捕捉ShutdownSignalException異常,并重新生成channel也颤。如下:
catch (ShutdownSignalException e) {
e.printStackTrace();
channel.close();
//recreate channel
consumer = buildQueueConsumer(connection, queue);
}
5 consumer線程池
在對(duì)消息處理的過程中洋幻,我們期望多線程并行執(zhí)行來增加效率,因此對(duì)consumer做了一個(gè)線程池的封裝翅娶。
線程池通過builder模式構(gòu)造文留,需要準(zhǔn)備如下參數(shù):
//線程數(shù)量
int threadCount;
//處理間隔(每個(gè)線程處理完成后休息的時(shí)間)
long intervalMils;
//exchange及queue信息
String exchange;
String routingKey;
String queue;
//用戶自定義處理接口
MessageProcess<T> messageProcess;
核心循環(huán)也較為簡單,代碼如下:
public void run() {
while (!stop) {
try {
//2
DetailRes detailRes = messageConsumer.consume();
if (infoHolder.intervalMils > 0) {
try {
Thread.sleep(infoHolder.intervalMils);
} catch (InterruptedException e) {
e.printStackTrace();
log.info("interrupt " + e);
}
}
if (!detailRes.isSuccess()) {
log.info("run error " + detailRes.getErrMsg());
}
} catch (Exception e) {
e.printStackTrace();
log.info("run exception " + e);
}
}
}
6 使用示例
最后故觅,我們還是用一個(gè)例子做結(jié)厂庇。
(1) 定義model
//參考lombok
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserMessage {
int id;
String name;
}
(2) rabbitmq配置
配置我們使用@Configuration實(shí)現(xiàn),如下:
@Configuration
public class RabbitMQConf {
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPublisherConfirms(true); // enable confirm mode
return connectionFactory;
}
}
(3) sender示例
@Service
public class SenderExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionFactory;
private MessageSender messageSender;
@PostConstruct
public void init() throws IOException, TimeoutException {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
messageSender = mqAccessBuilder.buildMessageSender(EXCHANGE, ROUTING, QUEUE);
}
public DetailRes send(UserMessage userMessage) {
return messageSender.send(userMessage);
}
}
(4) MessageProcess(用戶自定義處理接口)示例输吏,本例中我們只是簡單的將信息打印出來
public class UserMessageProcess implements MessageProcess<UserMessage> {
@Override
public DetailRes process(UserMessage userMessage) {
System.out.println(userMessage);
return new DetailRes(true, "");
}
}
(5) consumer示例
@Service
public class ConsumerExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionFactory;
private MessageConsumer messageConsumer;
@PostConstruct
public void init() throws IOException, TimeoutException {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
messageConsumer = mqAccessBuilder.buildMessageConsumer(EXCHANGE, ROUTING, QUEUE, new UserMessageProcess());
}
public DetailRes consume() {
return messageConsumer.consume();
}
}
(6) 線程池consumer示例
在main函數(shù)中权旷,我們使用一個(gè)獨(dú)立線程發(fā)送數(shù)據(jù),并使用線程池接收數(shù)據(jù)贯溅。
@Service
public class PoolExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionFactory;
private ThreadPoolConsumer<UserMessage> threadPoolConsumer;
@PostConstruct
public void init() {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
MessageProcess<UserMessage> messageProcess = new UserMessageProcess();
threadPoolConsumer = new ThreadPoolConsumer.ThreadPoolConsumerBuilder<UserMessage>()
.setThreadCount(Constants.THREAD_COUNT).setIntervalMils(Constants.INTERVAL_MILS)
.setExchange(EXCHANGE).setRoutingKey(ROUTING).setQueue(QUEUE)
.setMQAccessBuilder(mqAccessBuilder).setMessageProcess(messageProcess)
.build();
}
public void start() throws IOException {
threadPoolConsumer.start();
}
public void stop() {
threadPoolConsumer.stop();
}
public static void main(String[] args) throws IOException {
ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
PoolExample poolExample = ac.getBean(PoolExample.class);
final SenderExample senderExample = ac.getBean(SenderExample.class);
poolExample.start();
new Thread(new Runnable() {
int id = 0;
@Override
public void run() {
while (true) {
senderExample.send(new UserMessage(id++, "" + System.nanoTime()));
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
7 github地址拄氯,路過的幫忙點(diǎn)個(gè)星星,謝謝_它浅。
附:
rabbitmq安裝過程:
mac版安裝可以使用homebrew译柏。brew install就可以,安裝好之后通過brew services start rabbitmq啟動(dòng)服務(wù)姐霍。通過
就可以在頁面端看到rabbitmq了鄙麦,如下:
have fun