當(dāng)我得知需要在項(xiàng)目中同時(shí)集成多個(gè) Kafka 和 RabbitMQ

那時(shí)的我

真的存在這樣的需求么沦寂?

真的存在這樣的需求么猛频?

真的存在這樣的需求么辕万?

Unbelievable!難以置信.jpg

在我還沒(méi)有遇到這個(gè)需求之前我可能會(huì)說(shuō)儡司,怎么可能會(huì)有這樣的需求嘛娱挨?

以至于我能夠列出許多原因:

  • 技術(shù)選型肯定要先做好啊,KafkaRabbitMQ一起用算怎么回事呢
  • 這倆都有Topic的概念捕犬,為啥要接入多個(gè)呢
  • ...

但是它出現(xiàn)了跷坝!它真的出現(xiàn)了!不管你有多無(wú)法想象為什么會(huì)有這樣的需求碉碉,它終究還是被我遇到了

好嘛柴钻,既然如此,那就只能老老實(shí)實(shí)的看看怎么實(shí)現(xiàn)了

(不甘心的小聲bb:我想知道真的有人遇到過(guò)這樣的需求么垢粮?)

簡(jiǎn)單粗暴法

于是乎我點(diǎn)開(kāi)了搜索引擎贴届,作為一個(gè)能夠熟練借助搜索引擎進(jìn)行代碼搬運(yùn)。蜡吧。毫蚓。咳咳

在輸入spring boot 多個(gè) kafka這個(gè)幾個(gè)關(guān)鍵字后昔善,果不其然元潘,第一篇就是相關(guān)的內(nèi)容

我不禁贊嘆了一番自己極高的搜索天賦,接著就點(diǎn)進(jìn)去想看看能不能直接復(fù)制君仆,啊不翩概,是看看能不能讓我在經(jīng)過(guò)無(wú)與倫比的(這是什么鬼形容詞)思考之后借鑒借鑒

文中的思路是這樣的:

  • 首先是配置文件
spring:
  kafka:
    one:
      bootstrap-servers: IP:PORT
      consumer:
        group-id: YOUR_GROUP_ID
    two:
      bootstrap-servers: IP:PORT
      consumer:
        group-id: YOUR_GROUP_ID
  • 然后是第一個(gè)Kafka配置類
@Configuration
public class KafkaOneConfig {

    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;

    @Bean
    public KafkaTemplate<String, String> kafkaOneTemplate() {
        return new KafkaTemplate<>(producerOneFactory());
    }

    @Bean
    public KafkaListenerContainerFactory<? extends MessageListenerContainer> kafkaOneContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerOneFactory());
        return factory;
    }

    @Bean
    public ProducerFactory<String, String> producerOneFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public ConsumerFactory<String, String> consumerOneFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return props;
    }
}
  • 接著是第二個(gè)Kafka配置類
@Configuration
public class KafkaTwoConfig {

    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;

    @Bean
    public KafkaTemplate<String, String> kafkaTwoTemplate() {
        return new KafkaTemplate<>(producerTwoFactory());
    }

    @Bean
    public KafkaListenerContainerFactory<? extends MessageListenerContainer> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerTwoFactory());
        return factory;
    }

    @Bean
    public ProducerFactory<String, String> producerTwoFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public ConsumerFactory<String, String> consumerTwoFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return props;
    }
}
  • 最后是收發(fā)消息
@Slf4j
@Component
public class Kafka {

    @Autowired
    @Qualifier("kafkaOneTemplate")
    private KafkaTemplate kafkaOneTemplate;
    
    @Autowired
    @Qualifier("kafkaTwoTemplate")
    private KafkaTemplate kafkaTwoTemplate;
    
    public void sendOne(String msg) {
        kafkaOneTemplate.send("topic", msg);
    }
    
    public void sendTwo(String msg) {
        kafkaTwoTemplate.send("topic", msg);
    }
    
