Apache Kafka 是一個分布式的流數(shù)據(jù)平臺,最初由LinkedIn公司開發(fā)判族,后成為 Apache 基金會的開源項目躺盛。Kafka 主要用于解決高吞吐量、低延遲的實時數(shù)據(jù)傳輸和處理問題形帮,特別適用于日志收集槽惫、流式處理周叮、事件驅動等場景。
特點
kafka有如下特點:
- 高吞吐量:Kafka每秒可以處理數(shù)十萬條消息的寫入和讀取界斜。
- 分布式:Kafka采用分布式架構仿耽,可以將消息分布在多個節(jié)點上進行存儲和處理。
- 持久化:Kafka使用磁盤持久化消息各薇,可以將消息持久保存在磁盤上项贺,即使消費者不在線也不會丟失消息。
- 發(fā)布-訂閱模型:Kafka采用發(fā)布-訂閱模型得糜,允許生產(chǎn)者將消息發(fā)布到一個或多個主題(Topic)敬扛,而消費者可以訂閱一個或多個主題來接收消息。
- 分區(qū)和副本:Kafka將每個主題分成多個分區(qū)(Partition)朝抖,每個分區(qū)都是一個獨立的存儲單元啥箭,可以在不同的節(jié)點上進行分布,目的是為了將數(shù)據(jù)分散存儲治宣、提高并行處理能力急侥。Kafka 副本 是 Kafka 中為了實現(xiàn)數(shù)據(jù)冗余和高可用性而存在的。每個 Kafka 分區(qū)都有多個副本
- 實時處理:Kafka支持實時數(shù)據(jù)處理侮邀,可以與流處理框架(如Apache Flink和Apache Spark)集成坏怪。
- 消息保留:Kafka可以根據(jù)配置的策略保留消息的時間或大小。
- 可擴展性:Kafka可以水平擴展绊茧,通過添加更多的節(jié)點來增加處理能力和存儲容量铝宵,而不會中斷現(xiàn)有的消息流。
缺點:
- 不支持死信隊列华畏,可能造成消息丟失
- 不支持定時消息
- 沒有完整的順序消息機制
集群
Kafka 集群由多個Broker組成鹏秋,每個 Broker 負責處理消息的存儲、分發(fā)和管理亡笑。在 Kafka 集群中侣夷,節(jié)點的工作可以分為幾個主要部分:
- Broker:Kafka 的核心組成部分,負責處理消息的生產(chǎn)和消費請求仑乌、存儲消息百拓,以及管理消息的分區(qū)和副本。
- ZooKeeper 或 KRaft 模式:用于管理集群的元數(shù)據(jù)晰甚、選舉 Controller衙传,以及協(xié)調集群中的各種任務。早期使用 ZooKeeper 進行集群管理厕九,而2.8.0版本中的 KRaft 模式(Kafka Raft Metadata Mode)則移除了對 ZooKeeper 的依賴粪牲。
- Controller:Kafka 集群的中央管理者,負責管理分區(qū)的副本狀態(tài)止剖、分配 Leader 副本腺阳、處理節(jié)點的上下線等。
Broker管理:
- Broker 的啟動與注冊
- 每個 Kafka Broker 節(jié)點啟動后穿香,會向 ZooKeeper(或 KRaft 元數(shù)據(jù)管理器)注冊自己亭引。ZooKeeper 保存所有 Broker 的信息,包括其 ID皮获、地址和端口等焙蚓。
- 在 KRaft 模式下,Broker 節(jié)點直接與 KRaft 元數(shù)據(jù)日志通信洒宝,不再通過 ZooKeeper购公。
- Broker 上下線的處理
- 當一個 Broker 節(jié)點啟動或加入集群時,它會通知 Controller 節(jié)點雁歌,Controller 會將該 Broker 添加到元數(shù)據(jù)中宏浩,并為它分配分區(qū)和副本。
- 如果某個 Broker 節(jié)點失效(網(wǎng)絡故障靠瞎、機器宕機等)比庄,Controller 會檢測到該 Broker 的下線,并將該 Broker 上存儲的分區(qū)副本重新分配到其他正常運行的 Broker 上乏盐。這個過程稱為 副本重分配佳窑。
Kafka 的 server.properties 示例(KRaft 模式):
# 表示該節(jié)點既是 Broker,也是 Controller父能。如果只想讓某個節(jié)點充當 Controller神凑,可以配置為 controller。
process.roles=broker,controller
node.id=1
# 指定元數(shù)據(jù)管理的控制器節(jié)點及其端口何吝。這些節(jié)點組成 Kafka 元數(shù)據(jù)的管理層溉委,負責元數(shù)據(jù)的存儲和同步。1@kafka1:9093 表示第 1 個控制器節(jié)點運行在 kafka1 主機的 9093 端口岔霸。
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 定義 Kafka 節(jié)點的監(jiān)聽端口薛躬,包括用于 Broker 的 PLAINTEXT 協(xié)議和用于 Controller 的 CONTROLLER 協(xié)議。
listeners=PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093
# 指定 Kafka 對外暴露的 IP 或域名呆细,供客戶端連接使用型宝。
advertised.listeners=PLAINTEXT://<外部訪問的IP或域名>:9092
# Kafka 的元數(shù)據(jù)日志存儲位置
log.dirs=/var/lib/kafka/logs
# 指定 Kafka 使用 CONTROLLER 監(jiān)聽器處理集群元數(shù)據(jù)。
controller.listener.names=CONTROLLER
# KRaft 模式的集群元數(shù)據(jù)目錄
metadata.log.dir=/var/lib/kafka/metadata
# 每個分區(qū)的默認副本數(shù)
default.replication.factor=3
Partition
在 Kafka 中絮爷,分區(qū)(Partition)是一種邏輯上的數(shù)據(jù)劃分方式趴酣,它將一個主題(Topic)的數(shù)據(jù)分成多個部分,每個分區(qū)都是一個有序坑夯、持久化的日志隊列岖寞。每個分區(qū)都可以獨立地存儲和管理數(shù)據(jù),Kafka 使用分區(qū)來實現(xiàn)高吞吐量和水平擴展柜蜈。
分區(qū)的特點
- 水平擴展: 分區(qū)允許將一個主題的數(shù)據(jù)劃分為多個分區(qū)仗谆,每個分區(qū)可以在不同的 broker 節(jié)點上存儲指巡。這樣可以實現(xiàn)數(shù)據(jù)的水平擴展,提高了整個系統(tǒng)的吞吐量隶垮。
- 順序性: 在一個分區(qū)內藻雪,數(shù)據(jù)的順序是有保障的,即使在不同分區(qū)之間數(shù)據(jù)的順序不保證狸吞,但在同一分區(qū)內的數(shù)據(jù)會按照寫入的順序存儲和讀取勉耀。
- 容錯性: Kafka 使用分區(qū)的復制機制來實現(xiàn)數(shù)據(jù)的冗余備份,保證數(shù)據(jù)的可靠性蹋偏。一個分區(qū)可以有多個副本(Replica)便斥,每個副本位于不同的 broker 節(jié)點上,如果某個副本不可用威始,可以通過其他副本來恢復數(shù)據(jù)枢纠。
- 負載均衡: 通過調整分區(qū)的數(shù)量和分布,可以實現(xiàn)對消息消費者的負載均衡字逗。消費者可以同時從多個分區(qū)中讀取數(shù)據(jù)京郑,從而提高消費的并行度。
副本因子
副本因子(Replication Factor)是指 Kafka 中每個分區(qū)的副本數(shù)葫掉。副本因子是 Kafka 實現(xiàn)高可用性和數(shù)據(jù)冗余的關鍵機制之一些举。通過為每個分區(qū)設置多個副本,可以確保分區(qū)數(shù)據(jù)的冗余存儲俭厚,即使某個 Broker發(fā)生故障户魏,也能夠從其他擁有該分區(qū)副本的 Broker上繼續(xù)提供服務,保證數(shù)據(jù)的可用性挪挤。
副本因子的類型:
- Leader 副本:每個 Kafka 分區(qū)有一個 leader 副本叼丑,它負責處理該分區(qū)的所有讀寫操作。
- Follower 副本:其他副本被稱為 follower 副本扛门,它們從 leader 副本那里同步數(shù)據(jù)鸠信,但不參與直接的讀寫操作。當 leader 副本出現(xiàn)故障時论寨,Kafka 會從 follower 副本中選舉一個新的 leader 副本星立。
副本因子的設置在創(chuàng)建主題(Topic)時進行,可以根據(jù)需要設置不同的副本因子葬凳。一般來說绰垂,副本因子的設置涉及到可用性、性能火焰、資源開銷等方面的權衡劲装。常見的副本因子設置有以下幾種情況:
- 副本因子為1:這意味著每個分區(qū)只有Leader副本,沒有冗余。適用于單點故障沒有太大影響的場景占业,或者測試環(huán)境等不要求高可用性的情況绒怨。
- 副本因子大于1:常見的設置為2或3。其中有一個被選為 leader 副本纺酸,其余為 follower 副本窖逗。比如設置2個副本時,數(shù)據(jù)有兩個拷貝餐蔬,一個在Leader上,一個在Follower副本上佑附;設置3個副本時樊诺,數(shù)據(jù)有三個拷貝,一個在Leader上音同,另外兩個在Follower副本分區(qū)上词爬。這種設置提供了基本的冗余和故障轉移能力,能夠抵御一臺 Broker 節(jié)點的故障权均。
- 更高的副本因子:如果對可用性和數(shù)據(jù)保護要求較高顿膨,可以設置更多的副本。但副本因子越高叽赊,數(shù)據(jù)的冗余和復制開銷就越大恋沃,需要更多的存儲空間和網(wǎng)絡帶寬。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic mytopic
上述命令會創(chuàng)建一個名為 "mytopic" 的主題必指,設置分區(qū)數(shù)為 3囊咏,副本因子為 2,表示每個分區(qū)都有兩個副本塔橡。
需要注意的是梅割,分區(qū)數(shù)的設置一旦確定后,就不能直接修改葛家。如果需要修改分區(qū)數(shù)户辞,可以考慮創(chuàng)建一個新的主題,并將數(shù)據(jù)從舊主題遷移到新主題癞谒。
分區(qū)數(shù)的合理設置需要根據(jù)實際的業(yè)務需求和數(shù)據(jù)規(guī)模來進行調優(yōu)底燎,合適的分區(qū)數(shù)能夠充分利用 Kafka 集群的資源,提高消息的并行處理能力扯俱。
消息保留策略
在kafka中消費者消費消息后书蚪,消息仍然可以在 Kafka中存在一段時間,以便滿足后續(xù)需求迅栅,如重新消費或審計等殊校。Kafka 提供了兩種常見的消息保留策略:
- 時間保留策略(Time-based Retention):根據(jù)消息的時間戳進行保留《链妫可以配置一個保留時間段为流,超過該時間的消息將被刪除呕屎。
- 大小保留策略(Size-based Retention):根據(jù)消息的大小進行保留【床欤可以配置一個保留消息總大小的閾值秀睛,當消息大小超過該閾值時,較早的消息將被刪除莲祸。
二次消費
根據(jù)kafka的消息保留策略可以看出來蹂安,kafka支持重新消費,我們可以通過重置/修改消費者組的偏移量進行重新消費锐帜,或者創(chuàng)建新的消費者來重新消費田盈。
例如,如果你想讓消費者從 offset 10 開始重新消費:
kafka-consumer-groups.sh --bootstrap-server <broker_host:port> --group <consumer_group_name> --reset-offsets --to-offset 10 --topic <your_topic> --execute
消息的有序性
在kafka中缴阎,同一個分區(qū)內的消息是有序的允瞧,但是不同分區(qū)的消息之間無法保證順序,而且消費者組通常不會配置單線程蛮拔,這就導致topic中的消息不是完全有序消費的述暂。
如果想要實現(xiàn)消息的有序性,可以考慮以下幾點:
- 單個分區(qū):首先建炫,確保每個主題只有一個分區(qū)畦韭。
- 消費者線程:為了保證順序,每個分區(qū)應該只被一個消費者線程消費踱卵。
- 消息的生產(chǎn)和分區(qū)鍵:如果消息的順序非常重要廊驼,可以考慮在生產(chǎn)者端使用消息的分區(qū)鍵來確保相關消息被分配到同一個分區(qū)。
- 應用層面:在應用層面做額外的處理惋砂,例如在消費者端根據(jù)消息的時間戳來排序妒挎。
什么是分區(qū)鍵
Kafka 的分區(qū)鍵是一個用于指定消息被分配到哪個分區(qū)的值。分區(qū)鍵是由生產(chǎn)者在發(fā)送消息時指定的西饵,它決定了消息被分配到哪個主題的哪個分區(qū)中酝掩。一些常見的選擇分區(qū)鍵的策略:
- 根據(jù)消息的關鍵屬性:根據(jù)消息的某個關鍵屬性,例如用戶 ID眷柔、產(chǎn)品 ID 等期虾,來作為分區(qū)鍵。這樣驯嘱,具有相同屬性的消息會被分配到同一個分區(qū)镶苞,便于保證有序性和數(shù)據(jù)的聚合。
- 輪詢分區(qū):如果希望負載均衡鞠评,可以采用輪詢分區(qū)的策略茂蚓,讓每個生產(chǎn)者按照輪詢順序選擇分區(qū),確保消息均勻分布。
- 隨機分區(qū):對于一些場景聋涨,如果消息的順序不是特別關鍵晾浴,可以采用隨機分區(qū)的策略,將消息隨機分配到分區(qū)中牍白。
ack機制
Kafka 使用了一種 ack(Acknowledgment)機制來確保消息的可靠性傳遞脊凰。有以下幾種設置:
- acks = 0:生產(chǎn)者不需要等待任何服務器的確認。這種方式存在最高的風險茂腥,因為如果發(fā)送的消息在傳輸過程中出現(xiàn)問題狸涌,生產(chǎn)者將無法得知。消息可能會丟失或發(fā)送失敗础芍。
- acks = 1:生產(chǎn)者會等待消息被主題的 leader 分區(qū)成功寫入杈抢。如果 leader 成功寫入消息,生產(chǎn)者將收到一個確認響應仑性。但在這種模式下,仍然有可能出現(xiàn)副本同步失敗右蹦,因此消息仍然有丟失的風險诊杆。
- acks = all:生產(chǎn)者會等待消息被主題的所有副本成功寫入才會收到確認響應。這是最安全的模式何陆,確保了消息的高可靠性晨汹。但這也會增加一定的延遲,因為需要等待所有副本寫入成功贷盲。
簡單應用
我們可以在 Spring 項目中通過 Kafka 集群 來存儲日志淘这,使用 Logback 進行日志記錄時,可以配置 Logback 將日志發(fā)送到 Kafka巩剖。我們可以通過 Logback Kafka Appender(如 logstash-logback-encoder 提供的 Kafka Appender)實現(xiàn)這個功能铝穷。
首先,在你的 pom.xml 中添加相關依賴佳魔。我們可以使用 logstash-logback-encoder 提供的 Kafka Appender:
<dependencies>
<!-- Logback Core -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</dependency>
<!-- Logback Classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<!-- Logstash Logback Encoder (includes Kafka Appender) -->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>7.2</version>
</dependency>
</dependencies>
配置 logback-spring.xml
<configuration>
<!-- 定義 Kafka Appender -->
<appender name="KAFKA" class="net.logstash.logback.appender.LoggingEventAsyncDisruptorAppender">
<appender-ref ref="KAFKA_APPENDER"/>
</appender>
<appender name="KAFKA_APPENDER" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!-- Kafka 集群的 brokers -->
<destination>ip1:port1,ip2:port2,ip3:port3</destination>
<!-- 發(fā)送到 Kafka 的 topic -->
<topic>log_topic</topic>
<!-- 編碼日志為 JSON 格式 -->
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<!-- 格式化日志為 JSON -->
<includeMdcKeyName>requestId</includeMdcKeyName>
<includeCallerData>true</includeCallerData>
</encoder>
</appender>
<!-- 配置日志級別 -->
<root level="INFO">
<!-- 將日志寫到 Kafka -->
<appender-ref ref="KAFKA"/>
</root>
</configuration>
通過這種方式曙聂,Spring 項目的日志可以被發(fā)送到 Kafka 集群,方便日志的集中收集和分析鞠鲜。