kafka5

一慨亲、Kafka在zookeeper中存儲結構圖

回到頂部

二庄撮、分析

2.1 topic注冊信息

/brokers/topics/[topic] :

存儲某個topic的partitions所有分配信息

[zk: localhost:2181(CONNECTED)1] get /brokers/topics/topic2

Schema:

{

? ? "version":"版本編號目前固定為數字1",

? ? "partitions": {

? ? ? ? "partitionId編號": [

? ? ? ? ? ? 同步副本組brokerId列表

? ? ? ? ],

? ? ? ? "partitionId編號": [

? ? ? ? ? ? 同步副本組brokerId列表

? ? ? ? ],

? ? ? ? .......

? ? }

}

Example:

{"version": 1,

"partitions": {

"2": [1, 2, 3],

"1": [0, 1, 2],

"0": [3, 0, 1],}}

2.2 partition狀態(tài)信息

/brokers/topics/[topic]/partitions/[0...N] ?其中[0..N]表示partition索引號

/brokers/topics/[topic]/partitions/[partitionId]/state

Schema:

{"controller_epoch": 表示kafka集群中的中央控制器選舉次數,"leader": 表示該partition選舉leader的brokerId,"version": 版本編號默認為1,"leader_epoch": 該partition leader選舉次數,"isr": [同步副本組brokerId列表]

}

Example:

{"controller_epoch":1,"leader":3,"version":1,"leader_epoch":0,"isr": [3,0,1]

}

2.3 Broker注冊信息

/brokers/ids/[0...N]? ? ? ? ? ? ? ? ?

每個broker的配置文件中都需要指定一個數字類型的id(全局不可重復),此節(jié)點為臨時znode(EPHEMERAL)

Schema:

{"jmx_port": jmx端口號,"timestamp": kafka broker初始啟動時的時間戳,"host": 主機名或ip地址,"version": 版本編號默認為1,"port": kafka broker的服務端端口號,由server.properties中參數port確定

}

Example:

{"jmx_port": -1,"timestamp":"1525741823119""version":1,"host":"hadoop1","port":9092}

2.4 Controller epoch

/controller_epoch -->? int (epoch) ??

此值為一個數字,kafka集群中第一個broker第一次啟動時為1,以后只要集群中center controller中央控制器所在broker變更或掛掉毫捣,就會重新選舉新的center controller蔽豺,每次center controller變更controller_epoch值就會 + 1;?

2.5 Controller注冊信息

/controller -> int (broker id of the controller) ?存儲center controller中央控制器所在kafka broker的信息

Schema:

{"version": 版本編號默認為1,"brokerid": kafka集群中broker唯一編號,"timestamp": kafka broker中央控制器變更時的時間戳

}

Example:

{"version":1,"brokerid":0,"timestamp":"1525741822769"}


2.6 補充Consumer and Consumer group

a.每個consumer客戶端被創(chuàng)建時,會向zookeeper注冊自己的信息;

b.此作用主要是為了"負載均衡".

c.同一個Consumer Group中的Consumers,Kafka將相應Topic中的每個消息只發(fā)送給其中一個Consumer砍濒。

d.Consumer Group中的每個Consumer讀取Topic的一個或多個Partitions废士,并且是唯一的Consumer叫潦;

e.一個Consumer group的多個consumer的所有線程依次有序地消費一個topic的所有partitions,如果Consumer group中所有consumer總線程大于partitions數量,則會出現空閑情況;

舉例說明:

kafka集群中創(chuàng)建一個topic為report-log ? 4 partitions 索引編號為0,1,2,3

假如有目前有三個消費者node:注意-->一個consumer中一個消費線程可以消費一個或多個partition.

如果每個consumer創(chuàng)建一個consumer thread線程,各個node消費情況如下官硝,node1消費索引編號為0,1分區(qū)矗蕊,node2費索引編號為2,node3費索引編號為3

如果每個consumer創(chuàng)建2個consumer thread線程,各個node消費情況如下(是從consumer node先后啟動狀態(tài)來確定的)氢架,node1消費索引編號為0,1分區(qū)傻咖;node2費索引編號為2,3;node3為空閑狀態(tài)

總結:

從以上可知岖研,Consumer Group中各個consumer是根據先后啟動的順序有序消費一個topic的所有partitions的卿操。

如果Consumer Group中所有consumer的總線程數大于partitions數量,則可能consumer thread或consumer會出現空閑狀態(tài)孙援。

2.7 Consumer均衡算法

當一個group中,有consumer加入或者離開時,會觸發(fā)partitions均衡.均衡的最終目的,是提升topic的并發(fā)消費能力.

1) 假如topic1,具有如下partitions: P0,P1,P2,P3

2) 加入group中,有如下consumer: C0,C1

3) 首先根據partition索引號對partitions排序: P0,P1,P2,P3

