那時(shí)的我
真的存在這樣的需求么沦寂?
真的存在這樣的需求么猛频?
真的存在這樣的需求么辕万?
Unbelievable!難以置信.jpg
在我還沒(méi)有遇到這個(gè)需求之前我可能會(huì)說(shuō)儡司,怎么可能會(huì)有這樣的需求嘛娱挨?
以至于我能夠列出許多原因:
- 技術(shù)選型肯定要先做好啊,
Kafka
和RabbitMQ
一起用算怎么回事呢 - 這倆都有
Topic
的概念捕犬,為啥要接入多個(gè)呢 - ...
但是它出現(xiàn)了跷坝!它真的出現(xiàn)了!不管你有多無(wú)法想象為什么會(huì)有這樣的需求碉碉,它終究還是被我遇到了
好嘛柴钻,既然如此,那就只能老老實(shí)實(shí)的看看怎么實(shí)現(xiàn)了
(不甘心的小聲bb:我想知道真的有人遇到過(guò)這樣的需求么垢粮?)
簡(jiǎn)單粗暴法
于是乎我點(diǎn)開(kāi)了搜索引擎贴届,作為一個(gè)能夠熟練借助搜索引擎進(jìn)行代碼搬運(yùn)。蜡吧。毫蚓。咳咳
在輸入spring boot 多個(gè) kafka
這個(gè)幾個(gè)關(guān)鍵字后昔善,果不其然元潘,第一篇就是相關(guān)的內(nèi)容
我不禁贊嘆了一番自己極高的搜索天賦,接著就點(diǎn)進(jìn)去想看看能不能直接復(fù)制君仆,啊不翩概,是看看能不能讓我在經(jīng)過(guò)無(wú)與倫比的(這是什么鬼形容詞)思考之后借鑒借鑒
文中的思路是這樣的:
- 首先是配置文件
spring:
kafka:
one:
bootstrap-servers: IP:PORT
consumer:
group-id: YOUR_GROUP_ID
two:
bootstrap-servers: IP:PORT
consumer:
group-id: YOUR_GROUP_ID
- 然后是第一個(gè)
Kafka
配置類
@Configuration
public class KafkaOneConfig {
@Value("${spring.kafka.one.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.one.consumer.group-id}")
private String groupId;
@Bean
public KafkaTemplate<String, String> kafkaOneTemplate() {
return new KafkaTemplate<>(producerOneFactory());
}
@Bean
public KafkaListenerContainerFactory<? extends MessageListenerContainer> kafkaOneContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerOneFactory());
return factory;
}
@Bean
public ProducerFactory<String, String> producerOneFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public ConsumerFactory<String, String> consumerOneFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return props;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
}
- 接著是第二個(gè)
Kafka
配置類
@Configuration
public class KafkaTwoConfig {
@Value("${spring.kafka.two.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.two.consumer.group-id}")
private String groupId;
@Bean
public KafkaTemplate<String, String> kafkaTwoTemplate() {
return new KafkaTemplate<>(producerTwoFactory());
}
@Bean
public KafkaListenerContainerFactory<? extends MessageListenerContainer> kafkaTwoContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerTwoFactory());
return factory;
}
@Bean
public ProducerFactory<String, String> producerTwoFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public ConsumerFactory<String, String> consumerTwoFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return props;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return props;
}
}
- 最后是收發(fā)消息
@Slf4j
@Component
public class Kafka {
@Autowired
@Qualifier("kafkaOneTemplate")
private KafkaTemplate kafkaOneTemplate;
@Autowired
@Qualifier("kafkaTwoTemplate")
private KafkaTemplate kafkaTwoTemplate;
public void sendOne(String msg) {
kafkaOneTemplate.send("topic", msg);
}
public void sendTwo(String msg) {
kafkaTwoTemplate.send("topic", msg);
}
// containerFactory 的值要與配置中 KafkaListenerContainerFactory 的 Bean 名相同
@KafkaListener(topics = "topic", containerFactory = "kafkaOneContainerFactory")
public void listenerOne(ConsumerRecord<?, ?> record) {
log.info("Kafka one 接收到消息:{}", record.value());
}
@KafkaListener(topics = "topic", containerFactory = "kafkaTwoContainerFactory")
public void listenerTwo(ConsumerRecord<?, ?> record) {
log.info("Kafka two 接收到消息:{}", record.value());
}
}
看到這里基本上就有思路了,不得不說(shuō)還是比較方便易懂的返咱,直接CV過(guò)來(lái)改改就能用
但是這種方式有一個(gè)致命的問(wèn)題钥庇,在我的需求中是不確定數(shù)量的,是的沒(méi)錯(cuò)洛姑,可能在后續(xù)的維護(hù)中需要添加或移除一個(gè)或多個(gè)Kafka
如果按照這種方式上沐,那我每天的工作就會(huì)變成:
- 添加
Kafka
配置和代碼,重新打包部署 - 刪除
Kafka
配置和代碼楞艾,重新打包部署 - 添加
Kafka
配置和代碼参咙,重新打包部署 - 刪除
Kafka
配置和代碼龄广,重新打包部署 - ...
嗯?那我為什么不找個(gè)工廠擰螺絲蕴侧?
哎择同,沒(méi)想到我12年寒窗苦讀,如今終于出人頭地净宵,以為能干出一番大事業(yè)敲才。。择葡。
(快醒醒紧武,別做夢(mèng)了,干不完可是要加班干的)
好的敏储,繼續(xù)我們偉大的阻星。。已添。emmm妥箕,擰螺絲事業(yè)
于是我就想有沒(méi)有可能把這個(gè)做成能夠動(dòng)態(tài)添加刪除而不是在代碼中寫(xiě)死呢
思維發(fā)散
現(xiàn)在我們把自己想象成一個(gè)極度厭惡麻煩的人,沒(méi)錯(cuò)更舞,極度厭惡畦幢,能改2個(gè)地方絕不改3個(gè)地方,能改1個(gè)地方絕不改2個(gè)地方缆蝉,能讓別人改就絕不自己改
在忽略了如何實(shí)現(xiàn)的情況下就有了下面3種簡(jiǎn)化版操作方案:
- 修改配置文件并重啟
- 修改配置文件并自動(dòng)刷新
- 實(shí)現(xiàn)一套可視化頁(yè)面
畢竟基于我們極度厭惡麻煩的人設(shè)宇葱,改代碼是不可能的,而改配置返奉,重啟贝搁,或是可視化操作還是有很多人能做到的
所以方案3(實(shí)現(xiàn)一套可視化頁(yè)面)得到了我極大的認(rèn)可,但是考慮到畫(huà)頁(yè)面的工作量也是比較大的
最終我決定芽偏,先實(shí)現(xiàn)方案1(修改配置文件并重啟)再實(shí)現(xiàn)方案3(實(shí)現(xiàn)一套可視化頁(yè)面)雷逆,至于方案2(修改配置文件并自動(dòng)刷新),有了可視化就沒(méi)必要自動(dòng)刷新了
現(xiàn)在我們大致明確了這個(gè)需求在完成之后是個(gè)什么玩意兒(從操作上來(lái)說(shuō))污尉,接下來(lái)就要想想如何實(shí)現(xiàn)了
在考慮實(shí)現(xiàn)之前膀哲,讓我們先來(lái)俯瞰一下這個(gè)需求,找找Kafka
和RabbitMQ
存在哪些共同點(diǎn)被碗,也就是從哪個(gè)角度去抽象這個(gè)需求
抽象某宪?什么抽象?抽什么象锐朴?抽象什么兴喂?這還用抽象?這不直接上來(lái)就敲?
當(dāng)然了衣迷,上來(lái)就敲肯定沒(méi)問(wèn)題畏鼓,就是后續(xù)的重構(gòu)可能讓你比較糟心,作為一個(gè)極度厭惡麻煩的人設(shè)壶谒,與其之后因?yàn)楣δ懿恢С侄貥?gòu)還不如一步到位直接干到完美云矫,當(dāng)然了,理想很豐滿汗菜,不知道大家覺(jué)得這個(gè)世界上究竟存不存在完美的代碼呢让禀?
如何抽象
抽象是一門(mén)很高深的藝術(shù),單單會(huì)幾個(gè)設(shè)計(jì)模式肯定是做不好抽象的陨界,能否做出好的抽象基于大量的抽象實(shí)踐巡揍,沒(méi)有實(shí)踐,理論再牛逼設(shè)計(jì)出來(lái)也是一坨shi普碎,是的吼肥,就是不斷的練(趕緊的,拿你們公司的項(xiàng)目練)麻车,知道你接手的代碼為什么像shi么,沒(méi)錯(cuò)斗这,那都是我練手練下來(lái)的hhh
扯遠(yuǎn)了动猬,直接給結(jié)論,大家覺(jué)得用事件模型來(lái)作為抽象怎么樣表箭?
Kafka
和RabbitMQ
主要的應(yīng)用場(chǎng)景就是作為消息中間件赁咙,無(wú)非就是發(fā)布訂閱模型,也就是事件模型
如果我們將其抽象為事件模型免钻,那么無(wú)論是Kafka
還是RabbitMQ
彼水,又或者是RocketMQ
和ActiveMQ
,甚至今后可能出現(xiàn)的更NB的消息中間件极舔,只要符合事件模型就都能夠?qū)⑺鼈兡依ㄆ渲?/p>
于是乎我們完全可以設(shè)想凤覆,在項(xiàng)目中使用抽象的事件模型進(jìn)行消息的發(fā)布和訂閱,將具體的實(shí)現(xiàn)隱藏起來(lái)拆魏,對(duì)我們后續(xù)的擴(kuò)展和維護(hù)來(lái)說(shuō)具有非常大的便利性
舉個(gè)例子盯桦,比如我們現(xiàn)在的項(xiàng)目使用RabbitMQ
來(lái)作為事件模型的具體實(shí)現(xiàn),當(dāng)有一天RabbitMQ
的吞吐量不足以支持我們的業(yè)務(wù)時(shí)渤刃,我們可以直接將實(shí)現(xiàn)替換成Kafka
而不用在代碼中把RabbitTemplate
改成KafkaTemplate
拥峦,把@RabbitListener
改成@KafkaListener
,將技術(shù)實(shí)現(xiàn)和業(yè)務(wù)邏輯解耦卖子,這其實(shí)也是DDD中所追求的方式
引擎和端點(diǎn)
基于我們現(xiàn)在的需求略号,我們可以先設(shè)計(jì)一下多個(gè)Kafka
和RabbitMQ
的結(jié)構(gòu)
{
kafka: {
kafka1: {},
kafka2: {}
},
rabbitmq: {
rabbitmq1: {},
rabbitmq2: {}
}
}
在這里我定義了兩個(gè)概念,事件引擎EventEngine
和事件端點(diǎn)EventEndpoint
事件引擎就是
kafka
和rabbitmq
,代表事件的發(fā)布訂閱所依賴的中間件事件端點(diǎn)就是
kafka1
玄柠,kafka2
和rabbitmq1
突梦,rabbitmq2
,代表不同的中間件服務(wù)(集群)
基于事件引擎的抽象随闪,我們可以定義Kafka
的事件引擎KafkaEventEngine
和RabbitMQ
的事件引擎RabbitEventEngine
阳似,如果后續(xù)需要集成RocketMQ
就可以直接擴(kuò)展一個(gè)RocketEventEngine
基于事件端點(diǎn)的抽象,我們就可以添加任意個(gè)數(shù)的Kafka
或是RabbitMQ
铐伴,不用再通過(guò)硬編碼集成撮奏,每個(gè)端點(diǎn)的配置相互隔離,互不影響
交換機(jī)
因?yàn)槲覀儸F(xiàn)在集成了多個(gè)事件引擎和事件端點(diǎn)当宴,所以當(dāng)我們?cè)诎l(fā)布或訂閱時(shí)就需要指定使用哪個(gè)事件引擎下的哪個(gè)事件端點(diǎn)(就是要使用哪個(gè)中間件)
比如我們現(xiàn)在需要把事件E1
發(fā)送到kafka1
上畜吊,所以我們就需要先從kafka1
,kafka2
户矢,rabbitmq1
玲献,rabbitmq2
中把kafka1
找出來(lái)
所以我定義了事件交換機(jī)EventExchange
的概念
public interface EventExchange {
Collection<? extends EventEndpoint> exchange(Collection<? extends EventEngine> engines, EventContext context);
}
將所有的事件引擎?zhèn)魅耄缓蠓祷貙?duì)應(yīng)的事件端點(diǎn)(事件引擎中持有對(duì)應(yīng)的事件端點(diǎn)梯浪,如kafka
這個(gè)事件引擎持有kafka1
和kafka2
這兩個(gè)事件端點(diǎn))
在我們發(fā)布或訂閱時(shí)捌年,可以傳入一個(gè)事件交換機(jī)來(lái)指定發(fā)布事件到哪個(gè)中間件,或是訂閱哪個(gè)中間件的消息
比如當(dāng)我們的業(yè)務(wù)business1
需要發(fā)送一個(gè)事件消息到kafka1
和rabbitmq2
時(shí)挂洛,我們就可以定義一個(gè)Business1EventExchange
public class Business1EventExchange implements EventExchange {
@Override
public Collection<? extends EventEndpoint> exchange(Collection<? extends EventEngine> engines, EventContext context) {
return engines.stream()
.flatMap(it -> it.getEndpoints().stream())
.filter(it -> it.getName().equals("kafka1") ||
it.getName().equals("rabbitmq2"))
.collect(Collectors.toList());
}
}
這里建議單獨(dú)實(shí)現(xiàn)一個(gè)類并且基于業(yè)務(wù)起名礼预,這樣當(dāng)業(yè)務(wù)business1
需要更換事件端點(diǎn)或是添加事件端點(diǎn)時(shí),只需要修改Business1EventExchange
即可虏劲,對(duì)于業(yè)務(wù)來(lái)說(shuō)就隱藏了具體的中間件細(xì)節(jié)托酸,業(yè)務(wù)邏輯和技術(shù)實(shí)現(xiàn)就不會(huì)耦合在一起
發(fā)布器
基于之前的業(yè)務(wù)business1
,我們已經(jīng)通過(guò)事件交換機(jī)指定了kafka1
和rabbitmq2
柒巫,但是Kafka
和RabbitMQ
發(fā)消息的方式完全不一樣励堡,所以在事件發(fā)布的時(shí)候需要讓兩者分別使用各自的方式發(fā)送消息
于是我又定義了事件發(fā)布器EventPublisher
的概念
public interface EventPublisher {
void publish(Object event, EventEndpoint endpoint, EventContext context);
}
在平時(shí)的開(kāi)發(fā)中我們一般
使用KafkaTemplate
來(lái)發(fā)消息到Kafka
使用RabbitTemplate
來(lái)發(fā)消息到RabbitMQ
我就將發(fā)送消息這一動(dòng)作抽離出來(lái),就有了事件發(fā)布器
我們給業(yè)務(wù)business1
來(lái)實(shí)現(xiàn)一個(gè)定制化的事件發(fā)布器
public class Business1EventPublisher implements EventPublisher {
@Override
public void publish(Object event, EventEndpoint endpoint, EventContext context) {
if (endpoint.getName().equals("kafka1")) {
//KafkaEventEndpoint持有KafkaTemplate
KafkaTemplate<Object, Object> kafkaTemplate =
((KafkaEventEndpoint) endpoint).getTemplate();
//發(fā)送Kafka消息
kafkaTemplate.sendDefault(event);
}
if (endpoint.getName().equals("rabbitmq2")) {
//RabbitEventEndpoint持有RabbitTemplate
RabbitTemplate rabbitTemplate =
((RabbitEventEndpoint) endpoint).getTemplate();
//發(fā)送RabbitMQ消息
rabbitTemplate.convertAndSend(event);
}
}
}
只要我們?cè)诎l(fā)布時(shí)傳入指定的事件發(fā)布器就可以讓事件消息以我們想要方式發(fā)送到Kafka
或是RabbitMQ
中
最終的事件發(fā)布大概就是這樣
@RestController
public class Business1Controller {
@Autowired
private EventConcept concept;
@PostMapping("/business1")
public void business1() {
concept.template()
.exchange(new Business1EventExchange())
.publisher(new Business1EventPublisher())
.publish(new Business1Event());
}
}
訂閱器
和事件發(fā)布一樣堡掏,Kafka
和RabbitMQ
監(jiān)聽(tīng)消息的方式也不一樣应结,而且沒(méi)辦法使用@KafkaListener
和@RabbitListener
,因?yàn)槭褂米⒔饩蜎](méi)有辦法實(shí)現(xiàn)動(dòng)態(tài)監(jiān)聽(tīng)了
當(dāng)我們動(dòng)態(tài)添加了一個(gè)端點(diǎn)之后布疼,卻沒(méi)辦法監(jiān)聽(tīng)這個(gè)端點(diǎn)的消息摊趾,那不是很蠢?
所以我直接找到這兩個(gè)注解的解析類KafkaListenerAnnotationBeanPostProcessor
和RabbitListenerAnnotationBeanPostProcessor
跟著看了一下是怎么監(jiān)聽(tīng)的
過(guò)程就不贅述了游两,直接上代碼
- 首先是
Kafka
public class KafkaReceiver {
public void receive() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
MessageListenerContainer container = factory.createContainer("topic");
container.getContainerProperties().setMessageListener(new MessageListener<Object, Object>() {
@Override
public void onMessage(ConsumerRecord<Object, Object> data) {
//接收數(shù)據(jù)
}
});
container.start();
}
}
- 然后是
RabbitMQ
public class RabbitReceiver {
public void receive() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setQueueNames("queue");
endpoint.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//接收數(shù)據(jù)
}
});
SimpleMessageListenerContainer container = factory.createListenerContainer(endpoint);
container.start();
}
}
同時(shí)我們定義一個(gè)事件訂閱器EventSubscriber
的概念來(lái)抽象監(jiān)聽(tīng)消息這個(gè)動(dòng)作
public interface EventSubscriber {
Subscription subscribe(EventListener listener, EventEndpoint endpoint, EventContext context);
}
現(xiàn)在給業(yè)務(wù)business1
來(lái)實(shí)現(xiàn)一個(gè)定制化的事件訂閱器
public class Business1EventSubscriber implements EventSubscriber {
@Override
public Subscription subscribe(EventListener listener, EventEndpoint endpoint, EventContext context) {
if (endpoint.getName().equals("kafka1")) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
MessageListenerContainer container = factory.createContainer("topic");
ContainerProperties properties = container.getContainerProperties();
properties.setMessageListener(new org.springframework.kafka.listener.MessageListener<Object, Object>() {
@Override
public void onMessage(ConsumerRecord<Object, Object> data) {
//使用統(tǒng)一的接口返回?cái)?shù)據(jù)
listener.onEvent(data.value(), endpoint, context);
}
});
container.start();
return new KafkaSubscription(container);
}
if (endpoint.getName().equals("rabbitmq2")) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
SimpleRabbitListenerEndpoint listenerEndpoint = new SimpleRabbitListenerEndpoint();
listenerEndpoint.setQueueNames("queue");
listenerEndpoint.setMessageListener(new org.springframework.amqp.core.MessageListener() {
@Override
public void onMessage(Message message) {
//使用統(tǒng)一的接口返回?cái)?shù)據(jù)
listener.onEvent(message.getBody(), endpoint, context);
}
});
SimpleMessageListenerContainer container = factory.createListenerContainer(listenerEndpoint);
container.start();
return new RabbitSubscription(container);
}
return Subscription.EMPTY;
}
}
只要我們?cè)谟嗛啎r(shí)傳入指定的事件訂閱器就可以以我們想要方式監(jiān)聽(tīng)Kafka
或是RabbitMQ
的事件消息
最終的事件訂閱大概就是這樣
@Component
public class Business1SubscriberConfiguration implements ApplicationRunner {
@Autowired
private EventConcept concept;
@Override
public void run(ApplicationArguments args) throws Exception {
concept.template()
.exchange(new Business1EventExchange())
.subscriber(new Business1EventSubscriber())
.subscribe(new GenericEventListener<Business1Event>() {
@Override
public void onGenericEvent(Business1Event event, EventEndpoint endpoint, EventContext context) {
//可以通過(guò)EventEndpoint獲取消息的來(lái)源
}
});
}
}
大致的流程就是這樣了吧
我們通過(guò)兩者的共同點(diǎn)進(jìn)行功能抽象砾层,在我們使用時(shí)就可以不依賴具體的實(shí)現(xiàn)
通過(guò)不同的實(shí)現(xiàn)就可以非常靈活的控制發(fā)布和訂閱的各個(gè)細(xì)節(jié),而不需要在業(yè)務(wù)邏輯代碼中改來(lái)改去
細(xì)節(jié)優(yōu)化
接下來(lái)我們看看有沒(méi)有可以優(yōu)化的細(xì)節(jié)問(wèn)題
不知道大家還記不記得業(yè)務(wù)business1
發(fā)布時(shí)的示例代碼贱案,如下:
@RestController
public class Business1Controller {
@Autowired
private EventConcept concept;
@PostMapping("/business1")
public void business1() {
concept.template()
.exchange(new Business1EventExchange())
.publisher(new Business1EventPublisher())
.publish(new Business1Event());
}
}
這里每次發(fā)布事件的時(shí)候都會(huì)new
一個(gè)Business1EventExchange
和Business1EventPublisher
非常多余肛炮,而且浪費(fèi)性能
針對(duì)這個(gè)問(wèn)題止吐,我從兩個(gè)維度進(jìn)行了使用優(yōu)化
- 基于多級(jí)配置實(shí)現(xiàn)各級(jí)配置的優(yōu)先級(jí)
- 基于事件模版
EventTemplate
的多場(chǎng)景配置
多級(jí)配置
每次發(fā)布或訂閱的時(shí)候都要手動(dòng)指定事件交換機(jī),事件發(fā)布器或是事件訂閱器侨糟,如果這個(gè)配置每次都一樣碍扔,甚至整個(gè)項(xiàng)目都是統(tǒng)一的配置,那就可以提供一種類似于全局的配置秕重,而不需要每次手動(dòng)指定
-
事件交換機(jī)
可以指定一個(gè)全局的配置
當(dāng)手動(dòng)指定了事件交換機(jī)就使用指定的事件交換機(jī)
當(dāng)未手動(dòng)指定事件交換機(jī)則使用全局配置的事件交換機(jī)
當(dāng)未配置全局的事件交換機(jī)則默認(rèn)發(fā)布到所有端點(diǎn)
-
事件發(fā)布器/事件訂閱器
可以給事件引擎和事件端點(diǎn)配置一個(gè)事件發(fā)布器/事件訂閱器
當(dāng)手動(dòng)指定了事件發(fā)布器/事件訂閱器就使用指定的事件發(fā)布器/事件訂閱器
當(dāng)未手動(dòng)指定事件發(fā)布器/事件訂閱器則使用事件端點(diǎn)配置的事件發(fā)布器/事件訂閱器
當(dāng)未配置事件端點(diǎn)的事件發(fā)布器/事件訂閱器則使用事件引擎配置的事件發(fā)布器/事件訂閱器
當(dāng)我們配置了全局的事件交換機(jī)或是事件引擎和事件端點(diǎn)的事件發(fā)布器/事件訂閱器不同,我們的代碼就可以改為下面這樣:
- 發(fā)布
@RestController
public class Business1Controller {
@Autowired
private EventConcept concept;
@PostMapping("/business1")
public void business1() {
concept.template().publish(new Business1Event());
}
}
- 訂閱
@Component
public class Business1SubscriberConfiguration implements ApplicationRunner {
@Autowired
private EventConcept concept;
@Override
public void run(ApplicationArguments args) throws Exception {
concept.template().subscribe(new GenericEventListener<Business1Event>() {
@Override
public void onGenericEvent(Business1Event event, EventEndpoint endpoint, EventContext context) {
//可以通過(guò)EventEndpoint獲取消息的來(lái)源
}
});
}
}
模版配置
雖然我們現(xiàn)在支持了全局的配置,但是當(dāng)處理之前提到的業(yè)務(wù)business1
需要單獨(dú)自定義時(shí)溶耘,全局配置就沒(méi)有什么效果了
于是我定義了事件模版EventTemplate
的概念
我們可以新建并持有一個(gè)事件模版進(jìn)行配置復(fù)用二拐,寫(xiě)法如下:
@RestController
public class Business1Controller {
private final EventTemplate template;
@Autowired
public Business1Controller(EventConcept concept) {
//template方法每次調(diào)用都會(huì)創(chuàng)建一個(gè)模版
this.template = concept.template()
.exchange(new Business1EventExchange())
.publisher(new Business1EventPublisher());
}
@PostMapping("/business1")
public void business1() {
template.publish(new Business1Event());
}
}
屬性繼承
由于我們會(huì)有多個(gè)Kafka
和多個(gè)RabbitMQ
,所以配置文件可能會(huì)非常長(zhǎng)凳兵,甚至有很多配置都是一樣的
所以我就想到可以將相同的配置提取出來(lái)作為父配置百新,其他端點(diǎn)繼承父配置,大概就是這個(gè)樣子:
concept:
event:
kafka:
enabled: true #需要手動(dòng)開(kāi)啟
endpoints: #在該節(jié)點(diǎn)下配置多個(gè)kafka庐扫,屬性同spring.kafka
kafka1: #端點(diǎn)名稱-kafka1
inherit: parent #繼承名稱為parent的端點(diǎn)配置
bootstrap-servers:
- 192.168.30.100:9092
- 192.168.30.101:9092
- 192.168.30.102:9092
consumer:
group-id: kafka1
kafka2: #端點(diǎn)名稱-kafka2
inherit: parent #繼承名稱為parent的端點(diǎn)配置
bootstrap-servers:
- 192.168.60.200:9092
- 192.168.60.201:9092
- 192.168.60.202:9092
consumer:
group-id: kafka2
parent: #作為其他端點(diǎn)的父配置
enabled: false #是否啟用該端點(diǎn)饭望,這里作為父配置不啟用
producer:
retries: 0
acks: 1
consumer:
enable-auto-commit: false
template:
default-topic: sample
listener:
ack-mode: manual_immediate
parent
端點(diǎn)作為父配置,kafka1
和kafka2
通過(guò)屬性inherit
指定繼承parent
端點(diǎn)的配置同時(shí)額外定義bootstrap-servers
和consumer.group-id
擴(kuò)展支持
當(dāng)我們實(shí)現(xiàn)了功能之后就要看看對(duì)后續(xù)維護(hù)非常重要的擴(kuò)展性
當(dāng)然了形庭,一般情況下擴(kuò)展性肯定是在功能實(shí)現(xiàn)之前就規(guī)劃好一些擴(kuò)展點(diǎn)或是使用高擴(kuò)展性的架構(gòu)
這里只是把這塊放到了后面來(lái)講
引擎端點(diǎn)工廠
首先呢铅辞,我覺(jué)得需要給事件引擎EventEngine
和事件端點(diǎn)EventEndpoint
各添加一個(gè)工廠EventEngineFactory
和EventEndpointFactory
這就方便對(duì)兩者做一些擴(kuò)展,同時(shí)這里有一個(gè)經(jīng)驗(yàn)建議
當(dāng)在實(shí)現(xiàn)或封裝一個(gè)庫(kù)時(shí)萨醒,如果邏輯中用到了new
來(lái)創(chuàng)建實(shí)例時(shí)
建議替換為工廠類來(lái)創(chuàng)建或者提供一個(gè)可重寫(xiě)的方法來(lái)創(chuàng)建
為使用者提供一個(gè)自定義入口
引擎端點(diǎn)自定義配置
定義EventEndpointConfigurer
和EventEngineConfigurer
來(lái)提供后置的自定義配置
這里可以類比WebMvcConfigurer
之類的配置類的功能
事件上下文
增加了一個(gè)事件上下文EventContext
的概念巷挥,默認(rèn)基于Map
實(shí)現(xiàn),我們可以在發(fā)布或訂閱時(shí)設(shè)置自定義的屬性來(lái)更靈活的傳遞數(shù)據(jù)和處理邏輯
配置繼承處理器
對(duì)于配置的繼承功能验靡,我也定義了一個(gè)ConfigInheritHandler
接口
考慮到由于版本問(wèn)題如果導(dǎo)致配置繼承出現(xiàn)問(wèn)題因小失大,就可以自定義配置繼承邏輯或是重寫(xiě)其中的一些方法來(lái)避免問(wèn)題
結(jié)束
好了雏节,差不多就這樣吧胜嗓,如果大家有遇到類似的需求可以參考參考這個(gè)思路,或者直接用我寫(xiě)的庫(kù)钩乍,GitHub傳送門(mén)(順便點(diǎn)個(gè)Star吧哈哈哈)