一文詳解Kafka集群環(huán)境搭建兑燥,消息存儲機(jī)制

一、Kafka集群環(huán)境

1琴拧、環(huán)境版本

版本:kafka2.11降瞳,zookeeper3.4

注意:這里zookeeper3.4也是基于集群模式部署。

2蚓胸、解壓重命名

tar -zxvf kafka_2.11-0.11.0.0.tgz

mv kafka_2.11-0.11.0.0 kafka2.11

創(chuàng)建日志目錄

[root@en-master kafka2.11]# mkdir logs

注意:以上操作需要同步到集群下其他服務(wù)上挣饥。

3、添加環(huán)境變量

vim /etc/profile

export KAFKA_HOME=/opt/kafka2.11

export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

4沛膳、修改核心配置

[root@en-master /opt/kafka2.11/config]# vim server.properties

-- 核心修改如下

# 唯一編號

broker.id=0

# 開啟topic刪除

delete.topic.enable=true

# 日志地址

log.dirs=/opt/kafka2.11/logs

# zk集群

zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

注意:broker.id安裝集群服務(wù)個數(shù)編排即可扔枫,集群下不能重復(fù)。

5锹安、啟動kafka集群

# 啟動命令

[root@node02 kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties

# 停止命令

[root@node02 kafka2.11]# bin/kafka-server-stop.sh

# 進(jìn)程查看

[root@node02 kafka2.11]# jps

注意:這里默認(rèn)啟動了zookeeper集群服務(wù)短荐,并且集群下的kafka分別啟動。

6叹哭、基礎(chǔ)管理命令

創(chuàng)建topic

bin/kafka-topics.sh --zookeeper zk01:2181 \

--create --replication-factor 3 --partitions 1 --topic one-topic

參數(shù)說明:

replication-factor 定義副本個數(shù)

partitions 定義分區(qū)個數(shù)

topic:定義topic名稱

查看topic列表

bin/kafka-topics.sh --zookeeper zk01:2181 --list

修改topic分區(qū)

bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5

查看topic

bin/kafka-topics.sh --zookeeper zk01:2181 \

--describe --topic one-topic

發(fā)送消息

bin/kafka-console-producer.sh \

--broker-list 192.168.72.133:9092 --topic one-topic

消費(fèi)消息

bin/kafka-console-consumer.sh \

--bootstrap-server 192.168.72.133:9092 --from-beginning --topic one-topic

刪除topic

bin/kafka-topics.sh --zookeeper zk01:2181 \

--delete --topic first

7忍宋、Zk集群用處

Kafka集群中有一個broker會被選舉為Controller,Controller依賴Zookeeper環(huán)境话速,管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作芯侥。

二泊交、消息攔截案例

1、攔截器簡介

Kafka中間件的Producer攔截器主要用于實(shí)現(xiàn)消息發(fā)送的自定義控制邏輯柱查。用戶可以在消息發(fā)送前以及回調(diào)邏輯執(zhí)行前有機(jī)會對消息做一些自定義廓俭,比如消息修改等,發(fā)送狀態(tài)監(jiān)控等唉工,用戶可以指定多個攔截器按順序執(zhí)行攔截研乒。

核心方法

configure:獲取配置信息和初始化數(shù)據(jù)時(shí)調(diào)用;

onSend:消息被序列化以及和計(jì)算分區(qū)前調(diào)用該方法淋硝,可以對消息做操作雹熬;

onAcknowledgement:消息發(fā)送到Broker之后谣膳,或發(fā)送過程失敗時(shí)調(diào)用竿报;

close:關(guān)閉攔截器調(diào)用继谚,執(zhí)行一些資源清理工作;

注意:這里說的攔截器是針對消息發(fā)送流程。

2挚赊、自定義攔截

定義方式:實(shí)現(xiàn)ProducerInterceptor接口即可济瓢。

攔截器一:在onSend方法中,對攔截的消息進(jìn)行修改葬荷。

@Component

public class SendStartInterceptor implements ProducerInterceptor<String, String> {

? ? private final Logger LOGGER = LoggerFactory.getLogger("SendStartInterceptor");

? ? @Override

? ? public void configure(Map<String, ?> configs) {

? ? ? ? LOGGER.info("configs...");

? ? }

? ? @Override

? ? public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

? ? ? ? // 修改消息內(nèi)容

? ? ? ? return new ProducerRecord<>(record.topic(), record.partition(),

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? record.timestamp(), record.key(),

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "onSend:{" + record.value()+"}");

? ? }

? ? @Override

? ? public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

? ? ? ? LOGGER.info("onAcknowledgement...");

? ? }