4) 根據(consumer.id + '-'+ thread序號)排序: C0,C1

5) 計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)

6) 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

2.8 Consumer注冊信息

每個consumer都有一個唯一的ID(consumerId可以通過配置文件指定,也可以由系統(tǒng)生成),此id用來標記消費者信息.

/consumers/[groupId]/ids/[consumerIdString]

是一個臨時的znode,此節(jié)點的值為請看consumerIdString產生規(guī)則,即表示此consumer目前所消費的topic + partitions列表.

consumerId產生規(guī)則:

StringconsumerUuid?= null;

if(config.consumerId!=null && config.consumerId)

consumerUuid = consumerId;

else {

String?uuid?= UUID.randomUUID()

consumerUuid = "%s-%d-%s".format(

InetAddress.getLocalHost.getHostName, System.currentTimeMillis,

uuid.getMostSignificantBits().toHexString.substring(0,8));

}

String?consumerIdString?= config.groupId + "_" + consumerUuid;

[zk: localhost:2181(CONNECTED)11]get/consumers/console-consumer-2304/ids/console-consumer-2304_hadoop2-1525747915241-6b48ff32


Schema:

{"version": 版本編號默認為1,"subscription": {//訂閱topic列表"topic名稱": consumer中topic消費者線程數

},"pattern":"static","timestamp":"consumer啟動時的時間戳"}

Example:

{"version":1,"subscription": {"topic2":1},"pattern":"white_list","timestamp":"1525747915336"}

2.9 Consumer owner

/consumers/[groupId]/owners/[topic]/[partitionId] ->?consumerIdString + threadId索引編號

a) 首先進行"Consumer Id注冊";

b) 然后在"Consumer id 注冊"節(jié)點下注冊一個watch用來監(jiān)聽當前group中其他consumer的"退出"和"加入";只要此znode path下節(jié)點列表變更,都會觸發(fā)此group下consumer的負載均衡.(比如一個consumer失效,那么其他consumer接管partitions).

c) 在"Broker id 注冊"節(jié)點下,注冊一個watch用來監(jiān)聽broker的存活情況;如果broker列表變更,將會觸發(fā)所有的groups下的consumer重新balance.

2.10 Consumer offset

/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)

用來跟蹤每個consumer目前所消費的partition中最大的offset

此znode為持久節(jié)點,可以看出offset跟group_id有關,以表明當消費者組(consumer group)中一個消費者失效,

重新觸發(fā)balance,其他consumer可以繼續(xù)消費.

2.11 Re-assign partitions

/admin/reassign_partitions

