問(wèn)題來(lái)源
我司最近剛重構(gòu)完,終于有時(shí)間去償還欠下的技術(shù)債了修肠。
最先準(zhǔn)備改造的就是將一些原本應(yīng)該異步執(zhí)行而因重構(gòu)時(shí)間緊而被迫同步執(zhí)行的方法林说,通過(guò)消息隊(duì)列異步化。
在原來(lái)的老項(xiàng)目中蔼卡,我們使用的是kafka喊崖。因此我們對(duì)kafka的熟悉程度遠(yuǎn)勝于其他消息隊(duì)列,也正因此雇逞,我們?cè)谛孪到y(tǒng)中依然采用了kafka作為我們的消息隊(duì)列中間件荤懂。
而擺在我們面前的問(wèn)題是,該如何在Spring Boot環(huán)境下集成kafka呢塘砸?
spring-kafka作為spring的全家桶成員节仿,是目前最簡(jiǎn)單的集成kafka的方式,自然也成為了我們的首選掉蔬。
這個(gè)集成kafka的重任廊宪,最終落到我的組員頭上。
在前期的編碼階段女轿,他一帆風(fēng)順箭启,并沒(méi)有遇到什么問(wèn)題。但在后期測(cè)試的時(shí)候蛉迹,他發(fā)現(xiàn)了一個(gè)問(wèn)題傅寡。因?yàn)閠opics肯定會(huì)隨著業(yè)務(wù)的發(fā)展而不斷增加,所以@KafkaListener注解下的topics字段肯定不能像demo一樣寫死固定的topic北救。
他最初詢問(wèn)我的時(shí)候荐操,我也沒(méi)有很好的辦法,因?yàn)槲抑按罱╧afka的時(shí)候扭倾,topics是我通過(guò)寫代碼的方式加載到消費(fèi)者線程上去的淀零。但是在注解中,這個(gè)方法顯然連編譯都通不過(guò)膛壹。
然后他也嘗試過(guò)將topics寫在常量類中驾中,而不再寫在枚舉中。這樣當(dāng)然能解決問(wèn)題模聋,但是這么做肩民,對(duì)后續(xù)的調(diào)用者而言,簡(jiǎn)直是個(gè)災(zāi)難链方。
調(diào)用者不光需要在常量類中添加topic和維護(hù)topic對(duì)應(yīng)的業(yè)務(wù)消費(fèi)方法持痰,還需要額外將添加的topic加入到@KafkaListener注解中。
最后一步至關(guān)重要祟蚀,如果不將topic加到注解中工窍,listener將不會(huì)監(jiān)聽該topic割卖,也自然無(wú)法消費(fèi)對(duì)應(yīng)的topic了,但這最后一步顯然是最容易被大家遺忘的患雏。
解決方案
那么還有什么更好的方案么鹏溯?
那自然是有的。
既然topics字段需要一個(gè)字符串?dāng)?shù)組淹仑,那我們可以通過(guò)spel語(yǔ)言丙挽,例如topics = {"#{'${topics}'.split(',')}"}這么寫,然后通過(guò)字符串分割來(lái)動(dòng)態(tài)生成一個(gè)字符串?dāng)?shù)組匀借。
spel語(yǔ)言在spring中應(yīng)用廣泛颜阐,尤其是在注解中。我們之前也通過(guò)spel解決了分布式鎖中動(dòng)態(tài)參數(shù)的問(wèn)題吓肋。
但是這么寫帶來(lái)了另外一個(gè)問(wèn)題凳怨。 如果Spring在加載@KafkaListener對(duì)應(yīng)的消費(fèi)者bean時(shí)(下稱ConsumerListener),找不到topics是鬼,就會(huì)報(bào)『Could not resolve placeholder 'topics' in value "#{'${topics}'.split(',')}"』的錯(cuò)誤猿棉。
那么${topics}該如何被替換呢?
同學(xué)們應(yīng)該很快就能想到可以通過(guò)配置文件來(lái)處理這個(gè)問(wèn)題屑咳。
最簡(jiǎn)單的,自然是在application.properties中配置用逗號(hào)隔開的多個(gè)topic了弊琴。
但是這個(gè)解決方案兆龙,還是不太友好。雖然我們可以采用apollo或者disconf等分布式配置中心來(lái)管理配置文件敲董,但依然沒(méi)有解決很容易遺忘配置topic的問(wèn)題紫皇。
這時(shí)候,我們應(yīng)該回顧Spring Boot配置屬性加載的相關(guān)內(nèi)容了腋寨。
既然配置文件能解決${topics}被替換的問(wèn)題聪铺,那么加載優(yōu)先級(jí)更高的配置自然也可以解決該問(wèn)題。而優(yōu)先級(jí)比配置文件更高的配置中萄窜,我們可以發(fā)現(xiàn)Java系統(tǒng)參數(shù)(System.getProperties())是個(gè)可以利用的點(diǎn)铃剔。
我們可以新創(chuàng)建一個(gè)KafkaTopicConfig的配置類,加上@Configuration注解查刻,然后在該配置類初始化后通過(guò)System.setProperty("topics", topics)把topics加到系統(tǒng)參數(shù)中键兜。
具體在bean初始化后執(zhí)行指定方法做法有四種,實(shí)現(xiàn) InitializingBean接口定制初始化后的方法穗泵,通過(guò) <bean> 元素的 init-method屬性指定初始化后調(diào)用的操作普气,在指定方法上加上@PostConstruct來(lái)指定該方法在初始化后調(diào)用,以及最簡(jiǎn)單的構(gòu)造器佃延。
本文選擇了實(shí)現(xiàn) InitializingBean接口來(lái)完成該操作现诀。具體代碼如下:
@Configuration
public class KafkaTopicConfig implements InitializingBean {
@Override
public void afterPropertiesSet() {
String topics = Sets.newHashSet(KafkaTopicEnum.values()).stream()
.map(KafkaTopicEnum::getTopic).collect(Collectors.joining(","));
System.setProperty("topics", topics);
}
}
然后還需要注意的一點(diǎn)就是夷磕,KafkaTopicConfig配置類必須在ConsumerListener類之前加載到Spring的容器內(nèi),否則依然會(huì)在加載ConsumerListener類時(shí)報(bào)『Could not resolve placeholder 'topics' in value "#{'${topics}'.split(',')}"』的錯(cuò)誤仔沿。
而Spring Boot的bean裝配規(guī)則是根據(jù)Application類所在的包位置從近到遠(yuǎn)進(jìn)行掃描的坐桩,所以如果KafkaTopicConfig所在目錄離Application的距離比ConsumerListener所在目錄更遠(yuǎn),就會(huì)導(dǎo)致ConsumerListener在KafkaTopicConfig之前加載于未。
為了避免出現(xiàn)這種情況撕攒,我們必須調(diào)整這兩個(gè)bean的加載順序。具體修改加載bean加載順序的方式有很多種烘浦,我們采用了在ConsumerListener類上加DependsOn注解的方式來(lái)解決抖坪,如下所示:
@DependsOn(value = "kafkaTopicConfig")
至此,動(dòng)態(tài)加載@KafkaListener的topics的問(wèn)題就被完美解決了闷叉。
好了擦俐,我們下一期再見,歡迎大家一起留言討論握侧。同時(shí)也歡迎點(diǎn)贊和送小星星~