rabbitmq消息標記攔截處理

鏈路壓測是一種常見的壓測手段,可以測試出系統(tǒng)凿宾,鏈路的性能瓶頸在哪矾屯。大公司基本都有根據(jù)自己的業(yè)務開發(fā)的整套鏈路壓測的產(chǎn)品。但是基本沒有開源出來初厚,技術細節(jié)都是沒有的件蚕,只是有文章介紹他們的場景和解決方案。本人最近也參與了一個鏈路壓測的項目产禾,把這個項目中的遇到的一些問題和解決寫出來排作,希望給到有需要的人,技術方案并不復雜亚情。

鏈路壓測的操作方式有2中

  • 可以部署一套跟生產(chǎn)一樣的服務妄痪,然后錄制線上流量到模擬環(huán)境回放
  • 另一種就是直接對線上服務進行壓測,但是不能污染到線上的數(shù)據(jù)

這里只討論第二種方式涉及到的問題

  • 既然是鏈路楞件,就有上下游依賴的服務衫生,就需要把壓測請求一直傳遞下去裳瘪,這叫透傳
  • 識別到壓測請求后數(shù)據(jù)插到影子庫,影子mongo(mongodb影子庫動態(tài)切換)罪针,影子redis彭羹,mq 消息也要標記識別

說到mq,消息的標記識別需要在發(fā)送端跟消費端做處理站故,也分2種方式

  • 要么讓壓測請求跟正常請求都進入到生產(chǎn)的服務器的隊列皆怕,壓測消息發(fā)送端加上標記毅舆,接收端加上識別西篓。
  • 另一個就是壓測的消息發(fā)送放到另一個影子隊列里面,跟生產(chǎn)的隊列完全隔離開憋活,消費端監(jiān)聽生產(chǎn)對列跟影子隊列岂津,接收的攔截里面判斷是哪個隊列發(fā)過來的消息,做對應處理
這里針對的都是注解方式使用的rabbitmq悦即,不包括那些硬編碼使用mq發(fā)送接收的
消息接收
@RabbitListener(queues = MqConstant.QUEUE)
public final void onMessage(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        System.out.println("是否是壓測消息: " + HeaderThreadLocal.isTestRequest());
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
消息發(fā)送
public void sendMq(){
        rabbitTemplate.convertAndSend(MqConstant.EXCHANGE, MqConstant.ROUT_KEY, "message-body-" + UUID.randomUUID().toString());
}

1 先來說說第一種

消息的標記識別實際上就是對發(fā)送接收做一個攔截處理吮成,配置2個bean,在bean 的方法里面做攔截的邏輯就可以了

    @Bean(name = "rabbitListenerContainerFactory")
    @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                                     ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //消息接收之前加攔截處理辜梳,每次接收消息都會調(diào)用粱甫,是有壓測消息標記的,先存到副本變量作瞄,后續(xù)的操作數(shù)據(jù)庫根據(jù)這個變量進行切換影子庫
        factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                Map header = message.getMessageProperties().getHeaders();
                //判斷是哪個隊列的消息茶宵,影子隊列的話要動態(tài)切換影子庫跟后續(xù)操作
                String queue = message.getMessageProperties().getConsumerQueue();
                if (header.containsKey("test")){
                    HeaderThreadLocal.setIsTestRequest(true);
                }
                return message;
            }
        });
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        //發(fā)送之前加一個攔截器,每次發(fā)送都會調(diào)用這個方法宗挥,方法名稱已經(jīng)說明了一切了
        rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                if (HeaderThreadLocal.isTestRequest()) {
                    //攔截邏輯就是如果是壓測請求就加個header標記
                    message.getMessageProperties().getHeaders().put("test", true);
                }
                return message;
            }
        });
        return rabbitTemplate;
    }

RabbitTemplate 也有 setAfterReceivePostProcessors方法乌庶,但是這個方法對注解方式的接收消息是沒用的,源碼注釋有說明只適用哪種接收消息的方式

2 接下來說說第二種

第二種也很簡單契耿,操作如下

  • 1 在生產(chǎn)的mq服務器上面建好影子隊列


    image.png
  • 2 用一個對應的影子 routing-key 做好影子隊列與交換器的綁定


    image.png
  • 3 在發(fā)送消息的時候瞒大,如果是正常請求,就直接發(fā)送搪桂,壓測請求就把發(fā)送的routing-key 加上一個后綴對應到影子routing-key
使用一個自定義的RabbitTemplate透敌,重寫里面的convertAndSend方法,每次調(diào)用都判斷踢械,是否要切換routing-key
public class CustomRabbitTemplate extends RabbitTemplate {
    public CustomRabbitTemplate(ConnectionFactory connectionFactory) {
        super(connectionFactory);
    }
    @Override
    public void convertAndSend(String exchange,
                               String routingKey,
                               final Object object) throws AmqpException {
        if (HeaderThreadLocal.isTestRequest()){
            routingKey = routingKey +"-shadow";
        }
        super.convertAndSend(exchange, routingKey, object, (CorrelationData) null);
    }
}

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
        CustomRabbitTemplate rabbitTemplate = new CustomRabbitTemplate(factory);
        return rabbitTemplate;
    }
  • 4 消息消費的時候加上攔截酗电,判斷是否是影子隊列的標記放一個到 ThreadLocal里面,后面的操作根據(jù)這個標記來寫數(shù)據(jù)庫或者其他
