kafka相關(guān)知識(shí)

為什么要分區(qū)

1.方便在集群中擴(kuò)展,每個(gè)Partition可以通過調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)topic又可以由多個(gè)Partition組成袁余,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)
2.可以提高并發(fā),因?yàn)榭梢砸訮artition為單位讀寫了。

分區(qū)引發(fā)的問題

1.生產(chǎn)者如何將消息分發(fā)到不同分區(qū)中(生產(chǎn)者的分區(qū)分配策略)

 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) 
 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
 public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
 public ProducerRecord(String topic, Integer partition, K key, V value)
 public ProducerRecord(String topic, K key, V value)
 public ProducerRecord(String topic, V value)

(1)可以指定消費(fèi)哪個(gè)分區(qū)
(2)沒有指明Partition值惫周,但是有key的情況下,將key的hash值與topic的partition值進(jìn)行取余得到partition值
(3)既沒有partition值有沒有key值的情況下康栈,第一次調(diào)用時(shí)會(huì)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增)递递,將這個(gè)值與topic可用的partition總結(jié)書取余得到partition值,也就是常說的round-robin算法

2.消費(fèi)者組中啥么,怎么指定哪個(gè)partition由哪個(gè)consumer來消費(fèi)(消費(fèi)者的分區(qū)分配策略)
(1)RangeAssignor(范圍分區(qū))
(2)RoundRobinAssignor(輪詢分區(qū))
(3)StrickyAssignor 分配策略
(4)可以指定消費(fèi)哪個(gè)分區(qū)

//消費(fèi)指定分區(qū)的時(shí)候登舞,不需要再訂閱 
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消費(fèi)指定的分區(qū) 
TopicPartition topicPartition=new TopicPartition(topic,0); 
kafkaConsumer.assign(Arrays.asList(topicPartition));

producer如何最大化確保消息發(fā)送到broker上不被丟失且不重復(fù)

在介紹怎么保證消息不丟不重之前,先介紹一些概念

生產(chǎn)者ack應(yīng)答機(jī)制
 //設(shè)置ack應(yīng)答悬荣,分為0,1菠秒,-1(all)
 properties.put(ProducerConfig.ACKS_CONFIG,"all");

acks參數(shù)配置
0: (At Most Once)producer不等待broker的ack,這一操作提供了一個(gè)最低的延遲氯迂,broker一接收到還沒有寫入磁盤就已經(jīng)返回践叠,當(dāng)broker故障時(shí)有可能丟失數(shù)據(jù)
1: producer等待broker的ack,partition的leader落盤成功后返回ack,如果在follower同步成功之前l(fā)eader故障嚼蚀,那么會(huì)丟失數(shù)據(jù)禁灼;
all( At Least Once):producer等待broker的ack,partition的leader和follower全部落盤成功后才返回ack,但是如果在follower同步完成后驰坊,broker發(fā)送ack之前匾二,leader發(fā)生故障,那么會(huì)造成數(shù)據(jù)重復(fù)拳芙;

partition副本

每個(gè)分區(qū)可以有多個(gè)副本察藐,并且在副本集合中會(huì)存在一個(gè)leader的副本,所有的讀寫請(qǐng)求都是由leader副本來進(jìn)行處理舟扎。剩余的其他副本都做為follower副本分飞,follower副本會(huì)從leader副本同步消息日志。

** 通過下面的命令去創(chuàng)建帶2個(gè)副本的topic
sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 3 --partitions 3 --topic secondTopic
冪等性

所謂的冪等性就是指Producer不論向server發(fā)送多少次的重復(fù)數(shù)據(jù)睹限,server端都只會(huì)持久化一次譬猫,冪等性結(jié)合At Least Once,就不會(huì)造成數(shù)據(jù)重復(fù);

 //啟用冪等性
 properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");

開啟冪等性的Producer在初始化的時(shí)候會(huì)被分配一個(gè)PID,發(fā)往同一個(gè)Partition的消息會(huì)附帶Sequence Number,而broker端會(huì)對(duì)<PID,Partition,SeqNumner>做緩存羡疗,當(dāng)具有想溝通主鍵的消息提交時(shí)染服,broker只會(huì)持久化一條。

