kafka 架構(gòu)

https://www.cnblogs.com/cyfonly/p/5954614.html

2.1 拓撲結(jié)構(gòu)

image.png

2.2 相關(guān)概念

1.producer:消息生產(chǎn)者庆锦,發(fā)布消息到kafka服務(wù)器辫继。

2.broker:kafka集群服務(wù)器硼婿。

3.topic:話題锌半,接收produce發(fā)布信息寇漫。

4.partition:是物理概念州胳,每個topic包含一個或多個分區(qū)。

5.consumer:消費者亚亲,消費topic消息

6.consumer group:每個消費者屬于這個組,可以被一個consumer消費捌归,也可以被一個組消費惜索。

7.replice:高可用剃浇,副本個數(shù),保證分區(qū)的高可靠(partition)

8.leader:replice的一個角色角塑,副本的一個角色淘讥,生產(chǎn)者和消費者之和leader交互。

9.follower:replice的一個角色窒朋,從leader中復(fù)制數(shù)據(jù)侥猩。

10.controller:kafka其中一個服務(wù)器,用來進行l(wèi)eader eletion 競選唧取。

11.zookeeper:kafka通過zookeeper來存儲meta信息杰标。

2.3 zookeeper 節(jié)點

image.png

三、producer 發(fā)布消息
3.1寫入方式
Produce采用push模式將消息發(fā)布到broker腔剂,每條消息都被追加(append)到partition中,(順序?qū)懭氪疟P袜漩,效率高于隨機寫湾碎,保障kafka吞吐率)
3.2 消息路由介褥,produce發(fā)布消息到broker時,會根據(jù)分區(qū)算法將數(shù)據(jù)存儲到某個分區(qū)(partition)
算法機制:
1>指定分區(qū)溢陪,那就直接使用睛廊,
2>未指定分區(qū),但指定key咆霜,可以通過key的value進行hash選擇partition
3>partition和key都沒指定蛾坯,使用輪訓(xùn)選擇一個partition
3.3 寫入流程
Produce寫入消息的序列圖


image.png

流程說明:
1.producer先從zookerper的/brokers/../stase的節(jié)點找到分區(qū)的leader疏遏。
2.Prodercer將消息發(fā)送給leader
3.Leder將消息寫入到本地log
4.Followers從leader pull 消息,寫入本地log后發(fā)送ACK
5.Leader收到所有ISR中的replice的ACK后,增加HW(high watemark宝当,最后commit的offset)并向producer發(fā)送ACK
3.4 producer delivery guarantee保證生產(chǎn)者數(shù)據(jù)不丟失
是producer向broker發(fā)送消息時庆揩,一旦消息被commit確定,由于replication的存在虏辫,就不會丟失锈拨,但是如果發(fā)送數(shù)據(jù)給broker后奕枢,遇到網(wǎng)絡(luò)問題,萌焰。谷浅。無法判斷消息的存在,但是producer可以生成一種類似主鍵的東西撼玄,發(fā)生故障后违施,可以重復(fù)多次發(fā)送磕蒲。
四、broker 保存消息
4.1存儲方式
物理上把topic分成一個或多個partition(系統(tǒng)默認分區(qū)=3)兔院,每個物理上對應(yīng)一個文件夾(該文件存儲partition的所有消息和索引文件)


image.png

4.2存儲策略
無論消息是否被消費坊萝,kafka都會保留所有消息,有兩種方式可以刪除舊數(shù)據(jù)
1 時間 :log.Retention.hore=168 7day
2 文件大小: log.retention.bytes=1073741824
4.3 topic 創(chuàng)建與刪除
4.3.1創(chuàng)建topic
image.png