? ? @Override

? ? public void close() {

? ? ? ? LOGGER.info("SendStart close...");

? ? }

}

攔截器二:在onAcknowledgement方法中宠漩,判斷消息是否發(fā)送成功。

@Component

public class SendOverInterceptor implements ProducerInterceptor<String, String> {

? ? private final Logger LOGGER = LoggerFactory.getLogger("SendOverInterceptor");

? ? @Override

? ? public void configure(Map<String, ?> configs) {

? ? ? ? LOGGER.info("configs...");

? ? }

? ? @Override

? ? public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {

? ? ? ? LOGGER.info("record...{}", record.value());

? ? ? ? return record ;

? ? }

? ? @Override

? ? public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

? ? ? ? if (exception != null){

? ? ? ? ? ? LOGGER.info("Send Fail...exe-msg",exception.getMessage());

? ? ? ? }

? ? ? ? LOGGER.info("Send success...");

? ? }

? ? @Override

? ? public void close() {

? ? ? ? LOGGER.info("SendOver close...");

? ? }

}

加載攔截器:基于一個KafkaProducer配置Bean火鼻,加入攔截器雕崩。

@Configuration

public class KafkaConfig {

? ? @Bean

? ? public Producer producer (){

? ? ? ? Properties props = new Properties();

? ? ? ? // 省略其他配置...

? ? ? ? // 添加攔截器

? ? ? ? List<String> interceptors = new ArrayList<>();

? ? ? ? interceptors.add("com.kafka.cluster.interceptor.SendStartInterceptor");

? ? ? ? interceptors.add("com.kafka.cluster.interceptor.SendOverInterceptor");

? ? ? ? props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

? ? ? ? return new KafkaProducer<>(props) ;

? ? }

}

3、代碼案例

@RestController

public class SendMsgWeb {

? ? @Resource

? ? private KafkaProducer<String,String> producer ;

? ? @GetMapping("/sendMsg")

? ? public String sendMsg (){

? ? ? ? producer.send(new ProducerRecord<>("one-topic", "msgKey", "msgValue"));

? ? ? ? return "success" ;

? ? }

}

基于上述自定義Bean類型盼铁,進(jìn)行消息發(fā)送,關(guān)注攔截器中打印日志信息鹏控。

三肤寝、Kafka存儲分析

說明:該過程基于上述案例producer.send方法追蹤的源碼執(zhí)行流程当辐,源碼中的過程相對清楚鲤看,涉及的核心流程如下。

1找筝、消息生成過程

?

Producer發(fā)送消息采用的是異步發(fā)送的方式,消息發(fā)送過程如下:

Producer發(fā)送消息之后慷吊,經(jīng)過攔截器,序列化罢浇,事務(wù)判斷沐祷;

流程執(zhí)行后攒岛,消息內(nèi)容放入容器中;

容器在指定時(shí)間內(nèi)如果裝滿(size),會喚醒Sender線程;

容器如果在指定時(shí)間內(nèi)沒有裝滿兢榨,也會執(zhí)行一次Sender線程喚醒;

喚醒Sender線程之后吵聪,把容器數(shù)據(jù)拉取到topic中兼雄;

絮叨一句:讀這些中間件的源碼,不僅能開闊思維赦肋,也會讓自己意識到平時(shí)寫的代碼可能真的叫搬磚。

2囱井、存儲機(jī)制

Kafka中消息是以topic進(jìn)行標(biāo)識分類趣避,生產(chǎn)者面向topic生產(chǎn)消息,topic分區(qū)(partition)是物理上的存儲程帕,基于消息日志文件的方式。

?

每個partition對應(yīng)于一個log文件,發(fā)送的消息不斷追加到該log文件末端澎羞;

log文件中存儲的就是producer生產(chǎn)的消息數(shù)據(jù)敛苇,采用分片和索引機(jī)制顺呕;

partition分為多個segment。每個segment對應(yīng)兩個(.index)和(.log)文件株茶;

