Kafka高級特性解析(二)
主題
管理
使用kafka-topics.sh腳本:
選項 | 說明 |
---|---|
--config <String: name=value> | 為創(chuàng)建的或修改的主題指定配置信息辣往。支持下述 配置條目:<br />cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled.replicas index.interval.bytes leader.replication.throttled.replicas max.message.bytes message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable |
--create | 創(chuàng)建一個新主題 |
--delete | 刪除一個主題 |
--delete-config <String: name> | 刪除現(xiàn)有主題的一個主題配置條目权埠。這些條目就 是在 --config 中給出的配置條目。 |
--alter | 更改主題的分區(qū)數(shù)量袍暴,副本分配和/或配置條目众眨。 |
--describe | 列出給定主題的細節(jié)。 |
--disable-rack-aware | 禁用副本分配的機架感知容诬。 |
--force | 抑制控制臺提示信息 |
--help | 打印幫助信息 |
--if-exists | 如果指定了該選項,則在修改或刪除主題的時 候沿腰,只有主題存在才可以執(zhí)行览徒。 |
--if-not-exists | 在創(chuàng)建主題的時候,如果指定了該選項颂龙,則只有 主題不存在的時候才可以執(zhí)行命令习蓬。 |
--list | 列出所有可用的主題。 |
--partitions <Integer: # of partitions> | 要創(chuàng)建或修改主題的分區(qū)數(shù)措嵌。 |
--replica-assignment <String:broker_id_for_part1_replica1 :broker_id_for_part1_replica2 ,broker_id_for_part2_replica1 :broker_id_for_part2_replica2 , ...> | 當(dāng)創(chuàng)建或修改主題的時候手動指定partition-to- broker的分配關(guān)系躲叼。 |
--replication-factor <Integer:replication factor> | 要創(chuàng)建的主題分區(qū)副本數(shù)。1表示只有一個副本企巢, 也就是Leader副本枫慷。 |
--topic <String: topic> | 要創(chuàng)建、修改或描述的主題名稱浪规。除了創(chuàng)建或听,修 改和描述在這里還可以使用正則表達式。 |
--topics-with-overrides | if set when describing topics, only show topics that have overridden configs |
--unavailable-partitions | if set when describing topics, only show partitions whose leader is not available |
--under-replicated-partitions | if set when describing topics, only show under replicated partitions |
--zookeeper <String: urls> | 必需的參數(shù):連接zookeeper的字符串笋婿,逗號分 隔的多個host:port列表酷宵。多個URL可以故障轉(zhuǎn) 移递礼。 |
主題中可以使用的參數(shù)定義:
屬性 | 默認值 | 服務(wù)器默認屬性 | 說明 |
---|---|---|---|
cleanup.policy | delete | log.cleanup.policy | 要么是”delete“,要么是” compact“; 這個字符串指明了針對舊日志部分的利用方式;默 認方式("delete")將會丟棄舊的部分當(dāng)他們的回收時間或者尺寸限制到達時∩醺伲”compact“將會 進行日志壓 |
compression.type | none | producer用于壓縮數(shù)據(jù)的壓縮 類型。默認是無壓縮售睹。正確的選 項值是none尊残、gzip、snappy讨永。 壓縮最好用于批量處理滔驶,批量處 理消息越多,壓縮性能越好卿闹。 | |
delete.retention.ms | 86400000 (24 hours) | log.cleaner.delete.retention.ms | 對于壓縮日志保留的最長時間揭糕, 也是客戶端消費消息的最長時間萝快,通log.retention.minutes的區(qū)別在于一個控制未壓縮數(shù)據(jù), 一個控制壓縮后的數(shù)據(jù)著角。此項配 置可以在topic創(chuàng)建時的置頂參 數(shù)覆蓋 |
flush.ms | None | log.flush.interval.ms | 此項配置用來置頂強制進行 fsync日志到磁盤的時間間隔; 例如揪漩,如果設(shè)置為1000,那么 每1000ms就需要進行一次 fsync吏口。一般不建議使用這個選 項 |
flush.messages | None | log.flush.interval.messages | 此項配置指定時間間隔:強制進 行fsync日志奄容。例如,如果這個 選項設(shè)置為1产徊,那么每條消息之 后都需要進行fsync昂勒,如果設(shè)置 為5,則每5條消息就需要進行一 次fsync舟铜。一般來說戈盈,建議你不 要設(shè)置這個值。此參數(shù)的設(shè)置, 需要在"數(shù)據(jù)可靠性"與"性能"之 間做必要的權(quán)衡.如果此值過大, 將會導(dǎo)致每次"fsync"的時間較長 (IO阻塞),如果此值過小,將會導(dǎo) 致"fsync"的次數(shù)較多,這也意味 著整體的client請求有一定的延 遲.物理server故障,將會導(dǎo)致沒 有fsync的消息丟失. |
index.interval.bytes | 4096 | log.index.interval.bytes | 默認設(shè)置保證了我們每4096個 字節(jié)就對消息添加一個索引谆刨,更 多的索引使得閱讀的消息更加靠 近塘娶,但是索引規(guī)模卻會由此增 大;一般不需要改變這個選項 |
max.message.bytes | 1000000 | max.message.bytes | kafka追加消息的最大尺寸。注 意如果你增大這個尺寸痊夭,你也必 須增大你consumer的fetch 尺 寸刁岸,這樣consumer才能fetch到 這些最大尺寸的消息。 |
min.cleanable.dirty.ratio | 0.5 | min.cleanable.dirty.ratio | 此項配置控制log壓縮器試圖進 行清除日志的頻率她我。默認情況 下虹曙,將避免清除壓縮率超過50% 的日志。這個比率避免了最大的 空間浪費 |
min.insync.replicas | 1 | min.insync.replicas | 當(dāng)producer設(shè)置 request.required.acks為-1時鸦难, min.insync.replicas指定 replicas的最小數(shù)目(必須確認 每一個repica的寫數(shù)據(jù)都是成功 的)根吁,如果這個數(shù)目沒有達到, producer會產(chǎn)生異常合蔽。 |
retention.bytes | None | log.retention.bytes | 如果使用“delete”的retention 策 略击敌,這項配置就是指在刪除日志 之前,日志所能達到的最大尺 寸拴事。默認情況下沃斤,沒有尺寸限制 而只有時間限制 |
retention.ms | 7 days | log.retention.minutes | 如果使用“delete”的retention策 略,這項配置就是指刪除日志前 日志保存的時間刃宵。 |
segment.bytes | 1GB | log.segment.bytes | kafka中l(wèi)og日志是分成一塊塊存 儲的衡瓶,此配置是指log日志劃分 成塊的大小 |
segment.index.bytes | 10MB | log.index.size.max.bytes | 此配置是有關(guān)offsets和文件位置 之間映射的索引文件的大小;一 般不需要修改這個配置 |
segment.jitter.ms | 0 | log.roll.jitter.{ms,hours} | The maximum jitter to subtract from logRollTimeMillis. |
segment.ms | 7 days | log.roll.hours | 即使log的分塊文件沒有達到需 要刪除、壓縮的大小牲证,一旦log 的時間達到這個上限哮针,就會強制 新建一個log分塊文件 |
unclean.leader.election.enable | true | 指明了是否能夠使不在ISR中 replicas設(shè)置用來作為leader |
創(chuàng)建主題
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x -partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_y --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760
查看主題
## 查看所有主題名稱
kafka-topics.sh --zookeeper localhost:2181/myKafka --list
## 查看主題詳細信息
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_y
## 查看主題配置被修改的主題
kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides --describe
修改主題
## 給主題修改配置
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_x --config max.message.bytes=1048576
## 給主題修改配置
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_x --config segment.bytes=10485760
## 刪除該主題的某個配置
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config segment.bytes --topic topic_x
刪除主題
kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x
可以在/mnt/module/kafka_2.12-1.0.2/kafka-logs看到,給主題添加刪除的標記,要過一段時間刪除十厢。
增加分區(qū)
通過命令行工具操作等太,主題的分區(qū)只能增加,不能減少蛮放。否則報錯:
ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions, 1 would not be an increase.
通過--alter修改主題的分區(qū)數(shù)缩抡,增加分區(qū)。
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_y --partitions 5
分區(qū)副本的分配-了解
副本分配的三個目標:
- 均衡地將副本分散于各個broker上
- 對于某個broker上分配的分區(qū)包颁,它的其他副本在其他broker上
- 如果所有的broker都有機架信息瞻想,盡量將分區(qū)的各個副本分配到不同機架上的broker。
在不考慮機架信息的情況下:
- 第一個副本分區(qū)通過輪詢的方式挑選一個broker娩嚼,進行分配蘑险。該輪詢從broker列表的隨機位置進行輪詢。
- 其余副本通過增加偏移進行分配岳悟。
分配案例:
broker-0 | broker-1 | broker-2 | broker-3 | broker-4 | |
---|---|---|---|---|---|
p0 | p1 | p2 | P3 | P4 | 1st replica |
p5 | P6 | p7 | p8 | p9 | 1st replica |
p4 | p0 | p1 | p2 | P3 | 2nd replica |
p8 | p9 | p5 | p6 | p7 | 2nd replica |
p3 | p4 | p0 | p1 | P2 | 3nd replica |
p7 | p8 | p9 | p5 | p6 | 3nd replica |
考慮到機架信息漠其,首先為每個機架創(chuàng)建一個broker列表。如: 三個機架(rack1竿音,rack2, rack3)拴驮,六個broker(0春瞬,1,2套啤,3宽气,4,5)
brokerID -> rack
0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
rack1:0潜沦,5
rack2:3萄涯,4
rack3:1,2
這broker列表為rack1的0唆鸡,rack2的3涝影,rack3的1,rack1的5争占,rack2的4燃逻,rack3的2 即:0, 3, 1, 5, 4, 2通過簡單的輪詢將分區(qū)分配給不同機架上的broker:
每個分區(qū)副本在分配的時候在上一個分區(qū)第一個副本開始分配的位置右移一位。
六個broker臂痕,六個分區(qū)伯襟,正好最后一個分區(qū)的第一個副本分配的位置是該broker列表的最后一個。 如果有更多的分區(qū)需要分配握童,則算法開始對follower副本進行移位分配姆怪。 這主要是為了避免每次都得到相同的分配序列。 此時,如果有一個分區(qū)等待分配(分區(qū)6)稽揭,這按照如下方式分配:
6 -> 0,4,2 (而不是像分區(qū)0那樣重復(fù)0,3,1)
跟機架相關(guān)的副本分配中俺附,永遠在機架相關(guān)的broker列表中輪詢地分配第一個副本。 其余的副 本淀衣,傾向于機架上沒有副本的broker進行副本分配昙读,除非每個機架有一個副本。 然后其他的副本又通 過輪詢的方式分配給broker膨桥。
結(jié)果是蛮浑,如果副本的個數(shù)大于等于機架數(shù),保證每個機架最少有一個副本只嚣。 否則每個機架最多保有 一個副本沮稚。 如果副本的個數(shù)和機架的個數(shù)相同,并且每個機架包含相同個數(shù)的broker册舞,可以保證副本 在機架和broker之間均勻分布蕴掏。
上圖,tp_eagle_01主題的分區(qū)0分配信息:leader分區(qū)在broker1上调鲸,同步副本分區(qū)是1和2盛杰,也就 是在broker1和broker2上的兩個副本分區(qū)是同步副本分區(qū),其中一個是leader分區(qū)藐石。
必要參數(shù)配置
kafka-topics.sh --config xx=xx --config yy=yy
配置給主題的參數(shù)即供。
屬性 | 默認值 | 服務(wù)器默認屬性 | 說明 |
---|---|---|---|
cleanup.policy | delete | log.cleanup.policy | 要么是”delete“,要么是” compact“; 這個字符串指明了針對舊日志部分的利用方式;默 認方式("delete")將會丟棄舊的部分當(dāng)他們的回收時間或者尺寸限制到達時于微《旱眨”compact“將會 進行日志壓 |
compression.type | none | producer用于壓縮數(shù)據(jù)的壓縮 類型。默認是無壓縮株依。正確的選 項值是none驱证、gzip、snappy恋腕。 壓縮最好用于批量處理抹锄,批量處 理消息越多,壓縮性能越好荠藤。 | |
delete.retention.ms | 86400000 (24 hours) | log.cleaner.delete.retention.ms | 對于壓縮日志保留的最長時間祈远, 也是客戶端消費消息的最長時間,通log.retention.minutes的區(qū)別在于一個控制未壓縮數(shù)據(jù)商源, 一個控制壓縮后的數(shù)據(jù)车份。此項配 置可以在topic創(chuàng)建時的置頂參 數(shù)覆蓋 |
flush.ms | None | log.flush.interval.ms | 此項配置用來置頂強制進行 fsync日志到磁盤的時間間隔; 例如,如果設(shè)置為1000牡彻,那么 每1000ms就需要進行一次 fsync扫沼。一般不建議使用這個選 項 |
flush.messages | None | log.flush.interval.messages | 此項配置指定時間間隔:強制進 行fsync日志出爹。例如,如果這個選項設(shè)置為1缎除,那么每條消息之 后都需要進行fsync严就,如果設(shè)置為5,則每5條消息就需要進行一 次fsync器罐。一般來說梢为,建議你不 要設(shè)置這個值。此參數(shù)的設(shè)置, 需要在"數(shù)據(jù)可靠性"與"性能"之 間做必要的權(quán)衡.如果此值過大, 將會導(dǎo)致每次"fsync"的時間較長 (IO阻塞),如果此值過小,將會導(dǎo) 致"fsync"的次數(shù)較多,這也意味 著整體的client請求有一定的延 遲.物理server故障,將會導(dǎo)致沒 有fsync的消息丟失. |
index.interval.bytes | 4096 | log.index.interval.bytes | 默認設(shè)置保證了我們每4096個 字節(jié)就對消息添加一個索引轰坊,更 多的索引使得閱讀的消息更加靠 近铸董,但是索引規(guī)模卻會由此增 大;一般不需要改變這個選項 |
max.message.bytes | 1000000 | max.message.bytes | kafka追加消息的最大尺寸。注意如果你增大這個尺寸肴沫,你也必 須增大你consumer的fetch 尺 寸粟害,這樣consumer才能fetch到 這些最大尺寸的消息。 |
min.cleanable.dirty.ratio | 0.5 | min.cleanable.dirty.ratio | 此項配置控制log壓縮器試圖進 行清除日志的頻率颤芬。默認情況 下悲幅,將避免清除壓縮率超過50% 的日志。這個比率避免了最大的 空間浪費 |
min.insync.replicas | 1 | min.insync.replicas | 當(dāng)producer設(shè)置 request.required.acks為-1時站蝠, min.insync.replicas指定 replicas的最小數(shù)目(必須確認每一個repica的寫數(shù)據(jù)都是成功 的)汰具,如果這個數(shù)目沒有達到, producer會產(chǎn)生異常菱魔。 |
retention.bytes | None | log.retention.bytes | 如果使用“delete”的retention 策 略郁副,這項配置就是指在刪除日志 之前,日志所能達到的最大尺 寸豌习。默認情況下,沒有尺寸限制 而只有時間限制 |
retention.ms | 7 days | log.retention.minutes | 如果使用“delete”的retention策 略拔疚,這項配置就是指刪除日志前 日志保存的時間肥隆。 |
segment.bytes | 1GB | log.segment.bytes | kafka中l(wèi)og日志是分成一塊塊存 儲的,此配置是指log日志劃分 成塊的大小 |
segment.index.bytes | 10MB | log.index.size.max.bytes | 此配置是有關(guān)offsets和文件位置 之間映射的索引文件的大小;一 般不需要修改這個配置 |
segment.jitter.ms | 0 | log.roll.jitter.{ms,hours} | The maximum jitter to subtract from logRollTimeMillis. |
segment.ms | 7 days | log.roll.hours | 即使log的分塊文件沒有達到需 要刪除稚失、壓縮的大小栋艳,一旦log 的時間達到這個上限,就會強制 新建一個log分塊文件 |
unclean.leader.election.enable | true | 指明了是否能夠使不在ISR中 replicas設(shè)置用來作為leader |
KafkaAdminClient應(yīng)用
說明
除了使用Kafka的bin目錄下的腳本工具來管理Kafka句各,還可以使用管理Kafka的API將某些管理查看 的功能集成到系統(tǒng)中吸占。在Kafka0.11.0.0版本之前,可以通過kafka-core包(Kafka的服務(wù)端凿宾,采用Scala 編寫)中的AdminClient和AdminUtils來實現(xiàn)部分的集群管理操作矾屯。Kafka0.11.0.0之后,又多了一個 AdminClient初厚,在kafka-client包下件蚕,一個抽象類,具體的實現(xiàn)是 org.apache.kafka.clients.admin.KafkaAdminClient。
功能與原理介紹
Kafka官網(wǎng):The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects排作。
KafkaAdminClient包含了一下幾種功能(以Kafka1.0.2版本為準):
-
創(chuàng)建主題:
createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options)
-
刪除主題:
deleteTopics(final Collection<String> topicNames, DeleteTopicsOptions options)
-
列出所有主題:
listTopics(final ListTopicsOptions options)
-
查詢主題:
describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options)
-
查詢集群信息:
describeCluster(DescribeClusterOptions options)
-
查詢配置信息:
describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options)
-
修改配置信息:
alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options)
-
修改副本的日志目錄:
alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options)
-
查詢節(jié)點的日志目錄信息:
describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options)
-
查詢副本的日志目錄信息:
describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options)
增加分區(qū):
createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options)
其內(nèi)部原理是使用Kafka自定義的一套二進制協(xié)議來實現(xiàn)牵啦,詳細可以參見Kafka協(xié)議。
用到的參數(shù)
屬性 | 說明 | 重要性 |
---|---|---|
bootstrap.servers | 向Kafka集群建立初始連接用到的host/port列表妄痪。 客戶端會使用這里列出的所有服務(wù)器進行集群其他 服務(wù)器的發(fā)現(xiàn)哈雏,而不管是否指定了哪個服務(wù)器用作 引導(dǎo)。 這個列表僅影響用來發(fā)現(xiàn)集群所有服務(wù)器的初始主機衫生。<br />字符串形式:host1:port1,host2:port2,... 由于這組服務(wù)器僅用于建立初始鏈接裳瘪,然后發(fā)現(xiàn)集 群中的所有服務(wù)器,因此沒有必要將集群中的所有地址寫在這里障簿。一般最好兩臺盹愚,以防其中一臺宕掉。 | High |
client.id | 生產(chǎn)者發(fā)送請求的時候傳遞給broker的id字符串站故。 用于在broker的請求日志中追蹤什么應(yīng)用發(fā)送了什 么消息皆怕。 一般該id是跟業(yè)務(wù)有關(guān)的字符串。 |
medium |
connections.max.idle.ms | 當(dāng)連接空閑時間達到這個值西篓,就關(guān)閉連接愈腾。long型數(shù)據(jù),默認:300000 | medium |
receive.buffer.bytes | TCP接收緩存(SO_RCVBUF)岂津,如果設(shè)置為-1虱黄,則 使用操作系統(tǒng)默認的值。int類型值吮成,默認65536橱乱, 可選值:[-1,...] | medium |
request.timeout.ms | 客戶端等待服務(wù)端響應(yīng)的最大時間。如果該時間超 時粱甫,則客戶端要么重新發(fā)起請求泳叠,要么如果重試耗 盡,請求失敗茶宵。int類型值危纫,默認:120000 | medium |
security.protocol | 跟broker通信的協(xié)議:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. string類型值,默認:PLAINTEXT | medium |
send.buffer.bytes | 用于TCP發(fā)送數(shù)據(jù)時使用的緩沖大小 (SO_SNDBUF)乌庶,-1表示使用OS默認的緩沖區(qū)大 小种蝶。 int類型值,默認值:131072 |
medium |
reconnect.backoff.max.ms | 對于每個連續(xù)的連接失敗瞒大,每臺主機的退避將成倍 增加螃征,直至達到此最大值。在計算退避增量之后透敌, 添加20%的隨機抖動以避免連接風(fēng)暴会傲。 long型值锅棕,默認1000,可選值:[0,...] | low |
reconnect.backoff.ms | 重新連接主機的等待時間淌山。避免了重連的密集循 環(huán)裸燎。該等待時間應(yīng)用于該客戶端到broker的所有連 接。 long型值泼疑,默認:50 |
low |
retries | The maximum number of times to retry a call before failing it.重試的次數(shù)德绿,達到此值,失敗退渗。 int類型值移稳,默認5。 | low |
retry.backoff.ms | 在發(fā)生失敗的時候如果需要重試会油,則該配置表示客 戶端等待多長時間再發(fā)起重試个粱。 該時間的存在避免了密集循環(huán)。 long型值翻翩,默認值:100都许。 | low |
主要操作步驟: 客戶端根據(jù)方法的調(diào)用創(chuàng)建相應(yīng)的協(xié)議請求,比如創(chuàng)建Topic的createTopics方法嫂冻,其內(nèi)部就是發(fā)
送CreateTopicRequest請求胶征。
客戶端發(fā)送請求至Kafka Broker。
Kafka Broker處理相應(yīng)的請求并回執(zhí)桨仿,比如與CreateTopicRequest對應(yīng)的是 CreateTopicResponse睛低。 客戶端接收相應(yīng)的回執(zhí)并進行解析處理。
和協(xié)議有關(guān)的請求和回執(zhí)的類基本都在org.apache.kafka.common.requests包中服傍, AbstractRequest和AbstractResponse是這些請求和響應(yīng)類的兩個父類钱雷。
綜上,如果要自定義實現(xiàn)一個功能吹零,只需要三個步驟:
- 自定義XXXOptions;
- 自定義XXXResult返回值;
- 自定義Call罩抗,然后挑選合適的XXXRequest和XXXResponse來實現(xiàn)Call類中的3個抽象方法。
package com.hhb.kafka.admin;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
/**
* @description:
* @author:
* @date: 2020-08-18 21:35
**/
public class MyAdminClientTeacher {
private KafkaAdminClient kafkaAdminClient;
/**
* 初始化KafkaAdminClient客戶端
*/
@Before
public void init() {
HashMap<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "hhb:9092");
kafkaAdminClient = (KafkaAdminClient) KafkaAdminClient.create(configs);
}
/**
* 新建一個主題
*
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testCreateTopic() throws ExecutionException,
InterruptedException {
Map<String, String> configs = new HashMap<>();
configs.put("max.message.bytes", "1048576");
configs.put("segment.bytes", "1048576000");
NewTopic newTopic = new NewTopic("hhb", 2, (short) 1);
newTopic.configs(configs);
CreateTopicsResult topics = kafkaAdminClient.createTopics(Collections.singleton(newTopic));
KafkaFuture<Void> all = topics.all();
Void aVoid = all.get();
System.out.println(aVoid);
}
/**
* 刪除主題
*
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testDeleteTopic() throws ExecutionException,
InterruptedException {
DeleteTopicsOptions options = new DeleteTopicsOptions();
options.timeoutMs(500);
DeleteTopicsResult deleteResult = kafkaAdminClient.deleteTopics(Collections.singleton("hhb"), options);
deleteResult.all().get();
}
/**
* 修改配置信息
*
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testAlterTopic() throws ExecutionException, InterruptedException {
NewPartitions newPartitions = NewPartitions.increaseTo(5);
Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
newPartitionsMap.put("adm_tp_01", newPartitions);
CreatePartitionsOptions option = new CreatePartitionsOptions();
// Set to true if the request should be validated without creating new partitions.
// 如果只是驗證瘪校,而不創(chuàng)建分區(qū),則設(shè)置為true
// option.validateOnly(true);
CreatePartitionsResult partitionsResult = kafkaAdminClient.createPartitions(newPartitionsMap, option);
Void aVoid = partitionsResult.all().get();
}
/**
* 查看topic詳細信息
*
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testDescribeTopics() throws ExecutionException,
InterruptedException {
DescribeTopicsOptions options = new DescribeTopicsOptions();
options.timeoutMs(3000);
DescribeTopicsResult topicsResult = kafkaAdminClient.describeTopics(Collections.singleton("hhb"), options);
Map<String, TopicDescription> stringTopicDescriptionMap = topicsResult.all().get();
stringTopicDescriptionMap.forEach((k, v) -> {
System.out.println(k + "\t" + v);
System.out.println("=======================================");
System.out.println(k);
boolean internal = v.isInternal();
String name = v.name();
List<TopicPartitionInfo> partitions = v.partitions();
String partitionStr = Arrays.toString(partitions.toArray());
System.out.println("內(nèi)部的?" + internal);
System.out.println("topic name = " + name);
System.out.println("分區(qū):" + partitionStr);
partitions.forEach(partition -> {
System.out.println(partition);
});
});
}
/**
* 查詢集群信息
*
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testDescribeCluster() throws ExecutionException,
InterruptedException {
DescribeClusterResult describeClusterResult = kafkaAdminClient.describeCluster();
KafkaFuture<String> stringKafkaFuture = describeClusterResult.clusterId();
String s = stringKafkaFuture.get();
System.out.println("cluster name = " + s);
KafkaFuture<Node> controller = describeClusterResult.controller();
Node node = controller.get();
System.out.println("集群控制器:" + node);
Collection<Node> nodes = describeClusterResult.nodes().get();
nodes.forEach(node1 -> {
System.out.println(node1);
});
}
/**
* 列出所有的主題
*
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testListTopics() throws ExecutionException, InterruptedException {
ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
//是否獲取內(nèi)置主題
listTopicsOptions.listInternal(true);
//請求超時時間名段,毫秒
listTopicsOptions.timeoutMs(500);
//列出所有的主題
ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics(listTopicsOptions);
//列出所有的非內(nèi)置主題
//ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics();
//打印所有的主題名稱
Set<String> strings = listTopicsResult.names().get();
for (String name : strings) {
System.err.println("name==>>" + name);
}
System.err.println("========================================================================================================");
//將請求變成同步的阱扬,直接獲取結(jié)果
Collection<TopicListing> topicListings = listTopicsResult.listings().get();
topicListings.forEach(topics -> {
//是否是一個內(nèi)置的主題,內(nèi)置的主題:_consumer_offsets_
boolean internal = topics.isInternal();
//主題名稱
String name = topics.name();
System.err.println("是否為內(nèi)部主題:" + internal + ",該主題的名字: " + name + ", toString" + topics.toString());
});
System.err.println("========================================================================================================");
}
/**
* 查詢配置信息
*
* @throws ExecutionException
* @throws InterruptedException
* @throws TimeoutException
*/
@Test
public void testDescribeConfigs() throws ExecutionException,
InterruptedException, TimeoutException {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
DescribeConfigsResult describeConfigsResult =
kafkaAdminClient.describeConfigs(Collections.singleton(configResource));
Map<ConfigResource, Config> configMap =
describeConfigsResult.all().get(15, TimeUnit.SECONDS);
configMap.forEach(new BiConsumer<ConfigResource, Config>() {
@Override
public void accept(ConfigResource configResource, Config config) {
ConfigResource.Type type = configResource.type();
String name = configResource.name();
System.out.println("資源名稱:" + name);
Collection<ConfigEntry> entries = config.entries();
entries.forEach(new Consumer<ConfigEntry>() {
@Override
public void accept(ConfigEntry configEntry) {
boolean aDefault = configEntry.isDefault();
boolean readOnly = configEntry.isReadOnly();
boolean sensitive = configEntry.isSensitive();
String name1 = configEntry.name();
String value = configEntry.value();
System.out.println("是否默認:" + aDefault + "\t是否 只讀?" + readOnly + "\t是否敏感?" + sensitive
+ "\t" + name1 + " --> " + value);
}
});
ConfigEntry retries = config.get("retries");
if (retries != null) {
System.out.println(retries.name() + " -->" +
retries.value());
} else {
System.out.println("沒有這個屬性");
}
}
});
}
@Test
public void testAlterConfig() throws ExecutionException,
InterruptedException {
// 這里設(shè)置后伸辟,原來資源中不沖突的屬性也會丟失麻惶,直接按照這里的配置設(shè)置
Map<ConfigResource, Config> configMap = new HashMap<>();
ConfigResource resource = new
ConfigResource(ConfigResource.Type.TOPIC, "adm_tp_01");
Config config = new Config(Collections.singleton(new
ConfigEntry("segment.bytes", "1048576000")));
configMap.put(resource, config);
AlterConfigsResult alterConfigsResult = kafkaAdminClient.alterConfigs(configMap);
Void aVoid = alterConfigsResult.all().get();
}
@Test
public void testDescribeLogDirs() throws ExecutionException,
InterruptedException {
DescribeLogDirsOptions option = new DescribeLogDirsOptions();
option.timeoutMs(1000);
DescribeLogDirsResult describeLogDirsResult = kafkaAdminClient.describeLogDirs(Collections.singleton(0), option);
Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> integerMapMap
= describeLogDirsResult.all().get();
integerMapMap.forEach(new BiConsumer<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>() {
@Override
public void accept(Integer integer, Map<String, DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) {
System.out.println("broker.id = " + integer);
stringLogDirInfoMap.forEach(new BiConsumer<String, DescribeLogDirsResponse.LogDirInfo>() {
@Override
public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) {
System.out.println("log.dirs:" + s);
// 查看該broker上的主題/分區(qū)/偏移量等信息 //
logDirInfo.replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() {
@Override
public void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
int partition = topicPartition.partition();
String topic = topicPartition.topic();
boolean isFuture = replicaInfo.isFuture;
long offsetLag = replicaInfo.offsetLag;
long size = replicaInfo.size;
System.out.println("partition:" + partition + "\ttopic:" + topic + "\tisFuture:" + isFuture + "\toffsetLag:" + offsetLag + "\tsize:" + size);
}
});
}
});
}
});
}
/**
* 關(guān)閉KafkaAdminClient客戶端
*/
@After
public void destroy() {
kafkaAdminClient.close();
}
}
偏移量管理
Kafka 1.0.2,__consumer_offsets主題中保存各個消費組的偏移量信夫。早期由zookeeper管理消費組的偏移量窃蹋。
查詢方法:通過原生 kafka 提供的工具腳本進行查詢卡啰。 工具腳本的位置與名稱為 bin/kafka-consumer-groups.sh 首先運行腳本,查看幫助:
參數(shù) | 說明 |
---|---|
--all-topics | 將所有關(guān)聯(lián)到指定消費組的主題都劃歸到reset-offsets 操作范圍警没。 |
--bootstrap-server<String: server to connect to> | 必須:(基于消費組的新的消費者): 要連接的服務(wù)器地址匈辱。 |
--by-duration<String: duration> | 距離當(dāng)前時間戳的一個時間段。格式:'PnDTnHnMnS' |
--command-config <String: command config property file> | 指定配置文件杀迹,該文件內(nèi)容傳遞給Admin Client和消費者亡脸。 |
--delete | 傳值消費組名稱,刪除整個消費組與所有主題的各個分區(qū)偏移量 和所有者關(guān)系树酪。 如: --group g1 --group g2 浅碾。 傳值消費組名稱和單個主題魁袜,僅刪除該消費組到指定主題的分區(qū) 偏移量和所屬關(guān)系舞痰。如: --group g1 --group g2 --topic t1 。 傳值一個主題名稱牍陌,僅刪除指定主題與所有消費組分區(qū)偏移量以 及所屬關(guān)系疮茄。 如: --topic t1 注意:消費組的刪除僅對基于ZK保存偏移量的消費組有效滥朱,并且 要小心使用,僅刪除不活躍的消費組娃豹。 |
--describe | 描述給定消費組的偏移量差距(有多少消息還沒有消費)焚虱。 |
--execute | 執(zhí)行操作。支持的操作: reset-offsets懂版。 |
--export | 導(dǎo)出操作的結(jié)果到CSV文件鹃栽。支持的操作: reset-offsets 。 |
--from-file <String: path to CSV file> | 重置偏移量到CSV文件中定義的值躯畴。 |
--group <String: consumer group> | 目標消費組民鼓。 |
--list | 列出所有消費組。 |
--new-consumer | 使用新的消費者實現(xiàn)蓬抄。這是默認值丰嘉。隨后的發(fā)行版中會刪除這一 操作。 |
--reset-offsets | 重置消費組的偏移量嚷缭。當(dāng)前一次操作只支持一個消費組饮亏,并且該 消費組應(yīng)該是不活躍的。 有三個操作選項 <br />1. (默認)plan:要重置哪個偏移量阅爽。<br />2. execute:執(zhí)行 reset-offsets 操作<br />3. process:配合 --export 將操作結(jié)果導(dǎo)出到CSV格式路幸。<br />可以使用如下選項:<br />--to-datetime<br /> --by-period<br /> --to-earliest<br /> --to-latest<br /> --shift-by<br /> --from-file<br /> --to-current 。<br />必須選擇一個選項使用付翁。 要定義操作的范圍简肴,使用:<br />--all-topics<br />--topic 。 <br />必須選擇一個百侧,除非使用 --from-file 選項砰识。 |
--shift-by <Long: number-of-offsets> | 重置偏移量n個能扒。n可以是正值,也可以是負值辫狼。 |
--timeout <Long: timeout (ms)> | 對某些操作設(shè)置超時時間初斑。 如:對于描述指定消費組信息,指定毫秒值的最大等待時間予借,以 獲取正常數(shù)據(jù)(如剛創(chuàng)建的消費組越平,或者消費組做了一些更改操 作)。默認時間: 5000 灵迫。 |
--to-current | 重置到當(dāng)前的偏移量秦叛。 |
--to-datetime | 重置偏移量到指定的時間戳。格式:'YYYY-MM- DDTHH:mm:SS.sss' |
--to-earliest | 重置為最早的偏移量 |
--to-latest | 重置為最新的偏移量 |
--to-offset <Long: offset> | 重置到指定的偏移量瀑粥。 |
--topic <String: topic> | 指定哪個主題的消費組需要刪除挣跋,或者指定哪個主題的消費組需 要包含到 reset-offsets 操作中。對于 reset-offsets 操作狞换,還 可以指定分區(qū): topic1:0,1,2 避咆。其中0,1修噪,2表示要包含到操 作中的分區(qū)號查库。重置偏移量的操作支持多個主題一起操作。 |
--zookeeper <String: urls> | 必須黄琼,它的值樊销,你懂的。 --zookeeper node1:2181/myKafka 脏款。 |
這里我們先編寫一個生產(chǎn)者围苫,消費者的例子:
我們先啟動消費者,再啟動生產(chǎn)者撤师, 再通過 bin/kafka-consumer-groups.sh 進行消費偏移量查 詢剂府,由于kafka 消費者記錄group的消費偏移量有兩種方式 :
kafka 自維護 (新)
zookpeer 維護 (舊) ,已經(jīng)逐漸被廢棄
所以 剃盾,腳本只查看由broker維護的腺占,由zookeeper維護的可以將 --bootstrap-server 換成--zookeeper即可。
查看有哪些group ID正在進行消費
kafka-consumer-groups.sh --bootstrap-server hhb:9092 --list
注意:
這里面是沒有指定 topic痒谴,查看的是所有topic消費者的 group.id 的列表衰伯。
注意: 重名的 group.id 只會顯示一次
查看指定group.id 的消費者消費情況
kafka-consumer-groups.sh --bootstrap-server hhb:9092 --describe --group group
如果消費者停止,查看偏移量信息:
kafka-consumer-groups.sh --bootstrap-server hhb:9092 --describe --group group
將偏移量設(shè)置為最早的:
kafka-consumer-groups.sh --bootstrap-server hhb:9092 --reset-offsets --group group --topic tp_demo_02 --to-earliest --execute
將偏移量設(shè)置為最新的:
kafka-consumer-groups.sh --bootstrap-server hhb:9092 --reset-offsets --group group --topic tp_demo_02 --to-latest --execute
分別將指定主題的指定分區(qū)的偏移量向前移動10個消息:
kafka-consumer-groups.sh --bootstrap-server hhb:9092 --reset-offsets --group group --topic tp_demo_02:0 --shift-by -10 --execute
將指定主題的多個分區(qū)的偏移量向前移動5個消息:
kafka-consumer-groups.sh --bootstrap-server hhb:9092 --reset-offsets --group group --topic tp_demo_02:0,2 --shift-by -5 --execute
代碼
- Producer:
MyProducer
package com.hhb.kafka.offset.producer;
/**
* @description:
* @author:
* @date: 2020-08-19 20:51
**/
public class MyProducer {
public static void main(String[] args) {
Thread thread = new Thread(new ProducerHandler("hello lagou "));
thread.start();
}
}
KafkaProducerSingleton:
package com.hhb.kafka.offset.producer;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.Random;
/**
* @description:
* @author:
* @date: 2020-08-19 20:49
**/
public class KafkaProducerSingleton {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerSingleton.class);
private static KafkaProducer<String, String> kafkaProducer;
private Random random = new Random();
private String topic;
private int retry;
private KafkaProducerSingleton() {
}
/**
* 靜態(tài)內(nèi)部類
*
* @author tanjie
*/
private static class LazyHandler {
private static final KafkaProducerSingleton instance = new
KafkaProducerSingleton();
}
/**
* 單例模式,kafkaProducer是線程安全的,可以多線程共享一個實例 * @return
*/
public static final KafkaProducerSingleton getInstance() {
return LazyHandler.instance;
}
/**
* kafka生產(chǎn)者進行初始化
*
* @return KafkaProducer
*/
public void init(String topic, int retry) {
this.topic = topic;
this.retry = retry;
if (null == kafkaProducer) {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducer = new KafkaProducer<String, String>(props);
}
}
/**
* 通過kafkaProducer發(fā)送消息 * @param message
*/
public void sendKafkaMessage(final String message) {
ProducerRecord<String, String> record = new ProducerRecord<String,
String>(
topic, random.nextInt(3), "", message);
kafkaProducer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata,
Exception exception) {
if (null != exception) {
LOGGER.error("kafka發(fā)送消息失敗:" + exception.getMessage(), exception);
retryKakfaMessage(message);
}
}
});
}
/**
* 當(dāng)kafka消息發(fā)送失敗后,重試 *
*
* @param retryMessage
*/
private void retryKakfaMessage(final String retryMessage) {
ProducerRecord<String, String> record = new ProducerRecord<String,
String>(
topic, random.nextInt(3), "", retryMessage);
for (int i = 1; i <= retry; i++) {
try {
kafkaProducer.send(record);
return;
} catch (Exception e) {
LOGGER.error("kafka發(fā)送消息失敗:" + e.getMessage(), e);
retryKakfaMessage(retryMessage);
}
}
}
/**
* kafka實例銷毀
*/
public void close() {
if (null != kafkaProducer) {
kafkaProducer.close();
}
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getRetry() {
return retry;
}
public void setRetry(int retry) {
this.retry = retry;
}
}
ProducerHandler:
package com.hhb.kafka.offset.producer;
/**
* @description:
* @author:
* @date: 2020-08-19 20:50
**/
public class ProducerHandler implements Runnable {
private String message;
public ProducerHandler(String message) {
this.message = message;
}
@Override
public void run() {
KafkaProducerSingleton kafkaProducerSingleton =
KafkaProducerSingleton.getInstance();
kafkaProducerSingleton.init("tp_demo_02", 3);
int i = 0;
while (true) {
try {
System.out.println("當(dāng)前線程:" + Thread.currentThread().getName()
+ "\t獲取的kafka實例:" + kafkaProducerSingleton);
kafkaProducerSingleton.sendKafkaMessage("發(fā)送消息: " +
message + " " + (++i));
Thread.sleep(100);
} catch (Exception e) {
}
}
}
}
- consumer
ConsumerAutoMain
package com.hhb.kafka.offset.consumer;
/**
* @description:
* @author:
* @date: 2020-08-19 20:54
**/
public class ConsumerAutoMain {
public static void main(String[] args) {
KafkaConsumerAuto kafka_consumerAuto = new KafkaConsumerAuto();
try {
kafka_consumerAuto.execute();
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
kafka_consumerAuto.shutdown();
}
}
}
ConsumerThreadAuto:
package com.hhb.kafka.offset.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* @description:
* @author:
* @date: 2020-08-19 20:53
**/
public class ConsumerThreadAuto implements Runnable {
private ConsumerRecords<String, String> records;
private KafkaConsumer<String, String> consumer;
public ConsumerThreadAuto(ConsumerRecords<String, String> records,
KafkaConsumer<String, String> consumer) {
this.records = records;
this.consumer = consumer;
}
@Override
public void run() {
for (ConsumerRecord<String, String> record : records) {
System.out.println("當(dāng)前線程:" + Thread.currentThread()
+ "\t主題:" + record.topic()
+ "\t偏移量:" + record.offset() + "\t分區(qū):" + record.partition() + "\t獲取的消息:" + record.value());
}
}
}
KafkaConsumerAuto:
package com.hhb.kafka.offset.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @description:
* @author:
* @date: 2020-08-19 20:51
**/
public class KafkaConsumerAuto {
/**
* kafka消費者不是線程安全的
*/
private final KafkaConsumer<String, String> consumer;
private ExecutorService executorService;
public KafkaConsumerAuto() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hhb:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
// 打開自動提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put("auto.commit.interval.ms", "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(props);
// 訂閱主題
consumer.subscribe(Collections.singleton("tp_demo_02"));
}
public void execute() throws InterruptedException {
executorService = Executors.newFixedThreadPool(2);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(2_000);
if (null != records) {
executorService.submit(new ConsumerThreadAuto(records,
consumer));
}
Thread.sleep(1000);
}
}
public void shutdown() {
try {
if (consumer != null) {
consumer.close();
}
if (executorService != null) {
executorService.shutdown();
}
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("關(guān)閉線程池超時闰歪。嚎研。蓖墅。");
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
分區(qū)
副本機制
Kafka在一定數(shù)量的服務(wù)器上對主題分區(qū)進行復(fù)制库倘。 當(dāng)集群中的一個broker宕機后系統(tǒng)可以自動故障轉(zhuǎn)移到其他可用的副本上临扮,不會造成數(shù)據(jù)丟失。
--replication-factor 3 1leader+2follower
- 將復(fù)制因子為1的未復(fù)制主題稱為復(fù)制主題教翩。
- 主題的分區(qū)是復(fù)制的最小單元杆勇。
- 在非故障情況下,Kafka中的每個分區(qū)都有一個Leader副本和零個或多個Follower副本饱亿。
- 包括Leader副本在內(nèi)的副本總數(shù)構(gòu)成復(fù)制因子蚜退。
- 所有讀取和寫入都由Leader副本負責(zé)。
- 通常彪笼,分區(qū)比broker多钻注,并且Leader分區(qū)在broker之間平均分配。
Follower分區(qū)像普通的Kafka消費者一樣配猫,消費來自Leader分區(qū)的消息幅恋,并將其持久化到自己的日志中。允許Follower對日志條目拉取進行批處理泵肄。
同步節(jié)點定義:
- 節(jié)點必須能夠維持與ZooKeeper的會話(通過ZooKeeper的心跳機制)
- 對于Follower副本分區(qū)捆交,它復(fù)制在Leader分區(qū)上的寫入,并且不要延遲太多
Kafka提供的保證是腐巢,只要有至少一個同步副本處于活動狀態(tài)品追,提交的消息就不會丟失。
宕機如何恢復(fù)
-
少部分副本宕機
當(dāng)leader宕機了冯丙,會從follower選擇一個作為leader肉瓦。當(dāng)宕機的重新恢復(fù)時,會把之前commit的數(shù) 據(jù)清空银还,重新從leader里pull數(shù)據(jù)风宁。
-
全部副本宕機
當(dāng)全部副本宕機了有兩種恢復(fù)方式- 等待ISR中的一個恢復(fù)后,并選它作為leader蛹疯。(等待時間較長戒财,降低可用性)
- 選擇第一個恢復(fù)的副本作為新的leader,無論是否在ISR中捺弦。(并未包含之前l(fā)eader commit的 數(shù)據(jù)饮寞,因此造成數(shù)據(jù)丟失)
Leader選舉
下圖中分區(qū)P1的Leader是0,ISR是0和1 分區(qū)P2的Leader是2列吼,ISR是1和2 分區(qū)P3的Leader是1幽崩,ISR是0,1,2。
生產(chǎn)者和消費者的請求都由Leader副本來處理寞钥。Follower副本只負責(zé)消費Leader副本的數(shù)據(jù)和 Leader保持同步慌申。對于P1,如果0宕機會發(fā)生什么? Leader副本和Follower副本之間的關(guān)系并不是固定不變的,在Leader所在的broker發(fā)生故障的時候蹄溉,就需要進行分區(qū)的Leader副本和Follower副本之間的切換咨油,需要選舉Leader副本。
如何選舉:
如果某個分區(qū)所在的服務(wù)器出了問題柒爵,不可用役电,kafka會從該分區(qū)的其他的副本中選擇一個作為新 的Leader。之后所有的讀寫就會轉(zhuǎn)移到這個新的Leader上∶拚停現(xiàn)在的問題是應(yīng)當(dāng)選擇哪個作為新的 Leader法瑟。只有那些跟Leader保持同步的Follower才應(yīng)該被選作新的Leader。Kafka會在Zookeeper上針對每個Topic維護一個稱為ISR(in-sync replica唁奢,已同步的副本)的集 合霎挟,該集合中是一些分區(qū)的副本。只有當(dāng)這些副本都跟Leader中的副本同步了之后麻掸,kafka才會認為消息已提交氓扛,并反饋給消息的生 產(chǎn)者。如果這個集合有增減论笔,kafka會更新zookeeper上的記錄采郎。如果某個分區(qū)的Leader不可用,Kafka就會從ISR集合中選擇一個副本作為新的Leader狂魔。 顯然通過ISR蒜埋,kafka需要的冗余度較低,可以容忍的失敗數(shù)比較高最楷。 假設(shè)某個topic有N+1個副本整份,kafka可以容忍N個服務(wù)器不可用。
為什么不用少數(shù)服從多數(shù)的方法
少數(shù)服從多數(shù)是一種比較常見的一致性算發(fā)和Leader選舉法籽孙。 它的含義是只有超過半數(shù)的副本同步了烈评,系統(tǒng)才會認為數(shù)據(jù)已同步; 選擇Leader時也是從超過半數(shù)的同步的副本中選擇。 這種算法需要較高的冗余度犯建,跟Kafka比起來讲冠,浪費資源。 譬如只允許一臺機器失敗适瓦,需要有三個副本;而如果只容忍兩臺機器失敗竿开,則需要五個副本。 而kafka的ISR集合方法玻熙,分別只需要兩個和三個副本否彩。
如果所有的ISR副本都失敗了怎么辦?
此時有兩種方法可選,
等待ISR集合中的副本復(fù)活嗦随,
-
選擇任何一個立即可用的副本列荔,而這個副本不一定是在ISR集合中。
需要設(shè)置 unclean.leader.election.enable=true
這兩種方法各有利弊,實際生產(chǎn)中按需選擇贴浙。 如果要等待ISR副本復(fù)活筷转,雖然可以保證一致性,但可能需要很長時間悬而。而如果選擇立即可用的副本,則很可能該副本并不一致锭汛。
總結(jié):
Kafka中Leader分區(qū)選舉笨奠,通過維護一個動態(tài)變化的ISR集合來實現(xiàn),一旦Leader分區(qū)丟掉唤殴,則從 ISR中隨機挑選一個副本做新的Leader分區(qū)般婆。如果ISR中的副本都丟失了,則:
可以等待ISR中的副本任何一個恢復(fù)朵逝,接著對外提供服務(wù)蔚袍,需要時間等待。
從OSR中選出一個副本做Leader副本配名,此時會造成數(shù)據(jù)丟失
分區(qū)重新分配
向已經(jīng)部署好的Kafka集群里面添加機器啤咽,我們需要從已經(jīng)部署好的Kafka節(jié)點中復(fù)制相應(yīng)的配置文 件,然后把里面的broker id修改成全局唯一的渠脉,最后啟動這個節(jié)點即可將它加入到現(xiàn)有Kafka集群中宇整。
問題:新添加的Kafka節(jié)點并不會自動地分配數(shù)據(jù),無法分擔(dān)集群的負載芋膘,除非我們新建一個 topic鳞青。
需要手動將部分分區(qū)移到新添加的Kafka節(jié)點上,Kafka內(nèi)部提供了相關(guān)的工具來重新分布某個 topic的分區(qū)为朋。在重新分布topic分區(qū)之前臂拓,我們先來看看現(xiàn)在topic的各個分區(qū)的分布位置
創(chuàng)建主題:
kafka-topics.sh--zookeeper hhb:2181/myKafka --create --topic tp_re_01 --partitions 5 --replication-factor 1
查看主題信息,所有的主題分區(qū)都集中在broker0上∠按纾現(xiàn)在想在另一個機器上搭建一個Kafka胶惰,組成新的kafka機器
kafka-topics.sh --zookeeper --describe --topic tp_re_01
把當(dāng)前kafka文件拷貝到linux121
scp -r kafka_2.12-1.0.2 linux121:/mnt/module/
在linux121上操作:
cd /mnt/module/kafka_2.12-1.0.2/
vim /etc/profile
#配置kafka
##KAFKA_HOME
export KAFKA_HOME=/mnt/module/kafka_2.12-1.0.2
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
cd config/
vim server.properties
##修改broker.id
broker.id = 1
##修改zookeeper地址
zookeeper.connect=hhb:2181/myKafka
# 保存后退出
cd ../
rm -rf logs/*
rm -rf kafka-logs/*
啟動kafka
kafka-server-start.sh -daemon /mnt/module/kafka_2.12-1.0.2/config/server.properties
查看kafka是否已經(jīng)注冊到集群。進入到hhb服務(wù)器霞溪,進入到zookeeper中
zkCli.sh
## 查看該節(jié)點下是否有一個數(shù)組童番,包含兩個元素
ls /myKafka/brokers/ids
# 獲取該節(jié)點的信息
get /myKafka/cluster/id
##返回:
{"version":"1","id":"UozyfTtKTrmHoPEeF8pD3Q"}
cZxid = 0x1a
ctime = Tue Aug 11 21:29:37 CST 2020
mZxid = 0x1a
mtime = Tue Aug 11 21:29:37 CST 2020
pZxid = 0x1a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 45
numChildren = 0
該ID:UozyfTtKTrmHoPEeF8pD3Q就是集群ID,可以再去linux121的啟動日記里面看到威鹿,該服務(wù)啟動的clusterId也是UozyfTtKTrmHoPEeF8pD3Q剃斧。證明該節(jié)點已經(jīng)加入到集群。
現(xiàn)在集群上有兩個節(jié)點忽你,但是剛才創(chuàng)建的tp_re_01的topic的5個分區(qū)都在broker0幼东,目標:將3、4分區(qū)的Leader放到broker1上。
在linux121 ~目錄下:
vim topic-to-move.json
{
"topics": [
{
"topic": "tp_re_01"
}
],
"version": 1
}
## 保存退出
使用kafka-reassign-partitions.sh來處理分區(qū)移動:
參數(shù):
參數(shù) | 解釋 |
---|---|
--generate | 只是生成一個移動分區(qū)的計劃根蟹,并不會執(zhí)行 |
--execute | 真正的執(zhí)行計劃 |
--verify | 驗證計劃是否執(zhí)行成功 |
生成計劃:
kafka-reassign-partitions.sh --zookeeper hhb:2181/myKafka --topics-to-move-json-file topic-to-move.json --broker-list "0,1" --generate
將系統(tǒng)生成的執(zhí)行計劃脓杉,放到topic-to-move-exec.json
vim topic-to-move-exec.json
{
"version": 1,
"partitions": [
{
"topic": "tp_re_01",
"partition": 4,
"replicas": [
0
],
"log_dirs": [
"any"
]
},
{
"topic": "tp_re_01",
"partition": 1,
"replicas": [
1
],
"log_dirs": [
"any"
]
},
{
"topic": "tp_re_01",
"partition": 2,
"replicas": [
0
],
"log_dirs": [
"any"
]
},
{
"topic": "tp_re_01",
"partition": 3,
"replicas": [
1
],
"log_dirs": [
"any"
]
},
{
"topic": "tp_re_01",
"partition": 0,
"replicas": [
0
],
"log_dirs": [
"any"
]
}
]
}
## 執(zhí)行該計劃
kafka-reassign-partitions.sh --zookeeper hhb:2181/myKafka --reassignment-json-file topic-to-move-exec.json --execute
此時可以看到分區(qū)的變化,1简逮、3的分區(qū)移動到了broker1上球散。我們的目標是將3、4分區(qū)移動到broker1上,所以很準備一個新的計劃:my-topic-to-move-exec.json
vim my-topic-to-move-exec.json
{
"version": 1,
"partitions": [
{
"topic": "tp_re_01",
"partition": 4,
"replicas": [
1
],
"log_dirs": [
"any"
]
},
{
"topic": "tp_re_01",
"partition": 1,
"replicas": [
0
],
"log_dirs": [
"any"
]
},
{
"topic": "tp_re_01",
"partition": 2,
"replicas": [
0
],
"log_dirs": [
"any"
]
},
{
"topic": "tp_re_01",
"partition": 3,
"replicas": [
1
],
"log_dirs": [
"any"
]
},
{
"topic": "tp_re_01",
"partition": 0,
"replicas": [
0
],
"log_dirs": [
"any"
]
}
]
}
### 再次執(zhí)行計劃:
kafka-reassign-partitions.sh --zookeeper hhb:2181/myKafka --reassignment-json-file my-topic-to-move-exec.json --execute
驗證計劃執(zhí)行結(jié)果
kafka-reassign-partitions.sh --zookeeper hhb:2181/myKafka --reassignment-json-file my-topic-to-move-exec.json --verify
自動在平衡
我們可以在新建主題的時候散庶,手動指定主題各個Leader分區(qū)以及Follower分區(qū)的分配情況蕉堰,即什么分區(qū)副本在哪個broker節(jié)點上。隨著系統(tǒng)的運行悲龟,broker的宕機重啟屋讶,會引發(fā)Leader分區(qū)和Follower分區(qū)的角色轉(zhuǎn)換,最后可能 Leader大部分都集中在少數(shù)幾臺broker上须教,由于Leader負責(zé)客戶端的讀寫操作皿渗,此時集中Leader分區(qū) 的少數(shù)幾臺服務(wù)器的網(wǎng)絡(luò)I/O,CPU轻腺,以及內(nèi)存都會很緊張乐疆。
Leader和Follower的角色轉(zhuǎn)換會引起Leader副本在集群中分布的不均衡,此時我們需要一種手 段贬养,讓Leader的分布重新恢復(fù)到一個均衡的狀態(tài)诀拭。
創(chuàng)建主題,直接指定該主題每個分區(qū)在哪個broker中
kafka-topics.sh --zookeeper hhb:2181/myKafka --create --topic tp_demo_03 --replica-assignment "0:1,1:0,0:1"
上面的命令“0:1,1:0,0:1”表示一共創(chuàng)建三個分區(qū)煤蚌,第一個分區(qū)“0:1”:在broker0和broker1上耕挨,Leader在broker0上,第二個“1:0”表示尉桩,在broker0和broker1上筒占,Leader在broker1上,第一位就是Leader副本,2:1:3:0 表示有4個副本蜘犁,Leader副本在broker.id 為2 的服務(wù)器翰苫。
然后模擬broker0宕機的情況:
## 找到kafka的進程ID
jps
### 直接刪除kafka進程
kill -9 755
## 此時上另一個服務(wù)器上查看該topic詳細信息≌獬龋可以看到所有的分區(qū)都在broker1上
kafka-topics.sh --zookeeper hhb:2181/myKafka --describe --topic tp_demo_03
## 重啟broker0
kafka-server-start.sh -daemon /mnt/module/kafka_2.12-1.0.2/config/server.properties
## 再次查看topic信息奏窑,發(fā)現(xiàn)所有分區(qū)還是都在broker1
kafka-topics.sh --zookeeper hhb:2181/myKafka --describe --topic tp_demo_03
# broker恢復(fù)了,但是Leader的分配并沒有變化屈扎,還是處于Leader切換后的分配情況埃唯。
是否有一種方式,可以讓Kafka自動幫我們進行修改?改為初始的副本分配? 此時鹰晨,用到了Kafka提供的自動再均衡腳本: kafka-preferred-replica-election.sh 先看介紹:
該工具會讓每個分區(qū)的Leader副本分配在合適的位置墨叛,讓Leader分區(qū)和Follower分區(qū)在服務(wù)器之間均衡分配.如果該腳本僅指定zookeeper地址止毕,則會對集群中所有的主題進行操作,自動再平衡漠趁。
-
該topic所有的分區(qū)進行再平衡扁凛,恢復(fù)到新建分區(qū)時候的分配
kafka-preferred-replica-election.sh --zookeeper localhost:2181/myKafka
之所以是這樣的分配,是因為我們在創(chuàng)建主題的時候:--replica-assignment"0:1,1:0,0:1"闯传,在逗號分割的每個數(shù)值對中排在前面的是Leader分區(qū)谨朝,后面的是副本分區(qū)。那么所謂的preferred replica甥绿,就是排在前面的數(shù)字就是Leader副本應(yīng)該在的brokerId字币。
-
重平衡制定分區(qū)
-
先創(chuàng)建一個preferred-replica.json文件,在這只恢復(fù)tp_demo_03的0號分區(qū),如果是多個妹窖,可是參考上圖的解釋
{ "partitions": [ { "topic": "tp_demo_03", "partition": 0 } ] }
-
執(zhí)行命令
kafka-preferred-replica-election.sh --zookeeper localhost:2181/myKafka --path-to-json-file preferred-replica.json
-
重新查看該topic,會發(fā)現(xiàn)收叶,分區(qū)0已經(jīng)恢復(fù)到新建的模樣骄呼。
修改分區(qū)副本
實際項目中,我們可能由于主題的副本因子設(shè)置的問題判没,需要重新設(shè)置副本因子 或者由于集群的擴展蜓萄,需要重新設(shè)置副本因子。 topic一旦使用又不能輕易刪除重建澄峰,因此動態(tài)增加副本因子就成為最終的選擇嫉沽。
說明:kafka 1.0版本配置文件默認沒有default.replication.factor=x, 因此如果創(chuàng)建topic時俏竞,不指定replication-factor 時候绸硕, 默認副本因子為1. 我們可以在自己的server.properties中配置上常用的副本因子,省去手動調(diào)整。例如設(shè)置default.replication.factor=3魂毁, 詳細內(nèi)容可參考官方文檔
原因分析:
假設(shè)我們有2個kafka broker分別broker0玻佩,broker1。
- 當(dāng)我們創(chuàng)建的topic有2個分區(qū)partition時并且replication-factor為1席楚,基本上一個broker上一 個分區(qū)咬崔。當(dāng)一個broker宕機了,該topic就無法使用了烦秩,因為兩個個分區(qū)只有一個能用垮斯。
- 當(dāng)我們創(chuàng)建的topic有3個分區(qū)partition時并且replication-factor為2時,可能分區(qū)數(shù)據(jù)分布情 況是 broker0只祠, partiton0兜蠕,partiton1,partiton2抛寝, broker1牺氨, partiton1狡耻,partiton0, partiton2猴凹,每個分區(qū)有一個副本夷狰,當(dāng)其中一個broker宕機了,kafka集群還能完整湊出該topic的兩個分 區(qū)郊霎,例如當(dāng)broker0宕機了沼头,可以通過broker1組合出topic的兩個分區(qū)。
創(chuàng)建主題:
kafka-topics.sh --zookeeper hhb:2181/myKafka --create --topic tp_re_02 --partitions 3 --replication-factor 1
查看主題細節(jié):
kafka-topics.sh --zookeeper hhb:2181/myKafka --describe --topic tp_re_02
修改副本因子:錯誤
kafka-topics.sh --zookeeper hhb:2181/myKafka --alter --topic tp_re_02 --replication-factor 2
使用kafka-reassign-partitions.sh修改副本因子
### 創(chuàng)建increment-replication-factor.json
vim increment-replication-factor.json
{
"version":1,
"partitions":[
{"topic":"tp_re_02","partition":0,"replicas":[0,1]},
{"topic":"tp_re_02","partition":1,"replicas":[0,1]},
{"topic":"tp_re_02","partition":2,"replicas":[1,0]}
]
}
執(zhí)行分配
kafka-reassign-partitions.sh --zookeeper hhb:2181/myKafka --reassignment-json-file increase-replication- factor.json --execute
再次查看主題
kafka-topics.sh --zookeeper hhb:2181/myKafka --describe --topic tp_re_02
分區(qū)分配策略
在Kafka中书劝,每個Topic會包含多個分區(qū)进倍,默認情況下一個分區(qū)只能被一個消費組下面的一個消費者 消費,這里就產(chǎn)生了分區(qū)分配的問題购对。Kafka中提供了多重分區(qū)分配算法(PartitionAssignor)的實現(xiàn):RangeAssignor猾昆、RoundRobinAssignor、StickyAssignor骡苞。
RangeAssignor
PartitionAssignor接口用于用戶定義實現(xiàn)分區(qū)分配算法垂蜗,以實現(xiàn)Consumer之間的分區(qū)分配。
消費組的成員訂閱它們感興趣的Topic并將這種訂閱關(guān)系傳遞給作為訂閱組協(xié)調(diào)者的Broker解幽。協(xié)調(diào) 者選擇其中的一個消費者來執(zhí)行這個消費組的分區(qū)分配并將分配結(jié)果轉(zhuǎn)發(fā)給消費組內(nèi)所有的消費者贴见。 Kafka默認采用RangeAssignor的分配算法。
RangeAssignor對每個Topic進行獨立的分區(qū)分配躲株。對于每一個Topic片部,首先對分區(qū)按照分區(qū)ID進行 數(shù)值排序,然后訂閱這個Topic的消費組的消費者再進行字典排序霜定,之后盡量均衡的將分區(qū)分配給消費 者档悠。這里只能是盡量均衡,因為分區(qū)數(shù)可能無法被消費者數(shù)量整除望浩,那么有一些消費者就會多分配到一 些分區(qū)站粟。
大致算法如下:
assign(topic, consumers) {
// 對分區(qū)和Consumer進行排序
List<Partition> partitions = topic.getPartitions();
sort(partitions);
sort(consumers);
// 計算每個Consumer分配的分區(qū)數(shù)
int numPartitionsPerConsumer = partition.size() / consumers.size();
// 額外有一些Consumer會多分配到分區(qū)
int consumersWithExtraPartition = partition.size() % consumers.size();
// 計算分配結(jié)果
for (int i = 0, n = consumers.size(); i < n; i++) {
// 第i個Consumer分配到的分區(qū)的index
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
// 第i個Consumer分配到的分區(qū)數(shù)
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
// 分裝分配結(jié)果
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
RangeAssignor策略的原理是按照消費者總數(shù)和分區(qū)總數(shù)進行整除運算來獲得一個跨度,然后將分區(qū)按照跨度進行平均分配曾雕,以保證分區(qū)盡可能均勻地分配給所有的消費者奴烙。對于每一個Topic, RangeAssignor策略會將消費組內(nèi)所有訂閱這個Topic的消費者按照名稱的字典序排序剖张,然后為每個消 費者劃分固定的分區(qū)范圍切诀,如果不夠平均分配,那么字典序靠前的消費者會被多分配一個分區(qū)搔弄。
這種分配方式明顯的一個問題是隨著消費者訂閱的Topic的數(shù)量的增加幅虑,不均衡的問題會越來越嚴 重,比如上圖中4個分區(qū)3個消費者的場景顾犹,C0會多分配一個分區(qū)倒庵。如果此時再訂閱一個分區(qū)數(shù)為4的 Topic褒墨,那么C0又會比C1、C2多分配一個分區(qū)擎宝,這樣C0總共就比C1郁妈、C2多分配兩個分區(qū)了瓜饥,而且隨著 Topic的增加读规,這個情況會越來越嚴重堕油。
字典序靠前的消費組中的消費者比較“貪婪”昂灵。
RoundRobinAssignor
RoundRobinAssignor的分配策略是將消費組內(nèi)訂閱的所有Topic的分區(qū)及所有消費者進行排序后盡 量均衡的分配(RangeAssignor是針對單個Topic的分區(qū)進行排序分配的)。如果消費組內(nèi)蝶锋,消費者訂 閱的Topic列表是相同的(每個消費者都訂閱了相同的Topic)易稠,那么分配結(jié)果是盡量均衡的(消費者之 間分配到的分區(qū)數(shù)的差值不會超過1)脐帝。如果訂閱的Topic列表是不同的筋搏,那么分配結(jié)果是不保證“盡量 均衡”的仆百,因為某些消費者不參與一些Topic的分配。
相對于RangeAssignor奔脐,在訂閱多個Topic的情況下俄周,RoundRobinAssignor的方式能消費者之間盡 量均衡的分配到分區(qū)(分配到的分區(qū)數(shù)的差值不會超過1——RangeAssignor的分配策略可能隨著訂閱 的Topic越來越多,差值越來越大)帖族。
對于消費組內(nèi)消費者訂閱Topic不一致的情況:假設(shè)有兩個個消費者分別為C0和C1栈源,有2個Topic T1挡爵、T2竖般,分別擁有3和2個分區(qū),并且C0訂閱T1和T2茶鹃,C1訂閱T2涣雕,那么RoundRobinAssignor的分配結(jié)果如下:
看上去分配已經(jīng)盡量的保證均衡了,不過可以發(fā)現(xiàn)C0承擔(dān)了4個分區(qū)的消費而C1訂閱了T2一個分區(qū)闭翩,是不是把T2的P0交給C1消費能更加的均衡呢?
StickyAssignor
動機
盡管RoundRobinAssignor已經(jīng)在RangeAssignor上做了一些優(yōu)化來更均衡的分配分區(qū)挣郭,但是在一些情況下依舊會產(chǎn)生嚴重的分配偏差,比如消費組中訂閱的Topic列表不相同的情況下疗韵。更核心的問題是無論是RangeAssignor兑障,還是RoundRobinAssignor,當(dāng)前的分區(qū)分配算法都沒有考慮上一次的分配結(jié)果蕉汪。顯然流译,在執(zhí)行一次新的分配之前,如果能考慮到上一次分配的結(jié)果者疤,盡量少的調(diào)整分區(qū)分配的變動福澡,顯然是能節(jié)省很多開銷的。
目標
從字面意義上看驹马,Sticky是“粘性的”革砸,可以理解為分配結(jié)果是帶“粘性的”:
分區(qū)的分配盡量的均衡
每一次重分配的結(jié)果盡量與上一次分配結(jié)果保持一致
當(dāng)這兩個目標發(fā)生沖突時除秀,優(yōu)先保證第一個目標。第一個目標是每個分配算法都盡量嘗試去完成的算利,而第二個目標才真正體現(xiàn)出StickyAssignor特性的册踩。 我們先來看預(yù)期分配的結(jié)構(gòu),后續(xù)再具體分析StickyAssignor的算法實現(xiàn)笔时。 例如:
- 有3個Consumer:C0棍好、C1、C2
- 有4個Topic:T0允耿、T1借笙、T2、T3较锡,每個Topic有2個分區(qū)
- 所有Consumer都訂閱了這4個分區(qū)
StickyAssignor的分配結(jié)果如下圖所示(增加RoundRobinAssignor分配作為對比):
如果消費者1宕機业稼,則按照RoundRobin的方式分配結(jié)果如下:
打亂從新來過,輪詢分配:
按照Sticky的方式:
僅對消費者1分配的分區(qū)進行重分配蚂蕴,紅線部分低散。最終達到均衡的目的。
再舉一個例子:
- 有3個Consumer:C0骡楼、C1熔号、C2
- 3個Topic:T0、T1鸟整、T2引镊,它們分別有1、2篮条、3個分區(qū)
- C0訂閱T0;C1訂閱T0弟头、T1;C2訂閱T0、T1涉茧、T2
分配結(jié)果如下圖所示:
消費者0下線赴恨,則按照輪詢的方式分配:
按照Sticky方式分配分區(qū),僅僅需要動的就是紅線部分伴栓,其他部分不動伦连。
StickyAssignor分配方式的實現(xiàn)稍微復(fù)雜點兒,我們可以先理解圖示部分即可钳垮。感興趣的同學(xué)可以 研究一下惑淳。
自定義分配策略
自定義的分配策略必須要實現(xiàn)org.apache.kafka.clients.consumer.internals.PartitionAssignor接 口。PartitionAssignor接口的定義如下:
public interface PartitionAssignor {
PartitionAssignor.Subscription subscription(Set<String> var1);
Map<String, PartitionAssignor.Assignment> assign(Cluster var1, Map<String, PartitionAssignor.Subscription> var2);
void onAssignment(PartitionAssignor.Assignment var1);
String name();
public static class Assignment {
private final List<TopicPartition> partitions;
private final ByteBuffer userData;
}
public static class Subscription {
private final List<String> topics;
private final ByteBuffer userData;
}
}
PartitionAssignor接口中定義了兩個內(nèi)部類:Subscription和Assignment扔枫。
Subscription類用來表示消費者的訂閱信息汛聚,類中有兩個屬性:topics和userData,分別表示消費者所訂閱topic列表和用戶自定義信息短荐。PartitionAssignor接口通過subscription()方法來設(shè)置消費者自身相關(guān)的Subscription信息倚舀,注意到此方法中只有一個參數(shù)topics叹哭,與Subscription類中的topics的相互呼應(yīng),但是并沒有有關(guān)userData的參數(shù)體現(xiàn)痕貌。為了增強用戶對分配結(jié)果的控制风罩,可以在subscription() 方法內(nèi)部添加一些影響分配的用戶自定義信息賦予userData,比如:權(quán)重舵稠、ip地址超升、host或者機架 (rack)等等。
再來說一下Assignment類哺徊,它是用來表示分配結(jié)果信息的室琢,類中也有兩個屬性:partitions和 userData,分別表示所分配到的分區(qū)集合和用戶自定義的數(shù)據(jù)落追∮危可以通過PartitionAssignor接口中的 onAssignment()方法是在每個消費者收到消費組leader分配結(jié)果時的回調(diào)函數(shù),例如在StickyAssignor 策略中就是通過這個方法保存當(dāng)前的分配方案轿钠,以備在下次消費組再平衡(rebalance)時可以提供分 配參考依據(jù)巢钓。
接口中的name()方法用來提供分配策略的名稱,對于Kafka提供的3種分配策略而言疗垛, RangeAssignor對應(yīng)的protocol_name為“range”症汹,RoundRobinAssignor對應(yīng)的protocol_name為 “roundrobin”,StickyAssignor對應(yīng)的protocol_name為“sticky”贷腕,所以自定義的分配策略中要注意命名 的時候不要與已存在的分配策略發(fā)生沖突背镇。這個命名用來標識分配策略的名稱,在后面所描述的加入消 費組以及選舉消費組leader的時候會有涉及花履。
真正的分區(qū)分配方案的實現(xiàn)是在assign()方法中芽世,方法中的參數(shù)metadata表示集群的元數(shù)據(jù)信息挚赊, 而subscriptions表示消費組內(nèi)各個消費者成員的訂閱信息诡壁,最終方法返回各個消費者的分配信息。
Kafka中還提供了一個抽象類 org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor荠割,它可以簡化 PartitionAssignor接口的實現(xiàn)妹卿,對assign()方法進行了實現(xiàn),其中會將Subscription中的userData信息 去掉后蔑鹦,在進行分配夺克。Kafka提供的3種分配策略都是繼承自這個抽象類。如果開發(fā)人員在自定義分區(qū)分 配策略時需要使用userData信息來控制分區(qū)分配的結(jié)果嚎朽,那么就不能直接繼承 AbstractPartitionAssignor這個抽象類铺纽,而需要直接實現(xiàn)PartitionAssignor接口。
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class MyAssignor extends AbstractPartitionAssignor {
}
在使用時哟忍,消費者客戶端需要添加相應(yīng)的Properties參數(shù)狡门,示例如下:
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,MyAssignor.class.getName());