    // containerFactory 的值要與配置中 KafkaListenerContainerFactory 的 Bean 名相同
    @KafkaListener(topics = "topic", containerFactory = "kafkaOneContainerFactory")
    public void listenerOne(ConsumerRecord<?, ?> record) {
        log.info("Kafka one 接收到消息:{}", record.value());
    }

    @KafkaListener(topics = "topic", containerFactory = "kafkaTwoContainerFactory")
    public void listenerTwo(ConsumerRecord<?, ?> record) {
        log.info("Kafka two 接收到消息:{}", record.value());
    }
}

看到這里基本上就有思路了,不得不說(shuō)還是比較方便易懂的返咱,直接CV過(guò)來(lái)改改就能用

但是這種方式有一個(gè)致命的問(wèn)題钥庇,在我的需求中是不確定數(shù)量的,是的沒(méi)錯(cuò)洛姑,可能在后續(xù)的維護(hù)中需要添加或移除一個(gè)或多個(gè)Kafka

如果按照這種方式上沐,那我每天的工作就會(huì)變成:

  1. 添加Kafka配置和代碼,重新打包部署
  2. 刪除Kafka配置和代碼楞艾,重新打包部署
  3. 添加Kafka配置和代碼参咙,重新打包部署
  4. 刪除Kafka配置和代碼龄广,重新打包部署
  5. ...

嗯?那我為什么不找個(gè)工廠擰螺絲蕴侧?

哎择同,沒(méi)想到我12年寒窗苦讀,如今終于出人頭地净宵,以為能干出一番大事業(yè)敲才。。择葡。

(快醒醒紧武,別做夢(mèng)了,干不完可是要加班干的)

好的敏储,繼續(xù)我們偉大的阻星。。已添。emmm妥箕,擰螺絲事業(yè)

于是我就想有沒(méi)有可能把這個(gè)做成能夠動(dòng)態(tài)添加刪除而不是在代碼中寫(xiě)死呢

思維發(fā)散

現(xiàn)在我們把自己想象成一個(gè)極度厭惡麻煩的人,沒(méi)錯(cuò)更舞,極度厭惡畦幢,能改2個(gè)地方絕不改3個(gè)地方,能改1個(gè)地方絕不改2個(gè)地方缆蝉,能讓別人改就絕不自己改

在忽略了如何實(shí)現(xiàn)的情況下就有了下面3種簡(jiǎn)化版操作方案:

  1. 修改配置文件并重啟
  2. 修改配置文件并自動(dòng)刷新
  3. 實(shí)現(xiàn)一套可視化頁(yè)面

畢竟基于我們極度厭惡麻煩的人設(shè)宇葱,改代碼是不可能的,而改配置返奉,重啟贝搁,或是可視化操作還是有很多人能做到的

所以方案3(實(shí)現(xiàn)一套可視化頁(yè)面)得到了我極大的認(rèn)可,但是考慮到畫(huà)頁(yè)面的工作量也是比較大的

最終我決定芽偏,先實(shí)現(xiàn)方案1(修改配置文件并重啟)再實(shí)現(xiàn)方案3(實(shí)現(xiàn)一套可視化頁(yè)面)雷逆,至于方案2(修改配置文件并自動(dòng)刷新),有了可視化就沒(méi)必要自動(dòng)刷新了

現(xiàn)在我們大致明確了這個(gè)需求在完成之后是個(gè)什么玩意兒(從操作上來(lái)說(shuō))污尉,接下來(lái)就要想想如何實(shí)現(xiàn)了

在考慮實(shí)現(xiàn)之前膀哲,讓我們先來(lái)俯瞰一下這個(gè)需求,找找KafkaRabbitMQ存在哪些共同點(diǎn)被碗,也就是從哪個(gè)角度去抽象這個(gè)需求

抽象某宪?什么抽象?抽什么象锐朴?抽象什么兴喂?這還用抽象?這不直接上來(lái)就敲?