但是Producer重啟就會(huì)變化叨恨,同時(shí)不同的Partition也具有不同的主鍵柳刮,所以冪等性無法保證跨分區(qū)跨會(huì)話的Exactly Once

事務(wù)

冪等性只能保證單分區(qū)單會(huì)話內(nèi)數(shù)據(jù)只持久化一條,對(duì)于跨分區(qū)跨會(huì)話


image.png

事務(wù)性事例:
Kafka 事務(wù)性的使用方法也非常簡單,用戶只需要在 Producer 的配置中配置 transactional.id秉颗,通過 initTransactions() 初始化事務(wù)狀態(tài)信息痢毒,再通過 beginTransaction() 標(biāo)識(shí)一個(gè)事務(wù)的開始,然后通過 commitTransaction() 或 abortTransaction() 對(duì)事務(wù)進(jìn)行 commit 或 abort蚕甥,示例如下所示:

Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");
props.put("acks", "all");
KafkaProducer producer = new KafkaProducer(props);
producer.initTransactions();

try {
 String msg = "matt test";
 producer.beginTransaction();
 producer.send(new ProducerRecord(topic, "0", msg.toString()));
 producer.send(new ProducerRecord(topic, "1", msg.toString()));
 producer.send(new ProducerRecord(topic, "2", msg.toString()));
 producer.commitTransaction();
} catch (ProducerFencedException e1) {
 e1.printStackTrace();
 producer.close();
} catch (KafkaException e2) {
 e2.printStackTrace();
 producer.abortTransaction();
}
producer.close();

參考:http://matt33.com/2018/11/04/kafka-transaction/#%E4%BA%8B%E5%8A%A1%E6%80%A7%E7%A4%BA%E4%BE%8B

producer如何最大化確保消息發(fā)送到broker上不被丟失且不重復(fù)

副本+ack哪替、冪等性,事務(wù)

1.每個(gè)partition設(shè)置多個(gè)副本
2.acks設(shè)置為all
3.冪等性雖然acks設(shè)置為all菇怀,但是可能數(shù)據(jù)重復(fù)凭舶,所以引入了冪等性
At Least Once + 冪等性 = Exactly Once
4.對(duì)于跨分區(qū)跨會(huì)話就不能用冪等性,要用事務(wù)

consumer如何最大化確保消息發(fā)送到broker上不被丟失且不重復(fù)

由自動(dòng)提交改為手動(dòng)提交

ISR

ISR表示目前“可用且消息量與leader相差不多的副本集合敏释,這是整個(gè)副本集合的一個(gè)子集”库快。怎么去理解
可用和相差不多這兩個(gè)詞呢摸袁?具體來說钥顽,ISR集合中的副本必須滿足兩個(gè)條件

  1. 副本所在節(jié)點(diǎn)必須維持著與zookeeper的連接
  2. 副本最后一條消息的offset與leader副本的最后一條消息的offset之間的差值不能超過指定的閾值
    (replica.lag.time.max.ms) replica.lag.time.max.ms:如果該follower在此時(shí)間間隔內(nèi)一直沒有追
    上過leader的所有消息,則該follower就會(huì)被剔除isr列表
  3. ISR數(shù)據(jù)保存在Zookeeper的 /brokers/topics/<topic>/partitions/<partitionId>/state
    節(jié)點(diǎn)中
    follower副本把leader副本LEO之前的日志全部同步完成時(shí)靠汁,則認(rèn)為follower副本已經(jīng)追趕上了leader
    副本蜂大,這個(gè)時(shí)候會(huì)更新這個(gè)副本的lastCaughtUpTimeMs標(biāo)識(shí),kafk副本管理器會(huì)啟動(dòng)一個(gè)副本過期檢
    查的定時(shí)任務(wù)蝶怔,這個(gè)任務(wù)會(huì)定期檢查當(dāng)前時(shí)間與副本的lastCaughtUpTimeMs的差值是否大于參數(shù)
    replica.lag.time.max.ms 的值奶浦,如果大于,則會(huì)把這個(gè)副本踢出ISR集合
image.png
LEO和HW
image.png

image.png
控制器