這一步參考這個bean 的配置 SimpleRabbitListenerContainerFactory
  • 5 自動配置里面解析@RabbitListener(queues = MqConstant.QUEUE)這一段的邏輯自動加上影子隊列的監(jiān)聽

找到這個bean RabbitListenerAnnotationBeanPostProcessor 里面的這個方法 resolveQueues

private String[] resolveQueues(RabbitListener rabbitListener) {
        String[] queues = rabbitListener.queues();
        //修改這里面的邏輯裸燎,加上這2行代碼
        String oldQueue = queues[0];
        queues = new String[]{oldQueue, oldQueue+"-shadow"};
        QueueBinding[] bindings = rabbitListener.bindings();
        return result.toArray(new String[result.size()]);
}

那個bean 是spring的代碼顾瞻,這個方法是私有的,很多方法都是私有的德绿,怎么改荷荤? AOP ? 我直接簡單粗暴的把
SimpleRabbitListenerContainerFactory 的所有代碼弄出來到一個新的子類里面退渗,修改這個方法的邏輯,然后注入這個bean 就行了蕴纳。

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        //這個類 CustomRabbitListenerAnnotationBeanPostProcessor 的代碼時全部復制
        //RabbitListenerAnnotationBeanPostProcessor的会油,只是加了2行代碼
        return new CustomRabbitListenerAnnotationBeanPostProcessor();
    }

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

這樣就可以自動加上影子隊列的監(jiān)聽了,一頓操作下來古毛,可以進行測試了翻翩,消息是可以動態(tài)指定發(fā)到對應的隊列的

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市稻薇,隨后出現(xiàn)的幾起案子嫂冻,更是在濱河造成了極大的恐慌,老刑警劉巖塞椎,帶你破解...
    沈念sama閱讀 212,222評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件桨仿,死亡現(xiàn)場離奇詭異,居然都是意外死亡案狠,警方通過查閱死者的電腦和手機服傍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,455評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來骂铁,“玉大人吹零,你說我怎么就攤上這事±郑” “怎么了灿椅?”我有些...
    開封第一講書人閱讀 157,720評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長名段。 經(jīng)常有香客問我阱扬,道長,這世上最難降的妖魔是什么伸辟? 我笑而不...
    開封第一講書人閱讀 56,568評論 1 284
  • 正文 為了忘掉前任麻惶,我火速辦了婚禮,結果婚禮上信夫,老公的妹妹穿的比我還像新娘窃蹋。我一直安慰自己,他們只是感情好静稻,可當我...
    茶點故事閱讀 65,696評論 6 386
  • 文/花漫 我一把揭開白布警没。 她就那樣靜靜地躺著,像睡著了一般振湾。 火紅的嫁衣襯著肌膚如雪杀迹。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,879評論 1 290
  • 那天押搪,我揣著相機與錄音树酪,去河邊找鬼浅碾。 笑死,一個胖子當著我的面吹牛续语,可吹牛的內(nèi)容都是我干的垂谢。 我是一名探鬼主播,決...
    沈念sama閱讀 39,028評論 3 409
  • 文/蒼蘭香墨 我猛地睜開眼疮茄,長吁一口氣:“原來是場噩夢啊……” “哼滥朱!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起力试,我...
    開封第一講書人閱讀 37,773評論 0 268
  • 序言:老撾萬榮一對情侶失蹤徙邻,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后懂版,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鹃栽,經(jīng)...
    沈念sama閱讀 44,220評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡躏率,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,550評論 2 327
  • 正文 我和宋清朗相戀三年躯畴,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片薇芝。...
    茶點故事閱讀 38,697評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡蓬抄,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出夯到,到底是詐尸還是另有隱情嚷缭,我是刑警寧澤,帶...
    沈念sama閱讀 34,360評論 4 332
  • 正文 年R本政府宣布耍贾,位于F島的核電站阅爽,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏荐开。R本人自食惡果不足惜付翁,卻給世界環(huán)境...
    茶點故事閱讀 40,002評論 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望晃听。 院中可真熱鬧百侧,春花似錦、人聲如沸能扒。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,782評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽初斑。三九已至辛润,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間见秤,已是汗流浹背砂竖。 一陣腳步聲響...
    開封第一講書人閱讀 32,010評論 1 266
  • 我被黑心中介騙來泰國打工灵迫, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人晦溪。 一個月前我還...
    沈念sama閱讀 46,433評論 2 360
  • 正文 我出身青樓瀑粥,卻偏偏與公主長得像,于是被迫代替她去往敵國和親三圆。 傳聞我的和親對象是個殘疾皇子狞换,可洞房花燭夜當晚...
    茶點故事閱讀 43,587評論 2 350

推薦閱讀更多精彩內(nèi)容