流程
  1. controller 在 ZooKeeper 的 /brokers/topics 節(jié)點上注冊 watcher菩鲜,當(dāng) topic 被創(chuàng)建接校,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配狮崩。

  2. controller從 /brokers/ids 讀取當(dāng)前所有可用的 broker 列表睦柴,對于 set_p 中的每一個 partition:
    2.1 從分配給該 partition 的所有 replica(稱為AR)中任選一個可用的 broker 作為新的 leader,并將AR設(shè)置為新的 ISR
    2.2 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state

  3. controller 通過 RPC 向相關(guān)的 broker 發(fā)送 LeaderAndISRRequest侣诵。
    4.3.2 刪除 topic


    image.png
  4. controller 在 zooKeeper 的 /brokers/topics 節(jié)點上注冊 watcher窝趣,當(dāng) topic 被刪除训柴,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配幻馁。

  5. 若 delete.topic.enable=false,結(jié)束膘滨;否則 controller 注冊在 /admin/delete_topics 上的 watch 被 fire稀拐,controller 通過回調(diào)向?qū)?yīng)的 broker 發(fā)送 StopReplicaRequest。

五铲咨、kafka HA
5.1replication:(副本)纤勒,同一個partition可能有多個replic隆檀。如沒有副本情況下,一般broker宕機泉坐,生產(chǎn)者不能在生產(chǎn)到這個分區(qū)坚冀,消費者不能這消費,引入replication后,同一個partition可能會有多個副本(3個)這時需要在這些reolica之間選出一個leader构捡,producer和consumer只與leader交互勾徽,其他replica作為follower從leader中復(fù)制數(shù)據(jù)。

5.2 leader failover
當(dāng)partition對應(yīng)的leder宕機時畅姊,需要從follower中選新leader若未,選舉原則倾鲫,新leader必須有久leader commit過的所有消息。
kafka 在 zookeeper 中(/brokers/.../state)動態(tài)維護了一個 ISR(in-sync replicas)隙疚,由3.3節(jié)的寫入流程可知 ISR 里面的所有 replica 都跟上了 leader供屉,只有 ISR 里面的成員才能選為 leader溺蕉。對于 f+1 個 replica,一個 partition 可以在容忍 f 個 replica 失效的情況下保證消息不丟失撵割。
5.3故障轉(zhuǎn)移broker failover


image.png

