Kafka

分布式系統(tǒng)中,系統(tǒng)由多個(gè)子系統(tǒng)組成愈腾,數(shù)據(jù)需要在子系統(tǒng)中高性能憋活、低延遲的流轉(zhuǎn)。Kafka是"發(fā)布-訂閱"消息系統(tǒng)虱黄,是一個(gè)分布式的悦即、分區(qū)的日志服務(wù),用于處理活躍的流式數(shù)據(jù)

特點(diǎn)

  1. 高吞吐量:同時(shí)為發(fā)布和訂閱提供高吞吐量,讀速度為100MB/s辜梳,寫速度為50MB/s
  2. 持久化:消息持久化在磁盤上粱甫,同時(shí)replication防止數(shù)據(jù)丟失
  3. 分布式:所有的producer、broker和consumer都支持多個(gè)
  4. 消息處理狀態(tài)由consumer端使用offset維護(hù)作瞄,而不是由broker維護(hù)

Kafka中Zookeeper的作用

Zookeeper是1個(gè)分布式系統(tǒng)中的協(xié)調(diào)服務(wù)茶宵,主要用于集群中不同節(jié)點(diǎn)間的相互通信,解決節(jié)點(diǎn)一致性問題

Kafka中宗挥,Zookeeper用于存儲Consumer Group信息和偏移量offset乌庶,分區(qū)再平衡,實(shí)時(shí)監(jiān)控節(jié)點(diǎn)信息属韧,Broker leader選舉安拟,Producer和Consumer用Zookeeper來發(fā)現(xiàn)Broker列表
(Producer和Consumer需要知道哪些Broker是可用的,沒有zk宵喂,每個(gè)Producer和Consumer客戶端在生產(chǎn)和消費(fèi)之前都需要檢查Broker是否可用糠赦,效率太低)
Zookeeper不保存消息,消息保存在Broker上锅棕。從Kafka0.9開始拙泽,Consumer group和offset信息不保存在zk上,也保存在Broker上

  1. Broker

(1)Broker啟動時(shí)裸燎,向zk注冊顾瞻。在zk中,Broker是1個(gè)臨時(shí)節(jié)點(diǎn)znode德绿,當(dāng)Broker與zk斷開連接時(shí)衩茸,znode被刪除
/brokers/ids/brokerid

(2)Broker啟動時(shí)腥沽,向zk注冊持有的topic和partition,是1個(gè)臨時(shí)節(jié)點(diǎn)
/brokers/topics/topic_name/partition_index

(3)集群中的Broker在啟動時(shí),會進(jìn)行l(wèi)eader選舉狡恬。Broker會嘗試在zk上創(chuàng)建/controller臨時(shí)節(jié)點(diǎn)虚缎,成功創(chuàng)建的Broker會成為集群的leader蚣旱,其它Broker在leader確定時(shí)钦购,會收到"節(jié)點(diǎn)已存在"的消息,并在leader創(chuàng)建的/controller臨時(shí)節(jié)點(diǎn)上設(shè)置Watch都许。當(dāng)leader掛掉后稻薇,leader創(chuàng)建的臨時(shí)節(jié)點(diǎn)會被zk刪除,其它Broker會收到Watch通知胶征,并嘗試創(chuàng)建/controller臨時(shí)節(jié)點(diǎn)塞椎,創(chuàng)建成功的Broker成為新的leader

(4)Broker leader負(fù)責(zé)Partition leader的選舉
當(dāng)1個(gè)分區(qū)的leader所在的Broker掛掉時(shí),Broker會遍歷分區(qū)的所有副本睛低,在副本中選擇1個(gè)新的分區(qū)leader忱屑,簡單說蹬敲,就是在isr列表中選擇第1個(gè)副本作為新的分區(qū)leader。選出新的分區(qū)leader后莺戒,Broker leader向所有Broker通知這1變動,新的分區(qū)leader開始處理讀寫請求急波,分區(qū)follower開始同步新的分區(qū)leader的消息

  1. Producer

使用zk發(fā)現(xiàn)Broker列表从铲,和Topic下的Partition的leader建立連接

  1. Consumer

發(fā)現(xiàn)Broker列表,和Topic下的Partition的leader建立連接澄暮,注冊Consumer信息

Producer和Consumer需要知道哪些Broker是可用的名段,沒有zk,每個(gè)Producer和Consumer客戶端在生產(chǎn)和消費(fèi)之前都需要檢查Broker是否可用泣懊,效率太低

