一、Kafka集群環(huán)境
1琴拧、環(huán)境版本
版本:kafka2.11降瞳,zookeeper3.4
注意:這里zookeeper3.4也是基于集群模式部署。
2蚓胸、解壓重命名
tar -zxvf kafka_2.11-0.11.0.0.tgz
mv kafka_2.11-0.11.0.0 kafka2.11
創(chuàng)建日志目錄
[root@en-master kafka2.11]# mkdir logs
注意:以上操作需要同步到集群下其他服務(wù)上挣饥。
3、添加環(huán)境變量
vim /etc/profile
export KAFKA_HOME=/opt/kafka2.11
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
4沛膳、修改核心配置
[root@en-master /opt/kafka2.11/config]# vim server.properties
-- 核心修改如下
# 唯一編號
broker.id=0
# 開啟topic刪除
delete.topic.enable=true
# 日志地址
log.dirs=/opt/kafka2.11/logs
# zk集群
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
注意:broker.id安裝集群服務(wù)個數(shù)編排即可扔枫,集群下不能重復(fù)。
5锹安、啟動kafka集群
# 啟動命令
[root@node02 kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties
# 停止命令
[root@node02 kafka2.11]# bin/kafka-server-stop.sh
# 進(jìn)程查看
[root@node02 kafka2.11]# jps
注意:這里默認(rèn)啟動了zookeeper集群服務(wù)短荐,并且集群下的kafka分別啟動。
6叹哭、基礎(chǔ)管理命令
創(chuàng)建topic
bin/kafka-topics.sh --zookeeper zk01:2181 \
--create --replication-factor 3 --partitions 1 --topic one-topic
參數(shù)說明:
replication-factor 定義副本個數(shù)
partitions 定義分區(qū)個數(shù)
topic:定義topic名稱
查看topic列表
bin/kafka-topics.sh --zookeeper zk01:2181 --list
修改topic分區(qū)
bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5
查看topic
bin/kafka-topics.sh --zookeeper zk01:2181 \
--describe --topic one-topic
發(fā)送消息
bin/kafka-console-producer.sh \
--broker-list 192.168.72.133:9092 --topic one-topic
消費(fèi)消息
bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.72.133:9092 --from-beginning --topic one-topic
刪除topic
bin/kafka-topics.sh --zookeeper zk01:2181 \
--delete --topic first
7忍宋、Zk集群用處
Kafka集群中有一個broker會被選舉為Controller,Controller依賴Zookeeper環(huán)境话速,管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作芯侥。
二泊交、消息攔截案例
1、攔截器簡介
Kafka中間件的Producer攔截器主要用于實(shí)現(xiàn)消息發(fā)送的自定義控制邏輯柱查。用戶可以在消息發(fā)送前以及回調(diào)邏輯執(zhí)行前有機(jī)會對消息做一些自定義廓俭,比如消息修改等,發(fā)送狀態(tài)監(jiān)控等唉工,用戶可以指定多個攔截器按順序執(zhí)行攔截研乒。
核心方法
configure:獲取配置信息和初始化數(shù)據(jù)時(shí)調(diào)用;
onSend:消息被序列化以及和計(jì)算分區(qū)前調(diào)用該方法淋硝,可以對消息做操作雹熬;
onAcknowledgement:消息發(fā)送到Broker之后谣膳,或發(fā)送過程失敗時(shí)調(diào)用竿报;
close:關(guān)閉攔截器調(diào)用继谚,執(zhí)行一些資源清理工作;
注意:這里說的攔截器是針對消息發(fā)送流程。
2挚赊、自定義攔截
定義方式:實(shí)現(xiàn)ProducerInterceptor接口即可济瓢。
攔截器一:在onSend方法中,對攔截的消息進(jìn)行修改葬荷。
@Component
public class SendStartInterceptor implements ProducerInterceptor<String, String> {
? ? private final Logger LOGGER = LoggerFactory.getLogger("SendStartInterceptor");
? ? @Override
? ? public void configure(Map<String, ?> configs) {
? ? ? ? LOGGER.info("configs...");
? ? }
? ? @Override
? ? public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
? ? ? ? // 修改消息內(nèi)容
? ? ? ? return new ProducerRecord<>(record.topic(), record.partition(),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? record.timestamp(), record.key(),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "onSend:{" + record.value()+"}");
? ? }
? ? @Override
? ? public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
? ? ? ? LOGGER.info("onAcknowledgement...");
? ? }
? ? @Override
? ? public void close() {
? ? ? ? LOGGER.info("SendStart close...");
? ? }
}
攔截器二:在onAcknowledgement方法中宠漩,判斷消息是否發(fā)送成功。
@Component
public class SendOverInterceptor implements ProducerInterceptor<String, String> {
? ? private final Logger LOGGER = LoggerFactory.getLogger("SendOverInterceptor");
? ? @Override
? ? public void configure(Map<String, ?> configs) {
? ? ? ? LOGGER.info("configs...");
? ? }
? ? @Override
? ? public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
? ? ? ? LOGGER.info("record...{}", record.value());
? ? ? ? return record ;
? ? }
? ? @Override
? ? public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
? ? ? ? if (exception != null){
? ? ? ? ? ? LOGGER.info("Send Fail...exe-msg",exception.getMessage());
? ? ? ? }
? ? ? ? LOGGER.info("Send success...");
? ? }
? ? @Override
? ? public void close() {
? ? ? ? LOGGER.info("SendOver close...");
? ? }
}
加載攔截器:基于一個KafkaProducer配置Bean火鼻,加入攔截器雕崩。
@Configuration
public class KafkaConfig {
? ? @Bean
? ? public Producer producer (){
? ? ? ? Properties props = new Properties();
? ? ? ? // 省略其他配置...
? ? ? ? // 添加攔截器
? ? ? ? List<String> interceptors = new ArrayList<>();
? ? ? ? interceptors.add("com.kafka.cluster.interceptor.SendStartInterceptor");
? ? ? ? interceptors.add("com.kafka.cluster.interceptor.SendOverInterceptor");
? ? ? ? props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
? ? ? ? return new KafkaProducer<>(props) ;
? ? }
}
3、代碼案例
@RestController
public class SendMsgWeb {
? ? @Resource
? ? private KafkaProducer<String,String> producer ;
? ? @GetMapping("/sendMsg")
? ? public String sendMsg (){
? ? ? ? producer.send(new ProducerRecord<>("one-topic", "msgKey", "msgValue"));
? ? ? ? return "success" ;
? ? }
}
基于上述自定義Bean類型盼铁,進(jìn)行消息發(fā)送,關(guān)注攔截器中打印日志信息鹏控。
三肤寝、Kafka存儲分析
說明:該過程基于上述案例producer.send方法追蹤的源碼執(zhí)行流程当辐,源碼中的過程相對清楚鲤看,涉及的核心流程如下。
1找筝、消息生成過程
?
Producer發(fā)送消息采用的是異步發(fā)送的方式,消息發(fā)送過程如下:
Producer發(fā)送消息之后慷吊,經(jīng)過攔截器,序列化罢浇,事務(wù)判斷沐祷;
流程執(zhí)行后攒岛,消息內(nèi)容放入容器中;
容器在指定時(shí)間內(nèi)如果裝滿(size),會喚醒Sender線程;
容器如果在指定時(shí)間內(nèi)沒有裝滿兢榨,也會執(zhí)行一次Sender線程喚醒;
喚醒Sender線程之后吵聪,把容器數(shù)據(jù)拉取到topic中兼雄;
絮叨一句:讀這些中間件的源碼,不僅能開闊思維赦肋,也會讓自己意識到平時(shí)寫的代碼可能真的叫搬磚。
2囱井、存儲機(jī)制
Kafka中消息是以topic進(jìn)行標(biāo)識分類趣避,生產(chǎn)者面向topic生產(chǎn)消息,topic分區(qū)(partition)是物理上的存儲程帕,基于消息日志文件的方式。
?
每個partition對應(yīng)于一個log文件,發(fā)送的消息不斷追加到該log文件末端澎羞;
log文件中存儲的就是producer生產(chǎn)的消息數(shù)據(jù)敛苇,采用分片和索引機(jī)制顺呕;
partition分為多個segment。每個segment對應(yīng)兩個(.index)和(.log)文件株茶;
index文件類型存儲的索引信息;
log文件存儲消息的數(shù)據(jù)蹦掐;
索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址;
消費(fèi)者組中的每個消費(fèi)者卧抗,都會實(shí)時(shí)記錄消費(fèi)的消息offset位置;
當(dāng)然消息消費(fèi)出錯時(shí)拙绊,恢復(fù)是從上次的記錄位置繼續(xù)消費(fèi);
3标沪、事務(wù)控制機(jī)制
?
Kafka支持消息的事務(wù)控制
Producer事務(wù)
跨分區(qū)跨會話的事務(wù)原理嗜傅,引入全局唯一的TransactionID,并將Producer獲得的PID和TransactionID綁定趴梢。Producer重啟后可以通過正在進(jìn)行的TransactionID獲得原來的PID币他。Kafka基于TransactionCoordinator組件管理Transaction,Producer通過和TransactionCoordinator交互獲得TransactionID對應(yīng)的任務(wù)狀態(tài)蝴悉。TransactionCoordinator將事務(wù)狀態(tài)寫入Kafka的內(nèi)部Topic,即使整個服務(wù)重啟拍冠,進(jìn)行中的事務(wù)狀態(tài)可以得到恢復(fù)。
Consumer事務(wù)
Consumer消息消費(fèi)射众,事務(wù)的保證強(qiáng)度很低,無法保證消息被精確消費(fèi)叨橱,因?yàn)橥皇聞?wù)的消息可能會出現(xiàn)重啟后已經(jīng)被刪除的情況断盛。
四、源代碼地址
GitHub·地址
https://github.com/cicadasmile/data-manage-parent
GitEE·地址
https://gitee.com/cicadasmile/data-manage-parent
最后
針對于上面的文章我總結(jié)出了相關(guān)知識點(diǎn)做成了文檔和架構(gòu)視頻免費(fèi)分享給大家(包括Dubbo伙菜、Redis命迈、Netty贩绕、zookeeper淑倾、Spring cloud卫玖、分布式踊淳、高并發(fā)等架構(gòu)技術(shù)資料),希望能幫助到您面試前的復(fù)習(xí)且找到一個好的工作脱茉,也節(jié)省大家在網(wǎng)上搜索資料的時(shí)間來學(xué)習(xí)垄开,也可以關(guān)注我一下以后會有更多干貨分享。
直接添加微信:XianXian010501免費(fèi)領(lǐng)取資料