控制器其實(shí)就是一個(gè) broker, 只不過它除了具有一般 broker 的功能之外, 還負(fù)責(zé)分區(qū)
首領(lǐng)的選舉踢星。集群里第一個(gè)啟動(dòng)的 broker 通過在 Zookeeper 里創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)/controuer
讓自己成為控制器澳叉。 其他 broker 在啟動(dòng)時(shí)也會(huì)嘗試創(chuàng)建這個(gè)節(jié)點(diǎn),不過它們會(huì)收到一個(gè)“節(jié)
點(diǎn)已存在”的異常,然后“意識(shí)”到控制器節(jié)點(diǎn)已存在, 也就是說集群里已經(jīng)有一個(gè)控制器了 。
其他 broker 在控制器節(jié)點(diǎn)上創(chuàng)建 Zookeeperwatch 對(duì)象,這樣它們就可以收到這個(gè)節(jié)點(diǎn)的變
更通知沐悦。這種方式可以確保集群里一次只有一個(gè)控制器存在成洗。
如果控制器被關(guān)閉或者與 Zookeeper 斷開連接, zookeeper 上的臨時(shí)節(jié)點(diǎn)就會(huì)消失。 集
群里的其他 broker 通過 watch 對(duì)象得到控制器節(jié)點(diǎn)消失的通知, 它們會(huì)嘗試讓自己成為新
的控制器藏否。 第一個(gè)在 Zookeeper 里成功創(chuàng)建控制器節(jié)點(diǎn)的 broker 就會(huì)成為新的控制器, 其
他節(jié)點(diǎn)會(huì)收到“節(jié)點(diǎn)已存在”的異常,然后在新的控制器節(jié)點(diǎn)上再次創(chuàng)建 watch 對(duì)象瓶殃。
當(dāng)控制器發(fā)現(xiàn)一個(gè) broker 已經(jīng)離開集群,它就知道,那些失去首領(lǐng)的分區(qū)需要一個(gè)新首
領(lǐng) (這些分區(qū)的首領(lǐng)剛好是在這個(gè) broker 上)。 控制器遍歷這些分區(qū), 并確定誰應(yīng)該成為新
首領(lǐng) (簡單來說就是分區(qū)副本列表里的下一個(gè)副本) , 然后向所有包含新首領(lǐng)或現(xiàn)有跟隨者
的 broker 發(fā)送請(qǐng)求副签。該請(qǐng)求消息包含了誰是新首領(lǐng)以及誰是分區(qū)跟隨者的信息遥椿。隨后,新首
領(lǐng)開始處理來自生產(chǎn)者和消費(fèi)者的情求,而跟隨者開始從新首領(lǐng)那里復(fù)制消息。
當(dāng)控制器發(fā)現(xiàn)一個(gè) broker 加入集群時(shí), 它會(huì)使用 broker ID 來檢査新加入的 broker 是
否包含現(xiàn)有分區(qū)的副本淆储。 如果有, 控制器就把變更通知發(fā)送給新加入的 broker 和其他
broker, 新 broker 上的副本開始從首領(lǐng)那里復(fù)制消息冠场。
簡而言之, Kafka 使用 Zookeeper 的臨時(shí)節(jié)點(diǎn)來選舉控制器,并在節(jié)點(diǎn)加入集群或退出集
群時(shí)通知控制器。 控制器負(fù)責(zé)在節(jié)點(diǎn)加入或離開集群時(shí)進(jìn)行分區(qū)首領(lǐng)選舉本砰。

群組協(xié)調(diào)器

