kafka應(yīng)用最少需要兩部分柒室,一部分是producer履磨,另外一部分是consumer,這兩部分可以在一個(gè)應(yīng)用中蛉抓,也可以不在一個(gè)應(yīng)用中。在通常情況下剃诅,為了消費(fèi)性能巷送,可能需要多個(gè)消費(fèi)者,也可能需要多個(gè)生產(chǎn)者矛辕,而消費(fèi)者和生產(chǎn)者可能處于不同的位置或者環(huán)境笑跛,所以本示例將生產(chǎn)者和消費(fèi)者放在不同的應(yīng)用中付魔。
生產(chǎn)者端
引入依賴(lài)
在spring boot中使用kafka生產(chǎn)者端,需要引入如下依賴(lài)
<dependencies>
......
<!--Spring 的kafka依賴(lài)-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
......
</dependencies>
啟用kafka支持
在項(xiàng)目中的配置類(lèi)飞蹂,或者啟動(dòng)類(lèi)上增加@EnableKafka,它會(huì)幫助我們創(chuàng)建一些必要的Bean几苍,包括KafkaTemplate,KafkaMessageListenerContainer等
@EnableKafka
public class KafkaConfig{
}
修改連接配置
修改producer的配置文件,配置連接地址
spring:
kafka:
# 指定kafka集群地址陈哑,多為地址用逗號(hào)分割
bootstrap-servers: dm105:9092,dm106:9092,dm107:9092
創(chuàng)建Topic
如果你的topic還未在kafka中創(chuàng)建妻坝,則可以使用spring-boot自動(dòng)創(chuàng)建主題,只需創(chuàng)建一個(gè)類(lèi)型為NewTopic的bean,并指定topic相關(guān)的信息即可
/**
* 新建一個(gè)主題
*
* @return
*/
@Bean
public NewTopic testTopic() {
return TopicBuilder.name("test")// 指定主題名稱(chēng)
.partitions(30) // 指定分區(qū)數(shù)量惊窖,這個(gè)數(shù)量通常要大于消費(fèi)者的數(shù)量刽宪,按消費(fèi)者線(xiàn)程數(shù)計(jì)算
.replicas(2) // 指定副本數(shù)量
.compact()
.build();
}
基本的配置已經(jīng)完成,接下來(lái)就是發(fā)送消息了界酒。
使用KafkaTemplate發(fā)送消息
接下來(lái)使用kafkaTempalte發(fā)送消息到服務(wù)端,以下是一個(gè)極簡(jiǎn)示例
/**
* 使用Spring boot test測(cè)試消息發(fā)送
*/
@SpringBootTest
class KafkaDemoApplicationTests {
/**
* 注入KafkaTemplate,用于發(fā)送消息
*/
@Autowired
private KafkaTemplate template;
@Test
public void newMessage() {
System.out.println("start at " + ZonedDateTime.now() + "");
for (int i = 0; i < 1000000; i++) {
long now = System.currentTimeMillis();
// 調(diào)用template,將消息發(fā)送到kafka
// 第一個(gè)參數(shù)是topic名稱(chēng)圣拄,第二個(gè)參數(shù)是要發(fā)送的消息內(nèi)容
template.send("test", "adg" + now);
}
}
}
消費(fèi)者端
根據(jù)上面的生產(chǎn)者,需要一個(gè)消費(fèi)者來(lái)消費(fèi)生產(chǎn)者生產(chǎn)的數(shù)據(jù)毁欣。spring boot整合kafka的消費(fèi)者也非常方便
引入依賴(lài)
生產(chǎn)者和消費(fèi)者的依賴(lài)是一致的庇谆。在此不再贅述
啟用kafka支持
該操作和生產(chǎn)者應(yīng)用一致,不再贅述署辉。
修改連接配置
消費(fèi)者的配置需要除了需要指定連之外族铆,最好指定一些額外的配置參數(shù),以便提高消費(fèi)者性能
spring:
kafka:
# 指定kafka集群地址
bootstrap-servers: dm105:9092,dm106:9092,dm107:9092
consumer:
# 如果兩個(gè)應(yīng)用程序?yàn)椴⑿邢M(fèi)某個(gè)topic的消息哭尝,需要將兩個(gè)應(yīng)用的group-id指定一致
group-id: "message-group"
listener:
# 指定消息消費(fèi)的模式,type=batch代表可以批量消費(fèi)
type: batch
# 指定消費(fèi)者的并發(fā)數(shù)剖煌,也就是可以同時(shí)有多少個(gè)消費(fèi)者線(xiàn)程在監(jiān)聽(tīng)數(shù)據(jù)材鹦,默認(rèn)為1,
# 更具情況設(shè)置并行數(shù)據(jù)耕姊,通常建議最小為Cpu的核心數(shù)
concurrency: 16
創(chuàng)建消費(fèi)者
消費(fèi)者的就是一個(gè)普通的Spring bean.在對(duì)應(yīng)的方法上添加@KafkaListener注解桶唐,并指定需要消費(fèi)的topic即可開(kāi)始消費(fèi)者監(jiān)聽(tīng)。
@Component
public class Consumer {
/**
* 注入repository,用戶(hù)數(shù)據(jù)持久化(略)
*/
@Autowired
private MessageRepository repository;
/**
* 使用@KafkaListener注解標(biāo)記消費(fèi)方法茉兰,指定topics屬性指定監(jiān)聽(tīng)的待消費(fèi)topic
*
* @param messages 待消費(fèi)的數(shù)據(jù)尤泽,由于啟用了批量消費(fèi)模式,所以監(jiān)聽(tīng)獲取到的是一個(gè)集合
*/
@KafkaListener(topics = {"test"})
@Transactional
public void test(List<String> messages) {
List<Message> result = messages.stream().map(Message::new).collect(Collectors.toList());
repository.saveAll(result);
System.out.println("save message [" + messages.size() + "] 條 at" + ZonedDateTime.now().toString());
}
}
測(cè)試項(xiàng)目
當(dāng)項(xiàng)目構(gòu)建完成之后规脸,可以按照如下步驟來(lái)測(cè)試項(xiàng)目
- 啟動(dòng)消費(fèi)者程序
- 執(zhí)行生產(chǎn)者測(cè)試代碼坯约,觀察生產(chǎn)者執(zhí)行結(jié)果
項(xiàng)目倉(cāng)庫(kù)地址
完整項(xiàng)目參考https://github.com/ldwqh0/hadoop-demo