當(dāng)然了衣迷,上來(lái)就敲肯定沒(méi)問(wèn)題畏鼓,就是后續(xù)的重構(gòu)可能讓你比較糟心,作為一個(gè)極度厭惡麻煩的人設(shè)壶谒,與其之后因?yàn)楣δ懿恢С侄貥?gòu)還不如一步到位直接干到完美云矫,當(dāng)然了,理想很豐滿汗菜,不知道大家覺(jué)得這個(gè)世界上究竟存不存在完美的代碼呢让禀?

如何抽象

抽象是一門(mén)很高深的藝術(shù),單單會(huì)幾個(gè)設(shè)計(jì)模式肯定是做不好抽象的陨界,能否做出好的抽象基于大量的抽象實(shí)踐巡揍,沒(méi)有實(shí)踐,理論再牛逼設(shè)計(jì)出來(lái)也是一坨shi普碎,是的吼肥,就是不斷的練(趕緊的,拿你們公司的項(xiàng)目練)麻车,知道你接手的代碼為什么像shi么,沒(méi)錯(cuò)斗这,那都是我練手練下來(lái)的hhh

扯遠(yuǎn)了动猬,直接給結(jié)論,大家覺(jué)得用事件模型來(lái)作為抽象怎么樣表箭?

KafkaRabbitMQ主要的應(yīng)用場(chǎng)景就是作為消息中間件赁咙,無(wú)非就是發(fā)布訂閱模型,也就是事件模型

如果我們將其抽象為事件模型免钻,那么無(wú)論是Kafka還是RabbitMQ彼水,又或者是RocketMQActiveMQ,甚至今后可能出現(xiàn)的更NB的消息中間件极舔,只要符合事件模型就都能夠?qū)⑺鼈兡依ㄆ渲?/p>

于是乎我們完全可以設(shè)想凤覆,在項(xiàng)目中使用抽象的事件模型進(jìn)行消息的發(fā)布和訂閱,將具體的實(shí)現(xiàn)隱藏起來(lái)拆魏,對(duì)我們后續(xù)的擴(kuò)展和維護(hù)來(lái)說(shuō)具有非常大的便利性

舉個(gè)例子盯桦,比如我們現(xiàn)在的項(xiàng)目使用RabbitMQ來(lái)作為事件模型的具體實(shí)現(xiàn),當(dāng)有一天RabbitMQ的吞吐量不足以支持我們的業(yè)務(wù)時(shí)渤刃,我們可以直接將實(shí)現(xiàn)替換成Kafka而不用在代碼中把RabbitTemplate改成KafkaTemplate拥峦,把@RabbitListener改成@KafkaListener,將技術(shù)實(shí)現(xiàn)和業(yè)務(wù)邏輯解耦卖子,這其實(shí)也是DDD中所追求的方式

引擎和端點(diǎn)

基于我們現(xiàn)在的需求略号,我們可以先設(shè)計(jì)一下多個(gè)KafkaRabbitMQ的結(jié)構(gòu)

{
    kafka: {
        kafka1: {},
        kafka2: {}
    },
    rabbitmq: {
        rabbitmq1: {},
        rabbitmq2: {}
    }
}

在這里我定義了兩個(gè)概念,事件引擎EventEngine和事件端點(diǎn)EventEndpoint

  • 事件引擎就是kafkarabbitmq,代表事件的發(fā)布訂閱所依賴的中間件

  • 事件端點(diǎn)就是kafka1玄柠,kafka2rabbitmq1突梦,rabbitmq2,代表不同的中間件服務(wù)(集群)

基于事件引擎的抽象随闪,我們可以定義Kafka的事件引擎KafkaEventEngineRabbitMQ的事件引擎RabbitEventEngine阳似,如果后續(xù)需要集成RocketMQ就可以直接擴(kuò)展一個(gè)RocketEventEngine

基于事件端點(diǎn)的抽象,我們就可以添加任意個(gè)數(shù)的Kafka或是RabbitMQ铐伴,不用再通過(guò)硬編碼集成撮奏,每個(gè)端點(diǎn)的配置相互隔離,互不影響

交換機(jī)