相關(guān)概念

  1. 生產(chǎn)者Producer伸辟,向Kafka集群發(fā)送消息
  2. 消費(fèi)者Consumer,與Kafka集群中的Broker實(shí)例建立長連接馍刮,不斷拉取消息信夫,然后進(jìn)行處理
  3. 主題Topic,通過對消息指定主題卡啰,來對消息分類

(1)1個(gè)主題Topic被認(rèn)為是1類消息静稻,每個(gè)Topic被分為多個(gè)分區(qū)Partition,Partition在存儲層面是1個(gè)增量的文件匈辱,發(fā)布到partition的消息會被追加到文件的末尾振湾,每條消息在文件中的位置稱為偏移量offset,是1long型的數(shù)字亡脸,唯一標(biāo)記1條消息

(2)創(chuàng)建分區(qū)時(shí)押搪,需要指定分區(qū)數(shù)partition和復(fù)制系數(shù)(默認(rèn)都是1)
分區(qū)數(shù)越大,吞吐量越大浅碾,需要的資源也就越多大州,Kafka集群在接收到Producer生產(chǎn)的消息時(shí),會根據(jù)均衡策略(輪詢及穗,Key-Hash摧茴,Random隨機(jī))將消息存儲到不同的partition。通過partition存儲消息埂陆,既可以存儲更多的數(shù)據(jù)(partition分布在多個(gè)Broker上苛白,避免文件達(dá)到單機(jī)磁盤上限),也可以提高讀寫效率(多個(gè)Broker處理讀寫操作的效率焚虱,肯定大于1個(gè)Broker)
復(fù)制系數(shù)的目的是冗余備份购裙,分為leader和follower,leader負(fù)責(zé)處理所有的讀寫請求鹃栽,follower會定期同步leader的數(shù)據(jù)躏率,當(dāng)leader掛掉,Kafka會選擇1個(gè)follower成為leader

創(chuàng)建Topic時(shí),Kafka集群會決定如何將partition及其副本分配到Broker上薇芝。規(guī)則是
(a)不同Partition的leader分配到不同的Broker上蓬抄,相同Partition的leader和follower分配到不同的Broker上
例如,有5個(gè)Broker夯到,Toipc有10個(gè)Partition嚷缭,復(fù)制系數(shù)為3,則需要在5個(gè)Broker上分配30個(gè)副本
(a)隨機(jī)選擇1個(gè)Broker耍贾,假如是Broker4阅爽,將Partition1的leader分配給Broker4,之后使用輪詢的方式將其他leader分配給Broker,Partition2的leader分配給Broker5荐开,Partition3的leader分配給Broker1
(b)分配完P(guān)artition leader付翁,會分配follower,需要保證1個(gè)Partition的leader和follower分配在不同的Broker

(3)被消費(fèi)的消息不是立刻被刪除晃听,文件會根據(jù)Broker的配置百侧,保留一段時(shí)間后才刪除。默認(rèn)為7天

  1. 偏移量offset杂伟,Consumer持有的元數(shù)據(jù)移层,offset是Consumer當(dāng)前消費(fèi)消息在Kafka文件中的位置

分組消費(fèi)

Consumer需要做1些高延遲的操作,例如數(shù)據(jù)寫入DB赫粥、使用數(shù)據(jù)進(jìn)行耗時(shí)計(jì)算观话。此時(shí),單個(gè)Consumer無法跟上Producer生產(chǎn)數(shù)據(jù)的速度越平,使用分組消費(fèi)模式频蛔,增加Consumer的個(gè)數(shù),提高處理能力

  1. 分區(qū)再平衡 Rebalance
    分組消費(fèi)模式下秦叛,1個(gè)Consumer只能消費(fèi)1個(gè)Topic下的1個(gè)分區(qū)晦溪,Consumer和Partition是有對應(yīng)關(guān)系的,當(dāng)出現(xiàn)1些情況挣跋,例如
    (1)Consumer加入Consumer Group
    (2)Consumer退出Consumer Group或取消訂閱
    (3)Topic下的partition個(gè)數(shù)增加
    Consumer Group中的協(xié)調(diào)者Coordinator使用分區(qū)再平衡機(jī)制三圆,來調(diào)整Consumer和Partition的對應(yīng)關(guān)系

  2. Coordinator