用于消費(fèi)者分區(qū)再分配碴裙。
消費(fèi)者要加入群組時(shí),會(huì)向群組協(xié)調(diào)器發(fā)送一個(gè) JoinGroup 請(qǐng)求,第一個(gè)加入群主的消
費(fèi)者成為群主青团,群主會(huì)獲得群組的成員列表譬巫,并負(fù)責(zé)給每一個(gè)消費(fèi)者分配分區(qū)。分配完畢后督笆,
群組把分配情況發(fā)送給群組協(xié)調(diào)器芦昔,協(xié)調(diào)器再把這些信息發(fā)送給所有的消費(fèi)者,每個(gè)消費(fèi)者
只能看到自己的分配信息娃肿,只有群主知道群組里所有消費(fèi)者的分配信息咕缎。這個(gè)過程在每次再
均衡時(shí)都會(huì)發(fā)生。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末料扰,一起剝皮案震驚了整個(gè)濱河市凭豪,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌晒杈,老刑警劉巖嫂伞,帶你破解...
    沈念sama閱讀 217,406評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件宝剖,死亡現(xiàn)場(chǎng)離奇詭異憔鬼,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)琳疏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評(píng)論 3 393
  • 文/潘曉璐 我一進(jìn)店門粪般,熙熙樓的掌柜王于貴愁眉苦臉地迎上來拼余,“玉大人,你說我怎么就攤上這事亩歹〕准啵” “怎么了?”我有些...
    開封第一講書人閱讀 163,711評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵小作,是天一觀的道長亭姥。 經(jīng)常有香客問我,道長躲惰,這世上最難降的妖魔是什么致份? 我笑而不...
    開封第一講書人閱讀 58,380評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮础拨,結(jié)果婚禮上氮块,老公的妹妹穿的比我還像新娘。我一直安慰自己诡宗,他們只是感情好滔蝉,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著塔沃,像睡著了一般蝠引。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,301評(píng)論 1 301
  • 那天螃概,我揣著相機(jī)與錄音矫夯,去河邊找鬼。 笑死吊洼,一個(gè)胖子當(dāng)著我的面吹牛训貌,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播冒窍,決...
    沈念sama閱讀 40,145評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼递沪,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了综液?” 一聲冷哼從身側(cè)響起款慨,我...
    開封第一講書人閱讀 39,008評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎谬莹,沒想到半個(gè)月后檩奠,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,443評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡届良,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評(píng)論 3 334
  • 正文 我和宋清朗相戀三年笆凌,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了圣猎。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片士葫。...
    茶點(diǎn)故事閱讀 39,795評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖送悔,靈堂內(nèi)的尸體忽然破棺而出慢显,到底是詐尸還是另有隱情,我是刑警寧澤欠啤,帶...
    沈念sama閱讀 35,501評(píng)論 5 345
  • 正文 年R本政府宣布荚藻,位于F島的核電站,受9級(jí)特大地震影響洁段,放射性物質(zhì)發(fā)生泄漏应狱。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評(píng)論 3 328
  • 文/蒙蒙 一祠丝、第九天 我趴在偏房一處隱蔽的房頂上張望疾呻。 院中可真熱鬧,春花似錦写半、人聲如沸岸蜗。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽璃岳。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間铃慷,已是汗流浹背单芜。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留犁柜,地道東北人缓溅。 一個(gè)月前我還...
    沈念sama閱讀 47,899評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像赁温,于是被迫代替她去往敵國和親坛怪。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • Design 1. Motivation 我們?cè)O(shè)計(jì)Kafka用來作為統(tǒng)一的平臺(tái)來處理大公司可能擁有的所有實(shí)時(shí)數(shù)據(jù)源...
    BlackManba_24閱讀 1,374評(píng)論 0 8
  • 1 Kafka概述 1.1 定義 Kafka是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列股囊,主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)...
    djm猿閱讀 683評(píng)論 0 4
  • Kafka 是一個(gè)java開發(fā)的mq中間件袜匿,依賴于zookeper,有高可用稚疹,高吞吐量等特點(diǎn)居灯。 優(yōu)勢(shì) 可靠性:pa...
    何笙閱讀 14,906評(píng)論 1 9
  • 一、概述 Kafka是一個(gè)具有高吞吐量内狗,高拓展性怪嫌,高性能和高可靠的基于發(fā)布訂閱模式的消息隊(duì)列,是由領(lǐng)英基于Java...
    服務(wù)端開發(fā)閱讀 782評(píng)論 1 5
  • 2月26日是一個(gè)很有壓力的日子,因?yàn)槁咐穑谀且惶煸刖叮矣瓉韮砷T新課程,其中一門数初,就是以作業(yè)量大以強(qiáng)度高著稱的自詡...
    笨魚森森閱讀 264評(píng)論 0 1