因?yàn)槲覀儸F(xiàn)在集成了多個(gè)事件引擎和事件端點(diǎn)当宴,所以當(dāng)我們?cè)诎l(fā)布或訂閱時(shí)就需要指定使用哪個(gè)事件引擎下的哪個(gè)事件端點(diǎn)(就是要使用哪個(gè)中間件)

比如我們現(xiàn)在需要把事件E1發(fā)送到kafka1上畜吊,所以我們就需要先從kafka1kafka2户矢,rabbitmq1玲献,rabbitmq2中把kafka1找出來(lái)

所以我定義了事件交換機(jī)EventExchange的概念

public interface EventExchange {

    Collection<? extends EventEndpoint> exchange(Collection<? extends EventEngine> engines, EventContext context);
}

將所有的事件引擎?zhèn)魅耄缓蠓祷貙?duì)應(yīng)的事件端點(diǎn)(事件引擎中持有對(duì)應(yīng)的事件端點(diǎn)梯浪,如kafka這個(gè)事件引擎持有kafka1kafka2這兩個(gè)事件端點(diǎn))

在我們發(fā)布或訂閱時(shí)捌年,可以傳入一個(gè)事件交換機(jī)來(lái)指定發(fā)布事件到哪個(gè)中間件,或是訂閱哪個(gè)中間件的消息

比如當(dāng)我們的業(yè)務(wù)business1需要發(fā)送一個(gè)事件消息到kafka1rabbitmq2時(shí)挂洛,我們就可以定義一個(gè)Business1EventExchange

public class Business1EventExchange implements EventExchange {

    @Override
    public Collection<? extends EventEndpoint> exchange(Collection<? extends EventEngine> engines, EventContext context) {
        return engines.stream()
                .flatMap(it -> it.getEndpoints().stream())
                .filter(it -> it.getName().equals("kafka1") || 
                        it.getName().equals("rabbitmq2"))
                .collect(Collectors.toList());
    }
}

這里建議單獨(dú)實(shí)現(xiàn)一個(gè)類并且基于業(yè)務(wù)起名礼预,這樣當(dāng)業(yè)務(wù)business1需要更換事件端點(diǎn)或是添加事件端點(diǎn)時(shí),只需要修改Business1EventExchange即可虏劲,對(duì)于業(yè)務(wù)來(lái)說(shuō)就隱藏了具體的中間件細(xì)節(jié)托酸,業(yè)務(wù)邏輯和技術(shù)實(shí)現(xiàn)就不會(huì)耦合在一起

發(fā)布器

基于之前的業(yè)務(wù)business1,我們已經(jīng)通過(guò)事件交換機(jī)指定了kafka1rabbitmq2柒巫,但是KafkaRabbitMQ發(fā)消息的方式完全不一樣励堡,所以在事件發(fā)布的時(shí)候需要讓兩者分別使用各自的方式發(fā)送消息

于是我又定義了事件發(fā)布器EventPublisher的概念

public interface EventPublisher {

    void publish(Object event, EventEndpoint endpoint, EventContext context);
}

在平時(shí)的開(kāi)發(fā)中我們一般

使用KafkaTemplate來(lái)發(fā)消息到Kafka

使用RabbitTemplate來(lái)發(fā)消息到RabbitMQ

我就將發(fā)送消息這一動(dòng)作抽離出來(lái),就有了事件發(fā)布器

我們給業(yè)務(wù)business1來(lái)實(shí)現(xiàn)一個(gè)定制化的事件發(fā)布器

public class Business1EventPublisher implements EventPublisher {

    @Override
    public void publish(Object event, EventEndpoint endpoint, EventContext context) {
        if (endpoint.getName().equals("kafka1")) {
            //KafkaEventEndpoint持有KafkaTemplate
            KafkaTemplate<Object, Object> kafkaTemplate =
                    ((KafkaEventEndpoint) endpoint).getTemplate();
            //發(fā)送Kafka消息
            kafkaTemplate.sendDefault(event);
        }
        if (endpoint.getName().equals("rabbitmq2")) {
            //RabbitEventEndpoint持有RabbitTemplate
            RabbitTemplate rabbitTemplate =
                    ((RabbitEventEndpoint) endpoint).getTemplate();
            //發(fā)送RabbitMQ消息
            rabbitTemplate.convertAndSend(event);
        }
    }
}

