https://www.cnblogs.com/cyfonly/p/5954614.html
2.1 拓撲結(jié)構(gòu)
2.2 相關(guān)概念
1.producer:消息生產(chǎn)者庆锦,發(fā)布消息到kafka服務(wù)器辫继。
2.broker:kafka集群服務(wù)器硼婿。
3.topic:話題锌半,接收produce發(fā)布信息寇漫。
4.partition:是物理概念州胳,每個topic包含一個或多個分區(qū)。
5.consumer:消費者亚亲,消費topic消息
6.consumer group:每個消費者屬于這個組,可以被一個consumer消費捌归,也可以被一個組消費惜索。
7.replice:高可用剃浇,副本個數(shù),保證分區(qū)的高可靠(partition)
8.leader:replice的一個角色角塑,副本的一個角色淘讥,生產(chǎn)者和消費者之和leader交互。
9.follower:replice的一個角色窒朋,從leader中復(fù)制數(shù)據(jù)侥猩。
10.controller:kafka其中一個服務(wù)器,用來進行l(wèi)eader eletion 競選唧取。
11.zookeeper:kafka通過zookeeper來存儲meta信息杰标。
2.3 zookeeper 節(jié)點
三、producer 發(fā)布消息
3.1寫入方式
Produce采用push模式將消息發(fā)布到broker腔剂,每條消息都被追加(append)到partition中,(順序?qū)懭氪疟P袜漩,效率高于隨機寫湾碎,保障kafka吞吐率)
3.2 消息路由介褥,produce發(fā)布消息到broker時,會根據(jù)分區(qū)算法將數(shù)據(jù)存儲到某個分區(qū)(partition)
算法機制:
1>指定分區(qū)溢陪,那就直接使用睛廊,
2>未指定分區(qū),但指定key咆霜,可以通過key的value進行hash選擇partition
3>partition和key都沒指定蛾坯,使用輪訓(xùn)選擇一個partition
3.3 寫入流程
Produce寫入消息的序列圖
流程說明:
1.producer先從zookerper的/brokers/../stase的節(jié)點找到分區(qū)的leader疏遏。
2.Prodercer將消息發(fā)送給leader
3.Leder將消息寫入到本地log
4.Followers從leader pull 消息,寫入本地log后發(fā)送ACK
5.Leader收到所有ISR中的replice的ACK后,增加HW(high watemark宝当,最后commit的offset)并向producer發(fā)送ACK
3.4 producer delivery guarantee保證生產(chǎn)者數(shù)據(jù)不丟失
是producer向broker發(fā)送消息時庆揩,一旦消息被commit確定,由于replication的存在虏辫,就不會丟失锈拨,但是如果發(fā)送數(shù)據(jù)給broker后奕枢,遇到網(wǎng)絡(luò)問題,萌焰。谷浅。無法判斷消息的存在,但是producer可以生成一種類似主鍵的東西撼玄,發(fā)生故障后违施,可以重復(fù)多次發(fā)送磕蒲。
四、broker 保存消息
4.1存儲方式
物理上把topic分成一個或多個partition(系統(tǒng)默認分區(qū)=3)兔院,每個物理上對應(yīng)一個文件夾(該文件存儲partition的所有消息和索引文件)
4.2存儲策略
無論消息是否被消費坊萝,kafka都會保留所有消息,有兩種方式可以刪除舊數(shù)據(jù)
1 時間 :log.Retention.hore=168 7day
2 文件大小: log.retention.bytes=1073741824
4.3 topic 創(chuàng)建與刪除
4.3.1創(chuàng)建topic
流程
controller 在 ZooKeeper 的 /brokers/topics 節(jié)點上注冊 watcher菩鲜,當(dāng) topic 被創(chuàng)建接校,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配狮崩。
controller從 /brokers/ids 讀取當(dāng)前所有可用的 broker 列表睦柴,對于 set_p 中的每一個 partition:
2.1 從分配給該 partition 的所有 replica(稱為AR)中任選一個可用的 broker 作為新的 leader,并將AR設(shè)置為新的 ISR
2.2 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state-
controller 通過 RPC 向相關(guān)的 broker 發(fā)送 LeaderAndISRRequest侣诵。
4.3.2 刪除 topic
image.png controller 在 zooKeeper 的 /brokers/topics 節(jié)點上注冊 watcher窝趣,當(dāng) topic 被刪除训柴,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配幻馁。
若 delete.topic.enable=false,結(jié)束膘滨;否則 controller 注冊在 /admin/delete_topics 上的 watch 被 fire稀拐,controller 通過回調(diào)向?qū)?yīng)的 broker 發(fā)送 StopReplicaRequest。
五铲咨、kafka HA
5.1replication:(副本)纤勒,同一個partition可能有多個replic隆檀。如沒有副本情況下,一般broker宕機泉坐,生產(chǎn)者不能在生產(chǎn)到這個分區(qū)坚冀,消費者不能這消費,引入replication后,同一個partition可能會有多個副本(3個)這時需要在這些reolica之間選出一個leader构捡,producer和consumer只與leader交互勾徽,其他replica作為follower從leader中復(fù)制數(shù)據(jù)。
5.2 leader failover
當(dāng)partition對應(yīng)的leder宕機時畅姊,需要從follower中選新leader若未,選舉原則倾鲫,新leader必須有久leader commit過的所有消息。
kafka 在 zookeeper 中(/brokers/.../state)動態(tài)維護了一個 ISR(in-sync replicas)隙疚,由3.3節(jié)的寫入流程可知 ISR 里面的所有 replica 都跟上了 leader供屉,只有 ISR 里面的成員才能選為 leader溺蕉。對于 f+1 個 replica,一個 partition 可以在容忍 f 個 replica 失效的情況下保證消息不丟失撵割。
5.3故障轉(zhuǎn)移broker failover
流程解說:
- controller 在 zookeeper 的 /brokers/ids/[brokerId] 節(jié)點注冊 Watcher啡彬,當(dāng) broker 宕機時 zookeeper 會 fire watch
- controller 從 /brokers/ids 節(jié)點讀取可用broker
- controller決定set_p庶灿,該集合包含宕機 broker 上的所有 partition
- 對 set_p 中的每一個 partition
4.1 從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點讀取 ISR
4.2 決定新 leader(如4.3節(jié)所描述)
4.3 將新 leader、ISR腾誉、controller_epoch 和 leader_epoch 等信息寫入 state 節(jié)點 - 通過 RPC 向相關(guān) broker 發(fā)送 leaderAndISRRequest 命令
5.4 controller failover
當(dāng) controller 宕機時會觸發(fā) controller failover峻呕。每個broker 都會在 zookeeper 的 "/controller" 節(jié)點注冊 watcher瘦癌,當(dāng) controller 宕機時 zookeeper 中的臨時節(jié)點消失,所有存活的 broker 收到 fire 的通知热押,每個 broker 都嘗試創(chuàng)建新的 controller path斤寇,只有一個競選成功并當(dāng)選為 controller娘锁。
當(dāng)新的 controller 當(dāng)選時,會觸發(fā) KafkaController.onControllerFailover 方法碎税,在該方法中完成如下操作: - 讀取并增加 Controller Epoch雷蹂。
- 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注冊 watcher杯道。
- 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注冊 watcher党巾。
- 通過 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注冊 watcher。
- 若 delete.topic.enable=true(默認值是 false)驳规,則 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注冊 watcher署海。
- 通過 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注冊Watch。
- 初始化 ControllerContext 對象镀梭,設(shè)置當(dāng)前所有 topic踱启,“活”著的 broker 列表,所有 partition 的 leader 及 ISR等透罢。
- 啟動 replicaStateMachine 和 partitionStateMachine琐凭。
- 將 brokerState 狀態(tài)設(shè)置為 RunningAsController浊服。
- 將每個 partition 的 Leadership 信息發(fā)送給所有“活”著的 broker牙躺。
- 若 auto.leader.rebalance.enable=true(默認值是true)腕扶,則啟動 partition-rebalance 線程半抱。
- 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,則刪除相應(yīng)的Topic炼幔。
- consumer 消費消息
6.1 consumer API
kafka 提供了兩套 consumer API: - The high-level Consumer API
- The SimpleConsumer API
6.1.1 The high-level consumer API
high-level consumer API 提供了 consumer group 的語義史简,一個消息只能被 group 內(nèi)的一個 consumer 所消費圆兵,且 consumer 消費消息時不關(guān)注 offset,最后一個 offset 由 zookeeper 保存刀脏。
使用 high-level consumer API 可以是多線程的應(yīng)用愈污,應(yīng)當(dāng)注意: - 如果消費線程大于 patition 數(shù)量,則有些線程將收不到消息
- 如果 patition 數(shù)量大于線程數(shù)茫陆,則有些線程多收到多個 patition 的消息
- 如果一個線程消費多個 patition擎析,則無法保證你收到的消息的順序揍魂,而一個 patition 內(nèi)的消息是有序的
6.1.2 The SimpleConsumer API
如果你想要對 patition 有更多的控制權(quán),那就應(yīng)該使用 SimpleConsumer API喜最,比如: - 多次讀取一個消息
- 只消費一個patition 中的部分消息
- 使用事務(wù)來保證一個消息僅被消費一次
但是使用此 API 時瞬内,partition限书、offset倦西、broker、leader 等對你不再透明粉铐,需要自己去管理蝙泼。你需要做大量的額外工作: - 必須在應(yīng)用程序中跟蹤 offset裆装,從而確定下一條應(yīng)該消費哪條消息
- 應(yīng)用程序需要通過程序獲知每個 Partition 的 leader 是誰
- 需要處理 leader 的變更
6.2 consumer group
kafka 的分配單位是 patition哨免。每個 consumer 都屬于一個 group,一個 partition 只能被同一個 group 內(nèi)的一個 consumer 所消費(也就保障了一個消息只能被 group 內(nèi)的一個 consuemr 所消費)载荔,但是多個 group 可以同時消費這個 partition懒熙。
6.3 消費方式
Consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)。
Push(推)模式很難適應(yīng)消費速率不同的消費者徘钥,應(yīng)為消息發(fā)送速率是有broker決定的肢娘,他的目的是盡可能快的傳遞消息橱健,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕接受臼节,或網(wǎng)絡(luò)擁擠网缝,而pull模式則可以根據(jù)consumer的消費能力適當(dāng)?shù)乃俾氏M消息蟋定。對kafka而言溢吻,pull(拉)模式更適合果元,它可簡化broker的設(shè)計而晒,consumer可自主控制消費消息的速率,同事consumer可以自己控制自己的消費方式--可批量消費也可以逐條消費迅耘,同時還能選擇不同的提交方式從而從而實現(xiàn)不同的傳輸語義监署。
6.4 consumer delivery guarantee消費者的交貨保證
1.如果一定要做到Exactly once(正好一次)钠乏,就需要協(xié)調(diào)offset和實際操作的輸出晓避。
做法是引入兩個階段只壳,如果讓offset和輸出在同一個地方吼句。會簡介通用事格,這方法可能更好分蓖。如果把最新的offset和數(shù)據(jù)本身一起寫到HDFS,那就可以保證數(shù)據(jù)的輸出终娃,和offset的更新要嘛都完成棠耕。要嘛都完不成柠新,(offset是存在于zookeeper中的恨憎,無法存儲在hdfs,而低API的offset是自己維護的瓤荔,可以存放HDFS中)
<u>kafka consumer防止數(shù)據(jù)丟失</u>
如果希望能夠嚴格的不丟數(shù)據(jù)输硝,解決辦法有兩個:
· 手動commit offset点把,并針對partition_num啟同樣數(shù)目的consumer進程屿附,這樣就能保證一個consumer進程占有一個partition挺份,commit offset的時候不會影響別的partition的offset。但這個方法比較局限影暴,因為partition和consumer進程的數(shù)目必須嚴格對應(yīng)型宙。
· 另一個方法同樣需要手動commit offset妆兑,另外在consumer端再將所有fetch到的數(shù)據(jù)緩存到queue里,當(dāng)把queue里所有的數(shù)據(jù)處理完之后芯勘,再批量提交offset荷愕,這樣就能保證只有處理完的數(shù)據(jù)才被commit棍矛。
七、注意事項
7.1 producer 無法發(fā)送消息的問題
最開始在本機搭建了kafka偽集群荐类,本地 producer 客戶端成功發(fā)布消息至 broker玉罐。隨后在服務(wù)器上搭建了 kafka 集群厌小,在本機連接該集群战秋,producer 卻無法發(fā)布消息到 broker(奇怪也沒有拋錯)透硝。最開始懷疑是 iptables 沒開放濒生,于是開放端口埋泵,結(jié)果還不行(又開始是代碼問題、版本問題等等,倒騰了很久)丽声。最后沒辦法礁蔗,一項一項查看 server.properties 配置,發(fā)現(xiàn)以下兩個配置:
The address the socket server listens on. It will get the value returned from
java.net.InetAddress.getCanonicalHostName() if not configured.
FORMAT:
listeners = security_protocol://host_name:port
EXAMPLE:
listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092
Hostname and port the broker will advertise to producers and consumers. If not set,
it uses the value for "listeners" if configured. Otherwise, it will use the value
returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://your.host.name:9092
以上說的就是 advertised.listeners 是 broker 給 producer 和 consumer 連接使用的雁社,如果沒有設(shè)置浴井,就使用 listeners,而如果 host_name 沒有設(shè)置的話霉撵,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主機名磺浙。
修改方法:
1. listeners=PLAINTEXT://121.10.26.XXX:9092
2. advertised.listeners=PLAINTEXT://121.10.26.XXX:9092
修改后重啟服務(wù)徒坡,正常工作撕氧。
關(guān)于更多 kafka 配置說明http://blog.csdn.net/louisliaoxh/article/details/51516084