Kafka0.9之后,Consumer的offset不再存放在zk上避咆,而是存在Broker上舟肉,存放的位置是使用Math.abs()對groupId.hashCode()取絕對值,再對offsets.topic.num.partitions(配置文件中的offset分區(qū)總數(shù)查库,默認(rèn)50)取余

Kafka0.9后路媚,每個(gè)Consumer Group都有1個(gè)coordinator,用來管理Consumer Group和offset樊销。coordinator就是當(dāng)前Consumer Group存放offset的這個(gè)partition的leader所在的Broker整慎。coordinator 負(fù)責(zé)與Consumer Group中的所有Consumer進(jìn)行協(xié)調(diào)通信
(1)定期發(fā)送心跳脏款,監(jiān)控Consumer存活情況
(2)Consumer離開Consumer Group時(shí),會告訴coordinator
(3)將parititon的分配情況裤园,通知組內(nèi)所有Consumer

Kafka提供了2套API撤师,High-Level Consumer API和SimpleConsumer API
High-Level Consumer API提供了對Consumer Group各種操作的封裝,在消費(fèi)消息時(shí)比然,不需要關(guān)注offset的提交丈氓,會自動提交;若使用多線程
(1)如果Consumer線程>partition强法,會有線程收不到消息
(2)如果Consumer線程<partition,會有線程收到多個(gè)partition的消息
(3)如果1個(gè)線程消費(fèi)多個(gè)partition湾笛,無法保證收到消息的順序饮怯,而1個(gè)partition中的消息時(shí)有序的

如果Consumer希望從頭開始消費(fèi)Partition的全量數(shù)據(jù)
(1)使用新的Consumer Group,即"group.id"
(2)指定"auto.offset.reset"為earliest

[KafkaConsumer is not safe for multi-threaded access]

SimpleConsumer API可以多Kafka進(jìn)行更精確的控制嚎研,但是需要自己編寫代碼蓖墅,處理offset的提交,leader的變更等

生產(chǎn)者

消息發(fā)送流程:
(1)創(chuàng)建KafkaProducer對象临扮,初始化組件论矾,例如緩沖區(qū),發(fā)送消息線程
(2)創(chuàng)建ProducerRecord消息對象杆勇,設(shè)置消息Topic贪壳,key和value,將key和value序列化為byte[]
(3)分區(qū)器為消息選擇合適的分區(qū)partition蚜退,默認(rèn)使用輪詢闰靴,可實(shí)現(xiàn)Partitioner接口的partition方法自定義分區(qū)
(4)消息會根據(jù)partition發(fā)送到不同的暫存區(qū)暫存
(5)后臺消息發(fā)送線程Sender,從暫存區(qū)中獲取消息钻注,發(fā)送給Broker
(6)如果消息成功發(fā)送到Broker蚂且,返回RecordMetadata對象,包含消息目標(biāo)分區(qū)幅恋,offset和時(shí)間戳杏死;如果發(fā)生失敗,且設(shè)置了重試次數(shù)捆交,進(jìn)行重試淑翼,否則返回失敗

  1. 創(chuàng)建配置對象Properties
    (1)Broker地址 bootstrap.servers (localhost:9101)
    (2)消息寫入成功策略 acks all (0,Producer不等待確認(rèn)零渐;1窒舟,等待分區(qū)leader寫入成功;all诵盼,等待isr所有副本寫入成功)
    (3)重試次數(shù) retries 0
    (4)緩存大小 batch.size 默認(rèn)16kb惠豺,Producer會將消息緩存银还,當(dāng)消息達(dá)到一定大小時(shí),才一起發(fā)送
    (5)每次發(fā)送消息時(shí)洁墙,延遲一定時(shí)間 linger.ms 默認(rèn)0
    (6)Producer可用內(nèi)存大小 buffer.memory 默認(rèn)32M
    (7)Key和Value的序列化方式

  2. 創(chuàng)建KafkaProducer對象
    Producer<String蛹疯,String> producer = new KafkaProducer<>(props);

  3. 使用KafkaProducer對象的send(),發(fā)送1個(gè)代表消息的ProducerRecord對象热监。send()是異步的捺弦,返回1個(gè)放在Future中的RecordMetadata對象,如果使用Future.get()獲取RecordMetadata對象孝扛,會阻塞直到有結(jié)果返回列吼;可以在send()上設(shè)置回調(diào)方法Callback實(shí)現(xiàn)無阻塞