只要我們?cè)诎l(fā)布時(shí)傳入指定的事件發(fā)布器就可以讓事件消息以我們想要方式發(fā)送到Kafka或是RabbitMQ

最終的事件發(fā)布大概就是這樣

@RestController
public class Business1Controller {

    @Autowired
    private EventConcept concept;

    @PostMapping("/business1")
    public void business1() {
        concept.template()
                .exchange(new Business1EventExchange())
                .publisher(new Business1EventPublisher())
                .publish(new Business1Event());
    }
}

訂閱器

和事件發(fā)布一樣堡掏,KafkaRabbitMQ監(jiān)聽(tīng)消息的方式也不一樣应结,而且沒(méi)辦法使用@KafkaListener@RabbitListener,因?yàn)槭褂米⒔饩蜎](méi)有辦法實(shí)現(xiàn)動(dòng)態(tài)監(jiān)聽(tīng)了

當(dāng)我們動(dòng)態(tài)添加了一個(gè)端點(diǎn)之后布疼,卻沒(méi)辦法監(jiān)聽(tīng)這個(gè)端點(diǎn)的消息摊趾,那不是很蠢?

所以我直接找到這兩個(gè)注解的解析類KafkaListenerAnnotationBeanPostProcessorRabbitListenerAnnotationBeanPostProcessor跟著看了一下是怎么監(jiān)聽(tīng)的

過(guò)程就不贅述了游两,直接上代碼

  • 首先是Kafka
public class KafkaReceiver {

    public void receive() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        MessageListenerContainer container = factory.createContainer("topic");
        container.getContainerProperties().setMessageListener(new MessageListener<Object, Object>() {
            @Override
            public void onMessage(ConsumerRecord<Object, Object> data) {
                //接收數(shù)據(jù)
            }
        });
        container.start();
    }
}
  • 然后是RabbitMQ
public class RabbitReceiver {

    public void receive() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
        endpoint.setQueueNames("queue");
        endpoint.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //接收數(shù)據(jù)
            }
        });
        SimpleMessageListenerContainer container = factory.createListenerContainer(endpoint);
        container.start();
    }
}

同時(shí)我們定義一個(gè)事件訂閱器EventSubscriber的概念來(lái)抽象監(jiān)聽(tīng)消息這個(gè)動(dòng)作

public interface EventSubscriber {

    Subscription subscribe(EventListener listener, EventEndpoint endpoint, EventContext context);
}

現(xiàn)在給業(yè)務(wù)business1來(lái)實(shí)現(xiàn)一個(gè)定制化的事件訂閱器

public class Business1EventSubscriber implements EventSubscriber {

    @Override
    public Subscription subscribe(EventListener listener, EventEndpoint endpoint, EventContext context) {
        if (endpoint.getName().equals("kafka1")) {
            ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            MessageListenerContainer container = factory.createContainer("topic");
            ContainerProperties properties = container.getContainerProperties();
            properties.setMessageListener(new org.springframework.kafka.listener.MessageListener<Object, Object>() {
                @Override
                public void onMessage(ConsumerRecord<Object, Object> data) {
                    //使用統(tǒng)一的接口返回?cái)?shù)據(jù)
                    listener.onEvent(data.value(), endpoint, context);
                }
            });
            container.start();
            return new KafkaSubscription(container);
        }
        if (endpoint.getName().equals("rabbitmq2")) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            SimpleRabbitListenerEndpoint listenerEndpoint = new SimpleRabbitListenerEndpoint();
            listenerEndpoint.setQueueNames("queue");
            listenerEndpoint.setMessageListener(new org.springframework.amqp.core.MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //使用統(tǒng)一的接口返回?cái)?shù)據(jù)
                    listener.onEvent(message.getBody(), endpoint, context);
                }
            });
            SimpleMessageListenerContainer container = factory.createListenerContainer(listenerEndpoint);
            container.start();
            return new RabbitSubscription(container);
        }
        return Subscription.EMPTY;
    }
}

