Kafka 是一個分布式流媒體平臺,常被用做MQ
kafka組件:
- Producers 消息生產(chǎn)者
- Consumers 消息消費者
-
Broker 消息中間件處理結點盛泡,一個Kafka節(jié)點就是一個broker,多個broker可以組成一個Kafka集群饭于。
kafka概念:
- Topic:一類消息颅痊,例如page view日志舰罚、click日志等都可以以topic的形式存在营罢,Kafka集群能夠同時負責多個topic的分發(fā)。
- Partition:topic物理上的分組证鸥,一個topic可以分為多個partition,每個partition是一個有序的隊列(即Partition內(nèi)有序)
- Partition Leader:
Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster. - Replicate:副本,一個Partition的副本Partition鸟蜡,用于高可用癌淮。
- Segment:partition物理上由多個segment組成美侦,下面2.2和2.3有詳細說明产舞。
- offset:每個partition都由一系列有序的、不可變的消息組成菠剩,這些消息被連續(xù)的追加到partition中易猫。partition中的每個消息都有一個連續(xù)的序列號叫做offset,用于partition唯一標識一條消息.
kafka Partition和副本Partition
每個Partition會有多個副本Partition,分布在不同的Broker上。
Partition存儲
topic和partition都是邏輯上的概念
每個partition對應一個目錄:{topic}-{partition}
比如圖中topic car_data,partition 0的目錄時 /car_data-0
/car_data-0中存的時segment(邏輯上的),對應實際的是.index文件和.log文件
index文件通過mmap在內(nèi)存中
index和log文件
index存儲的是稀疏索引(稀疏索引占用空間少)
index和log文件名稱和文件的內(nèi)容中第一個offset有關具壮。
kafka消費者組
kafka消費者Offset保存
新版本中offset由broker維護准颓,offset信息由一個特殊的topic “ __consumer_offsets”來保存,offset以消息形式發(fā)送到該topic并保存在broker中棺妓。這樣consumer提交offset時攘已,只需連接到broker,不用訪問zk怜跑,避免了zk節(jié)點更新瓶頸样勃。
broker消息保存目錄在配置文件server.properties中
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/var/lib/kafka-logs
#ls /usr/local/var/lib/kafka-log
__consumer_offsets-0 __consumer_offsets-22 __consumer_offsets-36 __consumer_offsets-5
__consumer_offsets-1 __consumer_offsets-23 __consumer_offsets-37
其中numPartitions由offsets.topic.num.partitions參數(shù)決定,默認值即50
key是group.id+topic+分區(qū)號性芬,而 value 就是 offset 的值
kafka消息投遞語義
Kafka 性能
- 順序讀寫
磁盤讀寫的快慢取決于你怎么使用它彤灶,也就是順序讀寫或者隨機讀寫。在順序讀寫的情況下批旺,磁盤的順序讀寫速度和內(nèi)存持平。因為硬盤是機械結構诵姜,每次讀寫都會尋址->寫入汽煮,其中尋址是一個“機械動作”,它是最耗時的棚唆。所以硬盤最討厭隨機 I/O暇赤,最喜歡順序 I/O。為了提高讀寫硬盤的速度宵凌,Kafka 就是使用順序 I/O - Memory Mapped Files
即便是順序寫入硬盤鞋囊,硬盤的訪問速度還是不可能追上內(nèi)存。所以 Kafka 的數(shù)據(jù)并不是實時的寫入硬盤 瞎惫,它充分利用了現(xiàn)代操作系統(tǒng)分頁存儲來利用內(nèi)存提高 I/O 效率溜腐。
Memory Mapped Files(后面簡稱 mmap)也被翻譯成 內(nèi)存映射文件 ,在 64 位操作系統(tǒng)中一般可以表示 20G 的數(shù)據(jù)文件瓜喇,它的工作原理是直接利用操作系統(tǒng)的 Page 來實現(xiàn)文件到物理內(nèi)存的直接映射挺益。
完成映射之后你對物理內(nèi)存的操作會被同步到硬盤上(操作系統(tǒng)在適當?shù)臅r候)。
通過 mmap乘寒,進程像讀寫硬盤一樣讀寫內(nèi)存(當然是虛擬機內(nèi)存)望众,也不必關心內(nèi)存的大小有虛擬內(nèi)存為我們兜底。
使用這種方式可以獲取很大的 I/O 提升,省去了用戶空間到內(nèi)核空間復制的開銷(調(diào)用文件的 read 會把數(shù)據(jù)先放到內(nèi)核空間的內(nèi)存中烂翰,然后再復制到用戶空間的內(nèi)存中夯缺。)
但也有一個很明顯的缺陷——不可靠,寫到 mmap 中的數(shù)據(jù)并沒有被真正的寫到硬盤甘耿,操作系統(tǒng)會在程序主動調(diào)用 flush 的時候才把數(shù)據(jù)真正的寫到硬盤踊兜。
Kafka 提供了一個參數(shù)——producer.type 來控制是不是主動 flush,如果 Kafka 寫入到 mmap 之后就立即 flush 然后再返回 Producer 叫 同步 (sync)棵里;寫入 mmap 之后立即返回 Producer 不調(diào)用 flush 叫異步 (async)润文。 - 消息壓縮
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 開啟 GZIP 壓縮
props.put("compression.type", "gzip");
Producer<String, String> producer = new KafkaProducer<>(props);
-
分批發(fā)送
生產(chǎn)者,消費者 sample
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
producer.send(record)
private static final String TOPIC_NAME = "car";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, TOPIC_NAME);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
String message = null;
try {
do {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100000));
for (ConsumerRecord<String, String> record : records) {
message = record.value();
System.out.println(message);
}
} while (true);
} catch(Exception e) {
// exception
} finally {
consumer.close();
}
}