流程解說:

  1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 節(jié)點注冊 Watcher啡彬,當(dāng) broker 宕機時 zookeeper 會 fire watch
  2. controller 從 /brokers/ids 節(jié)點讀取可用broker
  3. controller決定set_p庶灿,該集合包含宕機 broker 上的所有 partition
  4. 對 set_p 中的每一個 partition
    4.1 從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點讀取 ISR
    4.2 決定新 leader(如4.3節(jié)所描述)
    4.3 將新 leader、ISR腾誉、controller_epoch 和 leader_epoch 等信息寫入 state 節(jié)點
  5. 通過 RPC 向相關(guān) broker 發(fā)送 leaderAndISRRequest 命令
    5.4 controller failover
    當(dāng) controller 宕機時會觸發(fā) controller failover峻呕。每個broker 都會在 zookeeper 的 "/controller" 節(jié)點注冊 watcher瘦癌,當(dāng) controller 宕機時 zookeeper 中的臨時節(jié)點消失,所有存活的 broker 收到 fire 的通知热押,每個 broker 都嘗試創(chuàng)建新的 controller path斤寇,只有一個競選成功并當(dāng)選為 controller娘锁。
    當(dāng)新的 controller 當(dāng)選時,會觸發(fā) KafkaController.onControllerFailover 方法碎税,在該方法中完成如下操作:
  6. 讀取并增加 Controller Epoch雷蹂。
  7. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注冊 watcher杯道。
  8. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注冊 watcher党巾。
  9. 通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注冊 watcher。
  10. 若 delete.topic.enable=true(默認值是 false)驳规,則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注冊 watcher署海。
  11. 通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注冊Watch。
  12. 初始化 ControllerContext 對象镀梭,設(shè)置當(dāng)前所有 topic踱启,“活”著的 broker 列表,所有 partition 的 leader 及 ISR等透罢。
  13. 啟動 replicaStateMachine 和 partitionStateMachine琐凭。
  14. 將 brokerState 狀態(tài)設(shè)置為 RunningAsController浊服。
  15. 將每個 partition 的 Leadership 信息發(fā)送給所有“活”著的 broker牙躺。
  16. 若 auto.leader.rebalance.enable=true(默認值是true)腕扶,則啟動 partition-rebalance 線程半抱。
  17. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應(yīng)的Topic炼幔。
  18. consumer 消費消息
    6.1 consumer API
    kafka 提供了兩套 consumer API:
  19. The high-level Consumer API
  20. The SimpleConsumer API
    6.1.1 The high-level consumer API
    high-level consumer API 提供了 consumer group 的語義史简,一個消息只能被 group 內(nèi)的一個 consumer 所消費圆兵,且 consumer 消費消息時不關(guān)注 offset,最后一個 offset 由 zookeeper 保存刀脏。
    使用 high-level consumer API 可以是多線程的應(yīng)用愈污,應(yīng)當(dāng)注意:
  21. 如果消費線程大于 patition 數(shù)量,則有些線程將收不到消息
  22. 如果 patition 數(shù)量大于線程數(shù)茫陆,則有些線程多收到多個 patition 的消息
  23. 如果一個線程消費多個 patition擎析,則無法保證你收到的消息的順序揍魂,而一個 patition 內(nèi)的消息是有序的
    6.1.2 The SimpleConsumer API
    如果你想要對 patition 有更多的控制權(quán),那就應(yīng)該使用 SimpleConsumer API喜最,比如:
  24. 多次讀取一個消息
  25. 只消費一個patition 中的部分消息
  26. 使用事務(wù)來保證一個消息僅被消費一次
    但是使用此 API 時瞬内,partition限书、offset倦西、broker、leader 等對你不再透明粉铐,需要自己去管理蝙泼。你需要做大量的額外工作:
  27. 必須在應(yīng)用程序中跟蹤 offset裆装,從而確定下一條應(yīng)該消費哪條消息
  28. 應(yīng)用程序需要通過程序獲知每個 Partition 的 leader 是誰
  29. 需要處理 leader 的變更
    6.2 consumer group
    kafka 的分配單位是 patition哨免。每個 consumer 都屬于一個 group,一個 partition 只能被同一個 group 內(nèi)的一個 consumer 所消費(也就保障了一個消息只能被 group 內(nèi)的一個 consuemr 所消費)载荔,但是多個 group 可以同時消費這個 partition懒熙。
    6.3 消費方式
    Consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)。
    Push(推)模式很難適應(yīng)消費速率不同的消費者徘钥,應(yīng)為消息發(fā)送速率是有broker決定的肢娘,他的目的是盡可能快的傳遞消息橱健,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕接受臼节,或網(wǎng)絡(luò)擁擠网缝,而pull模式則可以根據(jù)consumer的消費能力適當(dāng)?shù)乃俾氏M消息蟋定。對kafka而言溢吻,pull(拉)模式更適合果元,它可簡化broker的設(shè)計而晒,consumer可自主控制消費消息的速率,同事consumer可以自己控制自己的消費方式--可批量消費也可以逐條消費迅耘,同時還能選擇不同的提交方式從而從而實現(xiàn)不同的傳輸語義监署。
    6.4 consumer delivery guarantee消費者的交貨保證
    1.如果一定要做到Exactly once(正好一次)钠乏,就需要協(xié)調(diào)offset和實際操作的輸出晓避。
    做法是引入兩個階段只壳,如果讓offset和輸出在同一個地方吼句。會簡介通用事格,這方法可能更好分蓖。如果把最新的offset和數(shù)據(jù)本身一起寫到HDFS,那就可以保證數(shù)據(jù)的輸出终娃,和offset的更新要嘛都完成棠耕。要嘛都完不成柠新,(offset是存在于zookeeper中的恨憎,無法存儲在hdfs,而低API的offset是自己維護的瓤荔,可以存放HDFS中)

<u>kafka consumer防止數(shù)據(jù)丟失</u>

如果希望能夠嚴格的不丟數(shù)據(jù)输硝,解決辦法有兩個:

· 手動commit offset点把,并針對partition_num啟同樣數(shù)目的consumer進程屿附,這樣就能保證一個consumer進程占有一個partition挺份,commit offset的時候不會影響別的partition的offset。但這個方法比較局限影暴,因為partition和consumer進程的數(shù)目必須嚴格對應(yīng)型宙。

