為什么要分區(qū)
1.方便在集群中擴(kuò)展,每個(gè)Partition可以通過調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)topic又可以由多個(gè)Partition組成袁余,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)
2.可以提高并發(fā),因?yàn)榭梢砸訮artition為單位讀寫了。
分區(qū)引發(fā)的問題
1.生產(chǎn)者如何將消息分發(fā)到不同分區(qū)中(生產(chǎn)者的分區(qū)分配策略)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)
(1)可以指定消費(fèi)哪個(gè)分區(qū)
(2)沒有指明Partition值惫周,但是有key的情況下,將key的hash值與topic的partition值進(jìn)行取余得到partition值
(3)既沒有partition值有沒有key值的情況下康栈,第一次調(diào)用時(shí)會(huì)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增)递递,將這個(gè)值與topic可用的partition總結(jié)書取余得到partition值,也就是常說的round-robin算法
2.消費(fèi)者組中啥么,怎么指定哪個(gè)partition由哪個(gè)consumer來消費(fèi)(消費(fèi)者的分區(qū)分配策略)
(1)RangeAssignor(范圍分區(qū))
(2)RoundRobinAssignor(輪詢分區(qū))
(3)StrickyAssignor 分配策略
(4)可以指定消費(fèi)哪個(gè)分區(qū)
//消費(fèi)指定分區(qū)的時(shí)候登舞,不需要再訂閱
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消費(fèi)指定的分區(qū)
TopicPartition topicPartition=new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));
producer如何最大化確保消息發(fā)送到broker上不被丟失且不重復(fù)
在介紹怎么保證消息不丟不重之前,先介紹一些概念
生產(chǎn)者ack應(yīng)答機(jī)制
//設(shè)置ack應(yīng)答悬荣,分為0,1菠秒,-1(all)
properties.put(ProducerConfig.ACKS_CONFIG,"all");
acks參數(shù)配置
0: (At Most Once)producer不等待broker的ack,這一操作提供了一個(gè)最低的延遲氯迂,broker一接收到還沒有寫入磁盤就已經(jīng)返回践叠,當(dāng)broker故障時(shí)有可能丟失數(shù)據(jù)
1: producer等待broker的ack,partition的leader落盤成功后返回ack,如果在follower同步成功之前l(fā)eader故障嚼蚀,那么會(huì)丟失數(shù)據(jù)禁灼;
all( At Least Once):producer等待broker的ack,partition的leader和follower全部落盤成功后才返回ack,但是如果在follower同步完成后驰坊,broker發(fā)送ack之前匾二,leader發(fā)生故障,那么會(huì)造成數(shù)據(jù)重復(fù)拳芙;
partition副本
每個(gè)分區(qū)可以有多個(gè)副本察藐,并且在副本集合中會(huì)存在一個(gè)leader的副本,所有的讀寫請(qǐng)求都是由leader副本來進(jìn)行處理舟扎。剩余的其他副本都做為follower副本分飞,follower副本會(huì)從leader副本同步消息日志。
** 通過下面的命令去創(chuàng)建帶2個(gè)副本的topic
sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 3 --partitions 3 --topic secondTopic
冪等性
所謂的冪等性就是指Producer不論向server發(fā)送多少次的重復(fù)數(shù)據(jù)睹限,server端都只會(huì)持久化一次譬猫,冪等性結(jié)合At Least Once,就不會(huì)造成數(shù)據(jù)重復(fù);
//啟用冪等性
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
開啟冪等性的Producer在初始化的時(shí)候會(huì)被分配一個(gè)PID,發(fā)往同一個(gè)Partition的消息會(huì)附帶Sequence Number,而broker端會(huì)對(duì)<PID,Partition,SeqNumner>做緩存羡疗,當(dāng)具有想溝通主鍵的消息提交時(shí)染服,broker只會(huì)持久化一條。
但是Producer重啟就會(huì)變化叨恨,同時(shí)不同的Partition也具有不同的主鍵柳刮,所以冪等性無法保證跨分區(qū)跨會(huì)話的Exactly Once
事務(wù)
冪等性只能保證單分區(qū)單會(huì)話內(nèi)數(shù)據(jù)只持久化一條,對(duì)于跨分區(qū)跨會(huì)話
事務(wù)性事例:
Kafka 事務(wù)性的使用方法也非常簡單,用戶只需要在 Producer 的配置中配置 transactional.id秉颗,通過 initTransactions() 初始化事務(wù)狀態(tài)信息痢毒,再通過 beginTransaction() 標(biāo)識(shí)一個(gè)事務(wù)的開始,然后通過 commitTransaction() 或 abortTransaction() 對(duì)事務(wù)進(jìn)行 commit 或 abort蚕甥,示例如下所示:
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();
try {
String msg = "matt test";
producer.beginTransaction();
producer.send(new ProducerRecord(topic, "0", msg.toString()));
producer.send(new ProducerRecord(topic, "1", msg.toString()));
producer.send(new ProducerRecord(topic, "2", msg.toString()));
producer.commitTransaction();
} catch (ProducerFencedException e1) {
e1.printStackTrace();
producer.close();
} catch (KafkaException e2) {
e2.printStackTrace();
producer.abortTransaction();
}
producer.close();
參考:http://matt33.com/2018/11/04/kafka-transaction/#%E4%BA%8B%E5%8A%A1%E6%80%A7%E7%A4%BA%E4%BE%8B
producer如何最大化確保消息發(fā)送到broker上不被丟失且不重復(fù)
副本+ack哪替、冪等性,事務(wù)
1.每個(gè)partition設(shè)置多個(gè)副本
2.acks設(shè)置為all
3.冪等性雖然acks設(shè)置為all菇怀,但是可能數(shù)據(jù)重復(fù)凭舶,所以引入了冪等性
At Least Once + 冪等性 = Exactly Once
4.對(duì)于跨分區(qū)跨會(huì)話就不能用冪等性,要用事務(wù)
consumer如何最大化確保消息發(fā)送到broker上不被丟失且不重復(fù)
由自動(dòng)提交改為手動(dòng)提交
ISR
ISR表示目前“可用且消息量與leader相差不多的副本集合敏释,這是整個(gè)副本集合的一個(gè)子集”库快。怎么去理解
可用和相差不多這兩個(gè)詞呢摸袁?具體來說钥顽,ISR集合中的副本必須滿足兩個(gè)條件
- 副本所在節(jié)點(diǎn)必須維持著與zookeeper的連接
- 副本最后一條消息的offset與leader副本的最后一條消息的offset之間的差值不能超過指定的閾值
(replica.lag.time.max.ms) replica.lag.time.max.ms:如果該follower在此時(shí)間間隔內(nèi)一直沒有追
上過leader的所有消息,則該follower就會(huì)被剔除isr列表 - ISR數(shù)據(jù)保存在Zookeeper的 /brokers/topics/<topic>/partitions/<partitionId>/state
節(jié)點(diǎn)中
follower副本把leader副本LEO之前的日志全部同步完成時(shí)靠汁,則認(rèn)為follower副本已經(jīng)追趕上了leader
副本蜂大,這個(gè)時(shí)候會(huì)更新這個(gè)副本的lastCaughtUpTimeMs標(biāo)識(shí),kafk副本管理器會(huì)啟動(dòng)一個(gè)副本過期檢
查的定時(shí)任務(wù)蝶怔,這個(gè)任務(wù)會(huì)定期檢查當(dāng)前時(shí)間與副本的lastCaughtUpTimeMs的差值是否大于參數(shù)
replica.lag.time.max.ms 的值奶浦,如果大于,則會(huì)把這個(gè)副本踢出ISR集合
LEO和HW
控制器
控制器其實(shí)就是一個(gè) broker, 只不過它除了具有一般 broker 的功能之外, 還負(fù)責(zé)分區(qū)
首領(lǐng)的選舉踢星。集群里第一個(gè)啟動(dòng)的 broker 通過在 Zookeeper 里創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)/controuer
讓自己成為控制器澳叉。 其他 broker 在啟動(dòng)時(shí)也會(huì)嘗試創(chuàng)建這個(gè)節(jié)點(diǎn),不過它們會(huì)收到一個(gè)“節(jié)
點(diǎn)已存在”的異常,然后“意識(shí)”到控制器節(jié)點(diǎn)已存在, 也就是說集群里已經(jīng)有一個(gè)控制器了 。
其他 broker 在控制器節(jié)點(diǎn)上創(chuàng)建 Zookeeperwatch 對(duì)象,這樣它們就可以收到這個(gè)節(jié)點(diǎn)的變
更通知沐悦。這種方式可以確保集群里一次只有一個(gè)控制器存在成洗。
如果控制器被關(guān)閉或者與 Zookeeper 斷開連接, zookeeper 上的臨時(shí)節(jié)點(diǎn)就會(huì)消失。 集
群里的其他 broker 通過 watch 對(duì)象得到控制器節(jié)點(diǎn)消失的通知, 它們會(huì)嘗試讓自己成為新
的控制器藏否。 第一個(gè)在 Zookeeper 里成功創(chuàng)建控制器節(jié)點(diǎn)的 broker 就會(huì)成為新的控制器, 其
他節(jié)點(diǎn)會(huì)收到“節(jié)點(diǎn)已存在”的異常,然后在新的控制器節(jié)點(diǎn)上再次創(chuàng)建 watch 對(duì)象瓶殃。
當(dāng)控制器發(fā)現(xiàn)一個(gè) broker 已經(jīng)離開集群,它就知道,那些失去首領(lǐng)的分區(qū)需要一個(gè)新首
領(lǐng) (這些分區(qū)的首領(lǐng)剛好是在這個(gè) broker 上)。 控制器遍歷這些分區(qū), 并確定誰應(yīng)該成為新
首領(lǐng) (簡單來說就是分區(qū)副本列表里的下一個(gè)副本) , 然后向所有包含新首領(lǐng)或現(xiàn)有跟隨者
的 broker 發(fā)送請(qǐng)求副签。該請(qǐng)求消息包含了誰是新首領(lǐng)以及誰是分區(qū)跟隨者的信息遥椿。隨后,新首
領(lǐng)開始處理來自生產(chǎn)者和消費(fèi)者的情求,而跟隨者開始從新首領(lǐng)那里復(fù)制消息。
當(dāng)控制器發(fā)現(xiàn)一個(gè) broker 加入集群時(shí), 它會(huì)使用 broker ID 來檢査新加入的 broker 是
否包含現(xiàn)有分區(qū)的副本淆储。 如果有, 控制器就把變更通知發(fā)送給新加入的 broker 和其他
broker, 新 broker 上的副本開始從首領(lǐng)那里復(fù)制消息冠场。
簡而言之, Kafka 使用 Zookeeper 的臨時(shí)節(jié)點(diǎn)來選舉控制器,并在節(jié)點(diǎn)加入集群或退出集
群時(shí)通知控制器。 控制器負(fù)責(zé)在節(jié)點(diǎn)加入或離開集群時(shí)進(jìn)行分區(qū)首領(lǐng)選舉本砰。
群組協(xié)調(diào)器
用于消費(fèi)者分區(qū)再分配碴裙。
消費(fèi)者要加入群組時(shí),會(huì)向群組協(xié)調(diào)器發(fā)送一個(gè) JoinGroup 請(qǐng)求,第一個(gè)加入群主的消
費(fèi)者成為群主青团,群主會(huì)獲得群組的成員列表譬巫,并負(fù)責(zé)給每一個(gè)消費(fèi)者分配分區(qū)。分配完畢后督笆,
群組把分配情況發(fā)送給群組協(xié)調(diào)器芦昔,協(xié)調(diào)器再把這些信息發(fā)送給所有的消費(fèi)者,每個(gè)消費(fèi)者
只能看到自己的分配信息娃肿,只有群主知道群組里所有消費(fèi)者的分配信息咕缎。這個(gè)過程在每次再
均衡時(shí)都會(huì)發(fā)生。