Kafka的基本概念
-
Broker
Kafka集群中包含多個(gè)服務(wù)器絮记,其中每個(gè)服務(wù)器稱為一個(gè)broker振乏。有一點(diǎn)需要注意一下恰聘,添加一個(gè)新的broker到cluster中的時(shí)候戳气,并不會(huì)分配任何數(shù)據(jù)partiton到新的broker链患,除非有新的topic被創(chuàng)建,為了不創(chuàng)建新的topic瓶您,可以考慮使用partition re-assignment tool將已有的parititon分配到新的broker中
-
Producer
消息生產(chǎn)者麻捻,向kafka broker發(fā)送消息的客戶端,producer基于record的key決定將record發(fā)送到哪個(gè)partition览闰,默認(rèn)使用key的hash芯肤,如果沒有key巷折,則使用輪詢的策略
利用api創(chuàng)建kafka生產(chǎn)者有三個(gè)基本屬性:
- bootstrap.servers:屬性值是一個(gè)host:port的broker列表压鉴,指定了簡(jiǎn)歷初始連接的broker列表,這個(gè)列表不需要包含所有的broker锻拘,因?yàn)榻⒌某跏歼B接會(huì)從相應(yīng)的broker獲取到集群的信息油吭。但是建議至少包含連個(gè)broker,保證高可用署拟。
- key.serializer:屬性值是類的名稱婉宰。這個(gè)屬性指定了用來(lái)序列化鍵值(key)的類。Kafka broker只接受字節(jié)數(shù)組推穷,但生產(chǎn)者的發(fā)送消息接口允許發(fā)送任何的Java對(duì)象心包,因此需要將這些對(duì)象序列化成字節(jié)數(shù)組。key.serializer指定的類需要實(shí)現(xiàn)org.apache.kafka.common.serialization.Serializer接口馒铃,Kafka客戶端包中包含了幾個(gè)默認(rèn)實(shí)現(xiàn)蟹腾,例如ByteArraySerializer痕惋、StringSerializer和IntegerSerializer。
- value.serializer:屬性值是類的名稱娃殖。這個(gè)屬性指定了用來(lái)序列化消息記錄的類
- acks: acks控制多少個(gè)副本必須寫入消息后生產(chǎn)者才能認(rèn)為寫入成功值戳,這個(gè)參數(shù)對(duì)消息丟失可能性有很大影響。這個(gè)參數(shù)有三種取值:
- acks=0:生產(chǎn)者把消息發(fā)送到broker即認(rèn)為成功炉爆,不等待broker的處理結(jié)果堕虹。這種方式的吞吐最高,但也是最容易丟失消息的芬首。
- acks=1:生產(chǎn)者會(huì)在該分區(qū)的群首(leader)寫入消息并返回成功后赴捞,認(rèn)為消息發(fā)送成功。如果群首寫入消息失敗郁稍,生產(chǎn)者會(huì)收到錯(cuò)誤響應(yīng)并進(jìn)行重試螟炫。這種方式能夠一定程度避免消息丟失,但如果群首宕機(jī)時(shí)該消息沒有復(fù)制到其他副本艺晴,那么該消息還是會(huì)丟失昼钻。另外,如果我們使用同步方式來(lái)發(fā)送封寞,延遲會(huì)比前一種方式大大增加(至少增加一個(gè)網(wǎng)絡(luò)往返時(shí)間)然评;如果使用異步方式,應(yīng)用感知不到延遲狈究,吞吐量則會(huì)受異步正在發(fā)送中的數(shù)量限制碗淌。
- acks=all:生產(chǎn)者會(huì)等待所有副本成功寫入該消息,這種方式是最安全的抖锥,能夠保證消息不丟失亿眠,但是延遲也是最大的。
- 當(dāng)生產(chǎn)者發(fā)送消息收到一個(gè)可恢復(fù)異常時(shí)磅废,會(huì)進(jìn)行重試纳像,這個(gè)參數(shù)指定了重試的次數(shù)。在實(shí)際情況中拯勉,這個(gè)參數(shù)需要結(jié)合retry.backoff.ms(重試等待間隔)來(lái)使用竟趾,建議總的重試時(shí)間比集群重新選舉群首的時(shí)間長(zhǎng),這樣可以避免生產(chǎn)者過早結(jié)束重試導(dǎo)致失敗宫峦。
private Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(kafkaProps);
producer創(chuàng)建完成后岔帽,有三種發(fā)送消息的方式:
-
Fire-and-forget
(即發(fā)即棄): 發(fā)送消息給服務(wù)器, 然而并不關(guān)心消息是否成功達(dá)到.大部分情況下, 它將成功達(dá)到, 因?yàn)?Kafka 是高可用的, 并且生產(chǎn)者會(huì)自動(dòng)重試發(fā)送消息.不管怎樣,使用這種方式有些消息可能會(huì)丟失.ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); try { // 此方法返回 RecordMetadata, 但是這里忽略了返回值, 無(wú)法知道消息是否發(fā)送成功 // 生產(chǎn)環(huán)境一般不適用此種方式 producer.send(record); } catch (Exception e) { // SerializationException: 如果序列化失敗 // BufferExhaustedException: buffer 滿了 // TimeoutException // InterruptException: 發(fā)送線程被中斷 e.printStackTrace(); }
-
Synchronous send
(同步發(fā)送): 發(fā)送消息后,send()
方法返回一個(gè)Future
對(duì)象, 使用get()
方法在 future 上等待, 以此來(lái)判斷send()
是否成功.獲取寫入的記錄的metadata,如topic
导绷、partition
和offset
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); try { // 發(fā)送成功, 可以獲得一個(gè) RecordMetadata 對(duì)象 producer.send(record) // 等待應(yīng)答 .get(); } catch (Exception e) { // 發(fā)送失敗 e.printStackTrace(); }
大部分情況下,我們不需要回復(fù)–Kafka 返回寫入的記錄的
topic
犀勒、partition
和offset
,通常發(fā)送端是不需要這些的.另外,我們可能需要知道什么時(shí)候發(fā)送消息失敗,所以我們可以拋出一個(gè)異常,記錄錯(cuò)誤信息或者寫入錯(cuò)誤文件用于后面的分析.不能通過重試被解決.比如,message size too large
(消息大小太大),在這些情況中,KakfaProducer
將不會(huì)嘗試重試, 并立即返回異常. -
Asynchronous send
(異步發(fā)送): 使用一個(gè)callback function
(回調(diào)方法)調(diào)用send()
方法, 當(dāng)從 Kafka broker 接收到相應(yīng)的時(shí)候會(huì)觸此回調(diào)方法.private class DemoProducerCallback implements Callback { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) {//當(dāng)Kafka返回異常時(shí),異常值不為null e.printStackTrace(); } } } ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); producer.send(record, new DemoProducerCallback());
?
-
Consumer(每個(gè)consumer group是一個(gè)訂閱者,為每個(gè)topic partiton維護(hù)一個(gè)offset贾费,每個(gè)consumer自己也會(huì)維護(hù)一個(gè)offset)
消息消費(fèi)者枚碗,每個(gè)consumer屬于一個(gè)特定的consumer group(可為每個(gè)consumer指定group name,若不指定group name則屬于默認(rèn)的group)铸本。同一topic的一條消息只能被同一個(gè)consumer group內(nèi)的一個(gè)consumer消費(fèi)肮雨,但多個(gè)consumer group可同時(shí)消費(fèi)這一消息。Consumer Group中的每個(gè)Consumer讀取Topic的一個(gè)或多個(gè)Partitions箱玷,并且是唯一的Consumer怨规;如果Consumer group中所有consumer總線程大于partitions數(shù)量,則會(huì)出現(xiàn)空閑情況锡足。這樣可以做到負(fù)載均衡波丰,也可以實(shí)現(xiàn)順序消費(fèi)(group中只有一個(gè)consumer)。每個(gè)consumer group維護(hù)了每個(gè)topic partition的offset舶得。
- 為了保證順序消費(fèi)掰烟,每個(gè)message只能發(fā)送到一個(gè)consumer中。否則效率很低沐批,需要等到所有消費(fèi)者消費(fèi)完才能發(fā)送下一個(gè)message纫骑,顯然是不合理的。同時(shí)對(duì)于topic中parition的消費(fèi)如果是異步的就很難保證順序性九孩。目前許多消息系統(tǒng)經(jīng)常使用‘獨(dú)占消費(fèi)’的方式消費(fèi)先馆。例如topic中的parition只能由特定一個(gè)消費(fèi)者消費(fèi),官網(wǎng)明確kafka智能保證一個(gè)parition中的消息的有序性躺彬,不能保證topic中不同parition的有序性
- 如果所有的consumer都在一個(gè)consumer group中煤墙,就像傳統(tǒng)的隊(duì)列一樣。如果所有的consumer都在不同的consumer group中就像發(fā)布訂閱模式一樣宪拥,所有的message都會(huì)廣播倒所有consumers中仿野。因此如果有很多的訂閱者,kafka的性能就會(huì)降低她君,因?yàn)閗afka需要拷貝message到所有的group中以保證順序性
- kafka consumer負(fù)載均衡:每個(gè)consumer是一個(gè)parititon的專有消費(fèi)者脚作,如果有新的consumer加入到了group中,它將獲得一個(gè)共享的parititon犁河,如果一個(gè)consumer掛了鳖枕,它的partition將會(huì)被分配到其他剩余的xonsumer中
- kafka災(zāi)備:consumer會(huì)將offset反饋給kafka broker當(dāng)一條記錄杯成功處理后魄梯。如果在發(fā)送commit offset前桨螺,consumer處理失敗,其他的consumer將會(huì)繼續(xù)從上次的commit offset開始處理酿秸。如果在處理完后這一條記錄但還未發(fā)送commit offset時(shí)consumer發(fā)生錯(cuò)誤灭翔,kafka記錄將被重復(fù)消費(fèi)。在這個(gè)情景下,kafka 實(shí)現(xiàn)了至少一次的消費(fèi)肝箱,應(yīng)該保證消息被處理時(shí)冪等的
- offset 管理:kafka將offset 數(shù)據(jù)保存到一個(gè)"__consumer_offset" topic中哄褒,這個(gè)topic使用日志壓縮,kafka災(zāi)備的的offset就是修改或讀取此topic中的值煌张。
- consumer可以消費(fèi)哪些記錄呐赡? 一條最新的記錄進(jìn)入之后,offset寫入到log parition中骏融,然后將該記錄復(fù)制到所有的partition的followers中链嘀,最后標(biāo)記"High watermark"(成功復(fù)制的最新紀(jì)錄offset)。consumer 消費(fèi)的是"High watermark"中的offset档玻,未被復(fù)制的不可以被消費(fèi)怀泊。
- consumer和parititon的關(guān)系:對(duì)于一個(gè)group,一個(gè)consumer只能消費(fèi)一個(gè)parition误趴,如果consumer數(shù)量大于paritition的數(shù)量霹琼,有的consumer就會(huì)空閑,可以作為災(zāi)備凉当,如果小于partiion數(shù)量枣申,每個(gè)consumer就會(huì)消費(fèi)多個(gè)parition
- 多線程kafka consumer :一個(gè)consumer有多個(gè)線程,很難保證記錄消費(fèi)的有序性看杭,只有在消費(fèi)單條記錄時(shí)間很長(zhǎng)的時(shí)候使用糯而,一般不建議使用。在一個(gè)進(jìn)程中跑多個(gè)線程泊窘,每個(gè)線程是一個(gè)consumer熄驼,每個(gè)線程管理自己的offset。
三種消費(fèi)方式
首先了解一下建立consumer的參數(shù):
"bootstrap.servers", 指定kafka的broker
"group.id", 指定consumer group
"enable.auto.commit", 指定offset可以自動(dòng)被commit 到kafka烘豹,不需要程序中顯示的寫
"auto.commit.interval.ms", 指定了commit offset的時(shí)間間隔
"key.serializer" and "value.serializer", are classes to be used to decode the message into bytes.
-
自動(dòng)commit offset: parition中的一個(gè)記錄被消費(fèi)后自動(dòng)commit offset到kafka
package com.til.kafka.consumer; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import com.tb.constants.KafkaConstants; //Automatic Offset Committing public class AOCKafkaConsumer { Properties props; KafkaConsumer<String, String> consumer; public AOCKafkaConsumer(String brokerString) { props = new Properties(); props.put("bootstrap.servers", brokerString); props.put("group.id", KafkaConstants.KAFKA_CONSUMER_GROUP); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", KafkaConstants.KAFKA_KEY_SERIALIZER); props.put("value.deserializer", KafkaConstants.KAFKA_VALUE_SERIALIZER); consumer = new KafkaConsumer<>(props); } public void subscribe(List<String> topics) { consumer.subscribe(topics); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } public class KafkaConstants { public static String KAFKA_BROKER_STRING = "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"; public static String KAFKA_KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; public static String KAFKA_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; public static String KAFKA_TOPIC = "TEST-1"; public static String KAFKA_CONSUMER_GROUP = "TEST"; }
-
手動(dòng)commit offset 到kafka:手動(dòng)控制commit offset到kafka
import java.util.List; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import com.tb.constants.KafkaConstants; //Manual Offset Control public class MOCKafkaConsumer { Properties props; KafkaConsumer<String, String> consumer; public MOCKafkaConsumer(String brokerString) { props = new Properties(); props.put("bootstrap.servers", brokerString); props.put("group.id", KafkaConstants.KAFKA_CONSUMER_GROUP); props.put("enable.auto.commit", "false"); props.put("key.deserializer", KafkaConstants.KAFKA_KEY_SERIALIZER); props.put("value.deserializer", KafkaConstants.KAFKA_VALUE_SERIALIZER); consumer = new KafkaConsumer<>(props); } public void subscribe(List<String> topics) { consumer.subscribe(topics); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); // This line of code manually commits offset to kafka consumer.commitSync(); } } }
-
手動(dòng)分配一個(gè)consumer給一個(gè)partition:可以手工分配給特定的分區(qū)瓜贾,在這種類型的使用者中,我們可以繞過使用者組的概念携悯,并將使用者分配給特定的分區(qū)祭芦。
import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import com.tb.constants.KafkaConstants; //Manual Partition Assignment public class MPAKafkaConsumer { private Properties props; private KafkaConsumer<String, String> consumer; public MPAKafkaConsumer(String brokerString) { props = new Properties(); props.put("bootstrap.servers", brokerString); // props.put("group.id", KafkaConstants.KAFKA_CONSUMER_GROUP); props.put("enable.auto.commit", "false"); props.put("key.deserializer", KafkaConstants.KAFKA_KEY_SERIALIZER); props.put("value.deserializer", KafkaConstants.KAFKA_VALUE_SERIALIZER); consumer = new KafkaConsumer<>(props); } public void subscribe(List<TopicPartition> topicsPartions) { consumer.assign(topicsPartions); } } // consumer的構(gòu)建和測(cè)試 import java.util.Arrays; import org.apache.kafka.common.TopicPartition; import com.tb.constants.KafkaConstants; import com.til.kafka.consumer.MPAKafkaConsumer; public class App { public static void main(String[] args) { // Partitions to which a consumer has to assign TopicPartition partition = new TopicPartition(KafkaConstants.KAFKA_TOPIC, 0); // This will start a consumer in new thread new Thread(new Runnable() { @Override public void run() { MPAKafkaConsumer mpaKafkaConsumer = new MPAKafkaConsumer(KafkaConstants.KAFKA_BROKER_STRING); mpaKafkaConsumer.subscribe(Arrays.asList(partition)); } }).start(); } }
-
Topic(復(fù)制/災(zāi)備/并行化)
可以理解為一個(gè)MQ消息隊(duì)列的名字。每條發(fā)布到Kafka集群的消息都有一個(gè)類別憔鬼,這個(gè)類別被稱為topic龟劲。(物理上不同topic的消息分開存儲(chǔ),邏輯上一個(gè)topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)轴或。
每個(gè)topic都有一個(gè)Log(topic在硬盤上的存儲(chǔ))昌跌,每個(gè)Log被分為多個(gè)pritions和segments。在硬盤上表現(xiàn)為多個(gè)文件照雁。
-
Partition:
parition是物理上的概念蚕愤,每個(gè)topic包含一個(gè)或多個(gè)partition,創(chuàng)建topic時(shí)可指定parition數(shù)量。每個(gè)partition對(duì)應(yīng)于一個(gè)文件夾萍诱,該文件夾下存儲(chǔ)該partition的數(shù)據(jù)和索引文件悬嗓。為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的topic可以分布到多個(gè) broker(即服務(wù)器)上裕坊,一個(gè)topic可以分為多個(gè)partition包竹,每個(gè)partition是一個(gè)有序的隊(duì)列。partition中的每條消息 都會(huì)被分配一個(gè)有序的id(offset)籍凝。kafka只保證按一個(gè)partition中的順序?qū)⑾l(fā)給consumer映企,不保證一個(gè)topic的整體 (多個(gè)partition間)的順序。也就是說静浴,一個(gè)topic在集群中可以有多個(gè)partition堰氓,那么分區(qū)的策略是什么?(消息發(fā)送到哪個(gè)分區(qū)上苹享,有兩種基本的策略双絮,一是采用Key Hash算法,一是采用Round Robin算法)
patition的備份數(shù): 可以配置parititon的備份數(shù)量得问,每個(gè)parition都有一個(gè)leader server和0到多個(gè)follower servers囤攀,其中l(wèi)eader server處理一個(gè)parition中的所有的讀和寫(和想的不太一樣)。follower 復(fù)制leader宫纬,在leader掛掉后進(jìn)行替換焚挠,Kafka還使用分區(qū)在組內(nèi)進(jìn)行并行消費(fèi)者處理。Kafka在Kafka集群中的服務(wù)器上分發(fā)主題日志分區(qū)漓骚。每個(gè)服務(wù)器通過共享分區(qū)leader來(lái)處理其數(shù)據(jù)和請(qǐng)求的共享(不太懂)蝌衔。
-
zookeeper
用來(lái)管理集群,協(xié)調(diào)broker/cluster的拓?fù)浣Y(jié)構(gòu)蝌蹂,管理集群中哪些broker是新增的噩斟,哪些已經(jīng)掛掉了,新增了一個(gè)topic還是移除了一個(gè)topic孤个,同時(shí)用來(lái)Broker topic partition中l(wèi)eader的選擇剃允。
Kafka的數(shù)據(jù)存儲(chǔ)
主要接收topic中partition數(shù)據(jù)的存儲(chǔ),partition是以文件夾的形式存在具體的borker本機(jī)上(為了效率齐鲤,并不依賴hdfs斥废,自己維護(hù)多份數(shù)據(jù))
-
segment文件的組成
對(duì)于一個(gè)partition(在Broker中以文件夾的形式存在),里面又有很多大小相等的segment數(shù)據(jù)文件(這個(gè)文件具體大小可以在
config/server.properties
中進(jìn)行設(shè)置)给郊,這種特性可以方便old segment file的快速刪除牡肉。- segment file 組成:由2部分組成,分別為index file和data file丑罪,這兩個(gè)文件是一一對(duì)應(yīng)的荚板,后綴”.index”和”.log”分別表示索引文件和數(shù)據(jù)文件凤壁;其中index文件結(jié)構(gòu)很簡(jiǎn)單,每一行都是一個(gè)key,value對(duì)
key 是消息的序號(hào)offset吩屹,value 是消息的物理位置偏移量. - segment file 命名規(guī)則:partition的第一個(gè)segment從0開始跪另,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset,ofsset的數(shù)值最大為64位(long類型),20位數(shù)字字符長(zhǎng)度煤搜,沒有數(shù)字用0填充免绿。如下圖所示:
- segment file 組成:由2部分組成,分別為index file和data file丑罪,這兩個(gè)文件是一一對(duì)應(yīng)的荚板,后綴”.index”和”.log”分別表示索引文件和數(shù)據(jù)文件凤壁;其中index文件結(jié)構(gòu)很簡(jiǎn)單,每一行都是一個(gè)key,value對(duì)
查找:給定一個(gè)offset,查找message擦盾。過程如下:根據(jù)segment文件的命名嘲驾,進(jìn)行二分查找,找到對(duì)應(yīng)的index和log文件迹卢,然后進(jìn)入index 順序查找到小于或等于offset的key(為了保證快速查找使用稀疏索引)辽故,拿到該index文件中offset對(duì)應(yīng)的index,在log文件中順序查找到需要查找的offset的message腐碱。
kafka日志清理
有兩種策略:
- 一種是上面的cleanupLogs根據(jù)時(shí)間或大小策略(粗粒度)
- 還有一種是針對(duì)每個(gè)key的日志刪除策略(細(xì)粒度)即LogCleaner方式誊垢,清理不包括activeSegment(即使超時(shí)),如果消息沒有key症见,那只能采用第一種清理策略了喂走。
日志壓縮保證了:
- 任何消費(fèi)者如果能夠趕上Log的Head部分,它就會(huì)看到寫入的每條消息谋作,這些消息都是順序遞增(中間不會(huì)間斷)的offset
- 總是維持消息的有序性芋肠,壓縮并不會(huì)對(duì)消息進(jìn)行重新排序,而是移除一些消息
- 每條消息的offset永遠(yuǎn)不會(huì)被改變遵蚜,它是日志文件標(biāo)識(shí)位置的永久編號(hào)
- 讀取/消費(fèi)時(shí)如果從最開始的offset=0開始帖池,那么至少可以看到所有記錄按照它們寫入的順序得到的最終狀態(tài)(狀態(tài)指的是value,相同key不同value吭净,最終的狀態(tài)以最新的value為準(zhǔn)):因?yàn)檫@種場(chǎng)景下寫入順序和讀取順序是一致的碘裕,寫入時(shí)和讀取時(shí)offset都是不斷遞增。舉例寫入key1的value在offset=1和offst=5的值分別是v1和v2攒钳,那么讀取到offset=1時(shí)帮孔,最終的狀態(tài)(value值)是v1女坑,讀取到offset=5時(shí)顷锰,最終狀態(tài)是v2(不能指望說讀取到offset=1時(shí)就要求狀態(tài)是v2)