1. kafka架構(gòu)圖
2. 角色分析
1. Broker
kafka作為一個消息中間件限番,用于存儲和轉(zhuǎn)發(fā)消息肌索,可以把它想象成一個中介嵌溢,股票經(jīng)紀(jì)人就叫做broker。默認(rèn)端口是9092办斑,生產(chǎn)者和消費(fèi)者都需要跟這個Broker建立連接才可以實(shí)現(xiàn)消息的收發(fā)外恕。
2. 消息
客戶端之間傳輸?shù)臄?shù)據(jù)稱之為消息, 或者說是記錄(record)俄周。請記住吁讨,對于kafka來說,不管是消費(fèi)者還是生產(chǎn)者都是客戶端峦朗。 在客戶端的代碼中建丧,Record可以是一個key-value鍵值對,生產(chǎn)者對應(yīng)的封裝類是ProducerRecord波势, 消費(fèi)者對應(yīng)的封裝類是ConsumerRecord翎朱。消息在傳輸?shù)倪^程中需要序列化,所有需要我們在代碼中執(zhí)行序列化工具尺铣。消息在服務(wù)端中存儲的格式(RecordBatch和Record)拴曲。
3. 生產(chǎn)者
我們將發(fā)送消息的一方稱之為生產(chǎn)者,接收消息的乙方稱之為消費(fèi)者凛忿,為了提升消息發(fā)送的速率澈灼,生產(chǎn)者并不是組條發(fā)送消息到broker中,而是批量發(fā)送的店溢。多少條發(fā)送一次叁熔,由配置中的一個參數(shù)決定。
props.put("batch.size", 16384);
4. 消費(fèi)者
一般來說床牧,消費(fèi)者獲取消息存在兩種方式荣回,一種是pull, 一種是push。kafka采用的是pull模式戈咳。WHY?
消費(fèi)者可以控制自己一次消費(fèi)多少條消息
max.poll.record=500 #默認(rèn)是500條
5.
生產(chǎn)者和消費(fèi)者之間每條消息之間是如何關(guān)聯(lián)起來的呢耳贬?也就是消費(fèi)者怎么就知道自己需要消費(fèi)什么消息?
隊(duì)列的存在就是解決這個問題的泳姐。在kafka里面這個隊(duì)列就是topic效拭,暂吉。
生產(chǎn)者和Topic胖秒,Topic和消費(fèi)者的關(guān)系都是多對多(不建議這么做)。
當(dāng)生產(chǎn)者發(fā)送消息時慕的,沒有對應(yīng)的Topic阎肝,這個時候會自動創(chuàng)建Topic“菇郑可以通過參數(shù)控制
auto.enable.topics.enable=true #默認(rèn)時true
6. partition和Cluster
分區(qū)其實(shí)是一種數(shù)據(jù)庫分片的思想风题。試想一下,如果一個topic中消息過多嫉父,會產(chǎn)生什么樣的問題沛硅。
- 不方便橫向擴(kuò)展,通過擴(kuò)展機(jī)器而不是升級硬件擴(kuò)展绕辖。
- 并發(fā)負(fù)載摇肌,所有的客戶端都操作同一個topic,在高并發(fā)的場景下仪际,性能瓶頸
kafka分區(qū)概念---partition围小。一個topic可以劃分成多個分區(qū),分區(qū)在創(chuàng)建topic的時候指定树碱,每個topic至少有一個分區(qū)肯适。
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xiong
如果沒有指定分區(qū)數(shù),默認(rèn)分區(qū)是1個成榜,可通過下述參數(shù)修改
num.partitions=1
partition負(fù)載的實(shí)現(xiàn)框舔。舉例說明,Topic有三個分區(qū)赎婚,生產(chǎn)者發(fā)送了9條消息刘绣,第一個分區(qū)存儲了1 4 7, 第二個分區(qū)存儲了2 5 8惑淳,第三個分區(qū)存儲了3 6 9额港。這種情況下其實(shí)就是負(fù)載的一種體現(xiàn)
每個partition都會有一個物理目錄。kafka的配置文件下可以配置日志的存儲路徑歧焦,默認(rèn)存儲在/tmp/kafka-logs下移斩,假設(shè)topic=xiongTopic, 每個分區(qū)的存儲目錄就是xiongTopic-0肚医、xiongTopic-1.....
7. 副本機(jī)制
如果partition的數(shù)據(jù)只存儲了一份你稚,在發(fā)生網(wǎng)絡(luò)或者硬件故障的時候,該分區(qū)的數(shù)據(jù)會無法訪問或者無法恢復(fù)了朱躺。kafka在0.8版本之后增加了副本機(jī)制刁赖, 每個partiotion可以有若干個副本,长搀。一般我們說的副本包括其中的主節(jié)點(diǎn)宇弛。
由replication-factor指定一個Topic的副本數(shù):
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partition --topic testxiong
服務(wù)端有個參數(shù)控制默認(rèn)的副本數(shù)
offsets.topic.replication.factor=3
leader用粉紅色標(biāo)識源请,follower用綠色標(biāo)識枪芒,leader是由選舉得出。
生產(chǎn)者消費(fèi)者消息傳遞都是通過leader來操作谁尸,follower的數(shù)據(jù)是通過leader同步過來的舅踪。
8. Segment
kafka的數(shù)據(jù)是放在后綴為.log的文件中,試想一下良蛮,kafka的數(shù)據(jù)在同一個partition中是順序?qū)懭氲某槁担覀儾粩嗟淖芳訑?shù)據(jù),那保存數(shù)據(jù)的文件就會越來越大背镇,這個時候檢索的效率就會越來越低咬展。
所以,kafka這塊干脆對partition再次進(jìn)行了切分瞒斩,切分出來的單位就就做段(segment)破婆,實(shí)際上kafka數(shù)據(jù)的存儲是分段的。我們可以在kafka的存儲目錄下看到這三個文件都是成對出現(xiàn)的:
這其中是一個數(shù)據(jù)文件胸囱,2個索引文件祷舀。segment的默認(rèn)存儲大小是1G,可以通過一下參數(shù)進(jìn)行控制烹笔。
log.segment.bytes=1073741824
9. Consumer Group
在kafka中裳扯,消費(fèi)者是以消費(fèi)者組的形式對消息進(jìn)行接收。每個消費(fèi)者組都會由一個group id與對應(yīng)的topic進(jìn)行綁定谤职。
- 消費(fèi)者組中冤吨,消費(fèi)者數(shù)量比partition數(shù)量少的情況下蒿柳,一個消費(fèi)者同時消費(fèi)多個partition。
- 消費(fèi)者組中漩蟆,消費(fèi)者數(shù)量比partition數(shù)量多的情況下垒探,存在消費(fèi)者空閑。
這兩種情況都不是效率最高的情況怠李,只有消費(fèi)者數(shù)量和partition數(shù)量保持一致才是最好的選擇圾叼。如果想要消費(fèi)同一個partition,就需要另一個消費(fèi)者組來進(jìn)行捺癞。
10. Comsumer Offset
我們前面談到夷蚊,在Kafka中消息是順序寫入的,并且消費(fèi)的消息是不會被刪除的翘簇。那么撬码,如果消費(fèi)者突然掛掉,或者進(jìn)行下次讀寫時版保,如何知道自己已經(jīng)讀取了哪些信息,該從何處繼續(xù)讀取消息呢夫否?
既然消息是有序的彻犁,那我們就可以給消息進(jìn)行編號,來唯一標(biāo)識一條消息凰慈。
這里的編號我們就稱之為offset汞幢,偏移量。offset記錄著下一條將要發(fā)送給consumer的消息序號微谓。offset的保存是保存在服務(wù)端的森篷,并不是保存在ZK上面。
3. Kafka Java開發(fā)
生產(chǎn)者:
public class SimpleProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.182.128:9092");
// 設(shè)置key value序列化的工具
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//設(shè)置消息接收確認(rèn)模式 0 發(fā)出就立刻確認(rèn)豺型, 1 leader接收到就確認(rèn) all 所有follower同步完成再確認(rèn)
props.put("acks","1");
// 異常重試次數(shù)
props.put("retries", 3);
// 設(shè)置批量發(fā)送數(shù)據(jù)一次仲智,數(shù)據(jù)大小,默認(rèn)16k
props.put("batch.size",16384);
// 設(shè)置批量發(fā)送等待時間
props.put("linger.ms", 5);
// 設(shè)置客戶端緩沖區(qū)大小姻氨,默認(rèn)是32M钓辆,滿了以后也會出發(fā)消息發(fā)送
props.put("buffer.memory", 33554432);
// 獲取元數(shù)據(jù)時生產(chǎn)者的阻塞時間,超時后拋出異常
props.put("max.block.ms", 3000);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i=0; i < 100; i ++) {
producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}
消費(fèi)者
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.182.128:9092");
props.put("group.id", "xiong-group");
// 是否自動提交偏移量肴焊,只有commit之后才更新消費(fèi)者組的offset
props.put("enable.auto.commit", "true");
// 消費(fèi)者自動提交的時間間隔
props.put("auto.commit.interval.ms", "1000");
// 從最早的數(shù)據(jù)開始消費(fèi)earliest | latest | none
props.put("auto.offset.reset", "earliest");
// 設(shè)置key value反序列化的工具
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//訂閱隊(duì)列
consumer.subscribe(Arrays.asList("mytopic"));
try{
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset=%d, key=%s, value=%s, partition=%s%n",
record.offset(), record.key(), record.value(), record.partition());
}
}
} finally {
consumer.close();
}
}
}
查詢消費(fèi)者相關(guān)偏移量數(shù)據(jù):
./kafka-consumer-groups.sh --bootstrap-server 192.168.182.128:9092 --describe --group xiong-group
4. 消息冪等性
什么叫做消息冪等性前联?
簡單來說就是,消息發(fā)送一次的結(jié)果和發(fā)送多次的結(jié)果是一樣的娶眷。
有時候消息消費(fèi)失敗的情況下似嗤,我們可能會采用消息重發(fā)的機(jī)制。但是生產(chǎn)者有時候是不知道消息是不是真的消費(fèi)失敗時届宠,這時候消息的重發(fā)可能會產(chǎn)生消息重復(fù)的情況烁落。
kafka實(shí)現(xiàn)消息的冪等性是在broker中實(shí)現(xiàn)的壳咕,而不是消費(fèi)者端實(shí)現(xiàn),大大的解放了消費(fèi)者的雙手顽馋。
如何實(shí)現(xiàn)消息的去重谓厘?
去重是需要依賴生產(chǎn)者消息的唯一標(biāo)識的,不然我們沒法知道是否是同一條消息寸谜,kafka中可以通過如下配置來產(chǎn)生唯一標(biāo)識竟稳,將producer升級成冪等性的producer。
props.put("enable.idempotence", true);
實(shí)現(xiàn)機(jī)制:
- PID(Producer ID), 冪等性的生產(chǎn)者每個客戶端都有一個唯一的編號熊痴。
- sequence number他爸,冪等性的生產(chǎn)者發(fā)送的每條消息都會帶sequence number, Server端就是通過這個值來判斷消息是否重復(fù)果善。如果server端發(fā)現(xiàn)sequence number的值比服務(wù)端記錄的值要小诊笤,那證明這個消息是重復(fù)的消息。(同一分區(qū)消息順序?qū)懭虢砩拢叭绻嬖趕equence number較小的在后面寫入讨跟,那證明之前肯定已經(jīng)有相同的消息已經(jīng)發(fā)送過來過了)。
作用范圍:
- sequence number并不是全局有序鄙煤,不能保證所有時間上的冪等晾匠。只能保證單分區(qū)上的冪等。
- 單會話上的冪等梯刚,這里的會話是指producer進(jìn)程的一次運(yùn)行凉馆。當(dāng)producer重啟以后就不能保證了。
5. 生產(chǎn)者事務(wù)
生產(chǎn)者與事務(wù)有關(guān)的方法如下:(kafka 0.11版本以后才支持事務(wù))
對象 | 描述 |
---|---|
initTransactions() | 初始化事務(wù) |
beginTransaction() | 開啟事務(wù) |
commitTransaction() | 提交事務(wù) |
abortTransaction() | 中止事務(wù) |
sendOffsetsToTransaction() | sendOffsetsToTransaction方法是消費(fèi)者和生產(chǎn)者在同一段代碼使用的(從上游接收消息發(fā)送給下游)亡资,在提交的時候把消費(fèi)消息的offset發(fā)送給consumer Corordinator. |
代碼示例:
//事務(wù)的前提是消費(fèi)者的冪等性
props.put("enable.idempotence", true);
//設(shè)置事務(wù)id澜共,唯一
props.put("transactional.id", UUID.randomUUID().toString());
Producer<String, String> producer = new KafkaProducer<String, String>(props);
producer.initTransactions();
try{
producer.beginTransaction();
for (int i=0; i < 100; i ++) {
producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), Integer.toString(i)));
if (i == 20) {
Integer j = 1/0; //制造異常
}
}
producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(100), Integer.toString(100)));
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
producer.close();
kafka分布式事務(wù)的實(shí)現(xiàn):
- 生產(chǎn)者的消息會分區(qū),所以這里的事務(wù)屬于分布式事務(wù)锥腻。kafka采用的是2PC提交嗦董。如果大家都可以commit就提交,否則就abort旷太;
- 2PC的情況下展懈,需要一個協(xié)調(diào)者,在Kafka中這個角色叫做Transaction Coordinator供璧。
- 事務(wù)管理必須有事務(wù)日志來記錄事務(wù)的狀態(tài)存崖,以便在Coordinator以外掛掉以后繼續(xù)處理原來的事務(wù)。事務(wù)日志的存儲類似于消費(fèi)者offset的存儲睡毒,kafka使用了一個特殊topic--transaction_state來記錄事務(wù)的狀態(tài)信息来惧。
- 如果生產(chǎn)者掛了,事務(wù)要在重啟以后繼續(xù)處理就需要有一個唯一的事務(wù)id來找到對應(yīng)的事務(wù)演顾,這個就是transaction.id供搀。配置了transaction.id隅居,此時生產(chǎn)者必須是冪等性的生產(chǎn)者。事務(wù)id相同的生產(chǎn)者可以繼續(xù)處理原來的事務(wù)葛虐。
步驟描述:
A: 生產(chǎn)者通過initTransactions Api向coordinator注冊事務(wù)id胎源。
B: Corrdinator記錄事務(wù)日志
C: 生產(chǎn)者將消息寫入目標(biāo)分區(qū)
D: 分區(qū)域Coordinator的交互,當(dāng)事務(wù)完成以后消息的狀態(tài)應(yīng)該是已提交屿脐。這時候消費(fèi)者才能消費(fèi)