只要我們?cè)谟嗛啎r(shí)傳入指定的事件訂閱器就可以以我們想要方式監(jiān)聽(tīng)Kafka或是RabbitMQ的事件消息

最終的事件訂閱大概就是這樣

@Component
public class Business1SubscriberConfiguration implements ApplicationRunner {

    @Autowired
    private EventConcept concept;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        concept.template()
                .exchange(new Business1EventExchange())
                .subscriber(new Business1EventSubscriber())
                .subscribe(new GenericEventListener<Business1Event>() {
                    @Override
                    public void onGenericEvent(Business1Event event, EventEndpoint endpoint, EventContext context) {
                        //可以通過(guò)EventEndpoint獲取消息的來(lái)源
                    }
                });
    }
}

大致的流程就是這樣了吧

我們通過(guò)兩者的共同點(diǎn)進(jìn)行功能抽象砾层,在我們使用時(shí)就可以不依賴具體的實(shí)現(xiàn)

通過(guò)不同的實(shí)現(xiàn)就可以非常靈活的控制發(fā)布和訂閱的各個(gè)細(xì)節(jié),而不需要在業(yè)務(wù)邏輯代碼中改來(lái)改去

細(xì)節(jié)優(yōu)化

接下來(lái)我們看看有沒(méi)有可以優(yōu)化的細(xì)節(jié)問(wèn)題

不知道大家還記不記得業(yè)務(wù)business1發(fā)布時(shí)的示例代碼贱案,如下:

@RestController
public class Business1Controller {

    @Autowired
    private EventConcept concept;

    @PostMapping("/business1")
    public void business1() {
        concept.template()
                .exchange(new Business1EventExchange())
                .publisher(new Business1EventPublisher())
                .publish(new Business1Event());
    }
}

這里每次發(fā)布事件的時(shí)候都會(huì)new一個(gè)Business1EventExchangeBusiness1EventPublisher非常多余肛炮,而且浪費(fèi)性能

針對(duì)這個(gè)問(wèn)題止吐,我從兩個(gè)維度進(jìn)行了使用優(yōu)化

  1. 基于多級(jí)配置實(shí)現(xiàn)各級(jí)配置的優(yōu)先級(jí)
  2. 基于事件模版EventTemplate的多場(chǎng)景配置

多級(jí)配置

每次發(fā)布或訂閱的時(shí)候都要手動(dòng)指定事件交換機(jī),事件發(fā)布器或是事件訂閱器侨糟,如果這個(gè)配置每次都一樣碍扔,甚至整個(gè)項(xiàng)目都是統(tǒng)一的配置,那就可以提供一種類似于全局的配置秕重,而不需要每次手動(dòng)指定

  • 事件交換機(jī)

    • 可以指定一個(gè)全局的配置

    • 當(dāng)手動(dòng)指定了事件交換機(jī)就使用指定的事件交換機(jī)

    • 當(dāng)未手動(dòng)指定事件交換機(jī)則使用全局配置的事件交換機(jī)

    • 當(dāng)未配置全局的事件交換機(jī)則默認(rèn)發(fā)布到所有端點(diǎn)

  • 事件發(fā)布器/事件訂閱器

    • 可以給事件引擎和事件端點(diǎn)配置一個(gè)事件發(fā)布器/事件訂閱器

    • 當(dāng)手動(dòng)指定了事件發(fā)布器/事件訂閱器就使用指定的事件發(fā)布器/事件訂閱器

    • 當(dāng)未手動(dòng)指定事件發(fā)布器/事件訂閱器則使用事件端點(diǎn)配置的事件發(fā)布器/事件訂閱器

    • 當(dāng)未配置事件端點(diǎn)的事件發(fā)布器/事件訂閱器則使用事件引擎配置的事件發(fā)布器/事件訂閱器