{

? "fields":[

? ? ? {

? ? ? ? "name":"version",

? ? ? ? "type":"int",

? ? ? ? "doc":"version id"? ? ? },

? ? ? {

? ? ? ? "name":"partitions",

? ? ? ? "type":{

? ? ? ? ? ? "type":"array",

? ? ? ? ? ? "items":{

? ? ? ? ? ? ? "fields":[

? ? ? ? ? ? ? ? ? {

? ? ? ? ? ? ? ? ? ? "name":"topic",

? ? ? ? ? ? ? ? ? ? "type":"string",

? ? ? ? ? ? ? ? ? ? "doc":"topic of the partition to be reassigned"? ? ? ? ? ? ? ? ? },

? ? ? ? ? ? ? ? ? {

? ? ? ? ? ? ? ? ? ? "name":"partition",

? ? ? ? ? ? ? ? ? ? "type":"int",

? ? ? ? ? ? ? ? ? ? "doc":"the partition to be reassigned"? ? ? ? ? ? ? ? ? },

? ? ? ? ? ? ? ? ? {

? ? ? ? ? ? ? ? ? ? "name":"replicas",

? ? ? ? ? ? ? ? ? ? "type":"array",

? ? ? ? ? ? ? ? ? ? "items":"int",

? ? ? ? ? ? ? ? ? ? "doc":"a list of replica ids"? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ],

? ? ? ? ? ? }

? ? ? ? ? ? "doc":"an array of partitions to be reassigned to new replicas"? ? ? ? }

? ? ? }

? ]

}

Example:

{

? "version":1,

? "partitions":

? ? [

? ? ? ? {

? ? ? ? ? ? "topic":"Foo",

? ? ? ? ? ? "partition":1,

? ? ? ? ? ? "replicas": [0,1,3]

? ? ? ? }

? ? ]? ? ? ? ? ?

}

2.12 Preferred replication election

/admin/preferred_replica_election

{

? "fields":[

? ? ? {

? ? ? ? "name":"version",

? ? ? ? "type":"int",

? ? ? ? "doc":"version id"? ? ? },

? ? ? {

? ? ? ? "name":"partitions",

? ? ? ? "type":{

? ? ? ? ? ? "type":"array",

? ? ? ? ? ? "items":{

? ? ? ? ? ? ? "fields":[

? ? ? ? ? ? ? ? ? {

? ? ? ? ? ? ? ? ? ? "name":"topic",

? ? ? ? ? ? ? ? ? ? "type":"string",

? ? ? ? ? ? ? ? ? ? "doc":"topic of the partition for which preferred replica election should be triggered"? ? ? ? ? ? ? ? ? },

? ? ? ? ? ? ? ? ? {

? ? ? ? ? ? ? ? ? ? "name":"partition",

? ? ? ? ? ? ? ? ? ? "type":"int",

? ? ? ? ? ? ? ? ? ? "doc":"the partition for which preferred replica election should be triggered"? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ],

? ? ? ? ? ? }

? ? ? ? ? ? "doc":"an array of partitions for which preferred replica election should be triggered"? ? ? ? }

? ? ? }

? ]

}

例子:

{

? "version":1,

? "partitions":

? ? [

? ? ? ? {

? ? ? ? ? ? "topic":"Foo",

? ? ? ? ? ? "partition":1? ? ? ?

? ? ? ? },

? ? ? ? {

? ? ? ? ? ? "topic":"Bar",

? ? ? ? ? ? "partition":0? ? ? ?

? ? ? ? }

? ? ]? ? ? ? ? ?

}

2.13 刪除topics

/admin/delete_topics

Schema:

{ "fields":

? ? [ {"name":"version","type":"int","doc":"version id"},

? ? ? {"name":"topics",

? ? ? "type": {"type":"array","items":"string","doc":"an array of topics to be deleted"}

? ? ? } ]

}

例子:

{

? "version":1,

? "topics": ["foo","bar"]

}

2.14 Topic配置

/config/topics/[topic_name]

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末害淤,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子拓售,更是在濱河造成了極大的恐慌窥摄,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,366評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件础淤,死亡現場離奇詭異崭放,居然都是意外死亡哨苛,警方通過查閱死者的電腦和手機,發(fā)現死者居然都...
    沈念sama閱讀 93,521評論 3 395
  • 文/潘曉璐 我一進店門币砂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來建峭,“玉大人,你說我怎么就攤上這事决摧∫谡簦” “怎么了?”我有些...
    開封第一講書人閱讀 165,689評論 0 356
  • 文/不壞的土叔 我叫張陵蜜徽,是天一觀的道長。 經常有香客問我票摇,道長拘鞋,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,925評論 1 295
  • 正文 為了忘掉前任矢门,我火速辦了婚禮盆色,結果婚禮上,老公的妹妹穿的比我還像新娘祟剔。我一直安慰自己隔躲,他們只是感情好,可當我...
    茶點故事閱讀 67,942評論 6 392
  • 文/花漫 我一把揭開白布物延。 她就那樣靜靜地躺著宣旱,像睡著了一般。 火紅的嫁衣襯著肌膚如雪叛薯。 梳的紋絲不亂的頭發(fā)上浑吟,一...
    開封第一講書人閱讀 51,727評論 1 305
  • 那天,我揣著相機與錄音耗溜,去河邊找鬼组力。 笑死,一個胖子當著我的面吹牛抖拴,可吹牛的內容都是我干的燎字。 我是一名探鬼主播,決...
    沈念sama閱讀 40,447評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼阿宅,長吁一口氣:“原來是場噩夢啊……” “哼候衍!你這毒婦竟也來了?” 一聲冷哼從身側響起洒放,我...
    開封第一講書人閱讀 39,349評論 0 276
  • 序言:老撾萬榮一對情侶失蹤脱柱,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后拉馋,有當地人在樹林里發(fā)現了一具尸體榨为,經...
    沈念sama閱讀 45,820評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡惨好,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,990評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了随闺。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片日川。...
    茶點故事閱讀 40,127評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖矩乐,靈堂內的尸體忽然破棺而出龄句,到底是詐尸還是另有隱情,我是刑警寧澤散罕,帶...
    沈念sama閱讀 35,812評論 5 346
  • 正文 年R本政府宣布分歇,位于F島的核電站,受9級特大地震影響欧漱,放射性物質發(fā)生泄漏职抡。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,471評論 3 331
  • 文/蒙蒙 一误甚、第九天 我趴在偏房一處隱蔽的房頂上張望缚甩。 院中可真熱鬧,春花似錦窑邦、人聲如沸擅威。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,017評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽郊丛。三九已至,卻和暖如春瞧筛,著一層夾襖步出監(jiān)牢的瞬間宾袜,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,142評論 1 272
  • 我被黑心中介騙來泰國打工驾窟, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留庆猫,地道東北人。 一個月前我還...
    沈念sama閱讀 48,388評論 3 373
  • 正文 我出身青樓绅络,卻偏偏與公主長得像月培,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子恩急,可洞房花燭夜當晚...
    茶點故事閱讀 45,066評論 2 355