index文件類型存儲的索引信息;

log文件存儲消息的數(shù)據(jù)蹦掐;

索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址;

消費(fèi)者組中的每個消費(fèi)者卧抗,都會實(shí)時(shí)記錄消費(fèi)的消息offset位置;

當(dāng)然消息消費(fèi)出錯時(shí)拙绊,恢復(fù)是從上次的記錄位置繼續(xù)消費(fèi);

3标沪、事務(wù)控制機(jī)制

?

Kafka支持消息的事務(wù)控制

Producer事務(wù)

跨分區(qū)跨會話的事務(wù)原理嗜傅,引入全局唯一的TransactionID,并將Producer獲得的PID和TransactionID綁定趴梢。Producer重啟后可以通過正在進(jìn)行的TransactionID獲得原來的PID币他。Kafka基于TransactionCoordinator組件管理Transaction,Producer通過和TransactionCoordinator交互獲得TransactionID對應(yīng)的任務(wù)狀態(tài)蝴悉。TransactionCoordinator將事務(wù)狀態(tài)寫入Kafka的內(nèi)部Topic,即使整個服務(wù)重啟拍冠,進(jìn)行中的事務(wù)狀態(tài)可以得到恢復(fù)。

Consumer事務(wù)

Consumer消息消費(fèi)射众,事務(wù)的保證強(qiáng)度很低,無法保證消息被精確消費(fèi)叨橱,因?yàn)橥皇聞?wù)的消息可能會出現(xiàn)重啟后已經(jīng)被刪除的情況断盛。

四、源代碼地址

GitHub·地址

https://github.com/cicadasmile/data-manage-parent

GitEE·地址

https://gitee.com/cicadasmile/data-manage-parent

最后

針對于上面的文章我總結(jié)出了相關(guān)知識點(diǎn)做成了文檔和架構(gòu)視頻免費(fèi)分享給大家(包括Dubbo伙菜、Redis命迈、Netty贩绕、zookeeper淑倾、Spring cloud卫玖、分布式踊淳、高并發(fā)等架構(gòu)技術(shù)資料),希望能幫助到您面試前的復(fù)習(xí)且找到一個好的工作脱茉,也節(jié)省大家在網(wǎng)上搜索資料的時(shí)間來學(xué)習(xí)垄开,也可以關(guān)注我一下以后會有更多干貨分享。

直接添加微信:XianXian010501免費(fèi)領(lǐng)取資料

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末榜田,一起剝皮案震驚了整個濱河市锻梳,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌疑枯,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件废亭,死亡現(xiàn)場離奇詭異具钥,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)骂删,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進(jìn)店門桃漾,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拟逮,“玉大人,你說我怎么就攤上這事敦迄∑炯#” “怎么了?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵嗅绸,是天一觀的道長撕彤。 經(jīng)常有香客問我,道長蚀狰,這世上最難降的妖魔是什么职员? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任焊切,我火速辦了婚禮扮授,結(jié)果婚禮上专肪,老公的妹妹穿的比我還像新娘。我一直安慰自己深夯,他們只是感情好诺苹,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布收奔。 她就那樣靜靜地躺著,像睡著了一般坪哄。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上模暗,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天兑宇,我揣著相機(jī)與錄音粱坤,去河邊找鬼瓷产。 笑死枚驻,一個胖子當(dāng)著我的面吹牛濒旦,可吹牛的內(nèi)容都是我干的尔邓。 我是一名探鬼主播锉矢,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼沈撞,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了显晶?” 一聲冷哼從身側(cè)響起壹士,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤躏救,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后崩掘,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體少办,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡英妓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年蔓纠,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了腿倚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡潦刃,死狀恐怖懈叹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情胧洒,我是刑警寧澤墨状,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布肾砂,位于F島的核電站镐确,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏诗越。R本人自食惡果不足惜息堂,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一荣堰、第九天 我趴在偏房一處隱蔽的房頂上張望振坚。 院中可真熱鬧,春花似錦只酥、人聲如沸呀狼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽窟勃。三九已至逗堵,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間汁咏,已是汗流浹背作媚。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工漂问, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留女揭,地道東北人田绑。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓掩驱,卻偏偏與公主長得像欧穴,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子拼苍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,901評論 2 355