當(dāng)我們配置了全局的事件交換機(jī)或是事件引擎和事件端點(diǎn)的事件發(fā)布器/事件訂閱器不同,我們的代碼就可以改為下面這樣:

  • 發(fā)布
@RestController
public class Business1Controller {

    @Autowired
    private EventConcept concept;

    @PostMapping("/business1")
    public void business1() {
        concept.template().publish(new Business1Event());
    }
}
  • 訂閱
@Component
public class Business1SubscriberConfiguration implements ApplicationRunner {

    @Autowired
    private EventConcept concept;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        concept.template().subscribe(new GenericEventListener<Business1Event>() {
            @Override
            public void onGenericEvent(Business1Event event, EventEndpoint endpoint, EventContext context) {
                //可以通過(guò)EventEndpoint獲取消息的來(lái)源
            }
        });
    }
}

模版配置

雖然我們現(xiàn)在支持了全局的配置,但是當(dāng)處理之前提到的業(yè)務(wù)business1需要單獨(dú)自定義時(shí)溶耘,全局配置就沒(méi)有什么效果了

于是我定義了事件模版EventTemplate的概念

我們可以新建并持有一個(gè)事件模版進(jìn)行配置復(fù)用二拐,寫(xiě)法如下:

@RestController
public class Business1Controller {

    private final EventTemplate template;

    @Autowired
    public Business1Controller(EventConcept concept) {
        //template方法每次調(diào)用都會(huì)創(chuàng)建一個(gè)模版
        this.template = concept.template()
                .exchange(new Business1EventExchange())
                .publisher(new Business1EventPublisher());
    }

    @PostMapping("/business1")
    public void business1() {
        template.publish(new Business1Event());
    }
}

屬性繼承

由于我們會(huì)有多個(gè)Kafka和多個(gè)RabbitMQ,所以配置文件可能會(huì)非常長(zhǎng)凳兵,甚至有很多配置都是一樣的

所以我就想到可以將相同的配置提取出來(lái)作為父配置百新,其他端點(diǎn)繼承父配置,大概就是這個(gè)樣子:

concept:
  event:
    kafka:
      enabled: true #需要手動(dòng)開(kāi)啟
      endpoints: #在該節(jié)點(diǎn)下配置多個(gè)kafka庐扫,屬性同spring.kafka
        kafka1: #端點(diǎn)名稱-kafka1
          inherit: parent #繼承名稱為parent的端點(diǎn)配置
          bootstrap-servers:
            - 192.168.30.100:9092
            - 192.168.30.101:9092
            - 192.168.30.102:9092
          consumer:
            group-id: kafka1
        kafka2: #端點(diǎn)名稱-kafka2
          inherit: parent #繼承名稱為parent的端點(diǎn)配置
          bootstrap-servers:
            - 192.168.60.200:9092
            - 192.168.60.201:9092
            - 192.168.60.202:9092
          consumer:
            group-id: kafka2
        parent: #作為其他端點(diǎn)的父配置
          enabled: false #是否啟用該端點(diǎn)饭望,這里作為父配置不啟用
          producer:
            retries: 0
            acks: 1
          consumer:
            enable-auto-commit: false
          template:
            default-topic: sample
          listener:
            ack-mode: manual_immediate

parent端點(diǎn)作為父配置,kafka1kafka2通過(guò)屬性inherit指定繼承parent端點(diǎn)的配置同時(shí)額外定義bootstrap-serversconsumer.group-id

擴(kuò)展支持

當(dāng)我們實(shí)現(xiàn)了功能之后就要看看對(duì)后續(xù)維護(hù)非常重要的擴(kuò)展性

當(dāng)然了形庭,一般情況下擴(kuò)展性肯定是在功能實(shí)現(xiàn)之前就規(guī)劃好一些擴(kuò)展點(diǎn)或是使用高擴(kuò)展性的架構(gòu)

這里只是把這塊放到了后面來(lái)講

引擎端點(diǎn)工廠