· 另一個方法同樣需要手動commit offset妆兑,另外在consumer端再將所有fetch到的數(shù)據(jù)緩存到queue里,當(dāng)把queue里所有的數(shù)據(jù)處理完之后芯勘,再批量提交offset荷愕,這樣就能保證只有處理完的數(shù)據(jù)才被commit棍矛。

七、注意事項

7.1 producer 無法發(fā)送消息的問題

最開始在本機搭建了kafka偽集群荐类,本地 producer 客戶端成功發(fā)布消息至 broker玉罐。隨后在服務(wù)器上搭建了 kafka 集群厌小,在本機連接該集群战秋,producer 卻無法發(fā)布消息到 broker(奇怪也沒有拋錯)透硝。最開始懷疑是 iptables 沒開放濒生,于是開放端口埋泵,結(jié)果還不行(又開始是代碼問題、版本問題等等,倒騰了很久)丽声。最后沒辦法礁蔗,一項一項查看 server.properties 配置,發(fā)現(xiàn)以下兩個配置:

The address the socket server listens on. It will get the value returned from

java.net.InetAddress.getCanonicalHostName() if not configured.

FORMAT:

listeners = security_protocol://host_name:port

EXAMPLE:

listeners = PLAINTEXT://your.host.name:9092

listeners=PLAINTEXT://:9092

Hostname and port the broker will advertise to producers and consumers. If not set,

it uses the value for "listeners" if configured. Otherwise, it will use the value

returned from java.net.InetAddress.getCanonicalHostName().

advertised.listeners=PLAINTEXT://your.host.name:9092

以上說的就是 advertised.listeners 是 broker 給 producer 和 consumer 連接使用的雁社,如果沒有設(shè)置浴井,就使用 listeners,而如果 host_name 沒有設(shè)置的話霉撵,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主機名磺浙。

修改方法:

1. listeners=PLAINTEXT://121.10.26.XXX:9092

2. advertised.listeners=PLAINTEXT://121.10.26.XXX:9092

修改后重啟服務(wù)徒坡,正常工作撕氧。

關(guān)于更多 kafka 配置說明http://blog.csdn.net/louisliaoxh/article/details/51516084

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市喇完,隨后出現(xiàn)的幾起案子伦泥,更是在濱河造成了極大的恐慌,老刑警劉巖何暮,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件奄喂,死亡現(xiàn)場離奇詭異,居然都是意外死亡海洼,警方通過查閱死者的電腦和手機跨新,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來坏逢,“玉大人域帐,你說我怎么就攤上這事∈钦” “怎么了肖揣?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長浮入。 經(jīng)常有香客問我龙优,道長,這世上最難降的妖魔是什么事秀? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任彤断,我火速辦了婚禮,結(jié)果婚禮上易迹,老公的妹妹穿的比我還像新娘宰衙。我一直安慰自己,他們只是感情好睹欲,可當(dāng)我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布供炼。 她就那樣靜靜地躺著一屋,像睡著了一般。 火紅的嫁衣襯著肌膚如雪袋哼。 梳的紋絲不亂的頭發(fā)上冀墨,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天,我揣著相機與錄音先嬉,去河邊找鬼轧苫。 笑死,一個胖子當(dāng)著我的面吹牛疫蔓,可吹牛的內(nèi)容都是我干的含懊。 我是一名探鬼主播,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼衅胀,長吁一口氣:“原來是場噩夢啊……” “哼岔乔!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起滚躯,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤雏门,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后掸掏,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體茁影,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年丧凤,在試婚紗的時候發(fā)現(xiàn)自己被綠了霹抛。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片年碘。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出余掖,到底是詐尸還是另有隱情作谭,我是刑警寧澤硼讽,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布捉偏,位于F島的核電站,受9級特大地震影響农渊,放射性物質(zhì)發(fā)生泄漏患蹂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一砸紊、第九天 我趴在偏房一處隱蔽的房頂上張望传于。 院中可真熱鬧,春花似錦批糟、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至,卻和暖如春否淤,著一層夾襖步出監(jiān)牢的瞬間悄但,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工石抡, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留檐嚣,地道東北人。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓啰扛,卻偏偏與公主長得像嚎京,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子隐解,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容