鏈路壓測是一種常見的壓測手段,可以測試出系統(tǒng)凿宾,鏈路的性能瓶頸在哪矾屯。大公司基本都有根據(jù)自己的業(yè)務開發(fā)的整套鏈路壓測的產(chǎn)品。但是基本沒有開源出來初厚,技術細節(jié)都是沒有的件蚕,只是有文章介紹他們的場景和解決方案。本人最近也參與了一個鏈路壓測的項目产禾,把這個項目中的遇到的一些問題和解決寫出來排作,希望給到有需要的人,技術方案并不復雜亚情。
鏈路壓測的操作方式有2中
- 可以部署一套跟生產(chǎn)一樣的服務妄痪,然后錄制線上流量到模擬環(huán)境回放
- 另一種就是直接對線上服務進行壓測,但是不能污染到線上的數(shù)據(jù)
這里只討論第二種方式涉及到的問題
- 既然是鏈路楞件,就有上下游依賴的服務衫生,就需要把壓測請求一直傳遞下去裳瘪,這叫透傳
- 識別到壓測請求后數(shù)據(jù)插到影子庫,影子mongo(mongodb影子庫動態(tài)切換)罪针,影子redis彭羹,mq 消息也要標記識別
說到mq,消息的標記識別需要在發(fā)送端跟消費端做處理站故,也分2種方式
- 要么讓壓測請求跟正常請求都進入到生產(chǎn)的服務器的隊列皆怕,壓測消息發(fā)送端加上標記毅舆,接收端加上識別西篓。
- 另一個就是壓測的消息發(fā)送放到另一個影子隊列里面,跟生產(chǎn)的隊列完全隔離開憋活,消費端監(jiān)聽生產(chǎn)對列跟影子隊列岂津,接收的攔截里面判斷是哪個隊列發(fā)過來的消息,做對應處理
這里針對的都是注解方式使用的rabbitmq悦即,不包括那些硬編碼使用mq發(fā)送接收的
消息接收
@RabbitListener(queues = MqConstant.QUEUE)
public final void onMessage(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
System.out.println("是否是壓測消息: " + HeaderThreadLocal.isTestRequest());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
消息發(fā)送
public void sendMq(){
rabbitTemplate.convertAndSend(MqConstant.EXCHANGE, MqConstant.ROUT_KEY, "message-body-" + UUID.randomUUID().toString());
}
1 先來說說第一種
消息的標記識別實際上就是對發(fā)送接收做一個攔截處理吮成,配置2個bean,在bean 的方法里面做攔截的邏輯就可以了
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//消息接收之前加攔截處理辜梳,每次接收消息都會調(diào)用粱甫,是有壓測消息標記的,先存到副本變量作瞄,后續(xù)的操作數(shù)據(jù)庫根據(jù)這個變量進行切換影子庫
factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Map header = message.getMessageProperties().getHeaders();
//判斷是哪個隊列的消息茶宵,影子隊列的話要動態(tài)切換影子庫跟后續(xù)操作
String queue = message.getMessageProperties().getConsumerQueue();
if (header.containsKey("test")){
HeaderThreadLocal.setIsTestRequest(true);
}
return message;
}
});
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
//發(fā)送之前加一個攔截器,每次發(fā)送都會調(diào)用這個方法宗挥,方法名稱已經(jīng)說明了一切了
rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
if (HeaderThreadLocal.isTestRequest()) {
//攔截邏輯就是如果是壓測請求就加個header標記
message.getMessageProperties().getHeaders().put("test", true);
}
return message;
}
});
return rabbitTemplate;
}
RabbitTemplate 也有 setAfterReceivePostProcessors方法乌庶,但是這個方法對注解方式的接收消息是沒用的,源碼注釋有說明只適用哪種接收消息的方式
2 接下來說說第二種
第二種也很簡單契耿,操作如下
1 在生產(chǎn)的mq服務器上面建好影子隊列
2 用一個對應的影子 routing-key 做好影子隊列與交換器的綁定
- 3 在發(fā)送消息的時候瞒大,如果是正常請求,就直接發(fā)送搪桂,壓測請求就把發(fā)送的routing-key 加上一個后綴對應到影子routing-key
使用一個自定義的RabbitTemplate透敌,重寫里面的convertAndSend方法,每次調(diào)用都判斷踢械,是否要切換routing-key
public class CustomRabbitTemplate extends RabbitTemplate {
public CustomRabbitTemplate(ConnectionFactory connectionFactory) {
super(connectionFactory);
}
@Override
public void convertAndSend(String exchange,
String routingKey,
final Object object) throws AmqpException {
if (HeaderThreadLocal.isTestRequest()){
routingKey = routingKey +"-shadow";
}
super.convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
CustomRabbitTemplate rabbitTemplate = new CustomRabbitTemplate(factory);
return rabbitTemplate;
}
- 4 消息消費的時候加上攔截酗电,判斷是否是影子隊列的標記放一個到 ThreadLocal里面,后面的操作根據(jù)這個標記來寫數(shù)據(jù)庫或者其他
這一步參考這個bean 的配置 SimpleRabbitListenerContainerFactory
- 5 自動配置里面解析@RabbitListener(queues = MqConstant.QUEUE)這一段的邏輯自動加上影子隊列的監(jiān)聽
找到這個bean RabbitListenerAnnotationBeanPostProcessor 里面的這個方法 resolveQueues
private String[] resolveQueues(RabbitListener rabbitListener) {
String[] queues = rabbitListener.queues();
//修改這里面的邏輯裸燎,加上這2行代碼
String oldQueue = queues[0];
queues = new String[]{oldQueue, oldQueue+"-shadow"};
QueueBinding[] bindings = rabbitListener.bindings();
return result.toArray(new String[result.size()]);
}
那個bean 是spring的代碼顾瞻,這個方法是私有的,很多方法都是私有的德绿,怎么改荷荤? AOP ? 我直接簡單粗暴的把
SimpleRabbitListenerContainerFactory 的所有代碼弄出來到一個新的子類里面退渗,修改這個方法的邏輯,然后注入這個bean 就行了蕴纳。
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
//這個類 CustomRabbitListenerAnnotationBeanPostProcessor 的代碼時全部復制
//RabbitListenerAnnotationBeanPostProcessor的会油,只是加了2行代碼
return new CustomRabbitListenerAnnotationBeanPostProcessor();
}
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
這樣就可以自動加上影子隊列的監(jiān)聽了,一頓操作下來古毛,可以進行測試了翻翩,消息是可以動態(tài)指定發(fā)到對應的隊列的