首先呢铅辞,我覺(jué)得需要給事件引擎EventEngine和事件端點(diǎn)EventEndpoint各添加一個(gè)工廠EventEngineFactoryEventEndpointFactory

這就方便對(duì)兩者做一些擴(kuò)展,同時(shí)這里有一個(gè)經(jīng)驗(yàn)建議

當(dāng)在實(shí)現(xiàn)或封裝一個(gè)庫(kù)時(shí)萨醒,如果邏輯中用到了new來(lái)創(chuàng)建實(shí)例時(shí)

建議替換為工廠類來(lái)創(chuàng)建或者提供一個(gè)可重寫(xiě)的方法來(lái)創(chuàng)建

為使用者提供一個(gè)自定義入口

引擎端點(diǎn)自定義配置

定義EventEndpointConfigurerEventEngineConfigurer來(lái)提供后置的自定義配置

這里可以類比WebMvcConfigurer之類的配置類的功能

事件上下文

增加了一個(gè)事件上下文EventContext的概念巷挥,默認(rèn)基于Map實(shí)現(xiàn),我們可以在發(fā)布或訂閱時(shí)設(shè)置自定義的屬性來(lái)更靈活的傳遞數(shù)據(jù)和處理邏輯

配置繼承處理器

對(duì)于配置的繼承功能验靡,我也定義了一個(gè)ConfigInheritHandler接口

考慮到由于版本問(wèn)題如果導(dǎo)致配置繼承出現(xiàn)問(wèn)題因小失大,就可以自定義配置繼承邏輯或是重寫(xiě)其中的一些方法來(lái)避免問(wèn)題

結(jié)束

好了雏节,差不多就這樣吧胜嗓,如果大家有遇到類似的需求可以參考參考這個(gè)思路,或者直接用我寫(xiě)的庫(kù)钩乍,GitHub傳送門(mén)(順便點(diǎn)個(gè)Star吧哈哈哈)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末辞州,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子寥粹,更是在濱河造成了極大的恐慌变过,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,284評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件涝涤,死亡現(xiàn)場(chǎng)離奇詭異媚狰,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)阔拳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)崭孤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人,你說(shuō)我怎么就攤上這事辨宠∫怕啵” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,614評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵嗤形,是天一觀的道長(zhǎng)精偿。 經(jīng)常有香客問(wèn)我,道長(zhǎng)赋兵,這世上最難降的妖魔是什么笔咽? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,671評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮毡惜,結(jié)果婚禮上拓轻,老公的妹妹穿的比我還像新娘。我一直安慰自己经伙,他們只是感情好扶叉,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著帕膜,像睡著了一般枣氧。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上垮刹,一...
    開(kāi)封第一講書(shū)人閱讀 51,562評(píng)論 1 305
  • 那天达吞,我揣著相機(jī)與錄音,去河邊找鬼荒典。 笑死酪劫,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的寺董。 我是一名探鬼主播覆糟,決...
    沈念sama閱讀 40,309評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼遮咖!你這毒婦竟也來(lái)了滩字?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,223評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤御吞,失蹤者是張志新(化名)和其女友劉穎麦箍,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體陶珠,經(jīng)...
    沈念sama閱讀 45,668評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡挟裂,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了背率。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片话瞧。...
    茶點(diǎn)故事閱讀 39,981評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡嫩与,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出交排,到底是詐尸還是另有隱情划滋,我是刑警寧澤,帶...
    沈念sama閱讀 35,705評(píng)論 5 347
  • 正文 年R本政府宣布埃篓,位于F島的核電站处坪,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏架专。R本人自食惡果不足惜同窘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望部脚。 院中可真熱鬧想邦,春花似錦、人聲如沸委刘。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,904評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)锡移。三九已至呕童,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間淆珊,已是汗流浹背夺饲。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,023評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留施符,地道東北人往声。 一個(gè)月前我還...
    沈念sama閱讀 48,146評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像戳吝,于是被迫代替她去往敵國(guó)和親烁挟。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容