Kafka 工作流程分析
1杖剪、Kafka生產(chǎn)過程分析
(1)寫入方式
producer采用推(push)模式將消息發(fā)布到broker冻押,每條消息都被追加(append)到分區(qū)(patition)中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機寫內(nèi)存要高盛嘿,保障kafka吞吐率)
(2)partition
說明:
- 消息發(fā)送時都被發(fā)送到一個topic洛巢,其本質(zhì)就是一個目錄,而topic是由一些Partition Logs(分區(qū)日志)組成.
- 每個Partition中的消息都是有序的次兆,生產(chǎn)的消息被不斷追加到Partition log上稿茉,其中的每一個消息都被賦予了一個唯一的offset值。
分區(qū)原因:
- 提升拓展性:每個Partition可以通過調(diào)整以適應它所在的機器,而一個topic又可以有多個Partition組成漓库,因此整個集群就可以適應任意大小的數(shù)據(jù)了
- 提高吞吐能力:在進行數(shù)據(jù)寫入時以 Partition 為單位進行寫入恃慧。
分區(qū)依據(jù):
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// (1) 指定了patition,則直接使用該 Partition
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
- 對于已經(jīng)指定了 partition 的渺蒿,則直接使用該partition痢士;
- 未指定patition但指定key,通過對key的value進行hash出一個patition茂装;
- patition和key都未指定良瞧,使用輪詢選出一個patition;
(3)Replica(副本)
(4)寫入流程
流程圖:
流程描述:
- producer先從zookeeper的 "/brokers/.../state"節(jié)點找到該partition的leader
- producer將消息發(fā)送給該leader
- leader將消息寫入本地log
- followers從leader pull消息
- Follower將 pull到的消息寫入本地log
- Follower 寫入成功值后向leader發(fā)送ACK
- leader收到所有ISR中的replication的ACK后训唱,增加HW
- 向producer發(fā)送ACK
2褥蚯、 Broker 保存消息
(1)存儲說明
- 物理上把topic分成一個或多個patition,每個patition物理上對應一個文件夾(該文件夾存儲該patition的所有消息和索引文件)
- Kafka讀取特定消息的時間復雜度為O(1);
- 消息數(shù)據(jù)是存儲在partition文件夾下的*.log文件中的况增;
- 消息存儲時常有兩個策略赞庶,分別為:
基于時間存儲策略:默認保留168小時(log.retention.hours=168)
基于大小保留策略:默認保留 1G(log.retention.bytes=1073741824)
(2)Zk存儲結構
3、consumer flow
(1) 高級API與低級API
- kafka提供了兩套consumer API:高級Consumer API和低級Consumer API澳骤。
- 高級API不需要自行去管理offset歧强,partition replica等,系統(tǒng)通過Zk自行管理为肮。(低級 API反之)
(2)Consumer Group(消費者組)
流程圖:
描述說明:
- Consumer Group 由多個Consumer 組成摊册,同時一個Consumer只有屬于一個Consumer Group。
- Consumer Group 保證了其訂閱的Topic partition 會被該Consumer Group 中的Consumer消費颊艳。對于多個Consumer Group訂閱了同一個Topic茅特,每個Consumer Group之間互不影響。
- 如果要實現(xiàn)一個消息被多個 consumer 消費棋枕,則可以將當consumer 單獨添加到單獨的Consumer Group中(反之白修,如果要實現(xiàn)一個消息 被一個 consumer 消費,則可以將當consumer 添加到同一個Consumer Group中)