消費(fèi)者

  1. 創(chuàng)建配置對象Properties
    (1)Broker地址 bootstrap.servers
    (2)組id group.id
    (3)開啟自動提交 enable.auto.commit true
    (4)offset重置方式 auto.offset.reset 默認(rèn)latest
    earliest 之前提交過offset,從offset開始消費(fèi)苦始;未提交過寞钥,從頭開始消費(fèi)
    latest 之前提交過offset,從offset開始消費(fèi)陌选;未提交過理郑,等待分區(qū)產(chǎn)生新的消息,從新消息開始消費(fèi)
    none 之前提交過offset咨油,從offset開始消費(fèi)您炉;Consumer Group涉及的分區(qū),有1個(gè)未提交過offset役电,報(bào)錯(cuò)
    (5)自動提交頻率 auto.commit.interval.ms 默認(rèn)5s
    (6)Key和Value的序列化方式

  2. 創(chuàng)建KafkaConsumer對象赚爵,訂閱主題
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("sonia"));

  3. Kafka High-Level Consumer API在KafkaConsumer對象的poll()中封裝了
    (1)Consumer Group管理宴霸,調(diào)用poll()時(shí)囱晴,獲得coordinator和分區(qū)partition分布情況
    (2)獲得分區(qū)重平衡后,Consumer和Partition的對應(yīng)信息
    (3)向Broker發(fā)送心跳瓢谢,否則超時(shí)Broker會認(rèn)為Consumer死亡畸写,Consumer消費(fèi)的partition會被分配給其他的Consumer
    (4)獲得消息
    使用時(shí),將poll()放在while(true)循環(huán)中氓扛,不斷執(zhí)行poll()枯芬,不斷的從Broker pull消息;同時(shí)向Broker發(fā)送心跳采郎,維持與Broker之間的連接千所,實(shí)時(shí)感知分區(qū)再平衡后的Consumer與Partition的關(guān)系,并作出調(diào)整

[提交offset的問題]

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蒜埋,一起剝皮案震驚了整個(gè)濱河市淫痰,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌整份,老刑警劉巖待错,帶你破解...
    沈念sama閱讀 206,311評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件籽孙,死亡現(xiàn)場離奇詭異,居然都是意外死亡火俄,警方通過查閱死者的電腦和手機(jī)犯建,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來瓜客,“玉大人适瓦,你說我怎么就攤上這事∑滓牵” “怎么了玻熙?”我有些...
    開封第一講書人閱讀 152,671評論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長疯攒。 經(jīng)常有香客問我揭芍,道長,這世上最難降的妖魔是什么卸例? 我笑而不...
    開封第一講書人閱讀 55,252評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮肌毅,結(jié)果婚禮上筷转,老公的妹妹穿的比我還像新娘。我一直安慰自己悬而,他們只是感情好呜舒,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著笨奠,像睡著了一般袭蝗。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上般婆,一...
    開封第一講書人閱讀 49,031評論 1 285
  • 那天到腥,我揣著相機(jī)與錄音,去河邊找鬼蔚袍。 笑死乡范,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的啤咽。 我是一名探鬼主播晋辆,決...
    沈念sama閱讀 38,340評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼宇整!你這毒婦竟也來了瓶佳?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,973評論 0 259
  • 序言:老撾萬榮一對情侶失蹤鳞青,失蹤者是張志新(化名)和其女友劉穎霸饲,沒想到半個(gè)月后为朋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡贴彼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評論 2 323
  • 正文 我和宋清朗相戀三年潜腻,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片器仗。...
    茶點(diǎn)故事閱讀 38,039評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡融涣,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出精钮,到底是詐尸還是另有隱情威鹿,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評論 4 323
  • 正文 年R本政府宣布轨香,位于F島的核電站忽你,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏臂容。R本人自食惡果不足惜科雳,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望脓杉。 院中可真熱鬧糟秘,春花似錦、人聲如沸球散。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蕉堰。三九已至凌净,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間屋讶,已是汗流浹背冰寻。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留丑婿,地道東北人性雄。 一個(gè)月前我還...
    沈念sama閱讀 45,497評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像羹奉,于是被迫代替她去往敵國和親秒旋。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評論 2 345

推薦閱讀更多精彩內(nèi)容